【发布时间】:2018-05-26 01:27:19
【问题描述】:
我有一个Dataset<Row>,它是 Kafka readStream 的结果,如下面的 Java 代码 sn-p 所示。
m_oKafkaEvents = getSparkSession().readStream().format("kafka")
.option("kafka.bootstrap.servers", strKafkaAddress)
.option("subscribe", getInsightEvent().getTopic())
.option("maxOffsetsPerTrigger", "100000")
.option("startingOffsets", "latest")
.option("failOnDataLoss", false)
.load()
.select(functions.from_json(functions.col("value").cast("string"), oSchema).as("events"))
.select("events.*");
m_oKafkaEvents
{
{"EventTime":"1527005246864000000","InstanceID":"231","Model":"Opportunity_1","Milestone":"OrderProcessed"},
{"EventTime":"1527005246864000002","InstanceID":"232","Model":"Opportunity_2","Milestone":"OrderProcessed"},
{"EventTime":"1527005246864000001","InstanceID":"233","Model":"Opportunity_1","Milestone":"OrderProcessed"},
{"EventTime":"1527005246864000002","InstanceID":"234","Model":"Opportunity_2","Milestone":"OrderProcessed"}
}
我需要根据“模型”列拆分此数据集,这将产生两个数据集,如下所示;
m_oKafkaEvents_for_Opportunity_1_topic
{
{"EventTime":"1527005246864000000","InstanceID":"231","Model":"Opportunity_1","Milestone":"OrderProcessed"},
{"EventTime":"1527005246864000001","InstanceID":"233","Model":"Opportunity_1","Milestone":"OrderProcessed"}
}
m_oKafkaEvents_for_Opportunity_2_topic
{
{"EventTime":"1527005246864000002","InstanceID":"232","Model":"Opportunity_2","Milestone":"OrderProcessed"},
{"EventTime":"1527005246864000002","InstanceID":"234","Model":"Opportunity_2","Milestone":"OrderProcessed"}
}
这些数据集将发布到 Kafka 接收器中。主题名称将是模型值。即Opportunity_1 和Opportunity_2。
因此,我需要有一个句柄列“模型”值和相应的事件列表。
由于是 spark 新手,我正在寻求有关如何通过 java 代码实现这一点的帮助。
感谢任何帮助。
【问题讨论】:
-
更正拼写,改进格式