【问题标题】:Dataflow Map side-input issue数据流映射侧输入问题
【发布时间】:2017-05-03 08:45:43
【问题描述】:

我在使用 DataflowRunner 创建 Map PCollectionView 时遇到问题。

下面的管道将无边界的countingInput 与来自侧输入的值(包含10 个生成值)聚合在一起。 在 gcp 上运行管道时,它会卡在 View.asMap() 转换中。 更具体地说, ParDo(StreamingPCollectionViewWriter) 没有任何输出。

我使用数据流 2.0.0-beta3 以及 beam-0.7.0-SNAPSHOT 进行了尝试,但没有任何结果。请注意,我的管道在使用本地 DirectRunner 时运行没有任何问题。

我做错了吗? 感谢所有帮助,提前感谢您帮助我!

public class SimpleSideInputPipeline {

    private static final Logger LOG = LoggerFactory.getLogger(SimpleSideInputPipeline.class);

    public interface Options extends DataflowPipelineOptions {}

    public static void main(String[] args) throws IOException {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Map<Integer, String>> sideInput = pipeline
                .apply(CountingInput.forSubrange(0L, 10L))
                .apply("Create KV<Integer, String>",ParDo.of(new DoFn<Long, KV<Integer, String>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        c.output(KV.of(c.element().intValue(), "TEST"));
                    }
                }))
                .apply(View.asMap());

        pipeline
            .apply(CountingInput.unbounded().withRate(1, Duration.standardSeconds(5)))
            .apply("Aggregate with side-input",ParDo.of(new DoFn<Long, KV<Long, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    Map<Integer, String> map = c.sideInput(sideInput);

                    //get first segment from map
                    Object[] values = map.values().toArray();
                    String firstVal = (String) values[0];
                    LOG.info("Combined: K: "+ c.element() + " V: " + firstVal + " MapSize: " + map.size());
                    c.output(KV.of(c.element(), firstVal));
                }
            }).withSideInputs(sideInput));

        pipeline.run();
    }
}

【问题讨论】:

  • 面临同样的问题。你找到解决办法了吗?
  • 仍然没有解决办法。在此期间您找到了吗?
  • 似乎对我有用(请参阅我在 BEAM-2155 中稍作修改的代码)。

标签: google-cloud-dataflow apache-beam


【解决方案1】:

不必担心ParDo(StreamingPCollectionViewWriterFn) 不会记录任何输出 - 它实际上是将每个元素写入内部位置。

您的代码在我看来没问题,应该对此进行调查。我已提交BEAM-2155

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-06-13
    • 2011-10-24
    • 1970-01-01
    相关资源
    最近更新 更多