【问题标题】:Creating Custom Windowing Function in Apache Beam在 Apache Beam 中创建自定义窗口函数
【发布时间】:2018-09-12 16:59:06
【问题描述】:

我有一个 Beam 管道,它从读取多个文本文件开始,其中文件中的每一行代表稍后在管道中插入 Bigtable 的行。该场景需要确认从每个文件中提取的行数和稍后插入 Bigtable 的行数匹配。为此,我计划开发一个自定义窗口策略,以便基于文件名将单个文件中的行分配给单个窗口,作为将传递给窗口函数的键。

是否有任何用于创建自定义窗口函数的代码示例?

【问题讨论】:

  • 这是流式传输管道吗?如果没有,您可以使用 GroupByKey 完成此操作
  • @Pablo 感谢您的回复。但是,根据我有限的知识和查看文档,GroupByKey 仅充当 SQL GROUP BY & 实际上并没有分配窗口。在我的场景中,这些行已经根据文件名作为容器(即密钥)分组在一起。这里的问题是能够插入属于同一文件的行(行)作为同一窗口的一部分,我相信如果在调用CloudBigtableIO.writeToTable()之前将这些行作为同一窗口的一部分,这是可能的。
  • 我不明白你的意思。如果您已经按文件名对行进行了分组,为什么还需要插入其他行?这些其他行是从哪里来的?
  • 很抱歉给您带来了困惑。当我说“行已经根据文件名分组在一起”时,基本上我想说的是我已经知道特定行属于哪个文件。这不是问题。问题是调用CloudBigtableIO.writeToTable() 需要在每个窗口的基础上发生(1 个文件名 = 1 个窗口)。不幸的是GroupByKey 不会为每个键创建窗口。希望他的澄清。
  • Hmmm in Beam windows 用来表示时间,没有其他维度。如果要对元素进行窗口化,则必须添加时间戳,并应用窗口化策略。您可以添加自定义时间窗口策略,但这似乎不是您想要的?

标签: google-cloud-dataflow apache-beam dataflow


【解决方案1】:

虽然我改变了确认插入行数的策略,但对于任何对从批处理源读取的窗口元素感兴趣的人,例如FileIO 在批处理作业中,这是创建自定义窗口策略的代码:

public class FileWindows extends PartitioningWindowFn<Object, IntervalWindow>{

private static final long serialVersionUID = -476922142925927415L;
private static final Logger LOG = LoggerFactory.getLogger(FileWindows.class);

@Override
public IntervalWindow assignWindow(Instant timestamp) {
    Instant end = new Instant(timestamp.getMillis() + 1);
    IntervalWindow interval = new IntervalWindow(timestamp, end);
    LOG.info("FileWindows >> assignWindow(): Window assigned with Start: {}, End: {}", timestamp, end);
    return interval;
}

@Override
public boolean isCompatible(WindowFn<?, ?> other) {
    return this.equals(other);
}

@Override
public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
    if (!this.isCompatible(other)) {
        throw new IncompatibleWindowException(other, String.format("Only %s objects are compatible.", FileWindows.class.getSimpleName()));
    }
  }

@Override
public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
}   

}

然后可以在管道中使用如下:

p
 .apply("Assign_Timestamp_to_Each_Message", ParDo.of(new AssignTimestampFn()))
 .apply("Assign_Window_to_Each_Message", Window.<KV<String,String>>into(new FileWindows())
  .withAllowedLateness(Duration.standardMinutes(1))
  .discardingFiredPanes());

请记住,您需要编写AssignTimestampFn(),以便每条消息都带有时间戳。

【讨论】:

    猜你喜欢
    • 2021-12-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-13
    • 2011-06-16
    • 1970-01-01
    相关资源
    最近更新 更多