【问题标题】:cannot understand this "message sequence mismatch error"无法理解这个“消息序列不匹配错误”
【发布时间】:2021-02-16 12:24:21
【问题描述】:

我已经使用this 链接中回答的程序进行了一些修改。 以下是我修改后的代码:

#include <linux/netlink.h>

#include <netlink/netlink.h>
#include <netlink/route/qdisc.h>
#include <netlink/route/qdisc/plug.h>
#include <netlink/socket.h>

#include <atomic>
#include <csignal>
#include <iostream>
#include <stdexcept>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <thread>
#include <queue>
#include <chrono>

/**
 * Netlink route socket.
 */
struct Socket {
  Socket() : handle{nl_socket_alloc()} {

    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate socket!"};
    }

    if (int err = nl_connect(handle, NETLINK_ROUTE); err < 0) {
      throw std::runtime_error{"Unable to connect netlink socket: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket(const Socket &) = delete;
  Socket &operator=(const Socket &) = delete;
  Socket(Socket &&) = delete;
  Socket &operator=(Socket &&) = delete;

  ~Socket() { nl_socket_free(handle); }

  struct nl_sock *handle;
};

/**
 * Read all links from netlink socket.
 */
struct LinkCache {
  explicit LinkCache(Socket *socket) : handle{nullptr} {
    if (int err = rtnl_link_alloc_cache(socket->handle, AF_UNSPEC, &handle);
        err < 0) {
      throw std::runtime_error{"Unable to allocate link cache: " +
                               std::string{nl_geterror(err)}};
    }
  }

  LinkCache(const LinkCache &) = delete;
  LinkCache &operator=(const LinkCache &) = delete;
  LinkCache(LinkCache &&) = delete;
  LinkCache &operator=(LinkCache &&) = delete;

  ~LinkCache() { nl_cache_free(handle); }

  struct nl_cache *handle;
};

/**
 * Link (such as "eth0" or "wlan0").
 */
struct Link {
  Link(LinkCache *link_cache, const std::string &iface)
      : handle{rtnl_link_get_by_name(link_cache->handle, iface.c_str())} {

    if (handle == nullptr) {
      throw std::runtime_error{"Link does not exist:" + iface};
    }
  }

  Link(const Link &) = delete;
  Link &operator=(const Link &) = delete;
  Link(Link &&) = delete;
  Link &operator=(Link &&) = delete;

  ~Link() { rtnl_link_put(handle); }

  struct rtnl_link *handle;
};

/**
 * Queuing discipline.
 */
struct QDisc {
  QDisc(const std::string &iface, const std::string &kind)
      : handle{rtnl_qdisc_alloc()} {
    if (handle == nullptr) {
      throw std::runtime_error{"Failed to allocate qdisc!"};
    }

    struct rtnl_tc *tc = TC_CAST(handle);

    // Set link
    LinkCache link_cache{&socket};
    Link link{&link_cache, iface};
    rtnl_tc_set_link(tc, link.handle);

    // Set parent qdisc
    uint32_t parent = 0;

    if (int err = rtnl_tc_str2handle("root", &parent); err < 0) {
      throw std::runtime_error{"Unable to parse handle: " +
                               std::string{nl_geterror(err)}};
    }

    rtnl_tc_set_parent(tc, parent);

    // Set kind (e.g. "plug")
    if (int err = rtnl_tc_set_kind(tc, kind.c_str()); err < 0) {
      throw std::runtime_error{"Unable to set kind: " +
                               std::string{nl_geterror(err)}};
    }
  }

  QDisc(const QDisc &) = delete;
  QDisc &operator=(const QDisc &) = delete;
  QDisc(QDisc &&) = delete;
  QDisc &operator=(QDisc &&) = delete;

  ~QDisc() {
    if (int err = rtnl_qdisc_delete(socket.handle, handle); err < 0) {
      std::cerr << "Unable to delete qdisc: " << nl_geterror(err) << std::endl;
    }

    rtnl_qdisc_put(handle);
  }

  void send_msg() {
    int flags = NLM_F_CREATE;

    if (int err = rtnl_qdisc_add(socket.handle, handle, flags); err < 0) {
      throw std::runtime_error{"Unable to add qdisc: " +
                               std::string{nl_geterror(err)}};
    }
  }

  Socket socket;
  struct rtnl_qdisc *handle;
};

/**
 * Queuing discipline for plugging traffic.
 */
class Plug {
public:
  Plug(const std::string &iface, uint32_t limit, std::string msg)
      : qdisc_{iface, "plug"} {

    rtnl_qdisc_plug_set_limit(qdisc_.handle, limit);
    qdisc_.send_msg();

    // set_enabled(enabled_);
    set_msg(msg);
  }

  // void set_enabled(bool enabled) {
  //   if (enabled) {
  //     rtnl_qdisc_plug_buffer(qdisc_.handle);
  //   } else {
  //     rtnl_qdisc_plug_release_one(qdisc_.handle);
  //   }

  //   qdisc_.send_msg();
  //   enabled_ = enabled;
  // }

  void set_msg(std::string msg) {
    if (msg == "buffer") {
      int ret = rtnl_qdisc_plug_buffer(qdisc_.handle);
      //std::cout<<strerror(ret);
    } else if(msg == "commit") {
      int ret = rtnl_qdisc_plug_release_one(qdisc_.handle);
      //std::cout<<strerror(ret);
    } else {
      int ret = rtnl_qdisc_plug_release_indefinite(qdisc_.handle);
      //std::cout<<strerror(ret);   
    }

    qdisc_.send_msg();
  }  

  // bool is_enabled() const { return enabled_; }

private:
  QDisc qdisc_;

  // bool enabled_;
};

std::atomic<bool> quit{false};

void exit_handler(int /*signal*/) { quit = true; }

// this function busy wait on job queue until there's something 
//and calls release operation i.e. unplug qdisc to release output packets 
//generated for a particular epoch
void transmit_ckpnt(std::queue<int> &job_queue, Plug &plug){

  while(true){

      while(!job_queue.empty()){

        int id = job_queue.front();
        job_queue.pop();
        std::string s = std::to_string(id);

        std::cout<<"called from parallel thread "<<s<<"\n"; 

        //release buffer
        plug.set_msg("commit");  
      }
  }

}

int main() {
  std::string iface{"veth-host"};
  constexpr uint32_t buffer_size = 10485760;
  // bool enabled = true;

  Plug plug{iface, buffer_size, "buffer"};

  /**
   * Set custom exit handler to ensure destructor runs to delete qdisc.
   */
  struct sigaction sa {};
  sa.sa_handler = exit_handler;
  sigfillset(&sa.sa_mask);
  sigaction(SIGINT, &sa, nullptr);

  pid_t wpid;
  int status = 0;
  std::queue<int> job_queue;
  int ckpnt_no  = 1;

  std::thread td(transmit_ckpnt, std::ref(job_queue), std::ref(plug));
  plug.set_msg("indefinite");

  while(true){
    //plug the buffer at start of the epoch
    plug.set_msg("buffer");

    //wait for completion of epoch
    sleep(4);   
    
    job_queue.push(ckpnt_no);
    ckpnt_no += 1;  
  }

  plug.set_msg("indefinite");
  td.join();
  
  // while (!quit) {
  //   std::cout << "Plug set to " << plug.is_enabled() << std::endl;
  //   std::cout << "Press <Enter> to continue.";
  //   std::cin.get();

  //   plug.set_enabled(!plug.is_enabled());
  // }

  return EXIT_SUCCESS;
}

代码演练: 这个程序创建了一个插入/拔出类型的 qdisc,其中在插入操作期间,网络数据包被缓冲,而在拔出操作期间,网络数据包从第一个插头释放(队列规则 qdisc 的前面)到 qdisc 中的第二个插件。如果插入和拔出操作交替进行,则上述程序可以正常工作。但我想以它的构建方式使用它,即就像this link 中提到的那样,即

     TCQ_PLUG_BUFFER (epoch i)
         TCQ_PLUG_BUFFER (epoch i+1) 
             TCQ_PLUG_RELEASE_ONE (for epoch i)
                 TCQ_PLUG_BUFFER (epoch i+2)
                     ..............................so on

在我的程序中,主线程在每个 epoch 开始时开始缓冲,并继续执行。作业线程从作业队列中获取作业 ID,并将缓冲的数据包从队列头部释放到下一个插件。但这会产生以下错误:

./a.out: /lib/x86_64-linux-gnu/libnl-3.so.200: no version information available (required by ./a.out)
./a.out /usr/lib/x86_64-linux-gnu/libnl-route-3.so.200: no version information available (required by ./a.out)
called from parallel thread 1
called from parallel thread 2
called from parallel thread 3
called from parallel thread 4
called from parallel thread 5
called from parallel thread 6
called from parallel thread 7
terminate called after throwing an instance of 'std::runtime_error'
 what(): Unable to add qdisc: Message sequence number mismatch
Aborted

无法理解这是什么以及为什么会出现此错误,当在主线程中按顺序执行释放时,它正在工作,但现在当有另一个线程执行释放操作时,它只是检查 job_queue 是否是是否为空并执行释放操作,直到作业队列中有内容,如果job_queue为空,则忙等待。

【问题讨论】:

    标签: c++ multithreading linux-kernel netlink


    【解决方案1】:

    libnl 将预期的序列计数器存储为 nl_sock 结构 (reference) 的一部分。当多个线程调用 libnl 函数时,这可能会导致不一致,例如数据竞争(两个线程同时写入序列计数器)或竞争条件(检查时间问题,其中一个线程检查计数器是否满足某些条件,然后执行某些操作,但在其他线程之间修改计数器)。有关数据竞争和竞争条件的更多详细信息,请参阅here

    旁注:g++clang++ 都支持 -fsanitize=thread 标志,它会自动将额外的调试代码插入到二进制文件中,以帮助检测这种数据竞争 (reference)。虽然在这种情况下,它可能没有那么有用,因为您还必须使用此标志编译 libnl,这可能并不容易。

    来自 libnl 文档 (reference):

    The next step is to check the sequence number of the message against
    the currently expected sequence number. The application may provide
    its own sequence number checking algorithm by setting the callback
    function NL_CB_SEQ_CHECK to its own implementation. In fact, calling
    nl_socket_disable_seq_check() to disable sequence number checking will
    do nothing more than set the NL_CB_SEQ_CHECK hook to a function which
    always returns NL_OK.
    

    这给我们留下了以下选择:

    1. 使用互斥锁来保护对可能修改序列计数器的 libnl 函数的所有访问。

    2. 使用nl_socket_disable_seq_check禁用序列计数器检查。

    在我看来,1) 是更强大的解决方案。如果您更关心性能而不是健壮性,那么您可以选择 2)。

    选项 1:使用互斥锁来保护对 libnl 函数的访问

    包括标准库中的互斥头:

    #include <mutex>
    

    Plug 类中,添加std::mutex 作为成员:

    class Plug {
    ...
    private:
      std::mutex seq_counter_mutex_;
    ...
    };
    

    set_msg 的开头,使用std::lock_guard 在函数执行期间获取互斥锁。这保证了只有一个线程可以同时进入函数:

      void set_msg(std::string msg) {
        std::lock_guard guard{seq_counter_mutex_};
        ...
      }
    

    选项 2:禁用序列号检查

    Socket 类中,在构造函数的末尾,您可以禁用序列计数器检查:

      nl_socket_disable_seq_check(handle);
    

    【讨论】:

    • 现在发生的事情是主线程经常推送作业ID,但是transmit_ckpnt函数无法获得作业队列的访问权并且长时间停止,你知道它是怎么做到的吗像公平访问两个线程的作业队列一样完成?
    • 嗨,我很忙,很长时间没有看到这个答案。我尝试了第二个选项,它现在可以工作了。
    • 我很高兴这有效。关于std::mutex 的不公平问题(即一个线程能够比另一个线程更频繁地获取互斥锁),我对这个问题没有经验。我找到了这个答案,它解释了如何进行公平的互斥锁实现,但我还没有尝试过:*.com/a/17528648/10992173
    • @f9c69e9781fa194211448473495534 请在空闲时间看这个问题unix.stackexchange.com/questions/626528/…,尝试了很长时间但无法解决这个问题。只有网络输出数据包通过qdiscs 对吗?
    最近更新 更多