C/C++ 服務(wù)器并發(fā)
1. 單線程 / 進(jìn)程
在 TCP 通信過程中,服務(wù)器端啟動(dòng)之后可以同時(shí)和多個(gè)客戶端建立連接,并進(jìn)行網(wǎng)絡(luò)通信,但是在介紹 TCP 通信流程的時(shí)候,提供的服務(wù)器代碼卻不能完成這樣的需求,先簡單的看一下之前的服務(wù)器代碼的處理思路,再來分析代碼中的弊端:
//?server.c
#include?
#include?
#include?
#include?
#include?
int?main()
{
????//?1.?創(chuàng)建監(jiān)聽的套接字
????int?lfd?=?socket(AF_INET,?SOCK_STREAM,?0);
????//?2.?將socket()返回值和本地的IP端口綁定到一起
????struct?sockaddr_in?addr;
????addr.sin_family?=?AF_INET;
????addr.sin_port?=?htons(10000);???//?大端端口
????//?INADDR_ANY代表本機(jī)的所有IP,?假設(shè)有三個(gè)網(wǎng)卡就有三個(gè)IP地址
????//?這個(gè)宏可以代表任意一個(gè)IP地址
????addr.sin_addr.s_addr?=?INADDR_ANY;??//?這個(gè)宏的值為0?==?0.0.0.0
????int?ret?=?bind(lfd,?(struct?sockaddr*)&addr,?sizeof(addr));
????//?3.?設(shè)置監(jiān)聽
????ret?=?listen(lfd,?128);
????//?4.?阻塞等待并接受客戶端連接
????struct?sockaddr_in?cliaddr;
????int?clilen?=?sizeof(cliaddr);
????int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
????//?5.?和客戶端通信
????while(1)
????{
????????//?接收數(shù)據(jù)
????????char?buf[1024];
????????memset(buf,?0,?sizeof(buf));
????????int?len?=?read(cfd,?buf,?sizeof(buf));
????????if(len?>?0)
????????{
????????????printf("客戶端say:?%s\n",?buf);
????????????write(cfd,?buf,?len);
????????}
????????else?if(len??==?0)
????????{
????????????printf("客戶端斷開了連接...\n");
????????????break;
????????}
????????else
????????{
????????????perror("read");
????????????break;
????????}
????}
????close(cfd);
????close(lfd);
????return?0;
}
在上面的代碼中用到了三個(gè)會(huì)引起程序阻塞的函數(shù),分別是:
accept():如果服務(wù)器端沒有新客戶端連接,阻塞當(dāng)前進(jìn)程 / 線程,如果檢測到新連接解除阻塞,建立連接read():如果通信的套接字對(duì)應(yīng)的讀緩沖區(qū)沒有數(shù)據(jù),阻塞當(dāng)前進(jìn)程 / 線程,檢測到數(shù)據(jù)解除阻塞,接收數(shù)據(jù)write():如果通信的套接字寫緩沖區(qū)被寫滿了,阻塞當(dāng)前進(jìn)程 / 線程(這種情況比較少見)
如果需要和發(fā)起新的連接請(qǐng)求的客戶端建立連接,那么就必須在服務(wù)器端通過一個(gè)循環(huán)調(diào)用 accept() 函數(shù),另外已經(jīng)和服務(wù)器建立連接的客戶端需要和服務(wù)器通信,發(fā)送數(shù)據(jù)時(shí)的阻塞可以忽略,當(dāng)接收不到數(shù)據(jù)時(shí)程序也會(huì)被阻塞,這時(shí)候就會(huì)非常矛盾,被 accept() 阻塞就無法通信,被 read() 阻塞就無法和客戶端建立新連接。因此得出一個(gè)結(jié)論,基于上述處理方式,在單線程 / 單進(jìn)程場景下,服務(wù)器是無法處理多連接的,解決方案也有很多,常用的有四種:
使用多線程實(shí)現(xiàn) 使用多進(jìn)程實(shí)現(xiàn) 使用 IO 多路轉(zhuǎn)接(復(fù)用)實(shí)現(xiàn) 使用 IO 多路轉(zhuǎn)接 + 多線程實(shí)現(xiàn)
2. 多進(jìn)程并發(fā)
如果要編寫多進(jìn)程版的并發(fā)服務(wù)器程序,首先要考慮,創(chuàng)建出的多個(gè)進(jìn)程都是什么角色,這樣就可以在程序中對(duì)號(hào)入座了。在 Tcp 服務(wù)器端一共有兩個(gè)角色,分別是:監(jiān)聽和通信,監(jiān)聽是一個(gè)持續(xù)的動(dòng)作,如果有新連接就建立連接,如果沒有新連接就阻塞。關(guān)于通信是需要和多個(gè)客戶端同時(shí)進(jìn)行的,因此需要多個(gè)進(jìn)程,這樣才能達(dá)到互不影響的效果。進(jìn)程也有兩大類:父進(jìn)程和子進(jìn)程,通過分析我們可以這樣分配進(jìn)程:
父進(jìn)程:
負(fù)責(zé)監(jiān)聽,處理客戶端的連接請(qǐng)求,也就是在父進(jìn)程中循環(huán)調(diào)用 accept() 函數(shù) 創(chuàng)建子進(jìn)程:建立一個(gè)新的連接,就創(chuàng)建一個(gè)新的子進(jìn)程,讓這個(gè)子進(jìn)程和對(duì)應(yīng)的客戶端通信 回收子進(jìn)程資源:子進(jìn)程退出回收其內(nèi)核 PCB 資源,防止出現(xiàn)僵尸進(jìn)程
子進(jìn)程:
負(fù)責(zé)通信,基于父進(jìn)程建立新連接之后得到的文件描述符,和對(duì)應(yīng)的客戶端完成數(shù)據(jù)的接收和發(fā)送。 發(fā)送數(shù)據(jù): send() / write()接收數(shù)據(jù): recv() / read()
在多進(jìn)程版的服務(wù)器端程序中,多個(gè)進(jìn)程是有血緣關(guān)系,對(duì)應(yīng)有血緣關(guān)系的進(jìn)程來說,還需要想明白他們有哪些資源是可以被繼承的,哪些資源是獨(dú)占的,以及一些其他細(xì)節(jié):
子進(jìn)程是父進(jìn)程的拷貝,在子進(jìn)程的內(nèi)核區(qū) PCB 中,文件描述符也是可以被拷貝的,因此在父進(jìn)程可以使用的文件描述符在子進(jìn)程中也有一份,并且可以使用它們做和父進(jìn)程一樣的事情。 父子進(jìn)程有用各自的獨(dú)立的虛擬地址空間,因此所有的資源都是獨(dú)占的 為了節(jié)省系統(tǒng)資源,對(duì)于只有在父進(jìn)程才能用到的資源,可以在子進(jìn)程中將其釋放掉,父進(jìn)程亦如此。 由于需要在父進(jìn)程中做 accept()操作,并且要釋放子進(jìn)程資源,如果想要更高效一下可以使用信號(hào)的方式處理

