[Question Title]: Deserializing Spark structured stream data from a Kafka topic
[Posted]: 2019-11-25 04:25:19
[Question]:

I'm using Kafka 2.3.0 and Spark 2.3.4. I've built a Kafka connector that reads a CSV file and publishes each line of the CSV to the relevant Kafka topic. A line looks like this: "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect". The CSV contains 1000 such lines. The connector publishes them to the topic successfully, and I can also receive the messages in Spark. I'm not sure how to deserialize each message into my schema. Note that the messages are headerless, so the key part of the Kafka message is null. The value part contains the complete CSV string shown above. My code is below.

I've looked at this - How to deserialize records from Kafka using Structured Streaming in Java?, but couldn't port it to my CSV case. I've also tried other Spark SQL mechanisms to retrieve the individual fields from the "value" column, to no avail. When I do manage to get a compiling version (e.g. a map over the indivValues dataset or over dsRawData), I get an error like: "org.apache.spark.sql.AnalysisException: cannot resolve 'IC' given input columns: [value];". If I understand correctly, that's because value is a single comma-separated string, and Spark won't magically map it onto my schema unless I do "something".

//build the spark session
SparkSession sparkSession = SparkSession.builder()
    .appName(seCfg.arg0AppName)
    .config("spark.cassandra.connection.host",config.arg2CassandraIp)
    .getOrCreate();

...
//my target schema is this:
StructType schema = DataTypes.createStructType(new StructField[] {
    DataTypes.createStructField("timeOfOrigin",  DataTypes.TimestampType, true),
    DataTypes.createStructField("cName", DataTypes.StringType, true),
    DataTypes.createStructField("cRole", DataTypes.StringType, true),
    DataTypes.createStructField("bName", DataTypes.StringType, true),
    DataTypes.createStructField("stage", DataTypes.StringType, true),
    DataTypes.createStructField("intId", DataTypes.IntegerType, true),
    DataTypes.createStructField("intName", DataTypes.StringType, true),
    DataTypes.createStructField("intCatId", DataTypes.IntegerType, true),
    DataTypes.createStructField("catName", DataTypes.StringType, true),
    DataTypes.createStructField("are_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("isee_vval", DataTypes.IntegerType, true),
    DataTypes.createStructField("opCode", DataTypes.IntegerType, true),
    DataTypes.createStructField("opType", DataTypes.StringType, true),
    DataTypes.createStructField("opName", DataTypes.StringType, true)
    });
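As a quick sanity check (plain Java, independent of Spark), the sample line from the question splits into exactly 14 fields, matching the 14 columns declared in the schema above:

```java
public class SchemaFieldCheck {
    public static void main(String[] args) {
        // Sample line from the question; the schema above declares 14 columns.
        String line = "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect";
        String[] fields = line.split(",", -1); // -1 keeps trailing empty fields
        System.out.println(fields.length);     // 14
        System.out.println(fields[0]);         // "201310" -> timeOfOrigin
        System.out.println(fields[13]);        // "Prospect" -> opName
    }
}
```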
...

 Dataset<Row> dsRawData = sparkSession
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", config.arg3Kafkabootstrapurl)
    .option("subscribe", config.arg1TopicName)
    .option("failOnDataLoss", "false")
    .load();

//getting individual terms like '201310', 'XYZ001', ... from the "value" column
Dataset<String> indivValues = dsRawData
    .selectExpr("CAST(value AS STRING)")
    .as(Encoders.STRING())
    .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(",")).iterator(), Encoders.STRING());

//indivValues, when printed to the console, looks like below, which confirms
//that I receive the data correctly and completely
/*
When printed on console, looks like this:
                +--------------------+
                |               value|
                +--------------------+
                |              201310|
                |              XYZ001|
                |                 Sup|
                |                 XYZ|
                |                   A|
                |                   0|
                |            Presales|
                |                   6|
                |             Callout|
                |                   0|
                |                   0|
                |                   1|
                |                   N|
                |            Prospect|
                +--------------------+
*/

StreamingQuery sq = indivValues.writeStream()
    .outputMode("append")
    .format("console")
    .start();
//await termination
sq.awaitTermination();
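As the console dump above shows, the flatMap explodes every field into its own row, so the row boundaries are lost: there is no longer any way to tell which 14 values belonged to the same CSV line. A plain-Java sketch (no Spark) of the difference between flattening and per-row splitting, using the question's sample line plus a second made-up line for illustration:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FlattenVsPerRow {
    public static void main(String[] args) {
        // First line is from the question; second is a hypothetical extra record.
        List<String> messages = Arrays.asList(
            "201310,XYZ001,Sup,XYZ,A,0,Presales,6,Callout,0,0,1,N,Prospect",
            "201311,ABC002,Sup,ABC,B,1,Presales,7,Callout,0,1,0,Y,Prospect");

        // flatMap-style: every field becomes its own element; rows are gone
        List<String> flattened = messages.stream()
            .flatMap(m -> Arrays.stream(m.split(",")))
            .collect(Collectors.toList());
        System.out.println(flattened.size()); // 28 values, no row boundaries

        // per-row split: each message stays one record of 14 fields
        List<String[]> rows = messages.stream()
            .map(m -> m.split(","))
            .collect(Collectors.toList());
        System.out.println(rows.size());        // 2 records
        System.out.println(rows.get(0).length); // 14 fields each
    }
}
```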
  • I need the data typed into the custom schema shown above, since I'll be running mathematical calculations on it (combining each new row with some older rows).
  • Would it be better to push headers from the Kafka connector source task before publishing to the topic? Would having headers make this simpler to solve?

Thanks!

[Discussion]:

Tags: apache-spark apache-kafka spark-streaming-kafka


[Solution 1]:

I've now been able to solve this using Spark SQL. The code for the solution is below.

    //dsRawData has raw incoming data from Kafka...
    Dataset<String> indivValues = dsRawData
                    .selectExpr("CAST(value AS STRING)")
                    .as(Encoders.STRING());
    
    //create new columns, parse out the orig message and fill column with the values
    Dataset<Row> dataAsSchema2 = indivValues
                        .selectExpr("value",
                                "split(value,',')[0] as time",
                                "split(value,',')[1] as cname",
                                "split(value,',')[2] as crole",
                                "split(value,',')[3] as bname",
                                "split(value,',')[4] as stage",
                                "split(value,',')[5] as intid",
                                "split(value,',')[6] as intname",
                                "split(value,',')[7] as intcatid",
                                "split(value,',')[8] as catname",
                                "split(value,',')[9] as are_vval",
                                "split(value,',')[10] as isee_vval",
                                "split(value,',')[11] as opcode",
                                "split(value,',')[12] as optype",
                                "split(value,',')[13] as opname")
                        .drop("value");
    
    //remove any whitespaces as they interfere with data type conversions
dataAsSchema2 = dataAsSchema2
                    .withColumn("intid", functions.regexp_replace(functions.col("intid"),
                            " ", ""))
                        .withColumn("intcatid", functions.regexp_replace(functions.col("intcatid"),
                                " ", ""))
                        .withColumn("are_vval", functions.regexp_replace(functions.col("are_vval"),
                                " ", ""))
                        .withColumn("isee_vval", functions.regexp_replace(functions.col("isee_vval"),
                                " ", ""))
                        .withColumn("opcode", functions.regexp_replace(functions.col("opcode"),
                                " ", ""));
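The whitespace stripping above matters because a padded numeric field does not parse cleanly as an integer. A minimal plain-Java illustration (Spark's SQL cast behaves differently from `Integer.parseInt`, but the underlying issue, stray spaces breaking numeric conversion, is the same):

```java
public class WhitespaceCast {
    public static void main(String[] args) {
        // A padded field, as it might arrive after splitting " 6" out of the CSV
        String padded = " 6";
        try {
            Integer.parseInt(padded); // throws NumberFormatException
        } catch (NumberFormatException e) {
            System.out.println("parse failed for: '" + padded + "'");
        }
        // Stripping the space first (what regexp_replace does above) succeeds
        System.out.println(Integer.parseInt(padded.replace(" ", ""))); // 6
    }
}
```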
    
    //cast types to get ready for the calculations
    dataAsSchema2 = dataAsSchema2
                        .withColumn("intcatid",functions.col("intcatid").cast(DataTypes.IntegerType))
                        .withColumn("intid",functions.col("intid").cast(DataTypes.IntegerType))
                        .withColumn("are_vval",functions.col("are_vval").cast(DataTypes.IntegerType))
                        .withColumn("isee_vval",functions.col("isee_vval").cast(DataTypes.IntegerType))
                        .withColumn("opcode",functions.col("opcode").cast(DataTypes.IntegerType));
    
    
    //build a POJO dataset via the public as(encoder) API
    Encoder<Pojoclass2> encoder = Encoders.bean(Pojoclass2.class);
    Dataset<Pojoclass2> pjClass = dataAsSchema2.as(encoder);
    

    [Discussion]:

  [Solution 2]:

  Given your existing code, the simplest way to parse the input from dsRawData is to cast it to a Dataset<String> and then use the native CSV reader API:

      //dsRawData has raw incoming data from Kafka...
      Dataset<String> indivValues = dsRawData
                      .selectExpr("CAST(value AS STRING)")
                      .as(Encoders.STRING());
      
      Dataset<Row>    finalValues = sparkSession.read()
                      .schema(schema)
                      .option("delimiter",",")
                      .csv(indivValues);
      

  With this structure, you can use exactly the same CSV parsing options that are available when reading a CSV file directly with Spark.

  [Discussion]:

  • Thanks! I haven't had a chance to try this yet, but it looks much cleaner!