找回密码
 立即注册
首页 业界区 安全 Linux 网络编程:从 Socket API 到极简 Redis 发布/订阅 ...

Linux 网络编程:从 Socket API 到极简 Redis 发布/订阅 sub/pub 服务的实现

港髁 6 天前
引言

本文旨在系统性地阐述 Linux 环境下的网络编程基础,重点关注 Socket 应用程序接口(API)的原理与应用。通过循序渐进的方式,结合具体的 C 语言代码示例,我们将剖析核心系统调用的机制,并最终构建一个基于传输控制协议(TCP)的简化版发布/订阅(Publish/Subscribe, Pub/Sub)服务器模型。
在进行深入探讨之前,本文假设读者已具备以下先验知识:

  • C 语言编程能力 :熟悉 C 语言的核心语法、指针操作及内存管理机制。
  • Linux 操作系统基础 :了解 Linux 基本命令行操作及 C 程序的编译流程(例如,使用 GCC 工具链)。
  • 计算机网络基础 :对 OSI 参考模型或 TCP/IP 协议栈有概念性认识,理解 TCP 与 UDP 的核心差异(面向连接与无连接、可靠性保证机制等),并掌握 IP 地址和端口号在网络通信中的作用。
网络编程的本质是实现进程间通信(Inter-Process Communication, IPC),特别是跨越主机边界的分布式通信。在 Linux 及其他类 Unix 操作系统中,Socket(套接字)接口是实现网络通信的标准范式。它提供了一套抽象的 API,允许应用程序将网络通信视为一种特殊的文件 I/O 操作,从而简化了网络数据收发的复杂性。
本文的实践目标是构建一个功能类似于 Redis 服务器 PUBLISH 和 SUBSCRIBE 命令的简化服务实例:

  • 服务器进程监听一个预定义的 TCP 端口。
  • 允许多个客户端并发连接至服务器。
  • 客户端通过发送特定格式的命令(如 SUB )订阅感兴趣的主题。
  • 客户端通过发送特定格式的命令(如 PUB  )向指定主题发布消息。
  • 服务器负责将发布的消息转发给所有订阅了对应主题的客户端(通常不包括发布者自身)。
接下来,我们将逐步解析相关的系统调用和编程技术。
Linux 上的 Socket API

在 Linux 操作系统的设计哲学中,“一切皆文件”是一个核心概念。网络连接在内核层面被抽象为一种特殊的文件类型,并通过文件描述符(File Descriptor)进行管理。socket() 系统调用是创建此类“网络文件”的入口点,用于在网络通信的参与方创建一个通信端点(Endpoint)。
socket() 系统调用详解

socket() 的主要功能是在内核中创建一个新的、未连接的套接字,并返回一个与之关联的文件描述符,供用户空间程序使用。其函数原型定义于  头文件中:
  1. #include <sys/types.h>
  2. #include <sys/socket.h>
  3. int socket(int domain, int type, int protocol);
复制代码
参数说明

  • domain (地址族 Address Family - AF): 指定套接字使用的协议族。常用取值包括:

    • AF_INET: 用于 IPv4 网络协议通信。本文主要采用此地址族。
    • AF_INET6: 用于 IPv6 网络协议通信。
    • AF_UNIX (或 AF_LOCAL): 用于同一主机内部的进程间通信,依赖本地文件系统路径而非网络地址。

  • type (套接字类型 Socket Type): 定义套接字的通信语义。关键取值有:

    • SOCK_STREAM: 提供面向连接、可靠的、基于字节流的传输服务。TCP 协议即为此类型,保证数据传输的顺序性和可靠性(通过序列号、确认、重传等机制)。
    • SOCK_DGRAM: 提供无连接、不可靠的数据报服务。UDP 协议属此类,数据包传输可能发生丢失、重复或乱序。

  • protocol (协议 Protocol): 通常设置为 0,表示由系统根据指定的 domain 和 type 自动选择默认协议。例如,AF_INET 与 SOCK_STREAM 组合通常默认选用 IPPROTO_TCP;AF_INET 与 SOCK_DGRAM 组合则默认选用 IPPROTO_UDP。亦可显式指定协议常量(如 IPPROTO_TCP)。
内核操作与数据结构
当应用程序调用 socket() 时,会触发一次系统调用,进入内核态执行:

  • 资源分配 :内核网络协议栈为新的套接字分配必要的内存资源,创建一个内部的 struct socket 或类似的核心数据结构。此结构包含了套接字的状态信息(如初始状态、类型、协议族)、发送和接收缓冲区、指向特定协议(如 TCP、UDP)处理函数的指针、以及等待队列等。
  • 文件描述符关联 :内核在当前进程的文件描述符表中找到一个未使用的条目,并将该条目指向一个代表该套接字的内核文件对象(struct file)。这个文件描述符(一个非负整数)是用户空间程序操作该套接字的句柄。
  • 返回 :系统调用返回新分配的文件描述符给应用程序。若创建失败(如资源不足、权限问题),则返回 -1,并设置全局变量 errno 以指示具体错误代码。
返回值

  • 成功:返回一个新的文件描述符(非负整数)。
  • 失败:返回 -1,并设置 errno。
示例:创建 IPv4 TCP 套接字
  1. #include <stdio.h>      // 标准输入输出
  2. #include <stdlib.h>     // 标准库函数,如 exit
  3. #include <sys/socket.h> // 套接字核心函数和数据结构
  4. #include <errno.h>      // 错误码 errno
  5. #include <unistd.h>     // close 函数
  6. int main() {
  7.     int sockfd;
  8.     // 创建一个用于 IPv4 TCP 通信的套接字
  9.     // AF_INET: 指定使用 IPv4 协议族
  10.     // SOCK_STREAM: 指定使用面向连接的字节流服务 (TCP)
  11.     // 0: 让内核自动选择合适的协议 (对于 AF_INET 和 SOCK_STREAM,通常是 IPPROTO_TCP)
  12.     sockfd = socket(AF_INET, SOCK_STREAM, 0);
  13.     // 检查 socket() 调用是否成功
  14.     if (sockfd == -1) {
  15.         perror("socket 创建失败"); // perror 会根据当前的 errno 值打印错误信息
  16.         exit(EXIT_FAILURE);       // 异常退出程序
  17.     }
  18.     printf("套接字创建成功! 文件描述符: %d\n", sockfd);
  19.     // 在实际应用中,套接字使用完毕后应显式关闭
  20.     // close(sockfd);
  21.     return 0;
  22. }
