【问题标题】:Check If System Is Connected To Kafka检查系统是否连接到 Kafka
【发布时间】:2017-04-22 07:33:10
【问题描述】:

我需要用Java编写一个冒烟测试来验证系统是否连接到kafka,

有人知道吗? 我找到了这篇文章:

How to check whether Kafka Server is running?

但是从Java代码做起来太复杂了,我不认为这是我应该使用的方向。

提前致谢。

【问题讨论】:

  • 你找到这个问题的答案了吗?为什么您认为以下答案不够好?
  • 不幸的是,我没有检查任何解决方案。所以我不能说任何解决方案都不够好。

标签: java apache-kafka smoke-testing


【解决方案1】:

我有同样的问题,我不想让这个问题没有任何答案。 我阅读了很多关于如何检查连接的内容,我发现的大多数答案都是检查与 Zk 的连接,但我真的很想直接检查与 Kafka 服务器的连接。

我所做的是创建一个简单的 KafkaConsumer 并使用 listTopics() 列出所有主题。如果连接成功,那么您将获得一些回报。否则,您将获得TimeoutException

  def validateKafkaConnection(kafkaParams : mutable.Map[String, Object]) : Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaParams.get("bootstrap.servers").get.toString)
    props.put("group.id", kafkaParams.get("group.id").get.toString)
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    val simpleConsumer = new KafkaConsumer[String, String](props)
    simpleConsumer.listTopics()
  }

然后您可以将此方法包装在try-catch 语句中以捕获异常。

【讨论】:

  • 您可能还需要更改默认超时值:props.put(REQUEST_TIMEOUT_MS_CONFIG, "5000"); props.put(SESSION_TIMEOUT_MS_CONFIG, "4000");避免等待太多时间。
【解决方案2】:

您可以使用以下方法检查服务器是否正在运行:

ZkClient zkClient = new ZkClient("your_zookeeper_server", 5000 /* ZOOKEEPER_SESSION_TIMEOUT */, 5000 /* ZOOKEEPER_CONNECTION_TIMEOUT */, ZKStringSerializer$.MODULE$);
List<Broker> brokers = scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllBrokersInCluster());
if (brokers.isEmpty()) {
    // No brokers available
} else {
    // There are brokers available
}

【讨论】:

  • 这是一种检查 Zookeeper 连接的方法,而不是 Kafka 服务器。
猜你喜欢
  • 1970-01-01
  • 2021-09-06
  • 2012-08-16
  • 1970-01-01
  • 1970-01-01
  • 2011-01-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多