tried to do everything at once but that made it inflexible in the long run. The new tasks are intended to be more discrete so I'm hesitant to add a bool to
wait_for_flow_run
to fail on child failure, but it may be a reasonable thing to do still. The alternative is introducing a new
raise_flow_run_state
task that raises the state of a child flow run.
e
Eddie
10/18/2021, 3:19 PM
Yea I am testing this at the moment:
Copy code
@task(name="validate_success")
def validate_success(run: FlowRunView):
if not run.state.is_successful():
raise RuntimeError(
f"Run {run.flow_run_id} of flow {run.flow_id} was not successful."
)
return run
Will post back here if it does what I expect. I'm still getting familiar with Prefect overall.
z
Zanie
10/18/2021, 3:22 PM
You can also raise a failure signal directly i.e.
Copy code
from prefect.signals import FAIL
...
raise FAIL("Your messsage")
or re-raise the child flow's state (this copied from
StartFlowRun
Copy code
exc = signal_from_state(flow_run_state)(
f"{flow_run_id} finished in state {flow_run_state}"
)
raise exc
👍 1
Zanie
10/18/2021, 3:23 PM
That looks like it should do what you want though.
Bring your towel and join one of the fastest growing data communities. Welcome to our second-generation open source orchestration platform, a completely rethought approach to dataflow automation.