【Question Title】: How to convert a JSON result to Parquet?
【Posted】: 2018-11-21 12:34:25
【Question Description】:

I have the following code, which fetches some data from a Marketo system:

from marketorestpython.client import MarketoClient
munchkin_id = "xxx-xxx-xxx"
client_id = "00000000-0000-0000-0000-00000000000"
client_secret= "secret"
mc = MarketoClient(munchkin_id, client_id, client_secret)
mc.execute(method='get_multiple_leads_by_filter_type', filterType='email', filterValues=['email@domain.com'], 
                  fields=['BG__c','email','company','createdAt'], batchSize=None)

This returns the following data:

[{'BG__c': 'ABC',
  'company': 'MCS',
  'createdAt': '2016-10-25T14:04:15Z',
  'id': 4,
  'email': 'email@domain.com'},
 {'BG__c': 'CDE',
  'company': 'MSC',
  'createdAt': '2018-03-28T16:41:06Z',
  'id': 10850879,
  'email': 'email@domain.com'}]

What I want to do is save this to a Parquet file. But when I try that with the following code, I get an error message:

from marketorestpython.client import MarketoClient
munchkin_id = "xxx-xxx-xxx"
client_id = "00000000-0000-0000-0000-00000000000"
client_secret= "secret"
mc = MarketoClient(munchkin_id, client_id, client_secret)
data = mc.execute(method='get_multiple_leads_by_filter_type', filterType='email', filterValues=['email@domain.com'], 
                  fields=['BG__c','email','company','createdAt'], batchSize=None)

sqlContext.read.json(data)
data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")

java.lang.ClassCastException: java.util.HashMap cannot be cast to java.lang.String
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<command-1431708582476650> in <module>()
      7                       fields=['BG__c','email','company','createdAt'], batchSize=None)
      8 
----> 9 sqlContext.read.json(data)
     10 data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")

/databricks/spark/python/pyspark/sql/readwriter.py in json(self, path, schema, primitivesAsString, prefersDecimal, allowComments, allowUnquotedFieldNames, allowSingleQuotes, allowNumericLeadingZero, allowBackslashEscapingAnyCharacter, mode, columnNameOfCorruptRecord, dateFormat, timestampFormat, multiLine, allowUnquotedControlChars, charset)
    261             path = [path]
    262         if type(path) == list:
--> 263             return self._df(self._jreader.json(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    264         elif isinstance(path, RDD):
    265             def func(iterator):

/databricks/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 

What am I doing wrong?

【Question Comments】:

    Tags: json apache-spark parquet databricks


    【Solution 1】:

    You have the following data:

    data = [{'BG__c': 'ABC',
           'company': 'MCS',
           'createdAt': '2016-10-25T14:04:15Z',
           'id': 4,
           'email': 'email@domain.com'},
           {'BG__c': 'CDE',
           'company': 'MSC',
           'createdAt': '2018-03-28T16:41:06Z',
           'id': 10850879,
           'email': 'email@domain.com'}]
    

    To save it to a Parquet file, I'd suggest creating a DataFrame and then saving it as Parquet:

    from pyspark.sql.types import *
    
    df = spark.createDataFrame(data,
                               schema = StructType([
                                        StructField("BG__c", StringType(), True),
                                        StructField("company", StringType(), True),
                                        StructField("createdAt", StringType(), True),
                                        StructField("email", StringType(), True),
                                        StructField("id", IntegerType(), True)]))
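
    As an alternative to spelling out the schema by hand, `spark.read.json` can infer it, but it expects a path or an RDD of JSON *strings*, not Python dicts (which is why `sqlContext.read.json(data)` failed above). A minimal sketch of serializing the records first (only the `json.dumps` part runs outside Spark; the `spark`/`sc` lines are assumed to exist in a Databricks session):

    ```python
    import json

    # The records returned by MarketoClient are plain Python dicts.
    data = [{'BG__c': 'ABC', 'company': 'MCS',
             'createdAt': '2016-10-25T14:04:15Z',
             'id': 4, 'email': 'email@domain.com'}]

    # spark.read.json accepts a path or an RDD of JSON strings,
    # not dicts -- serialize each record to a JSON string first:
    json_lines = [json.dumps(record) for record in data]

    # Sketch of the Spark side (session assumed available):
    # df = spark.read.json(sc.parallelize(json_lines))
    ```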
    

    This gives the following types:

    df.dtypes
    
    [('BG__c', 'string'),
     ('company', 'string'),
     ('createdAt', 'string'),
     ('email', 'string'),
     ('id', 'int')]
    

    You can then save the DataFrame as a Parquet file:

    df.show()
    +-----+-------+--------------------+----------------+--------+
    |BG__c|company|           createdAt|           email|      id|
    +-----+-------+--------------------+----------------+--------+
    |  ABC|    MCS|2016-10-25T14:04:15Z|email@domain.com|       4|
    |  CDE|    MSC|2018-03-28T16:41:06Z|email@domain.com|10850879|
    +-----+-------+--------------------+----------------+--------+
    
    df.write.format('parquet').save(parquet_path_in_hdfs)
    

    where `parquet_path_in_hdfs` is the path and name of the desired Parquet file

    【Discussion】:

    • Could you give me a short hint on defining the schema?
    • I've updated the answer. You can also look at this answer for help converting the date column from string to date.
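
    On the point raised in the comment above, the `createdAt` values are ISO-8601 UTC strings like `2016-10-25T14:04:15Z`, which can be parsed with the standard library; the Spark column equivalent is sketched in the comments (assumes an existing `df` as built in this answer):

    ```python
    from datetime import datetime

    # Parse one createdAt value; the trailing 'Z' marks UTC.
    created = datetime.strptime('2016-10-25T14:04:15Z',
                                '%Y-%m-%dT%H:%M:%SZ')

    # Sketch of the same conversion as a Spark column transformation:
    # from pyspark.sql.functions import to_timestamp
    # df = df.withColumn('createdAt',
    #                    to_timestamp('createdAt', "yyyy-MM-dd'T'HH:mm:ss'Z'"))
    ```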
    【Solution 2】:

    Judging from the following statement in your code, you are writing the data out directly. You have to create a DataFrame first. You can convert the JSON to a DataFrame with `val df = sqlContext.read.json("path/to/json/file")` and then call `df.write`:

    data.write.parquet("adl://subscription.azuredatalakestore.net/folder1/Marketo/marketo_data")
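
    Putting that suggestion together: since `read.json` wants a file path (with one JSON object per line), the Marketo records can be dumped to such a file first. A minimal sketch (the temp-file path and the Spark lines in comments are illustrative, not from the original answer):

    ```python
    import json
    import os
    import tempfile

    data = [{'BG__c': 'ABC', 'company': 'MCS',
             'createdAt': '2016-10-25T14:04:15Z',
             'id': 4, 'email': 'email@domain.com'},
            {'BG__c': 'CDE', 'company': 'MSC',
             'createdAt': '2018-03-28T16:41:06Z',
             'id': 10850879, 'email': 'email@domain.com'}]

    # Write one JSON object per line -- the layout spark.read.json expects.
    path = os.path.join(tempfile.mkdtemp(), 'marketo.json')
    with open(path, 'w') as f:
        for record in data:
            f.write(json.dumps(record) + '\n')

    # Sketch of the Spark side:
    # df = spark.read.json(path)
    # df.write.parquet(target_path)   # target_path: your ADLS destination
    ```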
    

    【Discussion】:
