https://prefect.io logo
r

Ryan Abernathey

12/29/2020, 4:57 PM
Quick q for the prefect devs: I'm curious why task concurrency limits are a cloud feature. This feels to me more like a core feature, since it is presumably something that has to be managed by agents, which are independent of cloud. https://docs.prefect.io/orchestration/concepts/task-concurrency-limiting.html
j

John Grubb

12/29/2020, 4:59 PM
clearly I'm not a prefect dev, but it's pretty common to upsell features like this, regardless of the technical implementation.
j

Jim Crist-Harif

12/29/2020, 5:00 PM
Agents are only responsible for kicking off flow runs, they don't interact with a running flow afterwards. Since concurrency limits apply across flow runs (meaning multiple concurrent flow runs share the limit), this needs to be handled centrally in Prefect Cloud (or Server).
j

Jeremiah

12/29/2020, 5:15 PM
@Ryan Abernathey to follow what Jim said, it’s really the need for a global broker more than anything else (as Prefect’s working assumption is that many different executions of the same flow run / task run can be attempted in physically diverse locations at any time). Therefore, all concurrency features started as Cloud features, where we can exercise that broker. There are PRs for concurrency limits open for Server but we require that features implemented in both Cloud and Server have consistent APIs, and keeping concurrency performant at Cloud’s scale has required multiple special implementations that Server doesn’t have the capability to handle at this time. It has always been our plan to continue moving these Cloud workflow features into the OSS products where possible, but our design priority is to only do so when they can be done in a performant and race-condition free way, so that the quality (and parity) of the feature is maintained.
upvote 1
r

Ryan Abernathey

12/29/2020, 6:01 PM
Gotcha, thanks for the explanation.
Is there a recommended workaround if you just want to limit concurrency within a single flow run? Say I call
map
over 100 objects, but I only want 10 of these tasks to be able to run concurrently...
j

Jim Crist-Harif

12/29/2020, 6:15 PM
You could handle that within your tasks themselves using a
Semaphore
(
threading.Semaphore
if using the local executors,
distributed.Semaphore
if using a
DaskExecutor
).
That would mean that tasks that couldn't run would block and wait for a chance to run, but that's how task concurrency works at the global level as well.
Perhaps we should add an api for this - we could universally handle the concurrency-limit-inside-a-single-flow-run within the executor/flow-runner to make this simpler. Hmmm
upvote 3
Can you give a motivating example for concurrency limits within a single flow run alone? When would this feature be useful?
r

Ryan Abernathey

12/29/2020, 6:18 PM
This is for Pangeo Forge
The use case is where we need to pull 1000 files from an FTP server somewhere
we want to avoid clobbering it
1
m

Marwan Sarieddine

12/29/2020, 6:20 PM
Sorry to chime in like this but we have the same usecase of trying to access an FTP server
j

Jim Crist-Harif

12/29/2020, 6:20 PM
Would you want all tasks to wait for the files to finish? If so, doing this with multiple prefect tasks doesn't get you much, you might be better served running the FTP pull within a single task and handling the load limiting yourself.
r

Ryan Abernathey

12/29/2020, 6:20 PM
Most of the different recipes (which get translated to prefect flows) will point at different upstream servers
Jim I see your point; however, we want to use the same machinery for pulling data e.g. from cloud storage, where we don't care about clobbering and can benefit from distributed parallelism
so we'd like recipe writers to be able to flag a certain
map
operation with an optional concurrency parameter. The recipe writer will presumably know enough about the source to know whether concurrency limits are necessary.
The other reason for creating tasks for each download is to benefit from prefect's retry logic
j

Jim Crist-Harif

12/29/2020, 6:29 PM
Gotcha. I'd handle this with a
distributed.Semaphore
within your tasks for now. Alternatively, you could make use of dask's worker resources. Tasks tagged with tags of the form
dask-resource:KEY=N
will each take
N
amount of
KEY
resource. So you could limit active download tasks by creating a resource for downloading then tagging download tasks to mark that they require that resource.
👍 1
That would mean that the total concurrency limit scales with the number of workers (so it isn't absolute across the whole run), but would also work and wouldn't block other tasks from running like the
Semaphore
would.