<@ULVA73B9P> I run a long running loop in flow on ...
# ask-marvin
s
@Marvin I ran a long-running loop in a flow on a Docker worker. Midway through, I took the worker down and restarted it. The UI shows the flow as running, but on the worker it is not running.
m
thought for 198.2 seconds
Thanks for the details. This is expected behavior with Docker workers.

What’s happening
- When you stop a Docker worker mid-run, it does not “reattach” to that in-flight flow run when it comes back. Workers only pick up new scheduled runs.
- For Docker work pools, the worker launches a separate flow-run container. If that container kept running, your flow is still running independently of the worker. If that container died when you took the worker/host down, the orchestration layer won’t immediately know; the UI will continue to show “Running” until the run is marked “Crashed” due to missing heartbeats.
- Prefect relies on flow-run heartbeats to detect “zombie” runs. If heartbeats stop, an automation can transition the run to “Crashed.” Until then, it will remain “Running” in the UI.

What you can do now
- Check whether the flow-run container is still running on the Docker host (e.g., `docker ps`). If there’s no container for that run, the run is gone and will be marked “Crashed” once the missing-heartbeat window elapses.
- If you don’t want to wait, you can manually set the flow run state to “Crashed” or “Cancelled” in the UI and then re-run the flow. There isn’t a built-in way to resume a killed run mid-execution; you’ll need to restart it (optionally leveraging task caching/checkpoints if you’ve enabled them).

Prevent this in the future
- Enable a “detect zombie flows” automation so runs that stop heartbeating are automatically moved to “Crashed” (and optionally notify you). This keeps the UI from showing “Running” indefinitely when the underlying process is gone. See Detect zombie flows.
- For very long-running work, consider:
  - Breaking the loop into smaller tasks/iterations so partial progress can be retried or resumed via caching.
  - Ensuring results are persisted so re-runs can pick up from prior successful steps.

A couple of quick questions to help pinpoint your situation:
- Are you using Prefect Cloud or Prefect Server (self-hosted)?
- Did you stop just the worker process, or did the Docker host (and thus the flow-run container) also restart?
- Roughly how long has the UI shown “Running” since the restart?

With those details, I can advise on the expected timing for the state to flip and the best recovery steps.
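The missing-heartbeat rule described above can be pictured with a small sketch. This is not Prefect's implementation: the function name `classify_run`, the 90-second threshold, and the plain-string states are assumptions chosen only to illustrate why a run keeps showing “Running” until something evaluates the heartbeat gap.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical threshold: how long a run may go without a heartbeat
# before it is treated as a zombie. This is not a Prefect default.
HEARTBEAT_TIMEOUT = timedelta(seconds=90)


def classify_run(state: str, last_heartbeat: datetime, now: datetime) -> str:
    """Return the state a zombie-detection rule would assign to a run."""
    if state != "Running":
        # Only runs that claim to be running can be zombies.
        return state
    if now - last_heartbeat > HEARTBEAT_TIMEOUT:
        return "Crashed"
    return "Running"


now = datetime(2025, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
fresh = classify_run("Running", now - timedelta(seconds=30), now)
stale = classify_run("Running", now - timedelta(minutes=10), now)
print(fresh, stale)
```

Without a rule like this being evaluated somewhere, nothing changes the stored state, which matches the stuck “Running” status in the UI.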
s
I have to run my alert-fetching flow every 5 minutes. It calls the SIEM API to fetch alerts and inserts them into a DB via an API. Which setup will work for me? I'm using Prefect Server.
@Marvin how can I leverage task caching and checkpointing?
m
thought for 186.3 seconds
Great question. In Prefect 3.x you can combine task caching with persisted results (“checkpoints”) so that after a crash/restart, previously completed work is reused instead of recomputed.

Key concepts
- Caching decides whether to skip re-execution based on a cache key and expiration.
- Persisted results (checkpointing) store the task’s output in a filesystem or object store so cache hits can be fulfilled across different runs and different workers.
- For cross-run reuse, you generally want BOTH:
  - cache_key_fn + cache_expiration on the task
  - persist_result=True and a shared result_storage (e.g., S3, shared filesystem)

Minimal example (local filesystem)
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

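# Shared path mounted/visible to all workers if you want cross-worker reuse.
# Some Prefect 3.x versions require a block to be saved before it is used as
# result storage, e.g. result_fs.save("local-results", overwrite=True)
# ("local-results" is a placeholder name).
result_fs = LocalFileSystem(basepath="/mnt/prefect-results")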

@task(
    # Enable checkpointing
    persist_result=True,
    result_storage=result_fs,
    result_serializer=JSONSerializer(),
    # Enable caching across runs
    cache_key_fn=task_input_hash,          # key derived from task inputs
    cache_expiration=timedelta(days=7),    # reuse for 7 days
)
def expensive(x: int) -> int:
    print("Running expensive computation...")
    return x * x

@flow(persist_result=True, result_storage=result_fs, result_serializer=JSONSerializer())
def my_flow(items: list[int]):
    # Break the long loop into per-item tasks; each call can be cached/checkpointed
    futures = [expensive.submit(x) for x in items]
    return [f.result() for f in futures]

if __name__ == "__main__":
    my_flow(list(range(1000)))
Result: If the run is interrupted and you re-run with the same inputs, Prefect will skip the already-completed `expensive(x)` calls and load the results from storage.

Using S3 for result storage
- Install the AWS extra: `pip install 'prefect[aws]'` (or in CI: `uv run --with 'prefect[aws]' ...`)
- Configure an S3 bucket as the result store, so any worker with AWS access can reuse results.
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash
from prefect.serializers import CompressedPickleSerializer
from prefect_aws.s3 import S3Bucket
from prefect_aws.credentials import AwsCredentials

# S3Bucket takes `bucket_name` and `bucket_folder` (not `bucket` / `basepath`)
result_s3 = S3Bucket(
    bucket_name="my-prefect-bucket",
    bucket_folder="results",  # optional subfolder
    credentials=AwsCredentials(aws_access_key_id="...", aws_secret_access_key="...", region_name="us-east-1"),
)

@task(
    persist_result=True,
    result_storage=result_s3,
    # Pick a serializer that suits your data (JSON, Pickle, Compressed*)
    result_serializer=CompressedPickleSerializer(),
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(days=14),
)
def heavy_task(payload: dict) -> dict:
    # ... heavy work ...
    return {"ok": True, "n": len(payload)}

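@flow(persist_result=True, result_storage=result_s3, result_serializer=CompressedPickleSerializer())
def pipeline(batch: list[dict]):
    # Resolve the futures so the flow returns results, not future objects
    futures = [heavy_task.submit(b) for b in batch]
    return [f.result() for f in futures]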
Notes and tips
- cache_key_fn: `task_input_hash` is a good default. If you need custom invalidation, you can write your own function that includes a “version” or file checksums.
- cache_expiration: controls how long a cache entry is considered valid.
- persist_result: set this to True on tasks (and optionally flows) to write outputs to your result storage.
- result_storage: must be reachable by all workers that need to read cached results (e.g., shared filesystem, S3, GCS).
- Serializers: choose JSONSerializer for plain data, Pickle/CompressedPickle for Python objects or larger payloads.
- Forcing a refresh: you can set `refresh_cache=True` when defining the task (or use `my_task.with_options(refresh_cache=True)` when calling) if you ever need to bypass the cache.

Docs
- Results (checkpointing): Persisting results
- Caching: Task caching

Happy to tailor this to your setup. Which backend do your workers share access to (local/NFS, S3, GCS)? And roughly how large are your task results?
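As a rough illustration of the “custom invalidation” tip, here is a minimal sketch of a cache key function that mixes a version tag into the key. The factory name `make_cache_key_fn` and the version strings are hypothetical; the `(context, parameters)` signature is the shape Prefect uses when calling a `cache_key_fn`, and the sketch assumes the task inputs can be rendered as JSON (anything else falls back to `str()`).

```python
import hashlib
import json
from typing import Any, Callable, Dict


def make_cache_key_fn(version: str) -> Callable[[Any, Dict[str, Any]], str]:
    """Build a cache key function; changing `version` invalidates old entries."""

    def cache_key_fn(context: Any, parameters: Dict[str, Any]) -> str:
        # `context` is unused in this sketch; only the inputs and version matter.
        payload = json.dumps(
            {"version": version, "parameters": parameters},
            sort_keys=True,   # stable ordering so equal inputs hash equally
            default=str,      # fall back to str() for non-JSON values
        )
        return hashlib.sha256(payload.encode("utf-8")).hexdigest()

    return cache_key_fn


key_v1 = make_cache_key_fn("v1")(None, {"x": 3})
key_v1_again = make_cache_key_fn("v1")(None, {"x": 3})
key_v2 = make_cache_key_fn("v2")(None, {"x": 3})
print(key_v1 == key_v1_again, key_v1 == key_v2)
```

A function built this way could be passed as `cache_key_fn=make_cache_key_fn("v1")` on a task; bumping the tag to `"v2"` would make every earlier cache entry miss.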
s
@Marvin I have to run my alert-fetching flow every 5 minutes. It calls the SIEM API to fetch alerts and inserts them into a DB via an API. Which setup will work for me? I'm using Prefect Server. Should I use a long-running loop or scheduled runs for durable execution?
m
thought for 308.3 seconds
Short answer: use a scheduled deployment, not a long‑running loop.

Why
- Long loops are fragile (worker/container restarts, memory leaks) and hard to recover mid-run.
- Scheduled runs are durable: each poll is its own flow run with retries, logging, and isolation. If a worker goes down, the next run will proceed when it’s back.
- You can prevent overlap with a concurrency limit and maintain a “watermark” so you never miss alerts.

Recommended setup (Prefect Server)

1) Model the poll as a short, idempotent flow
- Store a watermark (e.g., last processed timestamp or alert ID) in a Prefect Variable or your DB.
- Fetch alerts since the watermark, write idempotently to your DB/API, then advance the watermark.

Example
from datetime import datetime, timedelta, timezone
from typing import List, Dict

from prefect import flow, task
from prefect.variables import Variable

@task(retries=3, retry_delay_seconds=[5, 15, 30], timeout_seconds=60, log_prints=True)
def fetch_alerts_since(since: datetime) -> List[Dict]:
    # TODO: call your SIEM API with `since`
    # return a list of alerts with a reliable created_at or id
    ...

@task(retries=3, retry_delay_seconds=[5, 15, 30], timeout_seconds=60)
def upsert_alerts(alerts: List[Dict]) -> None:
    # TODO: write alerts via your DB/API
    # Make this idempotent if possible (e.g., use natural keys / upsert / idempotency keys)
    ...

def _get_initial_since() -> datetime:
    # start 10 minutes back on first run to be safe; tune as needed
    return datetime.now(timezone.utc) - timedelta(minutes=10)

@flow(name="poll-siem-every-5m")
def poll_siem():
    last = Variable.get("siem_last_ts", default=None)
    if last:
        since = datetime.fromisoformat(last)
    else:
        since = _get_initial_since()

    alerts = fetch_alerts_since(since)
    upsert_alerts(alerts)

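    # Advance watermark to the newest created_at seen; fall back to now() when
    # the batch is empty. Assumes created_at is an ISO 8601 string; adjust the
    # parsing if your SIEM returns another format.
    seen = [datetime.fromisoformat(a["created_at"]) for a in alerts if "created_at" in a]
    new_since = max([since] + seen) if alerts else datetime.now(timezone.utc)

    # overwrite=True is needed to update a Variable that already exists
    Variable.set("siem_last_ts", new_since.isoformat(), overwrite=True)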
2) Deploy with a 5‑minute schedule and no overlap
- Use code-first deployment with a cron or interval schedule.
- Set a concurrency limit of 1 to prevent overlapping runs. If a run takes >5m, the next will wait.
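if __name__ == "__main__":
    # Note: for a Docker work pool, deploy() also expects an `image=...` argument
    # (or a flow loaded with from_source); it is omitted here.
    poll_siem.deploy(
        name="prod",
        work_pool_name="docker-pool",
        cron="*/5 * * * *",          # or: interval=timedelta(minutes=5)
        concurrency_limit=1,
        description="Poll SIEM and write alerts every 5 minutes",
    )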
3) Create a work pool and start a worker
- Create a Docker work pool (adjust if you’re using process/k8s).
prefect work-pool create "docker-pool" --type docker
prefect worker start -p docker-pool
Operational tips
- Idempotency: Make your DB/API writes idempotent (upserts, unique keys) so retries and catch-up windows don’t duplicate data.
- Watermark: Keeping the watermark external (Prefect Variable or your DB) ensures missed runs will “catch up” next time.
- Retries and timeouts: Put them on the tasks that touch external services.
- Overruns: With `concurrency_limit=1`, Prefect won’t overlap runs; late runs will queue until the prior one completes.
- Optional: Add result persistence or logging as needed; caching is usually not necessary for polling workflows.

Docs
- Schedules: Schedules
- Variables (for the watermark): Variables
- Concurrency limits: Concurrency limits

If you share how your SIEM and DB APIs handle idempotency (e.g., unique alert IDs), I can help tailor the watermarking and write logic.
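To make the idempotency and watermark tips concrete, here is a minimal sketch using an in-memory SQLite table in place of the real DB/API. The table layout, the `alert_id` key, and the ISO 8601 `created_at` strings are assumptions for illustration; a real SIEM payload and write API will differ.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Dict, List


def upsert_alerts(conn: sqlite3.Connection, alerts: List[Dict]) -> None:
    """Insert alerts, silently skipping ids that are already stored."""
    conn.executemany(
        "INSERT OR IGNORE INTO alerts (alert_id, created_at) VALUES (?, ?)",
        [(a["alert_id"], a["created_at"]) for a in alerts],
    )
    conn.commit()


def next_watermark(since: datetime, alerts: List[Dict]) -> datetime:
    """Advance to the newest alert timestamp; keep `since` if nothing arrived."""
    stamps = [datetime.fromisoformat(a["created_at"]) for a in alerts]
    return max([since] + stamps)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE alerts (alert_id TEXT PRIMARY KEY, created_at TEXT)")

since = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc)
batch = [
    {"alert_id": "a-1", "created_at": "2025-01-01T00:01:00+00:00"},
    {"alert_id": "a-2", "created_at": "2025-01-01T00:04:00+00:00"},
]

upsert_alerts(conn, batch)
upsert_alerts(conn, batch)  # simulated retry: the same batch is written twice
row_count = conn.execute("SELECT COUNT(*) FROM alerts").fetchone()[0]
watermark = next_watermark(since, batch)
print(row_count, watermark.isoformat())
```

In this sketch an empty batch leaves the watermark where it was instead of jumping to the current time, which is one way to avoid skipping alerts that show up late in the SIEM; whether that trade-off fits depends on how your source timestamps behave.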
s
@Marvin what kind of outbound access is required for a Docker worker to run a workflow pulled from Git with an image?
m
thought for 192.4 seconds
Short answer: outbound HTTPS (port 443) only. No inbound is required.

Here’s what needs egress when a Prefect 3.x Docker worker runs a deployment that pulls code from Git and uses a Docker image:

Who needs to reach what
- Docker worker host (the process that polls for flow runs and starts containers):
  - Prefect Cloud API to poll for work and report statuses
  - Your container registry to pull the image
- Flow-run container (the container that actually executes your flow):
  - Prefect Cloud API to send logs/states and receive run instructions
  - Your Git provider to fetch the flow code specified by `flow.from_source(...)`
  - Any external services your flow touches (databases, APIs, etc.)

Typical domains and ports
- Prefect Cloud
  - HTTPS 443 to the Prefect Cloud API
  - Domains vary by environment; generally allow the Prefect Cloud domains you use (e.g., api/ui under the prefect.cloud umbrella). If you enforce egress allowlists, allow Prefect Cloud endpoints for your region/workspace.
  - For private connectivity options and allowlists, see:
    - PrivateLink for Prefect Cloud
    - Secure access by IP address
- Docker image registry (worker host pulls images)
  - Docker Hub: HTTPS 443 to:
    - registry-1.docker.io (image layers)
    - auth.docker.io (authentication)
    - production.cloudflare.docker.com (content CDN used by Docker)
  - GitHub Container Registry (if used): HTTPS 443 to:
    - ghcr.io
    - pkg-containers.githubusercontent.com
  - AWS ECR: HTTPS 443 to:
    - *.ecr.<region>.amazonaws.com (registry)
    - sts.amazonaws.com (if using AssumeRole for auth)
  - Google Artifact Registry/GCR: HTTPS 443 to:
    - *.pkg.dev and *.gcr.io
  - Azure Container Registry: HTTPS 443 to:
    - <your-registry>.azurecr.io
    - login.microsoftonline.com (AAD auth), as applicable
  - Any other private registry: your registry’s hostname over 443
- Git provider (flow-run container pulls code)
  - GitHub over HTTPS 443:
    - github.com (repo access)
    - codeload.github.com (archives)
    - raw.githubusercontent.com (raw files)
  - If using SSH for Git instead of HTTPS, also allow TCP 22 to your Git host
  - GitHub Enterprise/GitLab/Bitbucket: your organization’s hostnames over 443 (or 22 if SSH)

Notes and clarifications
- Worker vs. container access:
  - The Docker worker does NOT need to reach Git to clone your repo. The clone happens inside the flow-run container started by the worker.
  - Both the worker and the flow-run container need outbound to Prefect Cloud to function properly.
- Ports: 443 (HTTPS) is sufficient in almost all cases. If you use Git over SSH, open 22 from the flow-run container.
- No inbound ports are required for either the worker or the container.
- If you’re self-hosting Prefect Server instead of Prefect Cloud, replace the Prefect Cloud endpoints above with your API/UI hostnames.
- Storing flow code with `from_source`:
  - Prefect will fetch your code at run time inside the flow-run container. Ensure the container’s network policy allows your Git host over 443.
  - Docs: Store flow code for deployments

If you can share:
- Are you on Prefect Cloud or self-hosted?
- Which registry (Docker Hub, GHCR, ECR, ACR, GCR, other)?
- Which Git host (public GitHub, GitHub Enterprise, GitLab, Bitbucket)?

I can tailor an exact allowlist for your environment.
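The worker-versus-container split above can be turned into a small allowlist builder. This is a sketch under assumptions: the function name `build_allowlist`, the dictionary keys, and the API hostname `prefect.internal.example` are hypothetical, and only the Docker Hub, GHCR, and GitHub hostnames listed in the answer are included.

```python
from typing import Dict, List, Tuple

# Hostnames copied from the lists above; extend for other registries/Git hosts.
REGISTRY_HOSTS: Dict[str, List[str]] = {
    "dockerhub": [
        "registry-1.docker.io",
        "auth.docker.io",
        "production.cloudflare.docker.com",
    ],
    "ghcr": ["ghcr.io", "pkg-containers.githubusercontent.com"],
}
GIT_HOSTS: Dict[str, List[str]] = {
    "github": ["github.com", "codeload.github.com", "raw.githubusercontent.com"],
}


def build_allowlist(
    api_host: str,
    registry: str,
    git: str,
    git_over_ssh: bool = False,
    api_port: int = 443,
) -> Dict[str, List[Tuple[str, int]]]:
    """Return (host, port) egress rules for the worker host and the flow-run container."""
    # The worker polls the API and pulls the image; it does not clone code.
    worker = [(api_host, api_port)] + [(h, 443) for h in REGISTRY_HOSTS[registry]]
    # The flow-run container reports to the API and fetches code from Git.
    container = [(api_host, api_port)] + [(h, 443) for h in GIT_HOSTS[git]]
    if git_over_ssh:
        container.append((GIT_HOSTS[git][0], 22))
    return {"worker_host": worker, "flow_run_container": container}


allow = build_allowlist(
    "prefect.internal.example", "dockerhub", "github", git_over_ssh=True
)
for role, rules in allow.items():
    print(role, rules)
```

Any external services the flow itself calls (databases, APIs) would still need to be added to the container's list by hand.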
s
@Marvin my flow on the worker adds info to a Supabase DB. How can I make this API call from the worker if the client refuses to whitelist the Supabase API endpoint directly from the engine?
m
thought for 279.6 seconds
First, quick clarification: in Prefect 3.x your flow code runs inside the flow-run container started by your worker. Prefect Cloud does not call Supabase on your behalf. So any API call to Supabase originates from your worker’s network (not “the engine”).

If your client will not allow direct egress from the worker/flow-run container to Supabase, you have a few workable patterns:

Option A: Use a corporate outbound proxy from the flow-run container
- Point your flow-run container at an egress proxy your client already allows.
- Set HTTP(S) proxy env vars on the Docker work pool or per-deployment so outbound HTTP from your flow (including the Supabase client) goes via the proxy:
  - HTTP_PROXY / HTTPS_PROXY
  - NO_PROXY (to bypass the proxy for internal hosts as needed)

Example (Python deploy) setting job variables for a Docker work pool:
from prefect import flow
from prefect.docker import DockerImage

@flow
def write_to_supabase():
    # your supabase code here
    ...

# from_source also needs an entrypoint; the path below is a placeholder
flow.from_source(
    source="https://github.com/org/repo",
    entrypoint="path/to/flow.py:write_to_supabase",
).deploy(
    name="my-deployment",
    work_pool_name="docker-pool",
    image=DockerImage(name="your-image:tag"),
    job_variables={
        "env": {
            "HTTPS_PROXY": "http://proxy.mycorp.local:8080",
            "HTTP_PROXY": "http://proxy.mycorp.local:8080",
            "NO_PROXY": "127.0.0.1,localhost,.svc,.cluster.local"
        }
    },
)
Docs: Customize job variables

Option B: Relay service you control
- Have the flow-run container send the payload to an internal API or trusted relay (allowed by your client).
- That relay (running in an environment with internet access) writes to Supabase.
- Variants: queue-based (SQS/Pub/Sub/Kafka) where a consumer outside the restricted network writes to Supabase.

Option C: Direct Postgres connection pattern
- If policy allows TCP 5432 to your Supabase Postgres endpoint (instead of the Supabase REST API), connect with a Postgres client from the flow-run container and insert directly. This still requires allowing egress to the Supabase DB host, so it depends on your client’s policy.

Notes
- Put Supabase credentials in environment variables or your secret manager; do not hardcode in code. You can inject them at the work pool level or deployment job_variables:
job_variables={
  "env": {
    "SUPABASE_URL": "<redacted>",
    "SUPABASE_SERVICE_ROLE_KEY": "<redacted>"
  }
}
- If your worker host itself needs a proxy to pull images, set the same proxy variables on the worker process environment as well.

Minimal flow example (most Python HTTP libraries read HTTPS_PROXY from the environment by default, so the Supabase client will typically honor it; verify this for the client version you use):
from prefect import flow
import os
from supabase import create_client

@flow
def write_to_supabase():
    url = os.environ["SUPABASE_URL"]
    key = os.environ["SUPABASE_SERVICE_ROLE_KEY"]
    supabase = create_client(url, key)
    supabase.table("events").insert({"source": "prefect", "ok": True}).execute()
Questions to tailor a precise setup:
- Where is your worker running (on-prem VM, k8s, cloud VPC)?
- Do you have an approved egress proxy we can route through?
- Is your client willing to allow egress to an internal relay/queue you control?
- Are you using the Supabase REST API only, or is opening Postgres (5432) an option?

Based on your answers, I can provide the exact job_variables or infra config you need.
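For the proxy route (Option A), a quick way to sanity-check the environment variables before deploying is to see what Python's standard library derives from them. The helper names `proxy_env` and `resolve_proxies` are hypothetical, and the proxy URL is the placeholder from the example above. This sketch only shows how `urllib` reads the variables; a third-party client may resolve proxies differently.

```python
import os
import urllib.request
from typing import Dict, List
from unittest import mock


def proxy_env(proxy_url: str, no_proxy: List[str]) -> Dict[str, str]:
    """Build the env block that could go under job_variables['env']."""
    return {
        "HTTP_PROXY": proxy_url,
        "HTTPS_PROXY": proxy_url,
        "NO_PROXY": ",".join(no_proxy),
    }


def resolve_proxies(env: Dict[str, str]) -> Dict[str, str]:
    """Return the proxies urllib would use if `env` were the process environment."""
    # Temporarily swap the environment so the check does not leak settings.
    with mock.patch.dict(os.environ, env, clear=True):
        return urllib.request.getproxies()


env = proxy_env("http://proxy.mycorp.local:8080", ["127.0.0.1", "localhost"])
proxies = resolve_proxies(env)
print(proxies)
```

Running the same check inside the flow-run container (without the temporary swap) would confirm that the variables set on the work pool or deployment actually reached the process.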