Vincent Chéry
07/16/2025, 8:51 AM
```python
@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    sleep(0.5)
    return period + 1

@task
def task_2(period):
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    task_2.map(
        periods,
        wait_for=[res_1]
    )
```
Marvin
07/16/2025, 8:51 AM
There are a couple of issues here:
1. You're missing the necessary imports (including `sleep`)
2. The way you're handling task dependencies with `wait_for` could be improved
Here's the corrected version of your flow:
```python
from time import sleep
from prefect import flow, task

@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    sleep(0.5)
    return period + 1

@task
def task_2(period):
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    # Wait for all mapped tasks to complete
    res_1.wait()
    # Now map task_2
    task_2.map(periods)
```
The main changes are:
1. Added the necessary imports
2. Used `.wait()` to ensure all mapped tasks from `task_1` complete before starting `task_2`
However, to provide more specific guidance about why your flow is crashing, it would be helpful if you could share:
1. The specific error message you're seeing
2. Which version of Prefect you're using (2.x or 3.x)
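(If you're not sure of the version, `prefect version` at the command line will print it, or you can check from Python:)

```python
import prefect

# Prints the installed Prefect version, e.g. "3.x.y" or "2.x.y"
print(prefect.__version__)
```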
This will help me give you a more targeted solution to your specific issue.

Vincent Chéry
07/16/2025, 8:53 AM
The error message is: Crash detected! Execution was cancelled by the runtime environment. I'm using Prefect 3.x.

Marvin
07/16/2025, 8:53 AM
```python
from time import sleep
from prefect import flow, task

@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    sleep(0.5)
    return period + 1

@task
def task_2(period):
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    # Map task_1 and store the futures
    res_1 = task_1.map(periods)
    # Wait for all task_1 executions to complete
    res_1.wait()
    # Now map task_2 with the original periods
    res_2 = task_2.map(periods)
    # Wait for task_2 executions to complete as well
    res_2.wait()
```
The key changes are:
1. Added explicit `.wait()` calls after each mapped task to ensure all iterations complete before moving on
2. Removed the `wait_for` parameter in `task_2.map()`, as it's better to use explicit `.wait()` calls with mapped tasks
3. Stored the result of `task_2.map()` in a variable so we can wait for it

The "Crash detected!" error occurs when the flow tries to proceed before all mapped task executions have completed. This is a common issue when working with mapped tasks in Prefect 3.x. The `.wait()` method ensures that all mapped iterations of a task complete before the flow continues.

If you need to access the results of the mapped tasks, you can do so after the `.wait()` call:
```python
@flow(name="flow_1")
def flow_1():
    periods = get_periods()

    res_1 = task_1.map(periods)
    res_1.wait()
    # Get all results from task_1 if needed
    task_1_results = [r.result() for r in res_1]

    res_2 = task_2.map(periods)
    res_2.wait()
    # Get all results from task_2 if needed
    task_2_results = [r.result() for r in res_2]
```
This pattern ensures proper task execution order and prevents the runtime environment from cancelling the execution prematurely.

Vincent Chéry
07/16/2025, 8:55 AM
Why does this work better than the `wait_for` argument?

Marvin
07/16/2025, 8:56 AM
Let me explain the difference between `wait_for` and `.wait()`:
1. `wait_for=[res_1]` (your original approach):
   - This tells Prefect that `task_2` should wait for `res_1` before starting
   - However, when used with mapped tasks, `res_1` is actually a `PrefectFutureList` (a collection of futures, one for each mapped task)
   - The `wait_for` mechanism doesn't handle this collection of futures as effectively as `.wait()`
   - This can lead to timing issues where some mapped tasks start before their dependencies are complete
2. `res_1.wait()` (the working approach):
   - This explicitly waits for ALL futures in the `PrefectFutureList` to complete
   - It's a synchronization point that ensures every single mapped instance of `task_1` has finished before moving on
   - It's more explicit and reliable when dealing with mapped tasks
Here's a visual example to illustrate:
```python
# Original approach (may crash):
@flow
def flow_1():
    periods = get_periods()        # Returns [1, 3, 4]
    res_1 = task_1.map(periods)    # Creates 3 task futures
    task_2.map(
        periods,
        wait_for=[res_1]           # Less reliable with mapped tasks
    )

# Better approach (reliable):
@flow
def flow_1():
    periods = get_periods()        # Returns [1, 3, 4]
    res_1 = task_1.map(periods)    # Creates 3 task futures
    res_1.wait()                   # Explicitly wait for ALL 3 task_1 futures to complete
    task_2.map(periods)            # Only starts after ALL task_1 executions are done
```
General rule of thumb:
- Use `.wait()` when dealing with mapped tasks and you need to ensure all iterations complete
- Use `wait_for` when dealing with individual task dependencies or when you need more fine-grained control over task dependencies
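For the second case, here's a minimal sketch (with made-up `load_data`/`send_notification` tasks) of using `wait_for` to order two individual tasks that share no data:

```python
from prefect import flow, task

@task
def load_data():
    return [1, 2, 3]

@task
def send_notification():
    print("Load finished, notification sent")

@flow
def pipeline():
    loaded = load_data.submit()
    # send_notification takes no data from load_data, so declare the
    # ordering explicitly by passing the single future to wait_for
    send_notification.submit(wait_for=[loaded])
```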
The reason your original code crashed is that `wait_for` wasn't providing the strong synchronization guarantee needed for the mapped tasks, potentially allowing some `task_2` executions to start before all `task_1` executions were complete. Using `.wait()` fixes this by providing an explicit synchronization point.

Vincent Chéry
07/16/2025, 9:10 AM
```python
from time import sleep
from prefect import flow, task, get_run_logger

@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@task
def task_2(period):
    logger = get_run_logger()
    logger.info(f"Task 2 processing period {period}")
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    res_1.wait()
    task_2.map(periods)
```
The error is still the same: Crash detected! Execution was cancelled by the runtime environment.
Marvin
07/16/2025, 9:12 AM
```python
from time import sleep
from prefect import flow, task, get_run_logger
from typing import List

@task
def get_periods() -> List[int]:
    return [1, 3, 4]

@task(timeout_seconds=300)  # Add explicit timeout
def task_1(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@task
def task_2(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 2 processing period {period}")
    return period + 2

@flow(name="flow_1", timeout_seconds=600)  # Add flow-level timeout
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    # Wait and capture results
    res_1.wait()
    res_2 = task_2.map(periods)
    # Wait for task_2 to complete as well
    res_2.wait()
    return res_2  # Return final results

if __name__ == "__main__":
    flow_1()
```
Key changes and explanations:
1. Added Explicit Timeouts:
   - Added `timeout_seconds` to both the task and flow to prevent unexpected cancellations
   - The flow timeout should be longer than the sum of potential task execution times
2. Complete Task Chain Waiting:
   - Added `.wait()` for both task chains to ensure proper completion
   - Stored the results of both mapped operations
3. Type Hints:
   - Added type hints to make the code more explicit and help catch potential issues
4. Return Values:
   - Added explicit return of results from the flow
You might also want to try these alternatives if you're still seeing issues:
```python
@task(timeout_seconds=300, retries=2)  # Add retries
def task_1(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    try:
        sleep(period)
    except Exception as e:
        logger.error(f"Error during sleep: {e}")
        raise
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@flow(name="flow_1", timeout_seconds=600)
def flow_1():
    periods = get_periods()
    # Execute and wait for each map operation completely
    res_1 = task_1.map(periods)
    results_1 = [r.result() for r in res_1]  # Alternative way to wait and get results
    res_2 = task_2.map(periods)
    results_2 = [r.result() for r in res_2]
    return results_2
```
If you're still seeing crashes, it would be helpful to know:
1. Are you running this locally or in a distributed environment (Kubernetes, Docker, etc.)?
2. What is the exact timing of the crash - does it happen during task_1 execution or during the transition to task_2?
3. Are you seeing any additional error messages in the logs besides the "Crash detected" message?
This information would help us further diagnose if there might be infrastructure-specific issues at play.

Vincent Chéry
07/16/2025, 9:16 AM
I'm running it locally, inside a `prefect_test_harness` context. The timing of the crash is when starting task_2 after task_1 has completed. There are no other error messages. And I should add that it's somewhat random, sometimes it crashes, sometimes not.

Marvin
07/16/2025, 9:17 AM
A few things to try:
1. Add more explicit error handling and retrieve results individually:

```python
from time import sleep
from prefect import flow, task, get_run_logger
from typing import List

@task
def get_periods() -> List[int]:
    return [1, 3, 4]

@task(retries=2)  # Add retries for resilience
def task_1(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@task(retries=2)  # Add retries for resilience
def task_2(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 2 processing period {period}")
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    # Execute first mapped task and ensure results are retrieved
    res_1 = task_1.map(periods)
    # Use a more robust way to wait and get results
    results_1 = []
    for future in res_1:
        try:
            results_1.append(future.result())
        except Exception as e:
            logger = get_run_logger()
            logger.error(f"Error getting result from task_1: {e}")
            raise
    # Small delay before starting next task map
    sleep(0.1)
    # Execute second mapped task
    res_2 = task_2.map(periods)
    results_2 = []
    for future in res_2:
        try:
            results_2.append(future.result())
        except Exception as e:
            logger = get_run_logger()
            logger.error(f"Error getting result from task_2: {e}")
            raise
    return results_2
```
2. If you're using `prefect_test_harness` in a test context, you might want to wrap your test with additional context (a session-scoped fixture variant is sketched after point 3):
```python
from prefect.testing.utilities import prefect_test_harness
import pytest

@pytest.mark.timeout(30)  # Add timeout to prevent hanging tests (requires the pytest-timeout plugin)
def test_flow():
    with prefect_test_harness(
        server_startup_timeout=10,  # Increase server startup timeout
    ):
        result = flow_1()
        assert result is not None
```
3. Alternative approach using sequential submission:
```python
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    # Process periods one at a time to avoid transition issues
    results_1 = []
    results_2 = []
    for period in periods:
        # Execute task_1 for this period
        res_1 = task_1.submit(period)
        results_1.append(res_1.result())
        # Execute task_2 for this period
        res_2 = task_2.submit(period)
        results_2.append(res_2.result())
    return results_2
```
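Following up on point 2: if several tests need the harness, the session-scoped, autouse fixture pattern from Prefect's testing guide starts the temporary server once for the whole session instead of once per test, which also cuts down on startup-related flakiness. A sketch:

```python
import pytest
from prefect.testing.utilities import prefect_test_harness

@pytest.fixture(autouse=True, scope="session")
def prefect_test_fixture():
    # Every test in the session runs against a temporary, local Prefect API
    with prefect_test_harness():
        yield
```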
The intermittent nature of the crashes suggests there might be a race condition or resource management issue in the test harness when transitioning between mapped tasks. The solutions above try to address this by:
1. Adding more explicit error handling and logging
2. Using individual result retrieval instead of waiting for all results at once
3. Adding a small delay between task maps
4. Providing an alternative sequential approach that might be more stable in the test environment
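One more knob worth trying if the crashes persist (a sketch only, not something I can guarantee fixes the harness behaviour): reduce parallelism inside the flow so mapped runs don't overlap, for example with a single-worker task runner in Prefect 3.x (in 2.x the analogous option is `SequentialTaskRunner`):

```python
from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

# Assumes Prefect 3.x; limits the flow to running one task at a time
@flow(name="flow_1", task_runner=ThreadPoolTaskRunner(max_workers=1))
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    res_1.wait()
    res_2 = task_2.map(periods)
    res_2.wait()
    return res_2
```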
Try these approaches and let me know if you still see the crashes. Also, it would be helpful to know:
1. Are you running this as part of a test suite or standalone?
2. What version of Python and Prefect are you using exactly?
3. Do you see any difference in crash frequency with smaller or larger sleep periods?
This information would help us further refine the solution if needed.