【问题标题】:Python: Implementing concurrent DNS requests (pipelining)Python:实现并发 DNS 请求(流水线)
【发布时间】:2014-09-21 08:18:05
【问题描述】:

我正在尝试编写一个发送多个 DNS 请求的 python 脚本,每个请求使用不同的名称服务器。

使用 dnspython 实现顺序解决方案很容易,但对我来说太慢了。 使用线程池添加并发是不可能的,因为在我的特殊情况下,所有请求都使用相同的源端口(REUSE_ADDRESS 在这里也无济于事)。

由于上述原因,我正在考虑使用以下解决方案(放弃使用 dnspython 的解析器模块,但利用其消息构建和解析模块):

  • 最多允许进行 X 个请求
  • 同时发送 X 个请求(仅使用 udp 发送 dns 请求数据包。可能会在发送之间增加延迟以避免突发)
  • 另一个线程等待响应
  • 当响应到达时,将其与请求匹配(按地址)并允许运行新请求
  • 如果对请求的响应未在 TIMEOUT 秒内到达,则将其标记为已完成并允许运行新请求

我的主要问题是:

  • 如何轻松实现任务超时
  • 是否可以在不使用线程同步的情况下实现它(例如使用事件循环?)
  • 是否有任何现有的库可以帮助实现它(感觉就像我在这里尝试重新发明轮子,我查看了 asycnio 模块但无法找到一种方法来利用它来解决我的问题)。请注意,我不想使用现有的 dns 或网络库,因为我需要灵活地更改核心功能(例如,使用原始套接字、更改 DNS 标头字段等)。

【问题讨论】:

    标签: python sockets dns task python-asyncio


    【解决方案1】:

    你试过aiodns 包吗? https://pypi.python.org/pypi/aiodns/

    对于超时 asyncio 有标准的 wait_for 协程 (https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for)。

    【讨论】:

    • 我需要为每个请求使用不同的名称服务器,而且似乎 aiodns 不是为此而设计的。
    • 您可以为每个名称服务器创建一个DNSResolver 的实例,并通过nameservers 参数指定所需的DNS ip。 DNSResolver 的实例化是相对轻量级的任务。
    【解决方案2】:

    在这里使用简单的选择循环效果很好。这是完成的代码sn-p:

    def run(self, resolvers_iter):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0)
        sock.setblocking(False)
    
        try:
            pending = []
    
            # For rate limiting
            limit = float(self.timeout)/self.max_pending  # delay between sends
            last_sent = clock() - limit
    
            # Work as long as there are more resolvers to query
            completed_sending = False
            while not completed_sending or pending:
    
                # Can I send more requests
                want_to_write = False
                if not completed_sending and len(pending) < self.max_pending:
                    want_to_write = True
    
                # Calculate nearest timeout time to make sure select returns on time
                timeout = None
                if pending:
                    timeout = self.timeout - clock() + pending[0][0] + 0.001
                    timeout = max(timeout, 0)
    
                # Rate limit
                time_passed_since_send = clock() - last_sent
                if want_to_write and time_passed_since_send + 0.001 < limit:
                    timeout = min(timeout, limit-time_passed_since_send)
                    timeout = max(timeout, 0)
                    want_to_write = False
    
                # Poll socket - uses internally the select module
                readable, writable = self._select(readable=True, writable=want_to_write, timeout=timeout)
    
                # Can read
                if readable:
                    # Read as many as possible
                    while True:
                        try:
                            # Get response
                            response, from_address = DnsFacilities.read_response(sock)
    
                            # Check if not duplicate or already timed out
                            sent_time = None
                            for i, (t, ip) in enumerate(pending):
                                if ip == from_address[0]:
                                    sent_time = t
                                    del pending[i]
                                    break
    
                            if sent_time is not None:
                                self.response_received((response, from_address, clock()-sent_time))
    
                        except socket.error, e:
                            if e[0] in (socket.errno.EWOULDBLOCK, socket.errno.EAGAIN):
                                break
                            elif e[0] in (socket.errno.WSAECONNRESET, socket.errno.WSAENETRESET):
                                pass
                            else:
                                raise
    
                # Can write
                if writable:
                    try:
                        last_sent = clock()
                        resolver_address = resolvers_iter.next()
                        DnsFacilities.send_query(resolver_address)
                        pending.append((clock(), resolver_address)
                    except StopIteration:
                        completed_sending = True
    
                # Check for timed out tasks
                now = clock()
                while pending and now - pending[0][0] > self.timeout:
                    self.response_timeout(pending[0][1])
                    del pending[0]
    
        finally:
            sock.close()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2011-07-04
      • 2015-05-18
      • 2019-11-07
      • 1970-01-01
      • 1970-01-01
      • 2018-04-16
      • 2017-02-19
      • 2017-02-18
      相关资源
      最近更新 更多