【问题标题】:Airflow Python Operator with a. return type带有 a. 的气流 Python 运算符。返回类型
【发布时间】:2019-07-09 03:26:07
【问题描述】:

我的 DAG 中有一个 python 运算符。 python 可调用函数返回一个布尔值。但是,当我运行 DAG 时,出现以下错误。

TypeError: 'bool' 对象不可调用

我修改了函数以不返回任何内容,但我再次收到以下错误

错误 - 'NoneType' 对象不可调用

下面是我的日记

def check_poke(threshold,sleep_interval):
flag=snowflake_poke(1000,10).poke()
#print(flag)
return flag

dependency = PythonOperator(
task_id='poke_check',
#python_callable=check_poke(129600,600),
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)

end = BatchEndOperator(
queue=QUEUE,
dag=dag)

start.set_downstream(dependency)
dependency.set_downstream(end)

无法弄清楚我错过了什么。有人可以帮我解决这个问题...对气流来说还很陌生。

我在 dag 中编辑了 python 运算符,如下所示

dependency = PythonOperator(
task_id='poke_check',
provide_context=True,
python_callable=check_poke(129600,600),
dag=dag)

但是现在,我得到了一个不同的错误。

Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/airflow/models.py", line 1245, in run
    result = task_copy.execute(context=context)
File "/usr/local/lib/python2.7/dist-packages/airflow/operators/python_operator.py", line 66, in execute
    return_value = self.python_callable(*self.op_args, **self.op_kwargs)
TypeError: () takes no arguments (25 given)
[2019-02-15 05:30:25,375] {models.py:1298} INFO - Marking task as UP_FOR_RETRY
[2019-02-15 05:30:25,393] {models.py:1327} ERROR - () takes no arguments (25 given)

【问题讨论】:

  • 显然这个问题演变成this

标签: python airflow


【解决方案1】:

参数名称泄露了它。您传递的是调用的结果,而不是可调用的。

python_callable=check_poke(129600,600)

第二个错误表明可调用对象是使用 25 个参数调用的。所以lambda: 不起作用。以下方法可行,但忽略 25 个参数确实值得怀疑。

python_callable=lambda *args, **kwargs: check_poke(129600,600)

【讨论】:

    【解决方案2】:

    同意 @Dan D. 的意见;但令人困惑的是为什么他的解决方案不起作用(它肯定适用于python shell

    看看这是否能帮你找到好运(它只是 @Dan D. 解决方案的冗长变体)

    from typing import Callable
    
    # your original check_poke function
    def check_poke(arg_1: int, arg_2: int) -> bool:
        # do something
        # somehow returns a bool
        return arg_1 < arg_2
    
    # a function that returns a callable, that in turn invokes check_poke
    # with the supplied params
    def check_poke_wrapper_creator(arg_1: int, arg_2: int) -> Callable[[], bool]:
        def check_poke_wrapper() -> bool:
            return check_poke(arg_1=arg_1, arg_2=arg_2)
    
        return check_poke_wrapper
    
    ..
    
    # usage
    python_callable=check_poke_wrapper_creator(129600, 600)
    

    【讨论】:

      【解决方案3】:

      代码需要一个可调用的,而不是结果(正如已经指出的那样)。
      您可以使用functools.Partial 来填写参数:

      from functools import partial
      
      def check_poke(threshold,sleep_interval):
         flag=snowflake_poke(1000,10).poke()
         return flag
      
      func = partial(check_poke, 129600, 600)
      dependency = PythonOperator(
          task_id='poke_check',
          provide_context=True,
          python_callable=func,
          dag=dag)
      

      【讨论】:

        猜你喜欢
        • 2015-04-26
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-09-01
        • 1970-01-01
        • 2019-07-10
        • 2019-05-13
        • 2015-01-29
        相关资源
        最近更新 更多