【发布时间】:2019-01-26 07:45:27
【问题描述】:
我尝试使用数据流将数据从 pubsub 流式传输到数据存储区。
我尝试构建模板,但它根本不起作用。 所以,我认为这是不可能的。
怎么样? 请给我一些建议。
【问题讨论】:
标签: google-cloud-platform google-cloud-datastore google-cloud-dataflow google-cloud-pubsub
我尝试使用数据流将数据从 pubsub 流式传输到数据存储区。
我尝试构建模板,但它根本不起作用。 所以,我认为这是不可能的。
怎么样? 请给我一些建议。
【问题讨论】:
标签: google-cloud-platform google-cloud-datastore google-cloud-dataflow google-cloud-pubsub
您可能偶然发现了该特定模板中的错误。其中有两个单独的问题,第一个是在这个 SO 问题How to use google provided template [pubsub to Datastore]? 中回答的一个,它指向缺少的errorTag,第二个是 Datastore 的编写者在将实体写入到数据存储。
如果您使用-e 选项运行maven 编译命令,它将显示错误消息GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey。为什么这样做?这与消息是从 PubSub 流式传输而不是批处理(这是我们所期望的)这一事实有关。这意味着没有有限的项目集流入,而是永无止境的项目流。为了使用它,我们需要将其限制为可以由聚合函数(例如GroupByKey)考虑的项目的流式窗口。帮助将实体写入数据存储区的 DatastoreConverters 类实际上会检查我们是否尝试多次写入相同的键,它通过使用 GroupByKey 函数来实现。
简单的解决方案,只需为其提供一个流式处理窗口即可,这是管道中添加的第三个.apply(...),它将流窗口化在一起,并允许您在此处使用数据存储写入器:
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Duration;
...
public static void main(String[] args) {
PubsubToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(PubsubToDatastoreOptions.class);
Pipeline pipeline = Pipeline.create(options);
TupleTag<String> errorTag = new TupleTag<String>("errors") {};
pipeline
.apply(PubsubIO.readStrings()
.fromTopic(options.getPubsubReadTopic()))
.apply(TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(WriteJsonEntities.newBuilder()
.setProjectId(options.getDatastoreWriteProjectId())
.setErrorTag(errorTag)
.build());
pipeline.run();
}
现在还有其他的,而且可能更好的方法可以做到这一点,但这将使您的模板编译并工作。此示例显示 1 秒的 FixedWindow,还有其他选项可以执行此操作,请查看 Google DataFlow - Windowing 的文档。
编译你的模板:
mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.PubsubToDatastore -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=[YOUR_PROJECTID_HERE] \
--stagingLocation=gs://[YOUR_BUCKET_HERE]/staging \
--tempLocation=gs://[YOUR_BUCKET_HERE]/temp \
--templateLocation=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--runner=DataflowRunner"
然后启动作业:
gcloud dataflow jobs run [NAME_OF_THE_JOB_WHATEVER_YOU_LIKE] \
--gcs-location=gs://[YOUR_BUCKET_HERE]/templates/PubsubToDatastore.json \
--zone=[ZONE_WHERE_YOU_WANT_TO_RUN] \
--parameters "pubsubReadTopic=[YOUR_PUBSUB_TOPIC_HERE],datastoreWriteProjectId=[YOUR_PROJECTID_HERE]"
现在您应该看到您的作业在 GCP 控制台中运行:
请注意,此特定解决方案和选择的窗口将意味着 PubSub 消息最多延迟一秒才能最终到达数据存储区。缩短窗口可能会有所帮助,但为了获得更高的吞吐量,您需要与此处显示的管道不同的管道。
【讨论】: