【问题标题】:Elasticsearch/dataflow - connection timeout after ~60 concurrent connectionElasticsearch/dataflow - 约 60 个并发连接后的连接超时
【发布时间】:2020-11-24 19:13:57
【问题描述】:

我们在 Elastic Cloud 上托管 elatsicsearch 集群,并从数据流 (GCP) 中调用它。工作在 dev 中运行良好,但是当我们部署到 prod 时,我们在客户端看到很多连接超时。

Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "main.py", line 159, in process
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/utils.py", line 152, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/client/__init__.py", line 1617, in search
    body=body,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 390, in perform_request
    raise e
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/transport.py", line 365, in perform_request
    timeout=timeout,
  File "/usr/local/lib/python3.7/site-packages/elasticsearch/connection/http_urllib3.py", line 258, in perform_request
    raise ConnectionError("N/A", str(e), e)
elasticsearch.exceptions.ConnectionError: ConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out) caused by: NewConnectionError(<urllib3.connection.HTTPSConnection object at 0x7fe5d04e5690>: Failed to establish a new connection: [Errno 110] Connection timed out)

我将 elasticsearch 客户端中的超时设置增加到 300 秒,如下所示,但似乎没有帮助。

self.elasticsearch = Elasticsearch([es_host], http_auth=http_auth, timeout=300)

https://cloud.elastic.co/deployments//metrics 查看部署 CPU 和内存使用率非常低(低于 10%),搜索响应时间也在 200 毫秒左右。 这里的瓶颈可能是什么?我们如何避免这种超时?

如下面的日志所示,大多数请求都因连接超时而失败,而成功的请求很快就会收到响应:

我尝试 ssh 进入我们遇到连接错误的 VM。 netstat 显示有大约 60 个 ESTABLISHED 连接到弹性搜索 IP 地址。当我从虚拟机卷曲到弹性搜索地址时,我能够重现超时。我可以很好地卷曲到其他 URL。我也可以从本地卷曲到 elasticsearch,所以问题只是 VM 和 elasticsaerch 服务器之间的连接。

dataflow(计算引擎)或 ElasticSearch 对并发连接数有限制吗?我在网上找不到任何信息。

【问题讨论】:

  • 你能澄清一下你正在建立什么类型的连接吗?是批处理还是流式传输?一段连接代码会有所帮助
  • 这是流式作业,我通过 elasticsearch-py 库(调用 _search REST API)连接到 elasticsearch。我把简化的python代码和shell脚本放在这里-gist.github.com/daisy1754/985411be17773342d73cb408627e461c实际上在发布之后,我做了一些更新,所以我们现在使用共享类来共享elasticsearch客户端,并将maxsize设置为elasticsearch以限制连接数。这似乎有帮助,但经过大约 8 小时的运行后,我再次开始看到连接超时
  • 您是否在代码中实现了拆卸?
  • 我没有。我的最新代码(以上链接)在设置中获得了共享的弹性搜索,因此无需清理
  • 您的 Dataflow 作业是否开启了自动缩放功能?当问题发生时,您是否看到工人人数增加了?此外,当您 ssh 进入有问题的 VM 时,VM 运行的繁忙程度如何?

标签: python elasticsearch google-cloud-platform google-cloud-dataflow apache-beam


【解决方案1】:

我对 ElasticSearch 的连接器做了一些研究。您可能需要尝试两个原则,以确保您的连接器尽可能高效。

注意如另一个答案中所建议的那样,设置最大工作人员数量(目前)可能没有太大帮助 - 让我们提高 Beam/Elastic 集群资源的利用率,如果我们开始达到任何一个限制,然后我们可以考虑限制工作人员的数量 - 但现在,您可以尝试改进您的连接器。

对外部服务使用批量请求

您提供的代码为进入 DoFn 的每个元素发出单独的搜索请求。正如您所指出的,这可以正常工作,但它会导致您的管道花费太多时间等待每个元素的外部请求 - 因此您等待往返的时间将是 O(n)。

很高兴,Elasticsearch 客户端有一个msearch method,它应该允许您执行批量搜索。你可以这样做:

class PredictionFn(beam.DoFn):
    def __init__(self, ...):
      self.buffer = []
    ...
    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) > BATCH_SIZE:
          return self.flush()

    def flush(self):
        result = []

        # Perform the search requests for user ids
        user_ids = [uid for cid, did, uid in self.buffer]
        user_ids_request = self._build_uid_reqs(user_ids)

        resp = es.msearch(body=user_ids_request)

        user_id_and_device_id_lists = []
        for r, elm in zip(resp['responses'], self.buffer):
          if len(r["hits"]["hits"]) == 0:
            continue
          # Get new device_id_list
          user_id_and_device_id_lists.append((elm[2],  # User ID
                                              device_id_list))
          

        device_id_lists = [elm[1] for elm in user_id_and_device_id_lists]
        device_ids_request = self._build_device_id_reqs(device_id_lists)

        resp = es.msearch(body=device_ids_request)

        resp = self.elasticsearch.search(index="sessions", body={"query": {"match": {"userId": user_id }}})
        # Handle the result, output anything necessary

    def _build_uid_reqs(self, uids):
      # Relying on this answer: https://stackoverflow.com/questions/28546253/how-to-create-request-body-for-python-elasticsearch-msearch/37187352
      res = []
      for uid in uids:
        res.append(json.dumps({'index': 'sessions'}))  # Request HEAD
        res.append(json.dumps({"query": {"match": {"userId": uid }}}))  # Request BODY

      return '\n'.join(res)

重用客户端,因为它是线程安全的

Elasticsearch 客户端 is also thread safe!

因此,您可以执行以下操作,而不是每次都创建一个新的:

class PredictionFn(beam.DoFn):
    CLIENT = None

    def init_elasticsearch(self):
        if PredictionFn.CLIENT is not None:
          return PredictionFn.CLIENT
        es_host = fetch_host()
        http_auth = fetch_auth()
        PredictionFn.CLIENT = Elasticsearch([es_host], http_auth=http_auth, 
            timeout=300, sniff_on_connection_fail=True,
            retry_on_timeout=True, max_retries=2,
            maxsize=5) # 5 connections per client
        return PredictionFn.CLIENT

这应该确保您为每个工作人员保留一个客户端,并且您不会创建太多到 ElasticSearch 的连接 - 因此不会收到拒绝消息。

让我知道这两个是否有帮助,或者我们是否需要尝试进一步改进!

【讨论】:

  • 哇,非常感谢您的详细回答,Pablo!对于重用客户端,实际上我的代码已经使用了梁的共享,因此所有线程都共享单个客户端。
  • 我忘了更新这个 SO 问题,但我还发现有 CLOSE_WAIT 套接字卡住了,并且可能导致连接超时,因为 elasticsearch-py 的连接池用完了(我尝试增加 maxsize 但它没有出于某种原因无济于事)。我已经向 elasticsearch-py 报告了这一点,但你能从光束/数据流方面想到任何可能导致这个 CLOSE_WAIT 套接字的东西吗? github.com/elastic/elasticsearch-py/issues/1459
  • 啊有趣。你能在你的 init_elasticsearch 函数中添加一个记录器吗?我可能猜想每次创建客户端都可能创建新连接?老实说,我不确定..
【解决方案2】:

编辑:这是红鲱鱼。 CLOSE_WAIT 不相关。我又遇到了同样的问题,现在大多数连接都处于 ESTABLISHED 状态:/

虽然下面的两个答案都很有见地,但我认为他们没有回答这个问题。

经过更多调查,我发现以某种方式 elasticsearch-py(或 urllib3)与数据流相结合,会在CLOSE_WAIT 状态下保持连接。一旦连接获得此状态,这些连接就会卡住(操作系统不会释放这些套接字,因为操作系统认为应用程序代码会关闭它)所以在某个时间运行作业后,我在连接池中的所有连接都处于此 CLOSE_WAIT 状态,因此我无法进行任何操作新的连接。如果我不使用连接池并为每个 pardo 实例化 elasticsaerch 客户端,它就会变得有价值,不知何故连接会更快地卡住。

我在这里报告了https://github.com/elastic/elasticsearch-py/issues/1459 的问题,但老实说,这个问题似乎在堆栈中更深层次,因为当我直接使用requests 包的连接池时遇到了类似的问题(我相信它也在后台使用了 urllib3)。

【讨论】:

  • 感谢您进行调查!您认为这是 Beam 问题吗?还是其他图书馆之一的问题?也许在删除 DoFn 时您还需要关闭连接?
【解决方案3】:

Dataflow 对传出连接的数量没有限制。 它在底层使用了一个 K8s 集群,每个 python 线程都存在于它们自己的 docker 容器中。

对 Elastic Cloud 的 API 调用受到速率限制(查看响应标头中的 x-rate-limit-{interval,limit,remaining} 字段)。

如果您执行大量并行作业和/或谷歌云扩展您的作业节点以使其更快,则使用 Dataflow 很容易达到 API 速率限制。

Dataflow / Apache Beam 作业中的可能解决方法:

1 -(无需代码)使用(Dataflow 执行参数)[ https://cloud.google.com/dataflow/docs/guides/specifying-exec-params] 来限制并发处理线程的数量。

您需要调整的三个参数是:

  • max_num_workers :运行的工作程序实例(机器)的最大数量。
  • number_of_worker_harness_threads:默认情况下,您的实例每个 CPU 1 个线程。
  • machine_type:您将使用的实例类型。

2 - 对您的代码实施速率限制。请参阅 Apache Beam Timely (and stateful) processing processing with Apache Beam

【讨论】:

  • 当我卷曲我的弹性搜索端点 (sessions/_search) 时,我没有得到任何这些速率限制标头。 elasticserver也不应该返回速率限制错误而不是仅仅挂在连接上吗?
猜你喜欢
  • 2015-04-01
  • 2015-03-31
  • 2018-08-17
  • 2013-03-30
  • 1970-01-01
  • 2014-06-22
  • 2022-01-21
  • 1970-01-01
  • 2021-04-13
相关资源
最近更新 更多