# ask-marvin
@Marvin Can you tell me why this flow crashes?
```python
@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    sleep(0.5)
    return period + 1

@task
def task_2(period):
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    task_2.map(
        periods,
        wait_for=[res_1]
    )
```
Marvin:
Based on the research, there might be a couple of issues with your flow:

1. The imports are missing (`sleep`, plus `flow` and `task` from Prefect).
2. The way you're handling task dependencies with `wait_for` could be improved.

Here's a revised version of your flow:
```python
from time import sleep
from prefect import flow, task

@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    sleep(0.5)
    return period + 1

@task
def task_2(period):
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    # Wait for all mapped tasks to complete
    res_1.wait()
    # Now map task_2
    task_2.map(periods)
```
The main changes are:

1. Added the necessary imports.
2. Used `.wait()` to ensure all mapped runs of `task_1` complete before `task_2` starts.

However, to give more specific guidance about why your flow is crashing, it would help if you could share:

1. The exact error message you're seeing.
2. Which version of Prefect you're using (2.x or 3.x).

This will help me give you a more targeted solution.
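The barrier that `res_1.wait()` creates can be illustrated outside Prefect. The sketch below is an analogy using Python's standard `concurrent.futures` module, not Prefect's API: plain functions stand in for `task_1` and `task_2`, a list of `submit()` calls stands in for `.map()`, and `concurrent.futures.wait()` stands in for `.wait()`. The `events` list is a hypothetical addition used only to record the order in which work happens.

```python
from concurrent.futures import ThreadPoolExecutor, wait
from time import sleep

events: list[str] = []  # hypothetical: records the order in which work happens


def task_1(period: int) -> int:
    sleep(0.01 * period)  # shortened sleep so the sketch runs quickly
    events.append(f"task_1 done {period}")
    return period + 1


def task_2(period: int) -> int:
    events.append(f"task_2 start {period}")
    return period + 2


def flow_1(periods: list[int]) -> list[int]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Analogue of task_1.map(periods): one future per element.
        res_1 = [pool.submit(task_1, p) for p in periods]
        # Analogue of res_1.wait(): block until every task_1 future is done.
        wait(res_1)
        # Analogue of task_2.map(periods), submitted only after the barrier.
        res_2 = [pool.submit(task_2, p) for p in periods]
        # Resolve the task_2 futures too, before the function returns.
        return [f.result() for f in res_2]


print(flow_1([1, 3, 4]))  # [3, 5, 6]
print(events)
```

Because `wait()` returns only once every `task_1` future is done, every `task_1` entry in `events` precedes every `task_2` entry.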
@Marvin I'm using Prefect 3.x and the error message is: "Crash detected! Execution was cancelled by the runtime environment."
Marvin:
Based on the GitHub issues and your code, the likely cause is how the flow handles its mapped tasks: it does not explicitly wait for all of the mapped task runs to complete. Here's a revised version of your flow:
```python
from time import sleep
from prefect import flow, task

@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    sleep(0.5)
    return period + 1

@task
def task_2(period):
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    # Map task_1 and store the futures
    res_1 = task_1.map(periods)
    # Wait for all task_1 executions to complete
    res_1.wait()
    # Now map task_2 with the original periods
    res_2 = task_2.map(periods)
    # Wait for task_2 executions to complete as well
    res_2.wait()
```
The key changes are:

1. Added an explicit `.wait()` call after each mapped task, so all of its runs complete before the flow moves on.
2. Removed the `wait_for` parameter from `task_2.map()` and relied on the explicit `.wait()` calls instead.
3. Stored the result of `task_2.map()` in a variable so the flow can wait for it.

The "Crash detected!" error most likely appears when the flow finishes while some mapped task runs are still pending; this is a common pitfall with mapped tasks in Prefect 3.x. Calling `.wait()` on a mapped result blocks the flow until every mapped run of that task has completed, so as a rule of thumb, resolve every future from `.map()` or `.submit()` with `.wait()` or `.result()` before the flow returns. If you need the results of the mapped tasks, you can read them after the `.wait()` call:
```python
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    res_1.wait()
    # Get all results from task_1 if needed
    task_1_results = [r.result() for r in res_1]

    res_2 = task_2.map(periods)
    res_2.wait()
    # Get all results from task_2 if needed
    task_2_results = [r.result() for r in res_2]
```
This pattern enforces the execution order and resolves every future before the flow returns, which should keep the runtime environment from cancelling task runs that are still in flight.
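One possible reading of "Execution was cancelled by the runtime environment", offered as a hypothesis rather than a description of Prefect's internals, is that work still pending when the flow function returns gets torn down. The standard-library analogy below (`concurrent.futures`, not Prefect) shows the general effect: a function submits work, returns without resolving its futures, and the executor is shut down with `cancel_futures=True` (Python 3.9+), which cancels everything that had not started yet. `slow_task` and `flow_without_waiting` are hypothetical names.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from time import sleep


def slow_task(period: int) -> int:
    sleep(0.2)  # long enough that later submissions are still queued
    return period + 2


def flow_without_waiting(periods: list[int]) -> list[Future]:
    # A single worker, so only one submission can run at a time.
    pool = ThreadPoolExecutor(max_workers=1)
    futures = [pool.submit(slow_task, p) for p in periods]
    # The "flow" ends here without calling .result() or wait() on its futures.
    # Shutting down with cancel_futures=True cancels every item still queued.
    pool.shutdown(wait=False, cancel_futures=True)
    return futures


futures = flow_without_waiting([1, 3, 4])
print([f.cancelled() for f in futures])
```

In the analogy, resolving the futures before returning (for example `[f.result() for f in futures]` before the shutdown), as the revised flow does with `res_2.wait()`, leaves nothing pending to cancel.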
@Marvin I tried what you suggested and it works. But I don't understand why this is different from using the `wait_for` argument.
Marvin:
Here is the key difference between `wait_for` and `.wait()`:

1. `wait_for=[res_1]` (your original approach):
   - This tells Prefect that `task_2` should wait for `res_1` before starting.
   - With mapped tasks, however, `res_1` is a `PrefectFutureList` (a collection of futures, one for each mapped run), so you are passing a list that contains another collection of futures.
   - The `wait_for` mechanism may not handle this collection of futures as reliably as `.wait()` does.
   - That could lead to timing issues where some mapped runs start before their dependencies are complete.
2. `res_1.wait()` (the working approach):
   - This explicitly waits for ALL futures in the `PrefectFutureList` to complete.
   - It is a synchronization point that ensures every mapped run of `task_1` has finished before the flow moves on.
   - It is more explicit when dealing with mapped tasks.

Here's a side-by-side example:
```python
# Original approach (may crash):
@flow
def flow_1():
    periods = get_periods()  # Returns [1, 3, 4]
    res_1 = task_1.map(periods)  # Creates 3 task futures
    task_2.map(
        periods,
        wait_for=[res_1]  # May be less reliable with mapped tasks
    )

# Suggested approach (explicit barrier):
@flow
def flow_1():
    periods = get_periods()  # Returns [1, 3, 4]
    res_1 = task_1.map(periods)  # Creates 3 task futures
    res_1.wait()  # Explicitly wait for ALL 3 task_1 futures to complete
    task_2.map(periods)  # Only starts after ALL task_1 executions are done
```
General rule of thumb:

- Use `.wait()` with mapped tasks when you need to ensure that all runs complete.
- Use `wait_for` for individual task dependencies, or when you need more fine-grained control over dependencies.

My best explanation for the crash in your original code is that `wait_for` may not have provided the strong synchronization the mapped tasks needed, potentially allowing some `task_2` runs to start before all `task_1` runs were complete. Using `.wait()` addresses this by adding an explicit synchronization point.
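It may help to separate two properties: ordering (no `task_2` run starts before every `task_1` run has finished) and resolution (every future is finished before the flow returns). If `wait_for` is implemented by waiting on all upstream futures before a run starts, which is an assumption here and was not verified in this thread, it provides the same ordering as `res_1.wait()`. Note also that the version reported to work waited on `res_2` as well, while the original `wait_for` version never resolved the `task_2` futures, so resolution may matter as much as ordering. The analogy below uses `concurrent.futures` with a hypothetical helper `run_after` standing in for `wait_for`; it is not Prefect's implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from time import sleep
from typing import Callable

events: list[str] = []  # records the order in which work happens


def task_1(period: int) -> int:
    sleep(0.01 * period)  # shortened sleep so the sketch runs quickly
    events.append(f"task_1 done {period}")
    return period + 1


def task_2(period: int) -> int:
    events.append(f"task_2 start {period}")
    return period + 2


def run_after(upstream: list[Future], fn: Callable[[int], int], arg: int) -> int:
    """Hypothetical wait_for-style wrapper: block on every upstream future, then run fn."""
    wait(upstream)
    return fn(arg)


def flow_with_dependencies(periods: list[int]) -> list[int]:
    # One worker per submission, so every run can proceed concurrently.
    with ThreadPoolExecutor(max_workers=2 * len(periods)) as pool:
        res_1 = [pool.submit(task_1, p) for p in periods]
        # Each task_2 run depends on the whole res_1 list, loosely like
        # task_2.map(periods, wait_for=[res_1]); there is no barrier in the flow.
        res_2 = [pool.submit(run_after, res_1, task_2, p) for p in periods]
        # Resolve the task_2 futures before returning.
        return [f.result() for f in res_2]


print(flow_with_dependencies([1, 3, 4]))  # [3, 5, 6]
```

Under that assumption, the per-run dependency yields the same ordering as the explicit barrier; what the analogy cannot show is whether Prefect's `wait_for` behaves this way with a nested `PrefectFutureList`.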
@Marvin There is still a problem. This flow crashed although it follows your advice:
```python
from time import sleep
from prefect import flow, task, get_run_logger

@task
def get_periods():
    return [1, 3, 4]

@task
def task_1(period):
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@task
def task_2(period):
    logger = get_run_logger()
    logger.info(f"Task 2 processing period {period}")
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    res_1.wait()
    task_2.map(periods)
```
The error is still the same:
"Crash detected! Execution was cancelled by the runtime environment."
Marvin:
Based on the research, I can see a few potential issues with your flow. Here's a revised version with several changes that may help:
```python
from time import sleep
from prefect import flow, task, get_run_logger
from typing import List

@task
def get_periods() -> List[int]:
    return [1, 3, 4]

@task(timeout_seconds=300)  # Add explicit timeout
def task_1(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@task
def task_2(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 2 processing period {period}")
    return period + 2

@flow(name="flow_1", timeout_seconds=600)  # Add flow-level timeout
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    # Wait for every task_1 run to finish
    res_1.wait()
    res_2 = task_2.map(periods)
    # Wait for every task_2 run to finish as well
    res_2.wait()

    return res_2  # Return the task_2 futures

if __name__ == "__main__":
    flow_1()
```
Key changes and explanations:

1. Explicit timeouts:
   - Added `timeout_seconds` to `task_1` and to the flow, so that a run that hangs fails with an explicit timeout error instead of running indefinitely. A timeout does not by itself prevent cancellations coming from the runtime environment.
   - The flow timeout should be comfortably longer than the total time the tasks can take.
2. Waiting on the complete task chain:
   - Added `.wait()` for both mapped tasks so that every run completes before the flow returns.
   - Kept references to the futures from both mapped operations.
3. Type hints:
   - Added type hints to make the code more explicit and to help static type checkers catch mistakes.
4. Return values:
   - The flow now explicitly returns the `task_2` futures.

You might also want to try these alternatives if you're still seeing issues:
```python
@task(timeout_seconds=300, retries=2)  # Add retries
def task_1(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    try:
        sleep(period)
    except Exception as e:
        logger.error(f"Error during sleep: {e}")
        raise
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@flow(name="flow_1", timeout_seconds=600)
def flow_1():
    periods = get_periods()
    # Execute and wait for each map operation completely
    res_1 = task_1.map(periods)
    results_1 = [r.result() for r in res_1]  # Alternative way to wait and get results

    res_2 = task_2.map(periods)
    results_2 = [r.result() for r in res_2]

    return results_2
```
If you're still seeing crashes, it would be helpful to know:

1. Are you running this locally or in a distributed environment (Kubernetes, Docker, etc.)?
2. What is the exact timing of the crash: does it happen during `task_1` execution or during the transition to `task_2`?
3. Are you seeing any additional error messages in the logs besides the "Crash detected" message?

This information would help us diagnose whether there might be infrastructure-specific issues at play.
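On the timeout change: a timeout is a limit that turns a slow run into an explicit error; it does not stop an external environment from cancelling a run. The analogy below uses `Future.result(timeout=...)` from `concurrent.futures`, not Prefect's `timeout_seconds`, whose exact semantics it does not model. In this analogy the timeout only bounds how long the caller waits, and the submitted work keeps running.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from time import sleep


def task_1(period: float) -> float:
    sleep(period)
    return period + 1


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(task_1, 0.5)
    try:
        future.result(timeout=0.1)  # stop waiting after 0.1 s
        timed_out = False
    except TimeoutError:
        timed_out = True
    # The work itself was not interrupted; this call waits for it to finish.
    final = future.result()

print(timed_out, final)  # True 1.5
```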
@Marvin I'm running locally in a `prefect_test_harness` context. The crash happens when `task_2` starts, after `task_1` has completed. There are no other error messages. I should add that it's somewhat random: sometimes it crashes, sometimes not.
Marvin:
Based on the research and the intermittent nature of the crashes, here are a few approaches that may make your flow more reliable.

1. First, change how the flow handles the transition between mapped tasks:
```python
from time import sleep
from prefect import flow, task, get_run_logger
from typing import List

@task
def get_periods() -> List[int]:
    return [1, 3, 4]

@task(retries=2)  # Add retries for resilience
def task_1(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    logger.info(f"Task 1 done sleeping for {period} seconds")
    return period + 1

@task(retries=2)  # Add retries for resilience
def task_2(period: int) -> int:
    logger = get_run_logger()
    logger.info(f"Task 2 processing period {period}")
    return period + 2

@flow(name="flow_1")
def flow_1():
    periods = get_periods()

    # Execute first mapped task and ensure results are retrieved
    res_1 = task_1.map(periods)
    # Use a more robust way to wait and get results
    results_1 = []
    for future in res_1:
        try:
            results_1.append(future.result())
        except Exception as e:
            logger = get_run_logger()
            logger.error(f"Error getting result from task_1: {e}")
            raise

    # Small delay before starting next task map
    sleep(0.1)

    # Execute second mapped task
    res_2 = task_2.map(periods)
    results_2 = []
    for future in res_2:
        try:
            results_2.append(future.result())
        except Exception as e:
            logger = get_run_logger()
            logger.error(f"Error getting result from task_2: {e}")
            raise

    return results_2
```
2. If you're using `prefect_test_harness` in a test, you might also add a few safeguards around the test itself:
```python
import pytest
from prefect.testing.utilities import prefect_test_harness

# flow_1 is the flow defined above

@pytest.mark.timeout(30)  # Fail hanging tests (requires the pytest-timeout plugin)
def test_flow():
    with prefect_test_harness(
        server_startup_timeout=60,  # Allow more time for the test server to start
    ):
        result = flow_1()
        assert result is not None
```
3. Alternative approach using sequential submission:
```python
@flow(name="flow_1")
def flow_1():
    periods = get_periods()

    # Process periods one at a time to avoid transition issues
    results_1 = []
    results_2 = []

    for period in periods:
        # Execute task_1 for this period
        res_1 = task_1.submit(period)
        results_1.append(res_1.result())

        # Execute task_2 for this period
        res_2 = task_2.submit(period)
        results_2.append(res_2.result())

    return results_2
```
The intermittent nature of the crashes suggests there might be a race condition or a resource-management issue in the test harness when the flow transitions between mapped tasks. The solutions above try to address this by:

1. Adding more explicit error handling and logging.
2. Retrieving each result individually instead of waiting for all results at once.
3. Adding a small delay between the two maps (a mitigation rather than a fix, since a fixed delay can only make a race less likely).
4. Providing an alternative sequential approach that might be more stable in the test environment.

Try these approaches and let me know if you still see the crashes. It would also help to know:

1. Are you running this as part of a test suite or standalone?
2. Exactly which versions of Python and Prefect are you using?
3. Does the crash frequency change with smaller or larger sleep periods?

This information would help us refine the solution further if needed.
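The sequential alternative (option 3) resolves every future before submitting the next one, at the cost of the concurrency that `.map()` provides. The sketch below compares the two shapes with `concurrent.futures` rather than Prefect; the function names `mapped` and `sequential` and the fixed 0.05 s sleep are hypothetical, chosen so the timing difference is visible.

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def task_1(period: int) -> int:
    sleep(0.05)  # fixed, shortened sleep for the comparison
    return period + 1


def task_2(period: int) -> int:
    return period + 2


def mapped(periods: list[int]) -> list[int]:
    """Fan out task_1, resolve all of it, then fan out task_2 and resolve it."""
    with ThreadPoolExecutor(max_workers=len(periods)) as pool:
        res_1 = [pool.submit(task_1, p) for p in periods]
        for f in res_1:
            f.result()  # wait for every task_1 run
        res_2 = [pool.submit(task_2, p) for p in periods]
        return [f.result() for f in res_2]


def sequential(periods: list[int]) -> list[int]:
    """Submit and resolve one task at a time, as in option 3."""
    with ThreadPoolExecutor(max_workers=len(periods)) as pool:
        results_2 = []
        for p in periods:
            pool.submit(task_1, p).result()
            results_2.append(pool.submit(task_2, p).result())
        return results_2


for fn in (mapped, sequential):
    start = perf_counter()
    result = fn([1, 3, 4])
    print(fn.__name__, result, f"{perf_counter() - start:.2f}s")
```

Both shapes return the same values; on a typical machine the mapped shape takes roughly one `task_1` duration and the sequential shape roughly three, which suggests treating option 3 mainly as a diagnostic step.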