受到警告
开发环境:
RT-Thread版本:4.1.0
操作系统:Windows10
RT-Thread Studio版本:2.2.3
并发服务器支持多个客户端的同时连接,最大可接入的客户端数取决于内核控制块的个数。当使用Socket API时,要使服务器能够同时支持多个客户端的连接,必须引入多任务机制,为每个连接创建一个单独的任务来处理连接上的数据,多任务可以是多线程或者多进程,这是最常用的并发服务器设计。但是多线程/多进程消耗资源多,处理起来也比较复杂,本文将基于LWIP协议栈的Select/Poll机制实现并发服务器。
1 IO模型概述在具体讲解基于Select/Poll机制实现并发服务器之前,我们需要了解IO的相关概念,所谓IO就是,就是数据的读写,一般分为网络IO(本质就是socket读写)和磁盘IO。
IO模型大致可以分为:同步阻塞、同步非阻塞、异步、信号驱动。
可细分为5种I/O模型:
1)阻塞I/O,进程处于阻塞模式时,让出CPU,进入休眠状态;
2)非阻塞I/O,非阻塞模式的使用并不普遍,因为非阻塞模式会浪费大量的CPU资源;
3)I/O复用(select和poll),针对批量IP操作时,使用I/O多路复用,非常有好;
4)异步I/O(POSIX的aio_系列函数)
5)信号驱动I/O(SIGIO)一个输入操作通常包括两个不同的阶段:
1)等待数据准备好;
2)从内核向进程复制数据;
对于一个套接字的输入操作,第一步通常涉及等待数据从网络中到达。当所等待分组到达时,它被复制到内核中某个缓冲区。第二步就是把数据从内核缓冲区复制到应用进程缓冲区。
根据上述定义,我们前4种模型----阻塞I/O模型、非阻塞I/O模型、I/O复用模型和信号去驱动I/O模型都是同步I/O模型,因为其中真正的I/O操作(recvfrom)将阻塞进程。只有异步I/O模型与POSIX定义的异步I/O相匹配。
本文的要将的I/O复用,本质就是select/poll机制。因此,其他IO有兴趣可以去了解。
2 Select/Poll概述在LWIP中,如果要实现并发服务器,可以基于Sequentaial API来实现,这种方式需要使用多线程,也就是为每个连接创建一个线程来处理数据。而在资源受限的嵌入式设备来说,如果为每个连接都创建一个线程,这种资源的消耗是巨大的,因此,我们需要换一种实现思路,也就是使用IO多路复用的机制来实现,也就是select机制。
Select/Poll则是POSIX所规定,一般操作系统或协议栈均有实现。
值得注意的是,poll和select都是基于内核函数sys_poll实现的,不同在于在Linux系统中select是从BSDUnix系统继承而来,poll则是从SystemV Unix系统继承而来,因此两种方式相差不大。poll函数没有最大文件描述符数量的限制。poll和 select与一样,大量文件描述符的数组被整体复制于用户和内核的地址空间之间,开销随着文件描述符数量的增加而线性增大。
1.Select函数
在BSD Socket 中,select函数原型如下:
int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds,struct
timeval *timeout);
【参数说明】
l nfds:select监视的文件句柄数,一般设为要监视各文件中的最大文件描述符值加1。
l readfds:文件描述符集合监视文件集中的任何文件是否有数据可读,当select函数返回的时候,readfds将清除其中不可读的文件描述符,只留下可读的文件描述符。
l writefds:文件描述符集合监视文件集中的任何文件是否有数据可写,当select函数返回的时候,writefds将清除其中不可写的文件描述符,只留下可写的文件描述符。
l exceptfds:文件集将监视文件集中的任何文件是否发生错误,可用于其他的用途,例如,监视带外数据OOB,带外数据使用MSG_OOB标志发送到套接字上。当select函数返回的时候,exceptfds将清除其中的其他文件描述符,只留下标记有OOB数据的文件描述符。
l timeout 参数是一个指向 struct timeval 类型的指针,它可以使 select()在等待 timeout 时间后若没有文件描述符准备好则返回。其timeval结构用于指定这段时间的秒数和微秒数。它可以使select处于三种状态:
(1) 若将NULL以形参传入,即不传入时间结构,就是将select置于阻塞状态,一定等到监视文件描述符集合中某个文件描述符发生变化为止;
(2) 若将时间值设为0秒0毫秒,就变成一个纯粹的非阻塞函数,不管文件描述符是否有变化,都立刻返回继续执行,文件无变化返回0,有变化返回一个正值;
(3) timeout的值大于0,这就是等待的超时时间,即select在timeout时间内阻塞,超时时间之内有事件到来就返回了,否则在超时后不管怎样一定返回,返回值同上述。
timeval 结构体定义
struct timeval
{
int tv_sec;/* 秒 */
int tv_usec;/* 微妙 */
};
【返回值】
l int:若有就绪描述符返回其数目,若超时则为0,若出错则为-1
下列操作用来设置、清除、判断文件描述符集合。
FD_ZERO(fd_set*set);//清除一个文件描述符集。
FD_SET(intfd,fd_set *set);//将一个文件描述符加入文件描述符集中。
FD_CLR(intfd,fd_set *set);//将一个文件描述符从文件描述符集中清除。
FD_ISSET(intfd,fd_set *set);//判断文件描述符是否被置位
fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file descriptor),即文件句柄。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。
select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。
2.Poll函数
poll的函数原型:
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
【参数说明】
l fds:fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。
struct pollfd原型如下:
typedef struct pollfd {
int fd; // 需要被检测或选择的文件描述符
short events; // 对文件描述符fd上感兴趣的事件
short revents; // 文件描述符fd上当前实际发生的事件
} pollfd_t;
其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域。
l nfds:记录数组fds中描述符的总数量。
l timeout:指定等待的毫秒数,无论 I/O 是否准备好,poll() 都会返回,和select函数是类似的。
【返回值】
l int:函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错;
poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024。这也是和select不同的地方。
3并发服务器实现3.1网络配置1.使能ETH
2.使能LWIP协议栈
默认配置就行。
3.使能网络接口
这样就可在命令行使用功能ping、ifconfig等功能。
4.使能套接字抽象层
然后保存就可以了。
3.2 Select实现并发服务器接下来将使用select/poll来实现并发服务器。这里以select为例。
select并发服务器模型:
socket(...); // 创建套接字
bind(...); // 绑定
listen(...); // 监听
while(1)
{
if(select(...) > 0) // 检测监听套接字是否可读
{
if(FD_ISSET(...)>0) // 套接字可读,证明有新客户端连接服务器
{
accpet(...);// 取出已经完成的连接
process(...);// 处理请求,反馈结果
}
}
close(...); // 关闭连接套接字:accept()返回的套接字
}
因此,基于select实现的并发服务器模型如下:
从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。
Server:
/**
******************************************************************************
* @date 2022-05-30
* @blog ***
******************************************************************************
*/
#include
#include
#include
#include
#include
#defineSERVER_PORT 8888
#defineBUFF_SIZE 1024
static char recvbuff[BUFF_SIZE];
static void net_server_thread_entry(void *parameter)
{
int sfd, cfd, maxfd, i, nready, n;
struct sockaddr_in server_addr, client_addr;
struct netdev *netdev = RT_NULL;
char sendbuff[] = "Hello client!";
socklen_t client_addr_len;
fd_set all_set, read_set;
//FD_SETSIZE里面包含了服务器的fd
int clientfds[FD_SETSIZE - 1];
/* 通过名称获取 netdev 网卡对象 */
netdev = netdev_get_by_name((char*)parameter);
if (netdev == RT_NULL)
{
rt_kprintf("get network interface device(%s)failed.n", (char*)parameter);
}
//创建socket
if ((sfd = socket(AF_INET, SOCK_STREAM, 0))< 0)
{
rt_kprintf("Socket create failed.n");
}
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
//server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
/* 获取网卡对象中 IP 地址信息 */
server_addr.sin_addr.s_addr = netdev->ip_addr.addr;
//绑定socket
if (bind(sfd, (struct sockaddr *)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bind failed.n");
closesocket(sfd);
}
rt_kprintf("socket bind network interfacedevice(%s) success!n", netdev->name);
//监听socket
if(listen(sfd, 5) == -1)
{
rt_kprintf("listen error");
}
else
{
rt_kprintf("listening...n");
}
client_addr_len = sizeof(client_addr);
//初始化 maxfd 等于 sfd
maxfd = sfd;
//清空fdset
FD_ZERO(&all_set);
//把sfd文件描述符添加到集合中
FD_SET(sfd, &all_set);
//初始化客户端fd的集合
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
//初始化为-1
clientfds = -1;
}
while(1)
{
//每次select返回之后,fd_set集合就会变化,再select时,就不能使用,
//所以我们要保存设置fd_set 和 读取的fd_set
read_set = all_set;
nready = select(maxfd + 1,&read_set, NULL, NULL, NULL);
//没有超时机制,不会返回0
if(nready < 0)
{
rt_kprintf("select error rn");
}
//判断监听的套接字是否有数据
if(FD_ISSET(sfd, &read_set))
{
//有客户端进行连接了
cfd = accept(sfd, (struct sockaddr *)&client_addr, &client_addr_len);
if(cfd < 0)
{
rt_kprintf("accept socketerrorrn");
//继续select
continue;
}
rt_kprintf("new client connect fd =%drn", cfd);
//把新的cfd 添加到fd_set集合中
FD_SET(cfd, &all_set);
//更新要select的maxfd
maxfd = (cfd > maxfd)?cfd:maxfd;
//把新的cfd 保存到cfds集合中
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
if(clientfds == -1)
{
clientfds = cfd;
//退出,不需要添加
break;
}
}
//没有其他套接字需要处理:这里防止重复工作,就不去执行其他任务
if(--nready == 0)
{
//继续select
continue;
}
}
//遍历所有的客户端文件描述符
for(i = 0; i < FD_SETSIZE -1 ; i++)
{
if(clientfds == -1)
{
//继续遍历
continue;
}
//判断是否在fd_set集合里面
if(FD_ISSET(clientfds, &read_set))
{
n = recv(clientfds,recvbuff, sizeof(recvbuff), 0);
rt_kprintf("clientfd %d: %s rn",clientfds, recvbuff);
if(n <= 0)
{
//从集合里面清除
FD_CLR(clientfds,&all_set);
//当前的客户端fd 赋值为-1
clientfds = -1; }
else
{
//写回客户端
n = send(clientfds,sendbuff, strlen(sendbuff), 0);
if(n < 0)
{
//从集合里面清除
FD_CLR(clientfds,&all_set);
//当前的客户端fd 赋值为-1
clientfds = -1;
}
}
}
}
}
}
static int server(int argc, char **argv)
{
rt_err_t ret = RT_EOK;
if (argc != 2)
{
rt_kprintf("bind_test [netdev_name] --bind network interface device byname.n");
return -RT_ERROR;
}
/* 创建 serial 线程 */
rt_thread_t thread = rt_thread_create("server",
net_server_thread_entry,
argv[1],
4096,
10,
10);
/* 创建成功则启动线程 */
if (thread != RT_NULL)
{
rt_thread_startup(thread);
}
else
{
ret = RT_ERROR;
}
return ret;
}
#ifdefFINSH_USING_MSH
#include
MSH_CMD_EXPORT(server,network interface device test);
#endif /* FINSH_USING_MSH */
Client:【Linux版】
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define SERVPORT 8888
int main(int argc,char *argv[])
{
char sendbuf[] = "Client1 : HelloRtthread!";
char recvbuf[2014];
int sockfd,sendbytes;
struct sockaddr_in serv_addr;//需要连接的服务器地址信息
if (argc != 2)
{
perror("init error");
}
//1.创建socket
//AF_INET 表示IPV4
//SOCK_STREAM 表示TCP
if((sockfd = socket(AF_INET,SOCK_STREAM,0))< 0)
{
perror("socket");
exit(1);
}
//填充服务器地址信息
serv_addr.sin_family = AF_INET; //网络层的IP协议: IPV4
serv_addr.sin_port = htons(SERVPORT); //传输层的端口号
serv_addr.sin_addr.s_addr = inet_addr(argv[1]); //网络层的IP地址: 实际的服务器IP地址
bzero(&(serv_addr.sin_zero),8); //保留的8字节置零
//2.发起对服务器的连接信息
//三次握手,需要将sockaddr_in类型的数据结构强制转换为sockaddr
if((connect(sockfd,(struct sockaddr*)&serv_addr,sizeof(struct sockaddr))) < 0) {
perror("connect failed!");
exit(1);
}
printf("connect successful! n");
//3.发送消息给服务器端
while (1)
{
send(sockfd, sendbuf, strlen(sendbuf),0);
recv(sockfd, recvbuf, sizeof(recvbuf),0);
printf("Server: %s n", recvbuf);
sleep(2);
}
//4.关闭
close(sockfd);
}
Client:【RT-Thread版】
/**
******************************************************************************
* @file client.c
* @author BruceOu
* @version V1.0
* @date 2022-05-30
* @Official Accounts 嵌入式实验楼
* @brief 客户端
******************************************************************************
*/
#include
#include
#include
#include
#include
#define SERVER_HOST "192.168.101.8"
#define SERVER_PORT 8888
static int client(int argc, char**argv)
{
struct sockaddr_in client_addr;
struct sockaddr_in server_addr;
struct netdev *netdev = RT_NULL;
int sockfd = -1;
char sendbuf[] = "Hello RT-Thread!rn";
char recvbuf[2014];
if (argc != 2)
{
rt_kprintf("bind_test[netdev_name] --bind network interfacedevice by name.n");
return -RT_ERROR;
}
/* 通过名称获取 netdev 网卡对象 */
netdev = netdev_get_by_name(argv[1]);
if (netdev == RT_NULL)
{
rt_kprintf("get network interfacedevice(%s) failed.n", argv[1]);
return -RT_ERROR;
}
if ((sockfd = socket(AF_INET, SOCK_STREAM,0)) < 0)
{
rt_kprintf("Socket createfailed.n");
return -RT_ERROR;
}
/* 初始化需要绑定的客户端地址 */
client_addr.sin_family = AF_INET;
client_addr.sin_port = htons(8080);
/* 获取网卡对象中 IP 地址信息 */
client_addr.sin_addr.s_addr =netdev->ip_addr.addr;
rt_memset(&(client_addr.sin_zero), 0,sizeof(client_addr.sin_zero));
if (bind(sockfd, (struct sockaddr*)&client_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket bindfailed.n");
closesocket(sockfd);
return -RT_ERROR;
}
rt_kprintf("socket bind networkinterface device(%s) success!n", netdev->name);
/* 初始化预连接的服务端地址 */
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr =inet_addr(SERVER_HOST);
rt_memset(&(server_addr.sin_zero), 0,sizeof(server_addr.sin_zero));
/* 连接到服务端 */
if (connect(sockfd, (struct sockaddr*)&server_addr, sizeof(struct sockaddr)) < 0)
{
rt_kprintf("socket connectfailed!n");
closesocket(sockfd);
return -RT_ERROR;
}
else
{
rt_kprintf("socket connectsuccess!n");
}
while (1)
{
send(sockfd, sendbuf, strlen(sendbuf),0);
recv(sockfd, recvbuf, sizeof(recvbuf),0);
fputs(recvbuf, stdout);
memset(recvbuf, 0, sizeof(recvbuf));
rt_thread_mdelay(500);
}
/* 关闭连接 */
closesocket(sockfd);
return RT_EOK;
}
#ifdef FINSH_USING_MSH
#include
MSH_CMD_EXPORT(client, networkinterface device test);
#endif /* FINSH_USING_MSH */
接下来就是验证了,现在CH32V307开发板上开启服务器。
Server:
然后开启客户端,笔者的客户端在Ubuntu上运行的:
Client:
笔者这里使用的客户端只有两个,有兴趣的也可以使用多个客户端。
当然啦,如果懒得写客户端,也可使用网络调试助手测试。
附:CH32V307配置Flash和RAMCH32V307默认Flash大小为256 KB,SRAM大小为64 KB。
但是在实际过程中SRAM大小不够用,因此就需要重新规划,根据应用手册,其Flash和SRAM的大小支持配置,具体配置项如下:
l 192 KB Flash +128 KB SRAM
l 224 KB Flash + 96KB SRAM
l 256 KB Flash + 54KB SRAM
l 288 KB Flash + 32KB SRAM
具体方法如下:
第一步:切换启动模式,BOOT0 = 1,BOOT1 = 0
第二步:通过WCHISPTool工具下载程序,这里需要根据需求选择相应的配置方案。
【注】如果烧写不成功,可以尝试点击‘解除保护’。
第三步:切换启动模式,BOOT0 = 0,BOOT1 = 0,复位即可运行程序。
CH32V307开发板的USB2.0接口以及BOOT如下图所示: