【问题标题】:Is select() + non-blocking write() possible on a blocking pipe or socket?select() + non-blocking write() 是否可以在阻塞管道或套接字上使用?
【发布时间】:2023-08-13 18:00:01
【问题描述】:

情况是我有一个 阻塞 管道或套接字 fd,我想 write() 不阻塞,所以我先做一个 select(),但这仍然不能保证write() 不会屏蔽。

这是我收集的数据。即使select() 表示 写入是可能的,写入超过PIPE_BUF 字节可以阻止。 但是,最多写入 PIPE_BUF 字节似乎不会阻塞 实践,但POSIX spec 没有强制要求。

这仅指定原子行为。 Python(!) documentation 声明:

select()poll() 或类似人员报告为准备写入的文件 此模块中的接口保证不会在写入时阻塞 到PIPE_BUF 字节。 POSIX 保证该值至少为 512.

在下面的测试程序中,设置BUF_BYTES100000 来阻止 成功选择后,write() 在 Linux、FreeBSD 或 Solaris 上。一世 假设命名管道与匿名管道具有相似的行为。

不幸的是,阻塞套接字也会发生同样的情况。称呼 test_socket()main() 中使用较大的BUF_BYTES100000 很好 这里也)。目前尚不清楚是否有安全的缓冲区大小,例如 PIPE_BUF 用于套接字。

#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <limits.h>
#include <stdio.h>
#include <sys/select.h>
#include <unistd.h>

#define BUF_BYTES PIPE_BUF
char buf[BUF_BYTES];

int
probe_with_select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds)
{
    struct timeval timeout = {0, 0};
    int n_found = select(nfds, readfds, writefds, exceptfds, &timeout);
    if (n_found == -1) {
        perror("select");
    }
    return n_found;
}

void
check_if_readable(int fd)
{
    fd_set fdset;
    FD_ZERO(&fdset);
    FD_SET(fd, &fdset);
    printf("select() for read on fd %d returned %d\n",
           fd, probe_with_select(fd + 1, &fdset, 0, 0));
}

void
check_if_writable(int fd)
{
    fd_set fdset;
    FD_ZERO(&fdset);
    FD_SET(fd, &fdset);
    int n_found = probe_with_select(fd + 1, 0, &fdset, 0);
    printf("select() for write on fd %d returned %d\n", fd, n_found);
    /* if (n_found == 0) { */
    /*     printf("sleeping\n"); */
    /*     sleep(2); */
    /*     int n_found = probe_with_select(fd + 1, 0, &fdset, 0); */
    /*     printf("retried select() for write on fd %d returned %d\n",  */
    /*            fd, n_found); */
    /* } */
}

void
test_pipe(void)
{
    int pipe_fds[2];
    size_t written;
    int i;
    if (pipe(pipe_fds)) {
        perror("pipe failed");
        _exit(1);
    }
    printf("read side pipe fd: %d\n", pipe_fds[0]);
    printf("write side pipe fd: %d\n", pipe_fds[1]);
    for (i = 0; ; i++) {
        printf("i = %d\n", i);
        check_if_readable(pipe_fds[0]);
        check_if_writable(pipe_fds[1]);
        written = write(pipe_fds[1], buf, BUF_BYTES);
        if (written == -1) {
            perror("write");
            _exit(-1);
        }
        printf("written %d bytes\n", written);
    }
}

