# ask-community
k
Hello, I need some help with mapped/upstream tasks. I'd like to run a mapped task that only starts once another task has succeeded, but I'm obviously doing something wrong in the following simple example (probably I'm thinking about the `upstream_tasks` parameter the wrong way). Thanks in advance for your help!
```python
import prefect
from prefect import task, Flow


@task(name="func_one")
def func_one():
    logger = prefect.context["logger"]
    logger.info("Running func one")


@task(name="func_two")
def func_two(y):
    logger = prefect.context["logger"]
    logger.info("Running func_two with arg{}".format(y))
    return


with Flow("Test") as flow:
    task_func_one = func_one()
    task_func_two = func_two.map([1, 2, 3], upstream_tasks=[task_func_one])


flow.run()
```
g
Replacing this line:
```python
task_func_two = func_two.map([1, 2, 3], upstream_tasks=[task_func_one])
```
with these two lines:
```python
task_func_two = func_two.map([1, 2, 3])
task_func_two.set_upstream(task_func_one)
```
does what I think you want, in that these logs are generated:
```
[2021-03-09 16:07:17+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Test'
[2021-03-09 16:07:17+0100] INFO - prefect.TaskRunner | Task 'func_one': Starting task run...
[2021-03-09 16:07:17+0100] INFO - prefect.func_one | Running func one
[2021-03-09 16:07:17+0100] INFO - prefect.TaskRunner | Task 'func_one': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two': Finished task run for task with final state: 'Mapped'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[0]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[0] | Running func_two with arg1
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[0]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[1]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[1] | Running func_two with arg2
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[1]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[2]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[2] | Running func_two with arg3
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[2]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
```
k
That is true! Many thanks.
Any reason why the former did not work? The two methods seem equivalent.
j
The following is also equivalent, but may give some clarity about the difference:
```python
from prefect import unmapped

with Flow("Test") as flow:
    task_func_one = func_one()
    task_func_two = func_two.map([1, 2, 3], upstream_tasks=[unmapped(task_func_one)])
```
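To make the difference concrete, here is a toy, pure-Python model (it does not need Prefect) of the three wirings discussed in this thread. `ToyFlow`, `Edge`, and `Unmapped` are hypothetical stand-ins invented for illustration, not Prefect's own classes. The model carries over only the rule stated in this thread: anything passed through `.map` is mapped over unless it is wrapped in `unmapped`, while `set_upstream` adds a plain dependency. It models only the dependency on `func_one` and leaves out the `[1, 2, 3]` data argument.

```python
# Toy model of the three wirings in this thread.
# Hypothetical stand-ins for illustration, not Prefect's classes.
from dataclasses import dataclass
from typing import Iterable, Set, Union


@dataclass(frozen=True)
class Edge:
    upstream: str
    downstream: str
    mapped: bool  # True: the downstream task iterates over the upstream result


@dataclass(frozen=True)
class Unmapped:
    task: str  # marks an upstream task that must NOT be iterated over


class ToyFlow:
    def __init__(self) -> None:
        self.edges: Set[Edge] = set()

    def map(self, task: str, upstream_tasks: Iterable[Union[str, Unmapped]] = ()) -> None:
        # Anything passed through map() is mapped over unless wrapped in Unmapped.
        for up in upstream_tasks:
            if isinstance(up, Unmapped):
                self.edges.add(Edge(up.task, task, mapped=False))
            else:
                self.edges.add(Edge(up, task, mapped=True))

    def set_upstream(self, task: str, upstream: str) -> None:
        # A plain ordering dependency: never mapped over.
        self.edges.add(Edge(upstream, task, mapped=False))


def wiring(style: str) -> Set[Edge]:
    flow = ToyFlow()
    if style == "upstream_tasks":  # the original, failing version
        flow.map("func_two", upstream_tasks=["func_one"])
    elif style == "set_upstream":  # the first fix
        flow.map("func_two")
        flow.set_upstream("func_two", "func_one")
    elif style == "unmapped":  # the equivalent unmapped version
        flow.map("func_two", upstream_tasks=[Unmapped("func_one")])
    else:
        raise ValueError("unknown style: " + style)
    return flow.edges


print(wiring("upstream_tasks"))
print(wiring("set_upstream") == wiring("unmapped"))
```

In this toy, the original wiring yields a single edge with `mapped=True`, which corresponds to the version that tries to iterate over `func_one`'s result. The `set_upstream` and `unmapped` wirings produce the same unmapped edge.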
k
Right, so my first method was actually trying to map across the result of task one.
j
Unless an argument is wrapped with `unmapped` when passed to `.map`, Prefect will attempt to map across the items in that argument. In your first case, `task_func_one` wasn't wrapped in `unmapped`, so Prefect tried to map across its result (`func_one` returns `None`, which isn't iterable, so you get an error). In the version above, we mark it as `unmapped`, so there's no issue. The same holds for calling `set_upstream` afterwards, since in that case the argument is not mapped across.
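As a rough illustration of that rule, here is a toy, pure-Python model of argument expansion. It is not Prefect's implementation and runs without Prefect. `toy_map` and this `Unmapped` wrapper are hypothetical: `toy_map` calls the function once per element of every argument that isn't wrapped, and passes wrapped values unchanged to each call. To keep the iteration visible, the toy passes `func_one`'s result (`None`) to `func_two` as an extra argument. In Prefect, an upstream task only sets a dependency, and its result isn't passed to the function.

```python
# Toy model of mapped vs. unmapped arguments (hypothetical; not Prefect's code).
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass(frozen=True)
class Unmapped:
    value: Any  # passed unchanged to every child call instead of iterated over


def toy_map(func: Callable, *args: Any) -> List[Any]:
    """Call func once per element of the mapped (unwrapped) arguments."""
    mapped = [a for a in args if not isinstance(a, Unmapped)]
    if not mapped:
        raise ValueError("at least one argument must be mapped")
    # Mapped arguments must be sized sequences; len(None) raises TypeError,
    # mirroring the failure in the original flow.
    # Toy choice: run as many children as the shortest mapped argument.
    n = min(len(a) for a in mapped)
    return [
        func(*[a.value if isinstance(a, Unmapped) else a[i] for a in args])
        for i in range(n)
    ]


def func_two(y, _upstream):
    # _upstream stands in for func_one's result; it is ignored here.
    return "Running func_two with arg{}".format(y)


func_one_result = None  # func_one has no return statement, so it returns None

# Works: the upstream result is broadcast unchanged to each child call.
print(toy_map(func_two, [1, 2, 3], Unmapped(func_one_result)))

# Fails: the toy tries to iterate over None.
try:
    toy_map(func_two, [1, 2, 3], func_one_result)
except TypeError as exc:
    print("mapping over None fails:", exc)
```

The wrapped call produces one child result per element of `[1, 2, 3]`. The unwrapped call fails before any child runs, because the toy has to size every mapped argument first.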
k
Thank you both for your quick help!