#prefect-community

William Jamir

06/18/2022, 7:31 AM
Hi, I’m receiving this error when trying to use `upstream_tasks`. Does someone know what I’m doing wrong? My intention is to execute `task_3` only after `task_1` and `task_2` have finished (since it depends on their output), and to start `task_4` only after `task_3` has finished. How can I accomplish that? (Code and error output in thread)
from prefect import Flow, task, unmapped


@task
def task_1():
    return [1,2]


@task
def task_2():
    return 2


@task
def task_3(parameter1, parameter2):
    if parameter1 + parameter2 >= 50:
        raise ValueError('Value is too high')

@task
def task_4(parameter1, parameter2):
    print(parameter1, parameter2)

with Flow(name='a') as flow:
    p1 = task_1()
    p2 = task_2()
    task_3.map(parameter1=p1, parameter2=unmapped(p2))
    task_4(p1, p2, upstream_tasks=[task_3])

flow.run()
[2022-06-18 09:38:54+0200] INFO - prefect.FlowRunner | Beginning Flow run for 'a'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_1': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_1': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_2': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_2': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Finished task run for task with final state: 'Mapped'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[0]': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[0]': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[1]': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3[1]': Finished task run for task with final state: 'Success'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Starting task run...
[2022-06-18 09:38:54+0200] ERROR - prefect.TaskRunner | Task 'task_3': Exception encountered during task execution!
Traceback (most recent call last):
  File "/Users/william/.pyenv/versions/myenv/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 880, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/Users/william/.pyenv/versions/emyenv/lib/python3.9/site-packages/prefect/utilities/executors.py", line 468, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
TypeError: task_3() missing 2 required positional arguments: 'parameter1' and 'parameter2'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_3': Finished task run for task with final state: 'Failed'
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_4': Starting task run...
[2022-06-18 09:38:54+0200] INFO - prefect.TaskRunner | Task 'task_4': Finished task run for task with final state: 'TriggerFailed'
[2022-06-18 09:38:54+0200] INFO - prefect.FlowRunner | Flow run FAILED: some reference tasks failed.

Process finished with exit code 0

Anna Geller

06/18/2022, 11:54 AM
Try assigning the result of `task_3.map(...)` to a variable, e.g. `t3`, and pass `t3` in `upstream_tasks`.

Kevin Kho

06/18/2022, 4:23 PM
I think it’s:
with Flow(name='a') as flow:
    p1 = task_1()
    p2 = task_2()
    a = task_3.map(parameter1=p1, parameter2=unmapped(p2))
    task_4(p1, p2, upstream_tasks=[unmapped(a)])
like Anna suggested
👍 1

William Jamir

06/19/2022, 7:21 PM
Thanks Kevin and Anna! I see my issue now. I don’t know why I thought the correct approach would be to refer directly to the decorated function.
🙌 1
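For readers wondering why referring directly to the decorated function fails, here is a toy model in plain Python. It is not Prefect's implementation: `ToyTask`, `ToyFlow`, `bind`, and `add_upstream` are hypothetical names invented for this sketch. It only assumes what the log above suggests, namely that calling a task inside a flow registers a separate bound copy, so passing the bare `task_3` as an upstream adds a second `task_3` node that has no arguments.

```python
import copy


class ToyTask:
    """Hypothetical stand-in for a task object (not Prefect's Task class)."""

    def __init__(self, fn):
        self.fn = fn
        self.name = fn.__name__

    def bind(self, flow, **kwargs):
        # Calling a task in a flow registers a *copy* with its inputs bound.
        node = copy.copy(self)
        flow.inputs[node] = kwargs
        return node


class ToyFlow:
    """Hypothetical stand-in for a flow: nodes, their inputs, their upstreams."""

    def __init__(self):
        self.inputs = {}    # node -> bound keyword arguments
        self.upstream = {}  # node -> nodes that must succeed first

    def add_upstream(self, node, upstream):
        for up in upstream:
            # A node the flow has never seen is registered with no inputs.
            self.inputs.setdefault(up, {})
        self.upstream[node] = list(upstream)

    def run(self):
        states = {}

        def visit(node):
            if node in states:
                return
            ups = self.upstream.get(node, [])
            for up in ups:
                visit(up)
            if any(states[up] != "Success" for up in ups):
                states[node] = "TriggerFailed"
                return
            try:
                node.fn(**self.inputs[node])
                states[node] = "Success"
            except Exception:
                states[node] = "Failed"

        for node in list(self.inputs):
            visit(node)
        return [(node.name, state) for node, state in states.items()]


def task_3(parameter1, parameter2):
    if parameter1 + parameter2 >= 50:
        raise ValueError("Value is too high")


def task_4(parameter1, parameter2):
    return parameter1, parameter2


t3, t4 = ToyTask(task_3), ToyTask(task_4)


def build(use_bound_copy):
    flow = ToyFlow()
    bound_t3 = t3.bind(flow, parameter1=1, parameter2=2)
    node4 = t4.bind(flow, parameter1=1, parameter2=2)
    # The only difference: the bound copy vs. the bare decorated object.
    flow.add_upstream(node4, [bound_t3 if use_bound_copy else t3])
    return flow.run()


broken = build(use_bound_copy=False)
fixed = build(use_bound_copy=True)
print(broken)
print(fixed)
```

In this toy model the broken variant yields a second `task_3` node that fails for lack of arguments and a `task_4` that never triggers, mirroring the states in the log; the fixed variant runs each node once.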