Shubham
09/23/2025, 5:55 PM
Marvin
09/23/2025, 5:55 PM
Marvin
09/23/2025, 5:57 PM
Marvin
09/23/2025, 5:57 PM
Shubham
09/23/2025, 6:02 PM
Marvin
09/23/2025, 6:02 PM
Marvin
09/23/2025, 6:07 PM
Cron("*/5 * * * *")
- Prevent overlap: a global concurrency limit with concurrency("processor-run")
- API rate limits: rate_limit("vendor-x") configured globally
- Parallel enrichment: ThreadPoolTaskRunner + .submit
- Automations: optionally trigger the Processor when the Collector completes, or offset the schedules by a minute
Minimal patterns
Collector (every 5 minutes)
from prefect import flow, task, get_run_logger

@task(retries=3, retry_delay_seconds=10, timeout_seconds=60)
def fetch_siem_alerts():
    # TODO: pull alerts from the SIEM
    return []

@task
def write_alerts_to_db(alerts):
    # TODO: upsert alerts keyed on a stable alert ID so reruns are idempotent
    return len(alerts)

@flow
def collector():
    log = get_run_logger()
    alerts = fetch_siem_alerts()
    n = write_alerts_to_db(alerts)
    log.info(f"Inserted/updated {n} alerts")
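The "upsert alerts; ensure idempotency" TODO can be sketched with SQLite's `INSERT ... ON CONFLICT DO UPDATE`, assuming every SIEM alert carries a stable ID. The `alerts` table, its `alert_id`/`severity` columns, and the `upsert_alerts_sqlite` helper are hypothetical stand-ins for your real schema and database driver.

```python
import sqlite3

def ensure_schema(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: alert_id is the SIEM's stable identifier.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS alerts (alert_id TEXT PRIMARY KEY, severity TEXT)"
    )

def upsert_alerts_sqlite(conn: sqlite3.Connection, alerts: list[dict]) -> int:
    # Keyed on alert_id: re-running the Collector over the same batch updates
    # existing rows instead of inserting duplicates (needs SQLite 3.24+).
    with conn:  # commits on success, rolls back on error
        conn.executemany(
            "INSERT INTO alerts (alert_id, severity) VALUES (:alert_id, :severity) "
            "ON CONFLICT(alert_id) DO UPDATE SET severity = excluded.severity",
            alerts,
        )
    return len(alerts)
```

Running the same batch twice leaves one row per `alert_id`, which is what makes Collector retries and overlapping 5-minute windows harmless.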
Processor (every 5 minutes, no overlap, respect API limits)
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import concurrency, rate_limit

@task(retries=3, retry_delay_seconds=[5, 15, 30], timeout_seconds=30)
def call_api_vendor_x(alert):
    # rate_limit() is a blocking call, not a context manager: it waits until the
    # "vendor-x" limit (a global concurrency limit with slot decay) has a free slot
    rate_limit("vendor-x")
    # TODO: call API X and return the enriched alert
    return {}

@task
def load_new_alerts():
    # TODO: fetch only unprocessed/new alerts
    return []

@task
def save_enriched(alert):
    # TODO: write back to DB
    return True

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def processor():
    # ensure only one processor run at a time
    with concurrency("processor-run", occupy=1, timeout_seconds=0):
        alerts = load_new_alerts()
        futs = [call_api_vendor_x.submit(a) for a in alerts]
        results = [f.result() for f in futs]
        save_futs = [save_enriched.submit(r) for r in results]
        for f in save_futs:
            f.result()  # wait for the writes so failures show up in the flow run state
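The `load_new_alerts` and `save_enriched` TODOs can follow a "processed marker" pattern: select rows whose `processed_at` is still NULL and set it when the enriched result is written, so the next 5-minute run skips them and an empty run exits immediately. This stdlib SQLite sketch uses a hypothetical `alerts` table and helper names (`load_new_alert_ids`, `save_enriched_row`, `process_once`); with several workers reading the same rows you would also need a claim or row-locking step, which it omits.

```python
import json
import sqlite3
from datetime import datetime, timezone

def make_db() -> sqlite3.Connection:
    # Hypothetical table: processed_at stays NULL until the alert is enriched.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE alerts (alert_id TEXT PRIMARY KEY, enrichment TEXT, processed_at TEXT)"
    )
    return conn

def load_new_alert_ids(conn: sqlite3.Connection, limit: int = 500) -> list[str]:
    # Only alerts the Processor has not handled yet, in a bounded batch.
    rows = conn.execute(
        "SELECT alert_id FROM alerts WHERE processed_at IS NULL ORDER BY alert_id LIMIT ?",
        (limit,),
    ).fetchall()
    return [row[0] for row in rows]

def save_enriched_row(conn: sqlite3.Connection, alert_id: str, enrichment: dict) -> None:
    # Result and processed marker are written in one transaction.
    with conn:
        conn.execute(
            "UPDATE alerts SET enrichment = ?, processed_at = ? WHERE alert_id = ?",
            (json.dumps(enrichment), datetime.now(timezone.utc).isoformat(), alert_id),
        )

def process_once(conn: sqlite3.Connection, enrich) -> int:
    alert_ids = load_new_alert_ids(conn)
    if not alert_ids:
        return 0  # fast no-op when nothing is new
    for alert_id in alert_ids:
        save_enriched_row(conn, alert_id, enrich(alert_id))
    return len(alert_ids)
```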
How to run it on a schedule
- Create deployments with cron schedules and point them at your work pool. Example (programmatic):
from prefect.flows import Flow
from prefect.schedules import Cron

# Entrypoints are "<file>.py:<flow function>"; this assumes collector.py and processor.py
Flow.from_source(".", "collector.py:collector").deploy(
    name="collector",
    schedule=Cron("*/5 * * * *"),  # every 5 minutes
    work_pool_name="my-pool",
)
Flow.from_source(".", "processor.py:processor").deploy(
    name="processor",
    schedule=Cron("*/5 * * * *"),  # or "2-59/5 * * * *" to start 2 minutes after the Collector
    work_pool_name="my-pool",
)
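To see what offsetting the Processor to start at :02 buys you, this small stdlib helper lists the minutes past the hour that a cron minute field of the form `start-59/step` fires on. `fire_minutes` is a hypothetical illustration that handles only that one form; it is not a cron parser, and Prefect evaluates the real expressions itself.

```python
def fire_minutes(start: int, step: int = 5) -> list[int]:
    """Minutes past the hour matched by the cron minute field f"{start}-59/{step}"."""
    return list(range(start, 60, step))

collector_minutes = fire_minutes(0)  # same minutes as "*/5"
processor_minutes = fire_minutes(2)  # "2-59/5"

# Pair each Collector run with the next Processor run and collect the lags.
lags = {p - c for c, p in zip(collector_minutes, processor_minutes)}
```

Both series fire 12 times an hour, and every Processor run starts exactly 2 minutes after a Collector run, which gives the Collector a head start without changing the cadence.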
- In Prefect Cloud, create:
  - A global concurrency limit named processor-run with limit 1
  - A global concurrency limit named vendor-x with slot decay configured (rate_limit() requires slot decay), sized to match the provider's API policy
  - Optionally, an Automation that triggers the Processor deployment when the Collector run completes, instead of two separate cron jobs
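Sizing the vendor-x limit means turning the provider's policy into two numbers. In a rate limit, each `rate_limit()` call occupies a slot and occupied slots are freed at `slot_decay_per_second`, so sustained throughput tracks the decay rate while `limit` caps the burst. The sketch below assumes a hypothetical policy of 60 requests per minute with bursts of 10 and simulates it with a fake clock. `SlotDecayLimit` and `settings_for` are illustrative names; this is a simplified model, not Prefect's implementation.

```python
from dataclasses import dataclass

@dataclass
class SlotDecayLimit:
    """Simplified model of a concurrency limit with slot decay (not Prefect's code)."""
    limit: int                    # maximum occupied slots, i.e. the burst size
    slot_decay_per_second: float  # slots freed per second, i.e. the sustained rate
    occupied: float = 0.0
    last: float = 0.0

    def try_acquire(self, now: float) -> bool:
        # Free the slots that decayed since the previous call.
        elapsed = now - self.last
        self.occupied = max(0.0, self.occupied - elapsed * self.slot_decay_per_second)
        self.last = now
        if self.occupied + 1 <= self.limit:
            self.occupied += 1
            return True
        return False  # the real rate_limit() would wait instead of returning

def settings_for(requests_per_minute: int, burst: int) -> SlotDecayLimit:
    # Hypothetical provider policy -> limit and decay rate.
    return SlotDecayLimit(limit=burst, slot_decay_per_second=requests_per_minute / 60)
```

In this model the first 10 calls at time 0 go through and the 11th is refused; one second later exactly one more slot has decayed, so calls settle at about one per second.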
Helpful references
- Global rate limits: Rate limit concepts
- Global concurrency limits: How to: concurrency limits
- Work pools/workers overview: Work pools & workers
- Automations overview: Automations & observability
Marvin
09/23/2025, 6:07 PM
Shubham
09/23/2025, 6:09 PM
Marvin
09/23/2025, 6:09 PM
Marvin
09/23/2025, 6:11 PM
Shubham
09/23/2025, 7:27 PM
Marvin
09/23/2025, 7:27 PM
Marvin
09/23/2025, 7:28 PM
client_id parameter to both flows.
- Attach one cron schedule per client to each deployment; each schedule sets parameters={"client_id": "<client>"}.
- Optionally offset minutes per client (to spread load and avoid API bursts).
- Store per-client config
  - Put credentials in Prefect Cloud Secret blocks and non-sensitive settings in Variables, and look them up by client_id.
- Enforce per-client isolation
  - Use a per-client concurrency limit inside your flow so runs for the same client never overlap: concurrency(f"processor-{client_id}", occupy=1).
  - Add per-vendor API rate limits (e.g., rate_limit("vendor-x")) so your 20+ clients don't exceed external quotas.
- Cut schedules in half (optional)
  - Keep only the Collector scheduled; use an Automation to trigger the Processor for the same client_id when the Collector run completes and actually ingested new alerts.
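One way to "look them up by client_id" is a naming convention that derives every per-client resource name from the ID, so the flows never hard-code a client. The `siem-token-<client>` Secret block name, the `client_config_<client>` Variable name, and the allowed-character rule below are hypothetical; the rule is deliberately conservative, so check Prefect's naming rules for blocks and variables. Inside a flow you would pass the derived names to calls such as `Secret.load(...).get()` and `Variable.get(...)`.

```python
import re
from dataclasses import dataclass

# Assumed rule: lowercase letters and digits only, so the ID is safe in every name below.
_CLIENT_ID = re.compile(r"^[a-z0-9]{1,40}$")

@dataclass(frozen=True)
class ClientResources:
    client_id: str
    secret_block_name: str       # hypothetical Secret block with SIEM credentials
    config_variable_name: str    # hypothetical Variable with non-sensitive settings
    concurrency_limit_name: str  # matches concurrency(f"processor-{client_id}")

def resources_for(client_id: str) -> ClientResources:
    if not _CLIENT_ID.match(client_id):
        raise ValueError(f"invalid client_id: {client_id!r}")
    return ClientResources(
        client_id=client_id,
        secret_block_name=f"siem-token-{client_id}",
        config_variable_name=f"client_config_{client_id}",
        concurrency_limit_name=f"processor-{client_id}",
    )
```

Validating the ID up front means a bad schedule parameter fails fast instead of producing a name that silently matches nothing.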
Minimal code
Flows with client_id parameter and isolation
from prefect import flow, task
from prefect.concurrency.sync import concurrency, rate_limit
from prefect.task_runners import ThreadPoolTaskRunner

@task
def fetch_from_siem(client_id):
    return []  # TODO: pull this client's alerts from the SIEM

@task
def upsert_alerts(client_id, alerts):
    return len(alerts)  # TODO: idempotent upsert into this client's records

@flow
def collector(client_id: str):
    alerts = fetch_from_siem(client_id)
    return upsert_alerts(client_id, alerts)

@task
def load_new_alerts(client_id):
    return []  # TODO: fetch this client's unprocessed alerts

@task
def enrich_with_vendor_x(alert):
    rate_limit("vendor-x")  # blocks until the shared vendor-x rate limit has a free slot
    # TODO: call vendor X and merge the response into alert
    return alert

@task
def save_enriched(client_id, alert):
    return True  # TODO: write the enriched alert back

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def processor(client_id: str):
    # ensure one processor per client at a time
    with concurrency(f"processor-{client_id}", occupy=1, timeout_seconds=0):
        alerts = load_new_alerts(client_id)
        futs = [enrich_with_vendor_x.submit(a) for a in alerts]
        saves = [save_enriched.submit(client_id, f.result()) for f in futs]
        for s in saves:
            s.result()  # wait for the writes so failures show up in the run state
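The `concurrency(f"processor-{client_id}", occupy=1, timeout_seconds=0)` line gives each client its own slot: runs for the same client are serialized while different clients proceed in parallel. Below is a single-process stdlib model of that behavior, assuming `timeout_seconds=0` means an overlapping run gives up rather than waits. `per_key_slot` and `SlotUnavailable` are hypothetical names; Prefect enforces the real limit on the server across all workers, which a local lock cannot do.

```python
import threading
from collections import defaultdict
from contextlib import contextmanager

_locks = defaultdict(threading.Lock)  # one lock per key, created on first use
_registry_lock = threading.Lock()     # guards creation so threads share one lock per key

class SlotUnavailable(RuntimeError):
    pass

@contextmanager
def per_key_slot(key: str):
    with _registry_lock:
        lock = _locks[key]
    if not lock.acquire(blocking=False):  # fail fast instead of queueing
        raise SlotUnavailable(f"{key} is already running")
    try:
        yield
    finally:
        lock.release()
```

A second run for the same client raises immediately, a run for another client is unaffected, and the slot is free again once the first run exits.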
Deploy once per flow with many client schedules
from prefect.flows import Flow
from prefect.schedules import Cron

clients = ["acme", "beta", "charlie"]  # build this list from your DB/config
collector_schedules = []
processor_schedules = []
# Stagger minutes: 0, 1, 2, ... to avoid bursts
for i, cid in enumerate(clients):
    minute = i % 5  # example staggering within a 5m cadence
    # "M-59/5" fires at M, M+5, ... each hour (standard cron range/step syntax)
    collector_schedules.append(Cron(f"{minute}-59/5 * * * *", parameters={"client_id": cid}))
    processor_schedules.append(Cron(f"{(minute + 2) % 5}-59/5 * * * *", parameters={"client_id": cid}))

Flow.from_source(".", "collector.py:collector").deploy(
    name="collector",
    schedules=collector_schedules,
    work_pool_name="your-pool",
)
Flow.from_source(".", "processor.py:processor").deploy(
    name="processor",
    schedules=processor_schedules,
    work_pool_name="your-pool",
)
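With 20+ clients, `minute = i % 5` puts several clients on each of the five offsets. This stdlib sketch builds per-client cron strings in the `start-59/step` form using the same offset arithmetic as the deployment loop, then counts how many clients share each Collector offset. `staggered_crons` and the `clientNN` IDs are placeholders.

```python
from collections import Counter

def staggered_crons(clients: list[str], step: int = 5, processor_lag: int = 2) -> dict:
    """Return {client_id: (collector_cron, processor_cron)} with offsets spread over the step."""
    plan = {}
    for i, cid in enumerate(clients):
        minute = i % step
        proc_minute = (minute + processor_lag) % step  # wraps past the step boundary
        plan[cid] = (f"{minute}-59/{step} * * * *", f"{proc_minute}-59/{step} * * * *")
    return plan

clients = [f"client{n:02d}" for n in range(20)]  # placeholder IDs
plan = staggered_crons(clients)
per_offset = Counter(collector.split("-")[0] for collector, _ in plan.values())
```

Twenty clients land four per offset, so at most four Collector runs start in the same minute. Note the wrap-around: a Collector at :04 pairs with a Processor on the `1-59/5` series, whose :06 run is the one 2 minutes later.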
Why this keeps costs in check
- You avoid one deployment per client: with just 2 deployments, management overhead stays low.
- Your own compute cost scales with run volume and the work each run does, not with the number of deployments; Prefect Cloud plans differ in what they meter or cap, so check your plan's current limits. Making runs with no new alerts a fast no-op and staggering schedules both help.
- One shared work pool and a small worker fleet can run everything; add horizontal workers only if throughput requires it.
Optional: replace the Processor schedules with an Automation
- When a Collector run completes and "new alerts > 0", trigger the Processor deployment with the same client_id. This halves your scheduled runs while keeping the workflow reactive.
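To put numbers on "halves your scheduled runs": a 5-minute cron fires 24 × 60 / 5 = 288 times a day. The sketch below counts scheduled runs per day; 20 clients is an illustrative figure, and with the Automation the Processor still runs, but only after Collector runs that found new alerts.

```python
MINUTES_PER_DAY = 24 * 60

def scheduled_runs_per_day(clients: int, every_minutes: int = 5, scheduled_flows: int = 2) -> int:
    # One schedule per client per scheduled flow, each firing every `every_minutes`.
    return clients * scheduled_flows * (MINUTES_PER_DAY // every_minutes)

both_on_cron = scheduled_runs_per_day(20)                     # Collector + Processor on cron
collector_only = scheduled_runs_per_day(20, scheduled_flows=1)  # Processor via Automation
```

For 20 clients that is 11,520 scheduled runs a day with both flows on cron versus 5,760 with only the Collector scheduled.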
Useful docs
- Deployments: Deployments overview
- Schedules (Cron/RRule/parameters): Schedules
Marvin
09/23/2025, 7:28 PM