【问题标题】:Dataflow/Beam Templates, Productionization, Initialization, and ValueProviders数据流/Beam 模板、生产化、初始化和 ValueProviders
【发布时间】:2017-07-27 03:27:48
【问题描述】:

我有一个在 Google Cloud Dataflow 上运行的 Apache Beam 作业,作为其初始化的一部分,它需要对服务、发布/订阅订阅、GCS blob 等运行一些基本的健全性/可用性检查。它是一个流式管道,旨在无限运行处理数十万条发布/订阅消息。

目前它需要一大堆必需的可变参数:它需要在哪个 Google Cloud 项目中运行,它将在哪个存储桶和目录前缀中存储文件,它需要从哪个 pub/sub 订阅中读取,以及很快。在调用 pipeline.run 之前,它会对这些参数进行一些工作——验证、字符串拆分等。在当前的形式中,为了开始一项工作,我们一直将这些参数传递给 PipelineOptionsFactory 并每次都发出一个新的编译,但似乎应该有更好的方法。我已经将参数设置为 ValueProvider 对象,但是因为它们是在 pipeline.run 之外调用的,所以 Maven 在编译时抱怨 ValueProvider.get() 在运行时上下文之外被调用(是的,它是。)

我已尝试在 Google“Creating Templates”文档中使用 NestedValueProviders,但如果我尝试使用 NestedValueProvider.of 返回文档中所示的字符串,我的 IDE 会报错。我能够让 NestedValueProviders 编译的唯一方法如下:

NestedValueProvider<String, String> pid = NestedValueProvider.of(
        pipelineOptions.getDataflowProjectId(),
        (SerializableFunction<String, String>) s -> s
);

(String pid = NestedValueProvider.of(...) 导致以下错误:“不兼容的类型:不存在类型变量 T,X 的实例,因此 org.apache.beam.sdk。 options.ValueProvider.NestedValueProvider 符合 java.lang.String")

我的 pipelineOptions 中有以下内容:

ValueProvider<String> getDataflowProjectId();
void setDataflowProjectId(ValueProvider<String> value);

由于我们要处理的消息量很大,因此在管道的前端为每条通过的消息添加这些检查是不切实际的;我们将很快达到其中一些调用的每日帐户管理限制。

模板是我想做的正确方法吗?我该如何去实际生产这个?应该(可以吗?)我用 maven 编译成一个 jar,然后用我的参数在本地 dev/qa/prod 盒子上运行 jar,而根本不用关心 ValueProviders?或者是否可以为 ValueProvider 提供默认值并将其作为传递给模板的选项的一部分覆盖?

任何关于如何进行的建议将不胜感激。谢谢!

【问题讨论】:

    标签: google-cloud-dataflow apache-beam dev-to-production


    【解决方案1】:

    当前实现模板的方式没有必要执行“模板创建后”,而是“管道启动前”初始化/验证。

    所有现有的验证都在模板创建期间执行。如果验证检测到值不可用(由于是 ValueProvider),则跳过验证。

    在某些情况下,可以通过添加运行时检查来近似验证,这既可以作为自定义源的初始拆分的一部分,也可以作为DoFn@Setup 方法的一部分。在后一种情况下,@Setup 方法将为创建的每个 DoFn 实例运行一次。如果管道是 Batch,则在特定实例失败 4 次后,管道将失败。

    生产流水线的另一个选择是构建运行流水线的 JAR,并拥有一个运行该 JAR 的生产流程来启动流水线。

    关于您收到的编译错误——NestedValueProvider 返回一个ValueProvider——不可能从中得到一个String。但是,您可以将验证代码放入在NestedValueProvider 中运行的SerializableFunction。 虽然我相信这会在每次访问值时重新运行验证,但让 NestedValueProvider 缓存翻译后的值并不是不合理的。

    【讨论】:

    • 好的,谢谢。我现在正在尝试使用 jar 路由,但在让它识别数据流运行器时遇到问题:线程“main”java.lang.IllegalArgumentException 中的异常:未知的“运行器”指定了“数据流”,支持的管道运行器[DirectRunner] 关于这个还有另一篇关于 SO 的帖子,但唯一的答案是指定在编译时运行,这不是一个选项。
    • 这听起来像是一个类路径问题。确保包含 Dataflow 运行器的 JAR 在运行时可用。
    • 我上周想通了;问题是 pom 中的依赖排序之一。我将 dataflow-runner 移到依赖项列表的顶部,事情开始起作用了。不过谢谢!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多