需求
- 根据RFC959
- 高并发(C3K~C4K)
架构
在主线程使用epoll,监听listen在21端口的fd,和所有用户的命令链接的fd(以下均以cmdFd简称之)
一旦某个fd就绪,就将其封装成任务对象提交给线程池去执行
每个用户关联一个Session对象,根据RFC的要求,可以实现为,任意时刻只有小于等于一个线程在handle这个session对象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19$ tree src/main
src/main
├── config
│ └── config.hpp
├── core
│ ├── FTP.hpp
│ ├── Login.hpp
│ ├── NetworkManager.hpp
│ └── Session.hpp
├── main.cpp
├── tools
│ ├── FileSystem.hpp
│ ├── ListFiles.hpp
│ └── ThreadPool.hpp
└── util
├── Def.hpp
├── NetUtility.hpp
├── ThreadUtility.hpp
└── Utility.hpp
线程安全的线程池类的设计
- 见笔者的另一篇博文Construct a Thread-Safe ThreadPool
Utility
对于POSIX的 read、write等IO函数、accept等网络函数、epoll、pthread等函数创建wrap函数,用于处理各种错误,使得业务方可以简单的使用
要根据manual,对所有可能出现的errno进行处理。我的处理方法是,对于可以明确知道不应该出现的errno,一旦出现,就调用bug函数;对于EINTR,根据需求进行再次尝试或返回;对于不明确是否是合法情况的错误,输出一个warning,然后不改变errno,返回给用户;尽量使得这些wrap函数不需要返回过于复杂的信息,能返回void尽量返回void,能返回bool就不要返回int
ReadWithBuf
函数:学的是CSAPP的方法,让用户每次调用都传入一个相同的ReadBuf
对象,在这个对象里存预读取的数据,这样子就可以每次读都读1024B(ReadBuf里面的buf的大小是1024B),从而减少read这个syscall调用的次数,可以提高效率
FTP类
Session在多线程下的线程安全的保证
FTP类需要保证很重要的一点:任意一个session在任意一个时刻,只有小于等于1个线程正在handle该Session
在代码里我是这样保证的
对于用户的cmdFd,epoll等待的事件不仅仅是
EPOLLIN
,还需要EPOLLONESHOT
。EPOLLONESHOT
的含义是,一旦一个Fd被从epoll_wait
返回,那么其就不再从epoll_wait
返回,即使有事件发生。直到用户对这个fd使用EPOLL_CTL_MOD
调用epoll_ctl
指示等待的事件EPOLLONESHOT (since Linux 2.6.2) Sets the one-shot behavior for the associated file descriptor. This means that after an event is pulled out with epoll_wait(2) the associated file descriptor is internally disabled and no other events will be reported by the epoll interface. The user must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file descriptor with a new event mask.
从而使得,一个Session正在被handle的过程中,没有新的线程会去handle这个Session
创建新Session的函数epoll_wait、accept的函数在同一个线程,从而在创建新Session时,不会有新的线程去handle这个正在创建的Session
destroySession时,持有一个mutex,从而,即使对应的cmdFd关闭了,OS复用了这个fd,但是其创建新Session会被阻塞,从而在该Session完全destroy之前,没有新线程会handle该Session
信号处理
- 按照CSAPP建议的6条规则,逐一介绍以下
- handler要尽可能简单,比如只是设置一个flag
- handler里只能调用异步信号安全的函数(比如只访问局部变量的函数,比如不能被信号中断的函数)。异步信号安全不同于线程安全,线程安全中,对于非线程安全的函数的调用,可以通过持有同一把锁来实现线程安全;但是因为信号是异步的,所以如果在持有锁时信号到来,handler运行,则会死锁——因为handler调用该函数前也要持有锁,CSAPP 3rd(英文版)的P757有一个异步线程安全的函数的表,其中不包括
exit
、printf
等常见函数 - 保存和恢复errno。因为信号handler中调用的函数可能会在失败时设置errno,所以可能会干扰正常程序中的errno,所以需要在刚进入handler时保存errno,而在退出前恢复errno
- 如果访问了全局的数据结构,那么需要阻塞所有信号。因为笔者的handler中只是设置了一个flag,而对
bool
型的flag的读或写,intel保证是原子的,所以无需如此引用自Intel® 64 and IA-32 Architectures Software Developer’s Manual (2018 5 18), Vol.3A ch8
- The Intel486 processor (and newer processors since) guarantees that the following basic memory operations will always be carried out atomically(
atomically
: That is, once started, the processor guarantees that the operation will be completed before another processor or bus agent is allowed access to the memory location.):- Reading or writing a byte
- Reading or writing a word aligned on a 16-bit boundary
- Reading or writing a doubleword aligned on a 32-bit boundary
- The Intel486 processor (and newer processors since) guarantees that the following basic memory operations will always be carried out atomically(
- 使用
volatile
声明flag,volatile
要求编译器每次在代码中引用flag时,都从内存中读取该值。不过需要注意的是,有些编译器下的volatile
并没有类似于java中的volatile
的内存可见性的保证Writing to a volatile field has the same memory effect as a monitor release, and reading from a volatile field has the same memory effect as a monitor acquire. In effect, because the new memory model places stricter constraints on reordering of volatile field accesses with other field accesses, volatile or not, anything that was visible to thread A when it writes to volatile field f becomes visible to thread B when it reads f.(引用自JSR 133 (Java Memory Model) FAQ)
使用
sig_atomic_t
声明变量。C99的sig_atomic_t
有这么一段注释1
2
3
4
5/*
* C99: An integer type that can be accessed as an atomic entity,
* even in the presence of asynchronous interrupts.
* It is not currently necessary for this to be machine-specific.
*/并且按照intel的manual,读或写一个byte是原子的,所以笔者直接用了
bool
还有一个很重要的点,race Condition。以下代码截取自FTP class
1
2
3
4
5
6
7
8
9// make sure there is no race condition:
// the signal occur after check willExit and before epoll_wait
// then the epoll_wait may not wake up.
pthreadSigmaskWrap(SIG_BLOCK, &sigToBlock, &oldSigSet);
if (willExit) {
break;
}
int waitFdCnt = epollPWaitWrap(this->epollFd, evArray, evArraySize, -1, oldSigSet);
pthreadSigmaskWrap(SIG_SETMASK, &oldSigSet);为什么要写的这么麻烦,而不是直接使用
epoll_wait
。因为,虽然如果没有阻塞信号时,epoll_wait
收到SIGINT
会返回,但是,如果该信号是在检查了willExit
标志之后、epoll_wait
之前到来呢,那么,我们将错过这个信号——epoll_wait
不会返回,从而没有机会去再次检查willExit
。所以,需要在检查willExit
与epoll_wait
之间阻塞信号,并且要在epoll_wait
期间不阻塞SIGINT
,这意味着需要原子的做这件事情,这就是epoll_pwait
帮我们做的ref from epoll_pwait manual
The following epoll_pwait() call:
1
2> ready = epoll_pwait(epfd, &events, maxevents, timeout, &sigmask);
>is equivalent to atomically executing the following calls:
1
2
3
4
5
6> sigset_t origmask;
>
> pthread_sigmask(SIG_SETMASK, &sigmask, &origmask);
> ready = epoll_wait(epfd, &events, maxevents, timeout);
> pthread_sigmask(SIG_SETMASK, &origmask, NULL);
>
压测
压测步骤
笔者的压测程序使用java写的,代码在这里https://github.com/H-ZeX/FTP-Implement/tree/master/test/FTPServerTester
压测包括
- 登录
- 列出某个目录
- 上传100KB的文件
- 根据配置,
sleep(HANG_TIME)
,在C3K~C4K的测试结果中,HANG_TIME=0
压测前,运行server的命令是
1
2
3
4ulimit -s unlimited -f unlimited -d unlimited -n unlimited
su root
echo 20000 > /proc/sys/net/core/somaxconn
sudo ./FTPServer [port] > /tmp/FTPServerOutput修改
somaxconn
是为了使得baclog足够大将stdout重定向到
/tmp/FTPServerOutput
是为了使得错误信息清晰的显示出来之所以需要
sudo
,是因为使用了OS的账户验证机制来实现用户登录./FTPServer [port]
的[port]
参数可以不指定,也就是直接sudo ./FTPServer > /tmp/FTPServerOutput
,这样将会监听在8001
端口
然后运行
java -ea -jar -Dexternal.config=file:/tmp/1.properties FTPServerTester-1.0-SNAPSHOT.jar
FTPServerTester-1.0-SNAPSHOT.jar
在根目录的test
目录下(不是在src/test
的那个test
目录)/tmp/1.properties
是配置文件,因为需要包含测试账户信息,所以需要自己定制,样例如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30StressTest.TestCnt=10
StressTest.MaxCmdConnectionCnt=1000
StressTest.MaxThreadCnt=1024
# the time(millisecond) to hand on the connection
StressTest.HangTime=1000
# 你运行FTPServer的host的地址
Tester.TesterServerAddress=10.243.6.109
# 你运行Tester的host地址
# 必须保证运行server的host与运行tester的host是可以互通的
# 包括,tester可以主动链接server,server也可以主动链接tester
Tester.YourselfAddress=10.243.6.43
Tester.ServerPort=8001
# 系统上账户的用户名
Tester.UserName=
# 系统上账户的密码
Tester.Password=
# 这些是以逗号分隔的目录列表
# 必须是绝对路径
# 这些目录数量应该>=20个,越多越好,太少的话,测试会很慢
# 因为多个用户访问少数几个目录会很慢
# 如果你的测试程序与这个FTPserver运行在不同的主机上,那么FTPServer运行的主机上应该存在这些目录
# 如果运行在同一台机子上,测试程序会创建这些目录
Tester.ListTestDir=/tmp/testDir_1,/tmp/testDir_2,/tmp/testDir_3,/tmp/testDir_4,/tmp/testDir_5,/tmp/testDir_6,/tmp/testDir_7,/tmp/testDir_8,/tmp/testDir_9,/tmp/testDir_10,/tmp/testDir_11,/tmp/testDir_12,/tmp/testDir_13,/tmp/testDir_14,/tmp/testDir_15,/tmp/testDir_16,/tmp/testDir_17
# 这个目录必须是绝对路径
# 如果你的测试程序与这个FTPserver运行在不同的主机上,那么FTPServer运行的主机上应该存在这些目录
# 如果运行在同一台机子上,测试程序会创建这些目录
Tester.StorTestDir=/tmp/FTPSeverTesterStorDirs____23233dd22/
压测结果
- 笔者的测试与server都跑在同一台机器上
- 笔者在自己的机器上(Intel i7-8550U, 16G内存,没有SSD),
StressTest.MaxCmdConnectionCnt
设为10240
及以下时,测试可以顺利完成。并且在测试过程使用linux的ftp
命令与server通信,是比较流畅的 在如下设置测试参数后
1
2
3StressTest.MaxCmdConnectionCnt=10000
StressTest.MaxThreadCnt=4024
StressTest.HangTime=0sudo watch -n 0.5 "netstat -anp | grep -i <PID> | grep -i "est" | wc -l "
可以看到数目是3K到4K之间波动,并且测试期间手动与服务器通信是比较流畅的
出现的问题
测试程序开50k个链接连server进行操作(50k个测试是依次提交给线程池,边提交线程池边运行),server需要openListenFd,由OS指定端口,但是在测试程序成功进行了15k多一点的测试后,server的这个openListenFd失败,errno是
Address already in use
。我猜测,可能是很多端口处于
TIME_WAIT
状态,虽然打开的socket设置了SO_REUSEADDR
,可以绑定这些TIME_WAIT链接的端口,但是刚好,对端也是上一次那个链接的端口,这是TCP禁止的——TCP允许复用处于TIME_WAIT的端口,但是不允许新的链接与TIME_WAIT的链接的(serverIp, serverPort, serverIp, serverPort)相同(其实也不是一定禁止,TCP规范是允许有例外),所以就提示Address already in use
server listen的端口的backlog设太小(20),测试程序开的链接数一多,就有一些链接三次握手成功,但是hang住在读取welcome信息那里,详见笔者的另一篇博文高并发情况下backlog过低出现的问题
缺点
- 因为使用是线程池,并且似乎一个线程不能设置另一个线程的uid(欢迎指正!),所以无法利用OS的机制实现权限控制
- 只有单个线程accept,可以通过linux内核3.9的一个特性
SO_REUSEPORT
来实现多线程accept,并且没有惊群、负载不均衡的问题(The SO_REUSEPORT socket option)