> 文章列表 > linux服务器实现百万并发遇到的问题以及解决思路

linux服务器实现百万并发遇到的问题以及解决思路

linux服务器实现百万并发遇到的问题以及解决思路

目录

  • 前言
  • 服务器与客户端的配置介绍
  • server 代码
  • client 代码
  • 遇到的问题
  • error too many open files
    • 为什么会出现这个问题
    • 解决这个问题的思路
  • killed(已杀死)
    • 为什么会出现这个问题
    • 解决这个问题的思路
  • 最终结果
  • 学到的经验教训

前言

在完成百万并发服务器的时候调试了大概五天,期间总会出现莫名其妙的问题导致连接断开,所以本文就这些问题与如何解决这些问题做一个总结。
本次实验完成的百万并发指的是单纯的连接数量,中间只是增加了必要的打印信息而已,并未增加具体的业务信息。

服务器与客户端的配置介绍

server(1台):ubuntu20.04 8G8核
client(3台):ubuntu16.04 4G2核

server 代码

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/poll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>
#include <pthread.h>

#define SERVER_PORT		8080
#define SERVER_IP		"127.0.0.1"
#define MAX_BUFFER		64
#define MAX_EPOLLSIZE	100000
#define MAX_THREAD		80
#define MAX_PORT		100
#define CPU_CORES_SIZE	8

/* Milliseconds elapsed between two gettimeofday() samples (tv1 is the later one). */
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

/* Put fd into non-blocking mode. Returns 0 on success, <0 on fcntl failure. */
static int ntySetNonblock(int fd) {
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0) return -1;
	return 0;
}

/* Allow quick rebinding of the address after restart. */
static int ntySetReUseAddr(int fd) {
	int reuse = 1;
	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}

/* Enable TCP keep-alive probing (60 s idle, 5 s probe interval, 2 probes).
 * FIX: declared int but originally fell off the end with no return statement
 * (undefined behavior if the result is used). Now returns 0 on success,
 * -1 if any setsockopt call failed. */
static int ntySetAlive(int fd) {
	int alive = 1;
	int idle = 60;
	int interval = 5;
	int count = 2;
	int ret = 0;

	ret |= setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive));
	ret |= setsockopt(fd, SOL_TCP, TCP_KEEPIDLE, (void *)&idle, sizeof(idle));
	ret |= setsockopt(fd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval));
	ret |= setsockopt(fd, SOL_TCP, TCP_KEEPCNT, (void *)&count, sizeof(count));
	return ret ? -1 : 0;
}

/** **** ******** **************** thread pool **************** ******** **** **/

/* Push `item` onto the head of the doubly linked `list`.
 * FIX: the original never updated the old head's prev pointer, so every node
 * except the newest had a NULL prev and LL_REMOVE could leave a dangling
 * next pointer. Wrapped in do/while(0) so the macros are statement-safe. */
#define LL_ADD(item, list) do {                      \
	item->prev = NULL;                               \
	item->next = list;                               \
	if (list != NULL) (list)->prev = item;           \
	list = item;                                     \
} while (0)

#define LL_REMOVE(item, list) do {                               \
	if (item->prev != NULL) item->prev->next = item->next;       \
	if (item->next != NULL) item->next->prev = item->prev;       \
	if (list == item) list = item->next;                         \
	item->prev = item->next = NULL;                              \
} while (0)

typedef struct worker {
	pthread_t thread;
	int terminate;               /* set by workqueue_shutdown to ask this worker to exit */
	struct workqueue *workqueue;
	struct worker *prev;
	struct worker *next;
} worker_t;

typedef struct job {
	void (*job_function)(struct job *job);
	void *user_data;
	struct job *prev;
	struct job *next;
} job_t;

typedef struct workqueue {
	struct worker *workers;      /* list of worker threads */
	struct job *waiting_jobs;    /* pending jobs, newest first */
	pthread_mutex_t jobs_mutex;
	pthread_cond_t jobs_cond;
} workqueue_t;

