Carlo
10/31/2022, 2:22 PM
I'm using run_deployment w/ SequentialTaskRunner. However, when the first in a chain failed, it didn't block the remaining run_deployments. In fact, they ran and the parent completed. How do I ensure the dependencies are honored? Flow definition in thread
@flow(name='parent_flow', task_runner=SequentialTaskRunner())
def parent_flow(env: str, for_date: str = 'TODAY'):
    run_date = transform_date(for_date=for_date)
    parameters = {'for_date': run_date, 'env': env}
    run_deployment(name=run_deployment_name(flow=flow_a, env=env), parameters=parameters)
    run_deployment(name=run_deployment_name(flow=flow_b, env=env), parameters=parameters)
    run_deployment(name=run_deployment_name(flow=flow_c, env=env), parameters=parameters)
Nate
10/31/2022, 3:27 PM
run_deployment returns the FlowRun model, where you can find the state, like:
from prefect import flow
from prefect.deployments import run_deployment


@flow
def orchestrator():
    flow_run = run_deployment(
        name="my-flow/testDeploy"
    )
    assert flow_run.state.name == "Completed"  # or handle failure states


if __name__ == "__main__":
    orchestrator()
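To make a whole chain stop at the first failure, the same state check can be applied after each call. This is only a rough sketch: run_in_order is a hypothetical helper (not part of Prefect), and it takes the trigger function as an argument so it can be tried out without a running API. It assumes the trigger blocks until the flow run finishes and returns an object with a state.name, as in the snippet above.

```python
from typing import Any, Callable, Dict, Iterable, List


def run_in_order(
    names: Iterable[str],
    run: Callable[..., Any],
    parameters: Dict[str, Any],
) -> List[Any]:
    """Trigger each deployment in turn and stop at the first run that is not Completed.

    `run` is expected to behave like prefect.deployments.run_deployment:
    it accepts `name` and `parameters` keywords and returns a flow run
    object exposing `state.name`.
    """
    finished = []
    for name in names:
        flow_run = run(name=name, parameters=parameters)
        state = flow_run.state
        # A missing state or any non-Completed state aborts the chain,
        # so later deployments are never triggered.
        if state is None or state.name != "Completed":
            state_name = state.name if state is not None else "unknown"
            raise RuntimeError(
                f"Deployment {name!r} ended in state {state_name!r}; "
                "skipping the remaining deployments"
            )
        finished.append(flow_run)
    return finished
```

Inside the parent flow this could be called as run_in_order([...deployment names...], run_deployment, parameters). Raising from the flow body should also mark the parent as failed rather than completed.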
Carlo
10/31/2022, 4:20 PM