多進(jìn)程版并發(fā) TCP 服務(wù)器示例代碼如下:
#include?
#include?
#include?
#include?
#include?
#include?
#include?
#include?
//?信號(hào)處理函數(shù)
void?callback(int?num)
{
????while(1)
????{
????????pid_t?pid?=?waitpid(-1,?NULL,?WNOHANG);
????????if(pid?<=?0)
????????{
????????????printf("子進(jìn)程正在運(yùn)行,?或者子進(jìn)程被回收完畢了\n");
????????????break;
????????}
????????printf("child?die,?pid?=?%d\n",?pid);
????}
}
int?childWork(int?cfd);
int?main()
{
????//?1.?創(chuàng)建監(jiān)聽的套接字
????int?lfd?=?socket(AF_INET,?SOCK_STREAM,?0);
????if(lfd?==?-1)
????{
????????perror("socket");
????????exit(0);
????}
????//?2.?將socket()返回值和本地的IP端口綁定到一起
????struct?sockaddr_in?addr;
????addr.sin_family?=?AF_INET;
????addr.sin_port?=?htons(10000);???//?大端端口
????//?INADDR_ANY代表本機(jī)的所有IP,?假設(shè)有三個(gè)網(wǎng)卡就有三個(gè)IP地址
????//?這個(gè)宏可以代表任意一個(gè)IP地址
????//?這個(gè)宏一般用于本地的綁定操作
????addr.sin_addr.s_addr?=?INADDR_ANY;??//?這個(gè)宏的值為0?==?0.0.0.0
????//????inet_pton(AF_INET,?"192.168.237.131",?&addr.sin_addr.s_addr);
????int?ret?=?bind(lfd,?(struct?sockaddr*)&addr,?sizeof(addr));
????if(ret?==?-1)
????{
????????perror("bind");
????????exit(0);
????}
????//?3.?設(shè)置監(jiān)聽
????ret?=?listen(lfd,?128);
????if(ret?==?-1)
????{
????????perror("listen");
????????exit(0);
????}
????//?注冊(cè)信號(hào)的捕捉
????struct?sigaction?act;
????act.sa_flags?=?0;
????act.sa_handler?=?callback;
????sigemptyset(&act.sa_mask);
????sigaction(SIGCHLD,?&act,?NULL);
????//?接受多個(gè)客戶端連接,?對(duì)需要循環(huán)調(diào)用?accept
????while(1)
????{
????????//?4.?阻塞等待并接受客戶端連接
????????struct?sockaddr_in?cliaddr;
????????int?clilen?=?sizeof(cliaddr);
????????int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
????????if(cfd?==?-1)
????????{
????????????if(errno?==?EINTR)
????????????{
????????????????//?accept調(diào)用被信號(hào)中斷了,?解除阻塞,?返回了-1
????????????????//?重新調(diào)用一次accept
????????????????continue;
????????????}
????????????perror("accept");
????????????exit(0);
?
????????}
????????//?打印客戶端的地址信息
????????char?ip[24]?=?{0};
????????printf("客戶端的IP地址:?%s,?端口:?%d\n",
???????????????inet_ntop(AF_INET,?&cliaddr.sin_addr.s_addr,?ip,?sizeof(ip)),
???????????????ntohs(cliaddr.sin_port));
????????//?新的連接已經(jīng)建立了,?創(chuàng)建子進(jìn)程,?讓子進(jìn)程和這個(gè)客戶端通信
????????pid_t?pid?=?fork();
????????if(pid?==?0)
????????{
????????????//?子進(jìn)程?->?和客戶端通信
????????????//?通信的文件描述符cfd被拷貝到子進(jìn)程中
????????????//?子進(jìn)程不負(fù)責(zé)監(jiān)聽
????????????close(lfd);
????????????while(1)
????????????{
????????????????int?ret?=?childWork(cfd);
????????????????if(ret?<=0)
????????????????{
????????????????????break;
????????????????}
????????????}
????????????//?退出子進(jìn)程
????????????close(cfd);
????????????exit(0);
????????}
????????else?if(pid?>?0)
????????{
????????????//?父進(jìn)程不和客戶端通信
????????????close(cfd);
????????}
????}
????return?0;
}
//?5.?和客戶端通信
int?childWork(int?cfd)
{
????//?接收數(shù)據(jù)
????char?buf[1024];
????memset(buf,?0,?sizeof(buf));
????int?len?=?read(cfd,?buf,?sizeof(buf));
????if(len?>?0)
????{
????????printf("客戶端say:?%s\n",?buf);
????????write(cfd,?buf,?len);
????}
????else?if(len??==?0)
????{
????????printf("客戶端斷開了連接...\n");
????}
????else
????{
????????perror("read");
????}
????return?len;
}
在上面的示例代碼中,父子進(jìn)程中分別關(guān)掉了用不到的文件描述符(父進(jìn)程不需要通信,子進(jìn)程也不需要監(jiān)聽)。如果客戶端主動(dòng)斷開連接,那么服務(wù)器端負(fù)責(zé)和客戶端通信的子進(jìn)程也就退出了,子進(jìn)程退出之后會(huì)給父進(jìn)程發(fā)送一個(gè)叫做 SIGCHLD 的信號(hào),在父進(jìn)程中通過 sigaction() 函數(shù)捕捉了該信號(hào),通過回調(diào)函數(shù) callback() 中的 waitpid() 對(duì)退出的子進(jìn)程進(jìn)行了資源回收。
另外還有一個(gè)細(xì)節(jié)要說明一下,這是父進(jìn)程的處理代碼:
int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
while(1)
{
????????int?cfd?=?accept(lfd,?(struct?sockaddr*)&cliaddr,?&clilen);
????????if(cfd?==?-1)
????????{
????????????if(errno?==?EINTR)
????????????{
????????????????//?accept調(diào)用被信號(hào)中斷了,?解除阻塞,?返回了-1
????????????????//?重新調(diào)用一次accept
????????????????continue;
????????????}
????????????perror("accept");
????????????exit(0);
?
????????}
?}
如果父進(jìn)程調(diào)用 accept() 函數(shù)沒有檢測到新的客戶端連接,父進(jìn)程就阻塞在這兒了,這時(shí)候有子進(jìn)程退出了,發(fā)送信號(hào)給父進(jìn)程,父進(jìn)程就捕捉到了這個(gè)信號(hào) SIGCHLD, 由于信號(hào)的優(yōu)先級(jí)很高,會(huì)打斷代碼正常的執(zhí)行流程,因此父進(jìn)程的阻塞被中斷,轉(zhuǎn)而去處理這個(gè)信號(hào)對(duì)應(yīng)的函數(shù) callback(),處理完畢,再次回到 accept() 位置,但是這是已經(jīng)無法阻塞了,函數(shù)直接返回 - 1,此時(shí)函數(shù)調(diào)用失敗,錯(cuò)誤描述為 accept: Interrupted system call,對(duì)應(yīng)的錯(cuò)誤號(hào)為 EINTR,由于代碼是被信號(hào)中斷導(dǎo)致的錯(cuò)誤,所以可以在程序中對(duì)這個(gè)錯(cuò)誤號(hào)進(jìn)行判斷,讓父進(jìn)程重新調(diào)用 accept(),繼續(xù)阻塞或者接受客戶端的新連接。
3. 多線程并發(fā)
編寫多線程版的并發(fā)服務(wù)器程序和多進(jìn)程思路差不多,考慮明白了對(duì)號(hào)入座即可。多線程中的線程有兩大類:主線程(父線程)和子線程,他們分別要在服務(wù)器端處理監(jiān)聽和通信流程。根據(jù)多進(jìn)程的處理思路,就可以這樣設(shè)計(jì)了:
主線程:
負(fù)責(zé)監(jiān)聽,處理客戶端的連接請(qǐng)求,也就是在父進(jìn)程中循環(huán)調(diào)用 accept()函數(shù)創(chuàng)建子線程:建立一個(gè)新的連接,就創(chuàng)建一個(gè)新的子進(jìn)程,讓這個(gè)子進(jìn)程和對(duì)應(yīng)的客戶端通信 回收子線程資源:由于回收需要調(diào)用阻塞函數(shù),這樣就會(huì)影響 accept(),直接做線程分離即可。
子線程:
負(fù)責(zé)通信,基于主線程建立新連接之后得到的文件描述符,和對(duì)應(yīng)的客戶端完成數(shù)據(jù)的接收和發(fā)送。 發(fā)送數(shù)據(jù): send() / write()接收數(shù)據(jù): recv() / read()
在多線程版的服務(wù)器端程序中,多個(gè)線程共用同一個(gè)地址空間,有些數(shù)據(jù)是共享的,有些數(shù)據(jù)的獨(dú)占的,下面來分析一些其中的一些細(xì)節(jié):
同一地址空間中的多個(gè)線程的??臻g是獨(dú)占的 多個(gè)線程共享全局?jǐn)?shù)據(jù)區(qū),堆區(qū),以及內(nèi)核區(qū)的文件描述符等資源,因此需要注意數(shù)據(jù)覆蓋問題,并且在多個(gè)線程訪問共享資源的時(shí)候,還需要進(jìn)行線程同步。

