Romain Lefebvre (07/30/2025, 1:52 PM)

Marvin (07/30/2025, 1:53 PM)

```python
from prefect import flow, task

@task
def task1(item):
    return f"Processing {item} in task1"

@task
def task2(result):
    return f"Processing {result} in task2"

@task
def task3(result):
    return f"Finalizing {result} in task3"

@task
def next_task(results):
    # Hypothetical downstream step that needs every item's final result
    return results

@flow
def process_item(item):
    # Tasks will execute sequentially for each item
    result1 = task1(item)
    result2 = task2(result1)
    result3 = task3(result2)
    return result3

@flow
def main_flow(items):
    # Flows have no .map() method (only tasks do), so call the subflow once per item
    results = [process_item(item) for item in items]
    # Each subflow call returns only after it finishes, so all results are ready here
    return next_task(results)
```

Here are a few key points to ensure correct dependencies:
1. Sequential Tasks Within the Subflow:
- By defining the tasks inside a subflow (process_item), you ensure that, for each item, they execute in sequence.
- Each task receives the result of the previous task, which creates a natural sequential dependency.
2. Running the Subflow per Item:
- Flows do not have a .map() method, so call process_item once per item to run the entire sequence for that item.
- Each item's sequence runs independently but keeps its internal order. Direct subflow calls run one after another; to run the per-item chains in parallel, map the tasks themselves instead.
3. Waiting for All Results:
- A direct subflow call blocks until the subflow finishes, so the next task can simply take the collected results as its input. When you work with task futures instead (for example from .map() or .submit()), pass them to the next task's wait_for parameter if it must wait for all of them without consuming their values.
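The same shape (ordered steps within each item, items independent of each other, one final step after everything finishes) can be sketched in plain asyncio without Prefect. This is only an analogy: step1, step2, step3 and the event log are hypothetical stand-ins for task1, task2, task3 and Prefect's own run tracking, and asyncio.gather plays the role of running the per-item chains concurrently.

```python
import asyncio


async def step1(item: str) -> str:
    await asyncio.sleep(0)  # yield to the event loop so other items can interleave
    return f"Processing {item} in task1"


async def step2(result: str) -> str:
    await asyncio.sleep(0)
    return f"Processing {result} in task2"


async def step3(result: str) -> str:
    await asyncio.sleep(0)
    return f"Finalizing {result} in task3"


async def process_item(item: str, log: list) -> str:
    # Within one item, each step awaits the previous one, so the order is fixed
    r1 = await step1(item)
    log.append((item, "step1"))
    r2 = await step2(r1)
    log.append((item, "step2"))
    r3 = await step3(r2)
    log.append((item, "step3"))
    return r3


async def main(items: list) -> tuple:
    log: list = []
    # Items run concurrently; gather returns only after every chain has finished
    results = await asyncio.gather(*(process_item(i, log) for i in items))
    log.append(("all", "next_step"))  # stands in for the downstream task
    return results, log


results, log = asyncio.run(main(["a", "b"]))
```

Entries for different items may interleave in the log, but each item's own steps always appear in order, and the downstream step is always last.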
If you're seeing incorrect dependencies, here are common fixes:

```python
# If you need explicit dependencies between mapped tasks:
@flow
def main_flow(items):
    # Map the first task: one task1 run per item
    results1 = task1.map(items)
    # Map the second task; each task2 run waits for the task1 future of the same item
    results2 = task2.map(results1)
    # Map the third task; each task3 run waits for the matching task2 future
    results3 = task3.map(results2)
    return results3
```
If you're still seeing incorrect dependencies, it would be helpful if you could share:
1. Your current code structure
2. A description of what dependencies you're seeing vs. what you expect
3. Whether you need parallel execution of items or strictly sequential execution
This would help me provide more specific guidance for your use case.

Romain Lefebvre (07/30/2025, 1:55 PM)

Marvin (07/30/2025, 1:56 PM)

```python
import asyncio

from prefect import flow, task

@task
async def async_task1(item):
    return f"Processing {item} in task1"

@task
async def async_task2(result):
    return f"Processing {result} in task2"

@task
async def async_task3(result):
    return f"Finalizing {result} in task3"

@flow
async def async_process_item(item):
    # Tasks will execute sequentially for each item
    result1 = await async_task1(item)
    result2 = await async_task2(result1)
    result3 = await async_task3(result2)
    return result3

@flow
async def main_async_flow(items):
    # Flows have no .map(); run one subflow per item concurrently with asyncio.gather
    results = await asyncio.gather(*(async_process_item(item) for item in items))
    return results
```
2. Using .submit() with Mapping:

```python
import asyncio

from prefect import flow, task

@task
async def async_task1(item):
    return f"Processing {item} in task1"

@task
async def async_task2(result):
    return f"Processing {result} in task2"

@task
async def async_task3(result):
    return f"Finalizing {result} in task3"

@flow
async def async_process_item(item):
    # .submit() returns a future immediately; passing a future as an argument
    # makes the next task wait for it and records the dependency
    result1_future = async_task1.submit(item)
    result2_future = async_task2.submit(result1_future)
    result3_future = async_task3.submit(result2_future)
    # Block until the final task finishes and return its value
    return result3_future.result()

@flow
async def main_async_flow(items):
    # Flows have no .submit(), so gather the subflow coroutines to run them concurrently
    results = await asyncio.gather(*(async_process_item(item) for item in items))
    return results
```
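The await rules these examples depend on are plain Python behavior, so they can be checked without Prefect. A minimal sketch in stdlib asyncio: calling an async function only builds a coroutine, awaiting it runs it, and asyncio.create_task hands back a future-like Task before the work has run. The function names mirror the examples above, but here they are ordinary coroutines, not Prefect tasks; how Prefect wraps them is not shown.

```python
import asyncio
import inspect


async def async_task1(item: str) -> str:
    return f"Processing {item} in task1"


async def async_task2(result: str) -> str:
    return f"Processing {result} in task2"


async def main() -> tuple:
    # Calling an async function without await only creates a coroutine object
    pending = async_task1("a")
    is_coroutine = inspect.iscoroutine(pending)
    first = await pending  # awaiting actually runs it and produces the value

    # create_task schedules the coroutine and returns a Task right away
    upstream = asyncio.create_task(async_task1("b"))
    returned_before_done = not upstream.done()
    # The downstream call needs the upstream value, which forces the ordering
    second = await async_task2(await upstream)
    return is_coroutine, returned_before_done, first, second


is_coroutine, returned_before_done, first, second = asyncio.run(main())
```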
Key points to remember:
1. Async Flow Requirements:
- Async tasks and subflows that you call directly must be awaited
- The parent flow calling async tasks must also be async
- You can mix async and sync tasks, but async tasks called directly must be awaited
2. Using .submit():
- .submit() returns a future immediately
- Dependencies are maintained automatically when you pass futures as arguments
- Call .result() on a future to get its actual value
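3. Combining .map() and .submit():

```python
@flow
async def main_combined_flow(items):
    # .map() submits one async_task1 run per item and returns their futures
    first_results = async_task1.map(items)
    # .submit() one follow-up run that receives, and therefore waits for, every mapped future
    final_future = async_task2.submit(list(first_results))
    # Block until the follow-up run finishes
    return final_future.result()
```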
4. Performance Considerations:
- .submit() allows more fine-grained control over task execution
- .map() is generally more convenient for simple parallel execution
- Both are task methods (flows have neither), and both support concurrent execution, but .submit() gives you more flexibility in managing dependencies
Choose the approach that best fits your needs:
- Use .map() when you want simple parallel execution over a list of inputs
- Use .submit() when you need more control over task submission and dependencies
- You can combine both approaches as needed
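The idea that passing a future as an argument keeps dependencies intact can be illustrated with the standard library's concurrent.futures. This is a toy analogue, not Prefect's task runner: the hypothetical submit helper returns a Future immediately and, inside the worker, waits for any Future arguments before calling the function, while Executor.map covers the simple one-call-per-input case.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def task1(item):
    return f"Processing {item} in task1"


def task2(result):
    return f"Processing {result} in task2"


def task3(result):
    return f"Finalizing {result} in task3"


def _resolve(arg):
    # Block until an upstream Future finishes and pass on its value
    return arg.result() if isinstance(arg, Future) else arg


def submit(executor, fn, *args):
    # Hypothetical helper: returns a Future at once; the worker resolves Future arguments first
    return executor.submit(lambda: fn(*[_resolve(a) for a in args]))


def run(items):
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map-style: one independent call per input
        mapped = list(pool.map(task1, items))
        # submit-style: each step receives the previous step's Future.
        # Upstream steps are submitted first, so in this FIFO pool a waiting
        # worker's dependency has always been picked up already.
        finals = []
        for item in items:
            f1 = submit(pool, task1, item)
            f2 = submit(pool, task2, f1)
            finals.append(submit(pool, task3, f2))
        chained = [f.result() for f in finals]
    return mapped, chained


mapped, chained = run(["a", "b"])
```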
Would you like me to explain any specific aspect of these patterns in more detail?

Romain Lefebvre (07/30/2025, 2:12 PM)

await subflow_1()
await subflow_2()
Why can't I see the relation between subflow_1 and subflow_2 in the dependency graph?

Marvin (07/30/2025, 2:13 PM)

```python
from prefect import flow

@flow
async def subflow_1():
    return "result from subflow 1"

@flow
async def subflow_2():
    return "result from subflow 2"

@flow
async def main_flow():
    # Execute subflow_1 and keep its return value
    result_1 = await subflow_1()
    # Make subflow_2 explicitly wait for subflow_1
    result_2 = await subflow_2(wait_for=[result_1])
```
2. Using Results as Input Parameters:

```python
from prefect import flow

@flow
async def subflow_1():
    return "result from subflow 1"

@flow
async def subflow_2(upstream_result):
    return f"processed {upstream_result}"

@flow
async def main_flow():
    # Pass the result of subflow_1 to subflow_2
    result_1 = await subflow_1()
    result_2 = await subflow_2(result_1)
```
Your current setup doesn't show a dependency because sequential await statements only create an execution order at the Python level, not a Prefect-level dependency. Prefect needs to know about the relationship between runs before it can draw it in the dependency graph.
Key points:
- Sequential await statements only enforce execution order at the Python level
- To make dependencies visible in Prefect's UI, you need explicit Prefect-level dependencies
- Either wait_for or passing results as parameters creates a visible dependency
- These dependencies then appear in the UI's dependency graph
Choose the method that best fits your use case:
- Use wait_for when you only need execution order without passing data
- Use parameter passing when the downstream flow needs data from the upstream flow
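Why sequential awaits leave the graph without an edge can be shown with a toy dependency recorder. It is a hypothetical model, not Prefect's internals: ToyGraph records an edge only when a run receives a Tracked value, either as an argument or through a wait_for list, which is the kind of information Prefect needs to draw an edge. Plain execution order is invisible to it.

```python
from dataclasses import dataclass, field


@dataclass
class Tracked:
    """A return value plus the name of the run that produced it (toy stand-in)."""
    value: object
    producer: str


@dataclass
class ToyGraph:
    edges: set = field(default_factory=set)

    def run(self, name, fn, *args, wait_for=()):
        # Record an edge for every tracked input, whether data or a wait_for entry
        for upstream in (*args, *wait_for):
            if isinstance(upstream, Tracked):
                self.edges.add((upstream.producer, name))
        # Unwrap tracked arguments before calling the real function
        plain = [a.value if isinstance(a, Tracked) else a for a in args]
        return Tracked(fn(*plain), name)


def subflow_1():
    return "result from subflow 1"


def subflow_2(upstream_result=None):
    return f"processed {upstream_result}"


def sequential():
    g = ToyGraph()
    g.run("subflow_1", subflow_1)
    g.run("subflow_2", subflow_2)  # runs second, but nothing links the two
    return g.edges


def with_wait_for():
    g = ToyGraph()
    r1 = g.run("subflow_1", subflow_1)
    g.run("subflow_2", subflow_2, wait_for=[r1])  # ordering only, no data
    return g.edges


def with_parameter():
    g = ToyGraph()
    r1 = g.run("subflow_1", subflow_1)
    r2 = g.run("subflow_2", subflow_2, r1)  # data flows downstream
    return g.edges, r2.value
```

In this model, sequential() returns an empty edge set even though subflow_2 ran after subflow_1, while both wait_for and parameter passing produce the same single edge.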
Would you like me to explain more about how these dependencies work or show more complex examples with multiple dependent subflows?

Romain Lefebvre (07/30/2025, 2:20 PM)

Marvin (07/30/2025, 2:20 PM)

```python
from prefect import flow

@flow
async def subflow_1():
    # Do something but return nothing
    pass

@flow
async def subflow_2():
    # Do something but return nothing
    pass

@flow
async def main_flow():
    # Submit the first subflow and get its future
    future_1 = subflow_1.submit()
    # Make subflow_2 wait for subflow_1's completion
    # even though subflow_1 returns None
    await subflow_2(wait_for=[future_1])
```
2. Alternative using .submit() for all flows:

```python
@flow
async def main_flow():
    # Submit both flows but make subflow_2 wait for subflow_1
    future_1 = subflow_1.submit()
    future_2 = subflow_2.submit(wait_for=[future_1])
    # Wait for all to complete
    await future_2
```
The key points here are:
- You don't need return values to create dependencies
- Using .submit() creates a future that can be used for dependency tracking
- The wait_for parameter works with futures regardless of whether the flows return values
- This will make the sequential relationship visible in the dependency graph
Choose whichever style feels more natural for your codebase. Both approaches will create the visible dependencies you're looking for in the UI.
Would you like me to explain more about how these patterns work or show more complex examples with multiple subflows?

Romain Lefebvre (07/30/2025, 2:24 PM)

.submit method

Marvin (07/30/2025, 2:25 PM)

```python
from prefect import flow, task

@task
async def marker_task():
    pass

@flow
async def subflow_1():
    # Do your subflow work
    result = await marker_task()
    return result  # Awaiting a task returns its value (None here), not a future

@flow
async def subflow_2():
    # Your subflow work here
    pass

@flow
async def main_flow():
    result_1 = await subflow_1()
    await subflow_2(wait_for=[result_1])
```
2. Return a Simple Value to Track Completion:

```python
@flow
async def subflow_1():
    # Do your subflow work
    return True  # or any simple value to indicate completion

@flow
async def subflow_2():
    # Your subflow work here
    pass

@flow
async def main_flow():
    result_1 = await subflow_1()
    await subflow_2(wait_for=[result_1])
```
The key points are:
- Flows don't have .submit() (I was wrong about that)
- You need some kind of result to use with wait_for
- You can either use tasks within your flows to create trackable futures
- Or return a simple value from your flows just to track completion
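One caveat with a simple completion value depends on how results are matched to the runs that produced them. The toy below is a hypothetical tracker that links a value to its producer by object identity. It shows why shared singletons such as True are ambiguous in that scheme: every run that returns True returns the very same object, so the latest producer overwrites the earlier one. Whether Prefect's own result tracking behaves this way in a given version is an assumption here; in this model, a value that is unique per run avoids the ambiguity.

```python
class IdentityTracker:
    """Hypothetical tracker: maps id(value) to the name of the run that returned it."""

    def __init__(self):
        self._producer_by_id = {}
        self._keep_alive = []  # hold references so ids are not reused

    def record(self, run_name, value):
        self._producer_by_id[id(value)] = run_name
        self._keep_alive.append(value)
        return value

    def producer_of(self, value):
        return self._producer_by_id.get(id(value))


tracker = IdentityTracker()

# Two different runs both return the shared True singleton
flag_1 = tracker.record("subflow_1", True)
flag_other = tracker.record("some_other_run", True)

# Two runs returning distinct objects stay distinguishable
marker_1 = tracker.record("subflow_1", object())
marker_other = tracker.record("some_other_run", object())
```

Here producer_of(flag_1) reports "some_other_run", while each distinct marker object still points at its own run.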
Would you like me to explain more about how to structure your specific flows to show the dependencies you need?