Shubham
09/23/2025, 5:55 PM

Marvin
09/23/2025, 5:55 PM

Marvin
09/23/2025, 5:57 PM

Marvin
09/23/2025, 5:57 PM

Shubham
09/23/2025, 6:02 PM

Marvin
09/23/2025, 6:02 PM

Marvin
09/23/2025, 6:07 PM
Schedule the collector with Cron("*/5 * * * *"), keep the processor single-run with concurrency("processor-run"), throttle vendor calls with rate_limit("vendor-x"), and fan work out with ThreadPoolTaskRunner and .submit:

from prefect import flow, task, get_run_logger
@task(retries=3, retry_delay_seconds=10, timeout_seconds=60)
def fetch_siem_alerts():
    # TODO: pull from SIEM
    return [...]
@task
def write_alerts_to_db(alerts):
    # TODO: upsert alerts; ensure idempotency
    return len(alerts)
@flow
def collector():
    log = get_run_logger()
    alerts = fetch_siem_alerts()
    n = write_alerts_to_db(alerts)
    log.info(f"Inserted/updated {n} alerts")

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.concurrency.sync import concurrency, rate_limit
@task(retries=3, retry_delay_seconds=[5,15,30], timeout_seconds=30)
def call_api_vendor_x(alert):
    with rate_limit("vendor-x"):  # create this limit in Prefect UI
        # TODO: call API X
        return {...}
@task
def load_new_alerts():
    # TODO: fetch only unprocessed/new alerts
    return [...]
@task
def save_enriched(alert):
    # TODO: write back to DB
    return True
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def processor():
    # ensure only one processor run at a time
    with concurrency("processor-run", occupy=1, timeout_seconds=0):
        alerts = load_new_alerts()
        futs = [call_api_vendor_x.submit(a) for a in alerts]
        results = [f.result() for f in futs]
        for r in results:
            save_enriched.submit(r)

from prefect.flows import Flow
from prefect.schedules import Cron
Flow.from_source(".", "collector:collector").deploy(
    name="collector",
    schedule=Cron("*/5 * * * *"),  # every 5 minutes
    work_pool_name="my-pool",
)
Flow.from_source(".", "processor:processor").deploy(
    name="processor",
    schedule=Cron("*/5 * * * *"),  # or offset to start at :02
    work_pool_name="my-pool",
)
Create the "processor-run" concurrency limit and the "vendor-x" rate limit ahead of time (in the Prefect UI, or with the prefect gcl create CLI) so the flows can acquire them at runtime.
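For the TODOs above (upsert alerts idempotently, then load only unprocessed ones), here is a minimal sketch assuming a SQLite table keyed on alert_id with a processed flag; the schema, column names, and db_path are hypothetical, not part of the original setup:

import json
import sqlite3

def upsert_alerts(db_path, alerts):
    # Idempotent write: re-running the collector refreshes existing rows instead of duplicating them.
    conn = sqlite3.connect(db_path)
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS alerts ("
            " alert_id TEXT PRIMARY KEY,"
            " payload TEXT NOT NULL,"
            " processed INTEGER NOT NULL DEFAULT 0)"
        )
        conn.executemany(
            "INSERT INTO alerts (alert_id, payload) VALUES (?, ?) "
            "ON CONFLICT(alert_id) DO UPDATE SET payload = excluded.payload",
            [(a["id"], json.dumps(a)) for a in alerts],  # assumes each alert carries a stable "id"
        )
        conn.commit()
        return len(alerts)
    finally:
        conn.close()

def load_new_alerts(db_path):
    # "New" means not yet enriched; mark rows processed only after save_enriched succeeds.
    conn = sqlite3.connect(db_path)
    try:
        rows = conn.execute("SELECT payload FROM alerts WHERE processed = 0").fetchall()
        return [json.loads(payload) for (payload,) in rows]
    finally:
        conn.close()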
Marvin
09/23/2025, 6:07 PM

Shubham
09/23/2025, 6:09 PM

Marvin
09/23/2025, 6:09 PM

Marvin
09/23/2025, 6:11 PM

Shubham
09/23/2025, 7:27 PM

Marvin
09/23/2025, 7:27 PM

Marvin
09/23/2025, 7:28 PM
Make client_id a flow parameter, give each client its own schedule with parameters={"client_id": "<client>"}, scope the lock per client with concurrency(f"processor-{client_id}", occupy=1), and keep rate_limit("vendor-x") shared across all client_ids:

from prefect import flow, task
from prefect.concurrency.sync import concurrency, rate_limit
from prefect.task_runners import ThreadPoolTaskRunner
@task
def fetch_from_siem(client_id): ...
@task
def upsert_alerts(client_id, alerts): ...
@flow
def collector(client_id: str):
    alerts = fetch_from_siem(client_id)
    return upsert_alerts(client_id, alerts)
@task
def load_new_alerts(client_id): ...
@task
def enrich_with_vendor_x(alert):
    with rate_limit("vendor-x"):
        ...
    return alert
@task
def save_enriched(client_id, alert): ...
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def processor(client_id: str):
    # ensure one processor per client at a time
    with concurrency(f"processor-{client_id}", occupy=1, timeout_seconds=0):
        alerts = load_new_alerts(client_id)
        futs = [enrich_with_vendor_x.submit(a) for a in alerts]
        for f in futs:
            save_enriched.submit(client_id, f.result())

from prefect.flows import Flow
from prefect.schedules import Cron
clients = ["acme", "beta", "charlie"]  # build this list from your DB/config
collector_schedules = []
processor_schedules = []
# Stagger minutes: 0, 1, 2, ... to avoid bursts
for i, cid in enumerate(clients):
    minute = i % 5  # example staggering within a 5m cadence
    collector_schedules.append(Cron(f"{minute}/5 * * * *", parameters={"client_id": cid}))
    processor_schedules.append(Cron(f"{(minute+2)%5}/5 * * * *", parameters={"client_id": cid}))
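# Worked example of what the loop above produces for this clients list:
#   acme    -> collector "0/5 * * * *", processor "2/5 * * * *"
#   beta    -> collector "1/5 * * * *", processor "3/5 * * * *"
#   charlie -> collector "2/5 * * * *", processor "4/5 * * * *"
# i.e. each client's processor fires two minutes after its collector within the 5-minute cadence.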
Flow.from_source(".", "collector:collector").deploy(
    name="collector",
    schedules=collector_schedules,
    work_pool_name="your-pool",
)
Flow.from_source(".", "processor:processor").deploy(
    name="processor",
    schedules=processor_schedules,
    work_pool_name="your-pool",
)
Each schedule passes its own client_id, so a single deployment per flow covers every client.
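If you also need a run outside the schedules (say, backfilling one client), a small sketch using run_deployment; the deployment name "processor/processor" and the example client id are assumptions:

from prefect.deployments import run_deployment

# Trigger one out-of-schedule processor run for a single client.
# name is "<flow name>/<deployment name>"; timeout=0 returns without waiting for the run to finish.
run_deployment(
    name="processor/processor",
    parameters={"client_id": "acme"},
    timeout=0,
)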
Marvin
09/23/2025, 7:28 PM