Omar Khudeira
08/25/2025, 6:18 PM
Marvin
08/25/2025, 6:18 PM
Nate
08/25/2025, 6:20 PM
Marvin
08/25/2025, 6:22 PM
from datetime import timedelta
from prefect import flow, task, cache_policies

# Optional: pick an explicit result store (example: local filesystem)
# from prefect.filesystems import LocalFileSystem
# store = LocalFileSystem(basepath="./.prefect-results")

@task(
    persist_result=True,
    cache_policy=cache_policies.DEFAULT,  # skip if code+inputs match
    cache_expiration=timedelta(days=14),
    # result_storage=store,  # uncomment to use explicit local storage
    retries=3,  # retry within the same run first
    retry_delay_seconds=10,
)
def compute_item(i: int) -> int:
    # expensive logic here
    return i * i

@flow
def my_flow(n: int = 10):
    futures = [compute_item.submit(i) for i in range(n)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    my_flow(10)
Behavior you’ll see:
- First run: all tasks execute and persist results.
- If the flow fails midway, completed tasks’ results are already persisted.
- Next run with the same inputs: completed tasks are skipped (cache hit), failed/missing ones rerun.
- If you change the task code, DEFAULT cache policy invalidates and recomputes (so you don’t accidentally reuse stale results).
Notes and tips:
- “Resuming” a failed flow run: Prefect treats each run independently; there isn’t a “resume failed run” or “retry from failed” button. You just start a new run of the flow/deployment. Caching/persisted results make it effectively resume by skipping work that already completed.
- Force recompute: set refresh_cache=True on the task (e.g., via with_options), or change the cache policy/key if you need to invalidate (see the sketch after these notes).
- Side effects: caching only helps for returned results. If a task has side effects (e.g., writes to an external system), design idempotent logic or add your own existence checks.
- Storage: if you’re not on Prefect Cloud, set an explicit result_storage (e.g., LocalFileSystem, S3, GCS) so results survive restarts and are available to workers.
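A minimal sketch of forcing a recompute, assuming the compute_item task above and that your Prefect version supports refresh_cache in with_options:
# Force a one-off recompute by overriding refresh_cache on a copy of the task.
# Sketch only -- reuses compute_item from the example above.
from prefect import flow

@flow
def refresh_item(i: int = 3) -> int:
    # with_options returns a copy of the task with the new settings applied;
    # refresh_cache=True ignores any existing cache entry and writes a fresh one.
    return compute_item.with_options(refresh_cache=True)(i)

if __name__ == "__main__":
    refresh_item(3)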
Would you like help wiring this to a specific result store (local disk, S3, GCS) or tuning the cache key to your use case?
Omar Khudeira
08/25/2025, 6:30 PM
Marvin
08/25/2025, 6:30 PM
Marvin
08/25/2025, 6:31 PM
1) Write only if missing (idempotent check inside the task)
from azure.core.exceptions import ResourceExistsError
from azure.storage.blob import BlobServiceClient
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def write_partition(container: str, blob_path: str, data: bytes, conn_str: str):
    bsc = BlobServiceClient.from_connection_string(conn_str)
    blob = bsc.get_blob_client(container=container, blob=blob_path)
    # Fast path: already written
    if blob.exists():
        return {"path": f"abfs://{container}/{blob_path}", "status": "already_exists"}
    # Write once; treat “already exists” as success to be idempotent under races
    try:
        blob.upload_blob(data, overwrite=False)
        return {"path": f"abfs://{container}/{blob_path}", "status": "written"}
    except ResourceExistsError:
        return {"path": f"abfs://{container}/{blob_path}", "status": "already_exists"}

@flow
def load_partitions(partitions, conn_str, container):
    # Submit in parallel; tasks that find existing data will return immediately
    futures = [write_partition.submit(container, p["key"], p["payload"], conn_str) for p in partitions]
    return [f.result() for f in futures]
2) Pre-check then submit only missing work
from azure.storage.blob import BlobServiceClient
from prefect import flow, task

@task
def needs_write(container: str, blob_path: str, conn_str: str) -> bool:
    bsc = BlobServiceClient.from_connection_string(conn_str)
    return not bsc.get_blob_client(container=container, blob=blob_path).exists()

@task
def write(container: str, blob_path: str, data: bytes, conn_str: str):
    BlobServiceClient.from_connection_string(conn_str).get_blob_client(
        container=container, blob=blob_path
    ).upload_blob(data, overwrite=False)

@flow
def load_partitions(partitions, conn_str, container):
    # Decide what still needs work
    need_flags = [needs_write.submit(container, p["key"], conn_str) for p in partitions]
    to_run = [p for p, flag in zip(partitions, need_flags) if flag.result()]
    # Only submit missing ones
    futures = [write.submit(container, p["key"], p["payload"], conn_str) for p in to_run]
    for f in futures:
        f.result()
Notes
- This marks “already done” tasks as Completed (they just return early), which is exactly what you want for reruns.
- You don’t need Prefect task caching here since the source of truth is ADLS; the existence check is your “cache.” If you do return useful metadata and want to reuse it across runs, you can still add persist_result=True and a cache_policy, but it’s optional (see the sketch after these notes).
- Handle concurrency: if multiple runs may write the same blob, keep the upload idempotent (overwrite=False + catch ResourceExistsError) to avoid flapping.
- If you have many partitions, the pre-check pattern can avoid scheduling unnecessary work entirely.
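If you do want Prefect to also remember the returned metadata across runs, here is a minimal sketch, assuming the write_partition task above and that your Prefect version exposes cache_policies.INPUTS (which keys the cache on the task’s arguments, here including conn_str and the data bytes):
# Optional: layer Prefect result caching on top of the ADLS existence check.
# Sketch only -- wraps the write_partition task defined above.
from datetime import timedelta
from prefect import cache_policies

cached_write_partition = write_partition.with_options(
    persist_result=True,
    cache_policy=cache_policies.INPUTS,  # cache key computed from the task's inputs
    cache_expiration=timedelta(days=7),  # assumed expiry; tune to your rerun cadence
)
# In the flow, submit cached_write_partition instead of write_partition.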
If you share how you lay out your ADLS paths (per-date/partition/etc.), I can tailor the check/write helpers for your structure.
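Purely for illustration, a hypothetical helper for a per-date layout (the prefix, folder structure, and file name below are assumptions, not your actual layout):
from datetime import date

def partition_blob_path(table: str, day: date, prefix: str = "raw") -> str:
    # Hypothetical layout: <prefix>/<table>/YYYY/MM/DD/part-000.parquet
    return f"{prefix}/{table}/{day:%Y/%m/%d}/part-000.parquet"

# e.g. partition_blob_path("orders", date(2025, 8, 25))
# -> "raw/orders/2025/08/25/part-000.parquet"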