Tinendra kumar
03/22/2025, 7:22 PMNate
03/22/2025, 8:40 PMfrom prefect import flow, runtime, task
@task
def extract() -> int:
return 42
@task
def transform(x: int) -> int:
if runtime.flow_run.run_count == 1:
raise RuntimeError("simulate a failure")
return x + 1
@task
def load(x: int):
print(x)
@flow(retries=1, persist_result=True)
def f():
x = extract()
y = transform(x)
load(y)
if __name__ == "__main__":
f()
in this example, i simulate failure the first time around, and since I said persist_result=True
on the parent flow, on the retry the extract
will be cached (i.e. the result is loaded from disk without the function running)Nate
03/22/2025, 8:40 PM