【问题标题】:Read Data, Hold Data for N Seconds, Write Data (Kafka, Flink)读取数据、保持数据 N 秒、写入数据(Kafka、Flink)
【发布时间】:2019-04-06 00:20:00
【问题描述】:

应用程序从 kafka 主题中读取。 每条消息必须是唯一的(忽略重复) 保存“N”秒的数据 并作为单独的消息写入不同的 kafka 主题

有没有办法将消息保存'N秒'并写入kafka 每条消息必须在收到消息后的“N”秒后写入同一个主题。

目前我将数据保存在内存中的 json 结构中,每次收到消息时,我都会遍历我拥有的所有消息并比较时间。

这当然不是办法

val some_consumer= new FlinkKafkaConsumer09(data_topic
      , new JSONKeyValueDeserializationSchema(false), properties)
    some_consumer.setStartFromLatest()
    val in_stream = env.addSource(some_consumer)
      .filter(!_.isNull)
      .map(x => processMessage(x))
def process(x: ObjectNode){
 // store message in json if not existing
 // loop through entire set and compare times
 // if after 'N' seconds
   // write to kafka
    kafka_producer.send(new ProducerRecord[String, String](output_topic, the_unique_message))


}

【问题讨论】:

  • 即使不推荐,如果在.send()前加Thread.sleep()会怎样?

标签: scala apache-kafka apache-flink


【解决方案1】:

您应该将消息保持在 Flink 状态,以便它们被检查点,并且在失败的情况下将被恢复。

要对流进行重复数据删除,您可以通过使事件唯一的任何属性(即keyBy(x -> x.uniqueId))对流进行键控。然后我会使用KeyedProcessFunction,并为ValueState<Event> 中的每个键缓冲第一个事件。您可以使用 EventTimeTimer 或 ProcessingTimeTimer 来触发发送事件(以合适的为准)。如果去重的范围是N秒,那么可以在发出事件的同时清除状态。

【讨论】:

  • 完美,谢谢
  • 如果在旧事件发出之前有多个传入事件,Flink 不会覆盖状态吗?
  • 这里的目标是只发送每个键的第一个事件,并忽略该键的所有后续事件。所以这不是问题。
【解决方案2】:

您可以使用Tumbling Windows https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#tumbling-windows

    .window(TumblingEventTimeWindows.of(Time.seconds(5)))

上例表示数据每5秒出一次,打印到控制台时可以看得很清楚

在您的情况下,您不需要EventTime,可以使用ProcessingTime。 你也不需要 keyBy(),只使用 AllWindow,虽然使用 keyBy() 不是一个坏主意,所以你可以获得并行性

window()之后,您可以拨打FlinkKafkaSink。因为这个窗口会按照你的意愿每隔 X 分钟/秒周期性地发出事件

您可能要注意内存限制,因为保存在窗口中的数据存储在内存中

【讨论】:

  • 这不会产生预期的效果——在窗口结束前一毫秒到达的消息将立即发送出去。 (另外,窗口状态不必保存在内存中,它可以保存在 RocksDB 中。)
  • true,然后它必须使用 TumblingProcessingTimeWindows
  • TumblingProcessingTimeWindows 也不起作用。目标是在释放事件之前将事件保持 N 秒,这不是 Flink 中时间窗口的工作方式。时间窗口与时代对齐,而不是与事件对齐。
  • 事件在窗口中保持 n 秒,然后由定义的时间/计数(TimeTrigger 或 CountTrigger)触发。使用特定触发器.window()时使用默认触发器,或者?确保调用 .trigger() 可以使用自定义的 Trigger
  • 使用窗口 API 获得所需的语义需要自定义触发器和 Evictor。到那时,使用 WIndow API 变得比它的价值更复杂。
猜你喜欢
  • 2016-11-08
  • 1970-01-01
  • 1970-01-01
  • 2010-12-14
  • 2018-05-09
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-08-19
相关资源
最近更新 更多