Konstantinos
03/09/2021, 2:59 PM
import prefect
from prefect import task, Flow

@task(name="func_one")
def func_one():
    logger = prefect.context["logger"]
    logger.info("Running func one")

@task(name="func_two")
def func_two(y):
    logger = prefect.context["logger"]
    logger.info("Running func_two with arg{}".format(y))
    return

with Flow("Test") as flow:
    task_func_one = func_one()
    task_func_two = func_two.map([1, 2, 3], upstream_tasks=[task_func_one])

flow.run()
Greg Roche
03/09/2021, 3:09 PM
Replacing this line:
task_func_two = func_two.map([1, 2, 3], upstream_tasks=[task_func_one])
with these two lines:
task_func_two = func_two.map([1, 2, 3])
task_func_two.set_upstream(task_func_one)
does what I think you want, in that these logs are generated:
[2021-03-09 16:07:17+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Test'
[2021-03-09 16:07:17+0100] INFO - prefect.TaskRunner | Task 'func_one': Starting task run...
[2021-03-09 16:07:17+0100] INFO - prefect.func_one | Running func one
[2021-03-09 16:07:17+0100] INFO - prefect.TaskRunner | Task 'func_one': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two': Finished task run for task with final state: 'Mapped'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[0]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[0] | Running func_two with arg1
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[0]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[1]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[1] | Running func_two with arg2
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[1]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[2]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[2] | Running func_two with arg3
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[2]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Konstantinos
03/09/2021, 3:10 PM
Konstantinos
03/09/2021, 3:11 PM
Jim Crist-Harif
03/09/2021, 3:14 PM
from prefect import unmapped

with Flow("Test") as flow:
    task_func_one = func_one()
    task_func_two = func_two.map([1, 2, 3], upstream_tasks=[unmapped(task_func_one)])
Konstantinos
03/09/2021, 3:16 PM
Jim Crist-Harif
03/09/2021, 3:16 PM
Unless an argument is wrapped in unmapped when passed to .map, Prefect will attempt to map across the items in that argument. In your first case, task_func_one wasn't wrapped in unmapped, so Prefect tried to map across its value (but the value isn't iterable, so you get an error). In the version above we mark it as unmapped, so there is no issue. Likewise with calling set_upstream later, since in that case the argument is not mapped across.
Konstantinos
03/09/2021, 3:17 PM
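
For anyone reading this thread later, the rule Jim describes can be illustrated with a small pure-Python model. This is only a sketch of the semantics, not Prefect's implementation: `Unmapped` and `toy_map` are hypothetical names made up for this example, and the real `.map` builds tasks in a flow rather than calling the function directly. Per the explanation above, entries in upstream_tasks are treated the same way as any other argument passed to .map.

```python
class Unmapped:
    """Hypothetical marker, playing the role that `unmapped` plays in the thread."""

    def __init__(self, value):
        self.value = value


def toy_map(fn, *args):
    """Call fn once per index, iterating over every argument NOT wrapped in Unmapped."""
    lengths = []
    for arg in args:
        if isinstance(arg, Unmapped):
            continue  # passed through unchanged to every call
        try:
            lengths.append(len(arg))
        except TypeError:
            # Same kind of failure as in the thread: a non-iterable value
            # that was not marked as unmapped cannot be mapped across.
            raise TypeError(f"cannot map over non-iterable argument: {arg!r}") from None
    if not lengths:
        raise ValueError("at least one argument must be mapped over")

    results = []
    for i in range(min(lengths)):
        call_args = [a.value if isinstance(a, Unmapped) else a[i] for a in args]
        results.append(fn(*call_args))
    return results


def add(x, y):
    return x + y


# The list is mapped over; the wrapped value is reused for every call.
ok = toy_map(add, [1, 2, 3], Unmapped(10))

# Without the wrapper, the model tries to iterate over None and fails.
try:
    toy_map(add, [1, 2, 3], None)
    error = None
except TypeError as exc:
    error = str(exc)
```

Here `ok` holds one result per item of the list, while the second call fails because `None` has no items to map across, which mirrors passing task_func_one without unmapped.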