【问题标题】:Kafka stream join with a specific key as inputKafka 流以特定键作为输入加入
【发布时间】:2017-06-08 05:33:33
【问题描述】:

我在模式注册表中有 3 个不同的主题和 3 个 Avro 文件,我想流式传输这些主题并将它们连接在一起并将它们写入一个主题。问题是我要加入的键与我将数据写入每个主题的键不同。

假设我们有这 3 个 Avro 文件:
警报

{
  "type" : "record",
  "name" : "Alarm",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "alarm_id",
    "type" : "string",
    "doc" : "Unique identifier of the alarm."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "Unique identifier of the  network element ID that produces the alarm."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the alarm was generated."
  }, {
    "name" : "severity",
    "type" : [ "null", "string" ],
    "doc" : "The severity field is the default severity associated to the alarm ",
    "default" : null
  }]
}

事件:

{
  "type" : "record",
  "name" : "Incident",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "incident_id",
    "type" : "string",
    "doc" : "Unique identifier of the incident."
  }, {
    "name" : "incident_type",
    "type" : [ "null", "string" ],
    "doc" : "Categorization of the incident e.g. Network fault, network at risk, customer impact, etc",
    "default" : null
  }, {
    "name" : "alarm_source_id",
    "type" : "string",
    "doc" : "Respective Alarm"
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "is the timestamp when the incident was generated on the node."
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "ID of specific network element."
  }]
}

维护:

{
  "type" : "record",
  "name" : "Maintenance",
  "namespace" : "com.kafkastream.schema.avro",
  "fields" : [ {
    "name" : "maintenance_id",
    "type" : "string",
    "doc" : "The message number is the unique ID for every maintenance"
  }, {
    "name" : "ne_id",
    "type" : "string",
    "doc" : "The NE ID is the network element ID on which the maintenance is done."
  }, {
    "name" : "start_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }, {
    "name" : "end_time",
    "type" : "long",
    "doc" : "The timestamp when the maintenance start."
  }]
}

对于这些 Avro,我的 Kafka 中有 3 个主题(例如,alarm_raw、incident_raw、maintenance_raw),每当我想写入这些主题时,我都使用 ne_id 作为键(因此主题由 ne_id 分区)。现在我想加入这3个主题并获得一条新记录并将其写入一个新主题。问题是我想加入 AlarmIncident 基于 alarm_id 和 alarm_source_id,根据ne_id加入报警和维护。我想避免创建新主题并重新分配新密钥。无论如何,我在加入时指定了密钥吗?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams kafka-join


    【解决方案1】:

    这取决于您要使用哪种联接 (c.f. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics)

    对于 KStream-KStream 连接,目前(v0.10.2 及更早版本)除了设置新密钥(例如,通过使用 selectKey())并进行重新分区之外别无他法。

    对于 KStream-KTable 加入,Kafka 0.10.2(将在未来几周内发布)包含一个名为 GlobalKTables 的新功能(参见 https://cwiki.apache.org/confluence/display/KAFKA/KIP-99%3A+Add+Global+Tables+to+Kafka+Streams)。这允许您在 KTable 上进行非键连接(即,KStream-GlobalKTable 连接,因此您不需要重新分区 GlobalKTable 中的数据)。

    注意:KStream-GlobalKTable 连接与 KStream-KTable 连接具有不同的语义。与后者相比,它不是时间同步的,因此,对于 GlobalKTable 更新,连接在设计上是不确定的;即,不能保证哪条 KStream 记录将首先“看到” GlobalKTable 更新并因此加入更新后的 GlobalKTable 记录。

    还计划添加 KTable-GlobalKTable 连接。这可能会在0.10.3 中提供。不过没有计划添加“全局”KStream-KStream 连接。

    【讨论】:

    • 我的情况是 KStream-KStream 所以除了重新划分我的主题别无他法。
    【解决方案2】:

    您可以通过修改它来保持相同的密钥。
    您可以使用KeyValueMapper,通过它您可以修改您的密钥和值。
    您应该按如下方式使用它:

    val modifiedStream = kStream.map[String,String](
        new KeyValueMapper[String, String,KeyValue[String,String]]{
            override def apply(key: String, value: String): KeyValue[String, String] = new KeyValue("modifiedKey", value)
        }
    )
    

    您可以将上述逻辑应用于多个Kstream 对象,以维护一个用于加入KStreams 的键。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-08-13
      • 1970-01-01
      • 2018-11-03
      • 2020-02-22
      • 1970-01-01
      • 1970-01-01
      • 2018-03-17
      相关资源
      最近更新 更多