Brian Phillips
03/10/2022, 2:35 PM
An error occurred (ThrottlingException) when calling the RegisterTaskDefinition operation (reached max retries: 2): Rate exceeded
Kevin Kho
03/10/2022, 2:36 PM
Brian Phillips
03/10/2022, 2:39 PM
aws ecs describe-tasks). In Prefect Cloud, this flow has been stuck in a Submitted state for 40+ minutes. Do you know of a way to handle this better in the agent?
"stopCode": "TaskFailedToStart",
"stoppedReason": "Timeout waiting for network interface provisioning to complete.",
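For anyone hitting the same symptom, here is a minimal sketch of how the two fields above could be checked programmatically. It assumes the response has the usual `aws ecs describe-tasks` shape (a top-level `tasks` list); the function name and the sample ARNs are hypothetical and only for illustration.

```python
from typing import Any, Dict, List


def tasks_failed_to_start(response: Dict[str, Any]) -> List[Dict[str, str]]:
    """Return ARN and reason for every task that stopped with TaskFailedToStart."""
    failed = []
    for task in response.get("tasks", []):
        if task.get("stopCode") == "TaskFailedToStart":
            failed.append(
                {
                    "taskArn": task.get("taskArn", ""),
                    "stoppedReason": task.get("stoppedReason", ""),
                }
            )
    return failed


# Hypothetical response fragment built around the two fields quoted above.
sample = {
    "tasks": [
        {
            "taskArn": "arn:aws:ecs:example-1",  # placeholder, not a real ARN
            "lastStatus": "STOPPED",
            "stopCode": "TaskFailedToStart",
            "stoppedReason": "Timeout waiting for network interface provisioning to complete.",
        },
        {"taskArn": "arn:aws:ecs:example-2", "lastStatus": "RUNNING"},
    ]
}

for item in tasks_failed_to_start(sample):
    print(item["taskArn"], "->", item["stoppedReason"])
```

A check like this only detects the failure on the ECS side; marking the corresponding flow run as failed or resubmitting it would still have to happen separately.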
Kevin Kho
03/10/2022, 4:27 PM
Brian Phillips
03/10/2022, 4:28 PM
Kevin Kho
03/10/2022, 4:33 PM
Anna Geller
03/10/2022, 4:41 PM
Brian Phillips
03/10/2022, 5:16 PM
495f390e-8a08-4ae9-8586-d4688b5f5ca7 and child flow run id e1e56e25-e922-49a5-979f-7357da3bc339.
I ended up manually cancelling the child flow run so the parent flow failed.
This is the code I'm using to kick off the child flows:
child_ids_result = create_flow_run.map(
    flow_name=unmapped(...),
    project_name=unmapped(...),
    parameters=parameters_result,
    run_name=run_names_result,
)
wait_for_flow_run_result = wait_for_flow_run.map(
    flow_run_id=child_ids_result,
    stream_states=unmapped(True),
    stream_logs=unmapped(True),
    raise_final_state=unmapped(True),
    upstream_tasks=[unmapped(child_ids_result)]
)
ECSRun(
    image=BASE_DOCKER_IMAGE,
    cpu=self.__cpu,
    memory=self.__memory,
    labels=self.__labels,
    env=...,
)
Task Kwargs
Body: !Sub
  - |
    networkConfiguration:
      awsvpcConfiguration:
        subnets: [${SubnetIds}]
        securityGroups: [${SecurityGroupIds}]
        assignPublicIp: ENABLED
  - SubnetIds: !Join [",", !Ref SubnetIds]
    SecurityGroupIds: !Join [",", !Ref SecurityGroupIds]
Flow Definition
Body: !Sub |
  containerDefinitions:
    - essential: true
      image: ${DockerImage}
      name: flow
      repositoryCredentials:
        credentialsParameter: ...
  cpu: 1024
  memory: 2048
  networkMode: awsvpc
  requiresCompatibilities: [FARGATE]
Anna Geller
03/10/2022, 5:21 PM
upstream_tasks=[unmapped(child_ids_result)]
the self on ECSRun worries me a bit - how do you call it?
Brian Phillips
03/10/2022, 5:34 PM
class LocalFlow(Flow):
    def __init__(self, cpu, memory, labels, ...):
        self.__cpu = cpu
        self.__memory = memory
        self.__labels = labels
        super().__init__(
            ...,
            run_config=Proxy(self._get_run_config),
        )

    def _get_run_config(self) -> RunConfig:
        return ECSRun(...)
2fa5dc27-6975-4739-abe0-d5662427363e. No attempt to reschedule. I think I may need to implement a custom task to babysit and retry these flow runs.
Kevin Kho
03/10/2022, 6:18 PM
Client.set_flow_run_state might be helpful for you, since you will have the ID from the create_flow_run call. Just making sure you know about it.
Brian Phillips
03/10/2022, 6:20 PM
Anna Geller
03/10/2022, 6:46 PM
from prefect import Client  # Prefect 1.x client

client = Client()
client.cancel_flow_run(flow_run_id)
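The "babysit and retry" idea from earlier in the thread could look roughly like the sketch below. It is not an existing Prefect task: `run_with_resubmit` and its parameters are hypothetical, the state names are plain strings chosen for illustration, and the three callables are placeholders that a real implementation would wire to whatever creates a flow run, reads its state, and cancels it (for example the `cancel_flow_run` call above).

```python
import time
from typing import Callable, Tuple


def run_with_resubmit(
    create_run: Callable[[], str],
    get_state: Callable[[str], str],
    cancel_run: Callable[[str], None],
    max_attempts: int = 3,
    submit_timeout: float = 600.0,
    poll_interval: float = 10.0,
    pending_states: Tuple[str, ...] = ("Scheduled", "Submitted"),
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Create a run; if it never leaves a pending state, cancel it and try again."""
    for _ in range(max_attempts):
        run_id = create_run()
        deadline = clock() + submit_timeout
        while get_state(run_id) in pending_states:
            if clock() >= deadline:
                cancel_run(run_id)  # give up on this run and resubmit
                break
            sleep(poll_interval)
        else:
            # The run left the pending states, so normal waiting can take over.
            return run_id
    raise RuntimeError(f"flow run still stuck after {max_attempts} attempts")


# Demo with in-memory fakes: the first run never starts, the second one does.
states = {"run-1": "Submitted", "run-2": "Running"}
created, cancelled = [], []
fake_time = [0.0]


def fake_create() -> str:
    run_id = f"run-{len(created) + 1}"
    created.append(run_id)
    return run_id


def fake_sleep(seconds: float) -> None:
    fake_time[0] += seconds  # advance the fake clock instead of really sleeping


final = run_with_resubmit(
    fake_create,
    states.__getitem__,
    cancelled.append,
    submit_timeout=30,
    poll_interval=10,
    clock=lambda: fake_time[0],
    sleep=fake_sleep,
)
print(final, cancelled)
```

Passing the clock and sleep functions in keeps the loop testable without waiting; the returned ID can then be handed to wait_for_flow_run as before.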