【发布时间】:2025-11-28 01:40:01
【问题描述】:
我正在构建一个从 pubsub 读取数据并将请求发送到第 3 方 API 的数据流管道。管道使用THROUGHPUT_BASED 自动缩放。
但是,当我对其进行负载测试时,在它自动缩放到 4 个工作以赶上 pubsub 中的积压工作后,但似乎相同的工作负载在工作之间分散了事件,但总体吞吐量并没有显着增加。
^ 每个工作人员发送的字节数。高峰是最初的工人。随着更多的工作人员被添加到池中,工作量被卸载,而不是每个人都承担更多的工作量。 CPU 利用率看起来相同,初始工作人员的峰值利用率低于 30%。
感觉好像在某个地方遇到了限制,但我很难看到限制是什么。我每秒提取不到 300 条消息,每条消息大约 1kb。
更新: 我在使用 TextIO 的批处理作业和使用 PubSubIO 的流式作业之间进行了另一轮比较,两者都使用“n1-standard-8”机器和固定数量为 15 的工人。批处理作业达到 450 个元素/秒,但流式作业仍然达到 230 个元素/秒的峰值。似乎限制来自源头。虽然我不确定限制是什么。
更新 2 这是一个简单的代码 sn-p 来重现该问题。您需要手动将作品数量设置为 1 和 5,并比较管道处理的元素数量。您将需要一个负载测试器来有效地将消息发布到主题。
package debug;
import java.io.IOException;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class DebugPipeline {
@SuppressWarnings("serial")
public static PipelineResult main(String[] args) throws IOException {
/*******************************************
* SETUP - Build options.
********************************************/
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setAutoscalingAlgorithm(
DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
// Autoscaling will scale between n/15 and n workers, so from 1-15 here
options.setMaxNumWorkers(15);
// Default of 250GB is absurdly high and we don't need that much on every worker
options.setDiskSizeGb(32);
// Manually configure scaling (i.e. 1 vs 5 for comparison)
options.setNumWorkers(5);
// Debug Pipeline
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(PubsubIO.readStrings()
.fromSubscription("your subscription"))
// this is the transform that I actually care about. In production code, this will
// send a REST request to some 3rd party endpoint.
.apply("sleep", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws InterruptedException {
Thread.sleep(500);
c.output(c.element());
}
}));
return pipeline.run();
}
}
【问题讨论】:
-
出于好奇,您能否检查here 您确实没有达到默认配额?
-
我想到的另一个想法是,如果管道后面有一个限制阶段会限制整体吞吐量,那么管道的一般结构是什么?
-
对于您的第一条评论,我认为我们没有超出 pubsub 的限制。我们以每秒 1.2 Mb/s 的速率提取 200 条消息。
-
到您的第二条评论。管道从 pubsub 拉取 -> 将消息解析为数据模型 -> 将请求查询字符串准备到 3rd 方 API -> 使用 Apache httpclient 发送消息 -> 如果请求失败(从未发生)则准备 Datastore 实体 -> 保存失败数据到数据存储区(从未发生过)。它使用反射将来自 pubsub 的输入 JSON 解析为多个数据模型(在本例中只有一个),然后确定如何在运行时处理数据。我最初认为可能存在竞争条件,但行为不支持它。
-
这里有一个简单的方法可以确保 PubSub 确实是这里的真正问题,而不是进一步的问题。将您的 PubSubIO 替换为读取大文本文件中相关数据的 TextIO(如果需要,还可以在后续阶段根据需要对数据进行格式化)。 TextIO 非常高效。如果您发现您的吞吐量仍然像使用 PubSub 一样受到限制,则说明您的管道存在瓶颈,这就是您的工作人员无法更快地处理 PubSub 的原因。
标签: google-cloud-platform google-cloud-dataflow apache-beam google-cloud-pubsub