【问题标题】:Alert if Kafka nodes are downKafka 节点关闭时发出警报
【发布时间】:2018-03-26 17:08:08
【问题描述】:

我已经公开了一个 Kafka 节点和一个主题名称。我的网络服务器接收到大量的 http 请求数据,我需要对其进行处理,然后将它们推送到 kafka。有时,如果 kafka 节点宕机了,那么我的服务器仍然会不断地抽取数据,这会导致我的内存炸毁,我的服务器也会宕机。

我想要的是在 Kafka 宕机时停止发布数据。我的Java示例代码如下:

  static Producer producer;

  Produce() {
    Properties properties = new Properties();
    properties.put("request.required.acks","1");
    properties.put("bootstrap.servers","localhost:9092,localhost:9093,localhost:9094");
    properties.put("enabled","true");
    properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
    properties.put("kafka-topic","pixel-server");
    properties.put("batch.size","1000");
    properties.put("producer.type","async");
    properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
    producer = new KafkaProducer<String, String>(properties);
  }


  public static void main(String[] args) {
    Produce produce = new Produce();

    produce.send(producer, "pixel-server", "Some time");

  }

  //This method is called lot of times
  public void send(Producer<String, String> producer, String topic, String data) {
    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, data);
    Future<RecordMetadata> response = producer.send(producerRecord, (metadata, exception) -> {
      if (null != exception) {
        exception.printStackTrace();
      } else {
        System.out.println("Done");
      }
    });

我刚刚抽象出了一些示例代码。 send 方法被多次调用。如果卡夫卡关闭,我只想阻止发送任何消息。解决这种情况的有效方法是什么。

【问题讨论】:

  • 如果经纪人倒闭,你会得到来自未来的异常,你需要处理。也许放置一个 Redis 缓存来减轻您的 api 和代理之间的压力?如果您要求监控解决方案,有更好的工具来执行过程检查,而不是您自己编写的工具。不遵循监控数据库或网站的不同方式

标签: java apache-kafka publish vert.x


【解决方案1】:

如果我是你,我会尝试实现circuit breaker。当您在发送记录时遇到合理数量的故障时,电路会中断并提供一些回退行为。一旦满足某些条件(例如:经过一段时间),电路就会关闭,您将再次发送记录。还有vertx.io 附带它的own solution

【讨论】:

  • 但是我无法确定 kafka 是否已关闭的问题。如果我能找到,那么我可以找到一些解决方案。
  • 我的想法是 Kafka 是否启动很重要,但我是否可以将记录发送给他更重要。这就是我建议断路器模式的原因。我认为监控 Kafka 并做出相应的行为(你想这样做,对吗?)可能会更复杂。
  • 是的,你是正确的,我根本不想监控 kafka。但是当记录发送到kafka时我怎么知道失败是我的问题?
  • send 方法中的 Future 应该会在发生故障时通过返回异常来提醒您。
  • 是的,但它没有发生。我没有任何例外。所以这就是问题所在。
猜你喜欢
  • 1970-01-01
  • 2022-07-30
  • 1970-01-01
  • 1970-01-01
  • 2010-11-17
  • 1970-01-01
  • 1970-01-01
  • 2021-11-25
相关资源
最近更新 更多