MUHAMMAD ZEESHAN — 08/26/2025, 9:48 AM
Marvin — 08/26/2025, 9:49 AM
Marvin — 08/26/2025, 9:56 AM
1) Install Prefect and point it at your server
pip install -U prefect
prefect config set PREFECT_API_URL=https://prefect.happyraisins.com/api
prefect config view
2) Identify and wrap functions as tasks
- Wrap idempotent, side-effectful, or expensive functions in @task
- Add retries, retry delays, and timeouts
- Use get_run_logger() for structured logs
Example refactor:
```
from prefect import task, flow, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner

@task(retries=3, retry_delay_seconds=30, timeout_seconds=600, log_prints=True)
def extract(source: str) -> list[dict]:
    # ... your extraction logic ...
    return [{"id": 1, "value": "a"}]

@task
def transform(rows: list[dict]) -> list[dict]:
    # ... your transform logic ...
    return rows

@task(persist_result=True)
def load(rows: list[dict], env: str) -> int:
    # ... your load logic (e.g., DB writes) ...
    return len(rows)

@flow(name="etl-flow", task_runner=ThreadPoolTaskRunner())
def etl_flow(env: str = "prod", source: str = "s3://bucket/path"):
    logger = get_run_logger()
    logger.info(f"Starting ETL for env={env} from {source}")
    rows = extract(source)
    rows_t = transform(rows)
    count = load(rows_t, env)
    logger.info(f"Loaded {count} records")
    return count

if __name__ == "__main__":
    # Local smoke test
    etl_flow(env="staging", source="s3://dev-bucket/path")
```
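Note: ThreadPoolTaskRunner only parallelizes tasks you submit to it; the direct calls above run one at a time. If parts of your pipeline are independent, a minimal fan-out sketch (the chunks variable here is hypothetical):
```
# Submit independent task runs to the thread pool, then gather results.
# `chunks` is a hypothetical list of row batches; adapt to your data.
futures = [transform.submit(chunk) for chunk in chunks]
results = [f.result() for f in futures]
```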
3) Parameterize
- Make environment, date range, and other tunables flow parameters (so you can create multiple deployments with different params)
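For example, a date-range window as flow parameters — a sketch with illustrative names (start/end are not part of the code above):
```
from datetime import date, timedelta
from prefect import flow

@flow
def etl_flow(env: str = "prod", start: date | None = None, end: date | None = None):
    # Default to a one-day window ending today, so scheduled runs and
    # ad-hoc backfills share one code path.
    end = end or date.today()
    start = start or end - timedelta(days=1)
    ...
```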
4) Add resilience
- Timeouts and retries (already shown)
- Optional: caching or persist_result=True if you need result retrieval or caching (see the sketch after this list)
- Optional: concurrency limits via work pool or deployment config
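A minimal caching sketch, assuming Prefect 3.x cache policies (tune the expiration to your data):
```
from datetime import timedelta
from prefect import task
from prefect.cache_policies import INPUTS

@task(cache_policy=INPUTS, cache_expiration=timedelta(hours=1), persist_result=True)
def transform(rows: list[dict]) -> list[dict]:
    # A re-run with identical inputs within the hour reuses the cached result
    return rows
```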
5) Local test
- Run your flow locally in the app’s venv to verify behavior and logging
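For example, from the repo root (assuming a venv at .venv and the entrypoint path used later in this thread):
source .venv/bin/activate
python app/flows/etl.py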
Helpful docs:
- Create deployments (Prefect 3.x)
- prefect.yaml reference
Part 2 — Production deployments with repeatable/multiple deployments
You have two good options in Prefect 3.x. Both can be re-applied safely (idempotent “update” behavior) and both let you create multiple deployments (e.g., staging and prod) from the same code.
Option A: Programmatic deployments (recommended for CI/CD)
Use Flow.from_source(...).deploy(...) in a deployment script. Point at your Git repo and entrypoint so workers always pull the right code version.
deploy.py
```
from prefect.flows import Flow

def deploy_all():
    # Replace with your org/repo and entrypoint path
    source = "https://github.com/YOUR-ORG/YOUR-REPO.git"
    entrypoint = "app/flows/etl.py:etl_flow"

    # Production deployment (scheduled hourly)
    Flow.from_source(
        source=source,
        entrypoint=entrypoint,
    ).deploy(
        name="etl-prod",
        work_pool_name="YOUR-POOL-NAME",  # existing pool on server1
        parameters={"env": "prod"},
        tags=["prod"],
        cron="0 * * * *",  # hourly
        description="Production ETL deployment",
        version=None,  # optionally set a version label (e.g., git SHA)
    )

    # Staging deployment (manual trigger; no schedule)
    Flow.from_source(
        source=source,
        entrypoint=entrypoint,
    ).deploy(
        name="etl-staging",
        work_pool_name="YOUR-POOL-NAME",
        parameters={"env": "staging"},
        tags=["staging"],
    )

if __name__ == "__main__":
    deploy_all()
```
Marvin — 08/26/2025, 9:56 AM
Then run:
python deploy.py
prefect deployment ls
Notes:
- Private repos: set up Git credentials (tokens/SSH). Prefect supports supplying credentials; see the Git storage docs from the deployment guide above.
- Re-running `deploy.py` updates the existing deployments in place, so it is safe to run as many times as you like.
- In CI, add a job that (1) installs Prefect, (2) sets `PREFECT_API_URL=https://prefect.happyraisins.com/api`, (3) runs `python deploy.py`.
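For example, the CI job body can be as small as this (assuming credentials for a private repo are configured separately):
pip install -U prefect
export PREFECT_API_URL=https://prefect.happyraisins.com/api
python deploy.py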
Option B: CLI-managed deployments via prefect.yaml
Use `prefect init` to create a project config, define multiple deployments, then run `prefect deploy`.
From your repo root:
prefect init
Edit `prefect.yaml` to define multiple deployments for your entrypoint and pool, then:
prefect deploy
prefect deployment ls
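A minimal prefect.yaml sketch with two deployments; the repo URL, pool name, and entrypoint are placeholders, and you should verify the keys against the prefect.yaml reference below:
```
pull:
  - prefect.deployments.steps.git_clone:
      repository: https://github.com/YOUR-ORG/YOUR-REPO.git

deployments:
  - name: etl-prod
    entrypoint: app/flows/etl.py:etl_flow
    work_pool:
      name: YOUR-POOL-NAME
    parameters:
      env: prod
    tags: [prod]
    schedules:
      - cron: "0 * * * *"
  - name: etl-staging
    entrypoint: app/flows/etl.py:etl_flow
    work_pool:
      name: YOUR-POOL-NAME
    parameters:
      env: staging
    tags: [staging]
```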
CLI references we verified:
- `prefect deploy` help
- `prefect init` help
Hardening your workers on server2 (systemd)
Run your workers under systemd so they restart automatically and survive reboots.
Example unit file at /etc/systemd/system/prefect-worker.service:
```
[Unit]
Description=Prefect worker - ec2-process
After=network-online.target
Wants=network-online.target

[Service]
# Set your API URL
Environment="PREFECT_API_URL=https://prefect.happyraisins.com/api"
# Optionally set other environment vars needed by your flows
# Use the same user/venv that has Prefect installed
User=ubuntu
Group=ubuntu
WorkingDirectory=/home/ubuntu
ExecStart=/home/ubuntu/.venv/bin/prefect worker start -p YOUR-POOL-NAME --name server2-worker
Restart=always
RestartSec=5
KillSignal=SIGINT

[Install]
WantedBy=multi-user.target
```
Enable and start:
sudo systemctl daemon-reload
sudo systemctl enable prefect-worker
sudo systemctl start prefect-worker
sudo systemctl status prefect-worker
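To tail the worker's logs:
journalctl -u prefect-worker -f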
Extra production recommendations
- Versioning: include a version label (e.g., git SHA) in deploy(..., version="...") to track what’s running.
- Secrets/config: store in environment variables on the worker or use Prefect Variables/Blocks as appropriate (see the sketch after this list).
- Concurrency and schedules: set deployment schedules and concurrency limits per deployment or work pool.
- Notifications: configure a Slack/Email notification block and attach an Automation for failures.
- Observability: use the Prefect UI at https://prefect.happyraisins.com to monitor runs, logs, and retries.
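A minimal sketch of reading config at runtime; the Variable name etl_bucket and Secret block warehouse-token are placeholders you would create on the server first:
```
from prefect.variables import Variable
from prefect.blocks.system import Secret

# Non-sensitive, per-environment setting stored as a Prefect Variable
bucket = Variable.get("etl_bucket", default="s3://bucket/path")

# Sensitive value stored as a Secret block
token = Secret.load("warehouse-token").get()
```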
Questions to finalize
- What’s your repo URL and flow entrypoint path (e.g., app/flows/etl.py:etl_flow)?
- What is the exact name of your existing work pool on server1?
- Do you want schedules for prod, and should staging be manual-only?
Once I have those, I can give you a ready-to-run deploy.py and, if you prefer, a prefect.yaml with multiple deployments.
Docs links:
- Deployments in Prefect 3.x
- prefect.yaml configuration