d
@Marvin is it possible to run flows concurrently, like tasks via `.submit()`?
m
Yes, it is possible to run flows concurrently in Prefect, similar to how you can run tasks concurrently via `.submit()`. Here's a high-level overview of how you can achieve this:

1. Using the `concurrency` context manager: You can limit the maximum number of concurrent flows or tasks by using the `concurrency` context manager to request slots on a concurrency limit, as shown in the Global Concurrency Limits guide.

2. Submitting tasks within flows: You can submit tasks within flows to run multiple tasks concurrently. Here's an excerpt demonstrating how to use `.submit()` within a flow:
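```python
from prefect import flow, task

@task
def database_query(query):
    # Placeholder task (not part of the original excerpt); a real one would run the query
    return f"ran: {query}"

@flow
def my_flow():
    queries = ["SELECT * FROM table1", "SELECT * FROM table2", "SELECT * FROM table3"]
    # .submit() returns a future immediately, so the tasks can run concurrently
    futures = [database_query.submit(query) for query in queries]
    # Wait for every task to finish before the flow returns
    return [future.result() for future in futures]

if __name__ == "__main__":
    my_flow()
```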
3. Using async flows: Prefect supports defining async flows and tasks, leveraging Python's `asyncio` to run concurrent operations. You can define async functions and call them within an async flow.

Async flow code example:
```python
import asyncio
from prefect import flow

async def some_async_function():
    await asyncio.sleep(1)
    return "I'm an async function"

@flow(log_prints=True)  # send `print` statements as INFO logs
async def my_async_workflow():
    print("I can call any python code here, including prefect tasks or flows")
    await some_async_function()

if __name__ == "__main__":
    asyncio.run(my_async_workflow())  # run it like a normal async python function
    # or serve it as a long-lived process
    # my_async_workflow.serve("my-deployment-name")
```
4. Using Work Pools and Priority: You can manage the concurrency of flow runs using work pools and setting priorities for flow run submission, as explained in the Work Pools documentation.

By leveraging these methods, you can manage concurrent execution of tasks and flows to suit your workflow needs.
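As a rough illustration of the `asyncio` approach in item 3, here is a minimal sketch in plain Python with no Prefect calls. The names `fetch` and `run_all` are hypothetical and only stand in for awaited units of work. It shows how `asyncio.gather` starts several coroutines together and collects their results:

```python
import asyncio


async def fetch(name, delay):
    # Hypothetical stand-in for an awaited unit of work
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_all():
    # gather() schedules all three coroutines at once and returns their
    # results in the order the coroutines were passed in, not in the
    # order they finish
    return await asyncio.gather(
        fetch("a", 0.03),
        fetch("b", 0.01),
        fetch("c", 0.02),
    )


if __name__ == "__main__":
    print(asyncio.run(run_all()))
```

The three sleeps overlap, so the total wait is roughly the longest delay rather than the sum of all three.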
d
@Marvin the flow is a subflow that is called within an existing flow. This flow is currently running within a loop which generates 8 subflows. I would like 4 subflows to run concurrently at the same time.
m
To run 4 subflows concurrently within an existing flow, you can use the `concurrency` context manager to set a concurrency limit. Here's how you can implement this:
```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

# Name of a global concurrency limit; assumed to already exist with 4 slots
subflow_locks = "subflow_concurrency"

# Define a task that will be used in the subflows
@task
async def process_data(item):
    await asyncio.sleep(1)  # Simulate some processing delay
    return item

# Define the subflow
@flow
async def subflow(item):
    # Each subflow run holds one slot while it works, so at most 4 work at once
    async with concurrency(subflow_locks, occupy=1):
        return await process_data(item)

# Define the main flow
@flow
async def parent_flow():
    items = range(8)
    # Flows have no .submit(); async subflows are started together with asyncio.gather
    results = await asyncio.gather(*(subflow(item) for item in items))
    print("Results:", results)

if __name__ == "__main__":
    asyncio.run(parent_flow())
```
This example introduces `subflow_locks` for concurrency limiting, setting the limit to 4. When `parent_flow` runs, it ensures no more than 4 subflows are executing concurrently. For further details on concurrency limits, refer to the Global Concurrency Limits guide.
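To make the "4 out of 8 at a time" behaviour concrete, here is a minimal sketch in plain `asyncio`. It is not Prefect's API: an `asyncio.Semaphore` stands in for the concurrency limit, and `run_limited` and `worker` are hypothetical names. The sketch records the highest number of workers that were ever running at once:

```python
import asyncio


async def run_limited(items, max_concurrent=4):
    # The semaphore plays the role of a concurrency limit with 4 slots
    semaphore = asyncio.Semaphore(max_concurrent)
    running = 0
    peak = 0

    async def worker(item):
        nonlocal running, peak
        async with semaphore:  # wait here until a slot is free
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulate some processing delay
            running -= 1
            return item * 2

    # Start all workers together; the semaphore decides who may proceed
    results = await asyncio.gather(*(worker(i) for i in items))
    return results, peak


if __name__ == "__main__":
    results, peak = asyncio.run(run_limited(range(8)))
    print("Results:", results)
    print("Peak concurrency:", peak)
```

All 8 workers are created up front, but each must acquire a slot inside its own body before doing any work. That is why the limit is taken per unit of work rather than once around the loop that starts them.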
d
@Marvin how do I load the `concurrency` context manager?
m
To use the `concurrency` context manager in Prefect, you first need to create and configure a global concurrency limit. Here’s the step-by-step setup:

1. Create a Concurrency Limit: Use the CLI to create a global concurrency limit called `my-concurrency-limit`. For instance, limit it to 4 slots:
```
prefect gcl create my-concurrency-limit --limit 4
```
2. Configure the Concurrency Limit: You can inspect, update, or delete the concurrency limit using the following commands:

- To inspect the details:
```
prefect gcl inspect my-concurrency-limit
```
- To update the limit:
```
prefect gcl update my-concurrency-limit --limit 6
```
- To delete the concurrency limit:
```
prefect gcl delete my-concurrency-limit
```
3. Implement in Your Flow Code: Use the concurrency context manager within your flow code:
```python
import asyncio

from prefect import flow, task
from prefect.concurrency.asyncio import concurrency

# Name of the global concurrency limit created with the CLI above
subflow_locks = "my-concurrency-limit"

@task
async def process_data(item):
    await asyncio.sleep(1)  # Simulate processing
    return f"Processed {item}"

@flow
async def subflow(item):
    # Use the concurrency context manager: 1 slot per subflow run
    async with concurrency(subflow_locks, occupy=1):
        return await process_data(item)

@flow
async def parent_flow():
    items = range(8)  # Example list of items to process
    # Flows have no .submit(); async subflows are started together with
    # asyncio.gather, which also collects their results in order
    results = await asyncio.gather(*(subflow(item) for item in items))
    print("Results:", results)

if __name__ == "__main__":
    asyncio.run(parent_flow())
```
This setup ensures that only 4 subflows can run concurrently at any given time by setting the concurrency limit to 4. For more detailed information on configuring and managing concurrency limits, refer to the Global Concurrency Limits guide.
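How many runs fit under the limit depends on how many slots each one occupies. The sketch below is a toy model in plain `asyncio`, not Prefect's implementation. `SlotLimit` and `peak_concurrency` are hypothetical names, used to show why occupying 1 slot per run on a 4-slot limit allows 4 at once, while occupying all 4 slots per run serializes them:

```python
import asyncio
from contextlib import asynccontextmanager


class SlotLimit:
    """Toy model of a limit with a fixed number of slots."""

    def __init__(self, limit):
        self.free = limit
        self.cond = asyncio.Condition()

    @asynccontextmanager
    async def occupy(self, slots):
        # Block until enough slots are free, then take them
        async with self.cond:
            await self.cond.wait_for(lambda: self.free >= slots)
            self.free -= slots
        try:
            yield
        finally:
            # Give the slots back and wake any waiting workers
            async with self.cond:
                self.free += slots
                self.cond.notify_all()


async def peak_concurrency(limit, occupy, workers=8):
    slot_limit = SlotLimit(limit)  # created inside the running event loop
    running = 0
    peak = 0

    async def worker():
        nonlocal running, peak
        async with slot_limit.occupy(occupy):
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.01)  # simulate work
            running -= 1

    await asyncio.gather(*(worker() for _ in range(workers)))
    return peak


if __name__ == "__main__":
    print(asyncio.run(peak_concurrency(limit=4, occupy=1)))
    print(asyncio.run(peak_concurrency(limit=4, occupy=4)))
```

In this model the peak number of simultaneous workers is the limit divided by the slots each one occupies, which is the reason for using `occupy=1` when each subflow should count as a single run.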
k
@datamongus I've been using run_deployment for this. Seems simpler.
```python
from prefect.deployments import run_deployment

for i in range(3):
    run_deployment(some_deployment, parameters={"i": i})
```
d
Thank you