is there a good way to define a flow with `set_dependencies`, but also get its result and pass along...
k
is there a good way to define a flow with
set_dependencies
, but also get its result and pass along to other flows. I.E. mixing and matching imperative vs functional?
k
You have a similar question to Ben right above I think. He wants to subclass Flow so that he can parameterize his 10 identical flows. I don’t have a clear example of subclassing Flow but basically the goal is to do:
Copy code
with MyFlow(...) as flow:
     abc()
and MyFlow here would be something like
Copy code
class MyFlow():
      def __init__():
         # do something
         super().__init__()
Now this won’t explicitly set that
startup()
as upstream. If your Executor supports parallel execution, it might be hard to pull off. There is one syntax that might help you though,
Copy code
with Flow(...) as flow:
     startup()

with flow:
     abc()
and using that you can do:
Copy code
def configure_flow():
    with Flow(...) as flow:
         startup()
    return flow

flow = configure_flow()

with flow:
     other_stuff()
but to be honest, I’m not satisfied because you’ll still have a hard time setting that
startup()
. You likely really need it explicit written in the Flow registration. This is because it’s really an element of the DAG, and having it run when called from another flow is an alteration of the registered DAG if you know what I mean.
k
Thanks for this. I ended up with the following:
Copy code
import os

import prefect
from prefect.engine.state import Pending, Running


def startup_handler(flow, current_state, next_state):
    if isinstance(current_state, Pending) and isinstance(next_state, Running):
        logger = prefect.context.get('logger')
        <http://logger.info|logger.info>('Starting flow run...')
        <http://logger.info|logger.info>('Context: %s', dict(prefect.context))
        <http://logger.info|logger.info>('Env: %s', os.environ)
just added this state handler to my Flow.state_handlers
k
that is a good solution