Hey <@ULVA73B9P> I'm trying to use concurrency lim...
# ask-marvin
m
Hey @Marvin I'm trying to use concurrency limits and I want to set them up dynamically from within my Python code. This fails and tells me "Concurrency limits['database'] do not exist". Any idea why? Here's the code:
Copy code
@task
async def process_data(x, y):
    print(f"awaiting {x} + {y}")
    async with concurrency("database", occupy=1):
        print(f"returning {x} + {y}")
        return x + y


@flow
async def my_flow():
    async with get_client() as client:
        limit_id = await client.create_concurrency_limit(
            tag="database",
            concurrency_limit=2
            )

    tasks = []
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        tasks.append(process_data(x, y))

    await asyncio.gather(*tasks)
m
Your issue arises because concurrency limits are not implicitly created when using the
concurrency
context manager. In Prefect, concurrency limits must be explicitly defined before they can be used. Here's a concise breakdown of what you need to do: 1. Create a Concurrency Limit: Ensure that the concurrency limit is created before any tasks attempt to use it. This can be done using the Prefect CLI or API. In your code, you're correctly using the API to create a concurrency limit with
client.create_concurrency_limit
. 2. Ensure Proper Flow of Execution: Make sure that the concurrency limit creation is completed before any tasks are executed. This often means ensuring that the
create_concurrency_limit
call has been awaited and completed before the tasks are dispatched. 3. Verify the Concurrency Limit: You can verify the existence of a concurrency limit using the Prefect CLI or API to ensure it has been set correctly before your tasks run. For more details, you can refer to the Prefect documentation on concurrency limits and task run limits. Additionally, here's a gotcha: Prefect 3.x requires that futures must be resolved by passing them to another task, returning them, or manually calling
.result()
or
.wait()
. Ensure that your flow handles task results correctly. If you continue to face issues, double-check that the concurrency limit is created successfully and that your tasks are correctly set up to respect this limit.
m
@Marvin Hmm, I must be confused about this. If I create the concurrency limit via the command line before running the program with
prefect gcl create databases --limit 2
it works fine. Am I using the wrong API to create a global concurrency limit?
m
It seems that there is no direct method available in the Prefect Python client to create a global concurrency limit dynamically from within your code. The
create_concurrency_limit
method you are using might not be the correct one for global limits, or it might not be setting it up correctly for your use case. Given that creating the concurrency limit via the command line works, it's possible that the CLI handles some setup or context that the API call does not. You might want to continue using the Prefect CLI to set up global concurrency limits before running your flow, as this seems to ensure the correct setup. For more detailed insights, you might consider checking the Prefect repository on GitHub for issues or discussions related to this functionality, or consult the Prefect community for any undocumented capabilities or workarounds.
m
aha I think I solved this: I need to use
create_global_concurrency_limit
instead, which takes a
GlobalConcurrencyLimitCreate
object.