Does Prefect 1.1 support retries on `Flows` ? Spec...
# prefect-community
m
Does Prefect 1.1 support retries on
Flows
? Specifically, I’m trying to workaround some delayed scaling issues with ECS using EC2 instances (not ECS with Fargate tasks) Often, this failure is reported back to Prefect like the following error until Capacity Provider scaling has caught up again:
Copy code
FAIL signal raised: FAIL('a4f09101-0577-41ce-b8b0-31b84f26d855 finished in state <Failed: "Failed to start task for flow run a4f09101-0577-41ce-b8b0-31b84f26d855. Failures: [{\'arn\': \'arn:aws:ecs:us-east-1:<redacted>:container-instance/a8bc98b7c6864874bc6d1138f758e8ea\', \'reason\': \'RESOURCE:CPU\'}]">')
I’m using the following calls to launch the sub-flows (as part of a larger script):
Copy code
flow_a = create_flow_run(flow_name="A", project_name="myles")
wait_for_flow_a = wait_for_flow_run(flow_a, raise_final_state=True, stream_logs=True)
discourse 1
a
there are no flow-level retries - to retry a flow run, you would need to create a new flow run - this is something you could do in a state handler or using Automations Are you on Prefect Cloud?
m
Yep, running on Prefect Cloud.
We’re looking forward to Prefect 2.0 and the formal support for subflows. But, the current experience at beta.prefect.io doesn’t meet our needs yet.
a
Sure, understandable! To sort of retry a flow run you could set an SLA using Automations to ensure that e.g. if your flow run failed to move to a Running state after e.g. 10 minutes (e.g. due to some ECS provisioning issue), you could start a new flow run but not sure if this is the right approach here, let me check sth
👍 1
m
The behavior I’m seeing is that the Parent Flow starts (and we can usually manage making capacity for this ahead of time via Scheduled Scaling Policy in ASGs) but a Child Flow fails to start due to insufficient resources available. Retrying to launch the Child Flow is what I’m hoping to do, while the Parent Flow keeps running.
a
I checked the logs of your flow run and the Automations won't help you here because the flow run fails immediately because Prefect cannot deploy the flow run to ECS due to not enough CPU resources on your EC2 instance: https://github.com/PrefectHQ/prefect/blob/master/src/prefect/agent/ecs/agent.py#L349 I hate to say that because it's a bit lazy answer but the best option going forward here is to ensure that either: • you ensure you have enough capacity on your self-managed EC2 data plane e.g. by implementing a bit more aggressive scaling policy • move to Fargate and not have to worry about infrastructure management, but accept the latency of serverless
managing this on a Prefect side is doable, but a bit hacky and not super elegant. You could e.g. in theory use:
Copy code
from prefect import Flow
from prefect.tasks.prefect import StartFlowRun
from datetime import timedelta

start_flow_run = StartFlowRun(project_name="PROJECT_NAME", wait=True, max_retries=10, retry_delay=timedelta(minutes=5))

with Flow("FLOW_NAME") as flow:
    staging = start_flow_run(flow_name="child_flow_name")
and the
retry_delay
timedelta would respect the time set on your scaling policy (e.g. if it the scale out takes 3-4 minutes then retry of 5 min can make sense) does it make sense?
👀 1
m
Yup, this makes sense and gives me something to play with! I’m trying to avoid moving to Fargate for launching due to existing security vendor deployments which we would also need to update. We might at some point, just can’t do it yet.
👍 1
Ahh, okay, this makes sense now comparing the API of
StartFlowRun
(Task-based) to
create_flow_run
👍 1
@Anna Geller I shared a link to my logs, but I believe this actually is not working as we’d expect it to.
I opened a Github Issue related to this behavior: https://github.com/PrefectHQ/prefect/issues/5610
a
thank you so much for the detailed description, great write-up!
m
BTW, I’m currently testing out the latest code from the repo to confirm if PR 5411 resolves this. Overall it does seem to be! I’m running into a related issue to that test, but it’s not the fault of Prefect Server or the ECS Agent.