```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(retries=3, retry_delay_seconds=5)
def fetch_record(i: int) -> int:
    # Simulate IO work
    return i * 2

@flow(name="example-flow", task_runner=ThreadPoolTaskRunner(max_workers=8))
def example_flow(n: int = 10) -> int:
    # Run tasks in parallel with a thread pool
    results = []
    for i in range(n):
        results.append(fetch_record.submit(i))
    values = [r.result() for r in results]
    return sum(values)

if __name__ == "__main__":
    print(example_flow(5))
```
5) Subflows and secrets
- Blocks let you store config and credentials centrally
- Secrets avoid hardcoding sensitive data
```
from prefect import flow, task
from prefect.blocks.system import Secret

@task
def use_api_key() -> None:
    api_key = Secret.load("my-api-key").get()
    # use the key safely

@flow
def child():
    use_api_key()

@flow
def parent():
    child()  # subflow call
```
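Creating the block itself is a one-time setup, usually done in the UI. As a minimal code-based alternative (the placeholder value is illustrative; the block name matches the `Secret.load("my-api-key")` call above):
```
from prefect.blocks.system import Secret

# One-time setup: store the credential under the name that Secret.load() uses above.
Secret(value="sk-placeholder").save("my-api-key", overwrite=True)
```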
- Links:
  - Store secrets (blocks)
6) From script to production
- Create a deployment (new 3.x pattern)
- Run it on a worker connected to a work pool
- Add a schedule, concurrency limits, and triggers
Two common paths:
A) Programmatic deployment from source
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/etl.py:etl_flow",  # path/to/file.py:function_name
).deploy(
    name="etl-prod",
    work_pool_name="docker-pool",
    cron="0 * * * *",     # run hourly
    concurrency_limit=2,  # per-deployment limit
    tags=["prod"],
    description="ETL hourly pipeline",
)
```
B) CLI (verified)
- Create deployment from local project entrypoint:
prefect deploy ./flows/etl.py:etl_flow --name etl-prod
- Create a work pool and start a worker:
prefect work-pool create "docker-pool" --type docker
prefect worker start --pool "docker-pool"
Note: For Docker/Kubernetes workers, install the extras (in CI or locally), e.g.:
uv run --with 'prefect[docker]' prefect work-pool create "docker-pool" --type docker
7) Schedules and concurrency
- Add schedules (cron/interval/rrule) either in code via Flow.deploy(...) or in the UI (see the scheduling sketch after the links below)
- Limit concurrent runs with a per-deployment limit (concurrency_limit) or with global concurrency limits
- Links:
  - Global concurrency limits
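A minimal scheduling sketch using flow.serve from the current process; the flow and cron string are illustrative, and the same cron=/interval= options can be passed to Flow.deploy(...):
```
from prefect import flow

@flow
def nightly_report():
    print("building report")

if __name__ == "__main__":
    # Registers the flow with a cron schedule and keeps serving it from this process;
    # interval=3600 (seconds) or an rrule are alternative schedule options.
    nightly_report.serve(name="nightly-report", cron="0 2 * * *")
```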
8) Caching and retries
- Retries are task-level (e.g., retries=3, retry_delay_seconds=10)
- Caching can avoid re-computation for unchanged inputs (see the sketch below)
- Link: Caching in Prefect 3.x
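A minimal sketch combining retries with input-based caching; the task and the one-hour cache window are illustrative, using task_input_hash as the cache key function:
```
from datetime import timedelta
from prefect import flow, task
from prefect.tasks import task_input_hash

@task(
    retries=3,
    retry_delay_seconds=10,
    cache_key_fn=task_input_hash,         # identical inputs reuse the stored result
    cache_expiration=timedelta(hours=1),  # cached results expire after one hour
)
def expensive_lookup(key: str) -> str:
    print(f"computing for {key}")
    return key.upper()

@flow
def cached_flow():
    expensive_lookup("a")  # computed
    expensive_lookup("a")  # served from cache within the expiration window

if __name__ == "__main__":
    cached_flow()
```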
9) Notifications and automations
- Use notification blocks (Slack, Teams, email) inside flows
- Use Automations to react to events (e.g., Flow Run Failed -> send Slack)
- CLI management (list/pause/resume) is available; create automations in the UI
prefect automation ls
prefect automation pause <AUTOMATION_ID>
prefect automation resume <AUTOMATION_ID>
- Notification blocks: see the Blocks & secrets links above and the block types in the code examples below
Copy-paste examples by workflow type
A) Data pipeline (ETL)
- Parallel extraction, simple transform, load
- Retries on flaky IO, secrets for credentials, deployment with a schedule
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.blocks.system import Secret

@task(retries=3, retry_delay_seconds=5)
def extract(source: str) -> list[int]:
    # Pretend to call an external API with an API key
    api_key = Secret.load("etl-api-key").get()
    return [1, 2, 3]  # example data

@task
def transform(items: list[int]) -> list[int]:
    return [x * 10 for x in items]

@task
def load(rows: list[int]) -> None:
    print(f"Loaded {len(rows)} rows: {rows[:3]}...")

@flow(name="etl-pipeline", task_runner=ThreadPoolTaskRunner(max_workers=8))
def etl_flow(source: str = "api"):
    raw = extract.submit(source)
    clean = transform.submit(raw)
    load.submit(clean).wait()  # block until the load finishes before the flow exits

if __name__ == "__main__":
    etl_flow()
```
Programmatic deployment:
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/your-repo",
    entrypoint="flows/etl.py:etl_flow",
).deploy(
    name="etl-hourly",
    work_pool_name="docker-pool",
    cron="0 * * * *",     # hourly
    concurrency_limit=1,  # avoid overlapping runs
    tags=["etl", "prod"],
)
```
B) ML workflow (train/evaluate)
- Parameters for hyperparameters
- Use a process pool for CPU-bound training
```
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import time

@task
def load_data():
    # stand-in for real dataset loading
    X = list(range(1000))
    y = [x % 2 for x in X]
    return X, y

@task
def train_model(X, y, lr: float = 0.1):
    time.sleep(2)  # simulate training
    return {"lr": lr, "accuracy": 0.9}

@task
def evaluate(model):
    return model["accuracy"]

@flow(name="ml-train", task_runner=ProcessPoolTaskRunner())
def train_pipeline(lr: float = 0.1):
    X, y = load_data()
    model = train_model(X, y, lr)
    acc = evaluate(model)
    print(f"Accuracy: {acc}")
    return acc

if __name__ == "__main__":
    train_pipeline(0.05)
```
Programmatic deployment (daily retrain):
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/ml-repo",
    entrypoint="flows/train.py:train_pipeline",
).deploy(
    name="ml-train-daily",
    work_pool_name="docker-pool",
    cron="0 2 * * *",  # daily at 02:00
    tags=["ml", "training"],
)
```
C) Business flow (notification/approval pattern)
- Send Slack notifications via a notification block
- Often paired with Automations for failure alerts in production
```
from prefect import flow, task
from prefect.blocks.notifications import SlackWebhook

@task
def generate_report() -> str:
    return "Quarterly KPI report content"

@task
def notify_slack(message: str):
    slack = SlackWebhook.load("alerts")  # configure block in UI
    slack.notify(message)

@flow(name="report-approval")
def business_flow():
    report = generate_report()
    notify_slack(report)
```
Programmatic deployment (weekday mornings):
```
from prefect.flows import Flow

Flow.from_source(
    source="https://github.com/your-org/biz-flows",
    entrypoint="flows/report.py:business_flow",
).deploy(
    name="report-weekdays",
    work_pool_name="docker-pool",
    cron="0 9 * * 1-5",  # Mon-Fri at 09:00
    tags=["business", "reporting"],
)
```
Helpful library surface (confirmed)
- Task runners for parallelism:
  - `prefect.task_runners`: ThreadPoolTaskRunner, ProcessPoolTaskRunner
- Notification blocks:
  - `prefect.blocks.notifications`: SlackWebhook, MicrosoftTeamsWebhook, PagerDutyWebHook, TwilioSMS, SendgridEmail, etc.
- Schedules:
  - `prefect.schedules`: Cron, Interval, RRule classes (or pass `cron=`/`interval=` to Flow.deploy)
- Concurrency:
  - concurrency_limit at deployment time (see Flow.deploy signature)
  - Global limits: Global concurrency limits (see the sketch below)
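To complement the deployment-level limit, here's a minimal sketch of using a global concurrency limit from task code; it assumes a global limit named "database" has already been created (in the UI or via the CLI) and uses the synchronous `concurrency` context manager:
```
from prefect import flow, task
from prefect.concurrency.sync import concurrency

@task
def write_batch(batch: list[int]) -> None:
    # Occupy one slot of the shared "database" limit while writing;
    # runs beyond the limit wait for a free slot. Assumes the limit already exists.
    with concurrency("database", occupy=1):
        print(f"writing {len(batch)} rows")

@flow
def load_flow():
    futures = [write_batch.submit(batch) for batch in ([1, 2], [3, 4], [5, 6])]
    for f in futures:
        f.wait()

if __name__ == "__main__":
    load_flow()
```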
Additional resources
- Workflow design patterns with Prefect
- Caching in Prefect 3.x
- Task mapping at scale
- Work pools and workers blog
- Store secrets (blocks)
Questions to tailor this for you
- Who’s the audience (data engineers, ML, platform/ops)?
- How long is the talk?
- What runtime do you target (local, Docker, Kubernetes, Prefect Cloud)?
- Want this turned into a formatted slide deck (Google Slides, PPTX, or Markdown)?