Julius Faber
07/15/2025, 8:59 AM
Marvin
07/15/2025, 9:00 AM
```python
from prefect import flow, task
import asyncio
from typing import List, Optional

# Example tree structure (you can modify this to match your actual data structure)
class TreeNode:
    def __init__(self, value: str, children: Optional[List['TreeNode']] = None):
        self.value = value
        self.children = children or []

@flow(name="process-single-node")
async def process_node(node_value: str) -> str:
    """This is the subflow that processes a single node"""
    # Replace this with your actual node processing logic
    await asyncio.sleep(1)  # Simulate some work
    return f"Processed node: {node_value}"

@flow(name="process-tree")
async def process_tree(node: TreeNode) -> List[str]:
    """Main flow that recursively processes the tree structure"""
    # Process current node
    current_node_result = await process_node(node.value)

    # If there are no children, return just this node's result
    if not node.children:
        return [current_node_result]

    # Process all children in parallel using asyncio.gather
    child_tasks = [
        process_tree(child)
        for child in node.children
    ]

    # Wait for all child results
    child_results = await asyncio.gather(*child_tasks)

    # Flatten results and combine with current node result
    all_results = [current_node_result]
    for result_list in child_results:
        all_results.extend(result_list)

    return all_results

# Example usage
@flow(name="main-tree-processor")
async def main():
    # Create a sample tree
    tree = TreeNode("root", [
        TreeNode("A", [
            TreeNode("A1"),
            TreeNode("A2")
        ]),
        TreeNode("B", [
            TreeNode("B1"),
            TreeNode("B2")
        ])
    ])

    # Process the tree
    results = await process_tree(tree)

    # Print results
    for result in results:
        print(result)

# Run the flow
if __name__ == "__main__":
    asyncio.run(main())
```
This example demonstrates several important concepts:
1. Tree Structure: We define a simple `TreeNode` class to represent our tree structure. You can modify this to match your specific needs.
2. Subflow Definition: The `process_node` flow is our subflow that processes individual nodes. You should replace its contents with your actual node processing logic.
3. Parallel Processing: The `process_tree` flow uses `asyncio.gather` to process child nodes in parallel, so all children at the same level are processed concurrently.
4. Recursive Pattern: The flow recursively traverses the tree structure, ensuring that all nodes are processed.
When you run this script, it will:
- Process the root node
- Concurrently process all its children
- For each child, recursively process its children in parallel
- Collect and return all results
The output will look something like:
```
Processed node: root
Processed node: A
Processed node: B
Processed node: A1
Processed node: A2
Processed node: B1
Processed node: B2
```
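With concurrent execution, the exact ordering can vary between runs. If the tree is very wide, you may also want to bound the fan-out. Here is a minimal sketch of that idea using plain asyncio (not part of the original example; `process_value` is an illustrative stand-in for the node subflow):
```python
import asyncio

# Cap concurrent node processing at 5 (illustrative value)
semaphore = asyncio.Semaphore(5)

async def process_value(value: str) -> str:
    async with semaphore:
        await asyncio.sleep(1)  # stand-in for real work
        return f"Processed node: {value}"

async def demo():
    values = [f"node-{i}" for i in range(20)]
    # At most 5 coroutines do work at any moment
    return await asyncio.gather(*(process_value(v) for v in values))

if __name__ == "__main__":
    print(asyncio.run(demo()))
```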
Some important notes:
1. This example uses async/await patterns, which are recommended for concurrent execution in Prefect 3.x.
2. The subflows run in the same process by default. If you need them to run in separate processes or machines, you would need to create deployments for them.
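For example, a minimal sketch of triggering a node's processing as a separate flow run via `run_deployment` (this assumes a deployment of `process-single-node` named `my-deployment` already exists; the names are placeholders):
```python
from prefect.deployments import run_deployment

# "process-single-node/my-deployment" is a placeholder deployment name
flow_run = run_deployment(
    name="process-single-node/my-deployment",
    parameters={"node_value": "A1"},
)
```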
3. You can add error handling and retries by using Prefect's built-in retry mechanisms:
```python
@flow(name="process-single-node", retries=3, retry_delay_seconds=10)
async def process_node(node_value: str) -> str:
    # ... your code ...
```
4. The Prefect UI will show you the full hierarchy of flow runs, making it easy to monitor and debug the execution.
Julius Faber
07/15/2025, 9:02 AM
Marvin
07/15/2025, 9:03 AM
You can use the `DaskTaskRunner` for parallel execution. This approach will process children before their parents and won't require asyncio:
```python
from prefect import flow, task
# DaskTaskRunner lives in the prefect-dask collection (`pip install prefect-dask`)
from prefect_dask import DaskTaskRunner
from typing import List, Optional

class TreeNode:
    def __init__(self, value: str, children: Optional[List['TreeNode']] = None):
        self.value = value
        self.children = children or []

@task
def process_node(node_value: str) -> str:
    """Task that processes a single node"""
    # Replace this with your actual node processing logic
    return f"Processed node: {node_value}"

@flow(name="process-tree", task_runner=DaskTaskRunner())
def process_tree(node: TreeNode) -> List[str]:
    """Main flow that recursively processes the tree structure"""
    results = []

    # First, process all children recursively
    child_results = []
    for child in node.children:
        # Recursively process each child and collect results
        child_result = process_tree(child)
        child_results.extend(child_result)

    # Add all child results first (maintaining bottom-up order)
    results.extend(child_results)

    # Process current node after children
    current_result = process_node.submit(node.value)
    results.append(current_result.result())

    return results

@flow(name="main-tree-processor")
def main():
    # Create a sample tree
    tree = TreeNode("root", [
        TreeNode("A", [
            TreeNode("A1"),
            TreeNode("A2")
        ]),
        TreeNode("B", [
            TreeNode("B1"),
            TreeNode("B2")
        ])
    ])

    # Process the tree
    results = process_tree(tree)

    # Print results
    for result in results:
        print(result)

if __name__ == "__main__":
    main()
```
Key changes and explanations:
1. Task vs Flow:
- `process_node` is now a `@task` instead of a `@flow`
- We can use `.submit()` on tasks but not on flows
- Tasks within a flow will run in parallel when using the `DaskTaskRunner`
2. Parallel Execution:
- We're using `DaskTaskRunner()`, which handles parallel execution of tasks (see the cluster-sizing sketch after this list)
- The `submit()` method on tasks returns a future object that we can wait for using `.result()`
3. Bottom-up Processing:
- The code now processes children before their parent nodes
- Results are collected in order: children first, then parent
4. Simplified Structure:
- Removed async/await patterns
- Uses synchronous code that's easier to understand and debug
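If you need control over how much parallelism Dask provides, `DaskTaskRunner` accepts `cluster_kwargs` that are forwarded to the local cluster it creates. A minimal sketch (values are illustrative, and this assumes the `prefect-dask` collection is installed):
```python
from prefect import flow
from prefect_dask import DaskTaskRunner

@flow(
    task_runner=DaskTaskRunner(
        # Forwarded to dask.distributed.LocalCluster; values are illustrative
        cluster_kwargs={"n_workers": 4, "threads_per_worker": 1}
    )
)
def sized_flow():
    ...
```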
The output will now show nodes being processed from bottom to top:
```
Processed node: A1
Processed node: A2
Processed node: A
Processed node: B1
Processed node: B2
Processed node: B
Processed node: root
```
For more complex scenarios, you could add additional features:
1. Add retries to the task:
```python
@task(retries=3, retry_delay_seconds=10)
def process_node(node_value: str) -> str:
    # ... your code ...
```
2. Add parallel processing of siblings:
```python
@flow(name="process-tree", task_runner=DaskTaskRunner())
def process_tree(node: TreeNode) -> List[str]:
    results = []

    # Process all children recursively
    # (note: a direct subflow call blocks until that branch finishes)
    child_branch_results = []
    for child in node.children:
        # Process each child branch and keep its results
        child_branch_results.append(process_tree(child))

    # Collect all child results
    for branch in child_branch_results:
        results.extend(branch)

    # Process current node after children
    current_result = process_node.submit(node.value)
    results.append(current_result.result())

    return results
```
3. Add logging or monitoring:
```python
@task
def process_node(node_value: str) -> str:
    from prefect import get_run_logger
    logger = get_run_logger()
    logger.info(f"Starting to process node: {node_value}")
    result = f"Processed node: {node_value}"
    return result
```
Julius Faber
07/15/2025, 9:09 AM
Marvin
07/15/2025, 9:11 AM
1. Task-Level Parallelism (Within a Flow)
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def my_task(x):
    return x * 2

@flow(task_runner=ConcurrentTaskRunner())
def my_flow():
    # These tasks will run in parallel
    results = [my_task.submit(i) for i in range(10)]
    # Get all results
    return [r.result() for r in results]
```
2. Flow-Level Parallelism (Multiple Flow Runs)
```python
from prefect import flow

@flow
def my_subflow(x):
    return x * 2

@flow
def parent_flow():
    # These subflows will run in parallel
    futures = []
    for i in range(10):
        future = my_subflow.submit(i)
        futures.append(future)
    # Wait for all results
    return [f.result() for f in futures]
```
### Task Runners Explained
You don't necessarily need DaskTaskRunner for parallel execution. Prefect offers several task runners:
1. ConcurrentTaskRunner (Default)
```python
from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner

@flow(task_runner=ConcurrentTaskRunner(max_workers=4))
def my_flow():
    # Tasks will run in parallel using threads
    pass
```
2. DaskTaskRunner
```python
from prefect import flow
# Requires the prefect-dask collection: pip install prefect-dask
from prefect_dask import DaskTaskRunner

@flow(task_runner=DaskTaskRunner())
def my_flow():
    # Tasks will run in parallel using Dask (multiple processes/machines)
    pass
```
3. SequentialTaskRunner
```python
from prefect import flow
from prefect.task_runners import SequentialTaskRunner

@flow(task_runner=SequentialTaskRunner())
def my_flow():
    # Tasks will run sequentially (no parallelism)
    pass
```
### When to Use Each Task Runner
1. ConcurrentTaskRunner (Default)
- Best for: I/O-bound tasks (API calls, database operations)
- Uses: Python threads
- Limitation: Doesn't bypass Python's GIL
- Example use case:
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def download_task(url: str) -> bytes:
    ...  # fetch the file

@flow(task_runner=ConcurrentTaskRunner())
def download_files(urls: list):
    futures = []
    for url in urls:
        future = download_task.submit(url)
        futures.append(future)
    return [f.result() for f in futures]
```
2. DaskTaskRunner
- Best for: CPU-bound tasks or distributed computing
- Uses: Multiple processes/machines
- Advantage: True parallelism, can use multiple cores/machines
- Example use case:
```python
from prefect import flow, task
from prefect_dask import DaskTaskRunner

@task
def heavy_computation_task(dataset) -> float:
    ...  # CPU-intensive work

@flow(task_runner=DaskTaskRunner())
def process_large_datasets(large_datasets: list):
    futures = []
    for dataset in large_datasets:
        future = heavy_computation_task.submit(dataset)
        futures.append(future)
    return [f.result() for f in futures]
```
### Flow-Level Parallelism Without Task Runners
You can achieve parallel execution of flows without specifying any task runner:
```python
from prefect import flow

@flow
def process_item(item):
    return item * 2

@flow
def main_flow():
    # These subflows will run in parallel regardless of task runner
    futures = []
    for i in range(10):
        # .submit() on a flow creates a new flow run
        future = process_item.submit(i)
        futures.append(future)
    # Collect results
    results = [f.result() for f in futures]
    return results
```
### Key Points to Remember:
1. `Flow.submit()`
- Creates a new flow run
- Runs in parallel by default
- Returns a future object
- No task runner required for flow-level parallelism
2. `Task.submit()`
- Submits a task for execution
- Parallelism depends on the task runner
- Returns a future object
- Requires an appropriate task runner for parallel execution
3. Choosing Parallelism Level
- Use task-level parallelism for fine-grained control within a flow
- Use flow-level parallelism for independent, larger units of work
- Can combine both approaches when needed (see the sketch below)
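As a sketch of combining the two levels (names are illustrative; the subflows here are called directly, while the tasks inside each subflow run concurrently):
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task
def double(x: int) -> int:
    return x * 2

@flow(task_runner=ConcurrentTaskRunner())
def process_chunk(chunk: list) -> list:
    # Task-level parallelism inside the subflow
    futures = [double.submit(x) for x in chunk]
    return [f.result() for f in futures]

@flow
def combined_flow(data: list, chunk_size: int = 3) -> list:
    results = []
    # Flow-level decomposition: one subflow per chunk
    for i in range(0, len(data), chunk_size):
        results.extend(process_chunk(data[i:i + chunk_size]))
    return results
```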
Would you like me to elaborate on any of these aspects or provide more specific examples for your use case?
Julius Faber
07/15/2025, 9:35 AM
Marvin
07/15/2025, 9:36 AM
### 1. Calling a Flow Directly from a Task
```python
from prefect import flow, task

@flow
def subflow(x: int):
    return x * 2

@task
def my_task(x: int):
    # Directly call the flow
    result = subflow(x)
    return result

@flow
def main_flow():
    result = my_task(5)
    return result
```
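Called this way, the subflow executes in the same process and the task blocks until the subflow completes.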
### 2. Creating Dynamic Flow Runs
A task can create flow runs dynamically from an existing deployment:
```python
from prefect import flow, task
from prefect.deployments import run_deployment

@task
def create_flow_run(data: dict):
    # Create a flow run from an existing deployment
    # ("my-flow/my-deployment" is a placeholder deployment name)
    flow_run = run_deployment(
        name="my-flow/my-deployment",
        parameters={"data": data},
    )
    return flow_run.id

@flow
def main_flow():
    data = {"value": 42}
    flow_run_id = create_flow_run(data)
    return flow_run_id
```
### 3. Parallel Flow Runs from Task
A task can trigger multiple flow runs in parallel:
```python
from prefect import flow, task

@flow
def process_item(item: int):
    return item * 2

@task
def process_batch(items: list):
    # Submit multiple flow runs in parallel
    futures = []
    for item in items:
        future = process_item.submit(item)
        futures.append(future)
    # Wait for all results
    return [f.result() for f in futures]

@flow
def main_flow():
    items = [1, 2, 3, 4, 5]
    results = process_batch(items)
    return results
```
### 4. Creating Deployments from Task
A task can create new deployments programmatically:
```python
from prefect import flow, task
from prefect.deployments import Deployment

@flow
def target_flow(x: int):
    return x * 2

@task
def create_deployment():
    deployment = Deployment.build_from_flow(
        flow=target_flow,
        name="programmatic-deployment",
        version="1",
        parameters={"x": 42}
    )
    deployment_id = deployment.apply()
    return deployment_id

@flow
def main_flow():
    deployment_id = create_deployment()
    return deployment_id
```
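Note that `Deployment.build_from_flow` is the Prefect 2.x API; in Prefect 3.x, deployments are created with `flow.deploy()` (or served with `flow.serve()`) instead.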
### Important Considerations
1. State Handling
```python
@task
def trigger_flow_with_state(data: dict):
    try:
        result = subflow(data)
        return result
    except Exception as e:
        # Handle any flow run failures
        raise RuntimeError(f"Flow run failed: {e}")
```
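Alternatively, a sketch using `return_state=True` to inspect the subflow's final state instead of relying on an exception (this assumes the `subflow` defined above):
```python
@task
def trigger_flow_with_state(data: dict):
    # return_state=True returns the subflow's final State object
    state = subflow(data, return_state=True)
    if state.is_completed():
        return state.result()
    raise RuntimeError(f"Subflow finished in state: {state.type}")
```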
2. Async Support
```python
from prefect import flow, task

@task
async def async_task_with_flow():
    # Async tasks can call async flows
    result = await async_subflow()
    return result

@flow
async def async_subflow():
    return "async result"

@flow
async def main_flow():
    result = await async_task_with_flow()
    return result
```
3. Best Practices
- Keep it Simple: Only create flows from tasks when there's a clear separation of concerns
```python
@task
def prepare_flow_run(data: dict):
    # Task handles preparation logic
    processed_data = preprocess(data)
    # Then triggers the flow that handles the main processing
    return processing_flow(processed_data)
```
- Error Handling: Always include proper error handling when creating flow runs
```python
from prefect import get_run_logger, task

@task(retries=3)
def safe_flow_trigger(data: dict):
    try:
        return subflow(data)
    except Exception as e:
        logger = get_run_logger()
        logger.error(f"Failed to trigger flow: {e}")
        raise
```
- Resource Management: Be mindful of resource usage when creating multiple flow runs
```python
@task
def batch_flow_creator(items: list, batch_size: int = 10):
    # Create flow runs in batches to manage resources
    for batch in chunks(items, batch_size):
        futures = [process_flow.submit(item) for item in batch]
        # Wait for batch to complete before starting next batch
        [f.result() for f in futures]

def chunks(seq: list, size: int):
    """Yield successive fixed-size slices of `seq`."""
    for i in range(0, len(seq), size):
        yield seq[i:i + size]
```