复制代码
编译与执行
  1. gcc create_socket_example.c -o create_socket_example
  2. ./create_socket_example
  3. 套接字创建成功! 文件描述符: 4
复制代码
输出将提示套接字创建成功,并显示其文件描述符。
sockaddr_in 结构体与地址表示

仅创建套接字不足以进行通信,服务器端需要将其绑定到具体的本地网络地址(IP 地址和端口号)。在 IPv4 环境下,此地址信息通过 struct sockaddr_in 结构体(定义于 )来承载:
  1. #include <netinet/in.h>
  2. struct sockaddr_in {
  3.     sa_family_t    sin_family; /* 地址族: 固定为 AF_INET */
  4.     in_port_t      sin_port;   /* 端口号 (网络字节序) */
  5.     struct in_addr sin_addr;   /* IPv4 地址 (网络字节序) */
  6.     // char           sin_zero[8]; /* 填充字节,通常不直接使用,应清零 */
  7. };
  8. /* IPv4 地址结构 */
  9. struct in_addr {
  10.     uint32_t       s_addr;     /* 32位IPv4地址 (网络字节序) */
  11. };
复制代码
关键字段解析

  • sin_family : 地址族,对于 IPv4,必须设置为 AF_INET,与 socket() 调用中的 domain 参数保持一致。
  • sin_port : 端口号。 注意 :该字段必须存储为 网络字节序 (Network Byte Order,即大端序 Big-Endian)。应用程序需使用 htons() (Host to Network Short) 函数将主机字节序(Host Byte Order)的端口号转换为网络字节序。例如,htons(8080)。
  • sin_addr.s_addr : 32位 IPv4 地址。 注意 : 同样必须是 网络字节序

    • 服务器端执行 bind() 时,若希望监听本机所有可用的网络接口,应将此字段设置为 htonl(INADDR_ANY)。INADDR_ANY 是一个特殊常量(通常为 0),htonl() (Host to Network Long) 用于将其转换为主机字节序到网络字节序。
    • 若需绑定到特定 IP 地址(如服务器仅监听某块网卡,或客户端执行 connect() 时指定目标服务器 IP),可使用 inet_pton() (Presentation to Network) 函数将点分十进制表示的 IP 地址字符串(如 "192.168.1.100")转换为网络字节序的 32 位整数,并存入 s_addr。

字节序转换函数 (通常定义于 ):

  • htons(): 主机字节序到网络字节序(16位,用于端口号)。
  • htonl(): 主机字节序到网络字节序(32位,用于 IPv4 地址)。
  • ntohs(): 网络字节序到主机字节序(16位)。
  • ntohl(): 网络字节序到主机字节序(32位)。
  • inet_pton(AF_INET, "ip_string", &addr_struct->sin_addr): 将点分十进制 IP 字符串转换为网络字节序二进制形式。
  • inet_ntop(AF_INET, &addr_struct->sin_addr, buffer, buffer_size): 将网络字节序二进制 IP 地址转换为点分十进制字符串。
bind(), listen(), accept() - 服务器端核心调用

对于 TCP 服务器而言,创建套接字后,必须执行一系列步骤来准备接收客户端连接:

  • bind() : 将套接字与一个本地 IP 地址和端口号关联起来,定义服务的监听地址。
  • listen() : 将套接字设置为监听模式,使其能够接受外来的连接请求,并配置连接请求队列。
  • accept() : 从已完成三次握手的连接队列中接受一个连接,并为此连接创建一个新的专用套接字。
1. bind() 系统调用

bind() 用于将 socket() 创建的套接字文件描述符 sockfd 与 my_addr 指定的本地地址(IP 和端口)进行绑定。
  1. #include <sys/socket.h>
  2. int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);
复制代码

  • sockfd: socket() 返回的文件描述符。
  • addr: 指向包含待绑定地址信息的 sockaddr 结构体的指针。对于 IPv4,实际传递的是已填充好的 struct sockaddr_in 结构体的地址,需强制类型转换为 (struct sockaddr *)。
  • addrlen: addr 指向的结构体的大小,对于 struct sockaddr_in,通常为 sizeof(struct sockaddr_in)。
内核操作
调用 bind() 进入内核态后:

  • 地址复制与校验 :内核将用户空间传入的 sockaddr 结构体复制到内核内存。
  • 状态检查 :检查 sockfd 对应的套接字是否有效且未被绑定。
  • 地址可用性检查 :检查指定的 IP 地址和端口号是否可用。对于端口号,检查是否已被其他套接字绑定(除非设置了 SO_REUSEADDR 等选项);对于 IP 地址,检查是否是分配给本机的有效地址(或 INADDR_ANY)。
  • 权限检查 :检查进程是否有权限绑定到指定端口(通常,绑定到 1024 以下的端口需要超级用户权限)。
  • 绑定操作 :如果所有检查通过,内核将该地址信息与内部的套接字结构关联起来。
