【问题标题】:downloading and uploading file to GCP bucket in Apache Beam DoFn(Google Dataflow)将文件下载并上传到 Apache Beam DoFn 中的 GCP 存储桶(Google Dataflow)
【发布时间】:2021-10-22 12:46:28
【问题描述】:

我正在尝试从 GCP 存储桶下载加密文件和密钥,然后解密文件并将其加载回存储桶。所以我构建了这个 DataFlow 管道,如下所示:

class downloadFile(beam.DoFn):
def __init__(self):
    self.bucket_name = 'bucket_name'
    self.source_blob_name = 'test.csv.gpg'
    self.destination_file_name = "/tmp/test.csv.gpg"

def process(self, element):
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(self.bucket_name)
    blob = bucket.blob(self.source_blob_name)
    blob.download_to_filename(self.destination_file_name)

这里我使用self.destination_file_name = "/tmp/test.csv.gpg",因为我从其他人那里了解到,DataFlow 作业将在 Linux VM 上运行,因此将文件下载到此 /tmp/ 路径是完全安全的。

class downloadKey(beam.DoFn):
def __init__(self):
    self.bucket_name = 'bucket_name'
    self.source_blob_name = 'privateKey.txt'
    self.destination_file_name = "/tmp/privateKey.txt"


def process(self, element):
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(self.bucket_name)
    blob = bucket.blob(self.source_blob_name)
    blob.download_to_filename(self.destination_file_name)

基本上,两个下载 DoFns 具有相同的结构。下载文件和密钥后,密钥将被导入运行VM的DataFlow:

class importKey(beam.DoFn):
def process(self, element):
    import subprocess
    subprocess.call(['gpg', '--import','/tmp/privateKey.txt'])

然后解密DoFn:

class decryption(beam.DoFn):
def process(self, element, *args, **kwargs):
    import subprocess
    subprocess.call(['gpg', '-d', '/tmp/test.csv.gpg > test.csv'])
    
    # load file back to bucket
    bucket_name = 'bucket_name'
    source_file_name = '/tmp/test.csv'
    destination_blob_name = "clearText.csv"

    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    blob.upload_from_filename(source_file_name)

所以这个解密 DoFn 将调用带有子进程的 gpg 命令来解密文件。

最后是管道本身:

dummyMessage = {"projectID":"fakeProjectID",
            "bucketID":"fakeBucketID"}

setp= (
    p
    | 'Create Sample'
            >> beam.Create([dummyMessage["projectID"]])
    |"testDecrypt" >> beam.ParDo(downloadLookupFile())
    |"testDecrypt2" >> beam.ParDo(downloadKey())
    |"testDecrypt3" >> beam.ParDo(importKey())
    |"testDecrypt4" >> beam.ParDo(decryption())
   )

这里我只是创建一个虚拟消息来调用管道,稍后将替换为真实消息。

当我运行管道时,看起来一切正常,我可以看到 DataFlow 中已创建作业,并且显示作业状态为成功。但是在存储桶中我看不到解密的文件。

我在代码中添加了几个打印语句来调试,似乎在 downloadFile() 和 downloadKey() 方法中,从未到达 process(),这意味着没有文件被处理过。任何人都可以分享一些有关如何在 DoFn 中访问 GCS 存储桶的知识吗?我不确定代码的哪一部分被磨损了,对我来说一切都很好。

任何帮助将不胜感激。

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow apache-beam


    【解决方案1】:

    欢迎 Alex 使用 stackoverflow。

    • 首先,关于日志(打印语句),如果您没有看到它们,可能是因为您看错了地方。事实上,如果您将它们放在 DoFns 的 process 内(如 decryption 类),您需要查看 WORKER LOGS 而不是 JOB LOGS 或您的终端。在下面的屏幕截图中,我展示了您如何访问工作人员日志。作业日志或驱动程序日志是显示您在管道创建级别 (beam.Create ...) 添加的打印/日志的日志,如果您从终端运行作业,您可以在终端中看到它们。

    • 那么,恕我直言,数据流不是这种需求的正确处理平台。它用于并行分布式处理大文件块或大数据(比如说 > 2GB)。这意味着在幕后你有一个部分在一个工作节点(幕后的 GCE VM 实例)上处理,而另一块文件在另一个工作节点上处理。在您的情况下,如果您有超过 1 个工作人员,您可能会在一个节点中下载加密文件,在另一个节点中下载密钥,在第三个节点中进行解密。所以使用/tmp 会过时。

    • 最后一个变通解决方案是使用例如云函数 (CF),它将以单线程方式运行,并允许您重用不同 process 方法中的代码:

    1. CF 由将加密文件上传到 GCS 存储桶触发。这里有一些文档如何设置此类触发器:https://cloud.google.com/functions/docs/calling/storage(python 中有示例)
    2. 您的 CF 代码将下载加密密钥、解密并将解密的文件上传回另一个 GCS 存储桶。对于云功能,您可以将内存设置为最高 8GB,并使用 /tmp 使用后台内存。

    顺便说一下,安全方面,我认为将加密密钥存储在 GCS 中不是一个好习惯,尝试看看https://cloud.google.com/secret-manager

    【讨论】:

    • 感谢 MBHA Phoenix,对于像我这样的 GCP 新用户来说,这绝对是一个非常有用的指南和说明。关于你的第二点,我只是好奇。假设我使用'''num_workers = 1'''强制工人数量仅为1。根据我的理解,这意味着所有工作都将在一个节点中完成,这是否更有意义?虽然这不是一个聪明的方法,但仍然可行?
    • 是的,您可以将工作人员的数量强制为 1。你检查过日志吗?
    • 谢谢,是的,我已经检查了日志。只有一件事仍然让我感到困惑,正如您提到在 Dataflow 上运行此类作业可能会冒着在一个节点中下载加密文件、在另一个节点中下载密钥和在第三个节点中解密的风险。这是否意味着每个节点都有自己的存储位置,因此解密过程将无法调用,因为加密的文件和密钥存储在不同的位置?再次感谢~
    • 是的,我确认你明白了:每个工作节点都是一个拥有自己磁盘的 GCE 虚拟机(你可以使用 --disk_size_gb 为工作人员设置磁盘空间)所以如果你有这个风险更多的一个工人。
    • 感谢您的回答。真的很有帮助
    猜你喜欢
    • 2022-06-10
    • 2021-04-29
    • 2020-03-04
    • 2022-11-10
    • 1970-01-01
    • 1970-01-01
    • 2020-08-03
    • 2021-12-06
    • 1970-01-01
    相关资源
    最近更新 更多