datamongus
08/08/2024, 5:24 AM
Marvin
08/08/2024, 5:24 AM
You can run tasks and flows concurrently in Prefect, for example by calling .submit() on tasks within a flow. Here's a high-level overview of how you can achieve this:
1. Using the concurrency context manager: You can limit the maximum number of concurrent flows or tasks by using the concurrency context manager to request slots on a concurrency limit, as shown in the Global Concurrency Limits guide.
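For example, a minimal sketch (the limit name "database" and the run_query task are illustrative, and assume a global concurrency limit with that name has already been created):
python
from prefect import task
from prefect.concurrency.sync import concurrency

@task
def run_query(query: str):
    # Occupy one slot on the "database" limit for the duration of the query;
    # if all slots are taken, this blocks until one frees up
    with concurrency("database", occupy=1):
        print(f"running: {query}")  # stand-in for real database work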
2. Submitting tasks within flows: You can submit tasks within flows to run multiple tasks concurrently. Here's an excerpt demonstrating how to use .submit() within a flow:
python
from prefect import flow, task

@task
def database_query(query: str):
    print(f"executing: {query}")  # stand-in for a real database call

@flow
def my_flow():
    queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]
    for query in queries:
        # .submit() schedules each task run; the runs execute concurrently
        database_query.submit(query)

if __name__ == "__main__":
    my_flow()
3. Using async flows: Prefect supports defining async flows and tasks, leveraging Python's asyncio to run concurrent operations. You can define async functions and call them within an async flow:
Async Flow Code Example:
python
import asyncio
from prefect import flow

async def some_async_function():
    await asyncio.sleep(1)
    return "I'm an async function"

@flow(log_prints=True)  # send `print` statements as INFO logs
async def my_async_workflow():
    print("I can call any python code here, including prefect tasks or flows")
    await some_async_function()

if __name__ == "__main__":
    asyncio.run(my_async_workflow())  # run it like a normal async python function
    # or serve it as a long-lived process
    # my_async_workflow.serve("my-deployment-name")
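To actually run several operations at the same time inside an async flow, you can gather them; a minimal sketch (the fetch task and the sleep are placeholders for real async work):
python
import asyncio
from prefect import flow, task

@task
async def fetch(i: int) -> int:
    await asyncio.sleep(1)  # stand-in for real async I/O
    return i

@flow
async def gather_flow():
    # Launch all task runs together and wait for them to finish;
    # they execute concurrently on the event loop
    results = await asyncio.gather(*(fetch(i) for i in range(5)))
    print(results)

if __name__ == "__main__":
    asyncio.run(gather_flow())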
4. Using Work Pools and Priority: You can manage the concurrency of flow runs using work pools and setting priorities for flow run submission, as explained in the Work Pools documentation.
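For example, a work pool's flow-run concurrency can be capped from the CLI (the pool name "my-pool" is just a placeholder):
prefect work-pool create "my-pool" --type process
prefect work-pool set-concurrency-limit "my-pool" 4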
By leveraging these methods, you can manage concurrent execution of tasks and flows to suit your workflow needs.
datamongus
08/08/2024, 5:29 AM
Marvin
08/08/2024, 5:30 AM
You can use the concurrency context manager to set a concurrency limit on your subflows. Here's how you can implement this:
python
import asyncio
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

# Define a task that will be used in the subflows
@task
async def process_data(item):
    await asyncio.sleep(1)  # Simulate some processing delay
    return item

# Define the subflow; each run occupies one slot on the
# "subflow_concurrency" global concurrency limit while it executes
@flow
async def subflow(item):
    async with concurrency("subflow_concurrency", occupy=1):
        return await process_data(item)

# Define the main flow
@flow
async def parent_flow():
    items = range(8)
    # Start all subflows together; the global limit caps how many
    # hold a slot (and therefore run) at the same time
    results = await asyncio.gather(*(subflow(item) for item in items))
    print("Results:", results)

if __name__ == "__main__":
    asyncio.run(parent_flow())
This example acquires one slot on the subflow_concurrency global concurrency limit for each subflow run. With that limit set to 4, no more than 4 subflows execute concurrently when parent_flow runs.
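The limit itself is created ahead of time with the prefect gcl CLI, for example:
prefect gcl create subflow_concurrency --limit 4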
For further details on concurrency limits, refer to the Global Concurrency Limits guide.
datamongus
08/08/2024, 5:57 AM
Marvin
08/08/2024, 5:57 AM
1. Create the Concurrency Limit: Use the prefect gcl create command to create a global concurrency limit named my-concurrency-limit. For instance, limit it to 4 slots:
prefect gcl create my-concurrency-limit --limit 4
2. Configure the Concurrency Limit: You can inspect, update, or delete the concurrency limit using the following commands:
- To inspect the details:
prefect gcl inspect my-concurrency-limit
- To update the limit:
prefect gcl update my-concurrency-limit --limit 6
- To delete the concurrency limit:
prefect gcl delete my-concurrency-limit
3. Implement in Your Flow Code: Use the concurrency context manager within your flow code:
python
import asyncio
from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

@task
async def process_data(item):
    # Simulate processing
    await asyncio.sleep(1)
    return f"Processed {item}"

@flow
async def subflow(item):
    # Occupy 1 slot on "my-concurrency-limit" for the duration of this subflow run
    async with concurrency("my-concurrency-limit", occupy=1):
        return await process_data(item)

@flow
async def parent_flow():
    items = range(8)  # Example list of items to process
    # Start all subflows concurrently; only as many as there are free
    # slots on "my-concurrency-limit" will run at any one time
    results = await asyncio.gather(*(subflow(item) for item in items))
    print("Results:", results)

if __name__ == "__main__":
    asyncio.run(parent_flow())
This setup ensures that only 4 subflows can run concurrently at any given time by setting the concurrency limit to 4.
For more detailed information on configuring and managing concurrency limits, refer to the Global Concurrency Limits guide.
KG
08/09/2024, 5:27 PM
for i in range(3):
    run_deployment(some_deployment, parameters={"i": i})
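For reference, a self-contained version of that loop might look like this (the deployment name "my-flow/my-deployment" is just a placeholder for an existing deployment):
python
from prefect.deployments import run_deployment

for i in range(3):
    # Creates a flow run from the deployment; by default run_deployment
    # waits for the run to finish before returning (pass timeout=0 to
    # trigger the run without waiting)
    run_deployment(name="my-flow/my-deployment", parameters={"i": i})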
datamongus
08/09/2024, 5:28 PM