<@ULVA73B9P> I just realized that when I have defa...
# ask-marvin
j
@Marvin I just realized that when I have default flow retries enabled and I run a flow that invokes several subflows in a row, if one of those subflows fails and enters AwaitingRetry, the next subflow goes ahead and runs without waiting for the first subflow to reach a truly final state. This isn't what I want. How do I ensure each subflow is successful before moving on to the next?
m
Prefect 2.x does not automatically handle waiting for the completion or final state of subflows when using retries. However, you can manage the flow sequence to ensure each subflow completes successfully before moving on to the next by using the `wait_for` parameter in task and subflow submissions. Here's an example of how you can structure your flows to wait for the preceding subflow to complete successfully or fail before starting the next one:
```python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=5)
def task_a():
    print("Running Task A")
    # Simulate task logic here
    # If the task fails, raise an exception to trigger retries

@task
def task_b():
    print("Running Task B")

@flow
def subflow_1():
    task_a()

@flow
def subflow_2():
    task_b()

@flow
def main_flow():
    # Run subflow_1 and wait for its completion.
    # A direct subflow call returns the flow's return value, not a future,
    # so ask for the final state with return_state=True instead of .result()
    subflow_1_state = subflow_1(return_state=True)

    # Only proceed if subflow_1 succeeds
    if subflow_1_state.is_completed():
        subflow_2()

# Starting the main flow
if __name__ == "__main__":
    main_flow()
```

### Key Points:
1. Retry Logic: Defined on the task itself with the `retries` and `retry_delay_seconds` parameters.
2. Result Handling: Calling `subflow_1(return_state=True)` blocks until the call finishes and returns the subflow's final state instead of its return value, without raising on failure.
3. Condition Check: Use `subflow_1_state.is_completed()` to check whether the subflow succeeded before proceeding.

### References:
1. Prefect Flows Documentation
2. Prefect Tasks Documentation
3. GitHub PR on Manual Flow Retries - Comprehensive explanation of manual retry logic.

By structuring your main flow like this, you ensure that each subflow completes and is in a terminal state before starting the next one. If `subflow_1` does not succeed, the execution of `subflow_2` can be conditioned or handled accordingly.
j
@Marvin when I invoke subflow_1.result() will that throw an exception right then if the flow failed? I know this is how tasks work. So maybe I don't need to check the state if I just invoke result?
m
You're correct, with one adjustment: calling a subflow directly, as in `subflow_1()`, already blocks until the subflow run finishes and raises an exception if it fails, so no `.result()` call is needed. The direct call returns the flow's return value, not a future; `.result()` only applies when you hold a state, for example from `subflow_1(return_state=True)`. This behavior aligns with how a direct task call works. You can leverage this to simplify your flow execution without additional state checks. Here's a simplified version of your main flow:

```python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=5)
def task_a():
    print("Running Task A")
    # Simulate task logic here
    # If the task fails, raise an exception to trigger retries

@task
def task_b():
    print("Running Task B")

@flow
def subflow_1():
    task_a()

@flow
def subflow_2():
    task_b()

@flow
def main_flow():
    try:
        # Run subflow_1 and wait for its completion;
        # the call raises if the subflow run fails
        subflow_1()

        # If subflow_1 succeeds, proceed to subflow_2
        subflow_2()
    except Exception as e:
        print(f"Subflow failed with exception: {e}")
        # Handle failure if necessary

# Starting the main flow
if __name__ == "__main__":
    main_flow()
```

### Key Points:
1. Blocking Call: `subflow_1()` is a blocking call which waits for `subflow_1` to complete and raises an exception if it fails.
2. Exception Handling: The `try...except` block catches exceptions raised by either subflow, allowing you to handle failures appropriately. Note that catching the exception lets `main_flow` itself finish normally; re-raise it if the parent run should be marked as failed.

### References:
- Prefect Flows Documentation
- Prefect Error Handling

Calling the subflow directly simplifies your flow logic and ensures that any failure in `subflow_1` is immediately propagated, preventing `subflow_2` from executing unless `subflow_1` succeeds.
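Since the original question was about several subflows in a row, here is a minimal sketch of how the same pattern might generalize. It relies only on the behavior described above, that a failed subflow surfaces as an exception in the parent. `run_in_order` is a hypothetical helper, not part of Prefect, and plain functions stand in for `@flow`-decorated subflows so the sketch runs without a Prefect installation.

```python
from typing import Callable, List, Sequence


def run_in_order(steps: Sequence[Callable[[], object]]) -> List[str]:
    """Call each step in order and stop at the first one that raises.

    Hypothetical helper (not a Prefect API). Returns the names of the
    steps that finished without raising.
    """
    completed: List[str] = []
    for step in steps:
        # Fall back to repr() for callables without a __name__ attribute
        name = getattr(step, "__name__", repr(step))
        try:
            step()  # assumed to raise if the step (e.g. a subflow) fails
        except Exception as exc:
            print(f"{name} failed: {exc!r}; skipping remaining steps")
            break
        completed.append(name)
    return completed


# Plain functions standing in for subflows (illustrative names)
def extract():
    print("extract ok")


def transform():
    raise ValueError("bad input")


def load():
    print("load ok")


if __name__ == "__main__":
    print(run_in_order([extract, transform, load]))
```

Inside a real parent flow you would pass the subflows themselves as the steps. If the parent run should end up failed when a step fails, re-raise the exception instead of breaking out of the loop.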