James Ashby
06/26/2024, 5:43 PMMarvin
06/26/2024, 5:43 PMwait_for
parameter in task and subflow submissions.
Here's an example of how you can structure your flows to wait for the preceding subflow to complete successfully or fail before starting the next one:
python
from prefect import flow, task
@task(retries=3, retry_delay_seconds=5)
def task_a():
print("Running Task A")
# Simulate task logic here
# If the task fails, raise an exception to trigger retries
@task
def task_b():
print("Running Task B")
@flow
def subflow_1():
task_a()
@flow
def subflow_2():
task_b()
@flow
def main_flow():
# Run subflow_1 and wait for its completion
subflow_1_future = subflow_1().result()
# Only proceed if subflow_1 succeeds
if subflow_1_future.state.is_completed():
subflow_2()
# Starting the main flow
main_flow()
### Key Points:
1. Retry Logic: Defined in the task itself using retries and retry delay parameters.
2. Result Handling: Use .result()
to get the result of subflow_1()
to ensure it waits for completion.
3. Condition Check: Use subflow_1_future.state.is_completed()
to check whether the subflow succeeded before proceeding.
### References:
1. Prefect Flows Documentation
2. Prefect Tasks Documentation
3. GitHub PR on Manual Flow Retries - Comprehensive explanation of manual retry logic.
By structuring your main flow like this, you ensure that each subflow completes and is in a terminal state before starting the next one. If subflow_1 does not succeed, the execution of subflow_2 can be conditioned or handled accordingly.James Ashby
06/26/2024, 5:45 PMMarvin
06/26/2024, 5:46 PM.result()
on a subflow, it will indeed block until the subflow completes and will raise an exception if the subflow fails. This behavior aligns with how tasks work. You can leverage this to simplify your flow execution without additional state checks.
Here's a simplified version of your main flow:
python
from prefect import flow, task
@task(retries=3, retry_delay_seconds=5)
def task_a():
print("Running Task A")
# Simulate task logic here
# If the task fails, raise an exception to trigger retries
@task
def task_b():
print("Running Task B")
@flow
def subflow_1():
task_a()
@flow
def subflow_2():
task_b()
@flow
def main_flow():
try:
# Run subflow_1 and wait for its completion
subflow_1().result()
# If subflow_1 succeeds, proceed to subflow_2
subflow_2()
except Exception as e:
print(f"Subflow 1 failed with exception: {e}")
# Handle failure if necessary
# Starting the main flow
main_flow()
### Key Points:
1. Blocking Call: subflow_1().result()
is a blocking call which waits for subflow_1
to complete and raises an exception if it fails.
2. Exception Handling: The try...except
block catches exceptions raised by subflow_1
, allowing you to handle failures appropriately.
### References:
- Prefect Flows Documentation
- Prefect Error Handling
Using .result()
simplifies your flow logic and ensures that any failure in subflow_1
is immediately propagated, preventing subflow_2
from executing unless subflow_1
succeeds.