William Jamir
06/18/2022, 7:31 AM
Does someone know what I'm doing wrong with `upstream_tasks`?
My intention is to execute `task_3` only after `task_1` and `task_2` have finished (since it depends on their output), and to start `task_4` only after `task_3` has finished.
How can I accomplish that?
(Code and error output in thread)
from prefect import Flow, task, unmapped

@task
def task_1():
    return [1, 2]

@task
def task_2():
    return 2

@task
def task_3(parameter1, parameter2):
    if parameter1 + parameter2 >= 50:
        raise ValueError('Value is too high')

@task
def task_4(parameter1, parameter2):
    print(parameter1, parameter2)

with Flow(name='a') as flow:
    p1 = task_1()
    p2 = task_2()
    task_3.map(parameter1=p1, parameter2=unmapped(p2))
    task_4(p1, p2, upstream_tasks=[task_3])

flow.run()
[2022-06-18 09:38:54+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'a'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_1': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_1': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_2': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_2': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Finished task run for task with final state: 'Mapped'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[0]': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[0]': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[1]': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[1]': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Starting task run...
[2022-06-18 09:38:54+0200] ERROR - prefect.TaskRunner | Task 'task_3': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/william/.pyenv/versions/myenv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/william/.pyenv/versions/emyenv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
TypeError: task_3() missing 2 required positional arguments: 'parameter1' and 'parameter2'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Finished task run for task with final state: 'Failed'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_4': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_4': Finished task run for task with final state: 'TriggerFailed'
[2022-06-18 09:38:54+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.
Process finished with exit code 0
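A plain-Python sketch of what the log above seems to show, with no Prefect involved. It assumes (this is an interpretation of the log, not something confirmed in the thread) that the two `task_3[0]` and `task_3[1]` runs are the mapped calls, and that the later, failing `task_3` run is a separate copy of the task that received no inputs because `upstream_tasks=[task_3]` refers to the bare task object rather than to the result of `task_3.map(...)`. The names `mapped_results` and `error_message` are illustrative only.

```python
# Plain-Python analogy of the log above; Prefect is not imported or used.
def task_3(parameter1, parameter2):
    if parameter1 + parameter2 >= 50:
        raise ValueError('Value is too high')


p1 = [1, 2]  # what task_1 returns
p2 = 2       # what task_2 returns

# Roughly what task_3.map(parameter1=p1, parameter2=unmapped(p2)) does:
# one call per element of p1, with p2 reused unchanged for every call.
mapped_results = [task_3(parameter1=item, parameter2=p2) for item in p1]

# Assumed analogy for the extra 'task_3' run in the log: the function is
# invoked with no arguments, so Python itself raises a TypeError.
try:
    task_3()
except TypeError as exc:
    error_message = str(exc)

print(mapped_results)
print(error_message)
```

Both mapped calls succeed because 1 + 2 and 2 + 2 are below 50, while the argument-less call fails with the same kind of "missing 2 required positional arguments" message seen in the traceback.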
Anna Geller
Kevin Kho
with Flow(name='a') as flow:
    p1 = task_1()
    p2 = task_2()
    a = task_3.map(parameter1=p1, parameter2=unmapped(p2))
    task_4(p1, p2, upstream_tasks=[unmapped(a)])
like Anna suggested
William Jamir
06/19/2022, 7:21 PM