Madushan Pathirana, 08/31/2025, 3:59 AM
Marvin, 08/31/2025, 3:59 AM
Marvin, 08/31/2025, 4:14 AM
…`prefect.yaml` to speed up local testing.
Key rule: When do you need to redeploy?
- Redeploy only when deployment metadata changes (entrypoint path or name, parameter defaults, schedule, work pool, etc.).
- If the deployment points at a stable source (local code, a Git branch, or a Docker tag that you rebuild), you can update the code and run again without redeploying.
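The rule above comes down to comparing the deployment definition, not the code behind it. Here is a minimal sketch that assumes deployments are represented as plain dicts shaped like a `prefect.yaml` entry. `needs_redeploy` and the field list are hypothetical illustrations, not part of Prefect:

```python
from typing import Any, Dict

# Assumed list of deployment fields stored on the server-side deployment record;
# changing any of them means re-running `prefect deploy`.
METADATA_KEYS = ("name", "entrypoint", "parameters", "schedules", "work_pool")


def needs_redeploy(old: Dict[str, Any], new: Dict[str, Any]) -> bool:
    """Return True if any tracked metadata field differs between two definitions.

    New code behind an unchanged entrypoint or image tag never shows up in these
    dicts, which is why it does not require a redeploy.
    """
    return any(old.get(key) != new.get(key) for key in METADATA_KEYS)


current = {
    "name": "dev",
    "entrypoint": "flows/etl.py:etl",
    "work_pool": {"name": "local-docker", "job_variables": {"image": "my-repo/etl:dev"}},
}
# Rebuilding my-repo/etl:dev changes the image contents, not this definition.
rebuilt_same_tag = dict(current)
moved_entrypoint = {**current, "entrypoint": "flows/pipeline.py:etl"}
new_tag = {
    **current,
    "work_pool": {"name": "local-docker", "job_variables": {"image": "my-repo/etl:v2"}},
}

print(needs_redeploy(current, rebuilt_same_tag))  # False
print(needs_redeploy(current, moved_entrypoint))  # True
print(needs_redeploy(current, new_tag))  # True
```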
Option A: Use your local code (no Docker) for fast iteration
- Run locally with a Process work pool and pull your code from your working directory or from Git. This avoids builds entirely.
Example `prefect.yaml` using Git as the source:
# prefect.yaml
name: my-project
pull:
  - prefect.deployments.steps.pull.git_clone:
      id: clone-step
      repository: https://github.com/your-org/your-repo
      branch: dev
  # git_clone reports the directory it cloned into; reuse it instead of hard-coding a name
  - prefect.deployments.steps.pull.set_working_directory:
      directory: "{{ clone-step.directory }}"
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-process
Commands to create/start the local worker and deploy:
prefect work-pool create local-process --type process
prefect worker start -p local-process
prefect deploy
prefect deployment run "etl/dev"
Tip: You can also just run your flow directly for quick tests without a deployment:
python flows/etl.py # calls your @flow in __main__
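Or serve the flow from a long-running Python process during dev. `.serve()` registers a deployment and executes its runs in that same process, so no work pool or worker is needed:
from prefect import flow
@flow
def etl(): ...
if __name__ == "__main__":
    # Registers the deployment "etl/dev" and polls for runs until you stop the script
    etl.serve(name="dev")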
Option B: Stay with Docker but avoid redeploys by using a mutable dev tag
- Keep your deployment’s image the same (e.g., `my-repo/etl:dev`), rebuild that tag locally (or push it to your registry), and just run the deployment again. You do not need to redeploy if the image tag string didn’t change.
- If the worker pulls from a remote registry, set `image_pull_policy` to `Always` so it pulls the latest tag before each run (the Docker worker's accepted values are `IfNotPresent`, `Always`, and `Never`).
`prefect.yaml` snippet:
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: my-repo/etl:dev
        image_pull_policy: "Always"
Option C: Docker with bind-mounted code (true hot reload without rebuilds)
- Mount your local source into the container the worker starts. This way, saving a file locally updates what runs in the container.
`prefect.yaml` snippet:
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: my-repo/etl:base  # a base image with deps installed
        image_pull_policy: "Never"
        volumes:
          # Bind mount your repo; Prefect resolves {{ $PWD }} at deploy time (shell-style ${PWD} is not expanded)
          - "{{ $PWD }}:/opt/project"
        container_create_kwargs:
          working_dir: "/opt/project"  # run from the mounted repo
Create/start the Docker work pool and worker:
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
prefect deploy
What I recommend for your setup
- For fastest local iteration: Option C (Docker + volume mount) or Option A (Process worker + local code).
- If you prefer to keep using images: Option B (rebuild the same dev tag and set `image_pull_policy: Always`); then you won’t need to redeploy on every change.
Docs for deeper reference:
- Prefect YAML and steps: prefect.yaml guide
- Deployment versioning patterns: deployment versioning
A couple of clarifying questions so I can tailor an exact `prefect.yaml` for you:
- Do you want to keep using Docker for local testing, or are you open to a Process worker for dev?
Marvin, 08/31/2025, 4:14 AM
Madushan Pathirana, 08/31/2025, 4:42 AM
Marvin, 08/31/2025, 4:42 AM
Marvin, 08/31/2025, 4:42 AM
…`prefect.yaml` in Prefect 3.x.
Option 1 (Recommended): Docker worker + bind-mounted code (hot reload)
- Build a base image with your dependencies only (not your flow code).
- Mount your local repo into the container the Docker worker starts. Your code changes are picked up instantly; no image rebuilds and no redeploys.
`prefect.yaml`:
name: my-project
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/prefect-base:py3.11  # deps only, no app code
        image_pull_policy: "Never"
        # Mount your local repo into the container ({{ $PWD }} is resolved at deploy time)
        volumes:
          - "{{ $PWD }}:/opt/project"
        container_create_kwargs:
          working_dir: "/opt/project"
        env:
          # Point flows to your Server API (running on the same EC2 host)
          # On Linux, network_mode: host is easiest (see below)
          PREFECT_API_URL: "http://127.0.0.1:4200/api"
        # Optional, if you want containers to see the host network directly
        network_mode: "host"
One-time setup
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
prefect deploy
Dev loop
- Edit your Python files locally
- Run the deployment again (no redeploy needed):
prefect deployment run "etl/dev"
Notes
- Use an absolute host path if you prefer (e.g., /home/ubuntu/my-repo:/opt/project). Note that Prefect does not expand shell-style `${PWD}` in `prefect.yaml`; its environment-variable placeholder is `{{ $PWD }}`.
- If you don’t use network_mode: host, set PREFECT_API_URL to your EC2 host’s private IP (e.g., http://<ec2-private-ip>:4200/api). On some Linux configurations, host.docker.internal may not resolve.
- Your base image just needs Prefect and your libs installed. The code comes from the bind mount.
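The API URL must include the scheme, the `:4200` port, and the `/api` path. The small sketch below shows the two cases from these notes. `prefect_api_url` is a hypothetical helper, and `10.0.1.25` is a made-up private IP used only for illustration:

```python
import ipaddress
from typing import Optional


def prefect_api_url(host_network: bool, private_ip: Optional[str] = None, port: int = 4200) -> str:
    """Build PREFECT_API_URL for a flow-run container (hypothetical helper).

    With network_mode: host the container shares the EC2 host's network stack,
    so the loopback address reaches the Server; otherwise use the private IP.
    IPv4 is assumed; an IPv6 address would need [brackets] in the URL.
    """
    if host_network:
        host = "127.0.0.1"
    else:
        if private_ip is None:
            raise ValueError("private_ip is required without host networking")
        # ip_address() raises ValueError on malformed input such as "10.0.1"
        host = str(ipaddress.ip_address(private_ip))
    return f"http://{host}:{port}/api"


print(prefect_api_url(host_network=True))  # http://127.0.0.1:4200/api
print(prefect_api_url(host_network=False, private_ip="10.0.1.25"))  # http://10.0.1.25:4200/api
```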
Option 2: Docker image with a mutable dev tag (no redeploys, just rebuild/pull)
- Keep the deployment’s image the same (e.g., your-repo/etl:dev).
- Rebuild that tag when you change code; the worker always pulls the latest tag.
`prefect.yaml`:
name: my-project
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/etl:dev
        image_pull_policy: "Always"
        env:
          PREFECT_API_URL: "http://127.0.0.1:4200/api"
        network_mode: "host"
Dev loop
# Rebuild same tag
docker build -t your-repo/etl:dev .
# If worker pulls from a registry, push; if local-only, skip push
docker push your-repo/etl:dev
# No prefect redeploy needed; just run again
prefect deployment run "etl/dev"
When do you actually need to redeploy?
- Only when deployment metadata changes: name, entrypoint path, default params, schedule, work pool, etc.
- If the source of code (bind mount or image tag) stays the same string in the deployment, you can iterate without redeploying.
Links
- Prefect YAML guide: prefect.yaml
- Deployment versioning patterns: deployment versioning
Want me to tailor the exact `prefect.yaml` to your repo layout and base image? If you share:
- Your base image name (or Dockerfile),
- The path to your flow entrypoint (e.g., flows/etl.py:etl),
- Whether you can use network_mode: host or prefer a static PREFECT_API_URL,
I’ll drop in a ready-to-use config.
Madushan Pathirana, 08/31/2025, 4:52 AM
Marvin, 08/31/2025, 4:52 AM
Marvin, 08/31/2025, 4:54 AM
…`prefect deploy` for every change, the key is: rebuild the same image tag and keep your deployment metadata unchanged. Then you can run the deployment again with new code without redeploying.
Here’s a concrete setup and dev loop that should fit your workflow.
One-time setup
1) Create a Docker work pool and start a worker on your EC2 host:
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
2) Define a single deployment in prefect.yaml that points to a stable image tag:
name: my-project
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/etl:dev  # mutable dev tag
        image_pull_policy: "Never"  # local image on the same EC2 host
        network_mode: "host"
        env:
          # Since your Server and worker are on the same EC2 host
          PREFECT_API_URL: "http://127.0.0.1:4200/api"
3) Deploy once:
prefect deploy
Local dev loop (no redeploy needed)
- Rebuild the same tag locally when code changes:
docker build -t your-repo/etl:dev .
- Run your deployment with the updated code:
prefect deployment run "etl/dev" --watch
Because the deployment still references the same image tag and entrypoint, no `prefect deploy` is needed for code changes. The Docker worker will launch containers from your freshly rebuilt local image.
Passing parameters with prefect deployment run
- Use multiple `-p key=value` flags; values are parsed as JSON:
prefect deployment run "etl/dev" \
-p start_date='"2025-08-01T00:00:00Z"' \
-p runs=10 \
-p features='["alpha","beta"]' \
--watch
- You can also pass a JSON object with `--params` (handy for complex or nested params):
prefect deployment run "etl/dev" \
--params '{"start_date":"2025-08-01T00:00:00Z","runs":10,"features":["alpha","beta"]}' \
--watch
- If you need to override job variables at run time (e.g., env for a single run), use `-jv`:
prefect deployment run "etl/dev" \
-jv env='{"LOG_LEVEL":"DEBUG"}'
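The inner double quotes in `-p start_date='"..."'` matter because each value is read as JSON. Below is a small stdlib approximation of how repeated `-p key=value` pairs map to parameters and why they match the single `--params` object above. It models the behavior described in this message and is not Prefect's CLI code. For simplicity it just raises on values that are not valid JSON:

```python
import json
from typing import Any, Dict, List


def parse_param_flags(pairs: List[str]) -> Dict[str, Any]:
    """Approximate repeated `-p key=value` flags: split on the first '=' and JSON-decode the value."""
    params: Dict[str, Any] = {}
    for pair in pairs:
        key, _, raw = pair.partition("=")
        params[key] = json.loads(raw)  # raises json.JSONDecodeError for non-JSON values
    return params


# What the program receives after the shell strips its own single quotes:
flags = [
    'start_date="2025-08-01T00:00:00Z"',  # inner double quotes make this a JSON string
    "runs=10",  # JSON number -> int
    'features=["alpha","beta"]',  # JSON array -> list
]
from_flags = parse_param_flags(flags)
from_params = json.loads('{"start_date":"2025-08-01T00:00:00Z","runs":10,"features":["alpha","beta"]}')
print(from_flags == from_params)  # True
```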
Using the same prefect.yaml in CI/CD
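- Keep a single `prefect.yaml` and vary only environment-specific values using env vars.
- Example: use a dynamic image tag for CI. Prefect fills in `{{ $VAR }}` placeholders from the environment when `prefect deploy` runs. Shell-style `${VAR:-default}` is not expanded, so set `IMAGE_TAG=dev` yourself for local deploys:
deployments:
  - name: dev
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: "your-repo/etl:{{ $IMAGE_TAG }}"
        image_pull_policy: "Always"  # in CI if pulling from a registry
        env:
          PREFECT_API_URL: "{{ $PREFECT_API_URL }}"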
- In CI:
- Build and push the image with an immutable tag (e.g., the commit SHA).
- Set `IMAGE_TAG` and `PREFECT_API_URL` in the pipeline environment.
- Run:
prefect deploy --name dev
- Workers will pull the new tag on next runs.
Notes for your single-EC2 setup
- With `network_mode: "host"`, containers can reach your Server at `http://127.0.0.1:4200/api` from inside the container. If you don’t use host networking, set `PREFECT_API_URL` to your EC2 host’s IP instead.
- Since your worker and Server run on the same host, `image_pull_policy: "Never"` is ideal for local dev (no registry required). In CI, use `"Always"` if your worker must pull a newly pushed image.
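The local-versus-CI split above can be captured in one place. In the sketch below, `docker_job_variables` is hypothetical, `your-repo/etl` is the placeholder repository from these examples, and `3f9c2ab` is a made-up commit SHA. The sketch also assumes the Docker worker spells its pull policies `Always` and `Never`; confirm this with `prefect work-pool get-default-base-job-template --type docker`:

```python
from typing import Dict, Optional

REPO = "your-repo/etl"  # placeholder image repository from the examples above


def docker_job_variables(ci: bool, commit_sha: Optional[str] = None) -> Dict[str, str]:
    """Pick the image tag and pull policy for local dev vs. CI (hypothetical helper)."""
    if ci:
        if not commit_sha:
            raise ValueError("CI should deploy an immutable tag, e.g. the commit SHA")
        # The image was pushed to a registry, so the worker has to pull it.
        return {"image": f"{REPO}:{commit_sha}", "image_pull_policy": "Always"}
    # The image was built on the worker's own host; never contact the registry.
    return {"image": f"{REPO}:dev", "image_pull_policy": "Never"}


print(docker_job_variables(ci=False))
print(docker_job_variables(ci=True, commit_sha="3f9c2ab"))
```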
When do you actually need to redeploy?
- Only if deployment metadata changes: entrypoint path, deployment name, schedule, default parameters, work pool, or the image tag string in your YAML. If you keep the tag constant (e.g., `your-repo/etl:dev`), you can rebuild and run without redeploying.
References
- Prefect YAML guide: prefect.yaml
- Deployment versioning patterns: deployment versioning
Marvin, 08/31/2025, 4:54 AM
…`prefect.yaml` and a minimal Dockerfile that keeps dependencies in the image and makes your builds fast.
Madushan Pathirana, 08/31/2025, 4:58 AM
Marvin, 08/31/2025, 4:58 AM
Marvin, 08/31/2025, 4:59 AM
…`prefect.yaml` examples to run flows directly from your local code in Prefect 3.x. Pick the one that fits your dev style.
Option 1: Local code with a Process work pool (simplest)
name: my-project
deployments:
  - name: dev
    description: "Run flows from local source code"
    entrypoint: flows/etl.py:etl  # path to your flow: file.py:function_name
    parameters:
      runs: 1  # optional defaults
    work_pool:
      name: local-process
      job_variables:
        # Run from your current repo directory ({{ $PWD }} is resolved when you run prefect deploy)
        working_dir: "{{ $PWD }}"
Setup/run:
prefect work-pool create local-process --type process
prefect worker start -p local-process
prefect deploy
prefect deployment run "etl/dev" --watch
Option 2: Local code with a Docker work pool (bind mount your repo)
- Requires a base image with your Python deps preinstalled.
- Mount your local repo into the container so code changes pick up instantly.
name: my-project
deployments:
  - name: dev
    description: "Run flows from local source code mounted into Docker"
    entrypoint: flows/etl.py:etl
    work_pool:
      name: local-docker
      job_variables:
        image: your-repo/prefect-base:py311  # deps-only base image
        image_pull_policy: "Never"  # using local image
        # IMPORTANT: use an absolute host path to your repo
        volumes:
          - "/home/ubuntu/my-repo:/opt/project"
        container_create_kwargs:
          working_dir: "/opt/project"
        env:
          # Server and worker on the same EC2 host
          PREFECT_API_URL: "http://127.0.0.1:4200/api"
        network_mode: "host"
Setup/run:
prefect work-pool create local-docker --type docker
prefect worker start -p local-docker
prefect deploy
prefect deployment run "etl/dev" --watch
Notes:
- Redeploy only when metadata changes (entrypoint, schedule, params defaults, work pool, image name, etc.). With the above, you can iterate on your local code without redeploying.
- For Docker, ensure your base image already has Prefect and your libraries installed; your code is provided via the bind mount.
- If you prefer a Git source later, you can add `pull.git_clone` steps to `prefect.yaml`.
Docs:
- Prefect YAML guide: prefect.yaml
- Work pool job variables (templates): run `prefect work-pool get-default-base-job-template --type process` (or `--type docker`) to see the available fields.
Madushan Pathirana, 08/31/2025, 5:07 AM
Marvin, 08/31/2025, 5:07 AM
Marvin, 08/31/2025, 5:08 AM
…`prefect.yaml`. It looks like your last sentence got cut off. Did you mean “I just want to deploy once and then iterate without redeploying”? I’ll assume yes and show you two tidy options using your Docker pool.
What needs fixing in your current YAML
- For Docker workers, `working_dir` is not a valid job variable; set `container_create_kwargs.working_dir` instead.
- `push: false` under `job_variables` is not a valid Docker worker field; remove it.
- Your `pull` step path is off. If you keep it, it should be `prefect.deployments.steps.pull.set_working_directory`, but you don’t need a pull step if your code is in the image or bind-mounted.
- If you’re rebuilding the dev image locally on the same EC2 host as the worker, set `image_pull_policy: "Never"`. If you push to ECR and want workers to pull on each run, use `"Always"`.
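These fixes are mechanical enough to check before deploying. Below is a sketch of such a check. `lint_docker_job_variables` is hypothetical, and the set of known keys is an assumed subset (the fields used in this thread plus a few common ones). The authoritative list comes from `prefect work-pool get-default-base-job-template --type docker`:

```python
from typing import Dict, List

# Assumed subset of Docker worker job variables; print the full list with:
#   prefect work-pool get-default-base-job-template --type docker
KNOWN_DOCKER_JOB_VARIABLES = {
    "image",
    "image_pull_policy",
    "network_mode",
    "env",
    "volumes",
    "container_create_kwargs",
    "networks",
    "labels",
}


def lint_docker_job_variables(job_variables: Dict[str, object]) -> List[str]:
    """Flag keys that a Docker work pool would not recognize (hypothetical helper)."""
    problems = []
    for key in sorted(job_variables):
        if key == "working_dir":
            problems.append("working_dir: move it to container_create_kwargs.working_dir")
        elif key not in KNOWN_DOCKER_JOB_VARIABLES:
            problems.append(f"{key}: not in the known Docker job variables; remove it")
    return problems


current = {"image": "credilinq-prefect:dev", "push": False, "working_dir": "/opt/prefect/src"}
fixed = {
    "image": "credilinq-prefect:dev",
    "image_pull_policy": "Never",
    "container_create_kwargs": {"working_dir": "/opt/prefect/src"},
}
print(lint_docker_job_variables(current))
print(lint_docker_job_variables(fixed))  # []
```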
Below are two clean patterns you can copy-paste.
A) Keep code in the image; rebuild the same tag for changes (no redeploys)
- Rebuild `:dev` locally when code changes; do not change the tag string in the deployment.
- Run the deployment again; no `prefect deploy` is needed after the first time.
name: src
prefect-version: 3.4.11
# Build/push used in CI or when you want to refresh the image (not needed for every dev run)
build:
  - prefect_docker.deployments.steps.build_docker_image:
      id: build_image
      requires: prefect-docker>=0.3.1
      image_name: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect
      tag: dev
      dockerfile: Dockerfile.prefect_flows
push:
  - prefect_docker.deployments.steps.push_docker_image:
      requires: prefect-docker>=0.3.1
      image_name: "{{ build_image.image_name }}"
      tag: dev
deployments:
  - name: ebay
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id": "d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: "{{ build_image.image }}"  # resolves to .../credilinq-prefect:dev
        image_pull_policy: "Never"  # local build on the same EC2 host
        network_mode: "host"
        env:
          PREFECT_API_URL: "http://127.0.0.1:4200/api"
Dev loop:
# Rebuild same tag locally (no redeploy)
docker build -t 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev -f Dockerfile.prefect_flows .
# Run the deployment with updated code. Deployments are addressed as "<flow name>/<deployment name>";
# if your flow is declared with @flow(name="EBAY"), this becomes "EBAY/ebay".
prefect deployment run "run/ebay" --watch
B) Mount local code into the container for instant updates (no image rebuilds)
- Use your base image with deps; mount your repo to override code in the container.
name: src
prefect-version: 3.4.11
deployments:
  - name: local
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id": "d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev
        image_pull_policy: "Never"
        # IMPORTANT: absolute path to your repo on the EC2 host
        volumes:
          - "/home/ubuntu/src:/opt/prefect/src"
        container_create_kwargs:
          working_dir: "/opt/prefect/src"
        network_mode: "host"
        env:
          PREFECT_API_URL: "http://127.0.0.1:4200/api"
Dev loop:
- Edit code locally in /home/ubuntu/src
- Run again (no redeploy, no image rebuild):
prefect deployment run "run/local" --watch
Passing parameters at run time
- Use `-p key=value` (values are JSON) multiple times:
prefect deployment run "run/ebay" \
-p seller_id='"d480fd76-f59e-4ea0-95cb-4e22db0164e0"' \
-p start_date='"2025-08-01T00:00:00Z"' \
-p runs=10 \
--watch
- Or pass one JSON object:
prefect deployment run "run/ebay" \
--params '{"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0","runs":10}' \
--watch
Using the same prefect.yaml for CI/CD
Marvin, 08/31/2025, 5:08 AM
…`prefect deploy` to update the deployment if you change metadata or want to rebuild/push images.
- You can parameterize the image tag with an env var (Prefect's placeholder syntax is `{{ $IMAGE_TAG }}`; shell-style `${IMAGE_TAG:-dev}` is not expanded) and set `image_pull_policy: "Always"` in CI if workers need to pull from ECR.
Helpful references
- Prefect YAML guide: prefect.yaml
- Deployment versioning patterns: deployment versioning
- See default job variables for Docker pools:
prefect work-pool get-default-base-job-template --type docker
If you confirm:
- the absolute path to your repo on the EC2 host,
- whether you want A (image rebuild) or B (bind mount),
I’ll finalize your `prefect.yaml` accordingly.
Madushan Pathirana, 08/31/2025, 5:10 AM
Marvin, 08/31/2025, 5:10 AM
Marvin, 08/31/2025, 5:11 AM
…`prefect.yaml`, `prefect deploy -n local` still runs the project-level build/push steps, which triggers ECR auth. In Prefect 3.x, the top-level `build` and `push` sections apply to every deployment in the file unless a deployment defines its own `build`/`push` keys, so selecting a single deployment by name doesn't skip those steps.
Two clean ways to avoid ECR for your “local” deployment:
A) Keep a separate local-only Prefect file (recommended)
- Create a second file (e.g., `prefect.local.yaml`) that contains only your local deployment and no `build` or `push` sections. Then use `--prefect-file` when deploying.
Example `prefect.local.yaml`:
name: src
prefect-version: 3.4.11
deployments:
  - name: local
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id": "d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: my-docker-pool-2
      work_queue_name: default
      job_variables:
        image: 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev
        image_pull_policy: "Never"  # don't pull from ECR; use the local image
        network_mode: "host"
        env:
          PREFECT_API_URL: "http://127.0.0.1:4200/api"
        # Optional: set the working dir inside the container if your code lives there
        container_create_kwargs:
          working_dir: "/opt/prefect/src"
Dev loop:
# Build the image locally on the same EC2 host as the worker
docker build -t 010614160260.dkr.ecr.ap-southeast-1.amazonaws.com/credilinq-prefect:dev -f Dockerfile.prefect_flows .
# Deploy ONLY the local deployment (no build/push)
prefect deploy --prefect-file prefect.local.yaml
# Run it (updated code is in the locally built image)
prefect deployment run "run/local" --watch
B) Remove build/push from your existing file when developing locally
- Temporarily comment out the `build:` and `push:` sections in `prefect.yaml` while developing locally, then re-enable them in CI.
- This works but is easy to forget; the separate-file approach is safer.
Also, quick fixes to your current YAML:
- For Docker workers, `working_dir` must go under `container_create_kwargs.working_dir` (not at the top level of `job_variables`).
- Remove `push: false` under `job_variables`; that’s not a valid Docker job variable.
Passing parameters with prefect deployment run
- Use `-p key=value` (value is JSON) repeatedly:
prefect deployment run "run/local" \
-p seller_id='"d480fd76-f59e-4ea0-95cb-4e22db0164e0"' \
-p runs=10 \
--watch
- Or pass a single JSON object:
prefect deployment run "run/local" \
--params '{"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0","runs":10}' \
--watch
- You can override job variables at run time using `-jv` if needed:
prefect deployment run "run/local" \
-jv env='{"LOG_LEVEL":"DEBUG"}'
Docs for reference:
- Prefect YAML guide: prefect.yaml
If you want, paste your desired “local” directory path inside the container (e.g., /opt/prefect/src) and whether your Dockerfile copies code there. I’ll adjust the `container_create_kwargs` and confirm you won’t hit ECR for local runs.
Madushan Pathirana, 08/31/2025, 5:33 AM
Marvin, 08/31/2025, 5:33 AM
Marvin, 08/31/2025, 5:34 AM
…`${PWD}` is not expanded inside `prefect.yaml`; Prefect treats it as a literal string. For a Process worker, set `working_dir` to a real path on the host.
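Prefect's documented placeholder for environment variables in `prefect.yaml` is `{{ $VAR_NAME }}`, resolved when `prefect deploy` runs. Shell syntax like `${PWD}` is not resolved. The toy regex model below illustrates the difference. It is not Prefect's implementation, and leaving unset variables as literal placeholders is an assumption of this sketch:

```python
import os
import re
from typing import Mapping

# Prefect-style environment placeholder, e.g. "{{ $REPO_DIR }}".
ENV_PLACEHOLDER = re.compile(r"\{\{\s*\$(\w+)\s*\}\}")


def resolve_env_placeholders(value: str, environ: Mapping[str, str] = os.environ) -> str:
    """Toy model: substitute {{ $VAR }} from environ; leave ${VAR} and unset vars alone."""

    def substitute(match):
        # Assumption: an unset variable keeps its placeholder text.
        return environ.get(match.group(1), match.group(0))

    return ENV_PLACEHOLDER.sub(substitute, value)


env = {"PWD": "/home/ubuntu/src", "REPO_DIR": "/home/ubuntu/src"}
print(resolve_env_placeholders("${PWD}", env))  # ${PWD}
print(resolve_env_placeholders("{{ $REPO_DIR }}", env))  # /home/ubuntu/src
```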
Two easy fixes:
Fix A: Use an absolute path
name: src
prefect-version: 3.4.11
deployments:
  - name: local
    description: "Run flows from local source code"
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id": "d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: local-worker
      work_queue_name: default
      job_variables:
        working_dir: "/home/ubuntu/src"  # replace with your repo’s absolute path
Fix B: Use Prefect's environment-variable templating (portable across hosts)
- Set an environment variable, then reference it in the YAML as `{{ $REPO_DIR }}`. Prefect resolves it when you run `prefect deploy`:
export REPO_DIR=/home/ubuntu/src  # or your repo path
prefect.yaml:
name: src
prefect-version: 3.4.11
deployments:
  - name: local
    description: "Run flows from local source code"
    entrypoint: extraction/ebay/ebay.py:run
    parameters: {"seller_id": "d480fd76-f59e-4ea0-95cb-4e22db0164e0"}
    work_pool:
      name: local-worker
      work_queue_name: default
      job_variables:
        working_dir: "{{ $REPO_DIR }}"
Notes
- Ensure the `working_dir` exists on the machine where the Process worker runs and contains your flow code. The `entrypoint` path is resolved relative to `working_dir`.
- Create the pool if you haven’t:
prefect work-pool create local-worker --type process
prefect worker start -p local-worker
- Deploy and run:
prefect deploy --name local
prefect deployment run "run/local" --watch
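A wrong `working_dir` only surfaces when a run starts, so it can help to check the entrypoint before deploying. The sketch below is a hypothetical pre-flight helper, not a Prefect API. It assumes the `path/file.py:function` entrypoint format used above and a flow function defined at module level:

```python
import ast
import tempfile
from pathlib import Path


def check_entrypoint(working_dir: str, entrypoint: str) -> Path:
    """Confirm 'path/to/file.py:function' resolves under working_dir (hypothetical helper)."""
    file_part, sep, func_name = entrypoint.rpartition(":")
    if not sep or not file_part or not func_name:
        raise ValueError(f"expected 'path/file.py:function', got {entrypoint!r}")
    flow_file = Path(working_dir) / file_part
    if not flow_file.is_file():
        raise FileNotFoundError(f"{flow_file} not found; is working_dir correct?")
    # Parse instead of importing, so the check has no side effects.
    tree = ast.parse(flow_file.read_text())
    defined = {
        node.name
        for node in tree.body  # module-level definitions only
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
    }
    if func_name not in defined:
        raise LookupError(f"{func_name!r} is not defined at module level in {flow_file}")
    return flow_file


# Demo against a throwaway directory that mimics the repo layout in this thread.
with tempfile.TemporaryDirectory() as repo:
    flow_dir = Path(repo, "extraction", "ebay")
    flow_dir.mkdir(parents=True)
    (flow_dir / "ebay.py").write_text("def run(seller_id):\n    return seller_id\n")
    found = check_entrypoint(repo, "extraction/ebay/ebay.py:run")
    try:
        check_entrypoint(repo, "extraction/ebay/ebay.py:missing")
        missing_error = None
    except LookupError as exc:
        missing_error = str(exc)
print(found.name, missing_error is not None)
```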
Passing parameters at run time
- Use `-p key=value` (value is JSON) or `--params`:
prefect deployment run "run/local" \
-p seller_id='"d480fd76-f59e-4ea0-95cb-4e22db0164e0"' \
-p runs=3 \
--watch
prefect deployment run "run/local" \
--params '{"seller_id":"d480fd76-f59e-4ea0-95cb-4e22db0164e0","runs":3}' \
--watch
Reference: prefect.yaml
If you share the actual path to your repo on the EC2 instance, I’ll drop in a verified YAML snippet with that path.
Madushan Pathirana, 08/31/2025, 5:49 AM
Marvin, 08/31/2025, 5:49 AM
Marvin, 08/31/2025, 5:51 AM
…`get_marketplace` and `get_form` are plain functions, they will run sequentially. Decorate them with `@task`.
2) Use a task runner and submit concurrently
- For I/O-bound work (DB/API), use ThreadPoolTaskRunner.
- For CPU-bound work, use ProcessPoolTaskRunner.
- Use the `.submit(...)` pattern to start tasks concurrently, then `.result()` to wait.
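The submit-then-result shape is the same one Python's standard `concurrent.futures` uses. The stdlib-only sketch below involves no Prefect at all; `fake_io_call` and its sleep stand in for hypothetical DB/API calls. It shows why two submitted I/O-bound calls finish in roughly the time of one:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_io_call(name: str, seconds: float) -> str:
    time.sleep(seconds)  # stands in for a DB/API round trip that releases the GIL
    return f"{name}-done"


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as pool:
    # Both calls start immediately...
    m_fut = pool.submit(fake_io_call, "marketplace", 0.3)
    f_fut = pool.submit(fake_io_call, "form", 0.3)
    # ...and .result() blocks until each finishes (re-raising any exception).
    results = (m_fut.result(), f_fut.result())
elapsed = time.perf_counter() - start
print(results, f"{elapsed:.2f}s")  # roughly 0.3s, not 0.6s
```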
Example (I/O-bound, ThreadPool)
from datetime import datetime, timedelta, timezone
from prefect import flow, task, get_run_logger
from prefect.task_runners import ThreadPoolTaskRunner
def get_db_connection(env: str):
    # ... create and return a new DB connection ...
    ...
@task
def get_marketplace(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)  # make connection inside the task
    try:
        # ... do work ...
        return "marketplace-done"
    finally:
        conn.close()
@task
def get_form(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)
    try:
        # ... do work ...
        return "form-done"
    finally:
        conn.close()
@flow(name="EBAY", flow_run_name="{seller_id}", log_prints=True,
      task_runner=ThreadPoolTaskRunner(max_workers=8))
def run(seller_id: str):
    logger = get_run_logger()
    APP_ENV = "dev"
    selling_partner_id = "ed442813-afe7-4026-bc14-190ca41bfb23"
    today = datetime.now(timezone.utc)  # timezone-aware replacement for the deprecated utcnow()
    start_month_date = today - timedelta(days=365)
    start_date = start_month_date.replace(day=1).date()
    today_formatted = today.strftime("%Y-%m-%dT00:00:01.000Z")
    start_date_formatted = start_date.strftime("%Y-%m-%dT00:00:01.000Z")
    # Kick off tasks concurrently
    m_fut = get_marketplace.submit(seller_id, selling_partner_id, APP_ENV)
    f_fut = get_form.submit(seller_id, selling_partner_id, APP_ENV)
    # Wait for both to finish (and propagate exceptions)
    m_res = m_fut.result()
    f_res = f_fut.result()
    logger.info(f"Results: {m_res}, {f_res}")
Notes
- Don’t share a live DB connection object between tasks. Create it inside each task (as shown). This avoids issues with threads/processes and pickling.
- Increase or decrease `max_workers` to tune concurrency.
- If your work is CPU-bound, switch to `ProcessPoolTaskRunner`:
from prefect import flow
from prefect.task_runners import ProcessPoolTaskRunner
@flow(name="EBAY", task_runner=ProcessPoolTaskRunner(max_workers=4))
def run(seller_id: str):
    ...
With process pools, every argument passed to tasks must be picklable; keep that in mind.
- If you want to fire a dynamic number of parallel tasks (e.g., over a list), loop with `.submit(...)` and then gather:
futures = [my_task.submit(x) for x in items]
results = [f.result() for f in futures]
If you share whether these tasks are DB/API-heavy or CPU-heavy, I can suggest exact `max_workers` and the best runner for your EC2 box.
Madushan Pathirana, 08/31/2025, 5:59 AM
…`cache_key_fn`
2. Disable caching by passing `cache_policy=NO_CACHE`
Marvin, 08/31/2025, 5:59 AM
Marvin, 08/31/2025, 6:00 AM
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.cache_policies import NO_CACHE
def get_db_connection(env: str):
    ...  # your existing helper: create and return a new DB connection
@task(cache_policy=NO_CACHE)  # optional: disables caching for this task
def get_marketplace(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)
    try:
        # do work...
        return "marketplace-done"
    finally:
        conn.close()
@task(cache_policy=NO_CACHE)
def get_form(seller_id: str, selling_partner_id: str, app_env: str):
    conn = get_db_connection(app_env)
    try:
        # do work...
        return "form-done"
    finally:
        conn.close()
@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def run(seller_id: str):
    app_env = "dev"
    m = get_marketplace.submit(seller_id, "ed442813-afe7-4026-bc14-190ca41bfb23", app_env)
    f = get_form.submit(seller_id, "ed442813-afe7-4026-bc14-190ca41bfb23", app_env)
    m.result()
    f.result()
B) Keep passing a handle, but tell Prefect not to use it for caching
- Define a custom cache_key_fn that ignores un-serializable args, or disable caching on those tasks.
Custom cache key:
from prefect import task
from prefect.cache_policies import CacheKeyFnPolicy
def key_ignoring_conn(ctx, inputs):
    # Only use serializable inputs
    return f"{inputs.get('seller_id')}|{inputs.get('selling_partner_id')}"
@task(cache_policy=CacheKeyFnPolicy(cache_key_fn=key_ignoring_conn))
def get_marketplace(seller_id, selling_partner_id, conn):
    ...
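To see what a key that ignores `conn` buys you, here is a toy in-memory memoizer. It is not how Prefect stores cache entries: Prefect persists results and calls `cache_key_fn` with the task run context plus the inputs, while this sketch passes only the inputs dict. `memoize_with_key` is hypothetical:

```python
from typing import Any, Callable, Dict


def memoize_with_key(key_fn: Callable[[Dict[str, Any]], str]):
    """Toy cache: results are stored under key_fn(inputs) instead of hashing every argument."""

    def decorator(fn):
        cache: Dict[str, Any] = {}

        def wrapper(**inputs):
            key = key_fn(inputs)  # the unhashable/unpicklable conn never touches the key
            if key not in cache:
                cache[key] = fn(**inputs)
            return cache[key]

        wrapper.cache = cache
        return wrapper

    return decorator


def key_ignoring_conn(inputs: Dict[str, Any]) -> str:
    return f"{inputs.get('seller_id')}|{inputs.get('selling_partner_id')}"


calls = []


@memoize_with_key(key_ignoring_conn)
def get_marketplace(seller_id, selling_partner_id, conn):
    calls.append(seller_id)
    return f"marketplace for {seller_id}"


get_marketplace(seller_id="s1", selling_partner_id="p1", conn=object())
# A different connection object with the same IDs is a cache hit.
get_marketplace(seller_id="s1", selling_partner_id="p1", conn=object())
print(calls, list(get_marketplace.cache))  # ['s1'] ['s1|p1']
```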
Simplest: disable caching on the affected tasks
@task(cache_policy=NO_CACHE)
def get_marketplace(seller_id, selling_partner_id, conn):
    ...
Summary
- Don’t pass DB connections, clients, file handles, locks, etc. as task inputs.
- Prefer creating connections inside each task (thread-safe) and/or disable caching with `cache_policy=NO_CACHE`.
- The warning is harmless, but these changes will remove it and avoid overhead.
If you’d like, paste your current task signatures and I’ll show the minimal edits to make them parallel-safe and cache-error-free.
Madushan Pathirana, 08/31/2025, 6:07 AM
Marvin, 08/31/2025, 6:07 AM
Marvin, 08/31/2025, 6:08 AM
…`APP_ENV` as a parameter to every task. Use one of these patterns:
Option 1: Prefect Variables (centralized, runtime-configurable)
- Set once (CLI/UI) and read from tasks/flows with `Variable.get()`.
- Great for environment names, bucket names, etc.
Set it (Prefect variable names are limited to lowercase letters, digits, and underscores, so use `app_env` rather than `APP_ENV`):
prefect variable set app_env dev
Use it in code:
import pandas as pd
from prefect import task
from prefect.variables import Variable
@task
def get_form(seller_id: str, selling_partner_id: str):
    app_env = Variable.get("app_env", default="dev")
    conn = get_db_connection(app_env)  # your existing helper
    try:
        df_sellers = pd.read_sql_query("SELECT ...", conn)
        print(df_sellers)
    finally:
        conn.close()
Option 2: OS environment variable (provided by work pool job variables)
- Set a custom `APP_ENV` in your deployment's `job_variables.env`, then read it with `os.getenv` in code.
prefect.yaml (Process worker):
deployments:
  - name: local
    entrypoint: extraction/ebay/ebay.py:run
    work_pool:
      name: local-worker
      job_variables:
        working_dir: "/home/ubuntu/src"
        env:
          APP_ENV: "dev"
Use it:
import os
from prefect import task
@task
def get_form(seller_id, selling_partner_id):
    app_env = os.getenv("APP_ENV", "dev")
    conn = get_db_connection(app_env)
    ...
Option 3: Compute it once in the flow and pass it explicitly
- Compute once in the flow and pass only to tasks that need it, or set a module-level constant.
- Still avoids passing many parameters if only a few tasks need it.
from prefect import flow, task
@task
def get_form(seller_id, selling_partner_id, app_env):
    conn = get_db_connection(app_env)
    ...
@flow
def run(seller_id: str):
    app_env = "dev"  # or read Variable.get / os.getenv once here
    # .result() waits for the task so the flow doesn't finish before it does
    get_form.submit(seller_id, "ed4...", app_env).result()
Recommendation for your case
- Use Option 1 (Prefect Variable) or Option 2 (env var via job_variables.env). Both let your tasks create their own DB connection without threading/pickling issues and without repeating `APP_ENV` everywhere.
- If you already maintain a per-environment Prefect Server, Variables are nice because you can change APP_ENV centrally in the UI/CLI without code changes.
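For Option 2, it helps to read and validate the value in one place so a typo in `job_variables.env` fails loudly instead of silently connecting to the wrong database. The sketch below is a minimal version under stated assumptions: `resolve_app_env` is hypothetical, and the environment names in `VALID_ENVS` are made up:

```python
import os
from typing import Mapping, Optional

VALID_ENVS = {"dev", "staging", "prod"}  # hypothetical environment names


def resolve_app_env(environ: Optional[Mapping[str, str]] = None, default: str = "dev") -> str:
    """Read APP_ENV (e.g. set via job_variables.env) with a default, rejecting unknown values."""
    source = environ if environ is not None else os.environ
    env = source.get("APP_ENV", default)
    if env not in VALID_ENVS:
        raise ValueError(f"Unexpected APP_ENV {env!r}; expected one of {sorted(VALID_ENVS)}")
    return env


print(resolve_app_env({}))  # dev
print(resolve_app_env({"APP_ENV": "prod"}))  # prod
```

Inside a task you would call `resolve_app_env()` with no arguments so it reads the real process environment that the worker set up.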
Docs:
- Variables: Prefect Variables
- Customizing job variables: Customize job variables
If you share which you prefer (Prefect Variable vs env var), I can refactor your snippet accordingly.