Daniel Manson
09/19/2024, 8:58 AMMarvin
09/19/2024, 8:58 AMconcurrency context manager which you can use to limit the number of certain types of tasks that can run concurrently.
Here's an example of how to use the concurrency context manager in a flow:
python
from prefect import flow, task, concurrency
import psycopg2
@task
def database_query(query):
with concurrency("database", occupy=1):
connection = psycopg2.connect("<connection_string>")
cursor = connection.cursor()
cursor.execute(query)
result = cursor.fetchall()
connection.close()
return result
@flow
def my_flow():
queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]
for query in queries:
database_query.submit(query)
if __name__ == "__main__":
my_flow()
In this example:
- The database_query task uses the concurrency context manager to limit the number of concurrent database connections.
- The flow my_flow submits multiple database_query tasks, but due to the concurrency limit, only one task will run at a time if the limit is set to 1.
For more details, you can refer to the documentation on task concurrency limits and global concurrency limits.