/* Worker thread main loop: wait for a job, pop it, run it; frees itself on exit. */
static void *worker_function(void *ptr) {
	worker_t *worker = (worker_t *)ptr;
	job_t *job;

	while (1) {
		pthread_mutex_lock(&worker->workqueue->jobs_mutex);
		while (worker->workqueue->waiting_jobs == NULL) {
			if (worker->terminate) break;
			pthread_cond_wait(&worker->workqueue->jobs_cond, &worker->workqueue->jobs_mutex);
		}
		if (worker->terminate) {
			/* FIX: the original broke out of the loop while still holding
			 * jobs_mutex, deadlocking every other worker at shutdown. */
			pthread_mutex_unlock(&worker->workqueue->jobs_mutex);
			break;
		}
		job = worker->workqueue->waiting_jobs;
		if (job != NULL) {
			LL_REMOVE(job, worker->workqueue->waiting_jobs);
		}
		pthread_mutex_unlock(&worker->workqueue->jobs_mutex);
		if (job == NULL) continue;
		job->job_function(job);
	}

	free(worker);
	pthread_exit(NULL);
}

/* Create `numWorkers` worker threads bound to `workqueue`.
 * Returns 0 on success, 1 if a worker could not be allocated or started. */
int workqueue_init(workqueue_t *workqueue, int numWorkers) {
	int i;
	worker_t *worker;
	pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
	pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;

	if (numWorkers < 1) numWorkers = 1;
	memset(workqueue, 0, sizeof(*workqueue));
	memcpy(&workqueue->jobs_mutex, &blank_mutex, sizeof(workqueue->jobs_mutex));
	memcpy(&workqueue->jobs_cond, &blank_cond, sizeof(workqueue->jobs_cond));

	for (i = 0; i < numWorkers; i++) {
		if ((worker = malloc(sizeof(worker_t))) == NULL) {
			perror("Failed to allocate all workers");
			return 1;
		}
		memset(worker, 0, sizeof(*worker));
		worker->workqueue = workqueue;
		if (pthread_create(&worker->thread, NULL, worker_function, (void *)worker)) {
			perror("Failed to start all worker threads");
			free(worker);
			return 1;
		}
		LL_ADD(worker, worker->workqueue->workers);
	}
	return 0;
}

/* Ask every worker to exit and wake them all; workers free themselves. */
void workqueue_shutdown(workqueue_t *workqueue) {
	worker_t *worker = NULL;

	/* FIX: set terminate flags under the mutex so a worker cannot miss the
	 * broadcast between its flag check and pthread_cond_wait. */
	pthread_mutex_lock(&workqueue->jobs_mutex);
	for (worker = workqueue->workers; worker != NULL; worker = worker->next) {
		worker->terminate = 1;
	}
	workqueue->workers = NULL;
	workqueue->waiting_jobs = NULL;
	pthread_cond_broadcast(&workqueue->jobs_cond);
	pthread_mutex_unlock(&workqueue->jobs_mutex);
}

/* Queue a job and wake one worker. */
void workqueue_add_job(workqueue_t *workqueue, job_t *job) {
	pthread_mutex_lock(&workqueue->jobs_mutex);
	LL_ADD(job, workqueue->waiting_jobs);
	pthread_cond_signal(&workqueue->jobs_cond);
	pthread_mutex_unlock(&workqueue->jobs_mutex);
}

static workqueue_t workqueue;

void threadpool_init(void) {
	workqueue_init(&workqueue, MAX_THREAD);
}

/** **** ******** **************** thread pool **************** ******** **** **/

typedef struct client {
	int fd;
	char rBuffer[MAX_BUFFER];
	int length;
} client_t;

/* Blocking per-connection echo loop (thread-per-client model; not used by the
 * epoll path, kept for reference).
 * FIX: declared void* but originally had no return statement (UB). */
void *client_cb(void *arg) {
	int clientfd = *(int *)arg;
	char buffer[MAX_BUFFER] = {0};
	int childpid = getpid();

	while (1) {
		bzero(buffer, MAX_BUFFER);
		ssize_t length = recv(clientfd, buffer, MAX_BUFFER, 0); /* blocking I/O */
		if (length > 0) {
			send(clientfd, buffer, length, 0);
		} else if (length == 0) {
			printf(" PID:%d client disconnect\n", childpid);
			break;
		} else {
			printf(" PID:%d errno:%d\n", childpid, errno);
			break;
		}
	}
	return NULL;
}

/* Read up to `length` bytes from a non-blocking socket, polling up to 5 ms
 * for more data between reads. Returns 0 or an errno-style code; *count
 * receives the number of bytes actually read. */
