好久没写网络聊天室了,去年暑假可以说写了一暑假,最近复习这些,又因为我一直偏向于多线程,就用多进程复习一下。
下面给出昨天写的基于多进程、共享内存的网络聊天室代码。每个进程负责一个连接,多个进程之间仅共享读,不共享写,因此无需信号量来同步。分配的一段内存中,以数组的方式,分配给每个client一段buffer,每个clilent对应的buffer的索引就是connfd。当一个子进程收到客户端数据后,通过每客户端管道发送自己的pid给主进程,主进程通知除了该子进程的其他进程将该片内存写好的数据转发给其他客户端(sub_proess[pid]=connd)。
代码如下:
复制代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <signal.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
const int USER_LIMIT = 3;
const int BUFFER_SIZE = 1024;
const int FD_LIMIT = 65545;
const int MAX_EVENT_NUMBER = 1024;
const int PROCESS_LIMIT = 65536;
//封装每个客户端连接数据
struct client_data {
sockaddr_in address;
int
connfd;
pid_t
pid;
//负责该客户端子进程的pid
int
pipefd[2];
//每个子进程pipe
};
static const char* shm_name = "/my_shm";
//共享内存的名字
int sig_pipefd[2];
//用来统一事件源
int epollfd;
int listenfd;
int shmfd;
char* share_mem = NULL;
//共享内存起始地址
//客户端连接数组,进程用客户连接的编号来索引这个数组,即可取得相关的客户连接数据
client_data* users = NULL;
//子进程和客户连接的关系映射表,用子进程的pid来索引这个数组,即可取得该进程处理的客户连接的编号
int* sub_process = 0;
int user_count = 0; //客户连接下标,这个名字有点误导,总之user_count>=USER_LIMIT即连接过多
bool stop_child = false;
//停止一个子进程,这个是全部变量,每个子进程都有自己拷贝的一份
int setnonblocking(int fd)
{
int old_option = fcntl(fd, F_GETFL);
int new_option = old_option | O_NONBLOCK;
fcntl(fd, F_SETFL, new_option);
return old_option;
}
void addfd(int epfd, int fd)
{
epoll_event event;
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &event);
setnonblocking(fd);
}
void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(sig_pipefd[1], (char*)&msg, 1, 0);
errno = save_errno;
}
void addsig(int sig, void(*handler)(int), bool restart = true)
{
struct sigaction sa;
memset(&sa, '', sizeof(sa));
sa.sa_handler = handler;
if(restart)
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
void del_resource()
{
close(sig_pipefd[0]);
close(sig_pipefd[1]);
close(listenfd);
close(epollfd);
shm_unlink(shm_name);
}
//子进程的信号处理函数,停止一个子进程
void child_term_handler(int sig)
{
stop_child = true;
}
//子进程运行的函数,参数inx指出该子进程处理的客户连接的编号,users是保存所有客户连接数据的数组,参数share_mem指出共享内存的起始地址
int run_child(int idx, client_data* users, char* share_mem)
{
epoll_event events[MAX_EVENT_NUMBER];
//每个子进程使用I/O服用同时监听客户连接socket和与父进程通信的pipe描述符
int child_epollfd = epoll_create(5);
assert(child_epollfd != -1);
int connfd = users[idx].connfd;
addfd(child_epollfd, connfd);
int pipefd = users[idx].pipefd[1];
addfd(child_epollfd, pipefd);
int ret;
//子进程需要设置自己的信号处理函数,因为fork会继承父进程信号处理函数
addsig(SIGTERM, child_term_handler, false);
while(!stop_child){
int number = epoll_wait(child_epollfd, events, MAX_EVENT_NUMBER, -1);
if(number < 0 && errno != EINTR){
printf("epoll failuren");
break;
}
for(int i=0; i<number; ++i){
int sockfd = events[i].data.fd;
//本子进程负责的客户链接有数据到达
if(sockfd == connfd && (events[i].events & EPOLLIN)){
//清零该客户对应的缓冲区
memset(share_mem+idx*BUFFER_SIZE, '', BUFFER_SIZE);
//将客户数据读取到对应的读缓存中,该读缓存是共享内存的一段,它开始于idx*BUFFER_SIZE处,长度为BUFFER_SIZE字节,因此每个客户连接是共享的
ret = recv(connfd, share_mem+idx*BUFFER_SIZE, BUFFER_SIZE-1, 0);
//留一个字节为''间隔
if(ret < 0){
if(errno != EAGAIN)
stop_child = true;
}
else if(ret == 0)
stop_child = true;
else
//成功读取客户数据后就通知主进程,让主进程吩咐其他进程转发
send(pipefd, (char*)&idx, sizeof(idx), 0);
}
//主进程通过管道通知本进程需要转发第client个客户端的数据到本进程负责的客户
else if(sockfd == pipefd && (events[i].events & EPOLLIN)){
int client = 0;
//接受主进程发来的数据,即客户的编号,用来索引buffer
ret = recv(sockfd, (char *)&client, sizeof(client), 0);
if(ret < 0){
if(errno != EAGAIN)
stop_child = true;
}
else if(ret == 0)
stop_child = true;
else
//转发给自己的客户
send(connfd, share_mem+client*BUFFER_SIZE, BUFFER_SIZE, 0);
}
else
continue;
}
}
close(connfd);
close(pipefd);
close(child_epollfd);
return 0;
}
int main(int argc, char** argv)
{
if( argc <= 2 )
{
printf( "usage: %s ip_address port_numbern", basename( argv[0] ) );
return 1;
}
const char* ip = argv[1];
int port = atoi( argv[2] );
int ret = 0;
struct sockaddr_in address;
bzero(&address, sizeof(address));
address.sin_family = AF_INET;
inet_pton(AF_INET, ip, &address.sin_addr);
address.sin_port = htons(port);
int listenfd = socket(PF_INET, SOCK_STREAM, 0);
assert(listenfd >= 0);
int on = 1;
ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
assert(ret != -1);
ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
assert(ret != -1);
ret = listen(listenfd, 5);
assert(ret != -1);
/////////////////////////////////////////////////////////////
user_count = 0;
users = new client_data[USER_LIMIT];
sub_process = new int [PROCESS_LIMIT];
for(int i=0; i<PROCESS_LIMIT; ++i)
sub_process[i] = -1;
////////////////////////////////////////////////////////////
epoll_event events[MAX_EVENT_NUMBER];
epollfd = epoll_create(5);
assert(epollfd != -1);
addfd(epollfd, listenfd);
//socketpair是全双工的,所以父子进程通信无需向pipe一样需要两个pipe[2]
//fork完毕socketpair可以双向通信
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, sig_pipefd);
assert(ret != -1);
setnonblocking( sig_pipefd[1] );
addfd(epollfd, sig_pipefd[0]);
// add all the interesting signals here
addsig(SIGCHLD, sig_handler);
addsig(SIGTERM, sig_handler);
addsig(SIGINT, sig_handler);
addsig(SIGPIPE, SIG_IGN);
bool stop_server = false;
bool terminate = false;
///////////////////////////////////////////////////////////////
//创建共享内存,作为所有客户连接的读缓存
shmfd = shm_open(shm_name, O_CREAT | O_RDWR, 0666);
assert(shmfd != -1);
//清空且resize文件大小为USER_LIMIT*BUFFER_SIZE
ret = ftruncate(shmfd, USER_LIMIT*BUFFER_SIZE);
assert(ret != -1);
//通过上面生成的一定大小的文件来使用mmap映射共享内存
//这是共享内存的一种方式,另外一种使用SystemV的shmat
share_mem = (char *)mmap(NULL, USER_LIMIT*BUFFER_SIZE, PROT_WRITE | PROT_READ,
MAP_SHARED, shmfd, 0);
assert(share_mem != MAP_FAILED);
close(shmfd);
//close shmfd is ok
//////////////////////////////////////////////////////////////
while(!stop_server){
int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
if(number < 0 && errno != EINTR){
printf("epoll failuren");
break;
}
for(int i=0; i<number; ++i){
int sockfd = events[i].data.fd;
if(sockfd == listenfd){
struct sockaddr_in client_address;
socklen_t len = sizeof(client_address);
int connfd = accept(listenfd, (struct sockaddr*)&client_address, &len);
if(connfd < 0){
printf("errno is: %dn", errno);
continue;
}
if(user_count >= USER_LIMIT){
//limit
const char* info = "too many usersn";
printf("%s", info);
send(connfd, info, strlen(info), 0);
close(connfd);
continue;
}
//保存第user_count个客户连接的数据
users[user_count].address = client_address;
users[user_count].connfd = connfd;
//在子进程和父进程间建立管道,以传递必要的数据
ret = socketpair(PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd);
assert(ret != -1);
pid_t pid = fork();
if(pid < 0){
close(connfd);
continue;
//!!!!!!!
}
else if(pid == 0){
//in child
close(epollfd);
close(listenfd);
close(users[user_count].pipefd[0]);
//子进程关掉一端,子进程给父进程发数据使用pipefd[1]
close(sig_pipefd[0]);
close(sig_pipefd[1]);
run_child(user_count, users, share_mem);
munmap((void*)share_mem, USER_LIMIT*BUFFER_SIZE);
exit(0);
}
else{
close(connfd);
close(users[user_count].pipefd[1]); //同理
addfd(epollfd, users[user_count].pipefd[0]);
//记录新的客户连接在数组users中的索引值,建立进程pid和索引值的映射关系
users[user_count].pid = pid;
sub_process[pid] = user_count;
user_count++;
}
}
//handle signal
else if(sockfd == sig_pipefd[0] && (events[i].events & EPOLLIN)){
int sig;
char signals[1024];
ret = recv(sig_pipefd[0], signals, sizeof(signals), 0);
if(ret == -1)
continue;
else if(ret == 0)
continue;
else{
for(int i=0; i<ret; ++i){
switch(signals[i]){
case SIGCHLD:
//子进程退出,表示有客户端关闭了连接
{
pid_t pid;
int stat;
while((pid = waitpid(-1, &stat, WNOHANG)) > 0){
//用子进程的pid取得被关闭客户连接的编号
int del_user = sub_process[pid];
sub_process[pid] = -1;
if(del_user < 0 || del_user > USER_LIMIT)
continue;
//清除数据
epoll_ctl(epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0);
close(users[del_user].pipefd[0]);
//用最后一个user替换该位置
users[del_user] = users[--user_count];
sub_process[users[del_user].pid] = del_user;
//修正sub_process对应的值,也就是修正最后一个客户端pid对应的客户编号
}
if(terminate && user_count == 0)
stop_server = true;
break;
}
case SIGTERM:
case SIGINT:
//结束服务器程序
{
printf("kill all the child newn");
if(user_count == 0){
stop_server = true;
break;
}
for(int i=0; i<user_count; ++i){
int pid = users[i].pid;
kill(pid, SIGTERM);
//kill每个子进程
}
terminate = true;
break;
}
default:
break;
}
}
}
}
//某个子进程收到数据,向父进程通知
else if(events[i].events & EPOLLIN){
int child = 0;
//读取管道数据,收到的数据时child变量记录了哪个客户连接有数据到达
ret =recv(sockfd, (char*)&child, sizeof(child), 0);
if(ret == -1)
continue;
else if(ret == 0)
continue;
else{
//向除负责第child个客户的子进程之外的子进程发送消息,通知他们有客户数据要写
for(int j=0; j<user_count; ++j){
if(users[j].pipefd[0] != sockfd){
printf("send data to child accross pipen");
send(users[j].pipefd[0], (char*)&child, sizeof(child), 0);
}
}
}
}
}
}
del_resource();
return 0;
}
最后
以上就是大胆缘分最近收集整理的关于多进程、共享内存的网络聊天室的全部内容,更多相关多进程、共享内存内容请搜索靠谱客的其他文章。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复