【问题标题】:Attempting asynchronous I/O with Win32 threads尝试使用 Win32 线程进行异步 I/O
【发布时间】:2011-03-07 15:10:48
【问题描述】:

我正在为 Windows 编写一个串口软件。为了提高性能,我尝试将例程转换为使用异步 I/O。我已经编写好代码并且工作得相当好,但我是这方面的半初学者,我想进一步提高程序的性能。在程序的压力测试期间(即以高波特率尽可能快地向/从端口突发数据),CPU 负载变得相当高。

如果有人在 Windows 中具有异步 I/O 和多线程方面的经验,如果您能看看我的程序,我将不胜感激。我有两个主要担忧:

  • 异步 ​​I/O 是否正确实现?我在网上找到了一些相当可靠的来源,建议您可以通过在最后使用您自己的数据实现您自己的 OVERLAPPED 结构来将用户数据传递给回调函数。这似乎工作得很好,但对我来说确实有点“骇人听闻”。此外,当我从同步/轮询转换为异步/回调时,程序的性能并没有提高多少,这让我怀疑自己做错了什么。

  • 将 STL std::deque 用于 FIFO 数据缓冲区是否合理?由于程序目前正在编写,我只允许一次接收 1 个字节的数据,然后必须对其进行处理。因为我不知道我会收到多少数据,所以可能是无穷无尽的。我假设当它必须分配数据时,这种每次 1 字节的行为会在双端队列后面产生缓慢的行为。而且我也不相信双端队列是线程安全的(我应该吗?)。 如果使用 STL deque 不明智,有什么建议可以使用更好的数据类型吗?基于静态数组的环形缓冲区?

也欢迎对代码提供任何其他反馈。


实现了串行例程,因此我有一个名为“Comport”的父类,它处理与串行 I/O 相关的所有内容。从这个类中,我继承了另一个名为“ThreadedComport”的类,它是一个多线程版本。

ThreadedComport 类(相关部分)

class ThreadedComport : public Comport
{
  private:

    HANDLE        _hthread_port;                 /* thread handle      */
    HANDLE        _hmutex_port;                  /* COM port access    */
    HANDLE        _hmutex_send;                  /* send buffer access */
    HANDLE        _hmutex_rec;                   /* rec buffer access  */

    deque<uint8>  _send_buf;
    deque<uint8>  _rec_buf;
    uint16        _data_sent;
    uint16        _data_received;

    HANDLE        _hevent_kill_thread;
    HANDLE        _hevent_open;
    HANDLE        _hevent_close;
    HANDLE        _hevent_write_done;
    HANDLE        _hevent_read_done;
    HANDLE        _hevent_ext_send;              /* notifies external thread */
    HANDLE        _hevent_ext_receive;           /* notifies external thread */

    typedef struct
    {
      OVERLAPPED       overlapped;
      ThreadedComport* caller;                  /* add user data to struct */
    } OVERLAPPED_overlap;

    OVERLAPPED_overlap _send_overlapped;
    OVERLAPPED_overlap _rec_overlapped;
    uint8*             _write_data;
    uint8              _read_data;
    DWORD              _bytes_read;

    static DWORD WINAPI _tranceiver_thread (LPVOID param);
    void                _send_data         (void);
    void                _receive_data      (void);
    DWORD               _wait_for_io       (void);

    static void WINAPI  _send_callback     (DWORD dwErrorCode,
                                            DWORD dwNumberOfBytesTransfered,
                                            LPOVERLAPPED lpOverlapped);
    static void WINAPI  _receive_callback  (DWORD dwErrorCode,
                                            DWORD dwNumberOfBytesTransfered,
                                            LPOVERLAPPED lpOverlapped);

};

通过 CreateThread() 创建的主线程例程:

