【问题标题】:Spark Dataframe to StringTypeSpark Dataframe 到 StringType
【发布时间】:2021-06-04 11:11:00
【问题描述】:

在 PySpark 中,如何将 Dataframe 转换为普通字符串?

背景:

我在 Kafka 中使用 PySpark,而不是硬编码代理名称,我在 PySpark 中参数化了 Kafka 代理名称。

Json 文件包含 Broker 详细信息,Spark 读取此 Json 输入并将值分配给变量。这些变量是带字符串的 Dataframe 类型。

当我将数据框传递给 Pyspark-Kakfa 连接详细信息以替换值时,我遇到了问题。

错误

只能将字符串(不是数据框)连接到字符串。

Json 参数文件

{
"broker": "https://at.com:8082",
"topicname": "dev_hello"
}

PySpark 代码:

parameter = spark.read.option("multiline", "true").json("/at/dev_parameter.json")

kserver = parameter.select("broker")

ktopic = parameter.select("topicname")


df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")
   
.write

   .format("kafka")

   .outputMode("append")

   .option("kafka.bootstrap.servers", "f"+ **kserver**)

   .option("topic", "josn_data_topic",**ktopic** )

   .save()

请指教。

我的第二个查询是如何将这些基于 Python 的变量传递给另一个基于 Scala 的 Spark 笔记本。

【问题讨论】:

  • 您不能将 Python 变量传递给 Scala 笔记本。但是您可以将记录写入 Kafka,然后从 Scala 中使用它们

标签: json apache-spark pyspark apache-kafka


【解决方案1】:

使用json.load 代替 Spark json 阅读器:

import json

with open("/at/dev_parameter.json") as f:
    parameter = json.load(f)

kserver = parameter["broker"]
ktopic = parameter["topicname"]

df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
  .write \
  .format("kafka") \
  .outputMode("append") \
  .option("kafka.bootstrap.servers", kserver) \
  .option("topic", ktopic) \
  .save()

如果你更喜欢使用 Spark json 阅读器,你可以这样做:

parameter = spark.read.option("multiline", "true").json("/at/dev_parameter.json")
kserver = parameter.select("broker").head()[0]
ktopic = parameter.select("topicname").head()[0]

【讨论】:

  • 感谢您的评论。通过 Spark 读取 Json 很容易,因为我也正在获取一些嵌套元素,并且使用 spark DF 很容易。一定有办法将 DF 传递给 String。
  • @Abhi 我已经添加了一种方法来做到这一点。看看有没有帮助?
  • 感谢您抽出宝贵时间。我尝试了您提出的选项,但是 Kserver 的数据类型是数据框,并且字符串不允许与数据框连接。如果您在 Pyspark 中尝试以下代码。它会失败。参数 = spark.read.option("multiline", "true").json("/at/dev_parameter.json") kserver = parameter.select("broker").head()[0] ktopic = parameter.select ("topicname").head()[0] server = "dev" final_broker= server+ kserver
  • kserver 应该是一个字符串,因为调用了.head()[0]。你能做到kserver = parameter.select("broker").head()[0]print(type(kserver))吗?
猜你喜欢
  • 2017-02-19
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-10-18
  • 1970-01-01
  • 2020-11-23
  • 2016-09-28
  • 2017-02-03
相关资源
最近更新 更多