Slackbot
07/28/2022, 12:49 AMChu
07/28/2022, 12:51 AM@task(name = 'add')
def func_1(param_1, param_2):
return param_1 + param_2
@task(name = 'minus')
def func_2(param_1, param_2):
return param_2 - param_1
with Flow('flow', executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
param_1 = Parameter("param_1")
param_2 = Parameter("param_2")
task_1 = func_1.map(
param_1 = param_1,
param_2 = unmapped(param_2)
)
task_2 = func_2.map(
param_1 = param_1,
param_2 = unmapped(param_2),
task_args=unmapped(dict(trigger=all_finished))
)
task_2.set_upstream(task_1, mapped=True)
if __name__ == "__main__":
flow.run(parameters = dict(param_1 = [1,2,3,4], param_2 = 5 ))
And I have this error:
File "/Users/acsjhba/opt/anaconda3/envs/py39/lib/python3.9/site-packages/prefect/core/task.py", line 810, in map
task_args = task_args.copy() if task_args else {}
AttributeError: 'unmapped' object has no attribute 'copy'
@task(name = 'minus', trigger = all_finished)
in the task decorator, but how can I do that in flow of flows, specifically for create_flow_runAnna Geller
Chu
07/28/2022, 12:19 PMdowntstream_flow_id.set_upstream(upstream_flow, mapped=True)
, correct?Anna Geller
Chu
07/28/2022, 12:33 PMAnna Geller
Chu
07/28/2022, 2:41 PM