【问题标题】:How to read file names from GCP buckets recursively using a composer DAG如何使用作曲家 DAG 从 GCP 存储桶中递归读取文件名
【发布时间】:2019-11-22 00:12:16
【问题描述】:

我正在尝试使用作曲家 DAG 从所有文件夹、存储桶下的子文件夹递归地读取 GCS 存储桶中的文件名。可能吗?例如,我有如下所述的带有相应文件夹和子文件夹的存储桶。 static 是存储桶名称。

静态/folder1/subfolder1/file1.json 静态/文件夹1/子文件夹2/file2.json 静态/文件夹1/子文件夹3/file3.json 静态/文件夹1/子文件夹3/file4.json

我想递归读取文件并将数据放入两个变量中,如下所示。

桶名 = 静态 文件路径 = static/folder1/subfolder3/file4.json

【问题讨论】:

  • 您使用的是哪个运算符?或者这是问题的主题:使用 composer 在 Storage 中读取的正确操作符是什么?

标签: google-cloud-platform composer-php airflow directed-acyclic-graphs


【解决方案1】:

您可以使用 Airflow 的 BashOperator 来使用 GCS CLI 工具 (docs here)。

示例如下:

read_files = BashOperator(
    task_id='read_files',
    bash_command='gsutil ls -r gs://bucket',
    dag=dag,
)

编辑:由于您想捕获输出,而 BashOperator 仅将 stdout 的最后一行推送到 XCom,我建议使用 PythonOperator 调用自定义 Python 可调用对象,该调用使用 GCS API 甚至CLI 工具通过subprocess 收集所有文件名并将其推送到 XCom 以供下游任务后续使用。除非您根本不需要其他任务来使用这些数据,在这种情况下,您可以随意处理它(从问题中不清楚)。

【讨论】:

  • 感谢您的回答,使用带有 GCS API 的 python 运算符来执行此操作。
  • @ShilabaRoul 很酷,很高兴我能帮上忙 :)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-04-01
  • 1970-01-01
  • 2021-03-27
  • 1970-01-01
相关资源
最近更新 更多