# ask-community
c
Hello everyone, I am trying to understand how to configure concurrency in Prefect V2. From Prefect's community forum I found that we can use tags to create concurrency limits, but it did not work for me. The documentation is a little confusing and I am lost. I would appreciate your help 🙇
m
+1 to that. It seems like parallelism is not the default behaviour - since flows are dynamic by default => no DAGs => no way to parallelize automatically, and everything needs to be explicitly parallelized. Even if my flow is embarrassingly parallel, nothing gets parallelized automatically. The only way to achieve that is to use `<task>.map`, right? https://docs-3.prefect.io/3.0rc/develop/task-runners#mapping-over-iterables or am I missing something? Thanks upfront 🙇
p
All the practical solutions for concurrency I've seen were based on running `task`/`flow`/`run_deployment` as async functions and implementing some sort of orchestrator by hand. See this repo for a bunch of examples that got me started.
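A minimal sketch of that hand-rolled orchestrator pattern, assuming a hypothetical deployment name ("my-flow/my-deployment") and an `asyncio.Semaphore` to cap in-flight runs (this is not taken from the linked repo):
```python
import asyncio

from prefect.deployments import run_deployment


async def orchestrate(param_sets: list[dict], max_concurrent: int = 4):
    # hand-rolled orchestrator: a semaphore caps how many deployment runs
    # are in flight at once
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(params: dict):
        async with semaphore:
            # waits for the triggered flow run to finish before releasing the slot
            return await run_deployment(
                name="my-flow/my-deployment",  # hypothetical deployment name
                parameters=params,
            )

    return await asyncio.gather(*(run_one(p) for p in param_sets))


if __name__ == "__main__":
    asyncio.run(orchestrate([{"x": i} for i in range(10)]))
```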
When resources are limited (like the number of GPUs), I found it easiest to queue a bunch of flow runs with `run_deployment` and create a customized work pool with a job template and/or the `--limit` option on the worker(s).
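A short sketch of that queue-then-throttle approach, with a hypothetical GPU deployment name; passing `timeout=0` makes `run_deployment` return as soon as the flow run is created, so the worker's `--limit` decides how many actually execute at once:
```python
from prefect.deployments import run_deployment

# hypothetical deployment that needs one GPU per run
DEPLOYMENT = "train-model/gpu-training"


def queue_runs(param_sets: list[dict]):
    # timeout=0 returns right after each flow run is created (scheduled),
    # instead of waiting for it to finish
    return [
        run_deployment(name=DEPLOYMENT, parameters=params, timeout=0)
        for params in param_sets
    ]


if __name__ == "__main__":
    queue_runs([{"seed": i} for i in range(8)])
    # then start a worker that only runs e.g. 2 flow runs at a time:
    #   prefect worker start --pool gpu-pool --limit 2
```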
When using `.map` to parallelize tasks you could try specifying a different task runner, but I have no experience with that.
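For completeness, a small sketch of what that could look like in 3.x, assuming `ThreadPoolTaskRunner` (2.x has `ConcurrentTaskRunner` instead); the task runner's `max_workers` caps how many mapped task runs execute at once:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner


@task
def process(item: int) -> int:
    return item * 2


# at most 4 of the mapped task runs execute at the same time
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def my_flow(n: int = 20):
    process.map(range(n)).wait()


if __name__ == "__main__":
    my_flow()
```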
I tried using concurrency slots, but they turned out not to be very practical when the number of concurrent tasks is high. From my understanding, all tasks start at the same time and keep polling for free concurrency slots, which clogs up my self-hosted server.
j
You can also use `task.submit()` and `wait()`.
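A tiny 3.x-style sketch of that, using `prefect.futures.wait` as in the example further down:
```python
from prefect import flow, task
from prefect.futures import wait


@task
def add_one(x: int) -> int:
    return x + 1


@flow
def my_flow():
    # submit() returns a future immediately; the task runner executes
    # the runs concurrently and wait() blocks until all are finished
    futures = [add_one.submit(i) for i in range(10)]
    wait(futures)
    return [f.result() for f in futures]


if __name__ == "__main__":
    my_flow()
```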
n
hey all, there are a couple of ways to do concurrency (in 2.x or 3.x - the mechanics should be the same, with better performance in 3.x). I am working right now on making the docs better on this 🙂 but tl;dr:
• use the `concurrency` context manager to occupy some number of slots from an existing global concurrency limit
• use `submit` inside of the context manager for granular control
• tag-based concurrency is still around, but global concurrency limits and the context manager are recommended
To give a pretty simple example, where you have N=100 tasks and a "rolling window" of 10 slots is filled at a time:
```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.futures import wait


@task
async def sleepy_async_task(duration: int = 5):
    return await asyncio.sleep(duration)


@flow
async def my_flow(num_tasks: int = 10, sleep_duration: int = 5):
    futures = []
    for _ in range(num_tasks):
        # occupy slots from the "only_10_at_a_time" global concurrency
        # limit before submitting each task
        async with concurrency("only_10_at_a_time", occupy=10):
            futures.append(sleepy_async_task.submit(duration=sleep_duration))

    # block until all submitted task runs finish
    wait(futures)


if __name__ == "__main__":
    asyncio.run(my_flow(100, 3))  # type: ignore
```
is this helpful at all? (as I work on making the docs better) A couple of notes:
• yes, you can use `.map`, but you'll end up creating the task runs in the API before the work happens, i.e. "artificially long tasks" (perhaps we can revise that DX). The work inside each task will still be concurrency limited though
• same as above goes for tag-based concurrency
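For reference, tag-based concurrency looks roughly like this in 3.x: the tag (here a hypothetical "database" tag) goes on the task, and the limit itself is created separately, e.g. with `prefect concurrency-limit create database 10`:
```python
from prefect import flow, task


# task runs carrying the "database" tag are throttled by that tag's
# concurrency limit, created separately, e.g.:
#   prefect concurrency-limit create database 10
@task(tags=["database"])
def write_row(i: int) -> None:
    ...


@flow
def my_flow(n: int = 100):
    write_row.map(range(n)).wait()


if __name__ == "__main__":
    my_flow()
```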
also, in case you don't actually care about limiting concurrency, then in 3.x you can just do this:
```python
import asyncio

from prefect import flow, task


@task
async def sleepy_async_task(duration: int = 5):
    return await asyncio.sleep(duration)


@flow
async def my_flow(num_tasks: int = 10, sleep_duration: int = 5):
    sleepy_async_task.map([sleep_duration] * num_tasks).wait()  # type: ignore


if __name__ == "__main__":
    asyncio.run(my_flow(100, 3))  # type: ignore
```
or the same in 2.x if you just replace this line:
```python
sleepy_async_task.map([sleep_duration] * num_tasks).wait()  # type: ignore
```
with this:
```python
futures = sleepy_async_task.map([sleep_duration] * num_tasks)
[await future.wait() for future in futures]
```
since we have some convenience utils for this in 3.x that we don't have in 2.x
m
Got it. Thanks!