【Posted】: 2018-12-30 01:48:57
【Problem Description】:
I have successfully printed the output, but I want to capture these records in a Spark DataFrame and then insert them into a table.
Below is my consumer code:
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import com.twitter.bijection.Injection;
import com.twitter.bijection.avro.GenericAvroCodecs;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

public class SparkAvroConsumer {

    // Bijection injection used to decode the Avro byte[] payload back into a GenericRecord
    private static Injection<GenericRecord, byte[]> recordInjection;

    static {
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(UserSchema.getUserSchema());
        recordInjection = GenericAvroCodecs.toBinary(schema);
    }

    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCountCon")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

        String consumeGroup = "cg1";
        // My own database connection helper (not used yet in this snippet)
        Database_Conn conn = new Database_Conn();

        Set<String> topics = Collections.singleton("Kafka_Example");
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("enable.auto.commit", "false");
        kafkaParams.put("auto.commit.interval.ms", "101");
        kafkaParams.put("group.id", consumeGroup);
        kafkaParams.put("max.partition.fetch.bytes", "135");

        JavaPairInputDStream<String, byte[]> directKafkaStream = KafkaUtils.createDirectStream(ssc,
                String.class, byte[].class, StringDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        // Decode each Avro payload and print it; this is the part I want to
        // replace with "capture into a DataFrame and insert into a table".
        directKafkaStream
                .map(message -> recordInjection.invert(message._2).get())
                .foreachRDD(rdd -> {
                    rdd.foreach(record -> {
                        System.out.println(record);
                    });
                });

        ssc.start();
        ssc.awaitTermination();
    }
}
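One way to capture the decoded records in a DataFrame instead of printing them is to convert the RDD inside foreachRDD. The following is a minimal sketch, not from the original post: it assumes the Avro schema has two string fields named "id" and "name" and a target table called "users"; all three names are hypothetical and must be adapted to UserSchema and the actual database.

// Hedged sketch: fields "id"/"name" and table "users" are assumptions.
// Additional imports needed:
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

directKafkaStream
        .map(message -> recordInjection.invert(message._2).get())
        .foreachRDD(rdd -> {
            // foreachRDD runs on the driver, so lazily reuse a SparkSession
            // built from the RDD's SparkConf.
            SparkSession spark = SparkSession.builder()
                    .config(rdd.context().getConf())
                    .getOrCreate();
            // GenericRecord has no built-in encoder, so map each record to a Row.
            JavaRDD<Row> rows = rdd.map(record -> RowFactory.create(
                    record.get("id").toString(),      // assumed field
                    record.get("name").toString()));  // assumed field
            StructType schema = new StructType()
                    .add("id", DataTypes.StringType)
                    .add("name", DataTypes.StringType);
            Dataset<Row> df = spark.createDataFrame(rows, schema);
            // Append the micro-batch into a table; for an external database,
            // df.write().jdbc(...) would work similarly.
            df.write().mode(SaveMode.Append).saveAsTable("users");
        });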
【Comments】:
- You can use Spark Structured Streaming: streamingDF.writeStream.foreachBatch { ... } spark.apache.org/docs/latest/… (a sketch of this approach follows below)
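For reference, a hedged sketch of what the suggested Structured Streaming route could look like in Java, assuming Spark 2.4+ and the spark-sql-kafka-0-10 connector. The broker address and topic come from the question; the "users" table name is hypothetical, and decoding the Avro "value" column is left out.

// Hedged sketch of the foreachBatch approach; not a drop-in replacement.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

SparkSession spark = SparkSession.builder()
        .appName("StructuredAvroConsumer")
        .master("local[*]")
        .getOrCreate();

// Each row carries binary "key"/"value" columns plus topic/partition/offset metadata.
Dataset<Row> streamingDF = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "Kafka_Example")
        .load();

// foreachBatch hands each micro-batch over as an ordinary DataFrame,
// so any batch sink (saveAsTable, jdbc, ...) can be used directly.
StreamingQuery query = streamingDF.writeStream()
        .foreachBatch((batchDF, batchId) -> {
            // The "value" column is still Avro-encoded bytes here; decoding it
            // (e.g. with from_avro in Spark 2.4+) is omitted from this sketch.
            batchDF.write().mode(SaveMode.Append).saveAsTable("users"); // table name assumed
        })
        .start();
query.awaitTermination(); // throws StreamingQueryException; declare or handle it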
- Thanks for your reply. Could you provide a code snippet showing how to work this into my code?
Tags: java apache-spark apache-kafka