【问题标题】:Kafka 2.3.0 producer and consumerKafka 2.3.0 生产者和消费者
【发布时间】:2019-08-27 08:28:18
【问题描述】:

我是 kafka 新手,想使用 Kafka 2.3 来实现生产者/消费者应用程序。

  1. 我已经在我的 ubuntu 服务器上下载并安装了 Kafka 2.3。
  2. 我在网上找到了一些代码,并在我的笔记本电脑上在 IDEA 中构建它,但消费者无法获得任何信息。
  3. 我已经在我的服务器上检查了该主题的主题信息。
  4. 我曾使用kafka-console-consumer 来检查该主题,成功获得该主题的值,但不是我的消费者。

那么我的消费者有什么问题?

制作人

package com.phitrellis.tool

import java.util.Properties
import java.util.concurrent.{Future, TimeUnit}

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer._

object MyKafkaProducer extends App {

  def createKafkaProducer(): Producer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "*:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("producer.type", "async")
    props.put("acks", "all")

    new KafkaProducer[String, String](props)
  }

  def writeToKafka(topic: String): Unit = {
    val producer = createKafkaProducer()
    val record = new ProducerRecord[String, String](topic, "key", "value22222222222")
    println("start")
    producer.send(record)
    producer.close()
    println("end")
  }

  writeToKafka("phitrellis")

}

消费者

package com.phitrellis.tool

import java.util
import java.util.Properties
import java.time.Duration
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer

object MyKafkaConsumer extends App {

  def createKafkaConsumer(): KafkaConsumer[String, String] = {
    val props = new Properties()
    props.put("bootstrap.servers", "*:9092")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    //    props.put("auto.offset.reset", "latest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    props.put("group.id", "test")

    new KafkaConsumer[String, String](props)
  }

  def consumeFromKafka(topic: String) = {

    val consumer: KafkaConsumer[String, String] = createKafkaConsumer()
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val records = consumer.poll(Duration.ofSeconds(2)).asScala.iterator
      println("true")
      for (record <- records){
        print(record.value())
      }
    }
  }

  consumeFromKafka("phitrellis")

}

【问题讨论】:

    标签: scala apache-kafka


    【解决方案1】:

    Consumer 代码中有两行至关重要:

    props.put("auto.offset.reset", "latest")
    props.put("group.id", "test")
    

    要从主题的开头开始阅读,您必须将auto.offset.reset 设置为earliestlatest 因为您跳过了在消费者开始之前生成的消息)。

    group.id负责群管理。如果您使用一些group.id 开始处理数据,然后重新启动您的应用程序或使用相同的group.id 开始新的应用程序,则只会读取新消息。

    对于您的测试,我建议添加auto.offset.reset -> earliest 并更改group.id

    props.put("auto.offset.reset", "earliest")
    props.put("group.id", "test123")
    

    另外: 您必须记住KafkaProducer::send 返回Future&lt;RecordMetadata&gt; 并且消息是异步发送的,如果您在Future 之前完成程序,则可能不会发送完成消息。

    【讨论】:

      【解决方案2】:

      这里有两个部分。生产方和消费者。

      您没有提及制作人,所以我们假设它确实有效。但是,您检查过服务器吗?您可以检查 kafka 日志文件,看看是否有关于这些特定主题/分区的任何数据。

      在消费者方面,要进行验证,您应该尝试使用同一主题的命令行进行消费,以确保数据在其中。在以下链接中查找“Kafka Consumer Console”,然后按照这些步骤操作。

      http://cloudurable.com/blog/kafka-tutorial-kafka-from-command-line/index.html

      如果有关于该主题的数据,那么运行该命令应该会为您获取数据。如果不是,那么它将只是“挂起”,因为它正在等待将数据写入主题。

      此外,您可以尝试使用这些命令行工具生成相同的主题,以确保您的集群配置正确、您拥有正确的地址和端口、端口未被阻塞等。

      【讨论】:

      • 我曾经使用kafka-console-consumer查看过这个topic,发现有命名topic但没有任何数据,并使用kafka-console-producer生产数据,可以在 kafka-console-consumer 控制台查看数据。那么我的生产者不能向服务器发送数据吗?
      猜你喜欢
      • 2019-01-15
      • 1970-01-01
      • 1970-01-01
      • 2018-01-07
      • 2018-02-08
      • 2019-10-16
      • 2018-12-18
      • 2018-02-05
      • 1970-01-01
      相关资源
      最近更新 更多