# prefect-community
n
This is already extremely useful; however, what if I now want to pass the results from imported flows on to a new flow? If I try to print the results of `reduce_task()` in the combined flow with

```python
print(mr_flow_state.result[reduction].result)
```
then I get the error

```
Traceback (most recent call last):
  File "combine_flow.py", line 14, in <module>
    print(mr_flow_state.result[reduction].result)
NameError: name 'reduction' is not defined
```
This makes sense to me, but is using some kind of cache the answer to getting these results? How do I go about doing that? Thanks so much!
e
As far as I understand, `reduction` is defined in the module `map_reduce_flow`, while your combined flow is in a different module, `combine_flow`. You have to expose `reduction` somehow; maybe encapsulate your `mr_flow.run()` in a function that returns the result of `reduction` after the flow run finishes. Then you can import the function instead of the flow itself, which would return the desired result.
Changing your import to `from map_reduce_flow import mr_flow, reduction` will probably work as well, but that looks like sketchy design to me.
n
I understand that `reduction` isn't automatically saved to the `mr_flow_state`. It is an interesting idea to encapsulate the flow in a function which both runs the flow and returns the output. I am sure that this will solve my problem! I was wondering, though, if there was a more prefect-y way to do this... Am I totally off that the `cache_key` parameter in tasks can be used to cache output to pass between local flows? I don't see any use of `cache_key` in the docs…
e
Haven't used caching yet, so I won't be able to help on that front. I don't believe there is much functionality built into Prefect to make multiple flows work with each other, although there have been many talks about it and folks at Prefect might be working on something.
As for exposing `reduction`: `flow.tasks` and `flow.reference_tasks` expose a set of tasks as well; that might be of use to you.
n
But is there a way to expose the results of tasks? `reduction` is a variable saved as the result of running the task `reduce_task`.
As far as I can tell, wrapping `flow.run()` in a function and then calling it in the combined flow runs into the same problem: `reduction` is a local variable of the `mr_flow`… How would you go about implementing this function?
I tried:

```python
def mr_flow_run(params):
    mr_flow_state = mr_flow.run(parameters=params)
    reduction = mr_flow_state.result[reduction].result
    return reduction
```
However, then calling

```python
reduction = mr_flow_run(params={"data": extract_flow_state.result[data].result})
```

throws an error that `reduction` is referenced before assignment…
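Independent of Prefect, that error is standard Python scoping: any name assigned anywhere in a function body is treated as local to the whole body, so it cannot be read before its assignment. A minimal stand-in with made-up names, no Prefect involved:

```python
def broken():
    # Because `reduction` is assigned inside this function, Python marks it
    # as a local name for the entire function body, so reading it on the
    # right-hand side of its own assignment raises UnboundLocalError.
    reduction = results[reduction]
    return reduction

results = {"reduce_task": 42}

try:
    broken()
except UnboundLocalError as exc:
    print(type(exc).__name__)  # UnboundLocalError
```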
Or maybe there is a way to use `prefect.context` to specify where the code should look for variables? https://docs.prefect.io/api/latest/utilities/context.html#context-2
e
What I had in mind was more like:

```python
def mr_flow_build_run(params):
    with Flow("Map / Reduce 🤓") as mr_flow:
        numbers = Parameter('data')
        first_map = map_task.map(numbers)
        second_map = map_task.map(first_map)
        reduction = reduce_task(second_map)
        printing = print_task(reduction)
    mr_flow_state = mr_flow.run(parameters=params)
    reduction = mr_flow_state.result[reduction].result
    return reduction
```
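The reason this wrapper avoids the scoping problem is that `reduction` is bound by `reduce_task(second_map)` before it is ever read, and the caller only receives the returned value. The same shape with Prefect stripped away, using plain functions as stand-ins for the tasks (purely illustrative, all names invented):

```python
def map_task(x):
    # stand-in for the mapped Prefect task
    return x * 2

def reduce_task(values):
    # stand-in for the reducing Prefect task
    return sum(values)

def mr_build_run(params):
    # Build and "run" the flow; `reduction` stays local to this function,
    # so callers import and call mr_build_run instead of the name itself.
    numbers = params["data"]
    first_map = [map_task(n) for n in numbers]
    second_map = [map_task(n) for n in first_map]
    reduction = reduce_task(second_map)
    return reduction

print(mr_build_run({"data": [1, 2, 3]}))  # 24
```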
n
Ah, I understand. This still gives the same

```
UnboundLocalError: local variable 'reduction' referenced before assignment
```

when calling from a different file than the one that defines the flow.
I have it working now! Thanks for all the help!
e
No worries! How did you make it work out?
n
I implemented your function wrong! 😅 This was a great fix for saving the variables I need between flows!
I still have a feeling that `prefect.context.caches` can also do the same thing (possibly in a more "correct" way), but this fix makes the local variables being reused very visible. Thanks!
c
Hey @Nate Joselson - regarding your `cache_key` question: Prefect Core currently stores all cached states within `prefect.context.caches`, with the provided `cache_key` as the key (this could change to store to disk in the future, still TBD). At the moment, to share between processes we usually see users storing this dictionary and reloading it for other flows. That being said, Prefect Cloud (including the free tier) is our recommended multi-flow orchestration layer, which would automatically pass this information between flow runs for you.