【问题标题】:How to publish multiple messages in google Pub/Sub fast?如何在 google Pub/Sub 中快速发布多条消息?
【发布时间】:2021-11-17 07:10:00
【问题描述】:

如何快速发布多条消息到 pubsub?没有多处理和多线程,因为代码已经在一个线程中

下面的代码每秒发布 40 条消息

publisher = pubsub.PublisherClient(
    credentials=credentials,
    batch_settings=types.BatchSettings(
         max_messages=1000,  # default is 100
        max_bytes=1 * 1000 * 1000,  # 1 MiB
        max_latency=0.1,  # default is 10 ms
    )
)

topic_name = 'projects/{project_id}/topics/{topic}'.format(
    project_id=PROJECT_ID,
    topic=TOPIC_PUBSUB,
)

for data in results:
    bytes_json_data = str.encode(json.dumps(data))
    future = publisher.publish(topic_name, bytes_json_data)
    future.result()

【问题讨论】:

  • 每秒 40 条消息并不是那么快。你的批处理配置应该足够了。你有什么问题?

标签: google-cloud-platform publish-subscribe google-cloud-pubsub


【解决方案1】:

取出:

future.result()

别这样:

for data in results:
    bytes_json_data = str.encode(json.dumps(data))
    future = publisher.publish(topic_name, bytes_json_data)

发布 10k 条消息应该花费不到一秒钟的时间

【讨论】:

    【解决方案2】:

    不要一次发布一条消息,然后等待future,您应该一次发布所有消息,然后等待最后发布的futures。它看起来像:

    from concurrent import futures
    ...
    publish_futures = []
    for data in results:
        bytes_json_data = str.encode(json.dumps(data))
        future = publisher.publish(topic_name, bytes_json_data)
        publish_futures.append(future)
    ...
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
    

    有一个带有示例代码的detailed example in the docs

    【讨论】: