Syméon del Marmol
10/01/2025, 9:47 AM
How can I deploy a flow (e.g. myFlow.deploy()) to a work-pool whose type is "process"?
I'm running workers with docker compose.
When I try to deploy the flows, it says it requires either an image or a source of code: with the "process" type, shouldn't it simply start a new process?
Marvin
10/01/2025, 9:48 AM
Marvin
10/01/2025, 9:54 AM
Option A: Keep the Process work pool and give the deployment a code source
- If your repo is mounted into the worker container (e.g. via a docker-compose volume), point from_source at that local path:
from prefect import flow, Flow
@flow
def my_flow():
    ...

Flow.from_source(
    source="/opt/app",                      # path inside the worker container
    entrypoint="flows/my_flow.py:my_flow",  # file:callable
).deploy(
    name="prod",
    work_pool_name="my-process-pool",
)
- If you’d rather pull from Git (recommended when not mounting code), use a Git URL:
from prefect import flow, Flow

@flow
def my_flow():
    ...

Flow.from_source(
    source="https://github.com/acme/prefect-project",
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="my-process-pool",
)
Notes:
- The path or repo must be accessible from inside the worker container.
- With Process workers, your worker container must already have all Python dependencies installed; Prefect does not build an image per deployment for Process pools.
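- If the Git repo is private, one pattern is to reference stored credentials when declaring the source. The snippet below is only a sketch: it assumes a pre-created Secret block named github-access-token and uses GitRepository from prefect.runner.storage; adapt the names to your setup.
from prefect import Flow
from prefect.blocks.system import Secret
from prefect.runner.storage import GitRepository

Flow.from_source(
    source=GitRepository(
        url="https://github.com/acme/prefect-project",
        credentials={"access_token": Secret.load("github-access-token")},  # assumed existing Secret block
    ),
    entrypoint="flows/my_flow.py:my_flow",
).deploy(
    name="prod",
    work_pool_name="my-process-pool",
)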
Option B: Switch to a Docker work pool and use an image
- If you prefer to bake code and dependencies into an image per deployment, use a Docker or Kubernetes work pool instead of Process and pass image=... to .deploy(...). The image argument is not used by Process pools.
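- As a rough sketch (the pool name my-docker-pool and the image tag are placeholders; this assumes a Docker-type work pool and a registry your workers can pull from):
from prefect import flow

@flow
def my_flow():
    ...

if __name__ == "__main__":
    my_flow.deploy(
        name="prod",
        work_pool_name="my-docker-pool",                   # Docker-type work pool (placeholder name)
        image="registry.example.com/acme/my-flow:latest",  # placeholder image tag
        push=True,                                         # build the image and push it to the registry
    )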
Why the error?
- Flow.deploy() requires Prefect to know how the worker will load your code at run time. For Process pools, that means the worker must be able to import your flow from a specified “source”; for Docker/K8s pools, an image is required. Hence the “image or source” requirement.
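- To make that concrete, a minimal sketch of the failing pattern (same placeholder names as above):
from prefect import flow

@flow
def my_flow():
    ...

# This call supplies neither an image nor a source, so Prefect cannot tell the
# worker how to load the flow at run time and raises the "image or source of code" error:
# my_flow.deploy(name="prod", work_pool_name="my-process-pool")
#
# Either Flow.from_source(...).deploy(...) pattern shown above resolves it.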
Helpful links
- Create deployments in Prefect 3
- Work pools and workers overview (blog)
- Agents to Workers migration (context)
Quick check
- Are you mounting your repo into the worker container via docker-compose? If so, what is the mount path inside the container? I can tailor the from_source(..., entrypoint=...) snippet to match your layout.
- If you’re using a private Git repo, do you already have credentials configured in the worker container environment?
Syméon del Marmol
10/01/2025, 10:15 AM
Marvin
10/01/2025, 10:15 AM
Marvin
10/01/2025, 10:18 AM
1) Process workers for flow runs from deployments
- Create the work pool (if needed) and start a worker that polls it:
prefect work-pool create my-process-pool --type process  # if not created
prefect worker start --pool my-process-pool
Example docker-compose service:
services:
  prefect-flow-worker:
    image: prefecthq/prefect:3-latest
    command: bash -lc "prefect worker start --pool my-process-pool"
    environment:
      - PREFECT_API_URL=${PREFECT_API_URL}
      - PREFECT_API_KEY=${PREFECT_API_KEY}
    volumes:
      - ./app:/opt/app  # if your deployments use a local path source
- Scale by running more replicas of this service:
docker compose up -d --scale prefect-flow-worker=3
2) Task workers for background tasks created with .delay()
- Background tasks are executed by task workers, which you run as long-lived processes using prefect.task_worker.serve.
- Create a small runner script and run it as a service:
worker_task.py:
from prefect.task_worker import serve
from my_package.tasks import task_a, task_b  # import the task objects you call with .delay()

if __name__ == "__main__":
    serve(task_a, task_b, limit=20)  # limit = max concurrent tasks per worker
docker-compose service:
services:
  prefect-task-worker:
    image: prefecthq/prefect:3-latest
    working_dir: /opt/app
    command: bash -lc "python worker_task.py"
    environment:
      - PREFECT_API_URL=${PREFECT_API_URL}
      - PREFECT_API_KEY=${PREFECT_API_KEY}
    volumes:
      - ./app:/opt/app
- Scale task capacity by running more replicas or increasing `limit`:
docker compose up -d --scale prefect-task-worker=5
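For reference, a sketch of the tasks module and a .delay() call site (the module path my_package.tasks and the task bodies are illustrative; they just need to match the imports in worker_task.py above):
# my_package/tasks.py (illustrative)
from prefect import task

@task
def task_a(x: int) -> int:
    return x * 2

@task
def task_b(message: str) -> None:
    print(message)
And, elsewhere in your application code:
from my_package.tasks import task_a

future = task_a.delay(21)  # enqueues a background task run and returns immediately
result = future.result()   # optionally block until a task worker has executed it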
Notes for your setup
- For Process workers, your code must be available inside the worker container. If you used Flow.from_source("/opt/app", entrypoint=...), keep that path mounted. If you pointed at a Git URL, ensure the container can reach the repo (and any credentials).
- For background tasks, make sure the exact Task objects you call with .delay() are the ones passed to serve(...). If you’re dynamically renaming tasks via .with_options(name=...), that creates new task definitions. If you need per-run naming while keeping a stable task identity, consider using a task-run naming callback instead of changing the task’s name.
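- As a sketch of that last point (task and parameter names are made up), per-run names can come from task_run_name, which accepts a template string or a callable, so the task definition itself stays stable:
from prefect import task

# The task identity stays the same; only the run name varies with the parameters.
@task(task_run_name="process-item-{item_id}")
def process_item(item_id: int) -> None:
    ...

# Equivalent with a callable evaluated at run time:
@task(task_run_name=lambda: "nightly-refresh")
def refresh() -> None:
    ...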
Helpful links
- Create deployments in Prefect 3
- Work pools and workers overview
If you can share your docker-compose snippet and how you’re calling .delay(), I can tailor the services and serve(...) call to match your layout and scaling goals.
Syméon del Marmol
10/01/2025, 10:20 AM
Is there a way to start a worker from Python code instead of running prefect worker start --pool my-pool?
Marvin
10/01/2025, 10:20 AM
Marvin
10/01/2025, 10:21 AM
Yes, you can start a Process worker programmatically:
from prefect.workers import ProcessWorker
if __name__ == "__main__":
    worker = ProcessWorker(
        work_pool_name="my-pool",
        # optional:
        # work_queues=["high-priority"],
        # name="my-python-worker",
        # limit=5,  # max concurrent flow runs this worker will execute
        # create_pool_if_not_found=True,
    )
    worker.start(with_healthcheck=True)  # runs forever, polling the pool
Notes:
- This is the programmatic equivalent of prefect worker start --pool my-pool.
- The worker must run in an environment where your flow code is available (e.g., your Docker Compose volume or the Git source you used in Flow.from_source(...)) and where `PREFECT_API_URL`/`PREFECT_API_KEY` are set if using Cloud.
- limit controls concurrency for flow runs on this worker.
- with_healthcheck=True starts a healthcheck server; you can omit it if you don’t need one.
If you want a quick Docker Compose snippet using Python:
services:
  process-worker:
    image: prefecthq/prefect:3-latest
    working_dir: /opt/app
    command: bash -lc "python start_worker.py"
    environment:
      - PREFECT_API_URL=${PREFECT_API_URL}
      - PREFECT_API_KEY=${PREFECT_API_KEY}
    volumes:
      - ./app:/opt/app
Where start_worker.py is the script above.