【Question Title】:Apache Kafka producer and consumer on different servers
【Posted】:2019-11-01 22:35:02
【Question Description】:

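I have multiple servers that generate messages, and I need to run the broker and the consumer on a single server. When I run the producer and the consumer on the same server, everything works, but I'm not sure what needs to change to move the producers onto separate machines. I don't want the producer servers to depend on ZooKeeper or a Kafka server, because there are many of them and the number will grow. When setting up the KafkaProducer I tried changing the bootstrap servers to the broker/consumer server, e.g. 192.168.0.1:9092, but I still can't produce messages. I don't know what I'm missing; please help. I followed https://github.com/mapr-demos/kafka-sample-programs for the code.

I tried running the producer and the consumer on the same server, and that works fine.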

Producer.java

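import com.google.common.io.Resources;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class Producer {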
    public static void main(String[] args) throws IOException {
        // set up the producer
        KafkaProducer<String, String> producer;
        try (InputStream props = Resources.getResource("producer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            producer = new KafkaProducer<>(properties);
        }

        try {
            for (int i = 0; i < 1000000; i++) {
                // send lots of messages
                producer.send(new ProducerRecord<String, String>(
                        "fast-messages",
                        String.format("{\"type\":\"test\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));

                // every so often send to a different topic
                if (i % 1000 == 0) {
                    producer.send(new ProducerRecord<String, String>(
                            "fast-messages",
                            String.format("{\"type\":\"marker\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
                    producer.send(new ProducerRecord<String, String>(
                            "summary-markers",
                            String.format("{\"type\":\"other\", \"t\":%.3f, \"k\":%d}", System.nanoTime() * 1e-9, i)));
                    producer.flush();
                    System.out.println("Sent msg number " + i);
                }
            }
        } catch (Throwable throwable) {
            // print the full stack trace; printf("%s", getStackTrace()) only prints the array reference
            throwable.printStackTrace();
        } finally {
            producer.close();
        }

    }
}

producer.props

bootstrap.servers=192.168.0.1:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true

Consumer.java

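import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.io.Resources;
import org.HdrHistogram.Histogram;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

public class Consumer {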
    public static void main(String[] args) throws IOException {
        // set up house-keeping
        ObjectMapper mapper = new ObjectMapper();
        Histogram stats = new Histogram(1, 10000000, 2);
        Histogram global = new Histogram(1, 10000000, 2);

        // and the consumer
        KafkaConsumer<String, String> consumer;
        try (InputStream props = Resources.getResource("consumer.props").openStream()) {
            Properties properties = new Properties();
            properties.load(props);
            if (properties.getProperty("group.id") == null) {
                properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
            }
            consumer = new KafkaConsumer<>(properties);
        }
        consumer.subscribe(Arrays.asList("fast-messages", "summary-markers"));
        int timeouts = 0;
        //noinspection InfiniteLoopStatement
        while (true) {
            // read records with a short timeout. If we time out, we don't really care.
            ConsumerRecords<String, String> records = consumer.poll(200);
            if (records.count() == 0) {
                timeouts++;
            } else {
                System.out.printf("Got %d records after %d timeouts\n", records.count(), timeouts);
                timeouts = 0;
            }
            for (ConsumerRecord<String, String> record : records) {
                switch (record.topic()) {
                    case "fast-messages":
                        // the send time is encoded inside the message
                        JsonNode msg = mapper.readTree(record.value());
                        switch (msg.get("type").asText()) {
                            case "test":
                                long latency = (long) ((System.nanoTime() * 1e-9 - msg.get("t").asDouble()) * 1000);
                                stats.recordValue(latency);
                                global.recordValue(latency);
                                break;
                            case "marker":
                                // whenever we get a marker message, we should dump out the stats
                                // note that the number of fast messages won't necessarily be quite constant
                                System.out.printf("%d messages received in period, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        stats.getTotalCount(),
                                        stats.getValueAtPercentile(0), stats.getValueAtPercentile(100),
                                        stats.getMean(), stats.getValueAtPercentile(99));
                                System.out.printf("%d messages received overall, latency(min, max, avg, 99%%) = %d, %d, %.1f, %d (ms)\n",
                                        global.getTotalCount(),
                                        global.getValueAtPercentile(0), global.getValueAtPercentile(100),
                                        global.getMean(), global.getValueAtPercentile(99));

                                stats.reset();
                                break;
                            default:
                                throw new IllegalArgumentException("Illegal message type: " + msg.get("type"));
                        }
                        break;
                    case "summary-markers":
                        break;
                    default:
                        throw new IllegalStateException("Shouldn't be possible to get message on topic " + record.topic());
                }
            }
        }
    }
}

consumer.props

bootstrap.servers=192.168.0.1:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000

# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.  No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152

【Question Discussion】:

    Tags: java apache-kafka


    【Solution 1】:

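    What happens when the producer is not on the broker machine? Do you see any logs or error messages? You didn't describe your setup beyond the IP 192.168.0.1. Is the broker machine reachable from the producer machine, and is port 9092 open to the outside (check iptables)?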
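The reachability question can be checked from the producer machine with a plain TCP connection attempt. The following is only a minimal sketch: the class name `BrokerReachability` and the method `canConnect` are made up for illustration, and the default host and port are simply the ones from the question. A successful connect only shows that the port is open from this machine; it does not prove that the broker itself is configured correctly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {
    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable: a producer on this machine
            // would not be able to reach the address either.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults are the address from the question; override via arguments.
        String host = args.length > 0 ? args[0] : "192.168.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 9092;
        System.out.println(host + ":" + port + " reachable: " + canConnect(host, port, 3000));
    }
}
```

If this prints `false` on the producer machine, the problem is most likely in the network or firewall rather than in the producer code.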

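    Another thing: if the producer and the consumer are not on the same machine, the code above will not give you meaningful results. You use System.nanoTime() to measure latency, but according to the official documentation:

    This method can only be used to measure elapsed time and is not related to any other notion of system or wall-clock time. The value returned represents nanoseconds since some fixed but arbitrary origin time (perhaps in the future, so values may be negative). The same origin is used by all invocations of this method in an instance of a Java virtual machine; other virtual machine instances are likely to use a different origin.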
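One possible workaround, not part of the original code, is to stamp messages with wall-clock time from `System.currentTimeMillis()` instead. This only gives usable numbers under the assumption that the producer and consumer machines keep their clocks synchronized (for example via NTP), and the result can never be more accurate than that synchronization. The class `WallClockLatency` and its methods are hypothetical names for this sketch, and the payload is parsed with a regular expression only to keep the example dependency-free.

```java
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class WallClockLatency {
    // Matches the integer send time stored under "t" in the payload.
    private static final Pattern SENT_AT = Pattern.compile("\"t\":(\\d+)");

    /** Builds the payload with a wall-clock send time in epoch milliseconds. */
    static String encode(String type, long sentAtMillis, int k) {
        return String.format(Locale.ROOT,
                "{\"type\":\"%s\", \"t\":%d, \"k\":%d}", type, sentAtMillis, k);
    }

    /** Extracts "t" from the payload and returns receivedAtMillis minus t. */
    static long latencyMillis(String payload, long receivedAtMillis) {
        Matcher m = SENT_AT.matcher(payload);
        if (!m.find()) {
            throw new IllegalArgumentException("No send time in payload: " + payload);
        }
        return receivedAtMillis - Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        // Producer side: stamp the message when it is created.
        String payload = encode("test", System.currentTimeMillis(), 0);
        // Consumer side: compare against the local wall clock on receipt.
        System.out.println(payload + " latency(ms)=" + latencyMillis(payload, System.currentTimeMillis()));
    }
}
```

In the producer from the question, `encode` would stand in for the `String.format(...)` call, and in the consumer `latencyMillis(record.value(), System.currentTimeMillis())` would stand in for the `System.nanoTime()` arithmetic.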

    【Discussion】:
