【问题标题】:Spark Java - Conditional logic for streaming data - Need critiqueSpark Java - 流数据的条件逻辑 - 需要批评
【发布时间】:2019-07-20 21:41:31
【问题描述】:

我需要根据接收到的输入有条件地处理我的数据。该应用程序是一个 Spark (2.3.4) 结构化流应用程序,读取 Kafka 源 (2.3.0)。我可以成功读取数据,获取数据,解析数据等等。

根据消息包含的数据,我需要扩展我的进一步处理。我在代码中列出了一种工作方法,需要进行严格的评估,以查看这是最好的方法还是可以使用另一种更好的方法。

工作方法如下。根据该消息,我需要进行许多进一步的转换,并将各种转换后的输出保存到数据库中,最后以 csv 或 json 格式提供答案。

//raw streaming data from kafka here
Dataset<String> values = dsRawData
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());

//conditional processing check here
Dataset<String> csvIn = values 
                    .map((MapFunction<String, String>) se -> {
                        String[] controls = se.split(",");
                        secoreLog.info(logHeader+"controls: " + controls[0]);

                        if(controls[0].equals("magic1") && controls[1].equals("magic2") &&
                                controls[2].equals("magic2") && controls[3].equals("magic1")){
                            //trigger transformations & writes
                        }
                        else {
                            //trigger a different set of transformations & writes
                        }

                        return controls.toString();
                    }, Encoders.STRING());

请查看并发表您的意见!

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    您为什么不使用过滤器,然后根据您的要求使用不同的 writeStreams。 我认为这将是一个更好的方法。 此外,您将能够更好地处理每个流式查询。 谢谢!

    【讨论】:

    • 好的。所以,如果我对你的理解正确,你所提倡的是 2 个不同的过滤器,每个过滤器都被编码来识别消息的类型?然后这给了我两个数据集,一个带有 1 类消息,另一个带有 2 类消息?
    • 是的!单独使用它们并使用spark.streams.awaitAnyTermination
    猜你喜欢
    • 2018-12-03
    • 1970-01-01
    • 2010-12-22
    • 2020-06-15
    • 1970-01-01
    • 1970-01-01
    • 2012-03-29
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多