Hey, I'm new to prefect and have a question regard...
# ask-community
c
Hey, I'm new to Prefect and have a question about sharing objects between different flows. I have two different flows: flow_1 initializes the object and flow_2 uses that object, but I'm not able to achieve this. The example code is not exact, but it is similar to what I'm trying to achieve. Adding the code to the thread.
k
Hey @Chhaya Vankhede, I think there are a couple of things off here.
• The StartFlowRun is not actually called. It is a task, so it should be used inside the Flow block like:
with Flow("parent-flow") as flow:
    flow_a = StartFlowRun(flow_name="flow_a", project_name="my project")
    b = flow_b(upstream_tasks=[flow_a])
• flow_a initializes a and b, but they won't be seen by flow_b. This is because tasks are only executed when Flows run, so initialise only happens in flow_a. The idea was right, but we can't pass data between Flow runs through variables: the Flows are executed as different processes, so they don't share variables. There are a couple of ways around this. If your Flows need to share variables in memory, you should probably just stick to one Flow.
• You can use the KV store to persist small pieces of data, which can then be written or read during the Flow run. You can also use the family of tasks built to pass Results between Flow runs: create_flow_run, wait_for_flow_run, and get_task_run_result, all seen on this page. You would get the task run result and then pass it over to the next Flow.
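To illustrate the idea of persisting a small piece of data in one run and reading it back in another, here is a minimal sketch that uses only the Python standard library. It does not use the Prefect KV store or the create_flow_run family of tasks: a JSON file in a temporary directory stands in for the persisted store, and the PRODUCER and CONSUMER scripts and the run helper are hypothetical names made up for this example.

```python
import json
import subprocess
import sys
import tempfile
from pathlib import Path

# Stand-in for the first flow run: persists two small values to a store file.
PRODUCER = '''
import json, sys
with open(sys.argv[1], "w") as fh:
    json.dump({"a": "hello ", "b": "world!"}, fh)
'''

# Stand-in for the second flow run: reads the persisted values and combines them.
CONSUMER = '''
import json, sys
with open(sys.argv[1]) as fh:
    data = json.load(fh)
print(data["a"] + data["b"])
'''


def run(source: str, store: Path) -> str:
    """Run Python source in a separate interpreter, passing the store path as argv[1]."""
    done = subprocess.run(
        [sys.executable, "-c", source, str(store)],
        capture_output=True,
        text=True,
        check=True,
    )
    return done.stdout.strip()


with tempfile.TemporaryDirectory() as tmp:
    store = Path(tmp) / "store.json"  # assumption: a file plays the role of the store
    run(PRODUCER, store)              # first process writes
    combined = run(CONSUMER, store)   # second process reads

print(combined)  # hello world!
```

The two scripts never share memory; the only thing they have in common is the location of the persisted data, which is the same shape as writing a value during one Flow run and reading it during the next.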
Could you move the code to the thread when possible to keep the main channel neater? 🙂
c
from prefect import task, Flow
import prefect
from prefect.tasks.prefect import StartFlowRun

a = None
b = None

@task
def initialise():
    global a, b
    a = "hello "
    b = "world!"
    logger = prefect.context.get('logger')

    logger.info("I actually requested this time...")

with Flow("flow_a") as f:
    initialise()

f.register(project_name="my project")
flow_a = StartFlowRun(flow_name="flow_a", project_name="my project")

@task
def add_str():
    global a, b
    logger = prefect.context.get('logger')
    logger.info(a)
    logger.info(b)
    c = a + b

    logger.info(c)

with Flow("flow_b") as f:
    add_str()

f.register(project_name="my project")

flow_b = StartFlowRun(flow_name="flow_b", project_name="my project")

with Flow("parent-flow") as flow:
    b = flow_b(upstream_tasks=[flow_a])

flow.run()
Here a and b are global objects that are used in flow_a and flow_b. flow_a and flow_b must be different flows. Any idea what's wrong here?
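The behaviour with the globals can be reproduced without Prefect. The sketch below assumes, as stated earlier in the thread, that each flow run executes in its own process. It starts two separate Python interpreters as stand-ins for flow_a and flow_b. The FLOW_A and FLOW_B scripts and the run_in_new_process helper are hypothetical names for this illustration only.

```python
import subprocess
import sys

# Stand-in for flow_a: sets a module-level global inside its own process.
FLOW_A = '''
a = None

def initialise():
    global a
    a = "hello "

initialise()
print(repr(a))
'''

# Stand-in for flow_b: a fresh process that only ever sees the initial value.
FLOW_B = '''
a = None
print(repr(a))
'''


def run_in_new_process(source: str) -> str:
    """Run Python source in a separate interpreter and return its stdout."""
    done = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,
        text=True,
        check=True,
    )
    return done.stdout.strip()


out_a = run_in_new_process(FLOW_A)
out_b = run_in_new_process(FLOW_B)

print(out_a)  # 'hello '
print(out_b)  # None
```

The assignment made in the first process disappears when that process exits, so the second process still sees None. In the posted code, that is why a + b in add_str would operate on None values rather than on the strings set by initialise.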
Let me try that way. Thanks for the help...🙂
k
Thanks!