【发布时间】:2022-01-07 17:22:51
【问题描述】:
我正在使用将 JSON 条目批量推送到我的 Gcloud Storage 存储桶的管道。我想把这些数据导入 Kafka。
我现在要做的方式是使用 lambda 函数,该函数每分钟触发一次以查找已更改的文件,从中打开流,逐行读取并每隔一段时间将这些行作为消息批处理卡夫卡生产者。
这个过程非常糟糕,但它确实有效....最终。
我希望有一种方法可以使用 Kafka Connect 或 Flink 来做到这一点,但在感知存储桶的增量文件添加方面确实没有太多开发。
【问题讨论】:
-
文件实际更改的频率如何?是否可以将 lambda 更改为仅在检测到文件修改而不是“每分钟”时运行?
-
@OneCricketeer 文件每秒都会出现。每秒最多 5000 个文件。
-
它们有多大?在将它们写入 GCS 以批处理这些事件之前,没有什么可以做的吗?
-
每个大约 0.5MB -1MB。它们不能再从推送端进行批处理,在它们进入存储桶后我必须进行任何批处理。
-
根据我的经验,听起来您可能想考虑更改为“Delta Lake”或 Apache Hudi 之类的东西,而不是原始 GCS 文件系统
标签: google-cloud-platform apache-kafka google-cloud-storage apache-flink