充分披露,我是一名学生,这是一个任务。 我一直在努力了一个星期几乎不停(除了以前的时间),我不知道我做错了什么。 我的服务器在完成“less量”recv之后仍然挂在epoll_wait上(“很less”,因为我预计有几GB的数据,而且只有几十MB)。 我不认为我的客户端工作有什么问题,因为我的select和multithreading服务器工作正常。 请快速浏览一下,让我知道是否有任何东西突然出现在我的问题的原因。
客户端/服务器的基本思想是用连接(10k +)轰击服务器,并跨越多次传输给定数量的数据。 这个epoll服务器在2000年遇到了麻烦,当时我的multithreading服务器只处理了10k的目标。
我不是要求你为我做我的任务(我差不多完成了),我只需要帮助弄清楚我在这里做错了什么。 预先感谢您提供的任何帮助:)
1 #include "common.h" 2 #include <sys/epoll.h> 3 4 uint16_t ready[MAX_CONNS]; 5 uint16_t next; 6 pthread_mutex_t mutex; 7 8 void *worker_thread(void *param) { 9 int my_sock, pos; 10 struct conn_params *conn_ps = (struct conn_params *)param; 11 12 while (1) { 13 pthread_mutex_lock(&mutex); 14 15 while (1) { 16 if (next == MAX_CONNS) { 17 printf("balls\n"); 18 next = 4; 19 } 20 21 if (ready[next] != 0) { 22 pos = next; 23 my_sock = ready[pos]; 24 next++; 25 break; 26 } 27 } 28 29 pthread_mutex_unlock(&mutex); 30 /* handle recv/send */ 31 if (echo_recv(&conn_ps[my_sock], MULTIPLE) == 0) { /* closed conn */ 32 shutdown(my_sock, SHUT_RDWR); 33 close(my_sock); 34 serv_stats.active_connections--; 35 } 36 ready[pos] = 0; 37 /* print_conn_stats(&conn_ps[my_sock]);*/ 38 } 39 } 40 41 void *add_client_thread(void *param) { 42 struct epoll_accept_thread *eat = (struct epoll_accept_thread *)param; 43 struct sockaddr client; 44 struct epoll_event event; 45 socklen_t client_len; 46 int new_sock, ret; 47 char hostbuf[NI_MAXHOST], servbuf[NI_MAXSERV]; 48 49 bzero(&client, sizeof(struct sockaddr)); 50 event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 51 52 while ((new_sock = accept(eat->listen_sock, &client, &client_len)) != -1) { 53 set_nonblock(new_sock); 54 event.data.fd = new_sock; 55 if (epoll_ctl(eat->fd_epoll, EPOLL_CTL_ADD, new_sock, &event) == -1) { 56 perror("epoll_ctl"); 57 printf("%u\n", new_sock); 58 continue; 59 } 60 61 bzero(&(eat->conn_ps[new_sock]), sizeof(struct conn_params)); 62 eat->conn_ps[new_sock].sock = new_sock; 63 if ((ret = getnameinfo(&client, client_len, hostbuf, NI_MAXHOST, servbuf, NI_MAXSERV, NI_NUMERICHOST)) != 0) { 64 gai_strerror(ret); 65 } 66 67 update_server_stats(); 68 printf("added client\n"); 69 } 70 71 if (errno != EAGAIN) { 72 perror("Couldn't accept connection"); 73 } 74 75 pthread_exit(NULL); 76 } 77 78 int main(int argc, char **argv) { 79 char opt, *port = NULL; 80 struct addrinfo hints, *results, *p; 81 int listen_sock = new_tcp_sock(), nfds, i, ret; 82 int fd_epoll, next_avail = 4; 83 struct conn_params conn_ps[MAX_CONNS]; 84 struct epoll_event evs[MAX_CONNS]; 85 struct epoll_event event; 86 struct epoll_accept_thread eat; 87 pthread_t thread; 88 89 while ((opt = getopt(argc, argv, ":l:")) != -1) { 90 switch (opt) { 91 case 'l': /* port to listen on */ 92 port = optarg; 93 break; 94 case '?': /* unknown option */ 95 fprintf(stderr, "The option -%c is not supported.\n", opt); 96 exit(1); 97 case ':': /* required arg not supplied for option */ 98 fprintf(stderr, "The option -%c requires an argument.\n", opt); 99 exit(1); 100 } 101 } /* command line arg processing done */ 102 103 if (port == NULL) { 104 fprintf(stderr, "You must provide the port to listen on (-l).\n"); 105 exit(1); 106 } 107 108 signal(SIGINT, handle_interrupt); 109 110 bzero(&hints, sizeof(struct addrinfo)); 111 hints.ai_family = AF_INET; 112 hints.ai_socktype = SOCK_STREAM; 113 hints.ai_flags = AI_PASSIVE; 114 115 set_nonblock(listen_sock); 116 set_reuseaddr(listen_sock); 117 118 if ((ret = getaddrinfo(NULL, port, &hints, &results) != 0)) { 119 gai_strerror(ret); 120 exit(1); 121 } 122 123 for (p = results; p != NULL; p = p->ai_next) { /* attempt to connect to the host */ 124 if (bind(listen_sock, p->ai_addr, p->ai_addrlen) == -1) { 125 perror("Bind failed"); 126 } else { 127 break; 128 } 129 } 130 131 if (p == NULL) { /* we were unable to connect to anything */ 132 fprintf(stderr, "Unable to bind to the specified port. Exiting...\n"); 133 exit(1); 134 } 135 136 freeaddrinfo(results); 137 138 if (listen(listen_sock, 5) == -1) { 139 perror("Listen failed"); 140 exit(1); 141 } 142 143 /* everything is set up. method-specific code goes below */ 144 145 start_server_stats(); 146 next = 4; 147 148 if ((fd_epoll = epoll_create(MAX_CONNS)) == -1) { 149 perror("epoll_create"); 150 exit(1); 151 } 152 153 event.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLET; 154 event.data.fd = listen_sock; 155 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, listen_sock, &event) == -1) { 156 perror("epoll_ctl"); 157 exit(1); 158 } 159 160 signal(SIGPIPE, SIG_IGN); 161 bzero(ready, MAX_CONNS * sizeof(uint16_t)); 162 pthread_mutex_init(&mutex, NULL); 163 164 for (i = 0; i < 5; i++) { /* five workers should be enough */ 165 pthread_create(&thread, NULL, worker_thread, (void *)&conn_ps); 166 } 167 168 while (1) { 169 if ((nfds = epoll_wait(fd_epoll, evs, MAX_CONNS, -1)) > 0 && errno == EINTR) { 170 continue; 171 } 172 for (i = 0; i < nfds; i++) { /* loop through all FDs */ 173 if (evs[i].events & (EPOLLERR | EPOLLHUP)) { /* if there's an error or a hangup */ 174 /*fprintf(stderr, "Error! Danger, Will Robinson! Danger!");*/ 175 close(evs[i].data.fd); 176 continue; 177 } else if (evs[i].data.fd == listen_sock) { /* we have a new connection coming in */ 178 eat.listen_sock = listen_sock; 179 eat.fd_epoll = fd_epoll; 180 eat.conn_ps = conn_ps; 181 pthread_create(&thread, NULL, add_client_thread, (void *)&eat); 182 } else { /* inbound data */ 183 while (ready[next_avail] != 0) { 184 next_avail++; 185 186 if (next_avail == MAX_CONNS) { 187 next_avail = 4; 188 } 189 } 190 ready[next_avail] = evs[i].data.fd; 191 } /* end inbound data */ 192 } /* end iterating through FDs */ 193 } /* end epoll_wait loop */ 194 195 perror("epoll_wait"); 196 197 return 0; 198 }
这里是echo_recv函数,因为我假设有人也想看到:
14 int echo_recv(struct conn_params *conn_p, int single) { 15 char client_buf[CLIENT_BUF_SIZE], buffer[BUF_SIZE]; 16 int nread, nwrite, nsent = 0, i; 17 18 while ((nread = recv(conn_p->sock, client_buf, CLIENT_BUF_SIZE, 0)) > 0) { 19 /* create buffer of MULTIPLIER(int) times what was received */ 20 for (i = 0; i < MULTIPLIER && nread*i < BUF_SIZE; i++) { 21 memcpy(buffer+(nread*i), client_buf, nread); 22 } 23 24 /* send the created buffer */ 25 while ((nwrite = send(conn_p->sock, buffer+nsent, (nread*MULTIPLIER)-nsent, 0)) > 0) { 26 nsent += nwrite; 27 } 28 29 conn_p->total_recvd += nread; /* update our stats for this conn */ 30 conn_p->total_sent += nsent; /* update our status for this conn */ 31 serv_stats.total_recvd += nread; 32 serv_stats.total_sent += nsent; 33 nsent = 0; 34 35 if (single) { 36 return 1; 37 } 38 } 39 40 if (nread == -1 && (errno & EAGAIN)) { 41 return 1; 42 } 43 44 if (nread == -1) { 45 perror("wtf?"); 46 } 47 48 shutdown(conn_p->sock, SHUT_RDWR); 49 close(conn_p->sock); 50 51 return 0; /* recv failed */ 52 }
这里有一些想法:
pthread_join
) epoll_accept_thread
结构 – 并且没有锁定它。 我会先解决所有的同步问题,然后可能会揭示其他问题。
我想在上面的评论中发表这篇文章,但是它比它允许的时间长得多:
尝试实现一个完全异步的基于epoll的简单服务器(步骤)
这应该消除您从线程中添加的任何可能导致问题的复杂性。 这将epoll移回到与select()
相同的域中,除了它通常快得多。 使用事件库的整个想法是知道什么时候可以读/写,而不是将套接字设置为非阻塞,并尝试读取/写入。
你也似乎从来没有检查从write()
返回值可能由于收到一个SIGPIPE(我知道你忽略了信号,但你仍然会得到一个EAGAIN / EINTR errno)失败。
我看到的另一件事是,你正在等待套接字准备就绪的线程内忙着循环。 当你在这种情况下使用select()
或者epoll
时候,你会被告知有新的东西,所以你不需要做一个忙碌的循环。
我不完全确定你正在尝试完成什么,但是你的代码是非常低效的。
在使用上述步骤实现一个简单的异步示例之后,你可以做的是启动多个工作线程,这些工作线程全部监听(使用epoll)在listener
/ accept
套接字上read
事件,并让每个线程处理各种连接我上面发布的)。