【问题标题】:Write the results of the Google Api to a data lake with Databricks使用 Databricks 将 Google Api 的结果写入数据湖
【发布时间】:2019-04-11 08:30:08
【问题描述】:

我正在通过 Databricks 上的 Python SDK 从 Google Admin Report User Usage Api 取回用户使用数据。数据大小约为每天 100 000 条记录,我通过批处理进行了一个晚上。该 api 返回的最大页面大小为 1000,因此我粗略地将其称为 1000 以获取我当天需要的数据。这工作正常。

我的最终目标是以原始格式将数据存储在数据湖中(Azure Gen2,但与此问题无关)。稍后,我将使用 Databricks 将数据转换为聚合报告模型,并将 PowerBI 置于其之上,以跟踪 Google App 的使用情况。

作为一名 C# 程序员,我是 Python 和 Spark 的新手:我目前的做法是从 api 请求 1000 条记录的第一页,然后将其作为 JSON 文件直接写入 datalake,然后获取下一个页面集和也写那个。文件夹结构类似于“\raw\googleuser\YYYY\MM\DD\data1.json”。

我希望在原始区域中尽可能以最原始的形式保存数据,并且不要应用太多转换。第二个过程可以提取我需要的字段,用元数据标记它并将其写回 Parquet 以供函数使用。这就是为什么我想把它写成 JSON。

这意味着第二个过程需要将 JSON 读取到数据帧中,我可以在其中对其进行转换并将其写入 parquet(这部分也很简单)。

因为我使用的是 Google Api,所以我没有使用 Json - 它返回 dict 对象(具有复杂的嵌套)。我可以使用 json.dump() 将其提取为 Json 字符串,但我无法弄清楚如何将 STRING 直接写入我的数据湖。一旦我将它放入数据帧,我就可以轻松地以任何格式编写它,但是将它从 Json 转换为数据帧然后基本上返回 Json 只是为了编写它似乎是一种性能开销。

这是我尝试过的事情和结果:

  1. 建立一个 pyspark.sql.Rows 列表并在所有分页(100k 行)的末尾 - 使用 spark.createDataFrame(rows) 将其转换为数据帧。一旦它是一个数据框,我就可以将它保存为一个 Json 文件。这可行,但似乎效率低下。
  2. 使用 json.dump(request) 获取 Json 中 1000 条记录的字符串。我可以使用以下代码将其写入 Databricks 文件系统:

    with open("/dbfs/tmp/googleuserusagejsonoutput-{0}.json" .format(keyDateFilter), 'w') as f: f.write(json.dumps(response))

    但是,我必须将其移至我的 Azure 数据湖:

    dbutils.fs.cp("/tmp/test_dbfs1.txt", datalake_path + dbfs_path + "xyz.json")

    然后我得到接下来的 1000 条记录并继续这样做。我似乎无法将 open() 方法目录用于数据湖存储(Azure abfss 驱动程序),否则这将是一个不错的解决方案。先在本地转储,然后再移动,似乎很脆弱,也很奇怪。

  3. 与选项 1 相同,但每隔 1000 条记录将数据帧转储到 datalake 并覆盖它(这样内存一次不会增加超过 1000 条记录)

  4. 忽略转储原始 Json 的规则。将数据按摩成我想要的最简单的格式,并删除我不需要的所有额外数据。这将导致占用空间小得多,然后将遵循上面的选项 1 或 3。 (这是第二个问题 - 以原始格式保存来自 Api 的所有数据的原则,以便随着时间的推移需求发生变化,我总是在数据湖中拥有历史数据,并且可以更改转换例程以从中提取不同的指标因此,我不愿意在这个阶段删除任何数据。

请多多指教...

【问题讨论】:

    标签: python apache-spark azure-data-lake databricks google-api-python-client


    【解决方案1】:

    将湖安装到您的数据块环境中,这样您就可以将其保存到湖中,就像它是一个普通文件夹一样:

    with open('/dbfs/mnt/mydatalake/googleuserusagejsonoutput-{0}.json', 'wb') as f:
                json.dump(data, codecs.getwriter('utf-8')(f), sort_keys = True, indent = 4, ensure_ascii=False)
                f.close()
    

    你只需要登上湖面一次:

    https://docs.databricks.com/spark/latest/data-sources/azure/azure-datalake-gen2.html#mount-the-azure-data-lake-storage-gen2-filesystem-with-dbfs

    话虽如此,

    以 json 格式存储大数据不是最优的;对于每个值(单元格),您都存储了键(列名),因此您的数据将比需要的大得多。此外,您可能应该具有重复数据删除功能以确保(1)数据中没有间隙,以及(2)您没有将相同的数据存储在多个文件中。 Databricks delta 负责处理。

    https://docs.databricks.com/delta/delta-intro.html

    【讨论】:

    • 谢谢,我试试挂载DL,直接写。您对整体方法有什么建议吗?例如,您提到将大数据存储在 Json 中并不是最优的。什么被认为是大的?这些文件每个 12 兆(包含 1000 条记录 - 每条记录有 188 个参数)。我可以预先整理一些数据以减小这个大小并将数据扁平化为列格式(然后我可以将其存储为镶木地板),但是我认为原始区域的原理是完全按原样转储数据从源系统接收并在后续方法中对其进行处理。想法?
    • 好问题。这取决于您的数据对数据丢失的敏感程度。我没有使用过 google api,但我想如果数据错误或者你搞砸了转换,你可以再次查询它。你绝对可以走那条路,如果简单的文件转换错误,如果我可以轻松地再次访问数据,我个人不会认为文件格式是登陆后需要进行的转换。如果没有办法再次获取数据,登陆时一定要保存整个JSON。只要所有数据都在那里,我认为格式并不重要。
    • 太棒了。它现在可以工作了 - 非常令人困惑,但这段文字在这里总结了它“当您使用 Spark API 时,您使用“/mnt/training/file.csv”或“dbfs:/mnt/training/file”引用文件。 csv”。如果您使用本地文件 API,则必须提供 /dbfs 下的路径,例如:“/dbfs/mnt/training/file.csv”。使用 Spark API 时,不能使用 dbfs 下的路径。” . docs.databricks.com/user-guide/dbfs-databricks-file-system.html。因此,有时您需要为 Spark API 执行 /mtn/,然后对于本地 API,您需要使用 /dbfs/mnt/。
    • WRT 将原始 json 展平:Google API 只能滚动回溯 1.5 年,所以我想我会尽量保持原样(并提取所有可能的字段,无论我是否需要它们)或暂时没有)。然后在未来,我至少拥有从我开始这个过程开始的原始格式的所有数据,但我认为你的观点是文件格式不被视为转换。为帮助干杯。
    • 是的,使用 spark 打开文件已经在 dbfs 中查看:spark.read.json("/mnt/pathandfile.json"),但是在打开文件时必须在 mnt 之前添加 /dbfs示例中的python。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-02
    • 1970-01-01
    • 2019-04-22
    • 2021-06-07
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多