Moishe
03/05/2025, 10:38 PM
@task
async def process_data(x, y):
    print(f"awaiting {x} + {y}")
    async with concurrency("database", occupy=1):
        print(f"returning {x} + {y}")
        return x + y

@flow
async def my_flow():
    async with get_client() as client:
        limit_id = await client.create_concurrency_limit(
            tag="database",
            concurrency_limit=2
        )
    tasks = []
    for x, y in [(1, 2), (2, 3), (3, 4), (4, 5)]:
        tasks.append(process_data(x, y))
    await asyncio.gather(*tasks)
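For readers following along, the behaviour this flow is aiming for (four calls, at most two holding the "database" slot at once) can be sketched without Prefect. This is only an illustration: asyncio.Semaphore stands in for the Prefect limit, and the state dictionary and the 0.01 second sleep are hypothetical additions for observing the peak concurrency.

```python
import asyncio


async def process_data(x, y, limit, state):
    # "limit" is a plain asyncio.Semaphore standing in for the "database" limit.
    async with limit:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # simulated work while a slot is held
        state["active"] -= 1
        return x + y


async def main():
    limit = asyncio.Semaphore(2)  # at most two holders at a time
    state = {"active": 0, "peak": 0}
    pairs = [(1, 2), (2, 3), (3, 4), (4, 5)]
    results = await asyncio.gather(
        *(process_data(x, y, limit, state) for x, y in pairs)
    )
    return results, state["peak"]


results, peak = asyncio.run(main())
print(results, peak)
```

The peak never exceeds the semaphore size, which is the property the Prefect limit is meant to enforce across task runs.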
Marvin
03/05/2025, 10:38 PM
concurrency context manager. In Prefect, concurrency limits must be explicitly defined before they can be used. Here's a concise breakdown of what you need to do:
1. Create a Concurrency Limit: Ensure that the concurrency limit is created before any tasks attempt to use it. This can be done using the Prefect CLI or API. In your code, you're correctly using the API to create a concurrency limit with client.create_concurrency_limit.
2. Ensure Proper Flow of Execution: Make sure that the concurrency limit creation is completed before any tasks are executed. This often means ensuring that the create_concurrency_limit call has been awaited and completed before the tasks are dispatched.
3. Verify the Concurrency Limit: You can verify the existence of a concurrency limit using the Prefect CLI or API to ensure it has been set correctly before your tasks run.
For more details, you can refer to the Prefect documentation on concurrency limits and task run limits.
Additionally, here's a gotcha: Prefect 3.x requires that futures be resolved by passing them to another task, returning them, or manually calling .result() or .wait(). Ensure that your flow handles task results correctly.
If you continue to face issues, double-check that the concurrency limit is created successfully and that your tasks are correctly set up to respect this limit.
Moishe
03/05/2025, 10:44 PM
prefect gcl create databases --limit 2
it works fine. Am I using the wrong API to create a global concurrency limit?
Marvin
03/05/2025, 10:44 PM
create_concurrency_limit method you are using might not be the correct one for global limits, or it might not be setting it up correctly for your use case.
Given that creating the concurrency limit via the command line works, it's possible that the CLI handles some setup or context that the API call does not. You might want to continue using the Prefect CLI to set up global concurrency limits before running your flow, as this seems to ensure the correct setup.
For more detailed insights, you might consider checking the Prefect repository on GitHub for issues or discussions related to this functionality, or consult the Prefect community for any undocumented capabilities or workarounds.
Moishe
03/05/2025, 10:58 PM
create_global_concurrency_limit instead, which takes a GlobalConcurrencyLimitCreate object.
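A minimal sketch of what that call might look like, assuming Prefect 3.x, where GlobalConcurrencyLimitCreate is importable from prefect.client.schemas.actions and accepts name and limit fields. The helper name ensure_global_limit is hypothetical, and the imports sit inside the function only so the snippet loads where Prefect is not installed.

```python
import asyncio


async def ensure_global_limit(name: str = "database", limit: int = 2):
    # Hypothetical helper: creates a global concurrency limit through the client API.
    # Prefect is imported lazily; calling this requires a reachable Prefect API.
    from prefect import get_client
    from prefect.client.schemas.actions import GlobalConcurrencyLimitCreate

    async with get_client() as client:
        # The name must match the one passed to concurrency(...) in the task.
        return await client.create_global_concurrency_limit(
            concurrency_limit=GlobalConcurrencyLimitCreate(name=name, limit=limit)
        )
```

In the flow above, awaiting ensure_global_limit() would take the place of the create_concurrency_limit(tag=...) call, which sets up a tag-based task run limit, not a global one. Note that the name has to match exactly: the task asks for "database", while the CLI command earlier in the thread created "databases".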