https://prefect.io logo
Title
g

Guido Stein

09/23/2022, 4:14 PM
Is there a way to set a limit of concurrent tasks associated with a prefect flow when adding the concurrent task runner? (v2)
m

Mason Menges

09/23/2022, 4:18 PM
g

Guido Stein

09/23/2022, 4:23 PM
Thanks @Mason Menges, I guess I am a little confused by this because I am not sure where to put this code? I am trying to put everything into a python repo and so I have a deployment file and I have a flow file. Do I now need a client file as well that I need to run prior to running the other code?
m

Mason Menges

09/23/2022, 4:47 PM
Hey @Guido Stein These docs should help clarify, short version you would specify a Tag on a task decorator
@task(name="foo", tags=["concurrencytag"])
def foo():
    print("Prefect_task)
you would then create a concurrency limit for that tag, you can do this through the python client or through the CLI, CLI Example:
prefect concurrency-limit create concurrencytag 5
python example:
from prefect.client import get_client

async with get_client() as client:
    c_limit = await client.create_concurrency_limit(tag="concurrencytag", concurrency_limit=5)
https://docs.prefect.io/concepts/tasks/#tags https://docs.prefect.io/concepts/tasks/#python-client
g

Guido Stein

09/23/2022, 4:55 PM
@Mason Menges is this the workflow that is needed?
import asyncio

from prefect import flow, task
from prefect.client import get_client


@task(tags=["small_instance"])
def task():
    print("-----")


@flow()
def flow():
    task.submit()
    task.submit()
    task.submit()
    task.submit()
    task.submit()
    task.submit()
    task.submit()
    task.submit()
    task.submit()
    task.submit()


async def client_setup():
    async with get_client() as client:
        await client.create_concurrency_limit(tag="small_instance", concurrency_limit=2)


if __name__ == "__main__":
    asyncio.run(client_setup())
    flow()
m

Mason Menges

09/23/2022, 5:21 PM
You cloud do this yes, though it's important to note that concurrency limits are set through the api and then remain until you remove them, so you would really only need to run the concurrency limit command once, which is why we'd generally suggest doing it through the cli.