多線程版 Tcp 服務(wù)器示例代碼如下:
#include?
#include?
#include?
#include?
#include?
#include?
struct?SockInfo
{
????int?fd;??????????????????????//?通信
????pthread_t?tid;???????????????//?線程ID
????struct?sockaddr_in?addr;?????//?地址信息
};
struct?SockInfo?infos[128];
void*?working(void*?arg)
{
????while(1)
????{
????????struct?SockInfo*?info?=?(struct?SockInfo*)arg;
????????//?接收數(shù)據(jù)
????????char?buf[1024];
????????int?ret?=?read(info->fd,?buf,?sizeof(buf));
????????if(ret?==?0)
????????{
????????????printf("客戶端已經(jīng)關(guān)閉連接...\n");
????????????info->fd?=?-1;
????????????break;
????????}
????????else?if(ret?==?-1)
????????{
????????????printf("接收數(shù)據(jù)失敗...\n");
????????????info->fd?=?-1;
????????????break;
????????}
????????else
????????{
????????????write(info->fd,?buf,?strlen(buf)+1);
????????}
????}
????return?NULL;
}
int?main()
{
????//?1.?創(chuàng)建用于監(jiān)聽的套接字
????int?fd?=?socket(AF_INET,?SOCK_STREAM,?0);
????if(fd?==?-1)
????{
????????perror("socket");
????????exit(0);
????}
????//?2.?綁定
????struct?sockaddr_in?addr;
????addr.sin_family?=?AF_INET;??????????//?ipv4
????addr.sin_port?=?htons(8989);????????//?字節(jié)序應(yīng)該是網(wǎng)絡(luò)字節(jié)序
????addr.sin_addr.s_addr?=??INADDR_ANY;?//?==?0,?獲取IP的操作交給了內(nèi)核
????int?ret?=?bind(fd,?(struct?sockaddr*)&addr,?sizeof(addr));
????if(ret?==?-1)
????{
????????perror("bind");
????????exit(0);
????}
????//?3.設(shè)置監(jiān)聽
????ret?=?listen(fd,?100);
????if(ret?==?-1)
????{
????????perror("listen");
????????exit(0);
????}
????//?4.?等待,?接受連接請(qǐng)求
????int?len?=?sizeof(struct?sockaddr);
????//?數(shù)據(jù)初始化
????int?max?=?sizeof(infos)?/?sizeof(infos[0]);
????for(int?i=0;?i????{
????????bzero(&infos[i],?sizeof(infos[i]));
????????infos[i].fd?=?-1;
????????infos[i].tid?=?-1;
????}
????//?父進(jìn)程監(jiān)聽,?子進(jìn)程通信
????while(1)
????{
????????//?創(chuàng)建子線程
????????struct?SockInfo*?pinfo;
????????for(int?i=0;?i????????{
????????????if(infos[i].fd?==?-1)
????????????{
????????????????pinfo?=?&infos[i];
????????????????break;
????????????}
????????????if(i?==?max-1)
????????????{
????????????????sleep(1);
????????????????i--;
????????????}
????????}
????????int?connfd?=?accept(fd,?(struct?sockaddr*)&pinfo->addr,?&len);
????????printf("parent?thread,?connfd:?%d\n",?connfd);
????????if(connfd?==?-1)
????????{
????????????perror("accept");
????????????exit(0);
????????}
????????pinfo->fd?=?connfd;
????????pthread_create(&pinfo->tid,?NULL,?working,?pinfo);
????????pthread_detach(pinfo->tid);
????}
????//?釋放資源
????close(fd);??//?監(jiān)聽
????return?0;
}
在編寫多線程版并發(fā)服務(wù)器代碼的時(shí)候,需要注意父子線程共用同一個(gè)地址空間中的文件描述符,因此每當(dāng)在主線程中建立一個(gè)新的連接,都需要將得到文件描述符值保存起來,不能在同一變量上進(jìn)行覆蓋,這樣做丟失了之前的文件描述符值也就不知道怎么和客戶端通信了。
在上面示例代碼中是將成功建立連接之后得到的用于通信的文件描述符值保存到了一個(gè)全局?jǐn)?shù)組中,每個(gè)子線程需要和不同的客戶端通信,需要的文件描述符值也就不一樣,只要保證存儲(chǔ)每個(gè)有效文件描述符值的變量對(duì)應(yīng)不同的內(nèi)存地址,在使用的時(shí)候就不會(huì)發(fā)生數(shù)據(jù)覆蓋的現(xiàn)象,造成通信數(shù)據(jù)的混亂了。
文章鏈接:https://subingwen.com/linux/concurrence/