void
serve()
{
    int listenfd = 0, connfd = 0;
    struct sockaddr_in serv_addr;

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    memset(&serv_addr, '0', sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    serv_addr.sin_port = htons(5000);

    bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

    listen(listenfd, 10);

    connfd = accept(listenfd, (struct sockaddr*)NULL, NULL);

    sleep(10);
}

int
connect_to_server()
{
    int sockfd = 0, n = 0;
    struct sockaddr_in serv_addr;

    if((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        exit(-1);
    }

    memset(&serv_addr, '0', sizeof(serv_addr));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(5000);

    if(inet_pton(AF_INET, "127.0.0.1", &serv_addr.sin_addr) <= 0) {
        perror("inet_pton");
        exit(-1);
    }

    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
        perror("connect");
        exit(-1);
    }

    return sockfd;
}

void
test_socket(void)
{
    if (fork() == 0)  {
        serve();
    } else {
        int fd;
        int i;
        int written;
        sleep(1);
        fd = connect_to_server();

        for (i = 0; ; i++) {
            printf("i = %d\n", i);
            check_if_readable(fd);
            check_if_writable(fd);
            written = write(fd, buf, BUF_BYTES);
            if (written == -1) {
                perror("write");
                _exit(-1);
            }
            printf("written %d bytes\n", written);
        }
    }
}

int
main(void)
{
    test_pipe();
    /* test_socket(); */
}

【问题讨论】:

  • 您是否有某些原因不想将管道/套接字设置为非阻塞模式并完成它? (如果你觉得有必要,你可以在 write() 返回后再次将其设置回阻塞模式)
  • 这是在语言运行时,所以改变阻塞对程序员来说真的很不友好。
  • 如果程序员永远看不到更改,则不会……如果每次您将其设置回原来的方式,在将控制权还给程序员之前,程序员永远不会知道。 (例外情况是,如果有多个线程同时使用同一个文件描述符,但是做这种事情的设计无论如何都注定要悲痛……;))
  • 我希望避免这种情况。由于 write() 中的阻塞由于我省略的原因而非常糟糕,因此 select+write 逻辑已经处于 per-fd 锁定之下,因此来回更改阻塞性似乎是可能的。但是,由于 fcntl() 调用更改的是 文件描述 而不是 fd,这可能会泄漏到其他进程。
  • @EJP,某处明显断开连接。在我的 linux 系统 fcntl(2) 上说“重复的文件描述符(使用 dup(2)、fcntl(F_DUPFD)、fork(2) 等)引用相同的打开文件描述,因此共享相同的文件状态标志。”

标签: sockets unix pipe posix blocking


【解决方案1】:

您引用的 Posix 部分明确指出:

[对于管道]如果 O_NONBLOCK 标志被清除,写入请求可能会导致线程阻塞,但在正常完成时它应该返回 nbyte。

[对于流,可能包括流式套接字] 如果 O_NONBLOCK 已清除,并且 STREAM 不能接受数据(由于内部流控制条件,STREAM 写入队列已满),write() 将阻塞,直到可以接受数据。

因此,您引用的 Python 文档仅适用于非阻塞模式。但是由于您没有使用 Python,因此无论如何它都没有相关性。

【讨论】:

  • 即使 posix 没有强制要求它描述的行为,python 文档也可能是正确的。
  • @melisgl 不,它不能。它所能做的就是调用 C API。如果 C API 阻塞。 Python 会阻塞。
  • 我并不是要暗示 Python 提供的不仅仅是对 select() 的简单包装。但是,符合 Posix 的实现仍可能提供超出规范要求的保证。这就是为什么引用规范,但要求提供更多信息。
  • 不,它不能。如果它调用write() 的数据量会阻塞,它将阻塞。而且它不能提前告诉你。
【解决方案2】:

除非您希望在 select() 表示 fd 已准备好写入时一次发送一个字节,否则实际上无法知道您将能够发送多少,即使这样在理论上也是可能的(至少在文档中,如果不是在现实世界中)选择说它已准备好写入,然后条件在 select() 和 write() 之间的时间发生变化。

非阻塞发送是这里的解决方案,如果您从使用 write() 更改为 send(),则无需将文件描述符更改为非阻塞模式即可以非阻塞形式发送一条消息。您唯一需要更改的是将 MSG_DONTWAIT 标志添加到发送调用中,这将使发送非阻塞而不更改套接字的属性。在这种情况下,您甚至根本不需要使用 select() ,因为 send() 调用将为您提供返回代码中所需的所有信息 - 如果您获得返回代码 -1 并且 errno 是 EAGAIN或 EWOULDBLOCK 那么你知道你不能再发送了。

