【问题标题】:Kafka producer - Sending a list of messagesKafka 生产者 - 发送消息列表
【发布时间】:2018-07-09 16:22:49
【问题描述】:

我需要发送几批消息,并确保每批中的所有消息在同一批中一起到达消费者。

例如,假设我需要分5个批次/组发送400条消息,每个组将包含80条消息,并且需要在消费者端同批次消费。

我正在使用 spark 结构化流处理消息。

我读过类似的问题,但我仍然对正确的做法感到困惑。

生产者是否应该将所有消息(每批)放在一个列表中,并将列表发送给 kafka?

还有其他更好的方法吗?

谢谢

【问题讨论】:

    标签: apache-spark apache-kafka


    【解决方案1】:

    这可以通过创建一个有5个分区的主题来实现,这样就可以将每种类型的批处理消息发送到每个分区

    ProducerRecord(java.lang.String topic, java.lang.Integer partition, K key, V value)
    Creates a record to be sent to a specified topic and partition
    

    我们可以创建 5 个消费者并将每个消费者分配给每个分区,但我不确定每个消费者 poll() 是否会一次提取该分区中的所有消息

    手动分区分配。 here doc

    例如: 如果进程正在维护与该分区关联的某种本地状态(如本地磁盘键值存储),那么它应该只获取它在磁盘上维护的分区的记录。
    如果进程本身是高可用的,并且在失败时将重新启动(可能使用 YARN、Mesos 或 AWS 设施等集群管理框架,或者作为流处理框架的一部分)。在这种情况下,Kafka 无需检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。
    要使用此模式,您只需调用 assign(Collection) 并使用要使用的分区的完整列表,而不是使用 subscribe 订阅主题。

     String topic = "foo";
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     consumer.assign(Arrays.asList(partition0, partition1));
    

    【讨论】:

      猜你喜欢
      • 2019-10-03
      • 1970-01-01
      • 1970-01-01
      • 2021-01-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多