# ask-marvin
@Marvin is there any way to read existing schedules for a deployment with python?
thought for 269.8 seconds
Yes. In Prefect 3.x you can read a deployment’s schedules via the Python client. Here’s an async example using `get_client`:
```python
import asyncio
from prefect.client.orchestration import get_client
from prefect.client.schemas.schedules import CronSchedule, IntervalSchedule, RRuleSchedule

async def main():
    async with get_client() as client:
        # Option A: fetch deployment by "flow_name/deployment_name"
        deployment = await client.read_deployment_by_name("my-flow/my-deployment")

        # Option B: if you already have the deployment UUID
        # deployment = await client.read_deployment(deployment_id)

        # List schedules attached to this deployment
        schedules = await client.read_deployment_schedules(deployment_id=deployment.id)

        # Inspect/print them
        for ds in schedules:
            sched = ds.schedule
            if isinstance(sched, CronSchedule):
                print(f"[{ds.id}] type=cron cron='{sched.cron}' tz={sched.timezone} active={ds.active}")
            elif isinstance(sched, IntervalSchedule):
                print(f"[{ds.id}] type=interval every={sched.interval} anchor={sched.anchor_date} tz={sched.timezone} active={ds.active}")
            elif isinstance(sched, RRuleSchedule):
                print(f"[{ds.id}] type=rrule rrule='{sched.rrule}' tz={sched.timezone} active={ds.active}")
            else:
                # NoSchedule or unknown
                print(f"[{ds.id}] type={type(sched).__name__} active={ds.active} raw={sched}")

asyncio.run(main())
```
If you prefer sync:
```python
from prefect.client.orchestration import get_client

# get_client(sync_client=True) returns a synchronous client configured
# from your current Prefect settings
with get_client(sync_client=True) as client:
    deployment = client.read_deployment_by_name("my-flow/my-deployment")
    schedules = client.read_deployment_schedules(deployment_id=deployment.id)
    for ds in schedules:
        print(ds)
```
More details:
- Concepts: Schedules in Prefect 3.x
- API reference (schedules and deployment schedules): Python API for deployment schedules

If you share your flow/deployment names (or whether you want async vs sync), I can tailor the snippet.
@Marvin what happens if you try to read a deployment that doesn't exist? an exception of some kind? which?
thought for 97.1 seconds
Short answer: these read methods raise `prefect.exceptions.ObjectNotFound` when the deployment doesn't exist.

Details:
- `read_deployment(deployment_id)` and `read_deployment_by_name("flow/deployment")` both make a GET request; if the server returns 404 Not Found, the client catches the HTTP error and raises `prefect.exceptions.ObjectNotFound`.
- Other HTTP failures (auth errors, 5xx, etc.) surface as `prefect.exceptions.PrefectHTTPStatusError`, so catch that separately if you want to distinguish "not found" from server problems.

Example handling (async):
```python
import asyncio
from prefect.client.orchestration import get_client
from prefect.exceptions import ObjectNotFound

async def main():
    async with get_client() as client:
        try:
            dep = await client.read_deployment_by_name("my-flow/my-deployment")
        except ObjectNotFound:
            print("Deployment not found")

asyncio.run(main())
```
Sync:
```python
from prefect.client.orchestration import get_client
from prefect.exceptions import ObjectNotFound

with get_client(sync_client=True) as client:
    try:
        dep = client.read_deployment_by_name("my-flow/my-deployment")
    except ObjectNotFound:
        print("Deployment not found")
```
References:
- REST API: Read deployment by ID
- REST API: Read deployment by name
- Exceptions: `prefect.exceptions`
@Marvin would a polars DataFrame be serializable / usable as input when caching in prefect, or could that give issues? What about a list of dictionaries?
thought for 276.0 seconds
Short answer:
- A Polars DataFrame as an input to a cached task can work, but it's not ideal: Prefect's default input-hash function can fall back to hashing complex objects, which may be slow, memory-heavy, and brittle across library versions. Prefer deriving a lightweight, deterministic cache key instead of hashing the entire DataFrame.
- A `list[dict]` is generally safe and well supported, both for input-based caching and for result serialization (as long as the values are JSON-serializable).

Details and recommendations (Prefect 3.x):
- Input-based caching: when you set `cache_key_fn=task_input_hash` (or use the `INPUTS` cache policy), Prefect computes a deterministic hash of the task's arguments. Simple Python types (str/int/float/bool), lists/tuples, and dicts with JSON-serializable contents work well here and produce stable keys. Complex objects like Polars DataFrames may be hashed via fallbacks, but the hashes can be large/slow to compute and may change if library versions change.
- Result persistence: if you persist a task's result, the serializer matters. `JSONSerializer` works for JSON-serializable structures (e.g., a `list[dict]` of simple types). `PickleSerializer` (cloudpickle) can handle most Python objects, including Polars DataFrames, but the results are bigger, less portable, and sensitive to environment/library versions. For large tabular data, it's often better to write to Parquet/IPC and return a reference (e.g., a path or URL) instead of storing the whole frame in the result store; see the second sketch below.

Practical patterns:
- If your task input is a Polars DataFrame, prefer passing a lightweight identifier and hashing that instead, e.g., the file path plus a file content hash, or a hash of the frame's Parquet bytes:
```python
import hashlib
import io

import polars as pl
from prefect import task
from prefect.utilities.hashing import hash_objects

def hash_polars_df(df: pl.DataFrame) -> str:
    # Hash the frame's Parquet bytes for a deterministic fingerprint
    buf = io.BytesIO()
    df.write_parquet(buf, compression="zstd")
    return hashlib.sha256(buf.getvalue()).hexdigest()

def cache_key(context, arguments):
    df = arguments.get("df")
    other = {k: v for k, v in arguments.items() if k != "df"}
    # hash_objects handles JSON-serializable structures; it returns None
    # if the arguments cannot be serialized
    parts = [hash_objects(other) or ""]
    if df is not None:
        parts.append(hash_polars_df(df))
    return hashlib.sha256("::".join(parts).encode()).hexdigest()

@task(cache_key_fn=cache_key)
def transform(df: pl.DataFrame, options: dict):
    ...
```
If the DataFrame comes from a known file/object store, it's even better to hash the file contents (or a version/ETag) and avoid materializing the frame before caching.

For `list[dict]` inputs, `task_input_hash` is typically perfect:
```python
from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash)
def process(records: list[dict]):
    ...
```
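And for the "write to Parquet and return a reference" pattern mentioned earlier, a minimal sketch; the `data/` directory, the CSV source, and the task names are illustrative assumptions, not a prescribed layout:

```python
from pathlib import Path

import polars as pl
from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, persist_result=True)
def snapshot(source: str) -> str:
    # Materialize the frame, write it out, and return only a reference:
    # a short string persists cheaply and serializes with any serializer
    df = pl.read_csv(source)
    out = Path("data") / f"{Path(source).stem}.parquet"  # assumed layout
    out.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(out)
    return str(out)

@task
def downstream(path: str) -> pl.DataFrame:
    # Re-load where needed instead of piping the frame through the
    # Prefect result store
    return pl.read_parquet(path)
```

On a cache hit, Prefect returns the stored path without re-reading the source, so the expensive load is skipped while the result store only ever holds a few bytes.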
Docs:
- Caching concepts: Prefect 3.x Caching
- Serializers API: Prefect serializers API

If you can share how you're constructing the cache key (e.g., `cache_key_fn` vs `cache_policy`) and whether you also want to persist the result, I can suggest a concrete approach tailored to your flow.
@Marvin what happens in prefect if a scheduled deployment run somehow is "missed" (ie server hosted locally in docker and computer is shut down)
thought for 233.1 seconds
Short answer: missed schedules are not backfilled automatically.

Details (Prefect 3.x):
- If the Prefect server is down at a scheduled time (e.g., you shut down your laptop running Server in Docker), no flow runs are created for those times. When the server comes back up, the scheduler resumes and only creates runs going forward; the "missed" windows are skipped unless you create runs manually.
- If the server stayed up but your worker(s) were down, the runs were created and will sit in Scheduled (they may show as Late). When a worker comes online, it can still pick up and run those Late/Scheduled runs.

How to catch up/backfill:
- CLI (quick/manual). Create a run now:
```bash
prefect deployment run <FLOW_NAME>/<DEPLOYMENT_NAME>
```
Or create a run with a specific start time (UTC ISO 8601):
```bash
prefect deployment run <FLOW_NAME>/<DEPLOYMENT_NAME> --start-at "2025-09-19T01:00:00Z"
```
You can loop these to recreate each missed interval.
- Python API (scriptable): use `create_flow_run_from_deployment` on the client to create runs programmatically for each missed time window, optionally passing parameters that encode the backfill date; see the sketch below.
- Strategy: if you need reliable scheduling while a laptop may be offline, consider running Prefect Server on an always-on host or using Prefect Cloud so the scheduler is always available.
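For illustration, a minimal backfill sketch along those lines; the deployment name, the hourly interval, and the `window_start` parameter are placeholder assumptions (drop `parameters` if your flow doesn't take any):

```python
import asyncio
from datetime import datetime, timedelta, timezone

from prefect.client.orchestration import get_client
from prefect.states import Scheduled

async def backfill(start: datetime, end: datetime, every: timedelta) -> None:
    """Create one run per missed interval in [start, end)."""
    async with get_client() as client:
        deployment = await client.read_deployment_by_name("my-flow/my-deployment")
        t = start
        while t < end:
            await client.create_flow_run_from_deployment(
                deployment_id=deployment.id,
                state=Scheduled(scheduled_time=t),
                # Hypothetical parameter: only useful if your flow accepts it
                parameters={"window_start": t.isoformat()},
            )
            t += every

# Example: recreate hourly runs missed between 00:00 and 06:00 UTC
asyncio.run(backfill(
    start=datetime(2025, 9, 19, 0, tzinfo=timezone.utc),
    end=datetime(2025, 9, 19, 6, tzinfo=timezone.utc),
    every=timedelta(hours=1),
))
```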
Docs for reference:
- Schedules: Prefect 3.x Schedules
- Workers: Workers overview
- API client: API client

If you share your schedule type (cron/interval) and how long the server was down, I can tailor the backfill script above or generate the needed `prefect deployment run` commands.