【问题标题】:How do I do functions.from_csv at spark structured stream我如何在 spark 结构化流中执行 functions.from_csv
【发布时间】:2020-08-17 10:34:31
【问题描述】:

我从 kafka 源读取行,我想建立一个 kafka 消费者......在 spark 结构化流中 我知道如何告诉 spark 传入的行是 json 类型......我如何对 from_csv 做同样的事情?

   val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic2")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .select(functions.from_json($"value", retailDataSchema).as("data"))
      lines.printSchema()

架构是:

        val retailDataSchema = new StructType()
          .add("InvoiceNo", IntegerType)
          .add("Quantity", IntegerType)
          .add("Country", StringType)

谢谢!

输入数据如下所示:

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    你可以解决这个问题:

        val lines = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "topic2")
                .option("startingOffsets", "earliest")
                .load()
                .select(col("value").cast("string")).as("data").select("data.*").selectExpr("cast(split(value,',')[0] as DataTypes.IntegerType) as InvoiceNo"
                        ,"cast(split(value,',')[1] as DataTypes.IntegerType) as Quantity"
                        ,"cast(split(value,',')[2] as DataTypes.StringType) as Country" );
        lines.printSchema();
    

    或者您可以使用内置函数 from_csv 从 Apache spark 3.0.0 开始

    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "topic2")
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .select(functions.from_csv($"value", retailDataSchema).as("data"))
      lines.printSchema()
    

    Apache Spark Docs for from_csv built-in function

    【讨论】:

    • 首先,在两个.readStream中你要去掉括号()。其次,通过添加您的代码,在retailDataSchema 之后期望{'ADD'、'AFTER'、'ALL'}的不匹配输入'' 会出现异常。我的代码的目的是告诉 kafka 输入行是逗号分隔值。最后,我不明白 .csv("path") 的原因。
    • @vasilis 如果我的回答解决了您的问题,请接受它作为正确答案
    猜你喜欢
    • 2018-01-18
    • 1970-01-01
    • 2017-05-04
    • 2018-06-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多