返回值 :成功返回 0;失败返回 -1,并设置 errno。常见错误包括 EADDRINUSE(地址已在使用)、EACCES(权限不足)、EINVAL(sockfd 无效或已绑定)。
示例:绑定套接字到本地 8080 端口
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>       // 用于 memset
  4. #include <sys/socket.h>
  5. #include <netinet/in.h>
  6. #include     // 用于 htons, htonl
  7. #include <unistd.h>       // 用于 close
  8. #define PORT 8080
  9. int main() {
  10.     int server_fd;
  11.     struct sockaddr_in address;
  12.     int opt = 1; // 用于 setsockopt
  13.     // 创建套接字
  14.     if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
  15.         perror("socket 创建失败");
  16.         exit(EXIT_FAILURE);
  17.     }
  18.     // 可选: 设置套接字选项,允许地址重用,便于服务器快速重启
  19.     // SO_REUSEADDR 允许重用本地地址 (IP+端口),尤其是在 TIME_WAIT 状态下的端口
  20.     // SO_REUSEPORT (需要内核支持) 允许多个进程绑定到同一 IP 和端口
  21.     if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
  22.         perror("setsockopt 失败");
  23.         close(server_fd);
  24.         exit(EXIT_FAILURE);
  25.     }
  26.     // 准备 sockaddr_in 结构体
  27.     memset(&address, 0, sizeof(address)); // 推荐清零结构体
  28.     address.sin_family = AF_INET;
  29.     address.sin_addr.s_addr = htonl(INADDR_ANY); // 监听所有网络接口
  30.     address.sin_port = htons(PORT);              // 监听指定端口 (转换为网络字节序)
  31.     // 将套接字绑定到指定的地址和端口
  32.     if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
  33.         perror("bind 失败");
  34.         close(server_fd);
  35.         exit(EXIT_FAILURE);
  36.     }
  37.     printf("套接字成功绑定到端口 %d\n", PORT);
  38.     // ... 后续步骤: listen() 和 accept() ...
  39.     close(server_fd); // 完成后关闭监听套接字
  40.     return 0;
  41. }
复制代码
2. listen() 系统调用

listen() 用于将一个已绑定的流式套接字(SOCK_STREAM 或 SOCK_SEQPACKET)转换为被动监听状态,使其能够接受传入的连接请求。
  1. #include <sys/socket.h>
  2. int listen(int sockfd, int backlog);
复制代码

  • sockfd: 已成功 bind() 的套接字文件描述符。
  • backlog: 指定内核为此监听套接字维护的 已完成连接队列 (Completed Connection Queue, 或称 Accept Queue)的最大长度。
内核操作与连接队列
当调用 listen() 时:

  • 状态转换 :内核将 sockfd 对应的内部套接字结构的状态从 CLOSED(或 BOUND)修改为 LISTEN。
  • 队列初始化 :内核为该监听套接字关联并初始化两个重要的队列:

    • 未完成连接队列 (Incomplete Connection Queue / SYN Queue) :存储收到的 SYN 包,但尚未完成三次握手的连接请求(处于 SYN_RCVD 状态)。此队列的大小通常由系统参数(如 net.ipv4.tcp_max_syn_backlog)控制,backlog 参数对其影响有限或间接。
    • 已完成连接队列 (Completed Connection Queue / Accept Queue) :存储已经完成 TCP 三次握手,等待被应用程序通过 accept() 提取的连接(这些连接在内核中已是 ESTABLISHED 状态,但从服务器监听角度看是在等待 accept)。backlog 参数主要限制的是这个队列的大小。当此队列满时,内核可能会拒绝新的已完成握手的连接(例如,不响应 ACK,或发送 RST)。

返回值 :成功返回 0;失败返回 -1,并设置 errno。
示例(续上)
  1.     // ... bind() 成功后 ...
  2.     // 开始监听传入连接
  3.     // backlog 设置为 10,意味着最多允许 10 个已完成三次握手的连接在队列中等待 accept()
  4.     if (listen(server_fd, 10) < 0) {
  5.         perror("listen 失败");
  6.         close(server_fd);
  7.         exit(EXIT_FAILURE);
  8.     }
  9.     printf("服务器正在端口 %d 上监听...\n", PORT);
  10.     // ... 下一步: accept() ...
复制代码
3. accept() 系统调用

accept() 从监听套接字 sockfd 的已完成连接队列中取出一个连接请求,为该连接创建一个 新的 、已连接的套接字,并返回这个新套接字的文件描述符。
  1. #include <sys/socket.h>
  2. int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);
复制代码

  • sockfd: 处于监听状态的套接字文件描述符。
  • addr: (可选)指向 sockaddr 结构体的指针,用于接收发起连接的客户端的地址信息。若不关心客户端地址,可传递 NULL。
  • addrlen: (可选)指向 socklen_t 类型变量的指针。这是一个 值-结果 参数:

    • 调用前,*addrlen 必须初始化为 addr 指向的缓冲区的实际大小。
    • 调用成功后,内核会将客户端地址结构的实际大小写入 *addrlen。若传入 addr 为 NULL,则 addrlen 也应为 NULL。

内核操作

  • 检查队列 :内核检查与 sockfd 关联的已完成连接队列。
  • 阻塞/非阻塞行为

    • 如果队列为空且 sockfd 是阻塞模式(默认),accept() 调用将使进程 睡眠 ,直到队列中出现新的已完成连接。
    • 如果队列为空且 sockfd 是非阻塞模式,accept() 立即返回 -1,并将 errno 设置为 EAGAIN 或 EWOULDBLOCK。

  • 提取连接 :如果队列非空,内核从中取出一个连接。
  • 创建新套接字 :内核为这个被接受的连接创建一个 全新的 内部套接字结构和对应的文件对象。这个新套接字的状态被设为 ESTABLISHED,并关联了客户端的地址信息。
  • 返回新描述符 :内核在进程的文件描述符表中分配一个新的文件描述符,指向这个新创建的已连接套接字的文件对象,并将此描述符返回给应用程序。
  • 填充地址信息 :如果 addr 和 addrlen 参数有效,内核将客户端的地址信息复制到 addr 指向的缓冲区,并更新 *addrlen。
关键点 :accept() 返回的是一个 新的文件描述符 ,代表与特定客户端的通信通道。后续与该客户端的数据收发(send(), recv() 等)必须使用这个 新描述符 ,而非原来的监听描述符 sockfd。监听描述符 sockfd 保持不变,继续用于接受后续的连接请求。
返回值

  • 成功:返回一个新的、非负的已连接套接字文件描述符。
  • 失败:返回 -1,并设置 errno。
