https://prefect.io logo
#prefect-community
Title
# prefect-community
c

Chu

07/28/2022, 11:42 AM
Hi, I have a question about why task args cannot have copy, this seems affect my creat flow run properly using trigger arg See thread for code
1
code:
Copy code
@task(name = 'add')
def func_1(param_1, param_2):
    return param_1 + param_2
Copy code
@task(name = 'minus')
def func_2(param_1, param_2):
    return param_2 - param_1
Copy code
with Flow('flow', executor=LocalDaskExecutor(scheduler="processes", num_workers=2)) as flow:
    param_1 = Parameter("param_1")
    param_2 = Parameter("param_2")
    task_1 = func_1.map(
        param_1 = param_1, 
        param_2 = unmapped(param_2)
        )
    task_2 = func_2.map(
        param_1 = param_1, 
        param_2 = unmapped(param_2),
        task_args=unmapped(dict(trigger=all_finished))
        )
Copy code
task_2.set_upstream(task_1, mapped=True)
Copy code
if __name__ == "__main__":
    flow.run(parameters = dict(param_1 = [1,2,3,4], param_2 = 5 ))
And I have this error:
Copy code
File "/Users/acsjhba/opt/anaconda3/envs/py39/lib/python3.9/site-packages/prefect/core/task.py", line 810, in map
    task_args = task_args.copy() if task_args else {}
AttributeError: 'unmapped' object has no attribute 'copy'
I know for single task, I can set @task(name = 'minus', trigger = all_finished) in the task decorator, but how can I do that in flow of flows, specifically for create_flow_run
2 Views