Kyle McChesney
09/13/2021, 8:28 PMset_dependencies
, but also get its result and pass along to other flows. I.E. mixing and matching imperative vs functional?Kevin Kho
with MyFlow(...) as flow:
abc()
and MyFlow here would be something like
class MyFlow():
def __init__():
# do something
super().__init__()
Now this won’t explicitly set that startup()
as upstream. If your Executor supports parallel execution, it might be hard to pull off. There is one syntax that might help you though,
with Flow(...) as flow:
startup()
with flow:
abc()
and using that you can do:
def configure_flow():
with Flow(...) as flow:
startup()
return flow
flow = configure_flow()
with flow:
other_stuff()
but to be honest, I’m not satisfied because you’ll still have a hard time setting that startup()
. You likely really need it explicit written in the Flow registration. This is because it’s really an element of the DAG, and having it run when called from another flow is an alteration of the registered DAG if you know what I mean.Kyle McChesney
09/13/2021, 8:42 PMimport os
import prefect
from prefect.engine.state import Pending, Running
def startup_handler(flow, current_state, next_state):
if isinstance(current_state, Pending) and isinstance(next_state, Running):
logger = prefect.context.get('logger')
<http://logger.info|logger.info>('Starting flow run...')
<http://logger.info|logger.info>('Context: %s', dict(prefect.context))
<http://logger.info|logger.info>('Env: %s', os.environ)
Kyle McChesney
09/13/2021, 8:42 PMKevin Kho
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.
Powered by