示例(续上,接受单个连接)
  1.     // ... listen() 成功后 ...
  2.     int new_socket;
  3.     struct sockaddr_in client_address;
  4.     socklen_t addrlen = sizeof(client_address); // 注意类型是 socklen_t
  5.     char client_ip[INET_ADDRSTRLEN]; // 用于存储客户端 IP 字符串
  6.     printf("等待连接...\n");
  7.     // 接受一个传入连接
  8.     // 默认情况下,accept() 会阻塞,直到有客户端连接进来
  9.     if ((new_socket = accept(server_fd, (struct sockaddr *)&client_address, &addrlen)) < 0) {
  10.         perror("accept 失败");
  11.         close(server_fd);
  12.         exit(EXIT_FAILURE);
  13.     }
  14.     // 将客户端的 IP 地址从网络字节序转换为点分十进制字符串以便打印
  15.     inet_ntop(AF_INET, &client_address.sin_addr, client_ip, INET_ADDRSTRLEN);
  16.     printf("接受来自 %s:%d 的连接\n", client_ip, ntohs(client_address.sin_port));
  17.     printf("用于通信的新套接字描述符: %d\n", new_socket);
  18.     // 现在可以使用 new_socket 与该客户端进行数据交换
  19.     // 例如: send(new_socket, "欢迎!\n", 7, 0);
  20.     // 例如: recv(new_socket, buffer, 1024, 0);
  21.     // 完成与该客户端的通信后,关闭连接套接字
  22.     close(new_socket);
  23.     // 服务器最终关闭时,关闭监听套接字
  24.     close(server_fd);
复制代码
上述示例仅能处理一次连接。为实现并发处理多个客户端,需引入循环结构,并结合多进程/多线程模型或 I/O 多路复用技术。对于需要高效处理大量并发连接的场景(如我们的 Pub/Sub 服务器),I/O 多路复用是更常用的方案。
select() - I/O 多路复用机制

服务器程序通常需要同时关注多个事件源:监听套接字上的新连接请求,以及多个已连接客户端套接字上的数据到达。若使用阻塞式的 accept() 和 recv(),程序执行流会在单一调用点暂停,无法及时响应其他事件。I/O 多路复用技术解决了这个问题,它允许进程同时监视多个文件描述符,并在其中任何一个或多个描述符准备好进行 I/O 操作(可读、可写或异常)时获得通知。select() 是 POSIX 标准中定义的一种经典 I/O 多路复用机制。
  1. #include <sys/select.h>
  2. #include <sys/time.h> // 对于 struct timeval
  3. int select(int nfds, fd_set *readfds, fd_set *writefds,
  4.            fd_set *exceptfds, struct timeval *timeout);
复制代码

  • nfds: 被监听的文件描述符的数量,其值应为所有被监听文件描述符中的最大值 加 1 。例如,若监听 fd 3, 5, 8,则 nfds 应为 9。
  • readfds: 指向 fd_set 结构体的指针,用于指定需要监听 可读 事件的文件描述符集合。对于监听套接字,可读意味着有新连接待 accept();对于已连接套接字,可读意味着有数据到达、连接已关闭(收到 FIN)或发生错误。
  • writefds: 指向 fd_set 的指针,用于指定需要监听 可写 事件的文件描述符集合。通常表示套接字的发送缓冲区有可用空间。
  • exceptfds: 指向 fd_set 的指针,用于指定需要监听 异常 条件的文件描述符集合(如 TCP 带外数据)。
  • timeout: 指向 struct timeval 结构体的指针,用于设定 select() 的最大等待时间。

    • struct timeval { time_t tv_sec; suseconds_t tv_usec; }; (秒和微秒)
    • 若 timeout 为 NULL,select() 将无限期阻塞,直到至少有一个描述符就绪。
    • 若 timeout 指向的结构体中 tv_sec 和 tv_usec 均为 0,select() 执行非阻塞检查,立即返回。
    • 若 timeout 指向的结构体包含正值,select() 最多等待指定时间。超时前有描述符就绪则返回;超时则返回 0。

fd_set 相关宏定义

  • FD_ZERO(fd_set *set): 清空(初始化)一个 fd_set 集合。 每次调用 select() 前,对工作集合必须执行此操作或从主集合复制
  • FD_SET(int fd, fd_set *set): 将文件描述符 fd 添加到集合 set 中。
  • FD_CLR(int fd, fd_set *set): 将文件描述符 fd 从集合 set 中移除。
  • FD_ISSET(int fd, fd_set *set):  在 select() 返回后 ,用于检查文件描述符 fd 是否仍在就绪集合 set 中。
重要特性 :select() 调用会 修改 传入的 fd_set 集合(readfds, writefds, exceptfds),将其中未就绪的文件描述符移除。因此,应用程序通常需要维护一个 主集合 (master set)记录所有需要关心的文件描述符,在每次循环调用 select() 之前,将主集合的内容 复制 到一个 工作集合 (working set),然后将工作集合传递给 select()。
返回值

  • 成功:返回三个集合中总共就绪的文件描述符数量。
  • 超时:返回 0。
  • 失败:返回 -1,并设置 errno。
