【Posted】: 2014-02-12 11:49:45
【Question】:
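I am using Kafka 0.8.0 and trying to implement the scenario described below.
JCA API (acts as the producer and sends data to it) -----> Consumer -----> HBase
I send each message to the consumer as soon as I fetch the data with the JCA client. For example, as soon as the producer sends message no. 1, I want the consumer to fetch that same message and "put" it into HBase. But my consumer only starts fetching messages after some random number n of messages. I want to keep the producer and consumer in sync so that they work in step.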
I have used:
1 broker
1 topic
1 single producer and a high-level consumer
Can anyone suggest what I need to do to achieve this?
Edited:
Adding some relevant code snippets.
Consumer.java
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

// Signal and Serializer are the poster's own classes (not shown here).
public class Consumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;
    PrintWriter pw = null;
    int t = 0;
    StringDecoder kd = new StringDecoder(null);
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    Map<String, List<KafkaStream<String, Signal>>> consumerMap;
    KafkaStream<String, Signal> stream;
    ConsumerIterator<String, Signal> it;

    public Consumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = topic;
        // Request a single stream for the topic.
        topicCountMap.put(topic, 1);
        consumerMap = consumer.createMessageStreams(topicCountMap, kd,
                new Serializer(new VerifiableProperties()));
        stream = consumerMap.get(topic).get(0);
        it = stream.iterator();
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.zkConnect);
        props.put("group.id", KafkaProperties.groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.size", "1024");
        return new ConsumerConfig(props);
    }

    public synchronized void run() {
        // hasNext() blocks until the next message arrives.
        while (it.hasNext()) {
            t = (it.next().message()).getChannelid();
            System.out.println("In Consumer received msg" + t);
        }
    }
}
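A possible adjustment to `createConsumerConfig()` can be sketched as follows. It assumes the consumer group has no offset committed in ZooKeeper yet, in which case the Kafka 0.8 high-level consumer falls back to `auto.offset.reset`, whose default `largest` skips everything already in the log. It also uses `fetch.message.max.bytes`, which to the best of my knowledge is the 0.8 name for what 0.7 called `fetch.size`. The names `ConsumerConfigSketch` and `buildConsumerProps` are hypothetical, and only `java.util.Properties` is used so the snippet runs without Kafka on the classpath.

```java
import java.util.Properties;

class ConsumerConfigSketch {
    // Hypothetical helper: builds properties for the 0.8 high-level consumer.
    static Properties buildConsumerProps(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zkConnect);
        props.put("group.id", groupId);
        // Assumption: with no committed offset, start from the earliest
        // available message instead of the default "largest".
        props.put("auto.offset.reset", "smallest");
        // Assumption: 0.8 property name for the per-fetch size limit (1 MiB here).
        props.put("fetch.message.max.bytes", String.valueOf(1024 * 1024));
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProps("127.0.0.1:2181", "group1");
        // Print the resulting settings for inspection.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The resulting `Properties` object would be passed to `new ConsumerConfig(props)` in the same way as in the consumer class above.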
Producer.java
import java.util.Properties;
import kafka.producer.ProducerConfig;

public class Producer {
    public final kafka.javaapi.producer.Producer<String, Signal> producer;
    private final String topic;
    private final Properties props = new Properties();

    public Producer(String topic) {
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Uses the default (random) partitioner.
        // The message value is a user-defined object (Signal).
        producer = new kafka.javaapi.producer.Producer<String, Signal>(new ProducerConfig(props));
        this.topic = topic;
    }
}
KafkaProperties.java
public interface KafkaProperties {
    final static String zkConnect = "127.0.0.1:2181";
    final static String groupId = "group1";
    final static String topic = "test00";
    final static String kafkaServerURL = "localhost";
    final static int kafkaServerPort = 9092;
    final static int kafkaProducerBufferSize = 64 * 1024;
    final static int connectionTimeOut = 100000;
    final static int reconnectInterval = 10000;
    final static String clientId = "SimpleConsumerDemoClient";
}
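This is how the consumer behaves: for roughly the first 10 messages it prints no "received" line at all, and after that (from msg12 in the run below) it starts working properly.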
producer sending msg1
producer sending msg2
producer sending msg3
producer sending msg4
producer sending msg5
producer sending msg6
producer sending msg7
producer sending msg8
producer sending msg9
producer sending msg10
producer sending msg11
producer sending msg12
In Consumer received msg12
producer sending msg13
In Consumer received msg13
producer sending msg14
In Consumer received msg14
producer sending msg15
In Consumer received msg15
producer sending msg16
In Consumer received msg16
producer sending msg17
In Consumer received msg17
producer sending msg18
In Consumer received msg18
producer sending msg19
In Consumer received msg19
producer sending msg20
In Consumer received msg20
producer sending msg21
In Consumer received msg21
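The pattern in this log is what one would expect if the consumer attached to the topic only after the first messages had been written and then started reading from the end of the log. That is only one possible explanation. The toy model below is not Kafka code: `OffsetResetSketch`, `buildLog`, and `messagesSeen` are hypothetical names, and the join point of 11 messages is taken from the log above purely for illustration. It shows how a "largest" versus "smallest" starting position changes which messages a late consumer sees.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetResetSketch {
    // Builds a stand-in for a single-partition log holding messages 1..count.
    static List<Integer> buildLog(int count) {
        List<Integer> log = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            log.add(i);
        }
        return log;
    }

    // Returns the messages a consumer sees if it joins when the log already
    // holds logSizeAtJoin messages and has no committed offset.
    static List<Integer> messagesSeen(List<Integer> log, int logSizeAtJoin, String resetPolicy) {
        // "smallest" starts at offset 0; anything else starts at the log end at join time.
        int start = "smallest".equals(resetPolicy) ? 0 : logSizeAtJoin;
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        List<Integer> log = buildLog(21);
        System.out.println("largest:  " + messagesSeen(log, 11, "largest"));
        System.out.println("smallest: " + messagesSeen(log, 11, "smallest"));
    }
}
```

With the "largest" policy the model yields messages 12 through 21, which matches the log above; with "smallest" it yields all 21.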
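Edited: Added the listener function in which the producer sends messages to the consumer. I am using the default producer configuration and have not overridden it.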
public synchronized void onValueChanged(final MonitorEvent event_) {
    // Get the value from the DBR
    try {
        final DBR dbr = event_.getDBR();
        final String[] val = (String[]) dbr.getValue();
        producer1.producer.send(new KeyedMessage<String, Signal>
                (KafkaProperties.topic, new Signal(messageNo)));
        System.out.println("producer sending msg" + messageNo);
        messageNo++;
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}
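Since the listener relies on the default producer settings, it may help to make them explicit. The sketch below assumes the Kafka 0.8 producer property names `producer.type` (where `sync` sends each message immediately rather than batching it) and `request.required.acks` (where `1` waits for the leader's acknowledgement; the 0.8 default of `0` does not wait, as far as I know). `ProducerConfigSketch` and `buildProducerProps` are hypothetical names, and the serializer class names are copied from the question.

```java
import java.util.Properties;

class ProducerConfigSketch {
    // Hypothetical helper: builds explicit properties for the 0.8 producer.
    static Properties buildProducerProps(String brokerList) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerList);
        // Serializer classes as given in the question.
        props.put("serializer.class", "org.bigdata.kafka.Serializer");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // Assumption: send each message synchronously instead of batching.
        props.put("producer.type", "sync");
        // Assumption: wait for the partition leader to acknowledge each send.
        props.put("request.required.acks", "1");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerProps("localhost:9092");
        // Print the resulting settings for inspection.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

These properties would be passed to `new ProducerConfig(props)` in the producer's constructor. Waiting for an acknowledgement trades some throughput for the assurance that early messages are not silently dropped.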
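【Comments】:
-
Could you show your producer and consumer code/configuration? It looks like one of them is operating in batches (which is actually a good thing).
-
@Dmitry Added the code snippets.
-
The consumer looks fine (except for the property fetch.size = 1K, which means the consumer cannot receive larger messages, but that is probably not the problem we are looking for). Could you share the code of the producer's newProducerConfig() and run() methods?
-
@Dmitry My producer class is not threaded. I have a listener function that is called whenever some value changes, and inside it I call producer.send() to send the message to the consumer. In any case, I will share the listener function.
Tags: java apache-kafka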