【问题标题】:Dataflow autoscale does not boost performance数据流自动缩放不会提高性能
【发布时间】:2025-11-28 01:40:01
【问题描述】:

我正在构建一个从 pubsub 读取数据并将请求发送到第 3 方 API 的数据流管道。管道使用THROUGHPUT_BASED 自动缩放。

但是,当我对其进行负载测试时,在它自动缩放到 4 个工作以赶上 pubsub 中的积压工作后,但似乎相同的工作负载在工作之间分散了事件,但总体吞吐量并没有显着增加。

^ pubsub 中未确认消息的数量。高峰是交通停止的时候

^ 每个工作人员发送的字节数。高峰是最初的工人。随着更多的工作人员被添加到池中,工作量被卸载,而不是每个人都承担更多的工作量。 CPU 利用率看起来相同,初始工作人员的峰值利用率低于 30%。

^ 工人的历史。

感觉好像在某个地方遇到了限制,但我很难看到限制是什么。我每秒提取不到 300 条消息,每条消息大约 1kb。

更新: 我在使用 TextIO 的批处理作业和使用 PubSubIO 的流式作业之间进行了另一轮比较,两者都使用“n1-standard-8”机器和固定数量为 15 的工人。批处理作业达到 450 个元素/秒,但流式作业仍然达到 230 个元素/秒的峰值。似乎限制来自源头。虽然我不确定限制是什么。

更新 2 这是一个简单的代码 sn-p 来重现该问题。您需要手动将作品数量设置为 1 和 5,并比较管道处理的元素数量。您将需要一个负载测试器来有效地将消息发布到主题。

package debug;

import java.io.IOException;

import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class DebugPipeline {
    @SuppressWarnings("serial")
    public static PipelineResult main(String[] args) throws IOException {

        /*******************************************
         * SETUP - Build options.
         ********************************************/

        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(DataflowPipelineOptions.class);
        options.setRunner(DataflowRunner.class);
        options.setAutoscalingAlgorithm(
                DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
        // Autoscaling will scale between n/15 and n workers, so from 1-15 here
        options.setMaxNumWorkers(15);
        // Default of 250GB is absurdly high and we don't need that much on every worker
        options.setDiskSizeGb(32);
        // Manually configure scaling (i.e. 1 vs 5 for comparison)
        options.setNumWorkers(5);

        // Debug Pipeline
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings()
                    .fromSubscription("your subscription"))
            // this is the transform that I actually care about. In production code, this will
            // send a REST request to some 3rd party endpoint.
            .apply("sleep", ParDo.of(new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws InterruptedException {
                    Thread.sleep(500);
                    c.output(c.element());
                }
            }));

        return pipeline.run();
    }
}

【问题讨论】:

  • 出于好奇,您能否检查here 您确实没有达到默认配额?
  • 我想到的另一个想法是,如果管道后面有一个限制阶段会限制整体吞吐量,那么管道的一般结构是什么?
  • 对于您的第一条评论,我认为我们没有超出 pubsub 的限制。我们以每秒 1.2 Mb/s 的速率提取 200 条消息。
  • 到您的第二条评论。管道从 pubsub 拉取 -> 将消息解析为数据模型 -> 将请求查询字符串准备到 3rd 方 API -> 使用 Apache httpclient 发送消息 -> 如果请求失败(从未发生)则准备 Datastore 实体 -> 保存失败数据到数据存储区(从未发生过)。它使用反射将来自 pubsub 的输入 JSON 解析为多个数据模型(在本例中只有一个),然后确定如何在运行时处理数据。我最初认为可能存在竞争条件,但行为不支持它。
  • 这里有一个简单的方法可以确保 PubSub 确实是这里的真正问题,而不是进一步的问题。将您的 PubSubIO 替换为读取大文本文件中相关数据的 TextIO(如果需要,还可以在后续阶段根据需要对数据进行格式化)。 TextIO 非常高效。如果您发现您的吞吐量仍然像使用 PubSub 一样受到限制,则说明您的管道存在瓶颈,这就是您的工作人员无法更快地处理 PubSub 的原因。

标签: google-cloud-platform google-cloud-dataflow apache-beam google-cloud-pubsub


【解决方案1】:

考虑到:

  1. 从 PubSubIO 切换到 TextIO 没有任何改善。
  2. 从 3 名工人变为 15 名工人并没有改善。
  3. 批处理作业达到 450 个元素/秒,但流式处理达到峰值 230 个元素/秒
  4. 有一个将 REST 请求发送到第 3 方 API 的转换,需要花费数小时的时间。
  5. 在测试中,取消转换会将吞吐量从 120 个元素/秒提高到 400 个元素/秒。

问题似乎并不在于 PubSub 方面。根据this documentation,您可能正在重载第 3 方 API。客户端文档中解释了相同的效果,而不是第 3 方 API:

一个客户可能有积压的消息,因为 它没有能力处理传入的数量 消息,但网络上的另一个客户端确实具有该容量。 第二个客户可以减少整体积压,但它没有得到 机会,因为第一个客户端无法将其消息发送到 第二个客户足够快。这降低了总体比率 处理,因为消息卡在第一个客户端上。

创建积压的消息会消耗内存、CPU 和带宽 资源,因为客户端库继续扩展消息' 确认截止日期。

[...]

更一般地说,需要流量控制表明消息是 以高于消费的速度发布。如果这 是一种持久状态,而不是消息量的峰值,请考虑 增加订阅者客户端实例和机器的数量。

如果您只能在 PubSub 上工作以改善结果,并且您认为实现此目的的方法是为元素扩展 acknowledgement deadline time,您可以通过访问 here 并手动编辑订阅来测试它。要使用 Java 以编程方式执行此操作,请查看 thisthis 文档,分别关于管理订阅和更改 ackDeadlineSeconds

【讨论】:

  • 我不认为我们正在重载第 3 方 API,因为可以通过将 API 调用替换为 Thread.sleep 来重现相同的行为。我还尝试将 PubSub 确认截止日期延长到 10 分钟,但吞吐量并没有提高。延长确认截止日期请求在 10 分钟后开始出现。它们不太频繁,但仍然存在。
  • 测试类似的东西,当有足够多的 Pub/Sub 消息要被拉取时,吞吐量与工作人员一起扩展。此时,我建议您打开public issue here,提供帖子中的信息和您的项目编号,以便我们查看您的具体项目。