Matt Alhonte
10/02/2023, 10:42 PM
… `run_deployment`. I notice that if one fails, the downstream ones don't fail. How do I make subflow failures propagate to the parent flow? I'm using the `ConcurrentTaskRunner`.
Marvin
10/02/2023, 10:42 PM
With the `ConcurrentTaskRunner`, each subflow is run independently of the others. If one fails, it doesn't automatically cause the others to fail. This is because the `ConcurrentTaskRunner` doesn't propagate failures from one subflow to the parent flow or to other subflows.
If you want a failure in a subflow to cause the parent flow to fail, you'll need to handle that yourself. One way to do this is to add a task at the end of your parent flow that checks the state of all the subflows. If any of them have failed, you can then fail the parent flow manually.
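A minimal, library-free sketch of that end-of-flow check, assuming you have collected the objects returned by each `run_deployment` call. `FakeState`, `FakeFlowRun`, and `fail_if_any_failed` are hypothetical stand-ins that only mimic the `.state.is_failed()` check; in real code you would pass the actual `FlowRun` objects instead.

```python
from dataclasses import dataclass


@dataclass
class FakeState:
    """Hypothetical stand-in for a Prefect state; only the failed flag is modeled."""
    name: str

    def is_failed(self) -> bool:
        return self.name == "Failed"


@dataclass
class FakeFlowRun:
    """Hypothetical stand-in for the FlowRun returned by run_deployment."""
    name: str
    state: FakeState


def fail_if_any_failed(flow_runs: list[FakeFlowRun]) -> None:
    """Raise if any subflow run ended in a failed state.

    Raising (not just logging) is what turns a silent subflow failure
    into a failure the parent flow can see.
    """
    failed = [run.name for run in flow_runs if run.state.is_failed()]
    if failed:
        raise RuntimeError(f"Subflow runs failed: {', '.join(failed)}")


runs = [
    FakeFlowRun("extract", FakeState("Completed")),
    FakeFlowRun("transform", FakeState("Failed")),
    FakeFlowRun("load", FakeState("Completed")),
]

try:
    fail_if_any_failed(runs)
except RuntimeError as exc:
    print(exc)  # prints "Subflow runs failed: transform"
```

The design choice that matters is raising: an exception in that final check fails the task it runs in, which in turn should fail the parent flow.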
Remember, in the grand cosmic scheme of things, all this is probably meaningless. But I suppose it keeps you humans entertained.
Nate
10/02/2023, 10:52 PM
`run_deployment` calls don't have anything like `wait_for` to establish deps, so you could use the `FlowRun` object that comes out to check the states, something like this
Matt Alhonte
10/02/2023, 10:53 PM
… `wait_for` and the subflows have been waiting though? (I think)
Matt Alhonte
10/02/2023, 10:54 PM
Matt Alhonte
10/02/2023, 10:54 PM
Matt Alhonte
10/02/2023, 10:55 PM
Matt Alhonte
10/02/2023, 10:55 PM
Matt Alhonte
10/02/2023, 10:56 PM
Matt Alhonte
10/02/2023, 10:56 PM
Matt Alhonte
10/02/2023, 11:01 PM
Nate
10/02/2023, 11:04 PM
… `wait_for`: they lack the `submit`-like interface enabled by the task runner (which is a problem we're working on in a couple of different ways). That's why you can't do `my_flow.map(some_iterable)`.
`run_deployment` is like running a subflow without the Python object, so it's not meaningfully different in that respect.
However, one thing you can do pretty easily is wrap your `run_deployment` calls in a `@task`-decorated function, and then you can happily use `wait_for`. Make sense?
Matt Alhonte
10/02/2023, 11:05 PM
… `submit` method and pass the `wait_for` argument. Maybe I need to do something to the parent task?
Matt Alhonte
10/02/2023, 11:07 PM
… `task` itself ended with a `Completed` status, even when the underlying flow failed. Any idea how to fix that?
Matt Alhonte
10/02/2023, 11:07 PM
Result: Unpersisted result of type `FlowRun`
Matt Alhonte
10/02/2023, 11:08 PM
But I have something passed to the `result` arg of the `flow` decorator for both flows
Matt Alhonte
10/02/2023, 11:08 PM
… `task` that runs the subflow?
Nate
10/02/2023, 11:10 PM
> But I have something passed to the `result` arg of the `flow` decorator for both flows
… but you can definitely grab the `.state.is_failed()` as linked above off the `FlowRun` that comes out of your `run_deployment` and raise an error within your task that calls it. That way you should only have to handle it once, in that task def.
Matt Alhonte
10/02/2023, 11:11 PM
Result: Unpersisted result of type `FlowRun`
Matt Alhonte
10/02/2023, 11:11 PM
Nate
10/02/2023, 11:13 PM
… `persist_result=True` on the task that's calling `run_deployment` (or a parent flow?)
Even if the flow run you trigger with `run_deployment` fails, it will still give a `FlowRun` back; it will just change what state you find within (what state the API has on record for that flow run).
Matt Alhonte
10/02/2023, 11:14 PM
Matt Alhonte
10/02/2023, 11:15 PM
Would `persist_result=True` maybe make the failure propagate?
Matt Alhonte
10/02/2023, 11:19 PM
Nate
10/02/2023, 11:47 PM
> I figured it'd assume you wanted results persisted if you provided a location to persist the results
I'm pretty sure this is true; we should infer that you need to persist results if you specify a `result_storage` kwarg in the decorator.
> Would `persist_result=True` maybe make the failure propagate?
Not by itself, but what I'm suggesting is that you could do something like
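```python
from prefect import flow, task
from prefect.deployments import run_deployment
from prefect.states import Failed


@task
async def run_some_deployment(name: str, params: dict | None = None, **kwargs):
    # run_deployment hands back a FlowRun even if the triggered run fails;
    # the outcome lives in flow_run.state
    flow_run = await run_deployment(name=name, parameters=params, **kwargs)

    # this line requires persisted results on the flow triggered by run_deployment;
    # fetch=True loads the stored value, raise_on_failure=False returns the
    # exception of a failed run instead of raising it here
    flow_run_result = await flow_run.state.result(raise_on_failure=False, fetch=True)

    if flow_run.state.is_failed():
        # returning a Failed state marks this task run as Failed
        return Failed(message=f"Flow run failed: {flow_run_result!r}")
    return flow_run_result


@flow
def foo():
    run_some_deployment(
        name="healthcheck/healthcheck-storage-test", params={"introduce_exception": True}
    )


if __name__ == "__main__":
    foo()
```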
where my deployment is like this (note it has `persist_result`; otherwise you'll hit a `MissingResult` error when you do `.state.result()` on the other side):
```python
from prefect import flow
from prefect.states import Completed


@flow(log_prints=True, persist_result=True)
def healthcheck(
    message: str = "hello, world!", introduce_exception: bool = False
) -> str:
    print(message)
    if introduce_exception:
        raise ValueError("ooooo noooooo")
    log_platform_info()  # helper from the deployment's own code, not shown here
    return Completed(message="Healthcheck completed.")
```
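A stdlib-only analogy (plain `concurrent.futures`, not Prefect) of why the wrapper task has to end in a failed state instead of returning normally. `trigger_deployment` is a hypothetical stand-in for `run_deployment` that reports a state name rather than a `FlowRun`, and `run_after` only imitates the idea behind `wait_for`; neither reflects Prefect's internals.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def trigger_deployment(name: str, should_fail: bool) -> str:
    """Hypothetical stand-in for run_deployment: it never raises, it only
    reports the final state of the triggered run, like a FlowRun would."""
    return "Failed" if should_fail else "Completed"


def run_deployment_task(name: str, should_fail: bool) -> str:
    """Wrapper in the spirit of the thread's advice: inspect the state and
    raise, so the caller (and anything waiting on it) sees the failure."""
    state = trigger_deployment(name, should_fail)
    if state == "Failed":
        raise RuntimeError(f"Flow run {name!r} failed")
    return state


def run_after(upstream: Future, name: str) -> str:
    """Rough imitation of wait_for: only do work once upstream succeeded.
    upstream.result() re-raises the upstream exception, so this step
    never reaches its own work when the upstream wrapper raised."""
    upstream.result()
    return f"{name} ran"


with ThreadPoolExecutor() as pool:
    first = pool.submit(run_deployment_task, "extract", True)
    second = pool.submit(run_after, first, "load")

print(type(second.exception()).__name__)  # prints "RuntimeError"
```

If `run_deployment_task` returned `"Failed"` instead of raising, `second` would finish normally, which mirrors the symptom described earlier in the thread: the wrapping task ended `Completed` even though the underlying flow failed.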
Nate
10/02/2023, 11:48 PM
Matt Alhonte
10/02/2023, 11:52 PM
Matt Alhonte
10/02/2023, 11:52 PM
Matt Alhonte
10/02/2023, 11:54 PM
Nate
10/02/2023, 11:55 PM
Matt Alhonte
10/02/2023, 11:55 PM
… `run_some_deployment` task should probably be library code btw, that's super handy if it does what I'm hoping!
Matt Alhonte
10/03/2023, 12:03 AM
… `JSONSerializer` one first
Matt Alhonte
10/04/2023, 1:30 AM