【问题标题】:GCP Dataflow throws exception Shuffle key too largeGCP Dataflow 抛出异常 Shuffle key too large
【发布时间】:2020-05-07 13:25:28
【问题描述】:

我有一段代码对我的数据进行分组,但在我输出时它会引发异常。

这个类作为KV中的key

class CKey {
    private Long id;
    private Long subId;
}

这是我的数据流工作的一部分

TupleTag<CItem> itemsTuple = //...
TupleTag<CMeta> metaTuple = //...

//...

PCollection<KV<CKey, CItem>> items = null;
PCollection<KV<CKey, CMeta>> meta;

KeyedPCollectionTuple.of(itemsTuple, items).and(metaTuple, meta.next())
        .apply(CoGroupByKey.create())
        .apply(new CustomGroupPairsFn());

加入数据的自定义函数

class CustomGroupPairsFn extends DoFn<KV<CKey, CoGbkResult>, MyCustomObject> {

        @ProcessElement
        public void processElement(@Element KV<CKey, CoGbkResult> element, OutputReceiver<MyCustomObject> out) {
            CoGbkResult pair = element.getValue();
            Iterator<CItem> citem = pair.getAll(ITEMS).iterator();
            Iterator<CMeta> cmeta = pair.getAll(METADATA).iterator();
            try {
                out.output(new MyCustomObject(citem.next(), cmeta));
            } catch (Exception e) {
                log.error("Error occurred", e);
            }
        }
    }

try里面只有1行代码,里面抛出异常,异常:

我该如何解决这个问题?

【问题讨论】:

  • 您能详细介绍一下您的管道吗?附近有 GroupByKey 吗?是流媒体吗?还是批量?

标签: java google-cloud-platform google-cloud-dataflow apache-beam


【解决方案1】:

发生此错误是因为您正在改组一个太大的键。

这是什么意思?在 Dataflow 中,流式管道允许的最大 shuffle key 为 1.5 MB。您似乎有一个比这更大的元素键。

也许您的管道在某个意想不到的地方有一个 GroupByKey/Shuffling 操作,因此最好了解它的更多详细信息。

【讨论】:

  • 我的管道中有一个CoGroupByKey.create(),这个密钥由什么组成,何时完成改组以及为什么?
  • 我发现GroupByKey.create()CoGroupByKey.create()内部使用。但是如何更改密钥的大小?
  • 你能分享更多关于你的管道的信息吗?是流式还是批处理?您提到的 Par 的下游和上游有哪些操作?从技术上讲,密钥是 KV 对中的密钥。洗牌是指数据被序列化并在管道中的工作人员周围传递......
  • 这是一个批处理作业。我已经更新了问题中的代码。
  • 只有关键才是最重要的。但该值可能会在其他运算符中成为键(例如 Distinct)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-01-24
  • 2012-05-07
  • 2020-02-19
  • 2018-06-22
  • 2018-05-29
相关资源
最近更新 更多