【Question Title】: nested json from rest api to pyspark dataframe
【Posted】: 2021-07-07 14:03:00
【Question】:

I am trying to build a data pipeline in which I request data from a REST API. The output is a nested JSON file, which is great. I want to read the JSON into a PySpark DataFrame. When I save the file locally and use the following code, this works fine:

from pyspark.sql import *
from pyspark.sql.functions import *

spark = SparkSession\
    .builder\
    .appName("jsontest")\
    .getOrCreate()

raw_df = spark.read.json(r"my_json_path", multiLine='true')

However, when I try to create a PySpark DataFrame directly after making the API request, I get the following error:

This is the code I use to call the REST API and convert the response to a PySpark DataFrame:

apiCallHeaders = {'Authorization': 'Bearer ' + bearer_token}
apiCallResponse = requests.get(data_url, headers=apiCallHeaders, verify=True)
json_rdd = spark.sparkContext.parallelize(apiCallResponse.text)
raw_df = spark.read.json(json_rdd)

Here is part of the response output:

{"networks":[{"href":"/v2/networks/velobike-moscow","id":"velobike-moscow","name":"Velobike"},{"href":"/v2/networks/bycyklen","id":"bycyklen","name":"Bycyklen"},{"href":"/v2/networks/nu-connect","id":"nu-connect","name":"Nu-Connect"},{"href":"/v2/networks/baerum-bysykkel","id":"baerum-bysykkel","name":"Bysykkel"},{"href":"/v2/networks/bysykkelen","id":"bysykkelen","name":"Bysykkelen"},{"href":"/v2/networks/onroll-a-rua","id":"onroll-a-rua","name":"Onroll"},{"href":"/v2/networks/onroll-albacete","id":"onroll-albacete","name":"Onroll"},{"href":"/v2/networks/onroll-alhama-de-murcia","id":"onroll-alhama-de-murcia","name":"Onroll"},{"href":"/v2/networks/onroll-almunecar","id":"onroll-almunecar","name":"Onroll"},{"href":"/v2/networks/onroll-antequera","id":"onroll-antequera","name":"Onroll"},{"href":"/v2/networks/onroll-aranda-de-duero","id":"onroll-aranda-de-duero","name":"Onroll"}

I hope my question makes sense and that someone can help.

Thanks in advance!

【Question Comments】:

  • Please update your question with your API response instead of my example, and post it as text
  • I can't share my response output because it contains sensitive data. But I tried your API and got the same error. Could it have something to do with how I installed Spark?
  • @Kafels, yes, it could be related to the installation. Should I reinstall?
  • Try this before reinstalling
  • Thank you so much @Kafels!!!! It worked... have a good one, man :)

Tags: python apache-spark pyspark apache-spark-sql data-pipeline


【Solution 1】:

Following the answer, you can add these lines:

import os
import sys

# Point Spark at the same Python interpreter that runs the driver script,
# so the workers do not pick up a different (incompatible) Python.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

To run your code, you must add [ ] here:

rdd = spark.sparkContext.parallelize([apiCallResponse.text])
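
The brackets matter because parallelize iterates over whatever it is given: a bare string is split into one record per character, which spark.read.json cannot parse. Below is a minimal sketch of the difference; the text value is a made-up stand-in for apiCallResponse.text:

# Hypothetical payload standing in for apiCallResponse.text
text = '{"networks": []}'

# Without brackets the string is iterated character by character,
# so every RDD element is a single character such as '{' or '"'.
chars_rdd = spark.sparkContext.parallelize(text)
print(chars_rdd.take(3))   # ['{', '"', 'n']

# With brackets the whole JSON document is a single RDD element,
# which is what spark.read.json expects.
doc_rdd = spark.sparkContext.parallelize([text])
print(doc_rdd.take(1))     # ['{"networks": []}']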

Here is an example:

import requests

response = requests.get('http://api.citybik.es/v2/networks?fields=id,name,href')
rdd = spark.sparkContext.parallelize([response.text])

df = spark.read.json(rdd)

df.printSchema()
# root
#  |-- networks: array (nullable = true)
#  |    |-- element: struct (containsNull = true)
#  |    |    |-- href: string (nullable = true)
#  |    |    |-- id: string (nullable = true)
#  |    |    |-- name: string (nullable = true)

(df
 .selectExpr('inline(networks)')
 .show(n=5, truncate=False))
# +----------------------------+---------------+----------+
# |href                        |id             |name      |
# +----------------------------+---------------+----------+
# |/v2/networks/velobike-moscow|velobike-moscow|Velobike  |
# |/v2/networks/bycyklen       |bycyklen       |Bycyklen  |
# |/v2/networks/nu-connect     |nu-connect     |Nu-Connect|
# |/v2/networks/baerum-bysykkel|baerum-bysykkel|Bysykkel  |
# |/v2/networks/bysykkelen     |bysykkelen     |Bysykkelen|
# +----------------------------+---------------+----------+
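
If you would rather skip the RDD step entirely, a possible alternative (just a sketch, not part of the fix above) is to parse the response with the standard json module and build the DataFrame straight from the list of dicts:

import json
import requests

response = requests.get('http://api.citybik.es/v2/networks?fields=id,name,href')

# Parse the payload and hand the list of network dicts to createDataFrame;
# each dict becomes one row and each key becomes one column.
networks = json.loads(response.text)['networks']
df = spark.createDataFrame(networks)

df.show(n=5, truncate=False)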

【Comments】:

  • Hi Kafels. Thanks for your answer! I actually already tried the square brackets in parallelize, without luck... I get the same error as before. Could it have something to do with how I installed Spark?
  • Can you update your question to show the output of apiCallResponse.text?
  • I have added the response output to the question.