Hello, Can someone explain how <ConcurrentTaskRunn...
# ask-community
d
Hello, Can someone explain how ConcurrentTaskRunner.submit works. example given:
Copy code
# len(param_dict) => 100
for params in param_dict:
    copy_files.submit(params)
will this create 100 threads or is it using async to run it concurrently in one thread. The reason I am asking is because we're facing issues of hanging flown runs when we submit a larger number of tasks, (in last attempt we had cca 48 tasks and flow just hanged) Any clarification on how it works would be appreciated.
2
j
Hi Denis, ConcurrentTaskRunner uses async to run it concurrently in one thread. 40 is the max due to AnyIO. See discussion from @Zanie in this Slack thread.
👍 1
d
Awesome, thanks!
👍 1
@Jeff Hale any advice on best practice how to prevent it to hang?
j
I don’t have additional advice. 🙂
z
Increasing the default thread limit can help
Using async can help too, we won’t spawn threads for each task then — they’ll run on the event loop.
We’re redesigning how the concurrency is managed though
d
wait, @Zanie does that means that my example above actually fires 100 threads?
I changed flow to async and now some tasks complete but several fails due to error:
Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Client error '408 Request Timeout
Any idea? @Zanie
z
No it’ll use a pool of ~40 threads but if you have more tasks than that running concurrently we can deadlock waiting for a thread to be available.
👀 1
Using Cloud or a local server? What route was being called?
d
Using Cloud
Copy code
Crash detected! Execution was interrupted by an unexpected exception: PrefectHTTPStatusError: Client error '408 Request Timeout' for url '<https://api.prefect.cloud/api/accounts/60a39fa6-c00d-4e61-8757-a9c1fa7b6f11/workspaces/245aa934-2480-45c4-845e-2f66f1cdf298/task_runs/f533b916-97b3-46d3-88f8-b3a6273a7781/set_state>'
z
That sounds like a problem on our end! I’ll ping one of our Cloud engineers cc @dustin
d
Thank you for looking into it.
a
having this problem too! is it because i’m on the free plan and the connectivity to the cloud backend gets throttled?
v
@Zanie Where the ConcurrentTaskRunner redesign on the roadmap? It is super critical, looks like it can execute really simple tasks in other cases it is super slow or hangs.
1
z
It’s happening right now — it’s my highest priority work.
v
Thank you! Waiting for updates. 🙌
k
Hey @Zanie Hope you're doing well! I've noticed that some of our flows also crash due to
408 Request Timeout
errors. Just like alvin, I'm wondering whether it could be due to us being on the free plan.
z
We don’t prioritize requests based on plan type 🙂
1
That could be a lot of things! Let me grab one of your Cloud engineers to see if we have insights on our end, cc @Zach Angell
1
z
As Zanie mentioned this shouldn’t have to do with plan tier. A 408 is a response that our LoadBalancer will give if it takes too long for the client to finish sending us the body of a request. Receiving a 408 means it took over 30 seconds for the client to send data to the server. This is difficult to debug because it could be anywhere between the client and our load balancer. We’ve seen this so far when 1. A client’s network is fully saturated, preventing any requests from being sent 2. Flow code is performing excessive blocking work (e.g.
time.sleep
) that doesn’t allow our asynchronous client to send data in a timely manner
🙌 2
1
k
Ah that makes a lot of sense
Thank you for the info!
Hey @Zanie, I'm wondering if using many mapped tasks that make fetches via
requests
(as opposed to
httpx
) could be a reason for this error. Zach mentioned above that this could happen when:
Flow code is performing excessive blocking work (e.g.
time.sleep
) that doesn’t allow our asynchronous client to send data in a timely manner
Do you think this could be the case?
z
If you’re performing sync network requests in an async task, then yeah you could be blocking the event loop.