【Question Title】: kafka-node does not receive messages in real time
【Posted】: 2016-05-04 08:10:21
【Question Description】:

I followed the quickstart guide (http://kafka.apache.org/documentation.html#quickstart) and wanted to write a consumer in node.js. The topic "test" was created successfully, and I can produce messages with kafka-console-producer.sh and receive them through kafka-console-consumer.sh.

I wrote a simple consumer (live.js):

var kafka = require('kafka-node'),
    client = new kafka.Client('localhost:2181/'),
    consumer = new kafka.Consumer(client,
                                  [{'topic': 'test', partition: 0}],
                                  {autoCommit: true});

client.on('ready', function(){
  console.log('Client ready!');
});

console.log(client);
console.log(consumer);

consumer.on('error', function (err) {
  console.log("Kafka Error: Consumer - " + err);
});

consumer.on('offsetOutOfRange', function (err){
  console.log("Kafka offsetOutOfRange: " + err);
});

consumer.on('message', function(message){
  console.log(message);
});

When running node live.js, I receive all previously sent messages. But while live.js is running and I produce a message through the script shipped with Kafka, live.js does not receive it (although the consumer script that comes with Kafka does). After restarting live.js I do receive the message, but I want to get it "live". I am using the default configuration; here is the log output from live.js:

EventEmitter {
  connectionString: 'localhost:2181/',
  clientId: 'kafka-node-client',
  zkOptions: undefined,
  noAckBatchOptions: undefined,
  brokers: {},
  longpollingBrokers: {},
  topicMetadata: {},
  topicPartitions: {},
  correlationId: 0,
  _socketId: 0,
  cbqueue: {},
  brokerMetadata: {},
  ready: false,
  zk: 
   EventEmitter {
     client: 
      Client {
        domain: null,
        _events: [Object],
        _eventsCount: 3,
        _maxListeners: undefined,
        connectionManager: [Object],
        options: [Object],
        state: [Object] },
     _events: 
      { init: [Object],
        brokersChanged: [Function],
        disconnected: [Object],
        error: [Function] },
     _eventsCount: 4 },
  _events: 
   { ready: [ [Function], [Function] ],
     error: [Function],
     close: [Function],
     brokersChanged: [Function] },
  _eventsCount: 4 }
EventEmitter {
  fetchCount: 0,
  client: 
   EventEmitter {
     connectionString: 'localhost:2181/',
     clientId: 'kafka-node-client',
     zkOptions: undefined,
     noAckBatchOptions: undefined,
     brokers: {},
     longpollingBrokers: {},
     topicMetadata: {},
     topicPartitions: {},
     correlationId: 0,
     _socketId: 0,
     cbqueue: {},
     brokerMetadata: {},
     ready: false,
     zk: EventEmitter { client: [Object], _events: [Object], _eventsCount: 4 },
     _events: 
      { ready: [Object],
        error: [Function],
        close: [Function],
        brokersChanged: [Function] },
     _eventsCount: 4 },
  options: 
   { autoCommit: true,
     groupId: 'kafka-node-group',
     autoCommitMsgCount: 100,
     autoCommitIntervalMs: 5000,
     fetchMaxWaitMs: 100,
     fetchMinBytes: 1,
     fetchMaxBytes: 1048576,
     fromOffset: false,
     encoding: 'utf8' },
  ready: false,
  paused: undefined,
  id: 0,
  payloads: 
   [ { topic: 'test',
       partition: 0,
       offset: 0,
       maxBytes: 1048576,
       metadata: 'm' } ],
  _events: { done: [Function] },
  _eventsCount: 1,
  encoding: 'utf8' }
Client ready!

--Edit--

After stopping and restarting live.js, the ZooKeeper log shows the following:

[2016-01-27 15:53:20,135] INFO Accepted socket connection from /127.0.0.1:38166 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-01-27 15:53:20,139] WARN Connection request from old client /127.0.0.1:38166; will be dropped if server is in r-o mode (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,140] INFO Client attempting to establish new session at /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,166] INFO Established session 0x1528384e45e0007 with negotiated timeout 30000 for client /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)

【Question Discussion】:

    Tags: node.js apache-kafka apache-zookeeper


    【Solution 1】:

    Unless you have some specific requirements, the HighLevelConsumer works well. I am using the following options for the high-level consumer:

                {
                    groupId: "Consumer group",
                    // Auto commit config
                    autoCommit: true,
                    autoCommitMsgCount: 100,
                    autoCommitIntervalMs: 5000,
                    // Fetch message config
                    fetchMaxWaitMs: 100,
                    fetchMinBytes: 1,
                    fetchMaxBytes: 1024 * 10,
                    fromOffset: true,
                    fromBeginning: false, // to avoid reading from the beginning
                    encoding: 'utf8'
                }
    

    【Discussion】:

    • Which node version are you using? I have heard it struggles with newer node versions; I used it with 0.12.7.
    • var kafka = require('kafka-node'), HighLevelConsumer = kafka.HighLevelConsumer, client = new kafka.Client(), consumer = new HighLevelConsumer( client, [ { topic: 't' } ], { groupId: 'my-group', fromOffset: true, fromBeginning: false, /* to avoid reading from the beginning */ encoding: 'utf8' } ); — have you tried this?
    • Yes, same problem.
    • I suspect the ZooKeeper version may be the problem: kafka-node uses node-zookeeper-client, and that library may be a bit outdated.
    • I tried no-kafka (the consumer example from npmjs.com/package/no-kafka) and it works (with Kafka 0.8.2.2, although no-kafka also works with 0.9).
    【Solution 2】:

    Same problem here. Downgrading to version 0.2.27 solved it.
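    If you want to try that, the downgrade is a one-line npm command (the `--save` flag, to pin the version in package.json, is my suggestion, not part of the original answer):

    ```shell
    npm install kafka-node@0.2.27 --save
    ```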

    【Discussion】:
