【发布时间】:2016-08-14 17:11:17
【问题描述】:
在我们的 spark-streaming 工作中,我们从 kafka 读取流式消息。
为此,我们使用返回JavaPairInputDStreamfrom的KafkaUtils.createDirectStream API。
通过以下方式从 kafka 读取消息(来自三个主题 - test1、test2、test3):
private static final String TOPICS = "test1,test2,test3";
HashSet<String> topicsSet = new HashSet<>(Arrays.asList(TOPICS.split(",")));
HashMap<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", BROKERS);
JavaPairInputDStream<String, String> messages =
KafkaUtils.createDirectStream(
streamingContext,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
我们希望以不同的方式处理来自每个主题的消息,为了实现这一点,我们需要知道每条消息的主题名称。
所以我们执行以下操作:
JavaDStream<String> lines = messages.map(new SplitToLinesFunction());
这是SplitToLinesFunction的实现:
public class SplitToLinesFunction implements Function<Tuple2<String, String>, String> {
@Override
public String call(Tuple2<String, String> tuple2)
{
System.out.println(tuple2._1);
return tuple2._2();
}
}
问题在于tuple2._1 为空,我们假设tuple2._1 将包含一些元数据,例如消息来自的主题/分区的名称。
但是,当我们打印tuple2._1 时,它是空的。
我们的问题 - 有没有办法在 kafka 中发送主题名称,以便在 spark-streaming 代码中,tuple2._1 将包含它(而不是 null)?
请注意,我们还尝试从 spark-streaming kafka-integration tutorial 中提到的 DStream 中获取主题名称:
但它返回发送到KafkaUtils.createDirectStream 的所有主题,而不是消息(属于当前RDD)来自的特定主题。
所以它并不能帮助我们识别 RDD 中的消息是从哪里发送的主题的名称。
编辑
响应大卫的回答 - 我尝试像这样使用MessageAndMetadata:
Map<TopicAndPartition, Long> topicAndPartition = new HashMap();
topicAndPartition.put(new TopicAndPartition("test1", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test2", 0), 1L);
topicAndPartition.put(new TopicAndPartition("test3", 0), 1L);
class MessageAndMetadataFunction implements Function<MessageAndMetadata<String, String>, String>
{
@Override
public String call(MessageAndMetadata<String, String> v1)
throws Exception {
// nothing is printed here
System.out.println("topic = " + v1.topic() + ", partition = " + v1.partition());
return v1.topic();
}
}
JavaInputDStream<String> messages = KafkaUtils.createDirectStream(streamingContext, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicAndPartition, new MessageAndMetadataFunction());
messages.foreachRDD(new VoidFunction() {
@Override
public void call(Object t) throws Exception {
JavaRDD<String> rdd = (JavaRDD<String>)t;
OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// here all the topics kafka listens to are printed, but that doesn't help
for (OffsetRange offset : offsets) {
System.out.println(offset.topic() + " " + offset.partition() + " " + offset.fromOffset() + " " + offset.untilOffset());
}
}
});
问题是MessageAndMetadataFunction.call 方法中没有打印任何内容。为了在 MessageAndMetadataFunction.call 方法中获取该 RDD 的相关主题,我应该修复什么?
【问题讨论】:
-
“这里什么都没有打印”是什么意思?甚至没有“topic =”部分,或者该部分打印但值为空。
-
如果没有,那么您应该查看您的
YARN日志,或者您正在运行的任何集群。对我来说,/usr/local/hadoop/logs/userLogs/中有日志文件从您的执行者那里捕获stdout。 -
抱歉 -- 我现在知道问题所在了。这是因为您的
MessageAndMetadataFunction必须将主题和消息都返回到一条记录中。现在您只返回主题,而不是消息本身。这就是为什么您一遍又一遍地打印出主题的原因——因为这就是你从MessageAndMetadataFunction返回的内容——返回两者,你将拥有两者。 -
编辑了我的答案以使其更清晰
-
但是
v1.topic()和v1.partition怎么可以一起返回呢?创建一个new Tuple2<String, String>(v1.topic(), v1.partition())并返回它?
标签: apache-spark apache-kafka spark-streaming