【发布时间】:2017-07-27 03:27:48
【问题描述】:
我有一个在 Google Cloud Dataflow 上运行的 Apache Beam 作业,作为其初始化的一部分,它需要对服务、发布/订阅订阅、GCS blob 等运行一些基本的健全性/可用性检查。它是一个流式管道,旨在无限运行处理数十万条发布/订阅消息。
目前它需要一大堆必需的可变参数:它需要在哪个 Google Cloud 项目中运行,它将在哪个存储桶和目录前缀中存储文件,它需要从哪个 pub/sub 订阅中读取,以及很快。在调用 pipeline.run 之前,它会对这些参数进行一些工作——验证、字符串拆分等。在当前的形式中,为了开始一项工作,我们一直将这些参数传递给 PipelineOptionsFactory 并每次都发出一个新的编译,但似乎应该有更好的方法。我已经将参数设置为 ValueProvider 对象,但是因为它们是在 pipeline.run 之外调用的,所以 Maven 在编译时抱怨 ValueProvider.get() 在运行时上下文之外被调用(是的,它是。)
我已尝试在 Google“Creating Templates”文档中使用 NestedValueProviders,但如果我尝试使用 NestedValueProvider.of 返回文档中所示的字符串,我的 IDE 会报错。我能够让 NestedValueProviders 编译的唯一方法如下:
NestedValueProvider<String, String> pid = NestedValueProvider.of(
pipelineOptions.getDataflowProjectId(),
(SerializableFunction<String, String>) s -> s
);
(String pid = NestedValueProvider.of(...) 导致以下错误:“不兼容的类型:不存在类型变量 T,X 的实例,因此 org.apache.beam.sdk。 options.ValueProvider.NestedValueProvider 符合 java.lang.String")
我的 pipelineOptions 中有以下内容:
ValueProvider<String> getDataflowProjectId();
void setDataflowProjectId(ValueProvider<String> value);
由于我们要处理的消息量很大,因此在管道的前端为每条通过的消息添加这些检查是不切实际的;我们将很快达到其中一些调用的每日帐户管理限制。
模板是我想做的正确方法吗?我该如何去实际生产这个?应该(可以吗?)我用 maven 编译成一个 jar,然后用我的参数在本地 dev/qa/prod 盒子上运行 jar,而根本不用关心 ValueProviders?或者是否可以为 ValueProvider 提供默认值并将其作为传递给模板的选项的一部分覆盖?
任何关于如何进行的建议将不胜感激。谢谢!
【问题讨论】:
标签: google-cloud-dataflow apache-beam dev-to-production