【发布时间】:2026-01-13 20:55:01
【问题描述】:
我还是 Apache Beam/Cloud Dataflow 的新手,所以如果我的理解不正确,我深表歉意。
我正在尝试通过管道读取约 30,000 行长的数据文件。我的简单管道首先从 GCS 打开 csv,从数据中提取标头,通过 ParDo/DoFn 函数运行数据,然后将所有输出写入 csv 回 GCS。该管道有效,是我的第一个测试。
然后我编辑了管道以读取 csv,提取标题,从数据中删除标题,通过 ParDo/DoFn 函数运行数据,并将标题作为侧输入,然后将所有输出写入一个.csv。唯一的新代码是将标头作为侧面输入传入并从数据中过滤。
ParDo/DoFn 函数 build_rows 只生成 context.element,以便我可以确保我的辅助输入正常工作。
我得到的错误如下:
我不确定问题是什么,但我认为这可能是由于内存限制。我将示例数据从 30,000 行减少到 100 行,我的代码终于可以工作了。
没有侧输入的管道会读/写所有 30,000 行,但最终我需要侧输入来对我的数据进行转换。
如何修复我的管道,以便我可以处理来自 GCS 的大型 csv 文件,并且仍然使用边输入作为文件的伪全局变量?
【问题讨论】:
-
*注意:这是在本地测试的。我在添加代码时一直在做增量测试。如果它在本地工作,那么我在 Google Cloud Dataflow 上运行它以确保它也在那里运行。如果它在 Cloud Dataflow 中有效,那么我会添加更多代码。
标签: python google-cloud-dataflow apache-beam