# ask-marvin
t
Hi @Marvin, I'm currently working on a POC where I have a simple ETL job. I have created a flow with three subflows inside it and configured retry logic on the main flow. Please consider the example below.

```python
@flow(name='run ETL flow', log_prints=True, retries=3, retry_delay_seconds=180)
def main_flow():
    data_extract_subflow()    # Extract
    data_transform_subflow()  # Transform
    data_load_subflow()       # Load

if __name__ == "__main__":
    main_flow()
```

QQ: Suppose my second subflow (data_transform_subflow()) fails for some reason during execution. Are there any parameters or settings we can configure so that the next retry starts from where it failed last time? In my case, it should start from the second subflow, data_transform_subflow, instead of running from the start.
m
In Prefect 3.x, there is no built-in setting that makes a retry resume from a specific subflow after a failure. The retry logic in your example is configured on the main flow, so it applies at the flow level: a retry reruns the entire flow function from the top, including all subflow calls.

However, you can implement a workaround by tracking progress yourself. Record the completion state of each subflow in persistent storage and run each subflow only if it has not already succeeded. Here's a rough idea of how you might implement this:

1. State Tracking: Use persistent storage or Prefect's state management to track the completion state of each subflow.
2. Conditional Execution: Before executing each subflow, check its completion state. If it has already succeeded, skip it.

Here's a simplified version of how you might structure this:
```python
from prefect import flow, task, get_run_logger

@task
def data_extract_subflow():
    logger = get_run_logger()
    logger.info("Extracting data...")
    # Perform extraction
    return "extracted"

@task
def data_transform_subflow():
    logger = get_run_logger()
    logger.info("Transforming data...")
    # Perform transformation
    return "transformed"

@task
def data_load_subflow():
    logger = get_run_logger()
    logger.info("Loading data...")
    # Perform loading
    return "loaded"

@flow(name='run ETL flow', log_prints=True, retries=3, retry_delay_seconds=180)
def main_flow():
    # Check state before running each subflow
    if not check_state("extraction"):
        data_extract_subflow()
        set_state("extraction", "done")

    if not check_state("transformation"):
        data_transform_subflow()
        set_state("transformation", "done")

    if not check_state("loading"):
        data_load_subflow()
        set_state("loading", "done")

def check_state(step_name):
    # Implement state checking logic
    return False

def set_state(step_name, state):
    # Implement state setting logic
    pass

if __name__ == "__main__":
    main_flow()
```
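As written, check_state always returns False and set_state does nothing, so every step still runs on each retry until you fill them in. For a working implementation, you would need to back them with a database or another form of persistent storage that records and updates the state of each subflow. You can read more about state management in Prefect's documentation.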
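To make the persistent-storage idea concrete, here is a minimal sketch of how the check_state and set_state stubs above could be backed by a JSON file. It uses only the standard library and does not call Prefect. The names `CheckpointStore`, `run_step`, `run_key`, `simulate_retry`, and the file name `checkpoints.json` are all hypothetical, introduced here for illustration. The sketch assumes each logical run has a stable key that stays the same across retries (the flow run ID might serve, but that is an assumption to verify); without such a key, a fresh run would wrongly skip steps finished by an earlier run.

```python
import json
import tempfile
from pathlib import Path


class CheckpointStore:
    """Hypothetical file-backed record of which steps finished per run key."""

    def __init__(self, path):
        self.path = Path(path)

    def _load(self):
        # Return the stored mapping, or an empty one if nothing was saved yet.
        if self.path.exists():
            return json.loads(self.path.read_text())
        return {}

    def check_state(self, run_key, step_name):
        # True only if this step was marked "done" for this run key.
        return self._load().get(run_key, {}).get(step_name) == "done"

    def set_state(self, run_key, step_name, state="done"):
        data = self._load()
        data.setdefault(run_key, {})[step_name] = state
        self.path.write_text(json.dumps(data))


def run_step(store, run_key, step_name, func):
    """Run func unless step_name is already done; return True if it ran."""
    if store.check_state(run_key, step_name):
        return False
    func()
    # Mark the step done only after func returned without raising.
    store.set_state(run_key, step_name)
    return True


def simulate_retry():
    """Simulate two attempts where the transform step fails the first time."""
    calls = []
    attempts = {"transform": 0}

    def extract():
        calls.append("extract")

    def transform():
        attempts["transform"] += 1
        if attempts["transform"] == 1:
            raise RuntimeError("transform failed")
        calls.append("transform")

    def load():
        calls.append("load")

    steps = [
        ("extraction", extract),
        ("transformation", transform),
        ("loading", load),
    ]
    with tempfile.TemporaryDirectory() as tmp:
        store = CheckpointStore(Path(tmp) / "checkpoints.json")
        for _attempt in range(2):  # stands in for the flow-level retries
            try:
                for name, func in steps:
                    run_step(store, "run-1", name, func)
                break
            except RuntimeError:
                continue
    return calls
```

Calling `simulate_retry()` shows the second attempt skipping extraction and resuming at the transform step. A step is marked done only after it returns, so a step that fails midway is rerun in full; it should therefore be safe to repeat. The plain file write is not atomic or safe for concurrent runs, which is one reason a database may be a better fit in production.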