static int nRecv(int sockfd, void *data, size_t length, int *count) {
	int left_bytes;
	int read_bytes;
	int res;
	int ret_code;
	unsigned char *p;
	struct pollfd pollfds;

	pollfds.fd = sockfd;
	pollfds.events = (POLLIN | POLLERR | POLLHUP);

	read_bytes = 0;
	ret_code = 0;
	p = (unsigned char *)data;
	left_bytes = length;

	while (left_bytes > 0) {
		read_bytes = recv(sockfd, p, left_bytes, 0);
		if (read_bytes > 0) {
			left_bytes -= read_bytes;
			p += read_bytes;
			continue;
		} else if (read_bytes < 0) {
			if (!(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
				/* FIX: fatal socket error — the original recorded the code
				 * but kept looping on a dead socket; bail out instead. */
				ret_code = (errno != 0 ? errno : EINTR);
				break;
			}
		} else {
			ret_code = ENOTCONN;   /* peer closed the connection */
			break;
		}

		res = poll(&pollfds, 1, 5);
		if (pollfds.revents & POLLHUP) {
			ret_code = ENOTCONN;
			break;
		}
		if (res < 0) {
			if (errno == EINTR) {
				continue;
			}
			/* FIX: same as above — a real poll error must terminate the loop. */
			ret_code = (errno != 0 ? errno : EINTR);
			break;
		} else if (res == 0) {
			ret_code = ETIMEDOUT; /* no more data within 5 ms: treat as done */
			break;
		}
	}

	if (count != NULL) {
		*count = length - left_bytes;
	}
	return ret_code;
}

/* Drain a socket completely under edge-triggered epoll (demo helper, unused). */
void epoll_et_loop(int sockfd)
{
	char buffer[MAX_BUFFER];
	int ret;

	while (1) {
		memset(buffer, 0, MAX_BUFFER);
		ret = recv(sockfd, buffer, MAX_BUFFER, 0);
		if (ret == -1) {
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				printf(" read all data\n");
				break;
			}
			close(sockfd);
			break;
		} else if (ret == 0) {
			printf(" disconnect\n");
			close(sockfd);
			break;
		} else {
			printf("Recv:%s, %d Bytes\n", buffer, ret);
		}
	}
}

/* Write exactly `length` bytes, polling (5 ms) for writability between
 * partial sends. Returns bytes written, or -1 on poll error/hangup/timeout. */
static int nSend(int sockfd, const void *buffer, int length, int flags) {
	int wrotelen = 0;
	int writeret = 0;
	unsigned char *p = (unsigned char *)buffer;
	struct pollfd pollfds = {0};

	pollfds.fd = sockfd;
	pollfds.events = (POLLOUT | POLLERR | POLLHUP);

	do {
		int result = poll(&pollfds, 1, 5);
		if (pollfds.revents & POLLHUP) {
			printf(" ntySend errno:%d, revent:%x\n", errno, pollfds.revents);
			return -1;
		}
		if (result < 0) {
			if (errno == EINTR) continue;
			printf(" ntySend errno:%d, result:%d\n", errno, result);
			return -1;
		} else if (result == 0) {
			printf(" ntySend errno:%d, socket timeout \n", errno);
			return -1;
		}
		writeret = send(sockfd, p + wrotelen, length - wrotelen, flags);
		if (writeret <= 0) {
			break;
		}
		wrotelen += writeret;
	} while (wrotelen < length);

	return wrotelen;
}

static int curfds = 1;   /* rough live-connection count (touched by main thread and workers) */
static int nRun = 0;     /* flips to 1 past ~1M connections: process inline instead of pooling */

/* Thread-pool job: echo one readable event for rClient->fd, then free the job. */
void client_job(job_t *job) {
	client_t *rClient = (client_t *)job->user_data;
	int clientfd = rClient->fd;
	char buffer[MAX_BUFFER];
	bzero(buffer, MAX_BUFFER);
	int length = 0;

	int ret = nRecv(clientfd, buffer, MAX_BUFFER, &length);
	if (length > 0) {
		if (nRun || buffer[0] == 'a') {
			printf(" TcpRecv --> curfds : %d, buffer: %s\n", curfds, buffer);
			nSend(clientfd, buffer, strlen(buffer), 0);
		}
	} else if (ret == ENOTCONN) {
		curfds--;
		close(clientfd);
	} else {
		/* transient condition (e.g. ETIMEDOUT with no data): keep the connection */
	}

	free(rClient);
	free(job);
}

