Olve Heitmann
09/17/2025, 3:27 PMMarvin
09/17/2025, 3:27 PMMarvin
09/17/2025, 3:31 PMimport asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.schedules import CronSchedule, IntervalSchedule, RRuleSchedule
async def main():
async with get_client() as client:
# Option A: fetch deployment by "flow_name/deployment_name"
deployment = await client.read_deployment_by_name("my-flow/my-deployment")
# Option B: if you already have the deployment UUID
# deployment = await client.read_deployment(deployment_id)
# List schedules attached to this deployment
schedules = await client.read_deployment_schedules(deployment_id=deployment.id)
# Inspect/print them
for ds in schedules:
sched = ds.schedule
if isinstance(sched, CronSchedule):
print(f"[{ds.id}] type=cron cron='{sched.cron}' tz={sched.timezone} active={ds.active}")
elif isinstance(sched, IntervalSchedule):
print(f"[{ds.id}] type=interval every={sched.interval} anchor={sched.anchor_date} tz={sched.timezone} active={ds.active}")
elif isinstance(sched, RRuleSchedule):
print(f"[{ds.id}] type=rrule rrule='{sched.rrule}' tz={sched.timezone} active={ds.active}")
else:
# NoSchedule or unknown
print(f"[{ds.id}] type={type(sched).__name__} active={ds.active} raw={sched}")
asyncio.run(main())
If you prefer sync:
from prefect.client.orchestration import SyncPrefectClient
with SyncPrefectClient() as client:
deployment = client.read_deployment_by_name("my-flow/my-deployment")
schedules = client.read_deployment_schedules(deployment_id=deployment.id)
for ds in schedules:
print(ds)
More details:
- Concepts: Schedules in Prefect 3.x
- API reference (schedules and deployment schedules): Python API for deployment schedules
If you share your flow/deployment names (or whether you want async vs sync), I can tailor the snippet.Olve Heitmann
09/17/2025, 4:58 PMOlve Heitmann
09/17/2025, 5:02 PMMarvin
09/17/2025, 5:02 PMMarvin
09/17/2025, 5:03 PMread_deployment(deployment_id)
and read_deployment_by_name("flow/deployment")
both make a GET request; if the server returns 404 Not Found, the client raises prefect.exceptions.PrefectHTTPStatusError
.
- Some helper methods elsewhere may raise ObjectNotFound
when they do their own lookup and find zero results, but the direct “read” endpoints typically bubble up PrefectHTTPStatusError
.
Example handling (async):
import asyncio
from prefect.client.orchestration import get_client
from prefect.exceptions import PrefectHTTPStatusError
async def main():
async with get_client() as client:
try:
dep = await client.read_deployment_by_name("my-flow/my-deployment")
except PrefectHTTPStatusError as exc:
if exc.response is not None and exc.response.status_code == 404:
print("Deployment not found")
else:
raise
asyncio.run(main())
Sync:
from prefect.client.orchestration import SyncPrefectClient
from prefect.exceptions import PrefectHTTPStatusError
with SyncPrefectClient() as client:
try:
dep = client.read_deployment_by_name("my-flow/my-deployment")
except PrefectHTTPStatusError as exc:
if exc.response is not None and exc.response.status_code == 404:
print("Deployment not found")
else:
raise
References:
- REST API: Read deployment by ID
- REST API: Read deployment by name
- Exceptions: prefect.exceptionsOlve Heitmann
09/19/2025, 2:09 PMMarvin
09/19/2025, 2:09 PMMarvin
09/19/2025, 2:13 PMcache_key_fn=task_input_hash
(or use the Inputs cache policy), Prefect computes a deterministic hash of the task’s arguments. Simple Python types (str/int/float/bool), lists/tuples, and dicts (with JSON-serializable contents) are great here and produce stable keys. Complex objects like Polars DataFrames may be hashed via fallbacks, but this can be large/slow and may produce different keys if library versions change.
- Result persistence: If you persist a task’s result, serializers matter:
- JSONSerializer
works for JSON-serializable structures (e.g., list[dict] of simple types).
- PickleSerializer
(cloudpickle) can handle most Python objects, including Polars DataFrames, but results are bigger, less portable, and sensitive to environment/library versions. For large tabular data, it’s often better to write to Parquet/IPC and return a reference (e.g., a path or URL) instead of storing the whole frame in the result store.
Practical patterns:
- If your task input is a Polars DataFrame, prefer passing a lightweight identifier and hash that instead, e.g., the file path plus a file content hash or a Parquet byte hash:
import io, hashlib, polars as pl
from prefect import task
from prefect.utilities.hashing import stable_hash
def hash_polars_df(df: pl.DataFrame) -> str:
buf = io.BytesIO()
df.write_parquet(buf, compression="zstd")
return hashlib.sha256(buf.getvalue()).hexdigest()
def cache_key(context, arguments):
df = arguments.get("df")
other = {k: v for k, v in arguments.items() if k != "df"}
parts = [stable_hash(other)]
if df is not None:
parts.append(hash_polars_df(df))
return hashlib.sha256("::".join(parts).encode()).hexdigest()
@task(cache_key_fn=cache_key)
def transform(df: pl.DataFrame, options: dict):
...
If the DataFrame comes from a known file/object store, it’s even better to hash the file contents (or a version/ETag) and avoid materializing the frame before caching.
- For list[dict] inputs, task_input_hash
is typically perfect:
from prefect import task
from prefect.tasks import task_input_hash
@task(cache_key_fn=task_input_hash)
def process(records: list[dict]):
...
Docs:
- Caching concepts: Prefect 3.x Caching
- Serializers API: Prefect serializers API
If you can share how you’re constructing the cache key (e.g., cache_key_fn
vs cache_policy
) and whether you also want to persist the result, I can suggest a concrete approach tailored to your flow.Olve Heitmann
09/19/2025, 10:26 PMMarvin
09/19/2025, 10:26 PMMarvin
09/19/2025, 10:30 PMprefect deployment run <FLOW_NAME>/<DEPLOYMENT_NAME>
- Create a run with a specific start time (UTC ISO 8601):
prefect deployment run <FLOW_NAME>/<DEPLOYMENT_NAME> --start-at "2025-09-19T01:00:00Z"
You can loop these to recreate each missed interval.
- Python API (scriptable): use create_flow_run_from_deployment
via the client to programmatically create runs for each missed time window and optionally pass parameters that encode the backfill date.
- Strategy: if you need reliable scheduling while a laptop may be offline, consider running Prefect Server on an always-on host or using Prefect Cloud so the scheduler is always available.
Docs for reference:
- Schedules: Prefect 3.x Schedules
- Workers: Workers overview
- API client: API client
If you share your schedule type (cron/interval) and how long the server was down, I can sketch a tiny backfill script or a one-liner to generate the needed prefect deployment run
commands.