Konstantinos
03/09/2021, 2:59 PM
Konstantinos
03/09/2021, 2:59 PM
import prefect
from prefect import task, Flow

@task(name="func_one")
def func_one():
    logger = prefect.context["logger"]
    logger.info("Running func one")

@task(name="func_two")
def func_two(y):
    logger = prefect.context["logger"]
    logger.info("Running func_two with arg{}".format(y))
    return

with Flow("Test") as flow:
    task_func_one = func_one()
    task_func_two = func_two.map([1, 2, 3], upstream_tasks=[task_func_one])

flow.run()
Greg Roche
03/09/2021, 3:09 PM
Replacing this line:
    task_func_two = func_two.map([1, 2, 3], upstream_tasks=[task_func_one])
with these two lines:
    task_func_two = func_two.map([1, 2, 3])
    task_func_two.set_upstream(task_func_one)
does what I think you want, in that these logs are generated:
[2021-03-09 16:07:17+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Test'
[2021-03-09 16:07:17+0100] INFO - prefect.TaskRunner | Task 'func_one': Starting task run...
[2021-03-09 16:07:17+0100] INFO - prefect.func_one | Running func one
[2021-03-09 16:07:17+0100] INFO - prefect.TaskRunner | Task 'func_one': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two': Finished task run for task with final state: 'Mapped'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[0]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[0] | Running func_two with arg1
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[0]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[1]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[1] | Running func_two with arg2
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[1]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[2]': Starting task run...
[2021-03-09 16:07:18+0100] INFO - prefect.func_two[2] | Running func_two with arg3
[2021-03-09 16:07:18+0100] INFO - prefect.TaskRunner | Task 'func_two[2]': Finished task run for task with final state: 'Success'
[2021-03-09 16:07:18+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Konstantinos
03/09/2021, 3:10 PM
Konstantinos
03/09/2021, 3:11 PM
Jim Crist-Harif
03/09/2021, 3:14 PM
from prefect import unmapped

with Flow("Test") as flow:
    task_func_one = func_one()
    task_func_two = func_two.map([1, 2, 3], upstream_tasks=[unmapped(task_func_one)])
Konstantinos
03/09/2021, 3:16 PM
Jim Crist-Harif
03/09/2021, 3:16 PM
Unless an argument is wrapped in unmapped when passed to .map, Prefect will attempt to map across the items in that argument. In your first case, task_func_one wasn't wrapped in unmapped, so Prefect tried to map across the value (but the value isn't iterable, so you get an error). In the version above we mark it as unmapped, so there's no issue. Likewise with calling set_upstream later, since in that case the argument is not mapped across.
Konstantinos
03/09/2021, 3:17 PM
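For anyone reading this thread later, the mapped vs. unmapped distinction explained above can be illustrated with a toy model in plain Python. This is only a sketch and not how Prefect implements mapping: Unmapped, map_call, and describe are hypothetical names invented for the illustration, and the upstream value is treated as an ordinary argument for simplicity. It assumes func_one returns None, since the task in the thread has no return statement.

```python
class Unmapped:
    """Toy marker (hypothetical): pass the wrapped value whole to every call."""

    def __init__(self, value):
        self.value = value


def map_call(fn, *args):
    """Call fn once per index, iterating over every argument not wrapped in Unmapped."""
    prepared = []
    for arg in args:
        if isinstance(arg, Unmapped):
            prepared.append(arg)
        else:
            # Raises TypeError when arg is not iterable, which is the
            # failure mode described in the thread.
            prepared.append(list(arg))
    lengths = [len(a) for a in prepared if not isinstance(a, Unmapped)]
    n = min(lengths) if lengths else 1
    return [
        fn(*[a.value if isinstance(a, Unmapped) else a[i] for a in prepared])
        for i in range(n)
    ]


def describe(y, upstream):
    return "arg{} after {}".format(y, upstream)


func_one_result = None  # assumption: func_one has no return value

# Wrapped: the single value is handed unchanged to each mapped call.
ok = map_call(describe, [1, 2, 3], Unmapped(func_one_result))

# Not wrapped: the toy tries to iterate over None and fails.
error = None
try:
    map_call(describe, [1, 2, 3], func_one_result)
except TypeError as exc:
    error = str(exc)
```

In this toy, the wrapped call produces one result per element of [1, 2, 3], while the unwrapped call fails because None cannot be iterated, which matches the behaviour described for the original flow.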