【发布时间】:2021-12-01 19:49:10
【问题描述】:
我最近选择了 Dagster 来评估作为 Airflow 的替代品。
我一直无法完全理解资源的概念,并想了解我正在尝试做的事情是否可行或可以通过不同的方式更好地实现。
我有一个像下面这样的帮助类来帮助保持代码干燥
from dagster import resource, solid, ModeDefinition, pipeline
from dagster_aws.s3 import s3_resource
class HelperAwsS3:
def __init__(self, s3_resource):
self.s3_resource = s3_resource
def s3_list_bucket(self, bucket, prefix):
return self.s3_resource.list_objects_v2(
Bucket=bucket,
Prefix=prefix
)
def s3_download_file(self, bucket, file, local_path):
self.s3_resource.meta.client.download_file(
Bucket=bucket,
Key=file,
Filename=local_path
)
def s3_upload_file(self, bucket, file, local_path):
self.s3_resource.meta.client.upload_file(
Bucket=bucket,
Key=file,
Filename=local_path
)
s3_resource 实际上是 dagster_aws.s3.s3_resource,它将帮助我使用本地 aws 凭据连接到 AWS。
当我在下面的 @resource 部分进行调用时,我不确定如何将 s3_resource 传递给 HelperAwsS3。
@resource
def connection_helper_aws_s3_resource(context):
return HelperAwsS3()
请指点一下?还是我做错了,需要以不同的方式做?
感谢您的帮助。
【问题讨论】:
标签: dagster