Eric
07/15/2025, 7:09 PM
Marvin
07/15/2025, 7:09 PM
Marvin
07/15/2025, 7:10 PM
```python
from prefect import flow

@flow(tags=["parallel-subflow"])
def subflow(x):
    # your subflow logic here
    return x * 2

@flow
def main_flow():
    # This will still create all futures at once, but execution will be limited
    futures = []
    for i in range(10):
        futures.append(subflow.submit(i))
    # Wait for all results
    results = [f.result() for f in futures]
    return results
```
2. Then, create a concurrency limit for the tag using the Prefect CLI:
```bash
prefect concurrency-limit create parallel-subflow 3
```
This will limit the number of concurrent subflows with the tag "parallel-subflow" to 3 at a time. You can adjust the number (3 in this example) to whatever limit makes sense for your VM's resources.
Some important notes:
- The parent flow will still create all the futures immediately, but the actual execution of the subflows will be limited based on the concurrency limit
- Subflows will queue up and execute as slots become available
- You can monitor active concurrency slots using `prefect concurrency-limit inspect parallel-subflow`
- You can list all concurrency limits using `prefect concurrency-limit ls`
- If you need to remove the limit, use `prefect concurrency-limit delete parallel-subflow`
This approach gives you fine-grained control over resource usage while maintaining the parallel execution pattern. The concurrency limits are enforced at the Prefect server level, so they'll work consistently across all your flow runs.
Would you like me to explain any part of this in more detail or show you how to monitor the active concurrency slots?
Eric
07/15/2025, 8:43 PM
Marvin
07/15/2025, 8:43 PM
Marvin
07/15/2025, 8:44 PM
```bash
# To see details about a specific concurrency limit
prefect concurrency-limit inspect parallel-subflow
# To see all concurrency limits
prefect concurrency-limit ls
```
2. Better Implementation for Queuing Subflows:
Here's the improved code that will properly queue your subflows using Prefect's concurrency context manager:
```python
from prefect import flow
from prefect.concurrency.sync import concurrency

@flow
def subflow(x):
    # your subflow logic here
    return x * 2

@flow
def main_flow():
    results = []
    for i in range(10):
        # Occupy one slot of the "my-concurrency-group" limit while the subflow runs
        with concurrency("my-concurrency-group", occupy=1):
            result = subflow(i)
        results.append(result)
    return results
```
The key differences in this implementation:
1. We use the `concurrency` context manager from `prefect.concurrency.sync`
2. The number of slots is configured on the `my-concurrency-group` global concurrency limit (3 in this example) rather than passed in code; each `with` block occupies one slot
3. Subflows wait for an available slot instead of all entering the running state
You can also monitor the flow runs in the Prefect UI to see:
- Which flows are currently running
- Which flows are queued
- The concurrency limits in effect
Alternative Approach using Work Pool Concurrency:
If you're running your flows via deployments, you can also set concurrency limits at the work pool level:
```bash
prefect work-pool create my-pool --concurrency-limit 3
```
Then deploy your flows to this work pool, and it will automatically handle the concurrency limiting for you.
Would you like me to explain more about either:
1. How to monitor the queued flows in the UI?
2. How to set up work pool-based concurrency limits?
3. How to use more advanced concurrency patterns (like timeouts or different concurrency groups)?
Eric
07/15/2025, 8:53 PM
Marvin
07/15/2025, 8:53 PM
Marvin
07/15/2025, 8:55 PM
```python
import asyncio

from prefect import flow
from prefect.concurrency.asyncio import concurrency

@flow
async def subflow(x):
    # your subflow logic here
    return x * 2

@flow
async def main_flow():
    # Create a semaphore to limit concurrent execution
    sem = asyncio.Semaphore(3)  # Limit to 3 concurrent subflows

    async def run_with_limit(i):
        async with sem:
            # This will wait for a slot before executing
            async with concurrency("subflow-group", occupy=1):
                return await subflow(i)

    # Create all coroutines; they queue on the semaphore
    tasks = [run_with_limit(i) for i in range(10)]
    # Wait for all of them to complete
    results = await asyncio.gather(*tasks)
    return results

# Run the flow
if __name__ == "__main__":
    asyncio.run(main_flow())
```
This solution combines two mechanisms to ensure proper queuing:
1. An `asyncio.Semaphore` to control concurrent execution at the Python level
2. Prefect's `concurrency` context manager to manage the concurrency at the Prefect level (the slot count comes from the `subflow-group` global concurrency limit, not from the code)
Key points about this solution:
1. The subflows will actually queue up and wait for their turn, rather than all starting at once
2. You'll see only 3 subflows running at any given time
3. The rest will be in a proper waiting state
4. The parent flow will still coordinate all the subflows but won't launch them all simultaneously
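The claim that only 3 run at any given time can be checked with a small, Prefect-free sketch. Plain coroutines stand in for subflows here; `subflow_stub`, `run_limited`, and the `state` dictionary are hypothetical names used only for this illustration.

```python
import asyncio

async def subflow_stub(x, state):
    # Stand-in for a subflow: track how many copies run at once
    state["active"] += 1
    state["peak"] = max(state["peak"], state["active"])
    await asyncio.sleep(0.01)
    state["active"] -= 1
    return x * 2

async def run_limited(n=10, limit=3):
    sem = asyncio.Semaphore(limit)
    state = {"active": 0, "peak": 0}

    async def guarded(i):
        # Only `limit` coroutines get past this point at a time
        async with sem:
            return await subflow_stub(i, state)

    results = await asyncio.gather(*(guarded(i) for i in range(n)))
    return results, state["peak"]

results, peak = asyncio.run(run_limited())
```

All ten coroutines are created up front, but `peak` stays at the semaphore's limit because the counter is only touched while the semaphore is held.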
You can monitor the execution in real-time using:
```python
import time

from prefect import flow

@flow
def subflow(x):
    print(f"Starting subflow {x}")
    # Add some sleep to simulate work
    time.sleep(5)
    print(f"Finishing subflow {x}")
    return x * 2
```
Alternative Approach using Task-Based Concurrency:
If you prefer a simpler approach without async/await, you can use tasks instead of subflows:
```python
from datetime import timedelta

from prefect import flow, task
from prefect.concurrency.sync import concurrency
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(minutes=5))
def process_item(x):
    # Each task run occupies one slot of the "process-group" limit while it works
    with concurrency("process-group", occupy=1):
        # Your processing logic here
        return x * 2

@flow
def main_flow():
    futures = []
    for i in range(10):
        future = process_item.submit(i)
        futures.append(future)
    return [f.result() for f in futures]
```
Would you like me to:
1. Explain how to monitor the queued states in more detail?
2. Show how to add error handling to these patterns?
3. Demonstrate how to add timeout mechanisms to prevent infinite waiting?
Eric
07/15/2025, 9:16 PM
Marvin
07/15/2025, 9:16 PM
Marvin
07/15/2025, 9:18 PM
```bash
# Create a global concurrency limit that will apply to all flows with a specific tag
prefect gcl create my-subflows-limit --limit 3
```
2. Then, modify your flows to use this tag:
```python
from prefect import flow

@flow(tags=["my-subflows-limit"])
def subflow(x):
    # your subflow logic here
    return x * 2

@flow
def main_flow():
    futures = []
    for i in range(10):
        futures.append(subflow.submit(i))
    return [f.result() for f in futures]
```
3. If you're using work pools, you can also set a concurrency limit on the work pool:
```bash
# Create a work pool with concurrency limit
prefect work-pool create my-pool --type process --concurrency-limit 3
# Or set concurrency limit on an existing work pool
prefect work-pool set-concurrency-limit my-pool 3
```
4. Deploy your flows to this work pool:
```python
from prefect import serve

if __name__ == "__main__":
    main_flow.serve(
        name="main-flow",
        work_pool="my-pool",
        tags=["my-subflows-limit"]
    )
```
To monitor the concurrency limits and queued flows:
```bash
# Check global concurrency limits
prefect gcl ls
# Inspect work pool settings
prefect work-pool inspect my-pool
# View active flow runs and their states
prefect flow-run ls
```
The key differences with this approach:
1. Concurrency is managed at the infrastructure level through global concurrency limits and work pool settings
2. No need for code-level concurrency management
3. Limits are enforced consistently across all deployments
4. Can be modified without changing code
5. Can be managed through the Prefect UI as well
Would you like me to:
1. Show you how to monitor these limits in the Prefect UI?
2. Explain how to set up different limits for different types of subflows?
3. Demonstrate how to temporarily override these limits when needed?
Remember that when using global concurrency limits, you'll need to ensure your flows are properly tagged, but this is a one-time setup in your flow code rather than active concurrency management.
Eric
07/16/2025, 8:48 AM
`ThreadPoolTaskRunner` used for? Also, how do I run subflows concurrently? Then I have a task dependent on the success of all these subflows. How do I make this task dependent?
Marvin
07/16/2025, 8:48 AM
Marvin
07/16/2025, 8:51 AM
`ConcurrentTaskRunner` in modern Prefect
2. Running Subflows Concurrently with Dependencies:
Here's a complete example showing how to run subflows concurrently and make a task dependent on their completion:
```python
from prefect import flow, task
from typing import List
import asyncio

@task
def process_results(results: List[int]):
    """This task will only run after all subflows complete successfully"""
    print(f"Processing results: {results}")
    return sum(results)

@flow
def subflow(x: int) -> int:
    # Simulate some work
    return x * 2

@flow
async def main_flow():
    # Create futures for all subflows
    futures = []
    for i in range(10):
        # .submit() runs the subflow asynchronously
        future = subflow.submit(i)
        futures.append(future)
    # Wait for all subflows to complete and collect results
    results = await asyncio.gather(*(f.wait() for f in futures))
    # Now process_results will only run after all subflows complete successfully
    final_result = await process_results.submit(results)
    return final_result

# Run the flow
if __name__ == "__main__":
    asyncio.run(main_flow())
```
Alternative approach using `as_completed` if you want to process results as they come in:
```python
from prefect import flow, task
from prefect.futures import as_completed

@flow
async def main_flow():
    # Submit all subflows
    futures = [subflow.submit(i) for i in range(10)]
    # Process results as they complete
    results = []
    for future in as_completed(futures):
        result = await future.wait()
        results.append(result)
    # Process final results
    final_result = await process_results.submit(results)
    return final_result
```
3. Using ThreadPoolTaskRunner Explicitly:
If you want to use ThreadPoolTaskRunner for your tasks:
```python
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def io_bound_task(x):
    # Some I/O operation like API call or database query
    return x * 2

@flow(task_runner=ThreadPoolTaskRunner(max_workers=3))
def main_flow():
    results = []
    for i in range(10):
        result = io_bound_task.submit(i)
        results.append(result)
    # Wait for all tasks to complete
    final_results = [r.result() for r in results]
    return final_results
```
Key points to remember:
1. `.submit()` runs flows/tasks asynchronously and returns a future
2. `await future.wait()` or `future.result()` waits for completion and gets the result
3. Tasks depending on subflows won't start until all required subflows complete successfully
4. `ThreadPoolTaskRunner` is best for I/O-bound tasks
5. You can control the level of concurrency using `max_workers`
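The same shape (bounded workers, results collected as they finish, then one dependent step) can be sketched with the standard library alone. This is an analogy rather than Prefect code: `double` and `summarize` are hypothetical stand-ins for an I/O-bound task and the task that depends on all results.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def double(x):
    # Stand-in for an I/O-bound task
    return x * 2

def summarize(values):
    # Dependent step: needs every result before it can run
    return sum(values)

def run(n=10, max_workers=3):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(double, i) for i in range(n)]
        # Results arrive in completion order, not submission order
        results = [f.result() for f in as_completed(futures)]
    return summarize(results)

total = run()
```

Because `summarize` is only called after every `f.result()` has returned, it cannot start early, and an exception in any worker would surface from `f.result()` before the dependent step is reached.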
Would you like me to:
1. Show how to handle errors in concurrent subflows?
2. Explain how to add timeouts to prevent hanging?
3. Demonstrate how to monitor the progress of concurrent subflows?
Eric
07/16/2025, 8:55 AM
`.submit` for flows
Marvin
07/16/2025, 8:55 AM
Marvin
07/16/2025, 8:56 AM
```python
from prefect import flow, task
import asyncio

@task
def process_results(results):
    """This task will only run after all subflows complete successfully"""
    print(f"Processing results: {results}")
    return sum(results)

@flow
async def subflow(x: int) -> int:
    # Simulate some work
    await asyncio.sleep(1)  # Simulating some async work
    return x * 2

@flow
async def main_flow():
    # Create coroutines for all subflows
    coroutines = [subflow(i) for i in range(10)]
    # Run all subflows concurrently and wait for results
    results = await asyncio.gather(*coroutines)
    # process_results is a sync task, so it is called without await;
    # it will only run after all subflows complete
    final_result = process_results(results)
    return final_result

# Run the flow
if __name__ == "__main__":
    asyncio.run(main_flow())
```
Or if you need more control over the execution:
```python
from prefect import flow, task
import asyncio
from typing import List

@flow
async def subflow(x: int) -> int:
    await asyncio.sleep(1)  # Simulating some async work
    return x * 2

@task
async def process_batch_results(results: List[int]):
    print(f"Processing batch results: {results}")
    return sum(results)

@flow
async def main_flow():
    # Control concurrency by processing in batches
    batch_size = 3
    all_results = []
    for i in range(0, 10, batch_size):
        batch = range(i, min(i + batch_size, 10))
        # Run batch of subflows concurrently
        batch_results = await asyncio.gather(
            *(subflow(x) for x in batch)
        )
        all_results.extend(batch_results)
        print(f"Completed batch {i//batch_size + 1}")
    # Process all results
    final_result = await process_batch_results(all_results)
    return final_result
```
For non-async flows, you can use:
```python
from prefect import flow, task
from concurrent.futures import ThreadPoolExecutor
from typing import List

@flow
def subflow(x: int) -> int:
    return x * 2

@task
def process_results(results: List[int]):
    return sum(results)

@flow
def main_flow():
    # Use ThreadPoolExecutor to run subflows concurrently
    with ThreadPoolExecutor(max_workers=3) as executor:
        # Submit all subflows to the executor
        futures = [executor.submit(subflow, i) for i in range(10)]
        # Collect results in submission order
        results = [f.result() for f in futures]
    # Process results after all subflows complete
    final_result = process_results(results)
    return final_result
```
Key points about these implementations:
1. Use `asyncio.gather()` for concurrent async subflows
2. Use `ThreadPoolExecutor` for concurrent non-async subflows
3. Control concurrency by:
   - Setting batch sizes
   - Using `max_workers` with `ThreadPoolExecutor`
   - Using semaphores with async flows
4. Dependencies are naturally handled - tasks won't start until the subflows they depend on complete
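The dependency point also covers failure: with `asyncio.gather()` and its default settings, the first exception is re-raised at the `await`, so the code after it never runs. The sketch below shows this with plain coroutines; `subflow_stub`, `dependent_step`, and the simulated failure on input 3 are assumptions made for the illustration, not part of Prefect.

```python
import asyncio

calls = []

async def subflow_stub(x):
    await asyncio.sleep(0.01)
    if x == 3:
        raise ValueError(f"subflow {x} failed")  # simulated failure
    return x * 2

def dependent_step(values):
    calls.append("dependent_step")
    return sum(values)

async def main(n):
    # gather() re-raises the first exception, so the next line is
    # reached only when every coroutine succeeded
    results = await asyncio.gather(*(subflow_stub(i) for i in range(n)))
    return dependent_step(results)

ok_total = asyncio.run(main(3))  # inputs 0, 1, 2 all succeed

try:
    asyncio.run(main(5))  # input 3 fails
    failed = False
except ValueError:
    failed = True
```

After both runs, `calls` contains a single entry: the dependent step ran for the successful batch and was skipped for the failing one.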
Would you like me to:
1. Show how to add error handling to these patterns?
2. Demonstrate how to monitor the progress of concurrent subflows?
3. Explain how to add timeouts to prevent hanging?
Eric
07/16/2025, 9:50 AM
Marvin
07/16/2025, 9:50 AM
Marvin
07/16/2025, 9:54 AM
```python
from prefect import task

@task
def good_task_returns():
    # Basic Python types (each `return` below is a separate example)
    return 42  # integers
    return 3.14  # floats
    return "hello"  # strings
    return True  # booleans
    # Collections of basic types
    return [1, 2, 3]  # lists
    return {"a": 1, "b": 2}  # dictionaries
    return (1, 2, 3)  # tuples
    return {1, 2, 3}  # sets
    # Nested structures
    return {
        "numbers": [1, 2, 3],
        "metadata": {
            "name": "test",
            "success": True
        }
    }
    # Pandas DataFrames (will be serialized)
    import pandas as pd
    return pd.DataFrame({"a": [1, 2, 3]})
    # NumPy arrays
    import numpy as np
    return np.array([1, 2, 3])
```
❌ Should Not Return:
```python
@task
def bad_task_returns():
    # Don't return these:
    return lambda x: x + 1  # functions
    return open("file.txt")  # file handles
    return database_connection  # database connections
    return threading.Lock()  # thread locks
```
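One way to sanity-check a candidate return value is to try serializing it before returning it. The sketch below uses the standard-library `pickle` module as a stand-in; the serializer Prefect is actually configured with may accept more or fewer types, so this is a rough pre-check under that assumption. `is_picklable` is a hypothetical helper name.

```python
import pickle
import threading

def is_picklable(value):
    """Return True if the standard-library pickler can serialize `value`."""
    try:
        pickle.dumps(value)
        return True
    except Exception:
        return False

checks = {
    "int": is_picklable(42),
    "nested dict": is_picklable({"numbers": [1, 2, 3], "ok": True}),
    "set": is_picklable({1, 2, 3}),
    "thread lock": is_picklable(threading.Lock()),
    "generator": is_picklable(x for x in range(3)),
}
```

Plain data passes, while objects tied to live process state, such as a lock or a running generator, are rejected.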
### Flows
✅ Can Return:
```python
from prefect import flow

@flow
def good_flow_returns():
    # Everything that tasks can return
    return 42
    return [1, 2, 3]
    return {"results": "success"}
    # Results from tasks
    result = my_task()
    return result
    # Multiple task results
    results = [task1(), task2(), task3()]
    return results
    # Structured results
    return {
        "task1_result": task1(),
        "task2_result": task2(),
        "metadata": {
            "flow_name": "example",
            "timestamp": "2023-01-01"
        }
    }
```
### Best Practices
1. Keep it Serializable:
```python
@task
def process_data(data):
    # Good: Return processed data
    return {"processed_values": data, "status": "success"}
    # Bad: Don't return objects that can't be serialized
    return MySQLConnection()
```
2. Structure Complex Returns:
```python
@flow
def analysis_flow():
    # Good: Structured, clear return value
    return {
        "metrics": {
            "accuracy": 0.95,
            "precision": 0.92
        },
        "metadata": {
            "model_version": "1.0",
            "timestamp": "2023-01-01"
        },
        "data_summary": {
            "rows_processed": 1000,
            "errors": []
        }
    }
```
3. Handle Large Data:
```python
@task
def process_large_dataset():
    # Good: Save large data to storage and return reference
    results = process_big_data()
    save_to_storage(results)
    return {
        "storage_location": "s3://bucket/results.parquet",
        "row_count": len(results),
        "status": "complete"
    }
```
4. Error Information:
```python
@task
def data_validation():
    errors = []
    warnings = []
    # ... validation logic ...
    # Good: Return structured validation results
    return {
        "is_valid": len(errors) == 0,
        "errors": errors,
        "warnings": warnings,
        "records_checked": 1000
    }
```
Key Points to Remember:
1. All returns must be serializable
2. Structure complex returns as dictionaries for clarity
3. For large data, consider storing in external storage and returning references
4. Include metadata when relevant (timestamps, versions, counts)
5. Return values should be meaningful for downstream tasks/flows
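The "store externally and return a reference" point can be sketched end to end with the standard library. A temporary directory stands in for real storage such as S3, and `save_rows`, `load_rows`, and the `results.json` file name are hypothetical; this shows the pattern only and is not Prefect's result storage.

```python
import json
import tempfile
from pathlib import Path

def save_rows(rows, directory):
    """Write rows to a JSON file and return a small, serializable summary."""
    path = Path(directory) / "results.json"  # hypothetical file name
    path.write_text(json.dumps(rows))
    return {
        "storage_location": str(path),
        "row_count": len(rows),
        "status": "complete",
    }

def load_rows(reference):
    # A downstream step reads the data back using only the reference
    return json.loads(Path(reference["storage_location"]).read_text())

with tempfile.TemporaryDirectory() as tmp:
    rows = [{"id": i, "value": i * 2} for i in range(1000)]
    reference = save_rows(rows, tmp)
    restored = load_rows(reference)
```

The value passed between steps is the small `reference` dictionary, and the 1000 rows only move when a downstream step explicitly loads them.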
Would you like me to:
1. Show how to handle non-serializable objects properly?
2. Demonstrate patterns for returning large datasets?
3. Explain how to use Prefect's result storage for different return types?