使用 select() 的服务器模式

  • 初始化监听套接字 (socket, bind, listen)。
  • 初始化主文件描述符集合 master_fds:FD_ZERO(&master_fds),然后 FD_SET(listener_fd, &master_fds)。
  • 维护当前最大文件描述符 max_fd,初始值为 listener_fd。
  • 进入主事件循环:
    a.  创建临时工作集合 read_fds,将其初始化为主集合:read_fds = master_fds。
    b.  调用 select(max_fd + 1, &read_fds, NULL, NULL, NULL)(此处示例仅关心读事件,阻塞等待)。
    c.  检查 select 返回值。若为 -1,处理错误(需注意 EINTR 信号中断)。
    d.  遍历从 0 到 max_fd 的所有文件描述符 i:
    i.  使用 FD_ISSET(i, &read_fds) 判断描述符 i 是否在就绪的读集合中。
    ii. 若 i 是监听套接字 (listener_fd) 且 FD_ISSET 为真:
    * 调用 accept() 接受新连接,得到 new_fd。
    * 将 new_fd 添加到主集合 master_fds:FD_SET(new_fd, &master_fds)。
    * 更新 max_fd:if (new_fd > max_fd) max_fd = new_fd;。
    iii. 若 i 是已连接客户端套接字且 FD_ISSET 为真:
    * 调用 recv(i, buffer, ...) 或 read(i, buffer, ...) 尝试读取数据。
    * 处理连接状态
    * 返回值 > 0 : 成功读取 valread 字节数据。处理接收到的数据(解析命令 SUB/PUB 等)。
    * 返回值 0 : 表示对端执行了有序关闭(发送了 FIN 包),所有数据已接收完毕。这是 TCP 半关闭 状态的体现(对方关闭了写,本方可以继续写,但通常也应准备关闭)。应用程序应:
    * close(i) 关闭本端的套接字连接。
    * 从 master_fds 中移除 i:FD_CLR(i, &master_fds)。
    * 清理与该客户端相关的应用层资源(如订阅信息)。
    * 返回值 -1 : 发生错误。检查 errno:
    * 若 errno 为 EAGAIN 或 EWOULDBLOCK(非阻塞模式下),表示暂时无数据可读,不是错误。
    * 若 errno 为 ECONNRESET,表示对端发送了 RST 包(连接异常中断)。
    * 其他错误(如 ETIMEDOUT, ENOTCONN 等)。
    * 对于实际错误,同样需要 close(i),从 master_fds 移除 i,并清理资源。
关于 shutdown() 和半关闭

recv/read 返回 0 表明对端关闭了其发送通道。如果本端应用还想发送数据,理论上可以(只要对端接收缓冲区未满且未完全关闭连接),但这通常不符合应用逻辑。更常见的是,接收到 0 后本端也应关闭连接。
有时应用可能需要主动进行 半关闭 ,即关闭自己的发送通道,但仍保持接收通道打开,以接收对端可能还未发送完的数据。这可以通过 shutdown() 系统调用实现:
  1. #include <sys/socket.h>
  2. int shutdown(int sockfd, int how);
复制代码

  • sockfd: 要操作的套接字。
  • how: 指定关闭方式:

    • SHUT_RD: 关闭接收通道(之后不能再从此套接字接收数据)。
    • SHUT_WR: 关闭发送通道(之后不能再从此套接字发送数据)。这会向对端发送一个 FIN 包。
    • SHUT_RDWR: 同时关闭接收和发送通道(等同于 close() 的一部分效果,但不释放文件描述符)。

使用 shutdown(sockfd, SHUT_WR) 后,对端 recv 会在接收完所有已在途的数据后返回 0。而本端仍可调用 recv 接收数据,直到对端也关闭其发送通道(发送 FIN)。
close() 调用则会同时关闭读写两个方向(如果引用计数为零,还会释放文件描述符和相关内核资源)。
在我们的简单 Pub/Sub 服务器中,我们直接使用 close() 处理连接终止,这隐含了双向关闭。
极简 Pub/Sub 服务器 C 代码实现