/* Same echo logic as client_job, run inline on the epoll thread. */
void client_data_process(int clientfd) {
	char buffer[MAX_BUFFER];
	bzero(buffer, MAX_BUFFER);
	int length = 0;

	int ret = nRecv(clientfd, buffer, MAX_BUFFER, &length);
	if (length > 0) {
		if (nRun || buffer[0] == 'a') {
			printf(" TcpRecv --> curfds : %d, buffer: %s\n", curfds, buffer);
			nSend(clientfd, buffer, strlen(buffer), 0);
		}
	} else if (ret == ENOTCONN) {
		curfds--;
		close(clientfd);
	} else {
		/* transient condition: keep the connection */
	}
}

/* Return fd if it is one of the MAX_PORT listening sockets in `fds`, else 0. */
int listenfd(int fd, int *fds) {
	int i = 0;
	for (i = 0; i < MAX_PORT; i++) {
		if (fd == *(fds + i)) return *(fds + i);
	}
	return 0;
}

/* C1000K echo server: listens on MAX_PORT consecutive ports starting at
 * SERVER_PORT, accepts on the epoll thread, and dispatches client I/O to the
 * thread pool until the connection count passes ~1M (then processes inline). */
int main(void) {
	int i = 0;
	int sockfds[MAX_PORT] = {0};

	printf("C1000K Server Start\n");

	threadpool_init();

	int epoll_fd = epoll_create(MAX_EPOLLSIZE);

	for (i = 0; i < MAX_PORT; i++) {
		int sockfd = socket(AF_INET, SOCK_STREAM, 0);
		if (sockfd < 0) {
			perror("socket");
			return 1;
		}

		struct sockaddr_in addr;
		memset(&addr, 0, sizeof(struct sockaddr_in));
		addr.sin_family = AF_INET;
		addr.sin_port = htons(SERVER_PORT + i);
		addr.sin_addr.s_addr = INADDR_ANY;

		if (bind(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0) {
			perror("bind");
			return 2;
		}
		if (listen(sockfd, 5) < 0) {
			perror("listen");
			return 3;
		}
		sockfds[i] = sockfd;
		printf("C1000K Server Listen on Port:%d\n", SERVER_PORT + i);

		struct epoll_event ev;
		ev.events = EPOLLIN | EPOLLET;
		ev.data.fd = sockfd;
		epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
	}

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);

	/* static: MAX_EPOLLSIZE entries (~1.2 MB) is too large for the stack */
	static struct epoll_event events[MAX_EPOLLSIZE];

	while (1) {
		/* FIX: maxevents must match the capacity of `events`. The original
		 * passed `curfds`, which grows far past MAX_EPOLLSIZE and lets the
		 * kernel write beyond the end of the array (memory corruption). */
		int nfds = epoll_wait(epoll_fd, events, MAX_EPOLLSIZE, 5);
		if (nfds == -1) {
			perror("epoll_wait");
			break;
		}

		for (i = 0; i < nfds; i++) {
			int sockfd = listenfd(events[i].data.fd, sockfds);
			if (sockfd) {
				/* event on a listening socket: accept one new client */
				struct sockaddr_in client_addr;
				memset(&client_addr, 0, sizeof(struct sockaddr_in));
				socklen_t client_len = sizeof(client_addr);

				int clientfd = accept(sockfd, (struct sockaddr *)&client_addr, &client_len);
				if (clientfd < 0) {
					/* FIX: the original exited the whole server on a single
					 * failed accept (e.g. transient EMFILE); skip it instead. */
					perror("accept");
					continue;
				}
				if (curfds++ > 1000 * 1000) {
					nRun = 1;
				}

				/* progress line once per 1000 connections */
				if (curfds % 1000 == 999) {
					struct timeval tv_cur;
					memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
					gettimeofday(&tv_begin, NULL);
					int time_used = TIME_SUB_MS(tv_begin, tv_cur);
					printf("connections: %d, sockfd:%d, time_used:%d\n", curfds, clientfd, time_used);
				}

				ntySetNonblock(clientfd);
				ntySetReUseAddr(clientfd);

				struct epoll_event ev;
				ev.events = EPOLLIN | EPOLLET | EPOLLOUT;
				ev.data.fd = clientfd;
				epoll_ctl(epoll_fd, EPOLL_CTL_ADD, clientfd, &ev);
			} else {
				int clientfd = events[i].data.fd;
				if (nRun) {
					printf(" New Data is Comming\n");
					client_data_process(clientfd);
				} else {
					client_t *rClient = (client_t *)malloc(sizeof(client_t));
					if (rClient == NULL) continue;   /* FIX: check malloc result */
					memset(rClient, 0, sizeof(client_t));
					rClient->fd = clientfd;

					job_t *job = malloc(sizeof(job_t));
					if (job == NULL) {               /* FIX: check malloc result */
						free(rClient);
						continue;
					}
					job->job_function = client_job;
					job->user_data = rClient;
					workqueue_add_job(&workqueue, job);
				}
			}
		}
	}
	return 0;
}

