Carlo

10/31/2022, 2:22 PM
We are using run_deployment w/ SequentialTaskRunner. However, when the first in a chain failed, it didn't block the remaining run_deployment calls. In fact, they ran and the parent completed. How do I ensure the dependencies are honored? Flow definition in thread
@flow(name='parent_flow', task_runner=SequentialTaskRunner())
def parent_flow(env: str, for_date: str = 'TODAY'):
    run_date = transform_date(for_date=for_date)
    parameters = {'for_date': run_date, 'env': env}
    run_deployment(name=run_deployment_name(flow=flow_a, env=env), parameters=parameters)
    run_deployment(name=run_deployment_name(flow=flow_b, env=env), parameters=parameters)
    run_deployment(name=run_deployment_name(flow=flow_c, env=env), parameters=parameters)
We want it to run a -> b -> c. a failed, but the rest passed, including the parent.
We just tried wrapping the run_deployments in tasks, and that appears to have helped
👍 2
Nate

10/31/2022, 3:27 PM
Hi @Carlo, glad that works for you! Alternatively, run_deployment returns the FlowRun model, where you can find the state, like:
from prefect import flow
from prefect.deployments import run_deployment

@flow
def orchestrator():
    
    flow_run = run_deployment(
        name="my-flow/testDeploy"
    )
    assert flow_run.state.name == "Completed" # or handle failure states

if __name__ == "__main__":
    orchestrator()
🙌 2
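The same state check can be applied to the whole a -> b -> c chain. What follows is only a minimal sketch, not Prefect's own API: `run_chain` and `fake_run` are hypothetical names, and `run` is assumed to be any callable shaped like `run_deployment` (takes `name` and `parameters`, returns an object with a `.state`). It also assumes the call waits for the triggered run to finish before returning, so the state is final when it is checked.

```python
from types import SimpleNamespace


def run_chain(deployment_names, parameters, run):
    """Trigger deployments in order, stopping at the first that does not complete.

    `run` is assumed to behave like Prefect's run_deployment: it accepts
    `name` and `parameters` and returns an object with a `.state` attribute.
    """
    finished = []
    for name in deployment_names:
        flow_run = run(name=name, parameters=parameters)
        state_name = flow_run.state.name if flow_run.state else None
        if state_name != "Completed":
            # Raising here fails the caller, so later deployments never start.
            raise RuntimeError(f"{name} ended in state {state_name!r}; skipping the rest")
        finished.append(name)
    return finished


# Hypothetical stand-in for run_deployment so the sketch runs without a Prefect server.
def fake_run(name, parameters):
    outcome = "Failed" if name == "flow-a/dev" else "Completed"
    return SimpleNamespace(state=SimpleNamespace(name=outcome))
```

Inside the parent flow, the real `run_deployment` would be passed as `run`, with the three deployment names in order. Because the exception propagates out of the flow function, the parent should end up failed as well instead of completing.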
Carlo

10/31/2022, 4:20 PM
Thank you