【发布时间】:2016-05-04 08:10:21
【问题描述】:
我按照快速入门指南(http://kafka.apache.org/documentation.html#quickstart)进行操作,并想在 node.js 中编写一个消费者。主题“test”已成功创建,我可以使用 kafka-console-producer.sh 发送消息,并通过 kafka-console-consumer.sh 接收消息。
我写了一个简单的消费者(live.js):
// live.js: minimal kafka-node consumer reading partition 0 of the 'test' topic.
const kafka = require('kafka-node');

const client = new kafka.Client('localhost:2181/');
const topics = [{ topic: 'test', partition: 0 }];
const consumer = new kafka.Consumer(client, topics, { autoCommit: true });

// Report when the client emits its 'ready' event.
client.on('ready', () => {
  console.log('Client ready!');
});

// Dump both objects for debugging. These run before the consumer handlers
// below are attached, so the printed listener tables do not include them.
console.log(client);
console.log(consumer);

// Consumer-level failures.
consumer.on('error', (err) => {
  console.log('Kafka Error: Consumer - ' + err);
});

// Emitted by the consumer under the 'offsetOutOfRange' event name.
consumer.on('offsetOutOfRange', (err) => {
  console.log('Kafka offsetOutOfRange: ' + err);
});

// Print every message object delivered to the consumer.
consumer.on('message', (message) => {
  console.log(message);
});
运行 node live.js 时,我收到了之前发送的所有消息。但是,当 live.js 正在运行、我通过 Kafka 提供的脚本生成一条新消息时,live.js 没有收到该消息(而 Kafka 附带的消费者脚本收到了)。重新启动 live.js 后,我才收到这条消息,但我希望能够“实时”接收消息。我使用的是默认配置,以下是 live.js 输出的日志:
EventEmitter {
connectionString: 'localhost:2181/',
clientId: 'kafka-node-client',
zkOptions: undefined,
noAckBatchOptions: undefined,
brokers: {},
longpollingBrokers: {},
topicMetadata: {},
topicPartitions: {},
correlationId: 0,
_socketId: 0,
cbqueue: {},
brokerMetadata: {},
ready: false,
zk:
EventEmitter {
client:
Client {
domain: null,
_events: [Object],
_eventsCount: 3,
_maxListeners: undefined,
connectionManager: [Object],
options: [Object],
state: [Object] },
_events:
{ init: [Object],
brokersChanged: [Function],
disconnected: [Object],
error: [Function] },
_eventsCount: 4 },
_events:
{ ready: [ [Function], [Function] ],
error: [Function],
close: [Function],
brokersChanged: [Function] },
_eventsCount: 4 }
EventEmitter {
fetchCount: 0,
client:
EventEmitter {
connectionString: 'localhost:2181/',
clientId: 'kafka-node-client',
zkOptions: undefined,
noAckBatchOptions: undefined,
brokers: {},
longpollingBrokers: {},
topicMetadata: {},
topicPartitions: {},
correlationId: 0,
_socketId: 0,
cbqueue: {},
brokerMetadata: {},
ready: false,
zk: EventEmitter { client: [Object], _events: [Object], _eventsCount: 4 },
_events:
{ ready: [Object],
error: [Function],
close: [Function],
brokersChanged: [Function] },
_eventsCount: 4 },
options:
{ autoCommit: true,
groupId: 'kafka-node-group',
autoCommitMsgCount: 100,
autoCommitIntervalMs: 5000,
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1048576,
fromOffset: false,
encoding: 'utf8' },
ready: false,
paused: undefined,
id: 0,
payloads:
[ { topic: 'test',
partition: 0,
offset: 0,
maxBytes: 1048576,
metadata: 'm' } ],
_events: { done: [Function] },
_eventsCount: 1,
encoding: 'utf8' }
Client ready!
--编辑--
停止 live.js 并重新启动后,Zookeeper 日志显示如下:
[2016-01-27 15:53:20,135] INFO Accepted socket connection from /127.0.0.1:38166 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-01-27 15:53:20,139] WARN Connection request from old client /127.0.0.1:38166; will be dropped if server is in r-o mode (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,140] INFO Client attempting to establish new session at /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-27 15:53:20,166] INFO Established session 0x1528384e45e0007 with negotiated timeout 30000 for client /127.0.0.1:38166 (org.apache.zookeeper.server.ZooKeeperServer)
【问题讨论】:
标签: node.js apache-kafka apache-zookeeper