DWORD WINAPI ThreadedComport::_tranceiver_thread (LPVOID param)
{
  ThreadedComport* caller = (ThreadedComport*) param;

  HANDLE handle_array [3] =
  {
    caller->_hevent_kill_thread,                 /* WAIT_OBJECT_0 */
    caller->_hevent_open,                        /* WAIT_OBJECT_1 */
    caller->_hevent_close                        /* WAIT_OBJECT_2 */
  };

  DWORD result;

  do
  {
    /* wait for anything to happen */
    result = WaitForMultipleObjects(3,
                                    handle_array,
                                    false,       /* dont wait for all */
                                    INFINITE);

    if(result == WAIT_OBJECT_1 )                 /* open? */
    {
      do                                         /* while port is open, work */
      {
        caller->_send_data();
        caller->_receive_data();
        result = caller->_wait_for_io();         /* will wait for the same 3 as in handle_array above,
                                                    plus all read/write specific events */

      } while (result != WAIT_OBJECT_0 &&        /* while not kill thread */
               result != WAIT_OBJECT_2);         /* while not close port */
    }
    else if(result == WAIT_OBJECT_2)             /* close? */
    {
      ;                                          /* do nothing */
    }

  } while (result != WAIT_OBJECT_0);             /* kill thread? */

  return 0;
}

依次调用以下三个函数:

void ThreadedComport::_send_data (void)
{
  uint32 send_buf_size;

  if(_send_buf.size() != 0)                      // anything to send?
  {
    WaitForSingleObject(_hmutex_port, INFINITE);
      if(_is_open)                               // double-check port
      {
        bool result;

        WaitForSingleObject(_hmutex_send, INFINITE);
          _data_sent = 0;
          send_buf_size = _send_buf.size();
          if(send_buf_size > (uint32)_MAX_MESSAGE_LENGTH)
          {
            send_buf_size = _MAX_MESSAGE_LENGTH;
          }
          _write_data = new uint8 [send_buf_size];


          for(uint32 i=0; i<send_buf_size; i++)
          {
            _write_data[i] = _send_buf.front();
            _send_buf.pop_front();
          }
          _send_buf.clear();
        ReleaseMutex(_hmutex_send);


        result = WriteFileEx (_hcom,              // handle to output file
                              (void*)_write_data, // pointer to input buffer
                              send_buf_size,      // number of bytes to write
                              (LPOVERLAPPED)&_send_overlapped, // pointer to async. i/o data
                              (LPOVERLAPPED_COMPLETION_ROUTINE )&_send_callback);

        SleepEx(INFINITE, true);                 // Allow callback to come

        if(result == false)
        {
          // error handling here
        }

      } // if(_is_open)
    ReleaseMutex(_hmutex_port);
  }
  else /* nothing to send */
  {
    SetEvent(_hevent_write_done);                // Skip write
  }
}


void ThreadedComport::_receive_data (void)
{
  WaitForSingleObject(_hmutex_port, INFINITE);

    if(_is_open)
    {
      BOOL  result;

      _bytes_read = 0;
      result = ReadFileEx (_hcom,                  // handle to output file
                           (void*)&_read_data,     // pointer to input buffer
                           1,                      // number of bytes to read
                           (OVERLAPPED*)&_rec_overlapped, // pointer to async. i/o data
                           (LPOVERLAPPED_COMPLETION_ROUTINE )&_receive_callback);

      SleepEx(INFINITE, true);                     // Allow callback to come

      if(result == FALSE)
      {
        DWORD last_error = GetLastError();
        if(last_error == ERROR_OPERATION_ABORTED)  // disconnected ?
        {
          close();                                 // close the port
        }
      }
    }

  ReleaseMutex(_hmutex_port);
}



DWORD ThreadedComport::_wait_for_io (void)
{
  DWORD result;
  bool  is_write_done = false;
  bool  is_read_done  = false;

  HANDLE handle_array [5] =
  {
    _hevent_kill_thread,
    _hevent_open,
    _hevent_close,
    _hevent_write_done,
    _hevent_read_done
  };


  do /* COM port message pump running until sending / receiving is done */
  {
    result = WaitForMultipleObjects(5,
                        handle_array,
                        false,                     /* dont wait for all */
                        INFINITE);

    if(result <= WAIT_OBJECT_2)
    {
      break;                                       /* abort */
    }
    else if(result == WAIT_OBJECT_3)               /* write done */
    {
      is_write_done = true;
      SetEvent(_hevent_ext_send);
    }
    else if(result == WAIT_OBJECT_4)               /* read done */
    {
      is_read_done = true;

      if(_bytes_read > 0)
      {
        uint32 errors = 0;

        WaitForSingleObject(_hmutex_rec, INFINITE);
          _rec_buf.push_back((uint8)_read_data);
          _data_received += _bytes_read;

          while((uint16)_rec_buf.size() > _MAX_MESSAGE_LENGTH)
          {
            _rec_buf.pop_front();
          }

        ReleaseMutex(_hmutex_rec);
        _bytes_read = 0;

        ClearCommError(_hcom, &errors, NULL);
        SetEvent(_hevent_ext_receive);
      }
    }
  } while(!is_write_done || !is_read_done);

  return result;
}

异步 I/O 回调函数:

void WINAPI ThreadedComport::_send_callback (DWORD dwErrorCode,
                                             DWORD dwNumberOfBytesTransfered,
                                             LPOVERLAPPED lpOverlapped)
{
  ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;

  if(dwErrorCode == 0)                           // no errors
  {
    if(dwNumberOfBytesTransfered > 0)
    {
      _this->_data_sent = dwNumberOfBytesTransfered;
    }
  }


  delete [] _this->_write_data;                  /* always clean this up */
  SetEvent(lpOverlapped->hEvent);
}


void WINAPI ThreadedComport::_receive_callback (DWORD dwErrorCode,
                                                DWORD dwNumberOfBytesTransfered,
                                                LPOVERLAPPED lpOverlapped)
{
  if(dwErrorCode == 0)                           // no errors
  {
    if(dwNumberOfBytesTransfered > 0)
    {
      ThreadedComport* _this = ((OVERLAPPED_overlap*)lpOverlapped)->caller;
      _this->_bytes_read = dwNumberOfBytesTransfered;
    }
  }

  SetEvent(lpOverlapped->hEvent);
}

【问题讨论】:

  • 我成功删除了 SleepEx() 函数并将其中一个 WaitForMultipleObjects 更改为“Ex”版本。尽管我无法发现任何明显的性能变化,但它运行良好。我想启动 ReadFileEx() 和 WriteFileEx() 所需的时间几乎没有。
  • 对代码做了进一步的修改。首先,我将 write/rec 数组缓冲区更改为固定大小。这使得程序消耗更多的 RAM,但没有显着的性能变化。其次,我允许超过 1 个字节的传入数据。正如怀疑的那样,它显着提升了性能。作为记录,我仍然坚持使用 std:deque,并且 std:将原始 uint8 数组复制到带有 back_inserter 的双端队列中。所以 STL 实现似乎相当有效(Dinkumware STL,Embarcadero Builder 2009 的一部分)。

标签: c++ winapi asynchronous io deque


【解决方案1】:

第一个问题很简单。该方法不是hackish;您拥有OVERLAPPED 内存及其后面的所有内容。 Raymond Chen 最好地描述了这一点:http://blogs.msdn.com/b/oldnewthing/archive/2010/12/17/10106259.aspx

如果您在等待 I/O 完成时有更好的事情要做,那么您只会期望性能提升。如果您所做的只是SleepEx,您只会看到 CPU% 下降。线索就在“重叠”的名称中 - 它允许您重叠计算和 I/O。

std::deque&lt;unsigned char&gt; 可以毫无问题地处理 FIFO 数据。它可能会回收 4KB 块(精确数字由广泛的分析确定,所有这些都为您完成)。

[编辑] 我进一步研究了您的代码,似乎代码不必要地复杂。对于初学者来说,异步 I/O 的主要好处之一是您不需要所有线程的东西。线程允许您使用更多内核,但您正在处理一个缓慢的 I/O 设备。即使是单个内核也足够了,if 它不会花费所有时间等待。这正是重叠 I/O 的用途。您只需将一个线程专用于端口的所有 I/O 工作。由于它是唯一的线程,因此它不需要互斥锁来访问该端口。

OTOH,您需要一个围绕 deque&lt;uint8&gt; 对象的互斥体,因为生产者/消费者线程与 comport 线程不同。

