【发布时间】:2018-05-30 02:59:53
【问题描述】:
我目前正在尝试使用 withIdAttribute 和 PubSubIO 对来自 PubSub 的消息进行重复数据删除(因为 PubSub 只保证至少一次传递)。
我的消息有四个字段,label1、label2、timestamp 和 value。在某个时间戳上,两个标签的值是唯一的。因此,我在写入 PubSub 之前另外设置了一个 uniqueID 属性,该属性等于作为字符串连接的这三个值。
例如,这是我使用 gcp 控制台工具从订阅中读取的结果。
┌───────────────────────────────────────────────────────────────────────────────────────────────────────────┬────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┐
│ DATA │ MESSAGE_ID │ ATTRIBUTES │
├───────────────────────────────────────────────────────────────────────────────────────────────────────────┼────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ {"label1":"5c381a51-2873-49b8-acf5-60a0fa59fc65","label2":"foobarbaz","timestamp":1513199383,"value":4.2} │ 11185357338249 │ eventTime=2017-12-13T21:09:43Z uniqueID=5c381a51-2873-49b8-acf5-60a0fa59fc65:foobarbaz:1513199383 │
└───────────────────────────────────────────────────────────────────────────────────────────────────────────┴────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┘
在我的 Beam 作业中,在 GCP Dataflow 上运行,我将这些消息解码为 json,将它们窗口化,按它们的两个标签对它们进行分组,然后尝试聚合它们。但是,在我的聚合类CreateMyAggregationsFn 中,我看到具有相同label1、label2 和timestamp 的重复消息。
public class MyBeam {
public interface MyBeanOptions extends PipelineOptions {
// ...
}
private static class MyMessage implements Serializable {
public long timestamp;
public double value;
public String label1;
public String label2;
}
private static class CreateMyAggregationsFn extends DoFn<KV<String, Iterable<MyMessage>>, MyAggregate> {
@ProcessElement
public void processElement(ProcessContext c) {
ArrayList<MyMessage> messages = new ArrayList<>();
c.element().getValue().forEach(messages::add);
Collections.sort(messages, (msg1, msg2) -> Long.compare(msg1.timestamp, msg2.timestamp));
MyMessage prev = null
for (MyMessage msg : messages) {
if (prev != null &&
msg.timestamp == prev.timestamp &&
msg.label1.equals(prev.label1) &&
msg.label2.equals(prev.label2)) {
// ... identifying duplicates here
}
prev = msg;
}
...
}
}
public static void main(String[] args) throws IOException {
MyBeamOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyBeamOptions.class);
Pipeline pipeline = Pipeline.create(options);
PubsubIO.Read<String> pubsubReadSubscription =
PubsubIO.readStrings()
.withTimestampAttribute("eventTime")
.withIdAttribute("uniqueID")
.fromSubscription(options.getPubsubSubscription());
pipeline
.apply("PubsubReadSubscription", pubsubReadSubscription)
.apply("ParseJsons", ParseJsons.of(MyMessage.class))
.setCoder(SerializableCoder.of(MyMessage.class))
.apply(
"Window",
Window.<MyMessage>into(FixedWindows.of(Duration.standardSeconds(60)))
.triggering(
AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterPane.elementCountAtLeast(1)))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.standardSeconds(3600)))
.apply(
"PairMessagesWithLabels",
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(), TypeDescriptor.of(MyMessage.class)))
.via(msg -> KV.of(msg.label1 + ":" + msg.label2, msg)))
.apply("GroupMessagesByLabels", GroupByKey.<String, MyMessage>create())
.apply("CreateAggregations", ParDo.of(new CreateMyAggregationsFn()))
// ...
PipelineResult result = pipeline.run();
}
}
是否有额外的步骤来使用我缺少的 withIdAttribute 方法从 PubSubIO 中删除消息?
【问题讨论】:
标签: google-cloud-dataflow apache-beam google-cloud-pubsub