Chandan Purbia
08/23/2024, 5:29 AM
marrrcin
08/26/2024, 12:07 PM
flows are dynamic by default => no DAGs => no way to parallelize automatically, so everything needs to be parallelized explicitly.
Even if my flow is embarrassingly parallel, nothing gets parallelized automatically.
The only way to achieve that is to use <task>.map, right? https://docs-3.prefect.io/3.0rc/develop/task-runners#mapping-over-iterables
Or am I missing something? Thanks upfront 🙇
Paweł Biernat
08/26/2024, 1:03 PM
Paweł Biernat
08/26/2024, 1:06 PM
run_deployment and create a customized worker pool with a job template and/or the --limit option on worker(s).
Paweł Biernat
08/26/2024, 1:08 PM
.map to parallelize tasks you could try specifying a different task runner, but I have no experience with that.
Paweł Biernat
08/26/2024, 1:10 PM
Janet Carson
08/26/2024, 7:48 PM
Nate
08/26/2024, 9:18 PM
concurrency context manager to occupy some number of slots from an existing global concurrency limit
• use submit inside of the context manager for granular control
• tag based concurrency is still around, but global concurrency limits and the context manager are recommended
to give a pretty simple example: you have N=100 tasks, and a "rolling window" of 10 slots is filled at a time
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency
from prefect.futures import wait

@task
async def sleepy_async_task(duration: int = 5):
    return await asyncio.sleep(duration)

@flow
async def my_flow(num_tasks: int = 10, sleep_duration: int = 5):
    futures = []
    for _ in range(num_tasks):
        async with concurrency("only_10_at_a_time", occupy=10):
            futures.append(sleepy_async_task.submit(duration=sleep_duration))
    wait(futures)

if __name__ == "__main__":
    asyncio.run(my_flow(100, 3))  # type: ignore
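For comparison, here is a minimal plain-asyncio sketch of what a "rolling window" of 10 slots means. It uses `asyncio.Semaphore` from the standard library, not Prefect's `concurrency` context manager, so treat it as an analogy rather than the Prefect API. The names `run_with_rolling_window`, `sleepy`, and the `peak` counter are hypothetical and exist only for illustration.

```python
import asyncio


async def run_with_rolling_window(num_tasks: int, limit: int, duration: float) -> int:
    """Run num_tasks sleeps with at most `limit` in flight; return the peak concurrency seen."""
    semaphore = asyncio.Semaphore(limit)  # stand-in for a concurrency limit with `limit` slots
    in_flight = 0
    peak = 0

    async def sleepy(_: int) -> None:
        nonlocal in_flight, peak
        # Assumption of this sketch: a slot is held for the whole duration of the work.
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(duration)
            in_flight -= 1

    # All coroutines are started up front; the semaphore decides how many run at once.
    await asyncio.gather(*(sleepy(i) for i in range(num_tasks)))
    return peak


if __name__ == "__main__":
    peak = asyncio.run(run_with_rolling_window(num_tasks=100, limit=10, duration=0.01))
    print(f"peak concurrency: {peak}")
```

As soon as one of the 10 running sleeps finishes, the next waiting one takes its slot, which is the "rolling window" behaviour described above.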
is this helpful at all? (as I work on making the docs better)
a couple notes
• yes you can use .map but you'll end up creating the task run in the API before the work happens, i.e. "artificially long tasks" (perhaps we can revise that DX). the work inside each task will still be concurrency limited through
• same as above goes for tag based concurrency
Nate
08/26/2024, 9:31 PM
import asyncio

from prefect import flow, task

@task
async def sleepy_async_task(duration: int = 5):
    return await asyncio.sleep(duration)

@flow
async def my_flow(num_tasks: int = 10, sleep_duration: int = 5):
    sleepy_async_task.map([sleep_duration] * num_tasks).wait()  # type: ignore

if __name__ == "__main__":
    asyncio.run(my_flow(100, 3))  # type: ignore
or the same in 2.x if you just replace this line
    sleepy_async_task.map([sleep_duration] * num_tasks).wait()  # type: ignore
with this
    futures = sleepy_async_task.map([sleep_duration] * num_tasks)
    [await future.wait() for future in futures]
since we have some convenience utils for this in 3.x that we don't have in 2.x
Nate
08/26/2024, 9:31 PM
marrrcin
08/27/2024, 11:05 AM