client 代码

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>     /* FIX: was missing — usleep()/close() used below */
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/time.h>   /* FIX: was missing — gettimeofday() used below */
#include <sys/epoll.h>
#include <errno.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>

#define MAX_BUFFER		128
#define MAX_EPOLLSIZE	(384*1024)
#define MAX_PORT		100

/* Milliseconds elapsed between two gettimeofday() samples (tv1 is the later one). */
#define TIME_SUB_MS(tv1, tv2)  ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)

int isContinue = 0;   /* 1 = stop creating new connections */

/* Put fd into non-blocking mode. Returns 0 on success, <0 on fcntl failure. */
static int ntySetNonblock(int fd) {
	int flags;

	flags = fcntl(fd, F_GETFL, 0);
	if (flags < 0) return flags;
	flags |= O_NONBLOCK;
	if (fcntl(fd, F_SETFL, flags) < 0) return -1;
	return 0;
}

/* Allow quick rebinding of the address after restart. */
static int ntySetReUseAddr(int fd) {
	int reuse = 1;
	return setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse));
}

/* Stress client: opens up to 380000 connections round-robin across MAX_PORT
 * server ports, registers them with epoll, and periodically services events. */
int main(int argc, char **argv) {
	if (argc <= 2) {
		printf("Usage: %s ip port\n", argv[0]);
		exit(0);
	}

	const char *ip = argv[1];
	int port = atoi(argv[2]);
	int connections = 0;
	char buffer[128] = {0};
	int i = 0, index = 0;

	/* static: MAX_EPOLLSIZE entries (~4.5 MB) is too large for the stack */
	static struct epoll_event events[MAX_EPOLLSIZE];
	int epoll_fd = epoll_create(MAX_EPOLLSIZE);

	strcpy(buffer, " Data From MulClient\n");

	struct sockaddr_in addr;
	memset(&addr, 0, sizeof(struct sockaddr_in));
	addr.sin_family = AF_INET;
	addr.sin_addr.s_addr = inet_addr(ip);

	struct timeval tv_begin;
	gettimeofday(&tv_begin, NULL);

	while (1) {
		if (++index >= MAX_PORT) index = 0;

		struct epoll_event ev;
		int sockfd = 0;

		if (connections < 380000 && !isContinue) {
			sockfd = socket(AF_INET, SOCK_STREAM, 0);
			if (sockfd == -1) {
				perror("socket");
				goto err;
			}

			addr.sin_port = htons(port + index);
			if (connect(sockfd, (struct sockaddr *)&addr, sizeof(struct sockaddr_in)) < 0) {
				perror("connect");
				goto err;
			}
			ntySetNonblock(sockfd);
			ntySetReUseAddr(sockfd);

			sprintf(buffer, "Hello Server: client --> %d\n", connections);
			send(sockfd, buffer, strlen(buffer), 0);

			ev.data.fd = sockfd;
			ev.events = EPOLLIN | EPOLLOUT;
			epoll_ctl(epoll_fd, EPOLL_CTL_ADD, sockfd, &ev);
			connections++;
		}

		/* service events once per 1000 connections (and once full) */
		if (connections % 1000 == 999 || connections >= 380000) {
			struct timeval tv_cur;
			memcpy(&tv_cur, &tv_begin, sizeof(struct timeval));
			gettimeofday(&tv_begin, NULL);
			int time_used = TIME_SUB_MS(tv_begin, tv_cur);
			printf("connections: %d, sockfd:%d, time_used:%d\n", connections, sockfd, time_used);

			int nfds = epoll_wait(epoll_fd, events, connections, 100);
			for (i = 0; i < nfds; i++) {
				int clientfd = events[i].data.fd;

				if (events[i].events & EPOLLOUT) {
					sprintf(buffer, "data from %d\n", clientfd);
					/* FIX: the original sent/received on `sockfd` (the most
					 * recently created socket) instead of the socket the
					 * event fired on, so all traffic hit one connection. */
					send(clientfd, buffer, strlen(buffer), 0);
					/* FIX: drop EPOLLOUT after the first write — under
					 * level-triggered epoll a writable socket fires every
					 * wait, causing an endless re-send loop. */
					struct epoll_event mod_ev;
					mod_ev.data.fd = clientfd;
					mod_ev.events = EPOLLIN;
					epoll_ctl(epoll_fd, EPOLL_CTL_MOD, clientfd, &mod_ev);
				} else if (events[i].events & EPOLLIN) {
					char rBuffer[MAX_BUFFER] = {0};
					ssize_t length = recv(clientfd, rBuffer, MAX_BUFFER, 0); /* FIX: was sockfd */
					if (length > 0) {
						printf(" RecvBuffer:%s\n", rBuffer);
						if (!strcmp(rBuffer, "quit")) {
							/* FIX: was `isContinue = 0`, which RESUMED creating
							 * connections; "quit" is meant to stop them. */
							isContinue = 1;
						}
					} else if (length == 0) {
						printf(" Disconnect clientfd:%d\n", clientfd);
						connections--;
						close(clientfd);
					} else {
						if (errno == EINTR) continue;
						printf(" Error clientfd:%d, errno:%d\n", clientfd, errno);
						close(clientfd);
					}
				} else {
					printf(" clientfd:%d, errno:%d\n", clientfd, errno);
					close(clientfd);
				}
			}
		}

		usleep(1 * 1000);
	}

	return 0;

err:
	printf("error : %s\n", strerror(errno));
	return 0;
}

