How would I share a variable between two tasks? I ...
# ask-community
t
How would I share a variable between two tasks? I don't want to return it in the result object because the data grain is off, but I need it for an evaluation.
n
Could you declare it as an input?
Copy code
with Flow("flow") as flow:
  shared = "some value"
  task_1(shared=shared)
  task_2(shared=shared)
t
No, it is a value gained from within the task.
There are many clunky ways to get around it, but in Pure Python I would return a tuple, or I would be able to declare a variable in the Python file that is accessible. That doesn't look to be the case in Prefect.
n
Hm, perhaps a tuple solution would work, if you instead configured your result off the tuple and not the task that generates the tuple. Maybe something like this:
Copy code
@task
def task_1():
  # generate your data and the variable of interest
  return (data, variable)

# define your result here instead
@task(result=LocalResult())
def task_2(data):
  return data

@task
def task_3(variable):
  # do something with variable

with Flow("flow") as flow:
  *data, variable = task_1()

  task_2(data)
  task_3(variable)
t
That seems to interact with mapped tasks. When I add the
nout=2
required to have a Tuple return I always get 2 mapped tasks, even if there is only a single value to map
n
Hm yes, Prefect will generate some task runs to track the results of Tuple-unpacking for downstream dependencies, so this:
Copy code
from typing import Tuple

from prefect import Flow, task


@task
def task_1() -> Tuple[list, str]:
    # generate your data and the variable of interest
    data = [1, 2, 3]
    variable = "some variable"
    return (data, variable)


@task
def task_2(data):
    print(data)
    return data


@task
def task_3(variable):
    # do something with variable
    print(variable)


with Flow("flow") as flow:
    data, variable = task_1()
    task_2(data)
    task_3(variable)

flow.run()
outputs:
Copy code
[2021-03-19 15:22:48-0400] INFO - prefect.FlowRunner | Beginning Flow run for 'flow'
[2021-03-19 15:22:48-0400] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-03-19 15:22:48-0400] DEBUG - prefect.FlowRunner | Flow 'flow': Handling state change from Scheduled to Running
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1': Calling task.run() method...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[1]': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[1]': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[1]': Calling task.run() method...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[1]': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[1]': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[0]': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[0]': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[0]': Calling task.run() method...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_1[0]': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_1[0]': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_2': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_2': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_2': Calling task.run() method...

[1, 2, 3]

[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_2': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_2': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_3': Starting task run...
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_3': Handling state change from Pending to Running
[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_3': Calling task.run() method...

some variable

[2021-03-19 15:22:48-0400] DEBUG - prefect.TaskRunner | Task 'task_3': Handling state change from Running to Success
[2021-03-19 15:22:48-0400] INFO - prefect.TaskRunner | Task 'task_3': Finished task run for task with final state: 'Success'
[2021-03-19 15:22:48-0400] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-03-19 15:22:48-0400] DEBUG - prefect.FlowRunner | Flow 'flow': Handling state change from Running to Success
t
right, but when you map
task_2
and then
flatten
the output meant for
task_3
gets put into the
task_4
that has the call to
flatten
n
Sorry @Tim Enders - I'm not sure I'm following; can you explain what you're trying to do?
t
Ah, got it. Thank you
So, I initially tried the multiple return method on a Task. Where you set
nout=X
and then you can return multiple values, with Python doing that via tuple.
return a, b
Then I was unpacking it in the flow
a, b = Task()
That causes issues with the flatten (I would have to set it back up to replicate right now). As
b
for some reason gets stuck in there.
But what you showed me was with an explicit tuple
return (a, b)
and if I do it WITHOUT unpacking
ab = Task()
then
Task(ab[0])
, etc. It treats the output as a single value, but maps and flattens as I was expecting
n
Ahh I see, sorry, I misunderstood what you meant there. Glad you got it sorted!
t
No worries. It seems to be an edge case with how Prefect interprets the Task return
👍 1