Tim Enders
03/19/2021, 6:58 PM
nicholas
with Flow("flow") as flow:
    shared = "some value"
    task_1(shared=shared)
    task_2(shared=shared)
Tim Enders
03/19/2021, 7:00 PM
Tim Enders
03/19/2021, 7:02 PM
nicholas
from prefect import Flow, task
from prefect.engine.results import LocalResult
# nout=2 lets the flow unpack the two return values
@task(nout=2)
def task_1():
    # generate your data and the variable of interest
    return (data, variable)
# define your result here instead
@task(result=LocalResult())
def task_2(data):
    return data
@task
def task_3(variable):
    # do something with variable
    pass
with Flow("flow") as flow:
    data, variable = task_1()
    task_2(data)
    task_3(variable)
Tim Enders
03/19/2021, 7:17 PM
With nout=2 required to have a Tuple return, I always get 2 mapped tasks, even if there is only a single value to map
nicholas
from typing import Tuple
from prefect import Flow, task
@task
def task_1() -> Tuple[list, str]:
    # generate your data and the variable of interest
    data = [1, 2, 3]
    variable = "some variable"
    return (data, variable)
@task
def task_2(data):
    print(data)
    return data
@task
def task_3(variable):
    # do something with variable
    print(variable)
with Flow("flow") as flow:
    data, variable = task_1()
    task_2(data)
    task_3(variable)
flow.run()
outputs:
[2021-03-19 15:22:48-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'flow'
[2021-03-19 15:22:48-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-03-19 15:22:48-0400] DEBUG - prefect.FlowRunner | Flow 'flow': Handling state change from Scheduled to Running
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1': Calling task.run() method...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[1]': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[1]': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[1]': Calling task.run() method...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[1]': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[1]': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[0]': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[0]': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[0]': Calling task.run() method...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[0]': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[0]': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_2': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_2': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_2': Calling task.run() method...
[1, 2, 3]
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_2': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_2': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_3': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_3': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_3': Calling task.run() method...
some variable
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_3': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_3': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-03-19 15:22:48-0400] DEBUG - prefect.FlowRunner | Flow 'flow': Handling state change from Running to Success
Tim Enders
03/19/2021, 7:28 PM
task_2 and then flatten the output meant for task_3 gets put into the task_4 that has the call to flatten
nicholas
Tim Enders
03/19/2021, 7:37 PM
Tim Enders
03/19/2021, 7:38 PM
nout=X and then you can return multiple values, with Python doing that via tuple: return a, b
Then I was unpacking it in the flow: a, b = Task()
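For reference, a minimal plain-Python sketch of the two spellings being compared here, with no Prefect involved. It only shows that return a, b builds a single tuple, which the caller can either unpack or keep whole and index. The function name make_pair and its values are made up for illustration.

```python
def make_pair():
    # Hypothetical function: "return a, b" packs both values into one tuple.
    a = [1, 2, 3]
    b = "some variable"
    return a, b


# Unpacking splits the tuple into two names.
a, b = make_pair()

# Without unpacking, the result stays a single value that can be indexed.
ab = make_pair()

print(type(ab).__name__)
print(ab[0] == a, ab[1] == b)
```

Inside a flow the two spellings are not interchangeable in the same way: as noted in this thread, the unpacking form relies on nout (or a Tuple return annotation), while the indexed form treats the task output as one value.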
Tim Enders
03/19/2021, 7:39 PM
b for some reason gets stuck in there.
Tim Enders
03/19/2021, 7:40 PM
return (a, b) and if I do it WITHOUT unpacking, ab = Task(), then Task(ab[0]), etc. It treats the output as a single value, but maps and flattens as I was expecting
nicholas
Tim Enders
03/19/2021, 7:42 PM