Shubham
09/23/2025, 5:05 AM

Marvin
09/23/2025, 5:08 AM
- Check whether the flow-run container is still alive on the Docker host (docker ps). If there’s no container for that run, the run is gone and will be marked “Crashed” once the missing-heartbeat window elapses.
- If you don’t want to wait, you can manually set the flow run state to “Crashed” or “Cancelled” in the UI and then re-run the flow. There isn’t a built-in way to resume a killed run mid-execution; you’ll need to restart it (optionally leveraging task caching/checkpoints if you’ve enabled them).
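If you’d rather script that than click through the UI, here is a sketch using the Prefect client (the flow run ID is a placeholder):
import asyncio

from prefect.client.orchestration import get_client
from prefect.states import Crashed

async def mark_crashed(flow_run_id: str):
    async with get_client() as client:
        # force=True bypasses orchestration rules that might reject the transition
        await client.set_flow_run_state(flow_run_id, state=Crashed(), force=True)

asyncio.run(mark_crashed("<your-flow-run-id>"))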
Prevent this in the future
- Enable a “detect zombie flows” automation so runs that stop heartbeating are automatically moved to “Crashed” (and optionally notify you). This keeps the UI from showing “Running” indefinitely when the underlying process is gone. See Detect zombie flows; a sketch of such an automation follows this list.
- For very long-running work, consider:
- Breaking the loop into smaller tasks/iterations so partial progress can be retried or resumed via caching.
- Ensuring results are persisted so re-runs can pick up from prior successful steps.
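For reference, here is a sketch of a zombie-flow automation built with the Python client, loosely following the pattern in the Prefect docs. The 90-second window is an assumption you should tune (roughly 3x your heartbeat frequency), and flow-run heartbeat events generally require configuring PREFECT_RUNNER_HEARTBEAT_FREQUENCY:
from datetime import timedelta

from prefect.automations import Automation
from prefect.client.schemas.objects import StateType
from prefect.events.actions import ChangeFlowRunState
from prefect.events.schemas.automations import EventTrigger, Posture

# If no heartbeat (or terminal-state event) arrives within the window,
# proactively flip the run to Crashed.
Automation(
    name="Crash zombie flows",
    trigger=EventTrigger(
        after={"prefect.flow-run.heartbeat"},
        expect={
            "prefect.flow-run.heartbeat",
            "prefect.flow-run.Completed",
            "prefect.flow-run.Failed",
            "prefect.flow-run.Cancelled",
            "prefect.flow-run.Crashed",
        },
        for_each={"prefect.resource.id"},
        posture=Posture.Proactive,
        threshold=1,
        within=timedelta(seconds=90),  # assumption: ~3x a 30s heartbeat frequency
    ),
    actions=[
        ChangeFlowRunState(
            state=StateType.CRASHED,
            message="Flow run marked as crashed due to missing heartbeats.",
        )
    ],
).create()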
A couple of quick questions to help pinpoint your situation:
- Are you using Prefect Cloud or Prefect Server (self-hosted)?
- Did you stop just the worker process, or did the Docker host (and thus the flow-run container) also restart?
- Roughly how long has the UI shown “Running” since the restart?
With those details, I can advise on the expected timing for the state to flip and the best recovery steps.

Shubham
09/23/2025, 5:15 AM

Marvin
09/23/2025, 5:22 AM
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

# Shared path mounted/visible to all workers if you want cross-worker reuse
result_fs = LocalFileSystem(basepath="/mnt/prefect-results")

@task(
    # Enable checkpointing
    persist_result=True,
    result_storage=result_fs,
    result_serializer=JSONSerializer(),
    # Enable caching across runs
    cache_key_fn=task_input_hash,        # key derived from task inputs
    cache_expiration=timedelta(days=7),  # reuse for 7 days
)
def expensive(x: int) -> int:
    print("Running expensive computation...")
    return x * x

@flow(persist_result=True, result_storage=result_fs, result_serializer=JSONSerializer())
def my_flow(items: list[int]):
    # Break the long loop into per-item tasks; each call can be cached/checkpointed
    futures = [expensive.submit(x) for x in items]
    return [f.result() for f in futures]

if __name__ == "__main__":
    my_flow(list(range(1000)))
Result: If the run is interrupted and you re-run with the same inputs, Prefect will skip the already-completed expensive(x) calls and load the results from storage.
Using S3 for result storage
- Install the AWS extra: pip install 'prefect[aws]' (or in CI: uv run --with 'prefect[aws]' ...)
- Configure an S3 bucket as the result store, so any worker with AWS access can reuse results.
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.serializers import CompressedPickleSerializer
from prefect_aws.s3 import S3Bucket
from prefect_aws.credentials import AwsCredentials

result_s3 = S3Bucket(
    bucket_name="my-prefect-bucket",
    bucket_folder="results",  # optional subfolder
    credentials=AwsCredentials(
        aws_access_key_id="...",
        aws_secret_access_key="...",
        region_name="us-east-1",
    ),
)

@task(
    persist_result=True,
    result_storage=result_s3,
    # Pick a serializer that suits your data (JSON, Pickle, Compressed*)
    result_serializer=CompressedPickleSerializer(),
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=14),
)
def heavy_task(payload: dict) -> dict:
    # ... heavy work ...
    return {"ok": True, "n": len(payload)}

@flow(persist_result=True, result_storage=result_s3, result_serializer=CompressedPickleSerializer())
def pipeline(batch: list[dict]):
    return [heavy_task.submit(b) for b in batch]
Notes and tips
- cache_key_fn: task_input_hash is a good default. If you need custom invalidation, you can write your own function that includes a “version” or file checksums (see the sketch after this list).
- cache_expiration: controls how long a cache entry is considered valid.
- persist_result: set this to True on tasks (and optionally flows) to write outputs to your result storage.
- result_storage: must be reachable by all workers that need to read cached results (e.g., shared filesystem, S3, GCS).
- Serializers: choose JSONSerializer for plain data, Pickle/CompressedPickle for Python objects or larger payloads.
- Forcing a refresh: you can set refresh_cache=True when defining the task (or use my_task.with_options(refresh_cache=True) when calling) if you ever need to bypass the cache.
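As an illustration, here is a minimal custom cache key that layers a manual version marker on top of the input hash (the CODE_VERSION constant and versioned_cache_key name are hypothetical):
from prefect.context import TaskRunContext
from prefect.tasks import task_input_hash

CODE_VERSION = "v2"  # hypothetical marker; bump it to invalidate all prior cache entries

def versioned_cache_key(context: TaskRunContext, parameters: dict) -> str:
    # Combine the standard input hash with the version marker so changing
    # CODE_VERSION forces recomputation even for identical inputs.
    return f"{CODE_VERSION}-{task_input_hash(context, parameters)}"

# Then: @task(cache_key_fn=versioned_cache_key, ...)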
Docs

Shubham
09/23/2025, 5:45 AM

Marvin
09/23/2025, 5:50 AM
1) Poll with a watermark stored in a Prefect Variable
from datetime import datetime, timedelta, timezone
from typing import List, Dict

from prefect import flow, task
from prefect.variables import Variable

@task(retries=3, retry_delay_seconds=[5, 15, 30], timeout_seconds=60, log_prints=True)
def fetch_alerts_since(since: datetime) -> List[Dict]:
    # TODO: call your SIEM API with `since`
    # return a list of alerts with a reliable created_at or id
    ...

@task(retries=3, retry_delay_seconds=[5, 15, 30], timeout_seconds=60)
def upsert_alerts(alerts: List[Dict]) -> None:
    # TODO: write alerts via your DB/API
    # Make this idempotent if possible (e.g., use natural keys / upsert / idempotency keys)
    ...

def _get_initial_since() -> datetime:
    # start 10 minutes back on first run to be safe; tune as needed
    return datetime.now(timezone.utc) - timedelta(minutes=10)

@flow(name="poll-siem-every-5m")
def poll_siem():
    last = Variable.get("siem_last_ts", default=None)
    if last:
        since = datetime.fromisoformat(last)
    else:
        since = _get_initial_since()

    alerts = fetch_alerts_since(since)
    upsert_alerts(alerts)

    # Advance watermark; use max(alert["created_at"]) if available, otherwise now().
    # Assumes created_at values are datetimes; parse them first if your API returns strings.
    if alerts:
        new_since = max([since] + [a["created_at"] for a in alerts if "created_at" in a])
    else:
        new_since = datetime.now(timezone.utc)
    # overwrite=True so the variable can be updated on every run
    Variable.set("siem_last_ts", new_since.isoformat(), overwrite=True)
2) Deploy with a 5‑minute schedule and no overlap
- Use code-first deployment with a cron or interval schedule.
- Set a concurrency limit of 1 to prevent overlapping runs. If a run takes >5m, the next will wait.
if __name__ == "__main__":
    poll_siem.deploy(
        name="prod",
        work_pool_name="docker-pool",
        # Note: for a Docker work pool, .deploy() also expects an image=
        # (built and pushed by default); a process pool does not.
        cron="*/5 * * * *",  # or: interval=timedelta(minutes=5)
        concurrency_limit=1,
        description="Poll SIEM and write alerts every 5 minutes",
    )
3) Create a work pool and start a worker
- Create a Docker work pool (adjust if you’re using process/k8s).
prefect work-pool create "docker-pool" --type docker
prefect worker start -p docker-pool
Operational tips
- Idempotency: Make your DB/API writes idempotent (upserts, unique keys) so retries and catch-up windows don’t duplicate data (see the sketch after these tips).
- Watermark: Keeping the watermark external (Prefect Variable or your DB) ensures missed runs will “catch up” next time.
- Retries and timeouts: Put them on the tasks that touch external services.
- Overruns: With concurrency_limit=1, Prefect won’t overlap runs; late runs will queue until the prior one completes.
- Optional: Add result persistence or logging as needed; caching is usually not necessary for polling workflows.
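For instance, an idempotent Postgres upsert keyed on the alert’s natural ID might look like this sketch (the alerts table, its columns, and a psycopg2-style connection are assumptions, not part of your setup):
import json
from typing import Dict, List

def upsert_alerts_pg(conn, alerts: List[Dict]) -> None:
    # `conn` is assumed to be a psycopg2 (or compatible) Postgres connection.
    # ON CONFLICT on the natural key makes retries and catch-up windows safe:
    # re-ingesting the same alert updates the row instead of duplicating it.
    with conn, conn.cursor() as cur:
        for a in alerts:
            cur.execute(
                """
                INSERT INTO alerts (alert_id, severity, payload)
                VALUES (%s, %s, %s)
                ON CONFLICT (alert_id) DO UPDATE
                SET severity = EXCLUDED.severity, payload = EXCLUDED.payload
                """,
                (a["id"], a.get("severity"), json.dumps(a)),
            )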
Docs
- Schedules
- Variables (for the watermark)
- Concurrency limits
If you share how your SIEM and DB APIs handle idempotency (e.g., unique alert IDs), I can help tailor the watermarking and write logic.

Shubham
09/23/2025, 12:54 PM

Marvin
09/23/2025, 12:57 PM
What needs outbound (egress) access:
- The Prefect API: both the worker and the flow-run container
- Your Docker image registry: the worker host, to pull images
- Your Git provider: the flow-run container, if code is fetched at runtime via flow.from_source(...)
- Any external services your flow touches (databases, APIs, etc.)
Typical domains and ports
- Prefect Cloud
- HTTPS 443 to the Prefect Cloud API
- Domains vary by environment; generally allow the Prefect Cloud domains you use (e.g., api/ui under the prefect.cloud umbrella). If you enforce egress allowlists, allow Prefect Cloud endpoints for your region/workspace.
- For private connectivity options and allowlists, see:
- PrivateLink for Prefect Cloud
- Secure access by IP address
- Docker image registry (worker host pulls images)
- Docker Hub: HTTPS 443 to:
- registry-1.docker.io (image layers)
- auth.docker.io (authentication)
- production.cloudflare.docker.com (content CDN used by Docker)
- GitHub Container Registry (if used): HTTPS 443 to:
- ghcr.io
- pkg-containers.githubusercontent.com
- AWS ECR: HTTPS 443 to:
- *.ecr.<region>.amazonaws.com (registry)
- sts.amazonaws.com (if using AssumeRole for auth)
- Google Artifact Registry/GCR: HTTPS 443 to:
- *.pkg.dev and *.gcr.io
- Azure Container Registry: HTTPS 443 to:
- <your-registry>.azurecr.io
- login.microsoftonline.com (AAD auth), as applicable
- Any other private registry: your registry’s hostname over 443
- Git provider (flow-run container pulls code)
- GitHub over HTTPS 443:
- github.com (repo access)
- codeload.github.com (archives)
- raw.githubusercontent.com (raw files)
- If using SSH for Git instead of HTTPS, also allow TCP 22 to your Git host
- GitHub Enterprise/GitLab/Bitbucket: your organization’s hostnames over 443 (or 22 if SSH)
Notes and clarifications
- Worker vs. container access:
- The Docker worker does NOT need to reach Git to clone your repo. The clone happens inside the flow-run container started by the worker.
- Both the worker and the flow-run container need outbound to Prefect Cloud to function properly.
- Ports: 443 (HTTPS) is sufficient in almost all cases. If you use Git over SSH, open 22 from the flow-run container.
- No inbound ports are required for either the worker or the container. (A quick way to verify egress is sketched after these notes.)
- If you’re self-hosting Prefect Server instead of Prefect Cloud, replace the Prefect Cloud endpoints above with your API/UI hostnames.
- Storing flow code with `from_source`:
- Prefect will fetch your code at run time inside the flow-run container. Ensure the container’s network policy allows your Git host over 443.
- Docs: Store flow code for deployments
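To sanity-check egress from inside a flow-run container, a quick sketch; httpx ships with Prefect, and the endpoint list is illustrative:
import httpx

# Substitute the API/registry/Git hosts from your own allowlist.
# Note: a 401/403 response still proves network reachability.
ENDPOINTS = [
    "https://api.prefect.cloud",
    "https://registry-1.docker.io/v2/",
    "https://github.com",
]

for url in ENDPOINTS:
    try:
        r = httpx.get(url, timeout=10, follow_redirects=True)
        print(f"{url} -> HTTP {r.status_code}")
    except Exception as exc:
        print(f"{url} -> FAILED: {exc}")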
If you can share:
- Are you on Prefect Cloud or self-hosted?
- Which registry (Docker Hub, GHCR, ECR, ACR, GCR, other)?
- Which Git host (public GitHub, GitHub Enterprise, GitLab, Bitbucket)?
I can tailor an exact allowlist for your environment.

Shubham
09/23/2025, 1:03 PM

Marvin
09/23/2025, 1:07 PM
Option A — Route through an approved egress proxy
- If your client provides an HTTP(S) proxy, inject proxy variables into the flow-run container via job_variables:
from prefect import flow
from prefect.docker import DockerImage

@flow
def write_to_supabase():
    # your supabase code here
    ...

if __name__ == "__main__":
    flow.from_source(
        "https://github.com/org/repo",
        entrypoint="flows/supabase_flow.py:write_to_supabase",  # hypothetical path in the repo
    ).deploy(
        name="my-deployment",
        work_pool_name="docker-pool",
        image=DockerImage(name="your-image:tag"),
        job_variables={
            "env": {
                "HTTPS_PROXY": "http://proxy.mycorp.local:8080",
                "HTTP_PROXY": "http://proxy.mycorp.local:8080",
                "NO_PROXY": "127.0.0.1,localhost,.svc,.cluster.local",
            }
        },
    )
Docs: Customize job variables
Option B — Relay service you control
- Have the flow-run container send the payload to an internal API or trusted relay (allowed by your client).
- That relay (running in an environment with internet access) writes to Supabase.
- Variants: queue-based (SQS/Pub/Sub/Kafka) where a consumer outside the restricted network writes to Supabase.
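A minimal sketch of the flow-side relay call (RELAY_URL and the payload shape are hypothetical, not an existing API):
import os

import requests
from prefect import flow

@flow
def send_via_relay(rows: list[dict]):
    # Post to an internal relay that IS allowed by the egress policy;
    # the relay, running where internet access exists, writes to Supabase.
    resp = requests.post(
        os.environ["RELAY_URL"],  # hypothetical, e.g. http://relay.internal:8080/ingest
        json={"table": "events", "rows": rows},
        timeout=30,
    )
    resp.raise_for_status()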
Option C — Direct Postgres connection pattern
- If policy allows TCP 5432 to your Supabase Postgres endpoint (instead of the Supabase REST API), connect with a Postgres client from the flow-run container and insert directly. This still requires allowing egress to the Supabase DB host, so it depends on your client’s policy.
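If 5432 is permitted, a minimal direct-connection sketch (psycopg2 and the env var names are assumptions; Supabase exposes a standard Postgres endpoint):
import os

import psycopg2
from prefect import flow

@flow
def insert_direct():
    # Connect straight to the Supabase Postgres endpoint over TCP 5432
    conn = psycopg2.connect(
        host=os.environ["SUPABASE_DB_HOST"],  # e.g. db.<project-ref>.supabase.co
        port=5432,
        dbname="postgres",
        user=os.environ["SUPABASE_DB_USER"],
        password=os.environ["SUPABASE_DB_PASSWORD"],
        sslmode="require",
    )
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO events (source, ok) VALUES (%s, %s)",
            ("prefect", True),
        )
    conn.close()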
Notes
- Put Supabase credentials in environment variables or your secret manager; do not hardcode in code. You can inject them at the work pool level or deployment job_variables:
job_variables={
    "env": {
        "SUPABASE_URL": "<redacted>",
        "SUPABASE_SERVICE_ROLE_KEY": "<redacted>"
    }
}
- If your worker host itself needs a proxy to pull images, set the same proxy variables on the worker process environment as well.
Minimal flow example (the Supabase client will typically honor HTTPS_PROXY via its underlying HTTP library; otherwise the system/env proxy is usually respected):
import os

from prefect import flow
from supabase import create_client

@flow
def write_to_supabase():
    url = os.environ["SUPABASE_URL"]
    key = os.environ["SUPABASE_SERVICE_ROLE_KEY"]
    supabase = create_client(url, key)
    supabase.table("events").insert({"source": "prefect", "ok": True}).execute()
Questions to tailor a precise setup:
- Where is your worker running (on-prem VM, k8s, cloud VPC)?
- Do you have an approved egress proxy we can route through?
- Is your client willing to allow egress to an internal relay/queue you control?
- Are you using the Supabase REST API only, or is opening Postgres (5432) an option?
Based on your answers, I can provide the exact job_variables or infra config you need.