【发布时间】:2021-06-04 11:11:00
【问题描述】:
在 PySpark 中,如何将 Dataframe 转换为普通字符串?
背景:
我在 Kafka 中使用 PySpark,而不是硬编码代理名称,我在 PySpark 中参数化了 Kafka 代理名称。
Json 文件包含 Broker 详细信息,Spark 读取此 Json 输入并将值分配给变量。这些变量是带字符串的 Dataframe 类型。
当我将数据框传递给 Pyspark-Kakfa 连接详细信息以替换值时,我遇到了问题。
错误:
只能将字符串(不是数据框)连接到字符串。
Json 参数文件:
{
"broker": "https://at.com:8082",
"topicname": "dev_hello"
}
PySpark 代码:
parameter = spark.read.option("multiline", "true").json("/at/dev_parameter.json")
kserver = parameter.select("broker")
ktopic = parameter.select("topicname")
df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
.write
.format("kafka")
.outputMode("append")
.option("kafka.bootstrap.servers", "f"+ **kserver**)
.option("topic", "josn_data_topic",**ktopic** )
.save()
请指教。
我的第二个查询是如何将这些基于 Python 的变量传递给另一个基于 Scala 的 Spark 笔记本。
【问题讨论】:
-
您不能将 Python 变量传递给 Scala 笔记本。但是您可以将记录写入 Kafka,然后从 Scala 中使用它们
标签: json apache-spark pyspark apache-kafka