Daniel Manson
09/19/2024, 8:58 AMMarvin
09/19/2024, 8:58 AMconcurrency
context manager which you can use to limit the number of certain types of tasks that can run concurrently.
Here's an example of how to use the concurrency
context manager in a flow:
python
from prefect import flow, task, concurrency
import psycopg2
@task
def database_query(query):
with concurrency("database", occupy=1):
connection = psycopg2.connect("<connection_string>")
cursor = connection.cursor()
cursor.execute(query)
result = cursor.fetchall()
connection.close()
return result
@flow
def my_flow():
queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]
for query in queries:
database_query.submit(query)
if __name__ == "__main__":
my_flow()
In this example:
- The database_query
task uses the concurrency
context manager to limit the number of concurrent database connections.
- The flow my_flow
submits multiple database_query
tasks, but due to the concurrency limit, only one task will run at a time if the limit is set to 1.
For more details, you can refer to the documentation on task concurrency limits and global concurrency limits.