【问题标题】:Parsing JSON message in Spark from Kafka stream从 Kafka 流中解析 Spark 中的 JSON 消息
【发布时间】:2020-01-06 18:12:08
【问题描述】:

我有一个需要在 spark ( java ) 中解析的事件流(格式如下)。我能够读取流,但无法找到将消息转换为 java bean 的示例。

{
    user_id  : string,
    session_id : string,
    event : string,
    page : string,
    timestamp : timestamp
}

Java 豆

public class Event implements Serializable {

    private String user_id;

    private String session_id;
    private String page;
    private String event;
    private Timestamp timestamp;
}

将消息读取为字符串的代码。

Dataset<String> lines = spark
                        .readStream()
                        .format("kafka")
                        .option("kafka.bootstrap.servers", "localhost:9092")
                        .option("subscribe", topics)
                        .load()
                        .selectExpr("CAST(value AS STRING)")
                        .as(Encoders.STRING());     

【问题讨论】:

    标签: apache-spark apache-spark-sql spark-streaming


    【解决方案1】:

    我能够使用以下方法使其工作。

            FlatMapFunction<String, Event> linesToEvents = new FlatMapFunction<String, Event>() {
                @Override
                public Iterator<Event> call(String line) throws JsonMappingException, JsonProcessingException {
                    ObjectMapper mapper = new ObjectMapper();
                    ArrayList<Event> eventList = new ArrayList<>();
                    eventList.add(mapper.readValue(line, Event.class));
                    return eventList.iterator();
                }
            };
    
    
            Dataset<Event> lines = spark
                                    .readStream()
                                    .format("kafka")
                                    .option("kafka.bootstrap.servers", "localhost:9092")
                                    .option("subscribe", topics)
                                    .load()
                                    .selectExpr("CAST(value AS STRING)")
                                    .as(Encoders.STRING())
                                    .flatMap(linesToEvents, Encoders.bean(Event.class));
    

    【讨论】:

      猜你喜欢
      • 2016-08-13
      • 2017-11-08
      • 2015-09-13
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-11
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多