【问题标题】:Spark consumer doesn't read Kafka producer messages ScalaSpark消费者不阅读Kafka生产者消息Scala
【发布时间】:2018-04-25 12:56:33
【问题描述】:

我正在尝试创建连接到 Spark 消费者的 Kafka 生产者。生产者工作正常,但是 Spark 中的消费者由于某种原因没有从主题中读取数据。我在 docker-compose 中使用 spotify/kafka 映像运行 kafka。

这是我的消费者:

object SparkConsumer {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("KafkaSparkStreaming")
      .master("local[*]")
      .getOrCreate()

    val ssc = new StreamingContext(spark.sparkContext, Seconds(3))
    val topic1 = "topic1"

    def kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "group1",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val lines = KafkaUtils.createDirectStream[String, String](
        ssc,
        LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](Set(topic1), kafkaParams)
    )
    lines.print()
}

Kafka Producer 如下所示:

object KafkaProducer {

  def main(args: Array[String]) {

    val events = 10
    val topic = "topic1"
    val brokers = "localhost:9092"
    val random = new Random()
    val props = new Properties()

    props.put("bootstrap.servers", brokers)
    props.put("client.id", "KafkaProducerExample")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)

    val t = System.currentTimeMillis()
    for (nEvents <- Range(0, events)) {
      val key = null
      val values = "2017-11-07 04:06:03"
      val data = new ProducerRecord[String, String](topic, key, values)

      producer.send(data)
      System.out.println("sent : " + data.value())
    }

    System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t))
    producer.close()
  }
}

更新:

我在 Kafka 中的 docker-compose 文件:

version: '3.3'
services:
  kafka:
      image: spotify/kafka
      ports:
        - "9092:9092"

【问题讨论】:

  • 你是如何运行你的消费者的?,在你的 docker-compose 的另一个容器中?
  • 我在 docker-compose 中运行 Kafka。当 Kafka 运行时,我运行消费者,它已准备好处理数据,然后我运行生产者,它开始生成数据
  • 但是您的消费者在 docker 网络之外运行。了解在 zookeeper 中注册的代理名称是什么很重要。
  • 我添加了用于 Kafka 的 docker-compose 文件。我想使用没有 Zokeeper 的图像

标签: scala apache-spark apache-kafka spark-streaming


【解决方案1】:

这是将 Kafka 与 Docker 结合使用的常见问题。首先,您应该检查 Zookeeper 中针对您的主题的配置。您可以在 Kafka 容器中使用 Zookeeper 脚本。可能在创建主题时 ADVERTISED_HOST 是您的服务的名称。因此,当消费者尝试连接到代理时,这将返回“kaf​​ka”作为代理位置。因为您在 docker 网络之外运行消费者,所以您的消费者永远不会连接到代理进行消费。尝试使用 ADVERTISED_HOST=localhost 为您的 kafka 容器设置环境。

【讨论】:

  • 我应该完全像这样设置它:environment: - ADVERTISED_HOST=localhost 吗?
  • 是的,您的消费者可以在那里找到代理
猜你喜欢
  • 2018-12-18
  • 1970-01-01
  • 2018-01-17
  • 1970-01-01
  • 1970-01-01
  • 2019-07-01
  • 1970-01-01
  • 2023-01-21
  • 1970-01-01
相关资源
最近更新 更多