【讨论】:

  • 好吧,只要能确定fd是否属于socket,应该是可以的。保持在 PIPE_BUF 限制以下是否可以解决管道问题?
  • @melisgl 您应该注意MSG_DONTWAIT on send() 是非Posix,可能并非所有平台都支持。
  • 无法保证通过将写入大小保持在 PIPE_BUF 下不会阻止写入管道。正如您正确指出的那样,这仅定义了保证以原子方式传递多少字节,并且如果低于此大小而不是您可以依赖的行为,则内核管道缓冲区足够大只是巧合。
  • MSG_DONTWAIT 是非 posix,但至少受 FreeBSD、Linux 和 Solaris 支持,但不支持 OSX。我还没有调查过其他哪些 BSD 支持它。
  • 谢谢。作为记录,我们最终将MSG_DONTWAIT 用于套接字,并且发送的管道不超过PIPE_BUF 字节(接受可能从我们下面拉出地毯的事实)。
【解决方案3】:

ckolivas 的答案是正确的,但是,在阅读了这篇文章后,我想我可以添加一些测试数据以供感兴趣。

我很快写了一个读取速度慢的 tcp 服务器(读取之间休眠 100 毫秒),它在每个周期读取 4KB。然后是一个快速编写客户端,我用它来测试各种写入场景。两者都在读取(服务器)或写入(客户端)之前使用选择。

这是在分配了 1GB 内存的 Windows 7 VM(VirtualBox)下运行的 Linux Mint 18 上。

对于阻塞情况

如果可以写入“特定数量的字节”,则选择返回,并且写入要么立即全部完成,要么阻塞直到完成。在我的系统上,这个“一定数量的字节”至少是 1MB。在 OP 的系统上,这显然要少得多(少于 100,000)。

因此,直到可以写入至少 1MB 的数据,select 才返回 。如果较小的写入随后会阻塞,则从未出现过(我看到的) select 会返回的情况。因此 select + write(x) 其中 x 是 4K 或 8K 或 128K 在此系统上永远不会被阻塞。

当然,这一切都很好,但这是一个具有 1GB 内存的未加载 VM。预计其他系统会有所不同。但是,我期望在选择之后发出的低于某个幻数(也许是 PIPE_BUF)的写入永远不会在所有符合 POSIX 的系统上阻塞。但是(再次)我没有看到任何关于这种效果的文档,所以不能依赖这种行为(即使 Python 文档清楚地说明了)。正如 OP 所说,目前尚不清楚是否有像 PIPE_BUF 用于套接字的安全缓冲区大小。可惜了。

这是 ckolivas 的帖子所说的,尽管我认为当只有一个字节可用时,没有理性的系统会从选择中返回!

额外信息:

在任何时候(在正常操作中),除了请求的全部数量(或错误)之外,写入没有返回任何内容。

如果服务器被终止(ctrl-c),客户端写入将立即返回一个值(通常小于请求 - 没有正常操作!),没有其他错误指示​​。下一个 select 调用将立即返回,随后的 write 将返回 -1 并带有 errno 说“对等方重置连接”。这是人们所期望的 - 这次尽可能多地写,下次失败。

这(和 EINTR)似乎是唯一一次 write 返回大于 0 但小于请求的数字。

如果服务器端正在读取并且客户端被杀死,则服务器继续读取所有可用数据,直到用完为止。然后它读取一个零并关闭套接字。

对于非阻塞情况:

低于某个魔法值的行为与上面相同。 select 返回,write 不会阻塞(当然),并且 write 完全完成。

我的问题是否则会发生什么。 send(2) 手册页说,在非阻塞模式下,发送失败并显示 EAGAIN 或 EWOULDBLOCK。这可能意味着(取决于你如何阅读)它是全有或全无。除了它还说 select 可用于确定何时可以发送更多数据。所以不可能全有或全无。

Write(与不带标志的发送相同)表示它可以返回少于请求的返回值。这种吹毛求疵似乎很迂腐,但手册页是福音,所以我照此阅读。

在测试中,值大于某个特定值的非阻塞写入返回的值小于请求值。这个值不是恒定的,它会随着写入的变化而变化,但它总是很大(> 1 到 2MB)。

【讨论】:

    最近更新 更多