r
I'm having trouble with crashing async tasks and I'm not sure how to troubleshoot it. I'm using this pattern within a flow to start the tasks concurrently:
coros = []
for item in item_list:
    coros.append(task1(item, return_state=True))
    coros.append(task2(item, return_state=True))
    coros.append(task3(item, return_state=True))
    coros.append(task4(item, return_state=True))

# Await everything once, after the loop has collected all coroutines
states = await asyncio.gather(*coros)
For a small item_list, everything works fine. But if item_list approaches 100 items, almost all of the async tasks crash. I'm not seeing anything helpful in the logs to indicate why the tasks crash, so I'm wondering how to go about troubleshooting/fixing this async flow for large lists. Each crashed task shows this error message:
Crash detected! Execution was cancelled by the runtime environment.
How can I figure out why execution was canceled? Is there any other logging source that I should be looking at outside of the Prefect logs? Or do I need to increase my log level for this type of issue? Or do I need to calculate how many workers are needed in the work pool to handle large lists?
Background info (in case it's useful):
• Prefect version 2.16.2
• The compute instance is very large. It has 12 cores and 62G memory.
• The compute and memory footprint of each task is small (each is calling an API and waiting for a response)
• I'm using work pools and all tasks are being sent to the same work pool.
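One way to test whether the crashes depend on how many coroutines are in flight at once is to cap concurrency with a semaphore and see if the failures go away. That cause is only an assumption here; nothing in the logs confirms it. The sketch below uses plain asyncio rather than any Prefect API, and call_api, bounded, run_all, and max_in_flight are hypothetical names standing in for task1 through task4 and the flow body.

```python
import asyncio


async def call_api(item: str, step: int) -> str:
    # Hypothetical stand-in for task1..task4: waits briefly, like an API call.
    await asyncio.sleep(0.01)
    return f"{item}-step{step}"


async def bounded(sem: asyncio.Semaphore, coro):
    # The wrapped coroutine only starts running once a semaphore slot is free.
    async with sem:
        return await coro


async def run_all(item_list, max_in_flight: int = 20):
    sem = asyncio.Semaphore(max_in_flight)
    coros = [
        bounded(sem, call_api(item, step))
        for item in item_list
        for step in range(1, 5)
    ]
    # return_exceptions=True returns failures as values instead of raising
    # the first one, so every result can be inspected afterwards.
    return await asyncio.gather(*coros, return_exceptions=True)


if __name__ == "__main__":
    results = asyncio.run(run_all([f"item{i}" for i in range(100)]))
    failures = [r for r in results if isinstance(r, Exception)]
    print(f"{len(results)} results, {len(failures)} failures")
```

If the crashes disappear with a small max_in_flight and come back as it grows, that would point toward a concurrency limit somewhere rather than a bug in the tasks themselves.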
n
hi @Richard Alexander - any reason you're not using .submit here to leverage the task runner?
can you say more about how you're dispatching this work on the large instance?
r
Hi @Nate! Thanks a ton for the help on this. I'm starting this particular flow from a web app, so it's calling the deployment and then returning immediately. Here's the pattern I'm using:
from prefect.deployments import run_deployment

flow_obj = run_deployment(
    "Search Action Main",
    parameters=search_action.dict(),
    timeout=0,
)
(Timeout 0 so that it returns immediately instead of waiting for flow completion)
What do I need to understand about the task runner vs. what I'm doing? I assumed starting a flow/task was basically the same regardless of the method used...