【Question Title】: Why does Spark Streaming not read from Kafka topic?
【Posted】: 2017-04-22 15:44:38
【Question Description】:
  • Spark Streaming 1.6.0
  • Apache Kafka 10.0.1

I'm reading the sample topic with Spark Streaming. The code runs without any errors or exceptions, but I don't get any data on the console via the print() method.

I checked whether there are any messages in the topic:

./bin/kafka-console-consumer.sh \
    --zookeeper ip-172-xx-xx-xxx:2181 \
    --topic sample \
    --from-beginning

and I did receive messages:

message no. 1
message no. 2
message no. 3
message no. 4
message no. 5

The command used to run the streaming job:

./bin/spark-submit \
    --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=512m" \
    --jars /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar \
    --class "com.zifferlabs.stream.SampleStream" \
    /home/ubuntu/zifferlabs/src/main/java/com/zifferlabs/stream/SampleStream.java

Here is the complete code:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import scala.Tuple2;

public class SampleStream {
  private static void processStream() {
    SparkConf conf = new SparkConf().setAppName("sampleStream")
            .setMaster("local[3]")
            .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
            .set("spark.driver.memory", "2g").set("spark.streaming.blockInterval", "1000")
            .set("spark.driver.allowMultipleContexts", "true")
            .set("spark.scheduler.mode", "FAIR");

    JavaStreamingContext jsc = new JavaStreamingContext(conf, new Duration(Long.parseLong("2000")));

    String[] topics = "sample".split(",");
    Set<String> topicSet = new HashSet<String>(Arrays.asList(topics));
    Map<String, String> props = new HashMap<String, String>();
    props.put("metadata.broker.list", "ip-172-xx-xx-xxx:9092");
    props.put("kafka.consumer.id", "sample_con");
    props.put("group.id", "sample_group");
    props.put("zookeeper.connect", "ip-172-xx-xx-xxx:2181");
    props.put("zookeeper.connection.timeout.ms", "16000");

    JavaPairInputDStream<String, byte[]> kafkaStream =
      KafkaUtils.createDirectStream(jsc, String.class, byte[].class, StringDecoder.class,
                                    DefaultDecoder.class, props, topicSet);

    JavaDStream<String> data = kafkaStream.map(new Function<Tuple2<String,byte[]>, String>() {
      public String call(Tuple2<String, byte[]> arg0) throws Exception {
        System.out.println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ value is: " + arg0._2().toString());
        return arg0._2().toString();
      }
    });

    data.print();

    System.out.println("Spark Streaming started....");
    jsc.checkpoint("/home/spark/sparkChkPoint");
    jsc.start();
    jsc.awaitTermination();
    System.out.println("Stopped Spark Streaming");
  }

  public static void main(String[] args) {
    processStream();
  }
}

【Question Discussion】:

    Tags: amazon-ec2 apache-kafka spark-streaming apache-spark-1.6


    【Solution 1】:

    I think your code is fine, but the command line you use to execute it is not.

    You spark-submit the application as follows (formatting is mine, plus spark.executor.extraJavaOptions removed for simplicity):

    ./bin/spark-submit \
      --jars /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar \
      --class "com.zifferlabs.stream.SampleStream" \
      /home/ubuntu/zifferlabs/src/main/java/com/zifferlabs/stream/SampleStream.java
    

    I think it won't work because spark-submit is given your Java source code rather than the compiled, executable code.

    spark-submit the application as follows instead:

    ./bin/spark-submit \
      --class "com.zifferlabs.stream.SampleStream" \
      /home/ubuntu/zifferlabs/target/ZifferLabs-1-jar-with-dependencies.jar
    

    --class defines the "entry point" of your Spark application, while the code with the dependencies (the jar) is the sole input argument of spark-submit.

    Give it a shot and report back!
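
    A side note on the question code itself (this goes beyond the original answer): even once the jar is submitted correctly, the log line prints arg0._2().toString(), and calling toString() on a Java byte[] returns the array's identity (something like [B@1b6d3586), not the message text. A minimal sketch of decoding the payload instead (the class name ByteDecodeExample is mine, not from the question):

    ```java
    import java.nio.charset.StandardCharsets;

    public class ByteDecodeExample {
      public static void main(String[] args) {
        byte[] payload = "message no. 1".getBytes(StandardCharsets.UTF_8);

        // toString() on a byte[] gives the array's type-and-hash identity,
        // e.g. "[B@1b6d3586" -- not the contents of the message.
        System.out.println(payload.toString());

        // Decoding the bytes with an explicit charset yields the actual text.
        System.out.println(new String(payload, StandardCharsets.UTF_8));
      }
    }
    ```

    The same change applies inside the map function: return new String(arg0._2(), StandardCharsets.UTF_8) instead of arg0._2().toString().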

    【Discussion】:
