【问题标题】:How do i pass resources to Jobs so it can be accessible to it's ops我如何将资源传递给 Jobs,以便它的操作可以访问它
【发布时间】:2021-12-29 15:07:04
【问题描述】:

我是 dagster 的新手,我正在尝试通过调用它的作业将资源传递给 dagster 操作,即使在遵循文档之后我也遇到问题,我不确定是否需要再次将配置传递给工作似乎没有任何工作。这是代码。

错误dagster.core.errors.DagsterInvalidConfigError: Error in config for job Error 1: Missing required config entry "resources" at the root.

import os
from dotenv import load_dotenv
load_dotenv()


@op
def return_one(context):
    context.log.info(f'return_one {os.environ.get("BUCKET")}')
    return 1


@op(required_resource_keys={"boto3_connection"})
def add_two(context, i: int):
    context.log.info(f'##### {context.resources.boto3_connection.get_client()}')
    return i + 2


@op
def multi_three(i: int):
    return i * 3


class Boto3Connector(object):
    def __init__(self, aws_access_key_id, aws_secret_access_key):
        self.aws_access_key_id = aws_access_key_id
        self.aws_secret_access_key = aws_secret_access_key


    def get_client(self, resource="s3"):
        session = boto3.session.Session()

        session_client = session.client(
            service_name=resource,
            aws_access_key_id=self.aws_access_key_id,
            aws_secret_access_key=self.aws_secret_access_key,
        )
        return session_client


@resource(
    config_schema={
        'aws_access_key_id': StringSource,
        'aws_secret_access_key': StringSource
    })
def boto3_connection(context):
    return Boto3Connector(
        context.resource_config['aws_access_key_id'],
        context.resource_config['aws_secret_access_key']
    )


@job(resource_defs={'boto3_connection': boto3_connection})
def my_job():
    multi_three(add_two(return_one()))```

【问题讨论】:

    标签: python dagster


    【解决方案1】:

    我的问题是假设资源配置是自动传递的,但您必须在作业配置中指定它们。所以只是添加了配置。

    @job(resource_defs={'boto3_connection': boto3_connection},
            config={'resources':
            { "boto3_connection": {
                "config": {
                    "aws_access_key_id": {"env": "AWS_ACCESS_KEY_ID"},
                    "aws_secret_access_key": {"env": "AWS_SECRET_ACCESS_KEY"},
                }
            }}})
    def my_job():
       multi_three(add_two(return_one()))`
    

    dagster slack上被指出了正确的方向

    【讨论】:

      猜你喜欢
      • 2012-11-01
      • 2011-07-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-10-02
      • 1970-01-01
      • 2018-01-26
      • 1970-01-01
      相关资源
      最近更新 更多