【讨论】:

  • 感谢您的回复。很高兴知道 OVERLAPPED 和 deque 都不会引起问题。我为此使用线程的原因是为了减少主程序中的延迟。我已经看到太多有缺陷的程序设计从他们的主 GUI 线程执行 I/O 以信任它。我预计 I/O 速度会非常高,并且我不希望它进入并中断主程序。虽然我承认线程是代码使用同步 I/O 时的遗留物。我想我应该在开始 SleepEx() 之前尝试同时开始读取和写入。
  • 两个双端队列都受自己的互斥体(_hmutex_rec 和 _hmutex_send)保护。
  • 这是一个串口。您将不会使现代 CPU 过载。至少从 8080 时代开始,我们就一直在进行串行端口通信,并且它们在 8086 时代达到了目前的最高速度(115K2)。这些速度比现在的 CPU 慢 3-4 个数量级。
  • 哦,是的,但是 8086 没有 MS Windows 将其拖下。根据我的经验,几乎所有 8 位 MCU 在串行通信方面都优于 Windows PC。我开始剖析这段代码的原因是,当暴露在 115kpbs 或 256kbps 的连续数据洪流中时,GUI 会滞后。我从一开始就知道 GUI 本身就是罪魁祸首,尽管我想在重新设计 GUI 之前尽可能优化串行例程。
【解决方案2】:

我看不出有任何理由在这样的项目中使用异步 I/O。当您处理大量套接字或在等待数据时有工作要做时,异步 I/O 很好,但据我所知,您只处理一个套接字,中间不做任何工作.

另外,为了了解知识,您通常会使用 I/O 完成端口来处理异步 I/O。我不确定是否存在使用 I/O 完成端口会对性能产生负面影响的情况。

但是,您的异步 I/O 使用看起来还不错。实现你自己的 OVERLAPPED 结构确实看起来像一个 hack,但它是正确的;没有其他方法可以将您自己的数据与完成相关联。

Boost 也有一个循环缓冲区实现,但我不确定它是否是线程安全的。但是,没有一个标准库容器是线程安全的。

【讨论】:

    【解决方案3】:

    我认为您的代码设计欠佳。

    • 我猜你正在与太多线程共享太多数据结构。我认为您应该将一个端口的串行设备 IO 的所有处理放到一个线程中,并在 IO 线程和所有客户端线程之间放置一个同步的命令/数据队列。让 IO 线程注意队列中的命令/数据。

    • 您似乎正在为每个发送的事件分配和释放一些缓冲区。避免这种情况。如果将所有 IO 保存在单个线程中,则可以重用单个缓冲区。无论如何,您都在限制消息的大小,您可以预先分配一个足够大的缓冲区。

    • 将要发送的字节放入std::deque 是次优的。您必须将它们序列化为WriteFile() 的连续内存块。相反,如果您在一个 IO 线程和其他线程之间使用某种命令/数据队列,您可以让客户端线程一次提供连续的内存块。

    • 一次读取 1 个字节似乎也很愚蠢。除非它不适用于串行设备,否则您可以为ReadFileEx() 提供足够大的缓冲区。它返回它实际设法读取的字节数。它不应该阻止,AFAIK,除非我当然错了。

    • 您正在等待重叠 IO 使用 SleepEx() 调用完成。如果您只是同步结束,那么重叠 IO 的意义何在?

    【讨论】:

    • 而不是警觉地睡觉,为什么不警觉地等待完成事件 - 这样你会在 I/O 完成时被唤醒,而不是永远不会。
    • @wilx 感谢您的回复。我看不出我是如何共享太多数据结构的,每个私有变量都是非静态的,因此通过调用者参数传递,这意味着类的每个实例都有自己的数据集。关于动态分配,它确实看起来确实是不必要的复杂。我将尝试将其更改为静态缓冲区。
    • @wilx 关于 WriteFile(),我不确定如何使用单个缓冲区?这意味着应用程序每次要访问数据时都必须等待线程完成,这是不可接受的,并且使多线程毫无意义。最大的资源窃贼是在 WriteFile 的缓冲区和程序缓冲区(无论是双端队列还是其他任何东西)之间铲起数据,但我认为没有办法避免这种情况?
    • @wilx 我可以为 ReadFileEx() 和 WaitForSingleObject 设置超时。但是超时必须非常小,我怀疑它有什么意义。我想我可以尝试一下,看看它是否能提高性能。
    • @Larry 你永远不会醒来是什么意思? SleepEx() 文档声明它会在回调执行后立即唤醒。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-06-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-07-24
    相关资源
    最近更新 更多