【问题标题】:Scala how to subscribe to multiple kafka topicsScala如何订阅多个kafka主题
【发布时间】:2018-12-26 23:21:35
【问题描述】:

我想将字符串数组/列表转换为 scala 中的 util.Collection[String] 对象。我尝试了多种方法,但都没有成功。

import org.apache.kafka.clients.consumer.KafkaConsumer


object KafkaConsumerApp {

  def main(args: Array[String]): Unit = {

    val prop:Properties = new Properties()
    prop.put("bootstrap.servers","192.168.1.100:9092,192.168.1.141:9092,192.168.1.113:9092,192.168.1.118:9092")
    prop.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
    prop.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer(prop)

    val topics = List[String] ("my_topic_partition","my_topic_partition")
    val a = Collections.singletonList(topics)

    consumer.subscribe(a)

  }
}

consumer.subscribe(a) 返回编译时错误

Error:(24, 14) overloaded method value subscribe with alternatives:
  (x$1: java.util.regex.Pattern)Unit <and>
  (x$1: java.util.Collection[String])Unit
 cannot be applied to (java.util.List[List[String]])
    consumer.subscribe(a)

【问题讨论】:

  • 发布您尝试过的内容,也许有人可以帮助您解决问题。
  • Consumer.subscribe 获取主题列表,所以我认为没有任何问题,请阅读消费者 API 的文档。

标签: scala apache-kafka kafka-consumer-api


【解决方案1】:

您无需创建Singleton ListList 已经是 Collection

val: List[String] topics = List("my_topic_partition","my_topic_partition")
consumer.subscribe(topics)

如果您需要它是 Java,只需输入 .asJava 就像 topics.asJava 并使用导入 import collection.JavaConverters._

【讨论】:

  • 我仍然遇到同样的错误错误:(21、14)重载方法值订阅替代品:(x$1:java.util.regex.Pattern)Unit (x$1:java. util.Collection[String])Unit 不能应用于 (List[String]) consumer.subscribe(topics)
  • import collection.JavaConverters._; consumer.subscribe(Seq("topic-1", "topic-2").asJava)
  • 你试过 asJava 吗? consumer.subscribe(topics.asJava)
猜你喜欢
  • 2019-10-26
  • 1970-01-01
  • 1970-01-01
  • 2017-05-16
  • 2021-10-14
  • 2014-01-21
  • 1970-01-01
  • 2020-09-20
  • 1970-01-01
相关资源
最近更新 更多