Hi, I'm currently working on a POC where I have a ...
# ask-community
t
Hi, I'm currently working on a POC where I have a simple ETL job. I have created a flow with three subflows inside it and provided retry logic in the main flow. Please consider the example below. @flow(name='run ETL flow', log_prints=True, retries=3, retry_delay_seconds=180) def main_flow(): data_extract_subflow() # Extract data_transform_subflow() # Transform data_load_subflow() # Load if name == "__main__": main_flow() QQ: Suppose my second subflow (data_transform_subflow()) fails for some reason during execution. Are there any parameters or settings we can configure so that the next retry starts from where it failed last time? In my case It should start from second subflow data_transform_subflow instead of running from the start
n
hi @Tinendra kumar - short answer, yes! this is the idea behind results in prefect
Copy code
from prefect import flow, runtime, task

@task
def extract() -> int:
    return 42

@task
def transform(x: int) -> int:
    if runtime.flow_run.run_count == 1:
        raise RuntimeError("simulate a failure")
    return x + 1

@task
def load(x: int):
    print(x)

@flow(retries=1, persist_result=True)
def f():
    x = extract()
    y = transform(x)
    load(y)

if __name__ == "__main__":
    f()
in this example, i simulate failure the first time around, and since I said
persist_result=True
on the parent flow, on the retry the
extract
will be cached (i.e. the result is loaded from disk without the function running)
image.png