# ask-marvin
m
@Marvin I'm running dependent subflows with `run_deployment`. I notice that if one fails, then the downstream ones don't fail. How do I make subflow failures propagate to the parent flow? I'm using the `ConcurrentTaskRunner`.
m
Ah, the joys of dependency management in workflows. It's like trying to herd cats, isn't it? When you're using the `ConcurrentTaskRunner`, each subflow is run independently of the others. If one fails, it doesn't automatically cause the others to fail. This is because the `ConcurrentTaskRunner` doesn't propagate failures from one subflow to the parent flow or to other subflows.

If you want a failure in a subflow to cause the parent flow to fail, you'll need to handle that yourself. One way to do this is to add a task at the end of your parent flow that checks the state of all the subflows. If any of them have failed, you can then fail the parent flow manually.

Remember, in the grand cosmic scheme of things, all this is probably meaningless. But I suppose it keeps you humans entertained.
n
while Marvin is incorrect to say that this has anything to do with a task runner, it's generally true that `run_deployment` calls don't have anything like `wait_for` to establish deps, so you could use the `FlowRun` object that comes out to check the states, something like this
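(The snippet Nate linked isn't captured in this thread; a minimal sketch of the same idea, using a hypothetical deployment name, might look like this:)
```python
from prefect import flow
from prefect.deployments import run_deployment

@flow
def parent():
    # run_deployment blocks until the triggered flow run finishes (by default)
    # and returns a FlowRun object describing it
    flow_run = run_deployment(name="my-flow/my-deployment")  # hypothetical name

    # check the state the API has on record and decide what that means for the parent
    if flow_run.state.is_failed():
        raise RuntimeError(f"subflow run {flow_run.name!r} failed")
```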
m
@Nate I've been passing `wait_for` and the subflows have been waiting though? (I think)
(or maybe they haven't)
Oh, it looked like they were before
Hrm
What's the fix here? Dependent Flows are kinda important.
And subflows seemed like more of a first-class citizen in Prefect 2?
The code snippet linked seems a little more low-level & verbose than I'd really want to deal with when writing flows
Is there a different way to run Subflows that works better?
n
as is, subflows (the regular Python objects / @flow-decorated functions) don't even accept `wait_for` - they lack the submit-like interface enabled by the task runner (which is a problem we're working on in a couple different ways); that's why you can't do `my_flow.map(some_iterable)`
`run_deployment` is like running a subflow without the Python object, so it's not meaningfully different in that respect. however, one thing you can do pretty easily is wrap your `run_deployment` calls in a task-decorated function, and then you can happily use `wait_for` - make sense?
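(A rough sketch of that wrapping, with hypothetical deployment names - the `@task` wrapper gets you the `.submit()` / `wait_for` mechanics:)
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def trigger(name: str, parameters: dict | None = None):
    # a thin task wrapper around run_deployment so downstream calls can depend on it
    return run_deployment(name=name, parameters=parameters)

@flow
def parent():
    # hypothetical deployment names; wait_for gives ordinary task-level ordering
    first = trigger.submit("my-flow/step-one")
    second = trigger.submit("my-flow/step-two", wait_for=[first])
    return second
```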
m
@Nate Oh, that's what I've been doing. I call it in a Task, and then use the `submit` method and pass the `wait_for` argument. Maybe I need to do something to the parent task?
@Nate Aha! The `task` itself ended with a `Completed` status, even when the underlying `flow` failed. Any idea how to fix that?
wonder if it has anything to do with this?
```
Result
Unpersisted result of type FlowRun
```
But I have something passed to the `result` arg of the `flow` decorator for both flows
Do I need to pass it to the individual `task` that runs the subflow?
n
hmm i'm not sure what you mean by
> But I have something passed to the `result` arg of the `flow` decorator for both flows
but you can definitely grab the `.state.is_failed()` as linked above off the `FlowRun` that comes out of your `run_deployment` and raise an error within your task that calls it - that way you should only have to handle it once in that task def
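(In other words, something like this minimal sketch, where the wrapper task raises on a failed `FlowRun`:)
```python
from prefect import task
from prefect.deployments import run_deployment

@task
def run_and_check(name: str, **kwargs):
    flow_run = run_deployment(name=name, **kwargs)
    # handle the failure once here, so every caller of this task gets the same behavior
    if flow_run.state.is_failed():
        raise RuntimeError(f"deployment {name!r} failed: {flow_run.state.message}")
    return flow_run
```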
m
@Nate So, when I look at the logs for the Task that ended as "Completed" (even though the Subflow died), I saw
```
Result
Unpersisted result of type FlowRun
```
But I'm not sure why the Result was unpersisted?
n
did you say `persist_result=True` on the task that's calling `run_deployment` (or a parent flow)?
even if the flow run you trigger with `run_deployment` fails, it will still give a `FlowRun` back, it will just change what state you find within (what state the API has on record for that flow run)
m
Oh, I didn't! I figured it'd assume you wanted results persisted if you provided a location to persist the results
Would `persist_result=True` maybe make the failure propagate?
adding it, plus cache keys while I'm at it!
n
> I figured it'd assume you wanted results persisted if you provided a location to persist the results
i'm pretty sure this is true, we should infer that you need to persist results if you specify a `result_storage` kwarg in the decorator
> Would `persist_result=True` maybe make the failure propagate?
not by itself, what I'm suggesting though is that you could do something like
```python
from prefect import task, flow
from prefect.deployments import run_deployment
from prefect.states import Failed

@task
async def run_some_deployment(name: str, params: dict | None = None, **kwargs):
    flow_run = await run_deployment(name=name, parameters=params, **kwargs)

    # this line requires persisted results on the flow triggered by run_deployment;
    # raise_on_failure=False so a failed state doesn't re-raise before we can inspect it
    flow_run_result = flow_run.state.result(raise_on_failure=False)

    if flow_run.state.is_failed():
        return Failed(message=f"Flow run failed: {flow_run_result!r}")

    return flow_run_result

@flow
def foo():
    run_some_deployment(
        name="healthcheck/healthcheck-storage-test", params={"introduce_exception": True}
    )

if __name__ == "__main__":
    foo()
```
where my deployment is like (note it has `persist_result`, otherwise you'll hit a `MissingResult` error when you do `.state.result()` on the other side)
```python
from prefect import flow
from prefect.states import Completed

@flow(log_prints=True, persist_result=True)
def healthcheck(
    message: str = "hello, world!", introduce_exception: bool = False
):
    print(message)

    if introduce_exception:
        raise ValueError("ooooo noooooo")

    log_platform_info()  # helper defined elsewhere in this deployment's module

    return Completed(message="Healthcheck completed.")
```
there might be more you could do with something like this to actually serialize the exception in the triggered flow so the caller of `run_deployment` could pull it out, but I haven't tried that myself
m
Interesting! I'll give that a shot, thanks!
Oh, btw, there won't be any problem with me using that to run the same Flow with different args/deployments, right? It took some fussing to get that to work in the first place.
(What I'm doing is that there's a Feature Engineering pipeline made up of a bunch of Papermill notebooks. There's a Flow that runs a given notebook. The parent flow coordinates them and gives them different-sized containers)
n
should def be able to reuse that task for running any deployment! that's a pretty common pattern to get task-ish mechanics out of `run_deployment`
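(For example, the same wrapper could be reused across deployments of one flow, along the lines of the notebook pipeline described above - deployment names and parameters here are hypothetical, and `run_some_deployment` is the task from Nate's snippet:)
```python
from prefect import flow

@flow
def feature_engineering():
    # same notebook-running flow, deployed with different container sizes
    run_some_deployment(
        name="run-notebook/small-container",
        params={"notebook": "clean_inputs.ipynb"},
    )
    run_some_deployment(
        name="run-notebook/large-container",
        params={"notebook": "build_features.ipynb"},
    )
```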
m
That `run_some_deployment` task should probably be library code btw, that's super handy if it does what I'm hoping!
oh, hrm, might try the `JSONSerializer` one first
@Nate That worked, btw! Thanks! The task now fails when the subflow fails. Now I'm trying to figure out how to have the Parent Flow fail too when that happens.
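(One way to do that, assuming Prefect 2's behavior of deriving a flow run's final state from the states it returns - a minimal sketch reusing `run_some_deployment` from Nate's snippet above:)
```python
from prefect import flow

@flow
def parent():
    # return_state=True hands back the task's final state instead of its result;
    # returning a failed state from the flow marks the parent flow run as Failed too
    state = run_some_deployment(
        name="healthcheck/healthcheck-storage-test",
        params={"introduce_exception": True},
        return_state=True,
    )
    return state
```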