【问题标题】:Is there any way to check if kafka is up and running from kafka-net有什么方法可以检查 kafka 是否已启动并从 kafka-net 运行
【发布时间】:2016-06-18 20:33:35
【问题描述】:

我正在使用 kafka-net 客户端向 kafka 发送消息。我只是想知道是否有任何方法可以检查 kafka 服务器是否已启动并可以接收消息。我关闭了 kafka,但生产者已成功创建,并且 SendMessageAsync 只是冻结了很长时间。我试图通过超时,但它没有改变任何东西。我使用 kafka-net 0.9 当 kafka 服务器启动并运行时,它工作得很好

【问题讨论】:

标签: apache-kafka


【解决方案1】:

试试这个。

在你的构造函数中,放

options = new KafkaOptions(uri);
var endpoint = new DefaultKafkaConnectionFactory().Resolve(options.KafkaServerUri.First(), options.Log);
client = new KafkaTcpSocket(new DefaultTraceLog(), endpoint);

然后在您发送每条消息之前,

// test if the broker is alive
var request = new MetadataRequest { Topics = new List<string>() { Topic } };
var task1 = client.WriteAsync(request.Encode()).ConfigureAwait(false);
Task<KafkaDataPayload> task2 = Task.Factory.StartNew(() =>  task1.GetAwaiter().GetResult());
if (task2.Wait(30000) == false)
{
    throw new TimeoutException("Timeout while sending message to kafka broker!");
}

如果您的消息量很大,这会影响性能,但如果消息量较少,则无关紧要。

【讨论】:

    【解决方案2】:

    broker 的 id 在 zookeeper(/brokers/ids/[brokerId]) 中注册为临时节点,允许其他 broker 和消费者检测故障。(现在健康的定义相当幼稚。如果注册在 zk /brokers/ids/[brokerId] 中,broker 是健康,否则就死了)。

    只要代理的会话存在,zookeeper 临时节点就存在 活跃。

    您可以通过 ZkUtils.getSortedBrokerList(zkClient) 检查代理是否已启动,它会返回 /brokers/ids 下的所有活动代理 id

    import org.I0Itec.zkclient.ZkClient;
    
    ZkClient zkClient = new ZkClient(properties.getProperty("zkQuorum"), zkSessionTimeout, zkConnectionTimeout,ZKStringSerializer$.MODULE$);
    ZkUtils.getSortedBrokerList(zkClient);
    

    参考
    Kafka data structures in Zookeeper

    【讨论】:

      猜你喜欢
      • 2020-07-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-08-19
      • 2012-10-31
      • 2022-07-08
      • 2016-10-21
      • 1970-01-01
      相关资源
      最近更新 更多