【发布时间】:2019-07-20 21:41:31
【问题描述】:
我需要根据接收到的输入有条件地处理我的数据。该应用程序是一个 Spark (2.3.4) 结构化流应用程序,读取 Kafka 源 (2.3.0)。我可以成功读取数据,获取数据,解析数据等等。
根据消息包含的数据,我需要扩展我的进一步处理。我在代码中列出了一种工作方法,需要进行严格的评估,以查看这是最好的方法还是可以使用另一种更好的方法。
工作方法如下。根据该消息,我需要进行许多进一步的转换,并将各种转换后的输出保存到数据库中,最后以 csv 或 json 格式提供答案。
//raw streaming data from kafka here
Dataset<String> values = dsRawData
.selectExpr("CAST(value AS STRING)")
.as(Encoders.STRING());
//conditional processing check here
Dataset<String> csvIn = values
.map((MapFunction<String, String>) se -> {
String[] controls = se.split(",");
secoreLog.info(logHeader+"controls: " + controls[0]);
if(controls[0].equals("magic1") && controls[1].equals("magic2") &&
controls[2].equals("magic2") && controls[3].equals("magic1")){
//trigger transformations & writes
}
else {
//trigger a different set of transformations & writes
}
return controls.toString();
}, Encoders.STRING());
请查看并发表您的意见!
【问题讨论】:
标签: apache-spark apache-kafka spark-structured-streaming