Eddie
10/18/2021, 2:21 PM
Is there a way to make wait_for_flow_run tasks fail if the child flow run was a failure? I know that wait_for_flow_run returns a FlowRunView, so I assume it is possible to define another task that raises an exception if the run view state is_failed(), but I am curious if there is already an interface for this behavior in the built-in tasks.
Kevin Kho
[...] FlowRunView.state. The StartFlowRun task does this here. You can also use StartFlowRun(wait=True) to achieve this.
Kevin Kho
[...] signal_from_state function
Zanie
StartFlowRun tried to do everything at once, but that made it inflexible in the long run. The new tasks are intended to be more discrete, so I'm hesitant to add a bool to wait_for_flow_run to fail on child failure, but it may still be a reasonable thing to do. The alternative is introducing a new raise_flow_run_state task that raises the state of a child flow run.
Eddie
10/18/2021, 3:19 PM
from prefect import task
from prefect.backend import FlowRunView

@task(name="validate_success")
def validate_success(run: FlowRunView):
    # Fail this task if the child flow run did not finish successfully.
    if not run.state.is_successful():
        raise RuntimeError(
            f"Run {run.flow_run_id} of flow {run.flow_id} was not successful."
        )
    return run
Will post back here if it does what I expect. I'm still getting familiar with Prefect overall.
Zanie
from prefect.engine.signals import FAIL
...
raise FAIL("Your message")
or re-raise the child flow's state (this is copied from StartFlowRun):
from prefect.engine.signals import signal_from_state
exc = signal_from_state(flow_run_state)(
    f"{flow_run_id} finished in state {flow_run_state}"
)
raise exc
Eddie
10/18/2021, 3:24 PM
signal_from_state is great! Thanks
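
The pattern discussed in this thread (look at the child run's final state, pick the matching signal, and raise it so the parent task fails) can be sketched without Prefect installed. Everything below is a hypothetical stand-in: the `State`, `Success`, `Failed`, `TimedOut`, `FAIL`, and `TIMEOUT` classes and the `signal_for_state` and `raise_unless_successful` helpers are made up for illustration and only mimic the general shape of Prefect's states and signals. They are not Prefect's implementation of signal_from_state.

```python
from dataclasses import dataclass


# Hypothetical stand-ins so the sketch runs without Prefect installed.
# They only mimic the shape of Prefect's state and signal classes.
@dataclass
class State:
    message: str = ""

    def is_successful(self) -> bool:
        return isinstance(self, Success)


class Success(State):
    pass


class Failed(State):
    pass


class TimedOut(Failed):
    pass


class ChildRunSignal(Exception):
    """Base exception carrying the child run's final state."""

    def __init__(self, message: str, state: State):
        super().__init__(message)
        self.state = state


class FAIL(ChildRunSignal):
    pass


class TIMEOUT(FAIL):
    pass


# Most specific state classes first, so subclasses match before parents.
_SIGNALS = [(TimedOut, TIMEOUT), (Failed, FAIL)]


def signal_for_state(state: State) -> type:
    """Pick the exception class that corresponds to a state's type."""
    for state_cls, signal_cls in _SIGNALS:
        if isinstance(state, state_cls):
            return signal_cls
    raise ValueError(f"No signal defined for state {state!r}")


def raise_unless_successful(flow_run_id: str, state: State) -> State:
    """Return the state if the child run succeeded, else raise its signal."""
    if state.is_successful():
        return state
    signal_cls = signal_for_state(state)
    raise signal_cls(f"{flow_run_id} finished in state {state}", state)
```

In a real flow, the equivalent of `raise_unless_successful` would be a task placed downstream of wait_for_flow_run and given the FlowRunView's state, along the lines of the validate_success task above. Raising a state-specific signal rather than a generic RuntimeError keeps the distinction between, for example, a timeout and an ordinary failure.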