Nate Joselson
01/29/2020, 10:03 AM
reduce_task() in the combined flow with
print(mr_flow_state.result[reduction].result)
Then I get the error
Traceback (most recent call last):
  File "combine_flow.py", line 14, in <module>
    print(mr_flow_state.result[reduction].result)
NameError: name 'reduction' is not defined
This makes sense to me, but is using some kind of cache the answer to getting these results? How do I go about doing that?
Thanks so much!
emre
01/29/2020, 10:30 AM
reduction is defined in the module map_reduce_flow, while your combined flow is in a different module, combine_flow. You have to expose reduction somehow, maybe encapsulate your mr_flow.run() in a function, which returns the result of reduction after the flow run finishes. Then you can import the function instead of the flow itself, which would return the desired result.
from map_reduce_flow import mr_flow, reduction will probably work as well, but that looks like sketchy design to me.
Nate Joselson
01/29/2020, 10:35 AM
reduction isn’t automatically saved to the mr_flow_state. It is an interesting idea to encapsulate the flow in a function which both runs the flow and returns the output. I am sure that this will solve my problem!
I was wondering, though, if there was a more prefect-y way to do this... Am I totally off that the cache_key parameter in tasks can be used to cache output to pass between local flows?
cache_key in the docs…
emre
01/29/2020, 10:42 AM
flow.tasks and flow.reference_tasks expose a set of tasks as well, that might be of use to you.
Nate Joselson
01/29/2020, 10:47 AM
reduce_task
flow.run() in a function and then calling it in the combined flow runs into the same problem that reduction is a local variable to the mr_flow… How would you go about implementing this function?
def mr_flow_run(params):
    mr_flow_state = mr_flow.run(parameters=params)
    reduction = mr_flow_state.result[reduction].result
    return reduction
However, then calling
reduction = mr_flow_run(params={"data":extract_flow_state.result[data].result})
throws an error that reduction is referenced before assignment…
prefect.context to specify where the code should look for variables?
https://docs.prefect.io/api/latest/utilities/context.html#context-2
emre
01/29/2020, 11:14 AM
def mr_flow_build_run(params):
    with Flow("Map / Reduce 🤓") as mr_flow:
        numbers = Parameter('data')
        first_map = map_task.map(numbers)
        second_map = map_task.map(first_map)
        reduction = reduce_task(second_map)
        printing = print_task(reduction)
    mr_flow_state = mr_flow.run(parameters=params)
    reduction = mr_flow_state.result[reduction].result
    return reduction
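A minimal sketch, in plain Python without Prefect, of why the earlier mr_flow_run raised "referenced before assignment" while a function like the one above does not. FakeState and FakeResult are hypothetical stand-ins for the flow-run state, not Prefect classes, and the value 45 is made up for illustration. The point is only the scoping rule: assigning to a name anywhere in a function body makes that name local for the whole function.

```python
class FakeResult:
    """Hypothetical stand-in for a task state holding a result."""
    def __init__(self, value):
        self.result = value


class FakeState:
    """Hypothetical stand-in for a flow-run state: maps task -> task state."""
    def __init__(self, results):
        self.result = results


# Module-level name, playing the role of the task object bound in the flow module.
reduction = "reduce_task"
state = FakeState({reduction: FakeResult(45)})


def broken():
    # Assigning to `reduction` makes it a local name for the entire function,
    # so the lookup on the right-hand side reads an unbound local.
    reduction = state.result[reduction].result
    return reduction


def fixed():
    # A different local name leaves `reduction` pointing at the module-level object.
    value = state.result[reduction].result
    return value


def rebound_locally():
    # Mirrors the function above: the name is bound locally before it is read,
    # so rebinding it afterwards is legal.
    reduction = "reduce_task"
    reduction = state.result[reduction].result
    return reduction
```

Calling broken() raises UnboundLocalError, while fixed() and rebound_locally() both return the stored value. Using a separate name for the returned result, as in fixed(), keeps the task and its result from being confused.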
Nate Joselson
01/29/2020, 11:17 AM
UnboundLocalError: local variable 'reduction' referenced before assignment
when calling from a file other than the one that defines the flow
emre
01/29/2020, 11:31 AM
Nate Joselson
01/29/2020, 11:33 AM
prefect.context.caches can also do the same thing (possibly in a more “correct” way) but this fix makes the local variables being reused very visible. Thanks!
Chris White
01/29/2020, 4:34 PM
cache_key question, Prefect Core currently stores all cached states within prefect.context.caches with the provided cache_key as the key (this could change to store to disk in the future, still TBD). At the moment, to share between processes we usually see users storing this dictionary and reloading it for other flows.
That being said, Prefect Cloud (including the free tier) is our recommended multi-flow orchestration layer, which would automatically pass this information between flow runs for you.
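A rough sketch of the "store the dictionary and reload it" idea, using only the standard library. The caches dict here is a plain stand-in keyed by a made-up cache key; in Prefect it would be the contents of prefect.context.caches. The helper names save_caches and load_caches are hypothetical, and whether real cached states pickle cleanly, and how the reloaded dictionary gets handed to the second flow, are assumptions this sketch does not cover.

```python
import os
import pickle
import tempfile


def save_caches(caches, path):
    """Write the cache dictionary to disk so another process can read it."""
    with open(path, "wb") as f:
        pickle.dump(caches, f)


def load_caches(path):
    """Read a cache dictionary previously written by save_caches."""
    with open(path, "rb") as f:
        return pickle.load(f)


# Stand-in for the cache dictionary: cache key -> list of cached values.
caches = {"reduction": [45]}

# First process: persist the dictionary after the flow run.
path = os.path.join(tempfile.mkdtemp(), "caches.pkl")
save_caches(caches, path)

# Second process: reload it before running the next flow.
restored = load_caches(path)
```

Only load pickle files you wrote yourself, since unpickling untrusted data can execute arbitrary code.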