[Title]: Creating a JavaPairRDD using KafkaUtils.createRDD (Spark and Kafka)
[Posted]: 2016-12-16 08:46:53
[Question]:

I'm writing a batch job to replay events from Kafka, using Kafka v0.10.1.0 and Spark 1.6.

I'm trying to use the JavaPairRDD javaPairRDD = KafkaUtils.createRDD(...) call:

Properties configProperties = new Properties();
configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.1.194:9092");
configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(configProperties);

for (String topic : topicNames) {
    List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
    for (PartitionInfo partitionInfo : partitionInfos) {
        log.debug("partition leader id: {}", partitionInfo.leader().id());

        JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "10.4.1.194:9092");
        kafkaParams.put("zookeeper.connect", "10.4.1.194:2181");
        kafkaParams.put("group.id", "kafka-replay");

        // Attempt to read the whole partition: offsets 0 through Long.MAX_VALUE
        OffsetRange[] offsetRanges = new OffsetRange[]{
                OffsetRange.create(topic, partitionInfo.partition(), 0, Long.MAX_VALUE)};

        JavaPairRDD<String, String> javaPairRDD = KafkaUtils.createRDD(
                sparkContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                offsetRanges);

        javaPairRDD
                .map(t -> getInstrEvent(t._2))
                .filter(ie -> startTimestamp <= ie.getTimestamp() && ie.getTimestamp() <= endTimestamp)
                .foreach(s -> System.out.println(s));
    }
}

But it fails with this error:

2016-12-14 15:45:44,700 [main] ERROR com.goldenrat.analytics.KafkaToHdfsReplayMain - error
org.apache.spark.SparkException: Offsets not available on leader: OffsetRange(topic: 'sfs_create_room', partition: 0, range: [1 -> 100])
    at org.apache.spark.streaming.kafka.KafkaUtils$.org$apache$spark$streaming$kafka$KafkaUtils$$checkOffsets(KafkaUtils.scala:200)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:253)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$1.apply(KafkaUtils.scala:249)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:249)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:338)
    at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$createRDD$3.apply(KafkaUtils.scala:333)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
    at org.apache.spark.streaming.kafka.KafkaUtils$.createRDD(KafkaUtils.scala:333)
    at org.apache.spark.streaming.kafka.KafkaUtils.createRDD(KafkaUtils.scala)
    at com.goldenrat.analytics.KafkaToHdfsReplayMain$KafkaToHdfsReplayJob.start(KafkaToHdfsReplayMain.java:172)

I can connect to the broker with other clients and fetch messages, so I know the broker is not the problem. Any help?
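For reference, a minimal sketch of the kind of standalone check meant here, using the plain Kafka 0.10 consumer API (the topic name, group id, and poll timeout are illustrative, not from the original post):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class BrokerSanityCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.1.194:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "sanity-check");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("sfs_create_room"));
                // A single poll is enough to prove the broker is reachable and serving data.
                ConsumerRecords<String, String> records = consumer.poll(5000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }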

[Comments]:

    Tags: java apache-spark apache-kafka


[Solution 1]:

It looks like you can't specify offsets for your range that don't exist. I was hoping that passing 0 through Long.MAX_VALUE would get me all offsets, but createRDD validates the requested range against the leader (the checkOffsets call in the stack trace) and fails with that error message when an offset is invalid. It does work if I specify a valid (earliest/latest) pair of offsets for the range. For anyone else who stumbles on this, you can fetch them like so:

        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.1.194:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(configProperties);

        for (String topic : topicNames) {
            log.debug("doing topic: {}", topic);
            List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
            for (PartitionInfo partitionInfo : partitionInfos) {

                TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionInfo.partition());
                Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
                SimpleConsumer consumer = new SimpleConsumer("10.4.1.194", 9092, 10000, 64 * 1024, "kafka-replay");

                // Earliest available offset for this partition
                requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.EarliestTime(), 1));
                kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "kafka-replay");
                OffsetResponse response = consumer.getOffsetsBefore(request);
                if (response.hasError()) {
                    log.error("error, " + response.errorCode(topic, partitionInfo.partition()));
                }
                long[] earliestOffsetsArray = response.offsets(topic, partitionInfo.partition());

                // Latest available offset for this partition
                requestInfo = new HashMap<>();
                requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
                request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "kafka-replay");

                response = consumer.getOffsetsBefore(request);
                if (response.hasError()) {
                    log.error("error, " + response.errorCode(topic, partitionInfo.partition()));
                }
                long[] latestOffsetsArray = response.offsets(topic, partitionInfo.partition());

                ...
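To close the loop, a sketch (not part of the original answer) of how those fetched offsets can replace the 0 to Long.MAX_VALUE range from the question. Each offsets array holds a single element, because the PartitionOffsetRequestInfo above asked for exactly one offset per partition:

        // Build the range from real earliest/latest offsets instead of guesses;
        // this is what makes checkOffsets pass.
        OffsetRange[] offsetRanges = new OffsetRange[]{
                OffsetRange.create(topic, partitionInfo.partition(),
                        earliestOffsetsArray[0], latestOffsetsArray[0])};

        JavaPairRDD<String, String> javaPairRDD = KafkaUtils.createRDD(
                sparkContext,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                offsetRanges);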

[Discussion]:
