# ask-community
@Marvin how do I leverage prefect’s ability to track the progress/state of individual tasks such that if the entire flow fails or the flow finishes but individual tasks fail, rerunning the flow will only rerun the failed tasks?
thought for 221.7 seconds
short answer: results
Great question. In Prefect 3.x, the way to “only rerun failed tasks” on a new flow run is to use task result persistence + caching. Prefect will then skip tasks that already have a valid, matching cached result and only execute the ones that don’t (e.g., the ones that failed last time).

How it works at a high level:
- Each task run can persist its result to durable storage.
- On a subsequent flow run, Prefect computes a cache key for each task invocation (based on your cache policy). If a completed result with that key exists and isn’t expired, the task is skipped and the cached result is used.
- Tasks that previously failed won’t have a valid cached result, so they will run.

What to configure on your tasks:
- persist_result=True: actually saves results so they’re available across runs.
- cache_policy=prefect.cache_policies.DEFAULT: a sensible, code- and input-aware cache key that invalidates when task code changes.
- Optional: cache_expiration to control how long results are reusable.
- Optional: result_storage to pick where results are stored (local FS, S3, GCS, etc.). If you’re using Prefect Cloud, a default result store is available out of the box.

Example:
from datetime import timedelta
from prefect import flow, task, cache_policies

# Optional: pick an explicit result store (example: local filesystem)
# from prefect.filesystems import LocalFileSystem
# store = LocalFileSystem(basepath="./.prefect-results")

@task(
    persist_result=True,
    cache_policy=cache_policies.DEFAULT,    # skip if code+inputs match
    cache_expiration=timedelta(days=14),
    # result_storage=store,                 # uncomment to use explicit local storage
    retries=3,                              # retry within the same run first
    retry_delay_seconds=10
)
def compute_item(i: int) -> int:
    # expensive logic here
    return i * i

@flow
def my_flow(n: int = 10):
    futures = [compute_item.submit(i) for i in range(n)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    my_flow(10)
Behavior you’ll see:
- First run: all tasks execute and persist results.
- If the flow fails midway, completed tasks’ results are already persisted.
- Next run with the same inputs: completed tasks are skipped (cache hit); failed/missing ones rerun.
- If you change the task code, the DEFAULT cache policy invalidates and recomputes (so you don’t accidentally reuse stale results).

Notes and tips:
- “Resuming” a failed flow run: Prefect treats each run independently; there isn’t a “resume failed run” or “retry from failed” button. You just start a new run of the flow/deployment, and caching/persisted results make it effectively resume by skipping work that already completed.
- Force recompute: set refresh_cache=True on the task, or change the cache policy/key if you need to invalidate.
- Side effects: caching only helps for returned results. If a task has side effects (e.g., writes to an external system), design idempotent logic or add your own existence checks.
- Storage: if you’re not on Prefect Cloud, set an explicit result_storage (e.g., LocalFileSystem, S3, GCS) so results survive restarts and are available to workers (a sketch covering this and refresh_cache follows below).

Would you like help wiring this to a specific result store (local disk, S3, GCS) or tuning the cache key to your use case?
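To make the refresh_cache and result_storage tips concrete, here is a minimal sketch (assuming Prefect 3.x; the ./.prefect-results path and the task/flow names are made up for illustration):

from prefect import flow, task
from prefect.cache_policies import DEFAULT
from prefect.filesystems import LocalFileSystem

# Assumed local result store for the sketch; any writable path works
store = LocalFileSystem(basepath="./.prefect-results")

@task(persist_result=True, cache_policy=DEFAULT, result_storage=store)
def expensive(x: int) -> int:
    return x * x

@flow
def demo():
    expensive(3)                                   # cache hit on reruns
    expensive.with_options(refresh_cache=True)(3)  # forces a recompute

with_options returns a copy of the task with the override applied, so you can force a one-off refresh without editing the task definition.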
@Marvin so if my task writes results out to ADLS as the success criterion, I can write custom logic to determine whether that data is already there? If it is, I can skip that task; if not, I can run it?
thought for 82.7 seconds
Yes, that’s a solid pattern for side-effecting tasks. Because Prefect can’t infer what’s already materialized in ADLS, make your task idempotent: check ADLS up front and return early if the data already exists. On a rerun, tasks with data present will “fast-complete” and only missing/failed partitions will do work.

Two common ways to wire it:

1) Short-circuit inside the task
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def write_partition(container: str, blob_path: str, data: bytes, conn_str: str):
    bsc = BlobServiceClient.from_connection_string(conn_str)
    blob = bsc.get_blob_client(container=container, blob=blob_path)

    # Fast path: already written
    if blob.exists():
        return {"path": f"abfs://{container}/{blob_path}", "status": "already_exists"}

    # Write once; treat “already exists” as success to be idempotent under races
    try:
        blob.upload_blob(data, overwrite=False)
        return {"path": f"abfs://{container}/{blob_path}", "status": "written"}
    except ResourceExistsError:
        return {"path": f"abfs://{container}/{blob_path}", "status": "already_exists"}

@flow
def load_partitions(partitions, conn_str, container):
    # Submit in parallel; tasks that find existing data will return immediately
    futures = [write_partition.submit(container, p["key"], p["payload"], conn_str) for p in partitions]
    return [f.result() for f in futures]
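For reference, a hypothetical invocation of that flow (the partition layout, container name, and connection string are placeholders, not anything Prefect or Azure requires):

# Placeholder partitions: each dict carries the "key" and "payload" fields used above
partitions = [
    {"key": f"dt=2024-01-0{d}/data.bin", "payload": b"example-bytes"}
    for d in (1, 2, 3)
]
load_partitions(partitions, conn_str="<your ADLS connection string>", container="raw")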
2) Pre-check then submit only missing work
from azure.storage.blob import BlobServiceClient
from prefect import flow, task

@task
def needs_write(container: str, blob_path: str, conn_str: str) -> bool:
    bsc = BlobServiceClient.from_connection_string(conn_str)
    return not bsc.get_blob_client(container=container, blob=blob_path).exists()

@task
def write(container: str, blob_path: str, data: bytes, conn_str: str):
    BlobServiceClient.from_connection_string(conn_str).get_blob_client(
        container=container, blob=blob_path
    ).upload_blob(data, overwrite=False)

@flow
def load_partitions(partitions, conn_str, container):
    # Decide what still needs work
    need_flags = [needs_write.submit(container, p["key"], conn_str) for p in partitions]
    to_run = [p for p, flag in zip(partitions, need_flags) if flag.result()]

    # Only submit missing ones
    futures = [write.submit(container, p["key"], p["payload"], conn_str) for p in to_run]
    for f in futures:
        f.result()
Notes:
- This marks “already done” tasks as Completed (they just return early), which is exactly what you want for reruns.
- You don’t need Prefect task caching here, since the source of truth is ADLS; the existence check is your “cache.” If you do return useful metadata and want to reuse it across runs, you can still add persist_result=True and a cache_policy, but it’s optional (see the sketch at the end of this message).
- Handle concurrency: if multiple runs may write the same blob, keep the upload idempotent (overwrite=False plus catching ResourceExistsError) to avoid flapping.
- If you have many partitions, the pre-check pattern can avoid scheduling unnecessary work entirely.

If you share how you lay out your ADLS paths (per-date/partition/etc.), I can tailor the check/write helpers for your structure.
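For that optional-caching note, a minimal sketch (assuming Prefect 3.x; INPUTS is a built-in cache policy that keys the cache on the task’s arguments, and write_partition_cached is a hypothetical variant of write_partition above):

from prefect import task
from prefect.cache_policies import INPUTS

# Hypothetical variant: same signature as write_partition above, with result
# persistence added so the returned metadata can be reused across flow runs.
@task(persist_result=True, cache_policy=INPUTS, retries=3, retry_delay_seconds=10)
def write_partition_cached(container: str, blob_path: str, data: bytes, conn_str: str) -> dict:
    # ... same existence-check-then-upload body as write_partition above ...
    return {"path": f"abfs://{container}/{blob_path}", "status": "written"}

One caveat: INPUTS hashes every argument, including the payload bytes, so for very large payloads the hashing itself has a cost.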