【问题标题】:transform data in azure data factory using python data bricks使用 python 数据块转换 azure 数据工厂中的数据
【发布时间】:2019-11-07 12:18:31
【问题描述】:

我的任务是将数百万个单个 JSON 文件转换并合并为 BIG CSV 文件。

使用复制活动和映射模式的操作将非常简单,我已经测试过,问题是大量文件的 JSON 格式不正确。

我知道错误是什么,而且修复也很简单,我想我可以使用 Python 数据砖活动来修复字符串,然后将输出传递给可以将记录合并为大 CSV 的复制活动文件。

我有这样的想法,我不确定这是否是解决此任务的正确方法。我不知道在 Data Brick 活动中使用 Copy Activy 的输出

【问题讨论】:

    标签: python azure azure-data-factory-2


    【解决方案1】:

    听起来您想使用 Azure 数据工厂转换大量单个 JSON 文件,但正如 @KamilNowinski 所说,它现在不支持 Azure。但是,现在您使用的是 Azure Databricks,编写一个简单的 Python 脚本来执行相同的操作对您来说更容易。因此,一种有效的解决方案是直接使用 Azure Storage SDK 和 pandas Python 包,通过 Azure Databricks 上的几个步骤来完成。

    1. 可能这些 JSON 文件都在 Azure Blob Storage 的容器中,因此您需要通过 list_blob_names 将它们列在容器中,并使用 pandas read_json 函数的 sas 令牌生成它们的 url,代码如下。

      from azure.storage.blob.baseblobservice import BaseBlobService
      from azure.storage.blob import ContainerPermissions
      from datetime import datetime, timedelta
      
      account_name = '<your account name>'
      account_key = '<your account key>'
      container_name = '<your container name>'
      
      service = BaseBlobService(account_name=account_name, account_key=account_key)
      token = service.generate_container_shared_access_signature(container_name, permission=ContainerPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1),)
      
      blob_names = service.list_blob_names(container_name)
      blob_urls_with_token = (f"https://{account_name}.blob.core.windows.net/{container_name}/{blob_name}?{token}" for blob_name in blob_names)
      
      #print(list(blob_urls_with_token))
      
    2. 然后,您可以通过 read_json 函数直接从 blob 中读取这些 JSON 文件,以创建它们的 pandas Dataframe。

      import pandas as pd
      
      for blob_url_with_token in blob_urls_with_token:
          df = pd.read_json(blob_url_with_token)
      

      即使您想将它们合并到一个大的 CSV 文件中,您也可以先通过 Combining / joining / merging 中列出的 pandas 函数将它们合并到一个大的 Dataframe 中,例如 append

    3. 要将数据框写入 csv 文件,我认为 to_csv 函数非常简单。或者,您可以在 Azure Databricks 上将 pandas 数据帧转换为 PySpark 数据帧,如下所示。

      from pyspark.sql import SQLContext
      from pyspark import SparkContext
      
      sc = SparkContext()
      sqlContest = SQLContext(sc)
      spark_df = sqlContest.createDataFrame(df)
      

    接下来,无论您想做什么,都很简单。如果你想在 Azure Databricks 中将脚本调度为 notebook,可以参考官方文档Jobs 运行 Spark 作业。

    希望对你有帮助。

    【讨论】:

    • 谢谢!这有帮助,我将无法使用 Pandas 直接读取 JSON 文件,因为很多文件都有错误,实际上,我已经有一个 Python 脚本可以从 blob 读取文件修复它们,如有必要,创建一个 CSV 流和将它放回另一个 Blob 容器,我在本地运行它,现在我将尝试使用 Databricks,这些文件可能非常大,我每天的日志记录多达 115k 条记录,并且有几个月的时间处理中,不知道会不会导致内存出现问题
    【解决方案2】:

    将 JSON 文件复制到存储(例如 BLOB),您可以从 Databricks 访问存储。然后,您可以使用 Python 修复文件,甚至可以在集群运行时转换为所需的格式。

    因此,在复制数据活动中,如果您还没有将文件复制到 BLOB。

    【讨论】:

      猜你喜欢
      • 2018-10-29
      • 1970-01-01
      • 2021-11-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-11-17
      • 1970-01-01
      相关资源
      最近更新 更多