Vincent Chéry
07/16/2025, 8:51 AM@task
def get_periods():
    return [1, 3, 4]
@task
def task_1(period):
    sleep(0.5)
    return period + 1
@task
def task_2(period):
    return period + 2
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    task_2.map(
        periods,
        wait_for=[res_1]
    )Marvin
07/16/2025, 8:51 AMMarvin
07/16/2025, 8:51 AMsleepwait_forpython
from time import sleep
from prefect import flow, task
@task
def get_periods():
    return [1, 3, 4]
@task
def task_1(period):
    sleep(0.5)
    return period + 1
@task
def task_2(period):
    return period + 2
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    # Wait for all mapped tasks to complete
    res_1.wait()
    # Now map task_2
    task_2.map(periods).wait()task_1task_2Vincent Chéry
07/16/2025, 8:53 AMMarvin
07/16/2025, 8:53 AMMarvin
07/16/2025, 8:53 AMpython
from time import sleep
from prefect import flow, task
@task
def get_periods():
    return [1, 3, 4]
@task
def task_1(period):
    sleep(0.5)
    return period + 1
@task
def task_2(period):
    return period + 2
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    # Map task_1 and store the futures
    res_1 = task_1.map(periods)
    # Wait for all task_1 executions to complete
    res_1.wait()
    # Now map task_2 with the original periods
    res_2 = task_2.map(periods)
    # Wait for task_2 executions to complete as well
    res_2.wait().wait()wait_fortask_2.map().wait()task_2.map().wait().wait()python
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    res_1.wait()
    # Get all results from task_1 if needed
    task_1_results = [r.result() for r in res_1]
    
    res_2 = task_2.map(periods)
    res_2.wait()
    # Get all results from task_2 if needed
    task_2_results = [r.result() for r in res_2]Vincent Chéry
07/16/2025, 8:55 AMwait_forMarvin
07/16/2025, 8:55 AMMarvin
07/16/2025, 8:56 AMwait_forwait_for=[res_1]task_2res_1res_1PrefectFutureListwait_for.wait()res_1.wait()PrefectFutureListtask_1python
# Original approach (may crash):
@flow
def flow_1():
    periods = get_periods()  # Returns [1, 3, 4]
    res_1 = task_1.map(periods)  # Creates 3 task futures
    task_2.map(
        periods,
        wait_for=[res_1]  # Less reliable with mapped tasks
    )
# Better approach (reliable):
@flow
def flow_1():
    periods = get_periods()  # Returns [1, 3, 4]
    res_1 = task_1.map(periods)  # Creates 3 task futures
    res_1.wait()  # Explicitly wait for ALL 3 task_1 futures to complete
    task_2.map(periods)  # Only starts after ALL task_1 executions are done.wait()wait_forwait_fortask_2task_1.wait()Vincent Chéry
07/16/2025, 9:10 AMfrom time import sleep
from prefect import flow, task
@task
def get_periods():
    return [1, 3, 4]
@task
def task_1(period):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    <http://logger.info|logger.info>(f"Task 1 done sleeping for {period} seconds")
    return period + 1
@task
def task_2(period):
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Task 2 processing period {period}")
    return period + 2
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    res_1.wait()
    task_2.map(periods)Crash detected! Execution was cancelled by the runtime environment.Marvin
07/16/2025, 9:10 AMMarvin
07/16/2025, 9:12 AMpython
from time import sleep
from prefect import flow, task, get_run_logger
from typing import List
@task
def get_periods() -> List[int]:
    return [1, 3, 4]
@task(timeout_seconds=300)  # Add explicit timeout
def task_1(period: int) -> int:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    <http://logger.info|logger.info>(f"Task 1 done sleeping for {period} seconds")
    return period + 1
@task
def task_2(period: int) -> int:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Task 2 processing period {period}")
    return period + 2
@flow(name="flow_1", timeout_seconds=600)  # Add flow-level timeout
def flow_1():
    periods = get_periods()
    res_1 = task_1.map(periods)
    # Wait and capture results
    res_1.wait()
    res_2 = task_2.map(periods)
    # Wait for task_2 to complete as well
    res_2.wait()
    
    return res_2  # Return final results
if __name__ == "__main__":
    flow_1()timeout_seconds.wait()python
@task(timeout_seconds=300, retries=2)  # Add retries
def task_1(period: int) -> int:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Task 1 starting to sleep for {period} seconds")
    try:
        sleep(period)
    except Exception as e:
        logger.error(f"Error during sleep: {e}")
        raise
    <http://logger.info|logger.info>(f"Task 1 done sleeping for {period} seconds")
    return period + 1
@flow(name="flow_1", timeout_seconds=600)
def flow_1():
    periods = get_periods()
    # Execute and wait for each map operation completely
    res_1 = task_1.map(periods)
    results_1 = [r.result() for r in res_1]  # Alternative way to wait and get results
    
    res_2 = task_2.map(periods)
    results_2 = [r.result() for r in res_2]
    
    return results_2Vincent Chéry
07/16/2025, 9:16 AMprefect_test_harnessMarvin
07/16/2025, 9:16 AMMarvin
07/16/2025, 9:17 AMpython
from time import sleep
from prefect import flow, task, get_run_logger
from typing import List
@task
def get_periods() -> List[int]:
    return [1, 3, 4]
@task(retries=2)  # Add retries for resilience
def task_1(period: int) -> int:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Task 1 starting to sleep for {period} seconds")
    sleep(period)
    <http://logger.info|logger.info>(f"Task 1 done sleeping for {period} seconds")
    return period + 1
@task(retries=2)  # Add retries for resilience
def task_2(period: int) -> int:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Task 2 processing period {period}")
    return period + 2
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    
    # Execute first mapped task and ensure results are retrieved
    res_1 = task_1.map(periods)
    # Use a more robust way to wait and get results
    results_1 = []
    for future in res_1:
        try:
            results_1.append(future.result())
        except Exception as e:
            logger = get_run_logger()
            logger.error(f"Error getting result from task_1: {e}")
            raise
            
    # Small delay before starting next task map
    sleep(0.1)
    
    # Execute second mapped task
    res_2 = task_2.map(periods)
    results_2 = []
    for future in res_2:
        try:
            results_2.append(future.result())
        except Exception as e:
            logger = get_run_logger()
            logger.error(f"Error getting result from task_2: {e}")
            raise
            
    return results_2prefect_test_harnesspython
from prefect.testing.utilities import prefect_test_harness
import pytest
@pytest.mark.timeout(30)  # Add timeout to prevent hanging tests
def test_flow():
    with prefect_test_harness(
        server_startup_timeout=10,  # Increase server startup timeout
    ):
        result = flow_1()
        assert result is not Nonepython
@flow(name="flow_1")
def flow_1():
    periods = get_periods()
    
    # Process periods one at a time to avoid transition issues
    results_1 = []
    results_2 = []
    
    for period in periods:
        # Execute task_1 for this period
        res_1 = task_1.submit(period)
        results_1.append(res_1.result())
        
        # Execute task_2 for this period
        res_2 = task_2.submit(period)
        results_2.append(res_2.result())
    
    return results_2