Eddie
10/18/2021, 2:21 PM
wait_for_flow_run tasks to fail if the child flow run was a failure? I know that wait_for_flow_run returns a FlowRunView, so I assume it is possible to define another task that raises an exception if the run view state is_failed(), but I am curious whether there is already an interface for this behavior in the built-in tasks.
Kevin Kho
FlowRunView.state. The StartFlowRun task does this here. You can also use StartFlowRun(wait=True) to achieve this.
Kevin Kho
signal_from_state function
Zanie
StartFlowRun tried to do everything at once, but that made it inflexible in the long run. The new tasks are intended to be more discrete, so I'm hesitant to add a bool to wait_for_flow_run to fail on child failure, but it may still be a reasonable thing to do. The alternative is introducing a new raise_flow_run_state task that raises the state of a child flow run.
Eddie
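10/18/2021, 3:19 PM
from prefect import task
from prefect.backend import FlowRunView

@task(name="validate_success")
def validate_success(run: FlowRunView):
    # Fail this task if the child flow run did not finish successfully.
    if not run.state.is_successful():
        raise RuntimeError(
            f"Run {run.flow_run_id} of flow {run.flow_id} was not successful."
        )
    return run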
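The check in the task body above can be tried without a Prefect backend. This is a minimal sketch, assuming hypothetical stand-ins (FakeState, FakeRunView, check_success) that only mimic the attributes the task touches; they are not Prefect classes, and a real FlowRunView carries much more.

```python
from dataclasses import dataclass


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state; only is_successful() is modeled."""
    successful: bool

    def is_successful(self) -> bool:
        return self.successful


@dataclass
class FakeRunView:
    """Hypothetical stand-in exposing the three attributes the check reads."""
    flow_run_id: str
    flow_id: str
    state: FakeState


def check_success(run):
    # Same logic as the task body, without the @task decorator.
    if not run.state.is_successful():
        raise RuntimeError(
            f"Run {run.flow_run_id} of flow {run.flow_id} was not successful."
        )
    return run
```

A successful stand-in is passed through unchanged, so downstream tasks can keep using the run view; an unsuccessful one raises, which would mark the wrapping task as failed.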
Will post back here if it does what I expect. I'm still getting familiar with Prefect overall.
Zanie
from prefect.engine.signals import FAIL
...
raise FAIL("Your message")
Or re-raise the child flow's state (this is copied from StartFlowRun):
from prefect.engine.signals import signal_from_state
...
exc = signal_from_state(flow_run_state)(
    f"{flow_run_id} finished in state {flow_run_state}"
)
raise exc
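To illustrate the idea behind re-raising a child state as a signal, here is a dependency-free sketch. Everything in it (State, Failed, TimedOut, FailSignal, signal_for, raise_child_state) is a hypothetical stand-in written for this example; it is not Prefect's implementation of signal_from_state, only the general pattern of mapping a state type to an exception type.

```python
class State:
    """Hypothetical base state; not Prefect's State class."""


class Success(State):
    pass


class Failed(State):
    pass


class TimedOut(Failed):
    pass


class SuccessSignal(Exception):
    """Stand-in for a 'success' signal."""


class FailSignal(Exception):
    """Stand-in for a 'fail' signal."""


# Assumed mapping from state types to the signal that represents them.
_SIGNALS = {Success: SuccessSignal, Failed: FailSignal}


def signal_for(state):
    """Return the signal class for the nearest mapped ancestor of the state's type."""
    for cls in type(state).__mro__:
        if cls in _SIGNALS:
            return _SIGNALS[cls]
    raise ValueError(f"No signal for state type {type(state).__name__}")


def raise_child_state(flow_run_id, state):
    """Raise the signal matching the child run's final state."""
    raise signal_for(state)(
        f"{flow_run_id} finished in state {type(state).__name__}"
    )
```

Walking the method resolution order means a subclass such as TimedOut is raised as its parent's signal without needing its own entry in the mapping.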
Eddie
10/18/2021, 3:24 PM
signal_from_state is great! Thanks