遇到的问题

error too many open files

linux服务器实现百万并发遇到的问题以及解决思路

为什么会出现这个问题

因为系统对每个进程可打开的文件描述符数量有限制(默认 1024),可以使用 ulimit -a 查看 open files 的当前值。

ulimit -a

linux服务器实现百万并发遇到的问题以及解决思路
查看一个进程能够打开的文件描述符数量: ulimit -a

 ulimit -a	# 可以看到 open files 是1024,所以我们进行调大到 1048576

linux服务器实现百万并发遇到的问题以及解决思路

解决这个问题的思路

修改对应的可以打开的文件描述符的数量即可。

1.临时修改:ulimit -n 1048576
2.永久修改:按照下面的操作即可

vim /etc/security/limits.conf
# 在该配置文件的最后添加如下内容即可
*               soft    nofile          1048576
*               hard    nofile          1048576
# 按 esc 后输入 :wq 保存文件
# 然后重启机器
reboot

killed(已杀死)

此时已经很接近了!!还是出现问题。
linux服务器实现百万并发遇到的问题以及解决思路

为什么会出现这个问题

此时是因为内存不够了。

解决这个问题的思路

修改net.ipv4.tcp_mem,net.ipv4.tcp_wmem,net.ipv4.tcp_rmem的大小即可。
server:

vim /etc/sysctl.conf
# 移动到配置文件最末尾的位置
net.ipv4.tcp_mem = 252144 524288 786432
net.ipv4.tcp_wmem = 512 512 1024
net.ipv4.tcp_rmem = 512 512 1024
fs.file-max = 1048576
net.nf_conntrack_max = 1048576
# 按 esc 后输入 :wq 保存,然后执行如下操作:
sudo modprobe ip_conntrack
sysctl -p

linux服务器实现百万并发遇到的问题以及解决思路

client:

vim /etc/sysctl.conf
# 移动到配置文件最末尾的位置
net.ipv4.tcp_mem = 252144 524288 786432
net.ipv4.tcp_wmem = 1024 1024 2048
net.ipv4.tcp_rmem = 1024 1024 2048
fs.file-max = 1048576
net.nf_conntrack_max = 1048576
# 按 esc 后输入 :wq 保存,然后执行如下操作:
sudo modprobe ip_conntrack
sysctl -p

linux服务器实现百万并发遇到的问题以及解决思路

最终结果

将以上配置好之后再次执行程序即可得到如下结果。
linux服务器实现百万并发遇到的问题以及解决思路
linux服务器实现百万并发遇到的问题以及解决思路

学到的经验教训

理论与实践有很大的差别,不要眼高手低,要多多调试,真真正正的将程序敲出来。

文章参考于<零声教育>的 C/C++ Linux 服务器高级架构系统教程学习: 链接