【Question Title】: PySpark - Create a pyspark dataframe using a Kafka JSON message
【Posted】: 2021-01-26 10:18:53
【Question Description】:

I am using PySpark Structured Streaming to read data from a Kafka topic whose messages are in a complex JSON format.

I am using Spark Structured Streaming with the Kafka source, with the code below -

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructType

spark = SparkSession.builder \
        .appName("PythonSparkStreamingKafka") \
        .getOrCreate()

kafkaStreamDF = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "main.test.mysql.main.test_bank_data") \
            .option("startingOffsets", "earliest") \
            .load()

kafkaStreamDF1 = kafkaStreamDF.selectExpr("CAST(value AS STRING)")

message_schema = StructType().add("payload",StringType())
kafkaStreamDF2 = kafkaStreamDF1.select(from_json(col("value"),message_schema).alias("message"))

consoleOutput = kafkaStreamDF2.writeStream \
                .outputMode("append") \
                .format("console") \
                .option("truncate", "false") \
                .start()

I have already extracted the data down to the payload part of the Kafka JSON message, and its output on the console looks like this -

|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|

Now I want to extract the data from the after part and read those fields into a dataframe as below -

transaction_id | account_no   | transaction_date | transaction_details              | value_date | withdrawal_amt | deposit_amt | balance_amt
20             | 409000611074 | 16/08/2020       | INDO GIBL Indiaforensic STL12071 | 16/08/2020 | 129000.00      | (null)      | 7320950.00
21             | 409000611074 | 16/08/2020       | INDO GIBL Indiaforensic STL13071 | 16/08/2020 | 230013.00      | (null)      | 7090937.00

Please suggest how I can achieve this expected output in a PySpark dataframe?

Adding the exact value field of the Kafka message below -

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false ,"field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true," name":"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details" },{"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type" :"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect .decimal.precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.十进制","version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":" bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal .precision":"12"},"field":"balance_amt"}],"optional":true,"name":"main. test.mysql.main.test_bank_data.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false," field":"transaction_id"},{"type":"int64","optional":false,"field":"account_no"},{"type":"int32","optional":true,"name" :"io.debezium.time.Date","version":1,"field":"transaction_date"},{"type":"string","optional":true,"field":"transaction_details"}, {"type":"int32","optional":true,"name":"io.debezium.time.Date","version":1,"field":"value_date"},{"type":" bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal .precision":"12"},"field":"withdrawal_amt"},{"type":"bytes","optional":true,"name":"org.apache.kafka.connect.data.Decimal" ,"version":1,"parameters":{"scale":"2","connect.decimal.precision":"12"},"field":"deposit_amt"},{"type":"bytes" ,"可选":true,"name":"org.apache.kafka.connect.data.Decimal","version":1,"parameters":{"scale":"2","connect.decimal.precision ":"12"},"field":"balance_amt"}],"可选":tr ue,"name":"main.test.mysql.main.test_bank_data.Value","field":"after"},{"type":"struct","fields":[{"type":"string ","可选":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string", "可选":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","可选":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false", "field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":true,"field ":"table"},{"type":"int64","optional":false,"field":"server_id"},{"type":"string","optional":true,"field": "gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos "},{"type":"int32","optional":false,"field":"row"},{"type":"int64","optional":true,"field":"thread"} ,{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field ":"源"},{"type":"s TRING","可选":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"struct" ,"fields":[{"type":"string","optional":false,"field":"id"},{"type":"int64","optional":false,"field":" total_order"},{"type":"int64","optional":false,"field":"data_collection_order"}],"optional":true,"field":"transaction"}],"optional":false ,"名称":"main.test.mysql.main.test_bank_data.Envelope"},"payload":{"before":null,"after":{"transaction_id":146,"account_no":409000611076,"transaction_date ":18652,"transaction_details":"TRF FROM Indiaforensic 
SERVICES","value_date":18652,"withdrawal_amt":"AA==","deposit_amt":"B6Eg","balance_amt":"B6Eg"},"source ":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":1611587463000,"snapshot":"false"," db":"main","table":"test_bank_data","server_id":19105,"gtid":null,"file":"binlog.000584","pos":46195052,"row":0," thread":1604558,"query":null},"op":"c","ts_ms":1611587463181,"transaction":null}}

From here I have already cast the value to a string in DF1 and got the payload part into DF2.

-- Final working comments -- Adding this after applying a transform SMT to the Debezium MySQL connector on the Kafka Connect side: I now get the message value in PySpark Structured Streaming as below -

Value =
{"transaction_id":21,"account_no":409000611074,"transaction_date":"2020-08-22","transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":"2020-08-22","withdrawal_amt":"230013.00","deposit_amt":null,"balance_amt":"7090937.00"}

from pyspark.sql.types import (IntegerType, LongType, StringType,
                               StructField, StructType)

message_schema = StructType([
    StructField('transaction_id', IntegerType(), True),
    StructField('account_no', LongType(), True),
    StructField('transaction_date', StringType(), True),
    StructField('transaction_details', StringType(), True),
    StructField('value_date', StringType(), True),
    StructField('withdrawal_amt', StringType(), True),
    StructField('deposit_amt', StringType(), True),
    StructField('balance_amt', StringType(), True)
])
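
For reference, below is a minimal sketch of how this flat schema could be applied to the unwrapped value (assuming the same string-cast kafkaStreamDF1 from the code above; the flatDF name is illustrative and not part of the original code):

# Sketch: parse the unwrapped Debezium value (the plain "after" JSON produced
# by the SMT) with the flat schema above, then expand the struct into columns.
flatDF = kafkaStreamDF1.select(
    from_json(col("value"), message_schema).alias("data")
).select("data.*")

flatDF.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .start() \
    .awaitTermination()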

【Question Discussion】:

    Tags: apache-spark pyspark spark-streaming apache-kafka-streams


    【Solution 1】:

    You can pass the schema of the string JSON message to the from_json function.

    Your messages look like this:

    #+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    #|value                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                           |
    #+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    #|[{"before":null,"after":{"transaction_id":20,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL12071 ","value_date":18490,"withdrawal_amt":"AMTWoA==","deposit_amt":null,"balance_amt":"K6LiGA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
    #|[{"before":null,"after":{"transaction_id":21,"account_no":409000611074,"transaction_date":18490,"transaction_details":"INDO GIBL Indiaforensic STL13071 ","value_date":18490,"withdrawal_amt":"AV741A==","deposit_amt":null,"balance_amt":"KkPpRA=="},"source":{"version":"1.4.0-SNAPSHOT","connector":"mysql","name":"main.test.mysql","ts_ms":0,"snapshot":"true","db":"main","table":"test_bank_data","server_id":0,"gtid":null,"file":"binlog.000584","pos":15484438,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1611582308774,"transaction":null}]|
    #+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    

    You can modify your code to parse the after field of the JSON as a MapType, then select the desired keys as columns:

    from pyspark.sql import functions as F
    from pyspark.sql.types import MapType, StringType, StructField, StructType

    message_schema = StructType([
        StructField('before', MapType(StringType(), StringType(), True), True),
        StructField('after', MapType(StringType(), StringType(), True), True),
        StructField('source', MapType(StringType(), StringType(), True), True),
        StructField('op', StringType(), True),
        StructField('ts_ms', StringType(), True),
        StructField('transaction', StringType(), True)
    ])

    after_fields = [
        "account_no", "balance_amt", "deposit_amt", "transaction_date",
        "transaction_details", "transaction_id", "value_date", "withdrawal_amt"
    ]

    # cast the binary Kafka value to string, parse the JSON with from_json
    # and select message.after.*
    kafkaStreamDF.withColumn(
        "message",
        F.from_json(F.col("value").cast("string"), message_schema)
    ).select(
        *[F.col("message.after").getItem(f).alias(f) for f in after_fields]
    ).writeStream \
     .outputMode("append") \
     .format("console") \
     .option("truncate", "false") \
     .start() \
     .awaitTermination()
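
    Note that the values in the after map are still the raw Debezium encodings: transaction_date and value_date arrive as epoch-day integers (io.debezium.time.Date) and the amounts as base64-encoded Kafka Connect Decimal bytes with scale 2, per the schema in the question. A minimal sketch of one way to decode them into the expected output follows (the parsedDF and decode_decimal names are illustrative, not part of the original code):

    import base64
    from decimal import Decimal

    from pyspark.sql.types import DecimalType

    # Decode a base64-encoded Kafka Connect Decimal: the bytes hold the
    # big-endian two's-complement unscaled value, scale 2 per the schema.
    @F.udf(returnType=DecimalType(12, 2))
    def decode_decimal(b64):
        if b64 is None:
            return None
        unscaled = int.from_bytes(base64.b64decode(b64), byteorder="big", signed=True)
        return Decimal(unscaled).scaleb(-2)

    # Same parsing as above, kept as a separate dataframe before writing out.
    parsedDF = kafkaStreamDF.withColumn(
        "message",
        F.from_json(F.col("value").cast("string"), message_schema)
    ).select(
        *[F.col("message.after").getItem(f).alias(f) for f in after_fields]
    )

    # Epoch days -> dates, base64 decimals -> DecimalType(12, 2).
    decodedDF = parsedDF \
        .withColumn("transaction_date",
                    F.expr("date_add('1970-01-01', cast(transaction_date as int))")) \
        .withColumn("value_date",
                    F.expr("date_add('1970-01-01', cast(value_date as int))")) \
        .withColumn("withdrawal_amt", decode_decimal("withdrawal_amt")) \
        .withColumn("deposit_amt", decode_decimal("deposit_amt")) \
        .withColumn("balance_amt", decode_decimal("balance_amt"))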
    

    【Discussion】:

    • Hi @blacbishop, thanks for your reply. But when I try this logic I get the error - File "/home/ubuntu/python-scripts/pyspark-sql-test.py", line 24, in <module> json_message_rdd = kafkaStreamDF.select("value").limit(1).rdd.map(lambda row: row.value) pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;kafka
    • @Nicolas can you try the updated answer?
    • @blackbishop it looks like the OP just copy-pastes the code, so you might as well add awaitTermination ;-)
    • @mike thanks for pointing that out. I don't really use Spark Streaming, but answered since it is a pure SQL transformation :)
    • @blackbishop: I checked the schema and tried some logic on the Debezium connector side to remove the unneeded data and produce only the after part of the message to Kafka. Then I used the logic you provided with the new message_schema shown above, and it works fine and shows the data in the dataframe. Thank you very much for your help and suggestions :)