【Question Title】: JSON field as key in Kafka producer, pushing to different partitions based on the key
【Posted】: 2020-12-23 03:41:28
【Question Description】:

As we know, we can send a key with each record to the Kafka producer; the key is hashed internally to decide which partition of the topic the record goes to. I have a producer in which I send data in JSON format.

[
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 07:50:42",
    "TIME": 75042,
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:02:26",
    "TIME": 80226
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:39:55",
    "TIME": 83955
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:43:26",
    "TIME": 84326
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:44:22",
    "TIME": 84422
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:09",
    "TIME": 84509
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99:9e:dc",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  },
  {
    "DATE": 20200723,
    "SOURCETYPE": "WIFI",
    "DEVICEID": "24:6f:28:99",
    "EVENTTIME": "2020-07-23 08:45:58",
    "TIME": 84558
  }
]

I want to push the data to the topic keyed by DEVICEID, with different keys going to different partitions. I created the topic with two partitions, 0 and 1, but it stores all the data in partition 0. I want every unique key (device ID) to be stored in a different partition. Code:

import java.io.File
import java.util.Properties

import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
import com.fasterxml.jackson.module.scala.{DefaultScalaModule, ScalaObjectMapper}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object Producer extends App {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")
  val producer = new KafkaProducer[String, JsonNode](props)
  println("inside producer")

  val mapper = (new ObjectMapper() with ScalaObjectMapper).
    registerModule(DefaultScalaModule).
    configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false).
    findAndRegisterModules(). // register joda and java-time modules automatically
    asInstanceOf[ObjectMapper with ScalaObjectMapper]

  val filename = "/Users/rishunigam/Documents/devicd.json"
  val jsonNode: JsonNode = mapper.readTree(new File(filename))

  // Send each array element as one record, keyed by its DEVICEID.
  for (i <- 0 until jsonNode.size()) {
    val js = jsonNode.get(i)
    // asText() yields the raw string; toString would keep the JSON quotes in the key.
    val key = js.findValue("DEVICEID").asText()
    println(key)
    println(js)
    val record = new ProducerRecord[String, JsonNode]("tpch.devices_logs", key, js)
    println(record)
    producer.send(record)
  }

  println("producer complete")
  producer.close()
}

【Question Discussion】:

  • Do you want to distribute the data across partitions? Sorry if I misunderstood your question.
  • Distribute the data into partitions based on the key.

标签: apache-kafka kafka-consumer-api kafka-producer-api


【Solution 1】:

"it stores all the data in partition 0"

That does not mean it isn't working. It just means that the hashes of your keys ended up in the same partition.
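
For reference, here is a minimal sketch of roughly what the built-in partitioner computes for a keyed record: a murmur2 hash over the serialized key bytes, modulo the partition count (using the public Utils helpers from kafka-clients; numPartitions = 2 mirrors the topic in the question). With only two partitions, distinct keys can easily collide:

import org.apache.kafka.common.utils.Utils

// Default-partitioner-style computation for a String key
// (StringSerializer produces UTF-8 bytes).
val numPartitions = 2
val keys = Seq("24:6f:28:99:9e:dc", "24:6f:28:99")
for (key <- keys) {
  val partition = Utils.toPositive(Utils.murmur2(key.getBytes("UTF-8"))) % numPartitions
  println(s"key $key -> partition $partition")
}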

If you want to override the default partitioner, you need to define your own Partitioner class that inspects the message and assigns a suitable partition, then set partitioner.class in the producer properties, as sketched below.
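
A minimal sketch of such a partitioner, implementing org.apache.kafka.clients.producer.Partitioner. The class name DeviceIdPartitioner and the fixed device-to-partition mapping are hypothetical, for illustration only:

import java.util
import org.apache.kafka.clients.producer.Partitioner
import org.apache.kafka.common.Cluster

// Hypothetical partitioner: pin each known DEVICEID to a fixed
// partition and fall back to a hash for anything else.
class DeviceIdPartitioner extends Partitioner {
  override def partition(topic: String, key: Any, keyBytes: Array[Byte],
                         value: Any, valueBytes: Array[Byte],
                         cluster: Cluster): Int = {
    val numPartitions = cluster.partitionsForTopic(topic).size()
    key.toString match {
      case "24:6f:28:99:9e:dc" => 0 // known device -> partition 0
      case "24:6f:28:99"       => 1 // known device -> partition 1
      case other               => (other.hashCode & Int.MaxValue) % numPartitions
    }
  }

  override def close(): Unit = ()
  override def configure(configs: util.Map[String, _]): Unit = ()
}

It would be registered alongside the other producer properties:

props.put("partitioner.class", classOf[DeviceIdPartitioner].getName)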

"I want every unique key (device ID) to be stored in a different partition"

Then you would have to know your entire dataset ahead of time in order to create N partitions for N devices. And what happens when a brand-new device shows up?
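
If that mapping really is known up front, the partition can also be set explicitly per record, using the ProducerRecord constructor overload that takes a partition number. The deviceToPartition map below is a hypothetical example:

import com.fasterxml.jackson.databind.JsonNode
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical fixed mapping from device ID to partition; a brand-new
// device would need a fallback rule (here: partition 0) or a new partition.
val deviceToPartition = Map("24:6f:28:99:9e:dc" -> 0, "24:6f:28:99" -> 1)

def recordFor(key: String, js: JsonNode): ProducerRecord[String, JsonNode] = {
  val partition: Integer = deviceToPartition.getOrElse(key, 0)
  // Constructor overload with an explicit partition number.
  new ProducerRecord[String, JsonNode]("tpch.devices_logs", partition, key, js)
}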

【Discussion】:
