【问题标题】:Dagster chaining resourcesDagster 链接资源
【发布时间】:2021-12-01 19:49:10
【问题描述】:

我最近选择了 Dagster 来评估作为 Airflow 的替代品。

我一直无法完全理解资源的概念,并想了解我正在尝试做的事情是否可行或可以通过不同的方式更好地实现。

我有一个像下面这样的帮助类来帮助保持代码干燥

from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource

class HelperAwsS3:
    def __init__(self, s3_resource):
        self.s3_resource = s3_resource

    def s3_list_bucket(self, bucket, prefix):
        return self.s3_resource.list_objects_v2(
            Bucket=bucket,
            Prefix=prefix
        )

    def s3_download_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.download_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

    def s3_upload_file(self, bucket, file, local_path):
        self.s3_resource.meta.client.upload_file(
            Bucket=bucket,
            Key=file,
            Filename=local_path
        )

s3_resource 实际上是 dagster_aws.s3.s3_resource,它将帮助我使用本地 aws 凭据连接到 AWS。

当我在下面的 @resource 部分进行调用时,我不确定如何将 s3_resource 传递给 HelperAwsS3。

@resource
def connection_helper_aws_s3_resource(context):
    return HelperAwsS3()

请指点一下?还是我做错了,需要以不同的方式做?

感谢您的帮助。

【问题讨论】:

    标签: dagster


    【解决方案1】:

    我在 dagster Slack 频道上发布了同样的问题,并很快得到了乐于助人的团队的回复。在这里发布它,以防它帮助某人 -

    保留您的 HelperAwsS3 类并编写您自己的使用 s3 资源的资源,它可能看起来像这样:

    @resource(required_resource_keys={"s3"})
    def connection_helper_aws_s3_resource(context):
        return HelperAwsS3(s3_resource=context.resources.s3)
    

    (然后确保在您的模式定义中包含 s3 资源和您的自定义资源:

    @pipeline(mode_defs=[ModeDefinition(
      resource_defs={"s3": s3_resource, "connection_helper_aws_s3": connection_helper_aws_s3_resource}
    )]):
      ...
    

    【讨论】:

      猜你喜欢
      • 2010-09-06
      • 1970-01-01
      • 2014-05-14
      • 1970-01-01
      • 2019-02-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-07-07
      相关资源
      最近更新 更多