【Question Title】: In Spark Streaming, is it possible to upsert batch data from Kafka to Hive?
【Posted】: 2020-08-21 06:35:18
【Question Description】:

My plan is:

1. Use Spark Streaming to load data from Kafka periodically, e.g. every 1 minute.
2. Convert each 1-minute micro-batch into a DataFrame.
3. Upsert the DataFrame into a Hive table (a table storing all history data).

So far, I have successfully implemented steps 1-2.

I want to know whether there is any practical way to implement step 3. In detail:

1. Load the latest history table for a certain partition in Spark Streaming.
2. Join the batch DataFrame with that history partition's DataFrame to generate a new DataFrame.
3. Save the new DataFrame to Hive, overwriting that partition of the history table.
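For what it's worth, the three steps above can be sketched as a join-and-overwrite merge. This is only a hedged sketch: it assumes `answer_id` is the upsert key, and the `dt` partition column and `update_time` ordering column are hypothetical names not present in the question; `ods.zhihu_comment` and the other column names come from the question's code.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Window;

public final class UpsertSketch {

  // Merge one micro-batch into a single history partition:
  // union old + new rows, keep the newest row per key, then
  // overwrite only the affected partition.
  public static void upsert(SparkSession spark, Dataset<Row> batchDf, String dt) {
    // 1. Load only the affected history partition (partition pruning).
    Dataset<Row> history = spark.table("ods.zhihu_comment")
        .where(col("dt").equalTo(dt));

    // 2. Union and deduplicate: the newest record per answer_id wins
    //    (assumes an update_time column exists to order versions by).
    Dataset<Row> merged = history.unionByName(batchDf)
        .withColumn("rn", row_number().over(
            Window.partitionBy("answer_id").orderBy(col("update_time").desc())))
        .where(col("rn").equalTo(1))
        .drop("rn");

    // 3. Overwrite only that partition rather than the whole table
    //    (needs Spark 2.3+ dynamic partition overwrite).
    spark.conf().set("spark.sql.sources.partitionOverwriteMode", "dynamic");
    merged.write().mode(SaveMode.Overwrite).insertInto("ods.zhihu_comment");
  }
}
```

One caveat with this approach: the read-merge-overwrite cycle is not atomic, so a failure mid-write can leave the partition in a partial state; table formats with real MERGE support (Hive ACID, Delta, Hudi, Iceberg) avoid this.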

Here is my code:


import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

import static org.apache.spark.sql.functions.col;

public final class SparkConsumer {

  public static void main(String[] args) throws Exception {
    String brokers = "device1:9092,device2:9092,device3:9092";
    String groupId = "spark";
    String topics = "zhihu_comment";
    String destTable = "ods.zhihu_comment";

    // Create context with a 60-second batch interval
    SparkConf sparkConf = new SparkConf().setAppName("TestKafkaStreaming");
    sparkConf.set("spark.streaming.backpressure.enabled", "true");
    sparkConf.set("spark.streaming.backpressure.initialRate", "10000");
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "10000");
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(60));

    Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put("enable.auto.commit", false);
    kafkaParams.put("max.poll.records", "500");

    SparkSession spark = SparkSession.builder()
        .appName(topics)
        .enableHiveSupport()
        .getOrCreate();

    // Create direct kafka stream with brokers and topics
    JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams));

    // Keep only the columns we care about from the JSON payload
    Column[] colList = { col("answer_id"), col("author"), col("content"), col("vote_count") };

    messages.foreachRDD(rdd -> {
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        if (!rdd.isEmpty()) {
            // Parse the raw JSON messages of this batch into a DataFrame
            Dataset<Row> df = spark.read().json(rdd.map(ConsumerRecord::value).rdd());
            df.select(colList).show();
        }

        // Commit offsets only after this batch's output has completed
        ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
    });

    jssc.start();
    jssc.awaitTermination();
  }
}

I want to know whether this approach is practical. I would appreciate any advice you could give me.

【Comments】:

    Tags: java apache-spark apache-kafka hive spark-streaming


    【Solution 1】:

    I would strongly recommend Kafka Connect instead of reinventing the wheel. All you need is the HDFS Sink connector, which copies the data from your Kafka topic to Hive:

    The Kafka Connect HDFS Sink connector allows you to export data from Kafka topics to HDFS files in a variety of formats, and integrates with Hive to make the data immediately available for querying with HiveQL.
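    For reference, a minimal connector configuration with Hive integration might look roughly like this. The property names follow the Confluent HDFS Sink connector; the hostnames, URIs, and sizing values are illustrative assumptions, not taken from the question:

    ```properties
    name=hdfs-sink-zhihu-comment
    connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
    tasks.max=1
    topics=zhihu_comment
    # Illustrative cluster endpoints -- replace with your own
    hdfs.url=hdfs://namenode:8020
    flush.size=1000
    # Hive integration: register the data in the Hive metastore as it lands
    hive.integration=true
    hive.metastore.uris=thrift://metastore:9083
    hive.database=ods
    schema.compatibility=BACKWARD
    ```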

    【Discussion】:

    • Hi Giorgos, thanks for your recommendation. I would prefer to implement it with my own code, since I think the scenario is not that complicated. Still, your suggestion may be a good option in the future.