
Matthew Seligson

05/23/2022, 12:46 PM
If I have a StartFlowRun task with wait=True in my flow, and the subflow fails, what is the expected behavior when I restart the parent flow? Should the StartFlowRun schedule a brand new subflow run? Should the subflow restart all failed tasks? Is this documented anywhere?

Anna Geller

05/23/2022, 1:36 PM
It's partly documented here, but you should really treat it as retriggering an API call to start a flow run rather than restarting a child flow run per se. Does that make sense?

Matthew Seligson

05/23/2022, 2:35 PM
It seems like the solution you provided schedules an entirely new child flow run; it doesn’t restart the existing child flow run. Am I understanding this correctly?

Kevin Kho

05/23/2022, 2:37 PM
Yeah, it doesn’t, because StartFlowRun and create_flow_run just hit the API, and if you request the same flow run with the same idempotency key, the API just returns the existing run's state. It doesn’t restart it. So you would need to restart the child flow run, and then restart the parent flow.

Matthew Seligson

05/23/2022, 2:47 PM
Why doesn’t StartFlowRun restart the subflow if it already exists? What is the benefit of the caching?

Kevin Kho

05/23/2022, 2:51 PM
Outside of flow-of-flows, creating a flow run with the same idempotency key multiple times triggers only one flow run. The StartFlowRun task just passes an idempotency key for you if you don’t specify one.
The run is cached for 24 hours. It’s not triggering the flow run again.

Kevin Kho

05/23/2022, 2:54 PM
Ah that’s wrong. They don’t expire
But you can trigger another flow run by changing the idempotency key
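To make the idempotency-key behavior described above concrete, here is a minimal sketch using a toy in-memory backend. It is not Prefect code: `ToyBackend`, its `create_flow_run` method, and the key strings are all hypothetical stand-ins, and the only behavior modeled is "same key returns the existing run, new key creates a new run".

```python
import itertools
from dataclasses import dataclass, field


@dataclass
class ToyBackend:
    """In-memory stand-in for the API (hypothetical, not Prefect code)."""

    runs_by_key: dict = field(default_factory=dict)  # idempotency key -> run id
    states: dict = field(default_factory=dict)       # run id -> state name
    _ids: itertools.count = field(default_factory=lambda: itertools.count(1))

    def create_flow_run(self, flow_name: str, idempotency_key: str) -> str:
        # A key that was seen before returns the existing run untouched.
        if idempotency_key in self.runs_by_key:
            return self.runs_by_key[idempotency_key]
        run_id = f"{flow_name}-run-{next(self._ids)}"
        self.runs_by_key[idempotency_key] = run_id
        self.states[run_id] = "Scheduled"
        return run_id


backend = ToyBackend()
first = backend.create_flow_run("child", idempotency_key="parent-task-run-1")
backend.states[first] = "Failed"  # pretend the child run failed

# Restarting the parent repeats the call with the same key:
# the same run comes back, and it is still Failed.
again = backend.create_flow_run("child", idempotency_key="parent-task-run-1")

# A different key creates a brand new run.
fresh = backend.create_flow_run("child", idempotency_key="parent-task-run-1-retry")
```

In this toy model the repeated call never touches the failed run's state, which mirrors why restarting only the parent does not restart the child.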

Matthew Seligson

05/23/2022, 2:54 PM
I’d like to keep the same flow run, just have it restart from failed tasks
Rather than start over

Kevin Kho

05/23/2022, 2:59 PM
I get that, but Prefect 1 just doesn’t support it by design. There is also no “restart flow run” API route to hit. So the next best thing is the link Anna gave where you use caching on the subflows, and then create a new flow run with an idempotency key and it would use the cached work in the new flow run. Otherwise you need to hit restart on the child run, and then hit restart on the parent flow in the UI. This is a known pain point of Prefect 1, so I think the design of 2.0 will support this use case.

Matthew Seligson

05/23/2022, 3:07 PM
Can you point me to where this is supported in 2.0? Also, why is there no restart flow run API route to hit? How does this actually work under the hood if not an API call?

Kevin Kho

05/23/2022, 3:15 PM
This is the code for the restart button. It gets all tasks in a Failed state, and then moves them back to scheduled. 2.0 has no restart button yet, but subflows might already have retries. I don’t think it’s documented yet.
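The restart logic described in the message above (find the failed task runs, move them back to scheduled) can be sketched roughly as follows. This is only an illustration of the idea: states are plain strings, and `restart_failed_tasks` and the task names are hypothetical, not the UI's actual code.

```python
def restart_failed_tasks(task_states: dict) -> dict:
    """Return a copy in which every Failed task is moved back to Scheduled."""
    return {
        name: ("Scheduled" if state == "Failed" else state)
        for name, state in task_states.items()
    }


# Toy child flow run: one task succeeded, two failed.
child_run = {"extract": "Success", "transform": "Failed", "load": "Failed"}
restarted = restart_failed_tasks(child_run)
```

Successful tasks keep their state, so only the failed work is picked up again.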

Matthew Seligson

05/24/2022, 4:35 PM
Hi @Kevin Kho, I am observing that after 24 hours, restarting the parent flow will start entirely new child flow runs. Are you sure that’s not the case?

Kevin Kho

05/24/2022, 4:37 PM
That’s very surprising to me. Is your StartFlowRun call passing any idempotency key? No right? I’ll check then what the default key passed is
The default is the task run id which doesn’t change. I guess I could be wrong. I opened a previous issue to update docs to say idempotency keys were permanent though.
I’ll dig a bit

Matthew Seligson

05/24/2022, 4:42 PM
It passes the task run id by default right?

Kevin Kho

05/24/2022, 4:43 PM
Yeah which is why I don’t expect a new run
Tracing the code path to the database shows them as permanent so pretty surprised by the behavior you see. I can try to replicate today and get back to you tomorrow
I ran a test man. Will restart this tomorrow around 2 PM ET

Matthew Seligson

05/24/2022, 5:58 PM
Did this behavior exist at some point in the past? If so, what version?

Kevin Kho

05/24/2022, 6:06 PM
That’s less about Prefect version and more about changes to Prefect Cloud so I’ll have to dig 1 sec
Effective Nov 2020. 24 hours was before then

Matthew Seligson

05/24/2022, 6:34 PM
Thanks Kevin. You also mentioned that there is no API endpoint for restarting flows. I am in contact with our account manager on this for Prefect 2.0, but what other options do we have for right now? Can we use the API to set the failed task runs to “scheduled” and then try to create the flow run again?

Kevin Kho

05/24/2022, 6:47 PM
I know you were opposed to the suggestion, but I would instead recommend using caching on the child flow to prevent repeated work. You can put a duration on it (cache for the next 24 hours), and if the idempotency key is dynamic and changes, you just fire off new runs that use the cache. Or yes, you can restart the child flow and then restart the parent flow, and it will continue running past that child flow.
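The caching suggestion above can be sketched with a toy result cache. This is a simplified model under stated assumptions, not Prefect's caching implementation: `ToyCache`, `idempotency_key`, and the task functions are hypothetical, and the 24-hour window is the one mentioned in the message.

```python
from datetime import datetime, timedelta

CACHE_FOR = timedelta(hours=24)  # the 24-hour window suggested in the thread


class ToyCache:
    """Result cache keyed by task name (illustration only)."""

    def __init__(self):
        self.entries = {}  # task name -> (result, stored_at)

    def run(self, name, fn, now):
        entry = self.entries.get(name)
        if entry is not None and now - entry[1] < CACHE_FOR:
            return entry[0]  # fresh enough: skip the work
        result = fn()  # if this raises, nothing is cached
        self.entries[name] = (result, now)
        return result


def idempotency_key(attempt_time):
    # Hypothetical dynamic key: it changes per attempt, so each attempt is a new run.
    return f"child-{attempt_time.isoformat()}"


cache = ToyCache()
executed = []  # records which tasks actually did work


def extract():
    executed.append("extract")
    return [1, 2, 3]


def make_transform(should_fail):
    def transform():
        executed.append("transform")
        if should_fail:
            raise ValueError("transient failure")
        return "ok"
    return transform


t0 = datetime(2022, 5, 23, 12, 0)
t1 = t0 + timedelta(hours=1)

# First child run: extract succeeds and is cached, transform fails.
cache.run("extract", extract, t0)
try:
    cache.run("transform", make_transform(True), t0)
except ValueError:
    pass

# Second child run under a new key: extract is served from the cache,
# and only transform does work again.
cache.run("extract", extract, t1)
second = cache.run("transform", make_transform(False), t1)
```

The trade-off is the one raised in the thread: the failed child run stays failed, and a new run finishes the job using cached results.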

Matthew Seligson

05/24/2022, 6:52 PM
Thanks and appreciated Kevin. We don’t have a situation where we’d want the failed state of the child flow to persist nor do we want to fully rerun the subflow. We just want the restart of the parent flow to restart the subflow. Is there any way to do this with the API?

Kevin Kho

05/24/2022, 6:55 PM
Not in one go. If I attempted something like that, I would:
1. Move failed states to scheduled for Child Flow
2. Use something like the wait_for_flow_run task on the child flow (but in native Python, just call the .run() method)
3. Move failed states to scheduled for Parent Flow
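The ordering of the three steps above could be sketched like this. The sketch deliberately takes `restart_run` and `get_state` as caller-supplied callables, because the thread does not name the API calls involved; both are hypothetical, as are the run ids and the choice to restart the parent only if the child succeeds.

```python
import time


def restart_child_then_parent(restart_run, get_state, child_id, parent_id,
                              poll_seconds=0.0, max_polls=100):
    """Restart the child, wait for it to finish, then restart the parent.

    restart_run(run_id) and get_state(run_id) are hypothetical callables
    supplied by the caller; they are not Prefect functions.
    """
    restart_run(child_id)  # step 1: failed child tasks back to scheduled

    for _ in range(max_polls):  # step 2: wait for the child to finish
        state = get_state(child_id)
        if state in ("Success", "Failed"):
            break
        time.sleep(poll_seconds)
    else:
        raise TimeoutError(f"{child_id} did not finish in time")

    if state != "Success":
        # Assumption: restarting the parent is pointless if the child failed again.
        raise RuntimeError(f"{child_id} finished in state {state}")

    restart_run(parent_id)  # step 3: failed parent tasks back to scheduled
    return state


# Usage with fakes: the child passes through Scheduled and Running to Success.
calls = []
child_states = iter(["Scheduled", "Running", "Success"])
final = restart_child_then_parent(
    restart_run=calls.append,
    get_state=lambda run_id: next(child_states),
    child_id="child-run",
    parent_id="parent-run",
)
```

Passing the two operations in as callables keeps the ordering logic testable without a backend.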

Matthew Seligson

06/21/2022, 12:51 PM
Hi Kevin. I am still seeing the behavior where restarting the flow run 24 hours later is starting an entirely new child flow run. What was the outcome of your test?

Kevin Kho

06/21/2022, 1:43 PM
So I just hit restart now on a Flow from May and it triggered the same child flow run instead of creating a new one:
This was my code - default pickle based storage
from prefect import Flow, task
from prefect.tasks.prefect import StartFlowRun

@task
def abc():
    # Always fails, so the child flow run ends in a Failed state.
    raise ValueError()

with Flow("child") as flow:
    abc()

flow.register("databricks")

start = StartFlowRun(flow_name="child", project_name="databricks")

with Flow("parent") as flow2:
    start()

flow2.register("databricks")