【问题标题】:Scio Apache Beam - How to properly separate a pipeline code?Scio Apache Beam - 如何正确分离管道代码?
【发布时间】:2019-08-25 06:28:23
【问题描述】:

我有一个带有一组 PTransforms 的管道,我的方法变得很长。

我想将我的 DoFns 和我的复合变换写在一个单独的包中,然后在我的 main 方法中使用它们。使用 python 非常简单,如何使用 Scio 来实现?我没有看到任何这样做的例子。 :(

     withFixedWindows(
        FIXED_WINDOW_DURATION,
        options = WindowOptions(
          trigger = groupedWithinTrigger,
          timestampCombiner = TimestampCombiner.END_OF_WINDOW,
          accumulationMode = AccumulationMode.ACCUMULATING_FIRED_PANES,
          allowedLateness = Duration.ZERO
        )
      )
      .sumByKey
      // How to write this in an another file and use it here?
      .transform("Format Output") {
        _
          .withWindow[IntervalWindow]
          .withTimestamp
      }

【问题讨论】:

    标签: scala apache-beam spotify-scio


    【解决方案1】:

    如果我正确理解您的问题,您希望将您的 map, groupBy, ... 转换捆绑在一个单独的包中,并在您的主管道中使用它们。

    一种方法是使用applyTransform,但你最终会使用PTransforms,它不适合scala。

    您可以简单地编写一个函数来接收一个 SCollection 并返回转换后的一个,例如:

    def myTransform(input: SCollection[InputType]): Scollection[OutputType] = ???
    

    但如果你打算编写自己的 Source/Sink,请查看ScioIO class

    【讨论】:

      【解决方案2】:

      您可以使用map 函数映射您的元素example

      您可以传递来自另一个类的方法引用,而不是传递 lambda 示例.map(MyClass.MyFunction)

      【讨论】:

      • 这应该适用于 DoFn,但我怎样才能分离我的复合转换并保留上下文?
      • 我没有一个很好的主意,但我认为您可以使用 .applyTransform(MyTransform) 传入复合转换并获取上下文,就像它在此处完成的那样 github.com/spotify/scio/blob/…
      【解决方案3】:

      我认为解决此问题的一种方法是在另一个包中定义一个对象,然后在该对象中创建一个方法,该方法具有转换所需的逻辑。例如:

      def main(cmdlineArgs: Array[String]): Unit = {
          val (sc, args) = ContextAndArgs(cmdlineArgs)
      
          val defaulTopic = "tweets"
          val input = args.getOrElse("inputTopic", defaulTopic)
          val output = args("outputTopic")
      
          val inputStream: SCollection[Tweet] = sc.withName("read from pub sub").pubsubTopic(input)
            .withName("map to tweet class").map(x => {parse(x).extract[Tweet]})
      
          inputStream
            .flatMap(sentiment.predict) // object sentiment with method predict
      
        }
      
      object sentiment  {
      
        def predict(tweet: Tweet): Option[List[TweetSentiment]] = {
          val data = tweet.text
          val emptyCase = Some("")
          Some(data) match {
            case `emptyCase` => None
            case Some(v) => Some(entitySentimentFile(data)) // I used another method, //not defined
          }
      
        }
      
      

      也请通过此链接获取Scio examples 中给出的示例

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多