另一种解决方案是使用工作流工具 dask。虽然它在语法上不如...
var
| do this
| then do that
...它仍然允许您的变量沿链向下流动,并且使用 dask 可以在可能的情况下提供并行化的额外好处。
以下是我使用 dask 完成管道链模式的方法:
import dask
def a(foo):
return foo + 1
def b(foo):
return foo / 2
def c(foo,bar):
return foo + bar
# pattern = 'name_of_behavior': (method_to_call, variables_to_pass_in, variables_can_be_task_names)
workflow = {'a_task':(a,1),
'b_task':(b,'a_task',),
'c_task':(c,99,'b_task'),}
#dask.visualize(workflow) #visualization available.
dask.get(workflow,'c_task')
# returns 100
在使用了 elixir 之后,我想在 Python 中使用管道模式。这不是完全相同的模式,但它是相似的,就像我说的那样,带有并行化的额外好处;如果您告诉 dask 在您的工作流程中获取不依赖于其他人先运行的任务,它们将并行运行。
如果您想要更简单的语法,您可以将其包装在可以为您处理任务命名的东西中。当然,在这种情况下,您需要所有函数都将管道作为第一个参数,并且您将失去任何并行化的好处。但是,如果您对此感到满意,则可以执行以下操作:
def dask_pipe(initial_var, functions_args):
'''
call the dask_pipe with an init_var, and a list of functions
workflow, last_task = dask_pipe(initial_var, {function_1:[], function_2:[arg1, arg2]})
workflow, last_task = dask_pipe(initial_var, [function_1, function_2])
dask.get(workflow, last_task)
'''
workflow = {}
if isinstance(functions_args, list):
for ix, function in enumerate(functions_args):
if ix == 0:
workflow['task_' + str(ix)] = (function, initial_var)
else:
workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1))
return workflow, 'task_' + str(ix)
elif isinstance(functions_args, dict):
for ix, (function, args) in enumerate(functions_args.items()):
if ix == 0:
workflow['task_' + str(ix)] = (function, initial_var)
else:
workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1), *args )
return workflow, 'task_' + str(ix)
# piped functions
def foo(df):
return df[['a','b']]
def bar(df, s1, s2):
return df.columns.tolist() + [s1, s2]
def baz(df):
return df.columns.tolist()
# setup
import dask
import pandas as pd
df = pd.DataFrame({'a':[1,2,3],'b':[1,2,3],'c':[1,2,3]})
现在,使用此包装器,您可以按照以下任一语法模式创建管道:
# wf, lt = dask_pipe(initial_var, [function_1, function_2])
# wf, lt = dask_pipe(initial_var, {function_1:[], function_2:[arg1, arg2]})
像这样:
# test 1 - lists for functions only:
workflow, last_task = dask_pipe(df, [foo, baz])
print(dask.get(workflow, last_task)) # returns ['a','b']
# test 2 - dictionary for args:
workflow, last_task = dask_pipe(df, {foo:[], bar:['string1', 'string2']})
print(dask.get(workflow, last_task)) # returns ['a','b','string1','string2']