【问题标题】:PySpark UDF Returns Status Code and Response in Separate withColumnPySpark UDF 在与列分开的情况下返回状态代码和响应
【发布时间】:2021-10-27 17:09:14
【问题描述】:

我有一个可以引用 status_code 并返回正文的 udf。

def Api(a):
  path = endpoint
  headers = {'sample-Key': sample}
  body = [{'text': body }]
  res = None
  try:
      req = requests.post(path, params=params, headers=headers, json=body)
      req = req.json()
      dumps=json.dumps(req)
  except Exception as e:
    return e
  if res != None and req.status_code == 200:
    return json.loads(dumps)
  return None

udf_Api = udf(Api)
newDF=df.withColumn("output", udf_Api(col("input")))

我可以返回 json.loads 并将其放入数据框中。但是,我的问题是我还需要将 status_code 保留在单独的列中。所以输出看起来像:

+---------+-----------+----------+
|    input|status_code|    output|
+---------+-----------+----------+
|inputText|        200|outputText|
+---------+-----------+----------+

那么我怎样才能同时返回 req.status_code 和 json.loads(),但将它们放在数据框中的单独列中?我想过返回一个数组然后拆分它,但不知道该怎么做。

【问题讨论】:

    标签: api pyspark user-defined-functions


    【解决方案1】:

    您可以修改 UDF 以返回 dict 而不是字符串或整数,然后定义输出模式。

    from pyspark.sql import functions as F
    from pyspark.sql import types as T
    
    def Api(a):
        return {
            'status': 200,
            'data': '{"a": 1}'
        }
    
    schema = T.StructType([
        T.StructField('status', T.IntegerType()),
        T.StructField('data', T.StringType())
    ])
    
    (df
        .withColumn('output', F.udf(Api, schema)('col'))
        .select('col', 'output.*')
        .show()
    )
    
    # +---+------+--------+
    # |col|status|    data|
    # +---+------+--------+
    # | 10|   200|{"a": 1}|
    # | 20|   200|{"a": 1}|
    # | 30|   200|{"a": 1}|
    # +---+------+--------+
    

    【讨论】:

      猜你喜欢
      • 2022-10-20
      • 2023-03-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-08
      • 2021-11-21
      • 1970-01-01
      相关资源
      最近更新 更多