【问题标题】:Asynchronous I/O reading from a file从文件中读取异步 I/O
【发布时间】:2017-02-22 07:15:14
【问题描述】:

我最近获得了多个项目的想法,这些想法都涉及从文件中读取 IP 地址。由于它们都应该能够处理大量主机,因此我尝试实现多线程或创建套接字池并从中选择(),以实现某种形式的并发以获得更好的性能.在很多情况下,从文件中读取似乎是提高性能的瓶颈。我理解它的方式,从具有 fgets 或类似功能的文件中读取是一种同步的阻塞操作。所以即使我成功实现了一个异步连接到多个主机的客户端,操作仍然是同步的,因为我一次只能从文件中读取一个地址。

    /* partially pseudo code */

/* getaddrinfo() stuff here */

while(fgets(ip, sizeof(ip), file) {
FD_ZERO(&readfds);
/* create n sockets here in a for loop */
for (i = 0; i < socket_num; i++) {
    if (newfd > fd[i]) newfd = fd[i];
    FD_SET(fd[i], &readfds);
}

/* here's where I think I should connect n sockets to n addresses from file
 * but I'm only getting one IP at a time from file, so I'm not sure how to connect to 
 * n addresses at once with fgets
 */

for (j = 0; j < socket_num; j++) {
        if ((connect(socket, ai->ai_addr, ai->ai_addrlen)) == -1)
        // error
        else { 
            freeaddrinfo(ai);       
        FD_SET(socket, &master);
            fdmax = socket;
            if (select(socket+1, &master, NULL, NULL, &tv) == -1);
        // error        
            if ((recvd = read(socket, banner, RECVD)) <= 0)
        // error
            if (FD_ISSET(socket, &master))
        // print success
        }
    /* clear sets and close sockets and stuff */
}

我已经指出了我与 cmets 的问题,但只是为了澄清一下:我不确定如何在从文件读取的多个目标服务器上执行异步 I/O 操作,因为从文件中读取条目似乎是严格同步的.我在多线程方面遇到过类似的问题,但成功程度略高。

    void *function_passed_to_pthread_create(void *opts) 
    { 
        while(fgets(ip_addr, sizeof(ip_addr), opts->file) {
            /* speak to ip_addr and get response */
    }
}

main()
{
    /* necessary stuff */
    for (i = 0; i < thread_num; i++) {
        pthread_create(&tasks, NULL, above_function, opts)
    }
    for (j = 0; j < thread_num; j++)
        /* join threads */
    return 0;
}

这似乎可行,但由于多个线程都在处理同一个文件,因此结果并不总是准确的。我想这是因为多个线程可能同时处理来自文件的相同地址。

我考虑过将文件中的所有条目加载到数组/内存中,但如果文件特别大,我想这可能会导致内存问题。最重要的是,我不确定这样做是否有意义。

作为最后一点;如果我正在读取的文件恰好是具有大量 IP 的特别大的文件,那么我认为这两种解决方案都不能很好地扩展。不过,使用 C 一切皆有可能,所以我想有一些方法可以实现我希望的目标。

总结这篇文章;在从文件中读取条目时,我想找到一种方法来使用异步 I/O 或多线程来提高客户端应用程序的性能。

【问题讨论】:

  • 我感受到了你的痛苦,但是有很多方法可以读取文件。我知道,最快的方法是使用本机 API 函数(即 ReadFile)顺序读取文件。然后您可以将处理分拆到不同的线程等。
  • 这对于读取具有多个线程读取this 的文件是没有用的。你应该有一个经理来阅读文件并给每个线程一行文件。
  • 在典型的 Linux 系统上,打开描述符(文件和套接字)的最大数量的硬限制通常为 65536。(软限制,即默认值,通常要低得多,例如 1024。 ) 即使假设使用了极长的主机名和服务名(用于连接,而不是端口号),每个客户端通常仍需要少于 100 个字符。 65536×100 只有大约 6.5 兆字节。读取和标记它(到主机和端口字符串对)只需要几分之一秒。 这无关紧要。
  • 假设您只需要 IPv6(或 IPv4)地址。 struct sockaddr_storage 通常为 128 字节。因此,如果您从文件中读取每个主机和端口对,并使用线程池将它们解析为使用 getnameinfo() 的可连接套接字(但立即关闭每个连接,以便稍后进行实际连接),则每个需要大约 144 个字节客户。那时即使是一百万个客户端也只有 144 兆字节;如果限制为 IPv4,则只有 24 兆字节。除非您以数字格式存储主机名,否则名称解析将花费最多的时间!
  • 如果您决定以数字格式存储主机名,那么您可以切换到二进制格式;一个文件用于struct sockaddr_ins,另一个用于struct sockaddr_in6s。您可以以闪电般的速度阅读它们,并编写单独的实用程序将它们转换为文本文件/从文本文件转换,并具有可选的名称解析。所以,总的来说,我只是不明白你在哪里发现问题;我在这里没有看到。 (我自己也处理大型数据集,所以我在这里的逻辑中没有假设任何大小限制。)

标签: c multithreading pthreads


【解决方案1】:

一些人在他们的 cmets 中暗示了一个很好的解决方案,但可能值得更详细地说明它。 full 解决方案有很多细节,代码也很复杂,所以我将使用伪代码来解释我的建议。

这里的内容实际上是经典生产者/消费者问题的变体:您有一个单一的事物产生数据,而许多事物试图使用该数据。在您的情况下,它 必须 是产生该数据的“单一事物”,因为源文件每行的长度是未知的:您不能只是向前跳转 'n' 个字节并以某种方式成为在下一个 IP。一次只能有一个参与者将读取指针移向\n 的下一个未知位置,因此根据定义,您只有一个生产者。

一般有三种攻击方式:

  • 解决方案 A 涉及让每个线程从共享文件缓冲区中提取更多内容,并在每次最后一次读取时启动异步(非阻塞)读取读取完成。使这个解决方案正确有很多令人头疼的问题,因为它对文件系统和正在执行的工作之间的时间差异非常敏感:如果文件读取速度很慢,工作人员都会等待文件。如果工作人员很慢,文件读取器将停止或填满内存,等待他们使用数据。此解决方案可能是绝对最快的,但要正确处理大量警告也是非常困难的同步代码。除非你是线程方面的专家(或者非常聪明地滥用epoll_wait()),否则你可能不想走这条路。

  • 解决方案 B 有一个“主”线程,负责读取文件,并用它读取的数据填充某种线程安全队列,每个队列条目有一个 IP 地址(一个字符串)。每个工作线程都尽可能快地使用队列条目,查询远程服务器,然后请求另一个队列条目。这需要一点小心才能正确,但通常比解决方案 A 安全得多,尤其是在您使用其他人的队列实现时。

  • Solution C 是相当 hacktastic,但你不应该立即忽略它,这取决于你在做什么。这个解决方案只涉及使用类似 Un*x sed 命令(请参阅Get a range of lines from a file given the start and end line numbers)将您的源文件预先分割成一堆“大块”源文件——比如说,二十个。然后,您只需使用&amp; 并行运行一个非常简单的单线程程序的 20 个副本,每个副本位于不同的文件“切片”上。结合一个小的 shell 脚本来自动化它,这可能是一个“足够好”的解决方案,可以满足很多需求。


让我们仔细看看解决方案 B — 具有线程安全队列的主线程。我会作弊并假设您可以构造一个工作队列实现(如果没有,有 StackOverflow 文章关于使用 pthreads 实现线程安全队列:pthread synchronized blocking queue)。

在伪代码中,这个解决方案是这样的:

main()
{
    /* Create a queue. */
    queue = create_queue();

    /* Kick off the master thread to read the file, and give it the queue. */
    master_thread = pthread_create(master, queue);

    /* Kick off a bunch of workers with access to the queue. */
    for (i = 0; i < 20; i++) {
        worker_thread[i] = pthread_create(worker, queue);
    }

    /* Wait for everybody to finish. */
    pthread_join(master_thread);
    for (i = 0; i < 20; i++) {
        pthread_join(worker_thread[i]);
    }
}

void master(queue q)
{
    FILE *fp = fopen("ips.txt", "r");
    char buffer[BIGGER_THAN_ANY_IP];

    /* Inhale the file as fast as we can, and push each line we
       read onto the queue. */
    while (fgets(fp, buffer) != NULL) {
        char *next_ip = strdup(buffer);
        enqueue(q, next_ip);
    }

    /* Add some final messages in the queue to let the workers
       know that we're out of data.  There are *much* better ways
       of notifying them that we're "done", but in this case,
       pushing a bunch of NULLs equal to the number of threads is
       simple and probably good enough. */
    for (i = 0; i < 20; i++) {
        enqueue(q, NULL);
    }
}

void worker(queue q)
{
    char *ip;

    /* Inhale messages off the queue as fast as we can until
       we get a "NULL", which means that it's time to stop.
       The call to dequeue() *must* block if there's nothing
       in the queue; the call should only return NULL if the
       queue actually had NULL pushed into it. */
    while ((ip = dequeue(q)) != NULL) {

        /* Insert code to actually do the work here. */
        connect_and_send_and_receive_to(ip);
    }
}

在实际实现中有很多注意事项和细节(例如:我们如何实现队列、环形缓冲区或链表?如果文本不是所有 IP 怎么办?如果字符缓冲区不大怎么办?够吗?多少线程就够了?我们如何处理文件或网络错误?malloc 性能会成为瓶颈吗?如果队列太大怎么办?我们能否更好地重叠网络 I/O?)。

但是,除了注意事项和细节之外,我上面介绍的伪代码是一个足够好的起点,您可能可以将其扩展为可行的解决方案。

【讨论】:

  • 谢谢你这是完美的。这对我来说绝对是足够的信息来开发我所拥有的。
  • 您还可以对同时处于活动状态的并发连接数实施某种限制。您可以在队列消费者代码中使用信号量/互斥锁
  • 抱歉,本意是尽快奖励这个答案。
  • 别担心!很高兴这个答案很有帮助。
【解决方案2】:

从文件中读取 IP,拥有工作线程,不断将 IP 分配给工作线程。让所有套接字通信发生在工作线程中。此外,如果 IPv4 地址以十六进制格式而不是 ascii 格式存储,则可能可以一次读取多个地址,并且速度会更快。

【讨论】:

    【解决方案3】:

    如果您只想异步读取,您可以使用 ncurses 中的 getch() 并将延迟设置为 0。它是 posix 的一部分,因此您不需要任何额外的依赖项。你也有 unlocked_stdio。

    另一方面,我想知道为什么 fgets() 是一个瓶颈。只要文件中有数据,它就不会阻塞。即使数据很大(例如 1MB 或 100k ip 地址),在启动时将其读取到列表中也需要不到 1 秒的时间。

    为什么要打开与列表中每个 ip 的 sockets_num 连接?您同时将 sockets_num 乘以 ip 地址数。由于 Linux 上的每个套接字都是文件,因此当您尝试打开数千个文件时会遇到系统问题(请参阅 ulimit -Sn)。您能否确认在这种情况下问题不在 connect() 中?

    【讨论】:

      猜你喜欢
      • 2011-06-10
      • 2011-11-25
      • 2012-05-16
      • 1970-01-01
      • 2013-06-19
      • 1970-01-01
      • 1970-01-01
      • 2012-07-05
      • 1970-01-01
      相关资源
      最近更新 更多