Richard Alexander
04/23/2024, 3:17 PM
coros = []
for item in item_list:
    coros.append(task1(item, return_state=True))
    coros.append(task2(item, return_state=True))
    coros.append(task3(item, return_state=True))
    coros.append(task4(item, return_state=True))
states = await asyncio.gather(*coros)
For a small item_list, everything works fine. But if item_list approaches 100 items, almost all of the async tasks crash. I'm not seeing anything helpful in the logs to indicate why the tasks crash, so I'm wondering how to go about troubleshooting/fixing this async flow for large lists.
Each crashed task shows this error message:
Crash detected! Execution was cancelled by the runtime environment.
How can I figure out why execution was cancelled? Is there any other logging source I should be looking at outside of the Prefect logs? Do I need to increase my log level for this type of issue? Or do I need to calculate how many workers the work pool needs to handle large lists?
Background info (in case it's useful):
• Prefect version 2.16.2
• The compute instance is very large. It has 12 cores and 62 GB of memory.
• The compute and memory footprint of each task is small (each is calling an API and waiting for a response)
• I'm using work pools and all tasks are being sent to the same work pool.
Nate
04/23/2024, 4:21 PM
Are you using .submit here to leverage the task runner?
Nate
04/23/2024, 4:23 PM

Richard Alexander
04/23/2024, 5:15 PM
flow_obj = run_deployment(
    "Search Action Main",
    parameters=search_action.dict(),
    timeout=0
)
(Timeout 0 so that it returns immediately instead of waiting for flow completion.)
Richard Alexander
04/23/2024, 5:17 PM