以下是使用 select() 实现的极简 Pub/Sub 服务器的 C 代码,注释已更新为中文。请注意,此实现为了教学目的保持简洁,省略了许多生产环境中必要的健壮性设计(如完善的错误处理、动态资源管理、优化的 max_fd 更新策略等)。
  1. #include <stdio.h>
  2. #include <stdlib.h>
  3. #include <string.h>
  4. #include <unistd.h>
  5. #include <sys/socket.h>
  6. #include <netinet/in.h>
  7. #include
  8. #include <sys/select.h>
  9. #include <errno.h>
  10. #define PORT 8080               // 服务器监听端口
  11. #define MAX_CLIENTS 30          // 最大并发客户端数量
  12. #define MAX_TOPICS 10           // 最大主题数量
  13. #define MAX_SUBS_PER_TOPIC 10   // 每个主题最大订阅者数量
  14. #define BUFFER_SIZE 1024        // 接收缓冲区大小
  15. // 主题订阅数据结构
  16. typedef struct {
  17.     char name[50];                       // 主题名称
  18.     int subscribers[MAX_SUBS_PER_TOPIC]; // 订阅该主题的客户端套接字描述符数组
  19.     int sub_count;                       // 当前订阅者数量
  20. } Topic;
  21. Topic topics[MAX_TOPICS]; // 全局主题数组
  22. int topic_count = 0;      // 当前主题数量
  23. // 辅助函数:查找或创建主题
  24. int find_or_create_topic(const char* topic_name) {
  25.     // 查找现有主题
  26.     for (int i = 0; i < topic_count; ++i) {
  27.         if (strcmp(topics[i].name, topic_name) == 0) {
  28.             return i; // 找到,返回索引
  29.         }
  30.     }
  31.     // 如果未找到且还有空间,则创建新主题
  32.     if (topic_count < MAX_TOPICS) {
  33.         strncpy(topics[topic_count].name, topic_name, sizeof(topics[topic_count].name) - 1);
  34.         topics[topic_count].name[sizeof(topics[topic_count].name) - 1] = '\0'; // 确保空字符结尾
  35.         topics[topic_count].sub_count = 0; // 初始化订阅者数量为0
  36.         return topic_count++; // 返回新主题的索引,并增加主题计数
  37.     }
  38.     return -1; // 主题数组已满,无法创建
  39. }
  40. // 辅助函数:将客户端添加到主题的订阅列表
  41. void add_subscriber(int topic_index, int client_socket) {
  42.     if (topic_index < 0 || topic_index >= topic_count) return; // 无效的主题索引
  43.     Topic* topic = &topics[topic_index];
  44.     if (topic->sub_count < MAX_SUBS_PER_TOPIC) {
  45.         // 检查是否已订阅,避免重复添加
  46.         for(int i = 0; i < topic->sub_count; ++i) {
  47.             if (topic->subscribers[i] == client_socket) return; // 已存在,直接返回
  48.         }
  49.         // 添加新的订阅者
  50.         topic->subscribers[topic->sub_count++] = client_socket;
  51.         printf("客户端 %d 订阅了主题 '%s'\n", client_socket, topic->name);
  52.     } else {
  53.         // 主题订阅已满
  54.         printf("主题 '%s' 订阅已满, 无法添加客户端 %d\n", topic->name, client_socket);
  55.         // 可选: 向客户端发送错误消息
  56.         // send(client_socket, "ERR topic full\n", 15, 0);
  57.     }
  58. }
  59. // 辅助函数:从所有主题中移除指定客户端的订阅
  60. void remove_subscriber(int client_socket) {
  61.     printf("正在移除客户端 %d 的所有订阅。\n", client_socket);
  62.     for (int i = 0; i < topic_count; ++i) { // 遍历所有主题
  63.         int found_idx = -1;
  64.         // 在当前主题的订阅者列表中查找该客户端
  65.         for (int j = 0; j < topics[i].sub_count; ++j) {
  66.             if (topics[i].subscribers[j] == client_socket) {
  67.                 found_idx = j;
  68.                 break;
  69.             }
  70.         }
  71.         // 如果找到该客户端
  72.         if (found_idx != -1) {
  73.             // 将后续订阅者向前移动一位,覆盖掉要移除的客户端
  74.             for (int k = found_idx; k < topics[i].sub_count - 1; ++k) {
  75.                 topics[i].subscribers[k] = topics[i].subscribers[k + 1];
  76.             }
  77.             topics[i].sub_count--; // 减少订阅者计数
  78.             printf("已将客户端 %d 从主题 '%s' 中移除\n", client_socket, topics[i].name);
  79.         }
  80.     }
  81. }
  82. // 辅助函数:向指定主题发布消息
  83. void publish_message(int topic_index, const char* message, int publisher_socket) {
  84.     if (topic_index < 0 || topic_index >= topic_count) return; // 无效的主题索引
  85.     Topic* topic = &topics[topic_index];
  86.     char full_message[BUFFER_SIZE + 100]; // 预留足够空间构造 "MSG <topic> <data>\n" 格式的消息
  87.     // 构造完整的消息格式
  88.     snprintf(full_message, sizeof(full_message), "MSG %s %s", topic->name, message);
  89.      // 确保消息以换行符结束 (如果原始消息没有的话)
  90.     if (full_message[strlen(full_message)-1] != '\n') {
  91.          strncat(full_message, "\n", sizeof(full_message) - strlen(full_message) - 1);
  92.     }
  93.     printf("向主题 '%s' 发布消息: %s", topic->name, message); // 假设原始 message 可能已包含换行符
  94.     // 遍历该主题的所有订阅者
  95.     for (int i = 0; i < topic->sub_count; ++i) {
  96.         int subscriber_socket = topic->subscribers[i];
  97.         // 不将消息发回给发布者自己
  98.         if (subscriber_socket != publisher_socket) {
  99.             // 发送消息给订阅者
  100.             if (send(subscriber_socket, full_message, strlen(full_message), 0) < 0) {
  101.                 // 发送失败,可能需要处理连接断开等问题
  102.                 perror("向订阅者发送消息失败");
  103.                 // 注意: 健壮的实现应在此处检测到错误后,可能也需要关闭并清理该订阅者的连接
  104.             } else {
  105.                  printf("  -> 已发送至客户端 %d\n", subscriber_socket);
  106.             }
  107.         }
  108.     }
  109. }
  110. int main() {
  111.     int listener_fd;                   // 监听套接字描述符
  112.     int new_socket;                    // 新接受的客户端连接套接字描述符
  113.     int client_sockets[MAX_CLIENTS];   // 存储客户端套接字描述符的数组
  114.     int max_clients = MAX_CLIENTS;     // 最大客户端数 (冗余变量,可直接用宏)
  115.     int activity;                      // select() 的返回值
  116.     int i, valread, sd;                // 循环变量, read() 返回值, 当前处理的套接字描述符
  117.     int max_sd;                        // select() 需要的最大文件描述符值 + 1
  118.     struct sockaddr_in address;        // 服务器地址结构
  119.     char buffer[BUFFER_SIZE + 1];      // 数据接收缓冲区 (+1 用于空字符结尾)
  120.     // 文件描述符集合
  121.     fd_set readfds;
  122.     // 初始化客户端套接字数组,全部置 0 (0 不是有效的文件描述符)
  123.     for (i = 0; i < max_clients; i++) {
  124.         client_sockets[i] = 0;
  125.     }
  126.     // 创建主监听套接字
  127.     if ((listener_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {
  128.         perror("socket 创建失败");
  129.         exit(EXIT_FAILURE);
  130.     }
  131.     // 设置监听套接字选项,允许地址重用
  132.     int opt = 1;
  133.     if (setsockopt(listener_fd, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)) < 0) {
  134.         perror("setsockopt 失败");
  135.         exit(EXIT_FAILURE);
  136.     }
  137.     // 配置服务器地址结构
  138.     address.sin_family = AF_INET;         // IPv4
  139.     address.sin_addr.s_addr = INADDR_ANY; // 监听所有接口
  140.     address.sin_port = htons(PORT);       // 指定端口(网络字节序)
  141.     // 将监听套接字绑定到指定地址和端口
  142.     if (bind(listener_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
  143.         perror("bind 失败");
  144.         exit(EXIT_FAILURE);
  145.     }
  146.     printf("监听器已启动在端口 %d \n", PORT);
  147.     // 使套接字进入监听状态,设定等待队列长度为 3
  148.     if (listen(listener_fd, 3) < 0) {
  149.         perror("listen 失败");
  150.         exit(EXIT_FAILURE);
  151.     }
  152.     // 等待客户端连接
  153.     socklen_t addrlen = sizeof(address);
  154.     puts("等待连接中 ...");
  155.     while (1) { // 服务器主循环
  156.         // 清空读文件描述符集合
  157.         FD_ZERO(&readfds);
  158.         // 将监听套接字加入集合
  159.         FD_SET(listener_fd, &readfds);
  160.         max_sd = listener_fd; // 初始化 max_sd
  161.         // 将所有活动的客户端套接字加入集合
  162.         for (i = 0; i < max_clients; i++) {
  163.             sd = client_sockets[i]; // 获取客户端套接字描述符
  164.             // 如果是有效的套接字描述符 (大于 0),则加入读集合
  165.             if (sd > 0)
  166.                 FD_SET(sd, &readfds);
  167.             // 更新 max_sd 以跟踪最大的文件描述符值
  168.             if (sd > max_sd)
  169.                 max_sd = sd;
  170.         }
  171.         // 调用 select() 等待活动发生,timeout 设置为 NULL 表示无限期阻塞
  172.         activity = select(max_sd + 1, &readfds, NULL, NULL, NULL);
  173.         // 检查 select() 是否出错 (忽略 EINTR 信号中断)
  174.         if ((activity < 0) && (errno != EINTR)) {
  175.             perror("select 错误");
  176.             // 在实际应用中可能需要更复杂的错误处理逻辑
  177.         }
  178.         // 检查监听套接字是否就绪 (表示有新连接请求)
  179.         if (FD_ISSET(listener_fd, &readfds)) {
  180.             // 接受新连接
  181.             if ((new_socket = accept(listener_fd, (struct sockaddr *)&address, &addrlen)) < 0) {
  182.                 perror("accept 失败");
  183.                 // 严重错误,通常需要退出或重启服务
  184.                 exit(EXIT_FAILURE);
  185.             }
  186.             // 获取并打印新客户端的 IP 和端口信息
  187.              char client_ip[INET_ADDRSTRLEN];
  188.              inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
  189.              printf("新连接建立,套接字描述符为 %d,IP 为: %s,端口为: %d\n",
  190.                    new_socket, client_ip, ntohs(address.sin_port));
  191.             // 将新套接字添加到客户端数组中
  192.             for (i = 0; i < max_clients; i++) {
  193.                 // 找到数组中第一个空位
  194.                 if (client_sockets[i] == 0) {
  195.                     client_sockets[i] = new_socket;
  196.                     printf("已将套接字 %d 添加到列表位置 %d\n", new_socket, i);
  197.                     break; // 添加成功后退出循环
  198.                 }
  199.             }
  200.             // 如果客户端数组已满
  201.              if (i == max_clients) {
  202.                 printf("客户端连接数已达上限,拒绝连接。\n");
  203.                 close(new_socket); // 关闭这个无法处理的连接
  204.             }
  205.         }
  206.         // 检查各个客户端套接字是否有活动 (数据到达或连接关闭)
  207.         for (i = 0; i < max_clients; i++) {
  208.             sd = client_sockets[i]; // 当前要检查的客户端套接字
  209.             // 如果当前客户端套接字在就绪的读集合中
  210.             if (FD_ISSET(sd, &readfds)) {
  211.                 // 尝试读取数据
  212.                 // read() 返回值: >0 表示读取的字节数, 0 表示连接已关闭 (收到FIN), <0 表示错误
  213.                 if ((valread = read(sd, buffer, BUFFER_SIZE)) <= 0) {
  214.                     // 连接关闭或发生错误
  215.                     if (valread == 0) {
  216.                         // 对端正常关闭连接 (FIN)
  217.                         getpeername(sd, (struct sockaddr*)&address, &addrlen); // 获取对端地址信息用于打印
  218.                         char client_ip[INET_ADDRSTRLEN];
  219.                         inet_ntop(AF_INET, &address.sin_addr, client_ip, INET_ADDRSTRLEN);
  220.                         printf("主机 %s:%d (套接字 %d) 断开连接\n", client_ip, ntohs(address.sin_port), sd);
  221.                     } else {
  222.                         // 读取错误
  223.                         perror("read 错误");
  224.                         // 也可以在这里获取对端信息打印
  225.                     }
  226.                     // 关闭套接字
  227.                     close(sd);
  228.                     // 从客户端数组中移除 (标记为 0)
  229.                     client_sockets[i] = 0;
  230.                     // 从所有主题订阅中移除该客户端
  231.                     remove_subscriber(sd);
  232.                 } else {
  233.                     // 成功读取到数据,处理客户端发送的命令
  234.                     buffer[valread] = '\0'; // 添加空字符结尾,确保字符串处理安全
  235.                     // 简单的命令解析 (生产环境需要更健壮的解析器)
  236.                     char command[5];       // 存储命令 (SUB/PUB)
  237.                     char topic_name[50];   // 存储主题名
  238.                     char message[BUFFER_SIZE]; // 存储消息内容 (PUB 命令)
  239.                     memset(message, 0, sizeof(message)); // 清空消息缓冲区
  240.                     // 使用 sscanf 解析输入,格式为 "CMD TOPIC [MESSAGE]"
  241.                     // %4s: 读取最多4个字符作为命令
  242.                     // %49s: 读取最多49个字符作为主题名 (防止溢出)
  243.                     // %[^\n]: 读取从主题名后到行尾的所有字符作为消息 (包括空格)
  244.                     int parsed_items = sscanf(buffer, "%4s %49s %[^\n]", command, topic_name, message);
  245.                     printf("收到来自客户端 %d 的数据: %s", sd, buffer); // buffer 可能自带换行符
  246.                     if (parsed_items >= 2 && strcmp(command, "SUB") == 0) {
  247.                         // 处理 SUB 命令
  248.                         int topic_idx = find_or_create_topic(topic_name);
  249.                         if (topic_idx != -1) {
  250.                             add_subscriber(topic_idx, sd);
  251.                             // 可选: 向客户端发送订阅成功确认
  252.                             // send(sd, "SUB OK\n", 7, 0);
  253.                         } else {
  254.                             // 可选: 发送错误(如主题数达到上限)
  255.                             // send(sd, "ERR max topics\n", 15, 0);
  256.                         }
  257.                     } else if (parsed_items >= 3 && strcmp(command, "PUB") == 0) {
  258.                          // 处理 PUB 命令
  259.                          int topic_idx = find_or_create_topic(topic_name); // 查找主题,如果不存在,简单实现可以忽略或报错
  260.                           if (topic_idx != -1) {
  261.                              // 确保消息格式包含换行符,便于客户端接收处理
  262.                              // 注意: 这里假设 message 已包含从 sscanf 读取的内容
  263.                              // 这里对 message 的处理可以省略,因为 publish_message 内部会确保换行符
  264.                              publish_message(topic_idx, message, sd);
  265.                          } else {
  266.                               printf("未找到用于发布的主题 '%s'.\n", topic_name);
  267.                              // 可选: 发送错误(主题不存在)
  268.                              // send(sd, "ERR no such topic\n", 18, 0);
  269.                          }
  270.                     } else {
  271.                         // 无效命令格式
  272.                         printf("来自客户端 %d 的无效命令: %s", sd, buffer);
  273.                         // 可选: 发送错误
  274.                         // send(sd, "ERR invalid command\n", 20, 0);
  275.                     }
  276.                     // 清理缓冲区以便下次读取 (可选,因为 read 会覆盖)
  277.                     memset(buffer, 0, BUFFER_SIZE + 1);
  278.                 }
  279.             }
  280.         }
  281.     }
  282.     // 理论上,服务器的无限循环不会到达这里,但在正常关闭流程中应关闭监听套接字
  283.     close(listener_fd);
  284.     return 0;
  285. }
复制代码
编译
  1. gcc pubsub_server_revised.c -o pubsub_server_revised
复制代码
运行
  1. ./pubsub_server_revised
复制代码
  1. (base) ➜  back-end-notes git:(master) ✗ nc localhost 8080
  2. ^C
  3. (base) ➜  back-end-notes git:(master) ✗ nc localhost 8080
  4. SUB TOPIC-1
  5. MSG TOPIC-1 1,242.42,214\n
复制代码
  1. (base) ➜  back-end-notes git:(master) ✗ nc localhost 8080
  2. PUB TOPIC-1 1,242.42,214\n
复制代码
  1. piperliu@go-x86:~/code/socket$ ./pubsub_server_revised
  2. 监听器已启动在端口 8080
  3. 等待连接中 ...
  4. 新连接建立,套接字描述符为 5,IP 为: 127.0.0.1,端口为: 38910
  5. 已将套接字 5 添加到列表位置 0
  6. 主机 127.0.0.1:38910 (套接字 5) 断开连接
  7. 正在移除客户端 5 的所有订阅。
  8. 新连接建立,套接字描述符为 5,IP 为: 127.0.0.1,端口为: 38924
  9. 已将套接字 5 添加到列表位置 0
  10. 收到来自客户端 5 的数据: SUB TOPIC-1
  11. 客户端 5 订阅了主题 'TOPIC-1'
  12. 新连接建立,套接字描述符为 6,IP 为: 127.0.0.1,端口为: 58630
  13. 已将套接字 6 添加到列表位置 1
  14. 收到来自客户端 6 的数据: PUB TOPIC-1 1,242.42,214\n
  15. 向主题 'TOPIC-1' 发布消息: 1,242.42,214\n  -> 已发送至客户端 5
复制代码
订阅与发布流程的内核视角梳理

结合内核操作,我们重新梳理客户端订阅和发布时服务器端的处理流程:

  • 服务器初始化与监听


  • socket():内核创建 socket 结构,分配资源,返回文件描述符 listener_fd。
  • bind():内核校验地址、端口可用性及权限,将本地地址信息关联到 listener_fd 的 socket 结构。
  • listen():内核将 listener_fd 对应的 socket 结构状态置为 LISTEN,并初始化 SYN 队列和 Accept 队列。

  • 客户端连接建立


  • 客户端发起 TCP 连接请求(发送 SYN 包)。
  • 服务器内核网络协议栈收到 SYN,创建半连接条目放入 SYN 队列,回复 SYN-ACK。
  • 客户端回复 ACK,完成三次握手。内核将对应的连接从 SYN 队列移至 Accept 队列。
  • 服务器进程调用 select(),select 检测到 listener_fd 可读(因为 Accept 队列非空)。
  • FD_ISSET(listener_fd, ...) 为真。
  • 服务器进程调用 accept()。内核从 Accept 队列取出一个连接,创建新的 socket 结构和文件描述符 new_socket(状态为 ESTABLISHED),返回给服务器进程。

  • 客户端订阅 (SUB)


  • 客户端通过 new_socket 发送 "SUB ..." 数据。
  • 数据到达服务器,内核将其放入 new_socket 的接收缓冲区。
  • 服务器进程调用 select(),select 检测到 new_socket 可读(接收缓冲区有数据)。
  • FD_ISSET(new_socket, ...) 为真。
  • 服务器进程调用 read(new_socket, ...),内核从接收缓冲区复制数据到用户空间 buffer。
  • 服务器进程解析 buffer,识别订阅请求,更新应用层数据结构(topics 数组)。

  • 客户端发布 (PUB)


  • 客户端通过 new_socket 发送 "UB ..." 数据。
  • 数据到达服务器,内核处理同上,select 报告 new_socket 可读。
  • 服务器进程调用 read() 获取数据并解析。
  • 服务器进程查找主题,遍历订阅者列表。
  • 对于每个订阅者 sub_socket(且 sub_socket != new_socket):

    • 服务器进程调用 send(sub_socket, ...)。内核将数据复制到 sub_socket 的发送缓冲区。
    • 内核网络协议栈负责将发送缓冲区的数据打包成 TCP 段并发送出去。


  • 客户端断开连接 (有序关闭)


  • 客户端调用 close() 或 shutdown(SHUT_WR),其内核发送 FIN 包。
  • 服务器内核收到 FIN,将 new_socket 标记为收到 FIN,并向客户端回复 ACK。连接进入 CLOSE_WAIT 状态。
  • 服务器进程调用 select(),select 检测到 new_socket 可读(因为收到 FIN 也是可读事件)。
  • FD_ISSET(new_socket, ...) 为真。
  • 服务器进程调用 read(new_socket, ...),read 返回 0。
  • 服务器进程识别出连接关闭,调用 close(new_socket)。内核发送 FIN 包给客户端(如果尚未发送),释放套接字资源(当引用计数为0时),从 master_fds 移除 new_socket,清理应用层资源。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册