gperrone
05/03/2023, 5:52 PM
"Crash detected! Execution was cancelled by the runtime environment" error log with no other context. What is everyone's experience with submitting a large number of tasks? I have a different flow that submits about 1000 concurrent tasks and I have not had any problems with that one.

gperrone
05/03/2023, 6:26 PM

gperrone
05/03/2023, 8:22 PM
Traceback (most recent call last):
File "/Users/gperrone/.pyenv/versions/3.8.12/lib/python3.8/contextlib.py", line 189, in __aexit__
await self.gen.athrow(typ, value, traceback)
File "/Users/gperrone/.pyenv/versions/dasa-etl/lib/python3.8/site-packages/prefect/task_runners.py", line 165, in start
yield self
File "/Users/gperrone/.pyenv/versions/dasa-etl/lib/python3.8/site-packages/prefect/engine.py", line 388, in begin_flow_run
terminal_or_paused_state = await orchestrate_flow_run(
File "/Users/gperrone/.pyenv/versions/dasa-etl/lib/python3.8/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
result = await flow_call.aresult()
File "/Users/gperrone/.pyenv/versions/dasa-etl/lib/python3.8/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
return await asyncio.wrap_future(self.future)
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
KeyError: 13033
Bianca Hoch
05/03/2023, 8:25 PM
…429 response in the logs if that is the case.
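If Cloud API rate limiting does turn out to be the cause, one common mitigation is a tag-based task concurrency limit, so the mapped runs don't all hit the API at once. A minimal sketch, assuming Prefect 2.x; the tag name, limit value, and task body are placeholders, and the limit itself is created once with "prefect concurrency-limit create salesforce-api 25" (or in the UI):

from prefect import task

# Hypothetical tag: every run of this task then counts against the matching
# server-side concurrency limit and waits for a free slot before executing.
@task(tags=["salesforce-api"])
def get_table(sf_object, sf_session, base_request_params, s3_prefix, bucket, is_prod):
    ...  # placeholder body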
gperrone
05/03/2023, 8:52 PM

Bianca Hoch
05/04/2023, 5:21 PM

Anthony Desmier
05/30/2023, 10:01 AM
Crash details:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_synchronization.py", line 314, in acquire
    self.acquire_nowait()
  File "/usr/local/lib/python3.9/site-packages/anyio/_core/_synchronization.py", line 342, in acquire_nowait
    raise WouldBlock
anyio.WouldBlock
During handling of the above exception, another exception occurred:
We receive multiple of these - I assume one for every task that was submitted? For a bit of context, our flow does submit around 450 tasks in one go. This is run in a personal account, but we don't see any 429 error codes, and this flow runs successfully 90% of the time.
Below is the section of flow code that submits and executes the task runs:
get_table_futures = get_table.map(
    sf_object=sf_objects_to_query,
    sf_session=unmapped(salesforce_session),
    base_request_params=unmapped(base_request_params),
    s3_prefix=s3_prefix,
    bucket=bucket,
    is_prod=is_prod,
)
for f in get_table_futures:
    f.result(raise_on_failure=False)
This may not be the most efficient implementation here, but our goal is to have the flow continue on any failure of the get_table task and for this task to be retried up to 3 times. My understanding is that when the map function returns, the tasks get submitted but are only executed (up to our task concurrency limit) once the result function is called. At that point we can specify that we don't want to raise on any failure. This appears to be what is happening when reading through the logs.
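For reference, a minimal sketch of how that intent is usually spelled out in Prefect 2.x; the retry values, flow name, and task body are assumptions, while the map arguments mirror the snippet above:

from prefect import flow, task, unmapped

@task(retries=3, retry_delay_seconds=30)  # assumed values: retry each failed run up to 3 times
def get_table(sf_object, sf_session, base_request_params, s3_prefix, bucket, is_prod):
    ...  # placeholder body

@flow
def salesforce_extract(sf_objects_to_query, salesforce_session, base_request_params,
                       s3_prefix, bucket, is_prod):
    get_table_futures = get_table.map(
        sf_object=sf_objects_to_query,
        sf_session=unmapped(salesforce_session),
        base_request_params=unmapped(base_request_params),
        s3_prefix=s3_prefix,
        bucket=bucket,
        is_prod=is_prod,
    )
    # Block on each future without raising, so one failed table does not fail the whole flow.
    for f in get_table_futures:
        f.result(raise_on_failure=False)

With retries configured on the task, each mapped run that raises is retried before its final state is reported back to the loop at the end.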
This does work around 90% of the time, but when it does fail it crashes midway through submitting the tasks, so it feels like it hasn't even reached the 400 limit before it crashes. We run on ECS Fargate, and I've checked resource utilisation of the container and it's fairly minimal cpu/mem/network utilisation.
Wondering if you could suggest any debugging steps or perhaps any improvements on our current flow implementation?
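One possible improvement to experiment with, sketched below as a drop-in variant of the snippet above (inside the same flow, with the same imports), assuming sf_objects_to_query is a plain list and the non-unmapped arguments are scalars: submit the mapped runs in smaller batches and wait for each batch before submitting the next, so only a bounded number of task runs (and API calls) are in flight at any time. The batch size is arbitrary:

BATCH_SIZE = 50  # assumed value; tune as needed

get_table_futures = []
for i in range(0, len(sf_objects_to_query), BATCH_SIZE):
    batch_futures = get_table.map(
        sf_object=sf_objects_to_query[i : i + BATCH_SIZE],
        sf_session=unmapped(salesforce_session),
        base_request_params=unmapped(base_request_params),
        s3_prefix=s3_prefix,
        bucket=bucket,
        is_prod=is_prod,
    )
    # Wait for this batch to finish (without raising) before submitting the next one.
    for f in batch_futures:
        f.result(raise_on_failure=False)
    get_table_futures.extend(batch_futures)

This trades some end-to-end runtime for a smaller, steadier submission rate.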
Anthony Desmier
06/09/2023, 11:05 AM