Robert Szomolanyi
09/02/2025, 11:14 AM
Marvin
09/02/2025, 11:17 AM
`Deployment.build_from_flow()` or `prefect deployment build`; use `flow.from_source(...).deploy(...)`
- Example:
```
from prefect import flow


@flow(retries=3, retry_delay_seconds=30)
def etl_flow(param: str = "default"):
    # your logic
    return "ok"


if __name__ == "__main__":
    # Register/update a deployment whose code is pulled from source control at run time
    flow.from_source(
        source="https://github.com/org/repo",
        entrypoint="flows/etl.py:etl_flow",
    ).deploy(
        name="etl-prod",
        work_pool_name="k8s-prod",
        image="ghcr.io/org/etl:latest",
        build=False,  # assumes the image is already built and pushed (e.g., by CI)
        push=False,
        parameters={"param": "value"},
        tags=["prod"],
        # schedule=..., cron="0 * * * *", etc.
    )
```
- Links:
- Prefect 3.2 release notes
- Prefect 3 GA blog
- Work pools and workers
- Work pools define the execution environment (e.g., local, Docker, Kubernetes, ECS) and default job variables (image, CPU/memory, etc.)
- Workers poll a pool for scheduled/queued runs and launch them on the target infra; scale by running more workers
- Links:
- Work pools & workers overview
- Execution options
- Local: simple dev/testing
- Docker/Kubernetes/ECS: production-grade isolation and scalability
- Patterns:
- Use Docker images for reproducibility
- Use Kubernetes/ECS for autoscaling and isolation per run
- Links:
- Docker infrastructure
- Submit flows directly to dynamic infrastructure
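The pool/worker split above comes down to a poll-and-launch loop. Below is a toy, pure-Python sketch of that idea, not Prefect's implementation. `WorkPool`, `Worker`, and `drain` are hypothetical names, runs are plain strings, and "launching" only records the run ID. The sketch shows why adding workers to the same pool drains queued runs faster.

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class WorkPool:
    """Toy work pool: a name plus a FIFO queue of scheduled run IDs."""
    name: str
    queue: deque = field(default_factory=deque)

    def schedule(self, run_id: str) -> None:
        self.queue.append(run_id)


@dataclass
class Worker:
    """Toy worker: each poll claims up to `capacity` runs from its pool."""
    name: str
    pool: WorkPool
    capacity: int = 1
    launched: list = field(default_factory=list)

    def poll(self) -> int:
        claimed = 0
        while self.pool.queue and claimed < self.capacity:
            run_id = self.pool.queue.popleft()
            # A real worker would start a process, container, or pod for this run here.
            self.launched.append(run_id)
            claimed += 1
        return claimed


def drain(pool: WorkPool, workers: list) -> int:
    """Poll every worker once per round until the pool is empty; return the round count."""
    if not workers:
        raise ValueError("at least one worker is required")
    rounds = 0
    while pool.queue:
        for worker in workers:
            worker.poll()
        rounds += 1
    return rounds


if __name__ == "__main__":
    pool = WorkPool("k8s-prod")
    for i in range(6):
        pool.schedule(f"run-{i}")
    workers = [Worker(f"worker-{n}", pool) for n in range(3)]
    print(drain(pool, workers))  # 3 workers x 1 run per poll -> 2 rounds for 6 runs
```

Real workers also deal with infrastructure job templates, cancellation, and so on, none of which is modeled here.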
2) Scalability and Reliability
- Horizontal scale
- Add workers to the same pool to increase throughput
- Partition work by pools (e.g., k8s-prod, k8s-staging), tags, and parameters
- Concurrency and rate limits
- Global concurrency limits for shared resources or APIs
- Queue-level and deployment-level controls; combine with tags
- Link: Global concurrency limits
- Retries and backoff
- Configure at task/flow level; combine with idempotency and external locks
- Link: Retries
- Caching and results reuse
- Cache task outputs based on inputs, task source code, and (optionally) flow parameters, configured via cache policies, to avoid rework
- Link: Caching concepts
- Scheduling and event triggers
- Time-based: cron, interval, or RRule schedules
- Event-based: react to upstream events or prior run outcomes with triggers/automations
- Link: Event triggers
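Here is an in-process analogy of the tag-based concurrency limits above, using one semaphore per tag. It is not how Prefect enforces limits: Prefect holds those slots server-side, so they apply across all workers. `TagConcurrencyLimiter` and `bulk_load` are hypothetical names, the `shared-db` tag and limit values are illustrative, and the sleep stands in for a database write.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class TagConcurrencyLimiter:
    """In-process analogy of tag-based limits: at most `limit` calls per tag run at once."""

    def __init__(self, limits: dict):
        self._slots = {tag: threading.BoundedSemaphore(n) for tag, n in limits.items()}
        self._lock = threading.Lock()
        self.active = {tag: 0 for tag in limits}
        self.peak = {tag: 0 for tag in limits}

    def run(self, tag: str, fn, *args):
        with self._slots[tag]:  # blocks until this tag has a free slot
            with self._lock:
                self.active[tag] += 1
                self.peak[tag] = max(self.peak[tag], self.active[tag])
            try:
                return fn(*args)
            finally:
                with self._lock:
                    self.active[tag] -= 1


def write_row(i: int) -> int:
    time.sleep(0.01)  # stand-in for one write to the shared database
    return i


def bulk_load(limit: int, rows: int, threads: int = 20):
    """Submit `rows` writes from `threads` threads while 'shared-db' allows `limit` at once."""
    limiter = TagConcurrencyLimiter({"shared-db": limit})
    with ThreadPoolExecutor(max_workers=threads) as pool:
        results = list(pool.map(lambda i: limiter.run("shared-db", write_row, i), range(rows)))
    return results, limiter.peak["shared-db"]


if __name__ == "__main__":
    results, peak = bulk_load(limit=5, rows=100)
    print(len(results), peak)  # peak never exceeds 5
```

Twenty threads are available, but only five writes ever overlap. That is the same effect a tag limit has on many submitted tasks hitting one database.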
3) Security
- Secrets and blocks
Marvin
09/02/2025, 11:17 AM
`flow.from_source(...).deploy(...)` in CI
- Version pinning via Docker tags and deployment versions; promote by changing pool or image tag
- Environment separation
- Separate workspaces or pools for dev/staging/prod; parameterize environment per deployment
- Scheduling and promotion
- Start in staging with more verbose logging, then promote to prod by re-deploying or retargeting the work pool
- Notifications and runbooks
- Automations notify Slack/PagerDuty/email on failures or SLA breaches
- Helpful references:
- CI/CD patterns blog
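One way to keep "same code, different pool/image/parameters per environment" explicit is a small config map in the CI repo. The sketch below assumes this setup: the pool names, registry, tags, and parameters are hypothetical, and `deploy_kwargs`/`promote` are illustrative helpers. The keys it produces (`name`, `work_pool_name`, `image`, `parameters`, `tags`) are ordinary `.deploy(...)` arguments.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class EnvConfig:
    work_pool_name: str
    image_tag: str
    parameters: dict = field(default_factory=dict)


# Hypothetical pools, registry, tags, and parameters; substitute your own.
IMAGE_REPO = "ghcr.io/org/etl"
ENVIRONMENTS = {
    "staging": EnvConfig("k8s-staging", "1.4.0-rc1", {"param": "staging"}),
    "prod": EnvConfig("k8s-prod", "1.3.2", {"param": "value"}),
}


def deploy_kwargs(env: str, base_name: str = "etl") -> dict:
    """Keyword arguments for .deploy(...): same code, env-specific pool, image tag, parameters."""
    cfg = ENVIRONMENTS[env]
    return {
        "name": f"{base_name}-{env}",
        "work_pool_name": cfg.work_pool_name,
        "image": f"{IMAGE_REPO}:{cfg.image_tag}",
        "parameters": dict(cfg.parameters),
        "tags": [env],
    }


def promote(from_env: str = "staging", to_env: str = "prod") -> None:
    """Promote by pointing the target env at the image tag already validated upstream."""
    validated_tag = ENVIRONMENTS[from_env].image_tag
    ENVIRONMENTS[to_env] = replace(ENVIRONMENTS[to_env], image_tag=validated_tag)


if __name__ == "__main__":
    promote()
    print(deploy_kwargs("prod"))
```

In a CI job these would be passed along as `flow.from_source(...).deploy(**deploy_kwargs("prod"))`, plus `build=False, push=False` if an earlier step already built the image.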
5) Observability
- UI views
- Runs, deployments, pools, workers; drill into task/flow states and durations
- Logs
- Structured logs streamed to the UI and API; each log line is tied to its flow/task run ID, and state transitions are recorded
- Metrics and timelines
- Run-level timings, success/failure rates, backoff behavior
- Notifications and automations
- Route critical events to Slack/PagerDuty; add run links for fast triage
- References:
- Observability and metrics blog
6) Audit (Cloud)
- What’s captured
- Authentication/authorization events, workspace membership changes, block creation/updates, deployment changes, automations, and run operations
- How to use it
- Filter by time, actor, resource; export for compliance and forensics
- Link:
- Audit logs in Prefect Cloud
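Once audit records are exported, filtering by time, actor, and resource and archiving them is ordinary data handling. The sketch uses a made-up `AuditEvent` record (timestamp, actor, resource, action). It does not reflect Prefect Cloud's actual audit-log schema or API, and the sample values are invented for illustration.

```python
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta, timezone


@dataclass
class AuditEvent:
    # Hypothetical record shape for illustration; not Prefect Cloud's audit-log schema.
    timestamp: datetime
    actor: str
    resource: str
    action: str


def filter_events(events, start=None, end=None, actor=None, resource_prefix=None):
    """Keep events in [start, end) that match the optional actor and resource prefix."""
    selected = []
    for event in events:
        if start is not None and event.timestamp < start:
            continue
        if end is not None and event.timestamp >= end:
            continue
        if actor is not None and event.actor != actor:
            continue
        if resource_prefix is not None and not event.resource.startswith(resource_prefix):
            continue
        selected.append(event)
    return selected


def to_jsonl(events) -> str:
    """Serialize events as JSON Lines (one object per line) for archiving or compliance review."""
    return "\n".join(
        json.dumps({**asdict(event), "timestamp": event.timestamp.isoformat()}) for event in events
    )


if __name__ == "__main__":
    t0 = datetime(2025, 9, 2, 9, 0, tzinfo=timezone.utc)
    events = [
        AuditEvent(t0, "alice@example.com", "deployment/etl-prod", "update"),
        AuditEvent(t0 + timedelta(hours=2), "bob@example.com", "block/db-creds", "create"),
    ]
    print(to_jsonl(filter_events(events, actor="alice@example.com")))
```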
7) Reference Architecture Patterns
- GitHub → CI (build image, tests) → flow.from_source().deploy() → Work Pool → Worker on K8s/ECS
- Use a private container registry; pull images inside the cluster
- Secrets via blocks or a cloud secret manager; a service account for the CI deploy step
- Global concurrency limits for shared databases/APIs
- Automations for failure notifications and SLA breach alerts
- Multi-env
- Workspaces or dedicated pools per env
- Images tagged by env/version; same code, different parameters
- Data governance
- Run workloads inside data perimeter (no data egress)
- Use event triggers to decouple pipelines, so downstream flows don't call upstream ones directly
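To show how event triggers decouple pipelines, here is a routing sketch. It is loosely modeled on the `expect` (event names) and `match` (resource labels) idea behind Prefect's deployment triggers, but heavily simplified: there are no thresholds, time windows, or compound triggers. `Event`, `Trigger`, and `route` are hypothetical names, and a trailing `*` here is just this sketch's prefix wildcard.

```python
from dataclasses import dataclass, field


@dataclass
class Event:
    name: str
    resource: dict  # resource labels, e.g. {"prefect.resource.id": "warehouse.sales.daily"}


@dataclass
class Trigger:
    deployment: str  # deployment to run when the trigger fires
    expect: set  # event names that can fire this trigger
    match: dict = field(default_factory=dict)  # required resource labels; trailing "*" = prefix

    def fires_on(self, event: Event) -> bool:
        if event.name not in self.expect:
            return False
        for key, pattern in self.match.items():
            value = event.resource.get(key)
            if value is None:
                return False
            if pattern.endswith("*"):
                if not value.startswith(pattern[:-1]):
                    return False
            elif value != pattern:
                return False
        return True


def route(event: Event, triggers: list) -> list:
    """Deployments to run for this event; the producer never needs to know about them."""
    return [t.deployment for t in triggers if t.fires_on(event)]


if __name__ == "__main__":
    triggers = [
        Trigger("reporting", {"my.system.data_ready"}, {"prefect.resource.id": "warehouse.sales*"}),
        Trigger("cleanup", {"my.system.nightly_done"}),
    ]
    print(route(Event("my.system.data_ready", {"prefect.resource.id": "warehouse.sales.daily"}), triggers))
```

The upstream pipeline only emits `my.system.data_ready`. Adding or removing a downstream consumer means changing triggers, not the producer.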
8) What’s Changed from Prefect 2.x to 3.x (Highlights)
- `Deployment.build_from_flow()` removed; use `flow.from_source(...).deploy(...)`
- CLI `prefect deployment build` removed; use `prefect deploy`
- GitHub storage blocks removed; use `.from_source("https://github.com/owner/repo")`
- Workers and work pools are the standard way to execute runs
Appendix: Quick Examples
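- Basic flow with retries and caching
```
from datetime import timedelta

from prefect import flow, task
from prefect.cache_policies import INPUTS, TASK_SOURCE

# Key the cache on inputs + task source only; the default policy also includes the
# flow run ID, which limits cache hits to a single run.
@task(retries=3, retry_delay_seconds=10, cache_policy=INPUTS + TASK_SOURCE,
      cache_expiration=timedelta(hours=1))
def extract(x: int) -> int:
    return x * 2

@task
def transform(y: int) -> int:
    return y + 1

@flow
def pipeline(n: int = 5) -> int:
    a = extract(n)
    return transform(a)

if __name__ == "__main__":
    pipeline()
```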
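- Concurrency limit strategy
Marvin
09/02/2025, 11:17 AM
```
from prefect import flow, task


@task(tags=["shared-db"])
def write_row(i: int):
    ...  # write one row to the shared database


@flow
def bulk_load():
    futures = [write_row.submit(i) for i in range(100)]
    # Wait for every submitted task so the flow doesn't finish with runs still pending
    for future in futures:
        future.wait()
```
- Then set a concurrency limit for the tag `shared-db` in the UI (or via API/CLI, e.g. `prefect concurrency-limit create shared-db 10`)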
- Link: Global concurrency limits
- Deploy from source in CI
```
# in a CI step with PREFECT_API_URL and PREFECT_API_KEY set (Cloud), or PREFECT_API_URL for a self-hosted server
python - <<'PY'
from prefect import flow

# The flow code is pulled from the repo via the entrypoint, so it isn't redefined here.
flow.from_source(
    source="https://github.com/org/repo",
    entrypoint="flows/nightly.py:nightly",
).deploy(
    name="nightly-prod",
    work_pool_name="k8s-prod",
    image="ghcr.io/org/pipelines:nightly",
    build=False,  # assumes an earlier CI step built and pushed this image
    push=False,
    cron="0 2 * * *",
    tags=["prod", "nightly"],
)
PY
```
Key Links (for the team’s packet)
- Prefect 3 GA: GA announcement
- Prefect 3.2 release notes: Release notes
- Work pools & workers: Overview
- Docker infra: Docker
- Dynamic infra: Dynamic infra
- Concurrency limits: Concurrency
- Caching: Caching
- Retries: Retries
- Event triggers: Triggers
- Secrets: Secrets
- SSO/SAML: SSO
- Audit logs: Audit logs
- Observability blog: Observability
- CI/CD blog: CI/CD
Questions so I can tailor this for your architecture team
- Prefect Cloud or self-hosted?
- Primary runtime: Kubernetes, ECS, Docker, or hybrid?
- Secret management preference: Prefect blocks, cloud secret manager, or both?
- Need SSO/RBAC/audit (i.e., enterprise/compliance requirements)?
- Current CI/CD platform (GitHub Actions, GitLab, Jenkins, etc.)?
If you share those, I’ll convert this into slide-ready content with environment-specific examples and any required CLI steps.
Robert Szomolanyi
09/03/2025, 7:21 AM
Marvin
09/03/2025, 7:27 AM
`Flow.from_source(...).deploy(...)` works in Prefect 3.x.
High-level flow
- from_source: Tells Prefect where your flow code lives and which function to load.
- deploy: Creates/updates a Deployment in the Prefect API, attaching schedules/triggers, default parameters, and the execution infrastructure (via a work pool). Optionally builds/pushes a Docker image.
What from_source accepts
- Signature: `Flow.from_source(source, entrypoint)`
- `source`: a string path/URL or a storage object
- Local path (e.g., `"."`, `"./project"`)
- Git URL (e.g., `"https://github.com/org/repo"`). For private repos, ensure the worker has access via SSH keys or tokens (env vars), or pass a storage object that handles auth.
- A `ReadableDeploymentStorage` or `RunnerStorage` object for advanced/remote storage.
- `entrypoint`: `path/to/file.py:flow_function_name` inside that source
References
- Code: prefect/flows.py
- Work pools/workers overview: Blog: Work pools and workers
- Project config: prefect.yaml
- Docker deployment how-to: Deploy with Docker
What deploy does
- Signature (abridged): Flow.deploy(name, work_pool_name=None, image=None, build=True, push=True, work_queue_name=None, job_variables=None, interval=None, cron=None, rrule=None, paused=None, schedule=None, schedules=None, concurrency_limit=None, triggers=None, parameters=None, tags=None, version=None, ...)
- Registers/updates the deployment with the Prefect API and returns the deployment ID (UUID)
- Binds the deployment to a work pool and (optionally) queue
- Applies default parameters, tags, description, version
- Adds a schedule with `interval`, `cron`, `rrule`, or a prebuilt `Schedule`/list of schedules
- Adds event-based triggers (automations) if provided
- If `image` is set, `build=True`/`push=True` (the defaults) will build and push the Docker image; otherwise no image is built
- `job_variables` let you pass worker-specific runtime settings (env, memory, resources), depending on the worker type
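An `interval=` schedule can be read as a fixed grid of run times starting from an anchor date. The helper below is a hypothetical sketch of how the next few runs fall on that grid. Prefect's scheduler does this server-side, and the anchor and times used here are assumptions for illustration.

```python
from datetime import datetime, timedelta, timezone


def next_interval_runs(anchor: datetime, interval: timedelta, after: datetime, n: int = 3) -> list:
    """Return the next `n` times on the grid anchor + k * interval (k >= 0) strictly after `after`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    # Floor-divide elapsed time by the interval to find the last grid slot at or before `after`.
    k = max((after - anchor) // interval + 1, 0)
    return [anchor + (k + i) * interval for i in range(n)]


if __name__ == "__main__":
    anchor = datetime(2025, 9, 1, tzinfo=timezone.utc)
    now = datetime(2025, 9, 3, 7, 21, tzinfo=timezone.utc)
    for run in next_interval_runs(anchor, timedelta(hours=6), now):
        print(run.isoformat())
```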
Runtime behavior
- When a flow run is created for the deployment, a worker polling the specified work pool launches it on that pool's infrastructure (e.g., a local process, Docker container, or Kubernetes job), applying `job_variables` and environment. Inside that runtime, the flow run:
- Fetches the source (clone/pull if Git, read local path, or use the supplied storage object)
- Loads your flow using `entrypoint`
- Executes it
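The "load your flow using `entrypoint`" step amounts to importing a file from the fetched source and looking up a name in it. Below is a rough, stdlib-only approximation. `load_entrypoint` is a hypothetical helper, and Prefect's own loader handles more cases than this.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_entrypoint(entrypoint: str, root: str = "."):
    """Resolve 'path/to/file.py:function_name' relative to `root` and return that object."""
    path_part, sep, attr = entrypoint.rpartition(":")
    if not sep or not path_part or not attr:
        raise ValueError(f"expected 'path/to/file.py:function_name', got {entrypoint!r}")
    file_path = Path(root) / path_part
    spec = importlib.util.spec_from_file_location(file_path.stem, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {file_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # executes the file's top-level code, as an import would
    return getattr(module, attr)


if __name__ == "__main__":
    # Simulate a fetched source tree containing flows/etl.py
    with tempfile.TemporaryDirectory() as root:
        (Path(root) / "flows").mkdir()
        (Path(root) / "flows" / "etl.py").write_text(
            "def etl(dataset='sales'):\n    return f'ran etl on {dataset}'\n"
        )
        etl = load_entrypoint("flows/etl.py:etl", root=root)
        print(etl())  # ran etl on sales
```

Because the file's top-level code runs on load, anything it imports must be available in the runtime environment. This is why the worker image or host needs your dependencies installed.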
Auth for source
- For private Git: ensure your worker has credentials (SSH keys on disk/agent, or tokens via environment variables). You can also supply a storage object implementing auth. Avoid hardcoding tokens in URLs.
- For local paths: the worker must have filesystem access to that path.
- For advanced/remote storage: use a `ReadableDeploymentStorage` object so Prefect can read with the right credentials.
Examples
1) Local source, simple schedule
```
from prefect import Flow

Flow.from_source(
    source=".",  # current directory
    entrypoint="flows/etl.py:etl",
).deploy(
    name="etl-prod",
    work_pool_name="process-pool",
    cron="0 * * * *",  # hourly
    parameters={"dataset": "sales"},
    tags=["prod"],
)
```
2) GitHub source + Docker image + job variables
```
from prefect import Flow
from prefect.docker import DockerImage

Flow.from_source(
    source="https://github.com/acme/data-pipelines",
    entrypoint="flows/forecast.py:forecast",
).deploy(
    name="forecast-docker",
    work_pool_name="docker-pool",
    image=DockerImage(name="ghcr.io/acme/forecast", tag="2025-09-03"),
    build=True,
    push=True,
    cron="30 2 * * *",  # daily at 02:30
    job_variables={
        "env": {"PREFECT_LOGGING_LEVEL": "DEBUG"},
        # worker-specific knobs like resources/labels vary by worker type
    },
)
```
Marvin
09/03/2025, 7:27 AM
3) Multiple schedules and triggers
```
from prefect import Flow, schedules
from prefect.events import DeploymentEventTrigger

Flow.from_source(
    source=".",
    entrypoint="flows/report.py:report",
).deploy(
    name="reporting",
    work_pool_name="kubernetes-pool",
    schedules=[
        schedules.Cron("0 9 * * MON-FRI"),  # weekdays at 09:00
        schedules.Interval(6 * 60 * 60),  # every 6 hours (seconds)
    ],
    triggers=[
        # run whenever an event named "my.system.data_ready" is emitted
        DeploymentEventTrigger(expect={"my.system.data_ready"}),
    ],
)
```
Differences vs Prefect 2.x
- 2.x `Deployment.build_from_flow()` and `prefect deployment build` are removed in 3.x.
- Use `Flow.from_source(...).deploy(...)` or the CLI `prefect deploy` instead.
- Storage blocks like “GitHub” blocks are replaced by direct source references or storage objects; prefer `.from_source("https://github.com/owner/repo")`.
Optional CLI alternative
- You can also run `prefect deploy` from a project with a `prefect.yaml`. For structure and options, see prefect.yaml.
If you share your environment (process vs Docker vs K8s, public vs private Git, etc.), I can tailor a ready-to-run snippet for your setup.
Robert Szomolanyi
09/03/2025, 8:01 AM
Marvin
09/03/2025, 8:02 AM
```
from prefect import Flow
from prefect.docker import DockerImage

Flow.from_source(
    source="https://github.com/acme/pipelines",
    entrypoint="flows/etl.py:etl",
).deploy(
    name="etl-prod",
    work_pool_name="docker-pool",
    image=DockerImage(name="ghcr.io/acme/etl", tag="2025-09-03"),
    build=True,
    push=True,
)
```
- Docs: Deploy with Docker
- Use a persistent local checkout with a Process worker
- Keep the repo checked out on the worker host and deploy from a local path (e.g., `source="/srv/prefect/code"`). Your own cron/script can periodically run `git fetch --prune` and then update the working tree (e.g., `git pull` or a checkout of the pinned ref). This avoids network pulls on each run, but requires that path to exist on every worker.
- Add a repo cache via volumes/entrypoint for Docker/Kubernetes
- Mount a persistent volume (Docker volume or K8s PVC) and use an entrypoint/init step that does a `git fetch` if the repo exists, `git clone` otherwise. You’ll need to wire this via worker-specific job variables (env, volumes, command/entrypoint) and your container image.
- This is not built-in; it’s an infra pattern you control.
- If your “remote source” is object storage (S3/GS/Azure)
- Consider packaging your project as an artifact (e.g., tarball) and using a caching proxy/CDN or fsspec’s local cache in your own download code (if you implement a custom `ReadableDeploymentStorage`). Prefect does not expose a first-class “source cache directory” toggle.
Other tips
- Pin to a commit/ref in Git to ensure reproducibility; this doesn’t cache, but it keeps fetches small (shallow clone of a fixed ref) and makes runs deterministic.
- If clone times are dominated by network, a local Git mirror/proxy (e.g., on the same VPC) can help.
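For the volume-backed repo cache and pinned ref described above, the init step only has to choose between cloning and fetching. Pinning a branch or tag keeps that fetch shallow. This is a hypothetical sketch: `git_sync_commands`, `sync`, and the `/cache/repo` path are illustrative. Note that `git clone --branch` accepts a branch or tag, not a bare commit SHA.

```python
import subprocess
from pathlib import Path


def git_sync_commands(cache_dir: str, url: str, ref: str) -> list:
    """Plan the git commands that bring `cache_dir` to `ref` (a branch or tag name).

    Existing checkout -> shallow fetch of just that ref, then a forced checkout of it.
    No checkout yet   -> shallow clone of that ref.
    """
    repo = Path(cache_dir)
    if (repo / ".git").is_dir():
        return [
            ["git", "-C", str(repo), "fetch", "--depth", "1", "origin", ref],
            ["git", "-C", str(repo), "checkout", "--force", "FETCH_HEAD"],
        ]
    return [["git", "clone", "--depth", "1", "--branch", ref, url, str(repo)]]


def sync(cache_dir: str, url: str, ref: str) -> None:
    """Run the planned commands; raises CalledProcessError if any git step fails."""
    for cmd in git_sync_commands(cache_dir, url, ref):
        subprocess.run(cmd, check=True)


if __name__ == "__main__":
    # Hypothetical cache path on a mounted volume; print the plan instead of running it.
    for cmd in git_sync_commands("/cache/repo", "https://github.com/acme/pipelines", "v1.2.0"):
        print(" ".join(cmd))
```

Separating "plan" from "run" makes the decision logic testable without network access. A container entrypoint would call `sync(...)` before starting the flow run.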
References
- Code API for source/deploy: prefect/flows.py
- Project configuration: prefect.yaml
- Work pools and workers overview: Blog: Work pools and workers
If you can share:
- Which worker type (process, docker, kubernetes)?
- Where your sources live (GitHub, S3, etc.) and how frequently runs start?
I can suggest a concrete caching pattern for your setup (volume mounts, job_variables, or an example entrypoint script).
Robert Szomolanyi
09/03/2025, 8:05 AM
Marvin
09/03/2025, 8:08 AM
`from_source` at a wheel or virtualenv, but you can absolutely run deployments where your flow code is already packaged and installed in the runtime environment (as a wheel inside a venv or container image). Prefect won’t manage the wheel/venv for you; it just needs the worker’s environment to be able to import your flow.
Recommended patterns
- Package code into a container image (best for portability/perf)
- Build an image that installs your wheel (or your package from PyPI/Git).
- Deploy with that image so workers import your code directly; no repo pulls at run time.
- Example:
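```
from my_pkg.flows import my_flow
from prefect.deployments.runner import EntrypointType
from prefect.docker import DockerImage

my_flow.deploy(
    name="my-flow",
    work_pool_name="docker-pool",
    image=DockerImage(
        name="ghcr.io/acme/myflow",
        tag="0.3.0",  # pin your app version
        dockerfile="Dockerfile",  # your Dockerfile runs: pip install my_pkg-0.3.0-py3-none-any.whl
    ),
    build=True,
    push=True,
    # reference the flow by module path rather than a file path on the build machine,
    # so it resolves against the package installed in the image
    entrypoint_type=EntrypointType.MODULE_PATH,
)
```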
- Docs: Deploy with Docker
- Use a Process worker with a pre-installed wheel
- Install your wheel into the Python environment that runs the worker.
- Import your flow from the installed package and deploy:
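```
# on the worker host (once): pip install my_pkg-0.3.0-py3-none-any.whl
from my_pkg.flows import my_flow
from prefect.deployments.runner import EntrypointType

my_flow.deploy(
    name="my-flow",
    work_pool_name="process-pool",
    tags=["prod"],
    # no image or source: the flow run imports my_pkg.flows.my_flow from the worker's environment
    entrypoint_type=EntrypointType.MODULE_PATH,
)
```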
- Every worker that might pick up this deployment must have the same package version installed.
- Kubernetes worker variant
- Same as Docker: build/push an image with your wheel installed and reference it in `image=...` when you deploy.
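Every worker that might pick up the deployment needs the same package version installed. A small pre-flight check on each host (or during the image build) can catch drift early. This is a hypothetical sketch: `preflight` and its helpers are illustrative names, while `my_pkg`, `0.3.0`, and `my_pkg.flows:my_flow` come from the examples above.

```python
import importlib
from importlib import metadata


def installed_version(dist_name: str):
    """Installed version of a distribution, or None if it isn't installed."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def can_import(target: str) -> bool:
    """True if 'package.module:attribute' can be imported in this environment."""
    module_name, _, attr = target.partition(":")
    try:
        module = importlib.import_module(module_name)
    except (ImportError, ValueError):  # ValueError covers an empty module name
        return False
    return not attr or hasattr(module, attr)


def preflight(dist_name: str, expected_version: str, target: str) -> list:
    """List of problems found on this host; an empty list means it looks ready."""
    problems = []
    version = installed_version(dist_name)
    if version is None:
        problems.append(f"{dist_name} is not installed")
    elif version != expected_version:
        problems.append(f"{dist_name} {version} is installed, expected {expected_version}")
    if not can_import(target):
        problems.append(f"cannot import {target}")
    return problems


if __name__ == "__main__":
    issues = preflight("my_pkg", "0.3.0", "my_pkg.flows:my_flow")
    print("ready" if not issues else "\n".join(issues))
```

Running this before starting a Process worker (or as a `RUN` step in the Dockerfile) fails fast instead of surfacing as an import error mid-run.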
Notes and limitations
- `Flow.from_source(...)` expects a directory/repository-like source; it does not accept wheels or virtualenvs as “source”.
- Prefect does not install wheels at runtime. If you want on-the-fly installs, do it in your container entrypoint/init (e.g., pip install from your artifact store) or maintain a persistent environment on the worker host.
- For frequently run deployments, images are the most efficient route: layers are cached by registries/nodes and you avoid pulling code each run.
If you can share which worker type you’re using (process, docker, kubernetes) and how you distribute your wheel, I can provide a concrete setup (Dockerfile snippet, deployment code, or worker config) tailored to your environment.