Darius Tan
09/08/2025, 9:40 AM
• dynamic_pipeline_flow (Main Flow): This is the orchestrator. It loops through a list of module configurations and calls the run_module task for each one, passing the output artifact from the previous step to the next.
• run_module (Task): This is a hybrid executor responsible for running a single module.
◦ In-Process Mode (Working): It dynamically imports and runs the specified Python module. It correctly handles loading input artifacts from a URI and saving the output artifact.
◦ Docker Mode (Broken): It attempts to trigger a separate Prefect deployment (module-docker-subflow-deployment) as a subflow. This implementation is incomplete.
• docker_subflow (Subflow): This is a placeholder flow intended to execute a module inside a Docker container. It is currently unimplemented.
• API & Deployments: A FastAPI endpoint (/pipelines/run) triggers new pipeline runs.
◦ On startup, the application creates two deployments: one for the main flow targeting a process-based worker pool (local-pool) and one for the Docker subflow targeting a Docker-based worker pool (local-docker-pool).
◦ The API endpoint currently always submits runs to the main process-based deployment.
Key Problems & Questions
Based on the architecture described above, please provide code examples and explanations to solve the following problems:
1. Implementing the Docker Subflow Call: In the run_module task, the call to run_deployment for Docker execution is incomplete. How do I correctly pass all necessary parameters (module_path, params, upstream_data) to the subflow deployment, and how do I retrieve the resulting artifact URI from the completed subflow run to pass to the next module?
2. docker_subflow Implementation: The docker_subflow is currently empty. What should its implementation look like? Specifically, how to:
◦ Receive the module_path and other parameters?
◦ Load the input artifact using the upstream_data URI from within its Docker container?
◦ Dynamically execute the specified module?
◦ Save the output as a new artifact and return its URI to the parent flow?
3. Correctly Using Deployments: The /pipelines/run endpoint hardcodes the main deployment. Is this the correct approach? Confirm if the intended pattern is for the dynamic_pipeline_flow to always be the entry point, which then correctly delegates tasks to different worker pools (process vs. Docker) via subflow calls.
Marvin
09/08/2025, 9:40 AM
Marvin
09/08/2025, 9:47 AM
1) Implementing the Docker subflow call in run_module
Use prefect.deployments.run_deployment to submit the subflow to the Docker pool, then wait for completion with wait_for_flow_run and pull the return value from the flow's terminal state with get_state_result.
Key imports:
- prefect.deployments.run_deployment
- prefect.flow_runs.wait_for_flow_run
- prefect.states.get_state_result
Example task (async) showing both in-process and Docker modes:
from typing import Any, Dict, Optional
import importlib
from prefect import task, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

def dynamic_import(module_path: str):
    # module_path format: "package.module:callable_name"
    mod_path, func_name = module_path.split(":")
    module = importlib.import_module(mod_path)
    return getattr(module, func_name)

def load_artifact(uri: Optional[str]) -> Any:
    if uri is None:
        return None
    # Implement your artifact loader here (local, S3, etc.)
    # For example:
    # if uri.startswith("s3://"): ...
    # else: open(local_path) ...
    raise NotImplementedError("load_artifact is env-specific")

def save_artifact(obj: Any) -> str:
    # Implement your artifact writer and return a URI
    # For example: write to S3/local and return "s3://..." or "file://..."
    raise NotImplementedError("save_artifact is env-specific")

@task
async def run_module(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    mode: str = "process",
) -> str:
    """
    Returns a URI string for the output artifact produced by the module.
    """
    logger = get_run_logger()
    if mode == "process":
        logger.info(f"Running module in-process: {module_path}")
        func = dynamic_import(module_path)
        input_obj = load_artifact(upstream_data)
        output_obj = func(input_obj, **params)
        out_uri = save_artifact(output_obj)
        return out_uri
    elif mode == "docker":
        logger.info(f"Running module in Docker via deployment: {module_path}")
        # Submit to the Docker subflow deployment with all needed parameters
        subflow_run = await run_deployment(
            name="module-docker-subflow-deployment",
            parameters={
                "module_path": module_path,
                "params": params,
                "upstream_data": upstream_data,
            },
            as_subflow=True,  # keep parent/child relationship in UI
            # timeout=None means do not wait here; we'll wait using wait_for_flow_run below
        )
        # Wait for completion
        finished_subflow = await wait_for_flow_run(
            subflow_run.id, log_states=True
        )
        # Extract the subflow return value (artifact URI)
        artifact_uri = get_state_result(finished_subflow.state)
        if not isinstance(artifact_uri, str):
            raise ValueError("Docker subflow did not return a string artifact URI")
        return artifact_uri
    else:
        raise ValueError(f"Unknown mode: {mode}")
Notes
- The parameters for run_deployment go in the parameters dict exactly as you'd call the subflow.
- wait_for_flow_run blocks until the flow reaches a terminal state.
Marvin
09/08/2025, 9:47 AM
- get_state_result(finished_subflow.state) gives you the value returned by the subflow (i.e., your artifact URI). This requires the subflow result to be retrievable (more on this in section 2).
References:
- Create deployments
- Customize job variables
2) Implementing docker_subflow
This flow runs inside your Docker worker. It needs to:
- Accept module_path, params, upstream_data
- Load input data from the URI
- Dynamically import and run the specified module
- Save the output as an artifact and return the artifact URI (a string)
Example:
from typing import Any, Dict, Optional
import importlib
from prefect import flow, get_run_logger

def dynamic_import(module_path: str):
    mod_path, func_name = module_path.split(":")
    module = importlib.import_module(mod_path)
    return getattr(module, func_name)

def load_artifact(uri: Optional[str]) -> Any:
    if uri is None:
        return None
    # Same implementation contract as in-process
    raise NotImplementedError("load_artifact is env-specific in Docker too")

def save_artifact(obj: Any) -> str:
    # Same implementation contract as in-process
    raise NotImplementedError("save_artifact is env-specific in Docker too")

@flow(name="module-docker-subflow")
def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
) -> str:
    """
    Executes a single module inside a Docker container and returns the output artifact URI.
    """
    logger = get_run_logger()
    logger.info(f"Docker subflow starting: {module_path}")
    # Ensure the container has access to the upstream_data storage (e.g., S3 creds, mounted volumes, etc.)
    input_obj = load_artifact(upstream_data)
    func = dynamic_import(module_path)
    output_obj = func(input_obj, **params)
    out_uri = save_artifact(output_obj)
    logger.info(f"Docker subflow completed; artifact URI: {out_uri}")
    # Return the URI as the flow's return value; parent flow will fetch this via get_state_result
    return out_uri
Important considerations
- Code availability: Your Docker image must contain your project code (so importlib.import_module works). With Prefect 3.x deployments from source, the Docker worker can clone/pull your repo automatically; otherwise bake code into the image.
- Credentials and networking: Make sure the container can read upstream_data and write the new artifact (e.g., S3 credentials via environment variables; host volumes; network access).
- Returning the URI: Returning a small string is ideal. The parent can reliably read this via get_state_result. If you return large objects, configure a result store; not necessary for a short URI string.
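For reference, a module that fits the contract assumed by dynamic_import and docker_subflow could look like the following minimal sketch (the package path your_pkg.mod_a, the run callable, and the filtering logic are illustrative placeholders, not part of the original code):
```
# your_pkg/mod_a.py -- referenced as "your_pkg.mod_a:run" in module_path
from typing import Any, Dict, Optional

def run(input_obj: Optional[Any], **params: Any) -> Dict[str, Any]:
    """Receive the loaded upstream artifact plus keyword params and return the new payload.

    Whatever this returns is handed to save_artifact; the resulting URI becomes
    the next module's upstream_data.
    """
    threshold = params.get("threshold", 0)
    rows = input_obj or []
    return {"rows": [r for r in rows if r.get("value", 0) >= threshold]}
```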
3) Correctly using deployments and the FastAPI entrypoint
- Yes: the FastAPI endpoint should trigger the main pipeline deployment. The main flow orchestrates per-module execution and can call the Docker subflow deployment whenever a module is marked for Docker.
Programmatically create both deployments on startup (recommended in Prefect 3.x):
```
from prefect import flow
from prefect.flows import Flow

# Assume both flows live in this repo and these entrypoints are correct
# Adjust source/entrypoint to your code layout.

def create_deployments():
    # Main orchestrator deployment -> process pool
    Flow.from_source(
        source="https://github.com/your-org/your-repo",  # or "." if local + remote storage
        entrypoint="app/pipeline.py:dynamic_pipeline_flow",
    ).deploy(
        name="dynamic-pipeline",
        work_pool_name="local-pool",
        # Optional: default parameters
        # parameters={"default_param": "value"},
        # Optional: tags=["prod"],
    )

    # Docker subflow deployment -> docker pool
    Flow.from_source(
        source="https://github.com/your-org/your-repo",
        entrypoint="app/pipeline.py:docker_subflow",
    ).deploy(
        name="module-docker-subflow-deployment",
        work_pool_name="local-docker-pool",
    )
```
Marvin
09/08/2025, 9:47 AM
FastAPI endpoint that triggers the main flow deployment:
```
from fastapi import FastAPI
from pydantic import BaseModel
from prefect.deployments import run_deployment

app = FastAPI()

class ModuleConfig(BaseModel):
    module_path: str
    params: dict
    mode: str  # "process" or "docker"

class RunRequest(BaseModel):
    modules: list[ModuleConfig]
    initial_artifact: str | None = None

@app.on_event("startup")
async def on_startup():
    # Optionally: create_deployments() if you programmatically manage them at app startup
    pass

@app.post("/pipelines/run")
async def run_pipeline(req: RunRequest):
    # Important: use the full identifier "Flow Name/Deployment Name" if you have multiple
    fr = await run_deployment(
        name="dynamic_pipeline_flow/dynamic-pipeline",
        parameters={
            "modules": [m.dict() for m in req.modules],
            "initial_artifact": req.initial_artifact,
        },
        # as_subflow=False because this is a top-level run
    )
    return {"flow_run_id": str(fr.id)}
```
And your main flow should iterate the modules and call `run_module`:
```
from typing import Any, Dict, List, Optional
from prefect import flow

@flow(name="dynamic_pipeline_flow")
def dynamic_pipeline_flow(
    modules: List[Dict[str, Any]],
    initial_artifact: Optional[str] = None,
) -> str:
    """
    modules: [{"module_path": "...", "params": {...}, "mode": "process"|"docker"}, ...]
    Returns final artifact URI.
    """
    current_uri = initial_artifact
    for i, mod in enumerate(modules):
        current_uri = run_module.submit(
            module_path=mod["module_path"],
            params=mod.get("params", {}),
            upstream_data=current_uri,
            mode=mod.get("mode", "process"),
        ).result()
    return current_uri
```
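For illustration, a call to the endpoint above might look like this (a sketch; the localhost:8000 address, module paths, and params are placeholders, not values from the original thread):
```
# Example request to the /pipelines/run endpoint defined above
import requests

resp = requests.post(
    "http://localhost:8000/pipelines/run",
    json={
        "modules": [
            {"module_path": "your_pkg.mod_a:run", "params": {"threshold": 5}, "mode": "process"},
            {"module_path": "your_pkg.mod_b:run", "params": {}, "mode": "docker"},
        ],
        "initial_artifact": None,
    },
)
print(resp.json())  # e.g. {"flow_run_id": "..."}
```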
Common gotchas and tips
- Use as_subflow=True in run_deployment calls for child runs so the UI shows parent/child relationships and logs get grouped.
- Results: returning a small string from the Docker subflow is best. The parent uses get_state_result to read it. If you ever return larger data across containers, configure result storage appropriately.
- Parameters must be JSON-serializable. Pass URIs and standard types — not complex objects.
- Docker job variables: place everything the container needs there (image, env for credentials, volumes).
- Source-of-truth: With Flow.from_source(...).deploy(...), the worker will pull your repo at run time. Ensure entrypoints are correct and code is importable.
Docs
- Create deployments
- Customize job variables
If you can share your current run_module and any helper loaders/savers, I can tailor these snippets exactly to your code paths (e.g., S3 vs. local filesystem).
Darius Tan
09/08/2025, 10:10 AM
Marvin
09/08/2025, 10:10 AM
Marvin
09/08/2025, 10:13 AM
Pass job_variables to run_deployment. This lets each module/run use a different container image without creating new deployments.
Example: pass image/env/volumes per module
# In your module config, allow docker overrides
{
    "module_path": "your_pkg.mod:run",
    "params": {"x": 1},
    "mode": "docker",
    "docker": {
        "image": "ghcr.io/you/pipeline-mod-a:2025-09-08",
        "env": {"EXTRA_FLAG": "1"},
        "volumes": ["host_path:/container_path:rw"]
    }
}
# In run_module task
@task
async def run_module(module_path, params, upstream_data, mode="process", docker: dict | None = None) -> str:
    if mode == "process":
        ...
    elif mode == "docker":
        subflow_run = await run_deployment(
            name="module-docker-subflow-deployment",
            parameters={
                "module_path": module_path,
                "params": params,
                "upstream_data": upstream_data,
            },
            job_variables=(docker or {}),  # <-- override image/env/volumes per run
            as_subflow=True,
        )
        finished = await wait_for_flow_run(subflow_run.id, log_states=True)
        return get_state_result(finished.state)
    else:
        raise ValueError(f"Unknown mode: {mode}")
Why this works
- Deployments define the flow entrypoint and default infra. Work pools + job variables control the “how to run.” In Prefect 3.x you can override those per flow run submission via run_deployment(..., job_variables=...). No redeploy needed.
- This is ideal when modules have different dependencies or when you build/push an image per submission.
Where to put the code and dependencies
- If your modules live in your repo: use a generic base image (Python + system deps), deploy the flow with Flow.from_source(...).deploy(...), and let Prefect pull your repo at run time. Then you can still override the image to pick different Python/system stacks for different modules.
- If modules change outside the repo: build/push a purpose-built image per submission (CI or on-demand) and pass its tag via job_variables["image"].
Operational tips
- Make sure your Docker work pool’s worker can pull images from your registry.
- Keep job_variables small and focused: commonly image, env, volumes. Defaults can be set on the work pool or deployment; overrides at run time take precedence.
- Security: if you don’t want arbitrary images at run time, lock down your pool/deployment defaults and only allow specific overrides.
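If you do want to allow limited overrides, one possible guard is a small allow-list filter in the orchestrator before the values reach job_variables (a sketch; the allowed keys and the ghcr.io/your-org/ registry prefix are assumptions, not from the original setup):
```
# Restrict which job-variable overrides a module config may supply
ALLOWED_KEYS = {"image", "env", "volumes"}
ALLOWED_REGISTRIES = ("ghcr.io/your-org/",)  # hypothetical trusted registry prefix

def sanitize_docker_overrides(docker_cfg: dict) -> dict:
    overrides = {k: v for k, v in (docker_cfg or {}).items() if k in ALLOWED_KEYS}
    image = overrides.get("image")
    if image and not image.startswith(ALLOWED_REGISTRIES):
        raise ValueError(f"Image {image!r} is not from an allowed registry")
    return overrides
```
The sanitized dict can then be passed as job_variables in place of the raw module config.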
End-to-end pattern recap
- FastAPI endpoint triggers the main orchestrator deployment (correct approach).
- The main flow iterates over modules. For each module in Docker mode, it:
- Passes module_path, params, upstream_data to the subflow deployment.
- Overrides Docker job_variables with the module's desired image/env/volumes.
- Waits for completion and reads the returned artifact URI.
Docs
- Customize job variables
- Create deployments
If you want, share how you plan to select/build the image per module (e.g., naming/tagging scheme), and I can suggest a concrete structure for your module config and CI step that pairs cleanly with the job_variables override.
Darius Tan
09/08/2025, 10:29 AM
Marvin
09/08/2025, 10:29 AM
Marvin
09/08/2025, 10:30 AM
Option A: Mount your local code into a generic image
# Module config
{
    "module_path": "your_pkg.mod_a:run",
    "params": {"x": 1},
    "mode": "docker",
    "docker": {
        "image": "python:3.11-slim",
        "volumes": ["/abs/path/to/your/project:/app:ro"],
        "env": {"PYTHONPATH": "/app"},
        # Optional: set working dir via env or your flow code
    }
}
# In run_module
@task
async def run_module(..., docker: dict | None = None) -> str:
    if mode == "docker":
        job_vars = (docker or {})
        subflow = await run_deployment(
            name="module-docker-subflow-deployment",
            parameters={
                "module_path": module_path,
                "params": params,
                "upstream_data": upstream_data,
            },
            job_variables=job_vars,  # volumes/env/image passed here
            as_subflow=True,
        )
        finished = await wait_for_flow_run(subflow.id, log_states=True)
        return get_state_result(finished.state)
Pros
- No image builds per run
- Always uses your current local code
Cons
- Works only when the Docker worker is on the same machine and can mount your host path
Option B: Build/pull image dynamically per run
- Before calling run_deployment, ensure the desired image exists on the worker’s Docker host:
- If image tag provided and present locally, use it
- If not present:
- Pull it (if allowed), or
- Build it from a Dockerfile in your module’s directory
- If the worker is remote, push the image to a registry and reference the registry tag
Minimal utilities using docker SDK (local build/pull)
```
# pip install docker
import docker
from docker.errors import APIError, ImageNotFound

_docker = docker.from_env()

def image_exists_locally(tag: str) -> bool:
    try:
        _docker.images.get(tag)
        return True
    except ImageNotFound:
        return False

def pull_image(tag: str):
    _docker.images.pull(tag)

def build_image(context_dir: str, dockerfile: str, tag: str, buildargs: dict | None = None, platform: str | None = None):
    _docker.images.build(
        path=context_dir,
        dockerfile=dockerfile,
        tag=tag,
        buildargs=buildargs or {},
        platform=platform,
    )

def ensure_image_available(docker_cfg: dict) -> str:
    """
    docker_cfg accepts:
    - image: str | None
    - pull: bool (default True)
    - dockerfile: str | None
    - context: str | None
    - build_args: dict | None
    - platform: str | None
    - push: bool (default False)  # requires registry auth if True
    - push_tag: str | None        # registry tag to push to and use
    Returns an image tag that exists locally (or in registry for remote workers).
    """
    image = docker_cfg.get("image")
    pull_ok = docker_cfg.get("pull", True)
    dockerfile = docker_cfg.get("dockerfile")
    context_dir = docker_cfg.get("context")
    build_args = docker_cfg.get("build_args")
    platform = docker_cfg.get("platform")
    push = docker_cfg.get("push", False)
    push_tag = docker_cfg.get("push_tag")

    # Use the provided tag if it is already present on the local daemon
    if image and image_exists_locally(image):
        return image

    # Otherwise try to pull it, if pulling is allowed
    if image and pull_ok:
        try:
            pull_image(image)
            return image
        except APIError:
            pass  # fall through to a local build

    # Fall back to building from the module's Dockerfile
    if dockerfile and context_dir:
        tag = push_tag or image or "pipeline-module:latest"
        build_image(context_dir, dockerfile, tag, build_args, platform)
        if push and push_tag:
            _docker.images.push(push_tag)
        return tag

    raise ValueError("docker_cfg must provide an existing/pullable image or a dockerfile + context to build")
```
Marvin
09/08/2025, 10:30 AM
Use it before calling the Docker subflow:
```
@task
async def run_module(..., docker: dict | None = None) -> str:
    if mode == "docker":
        # Prepare or resolve the image to use
        image_to_use = ensure_image_available(docker or {})
        job_vars = {
            "image": image_to_use,
            # forward additional runtime settings
            **{k: v for k, v in (docker or {}).items() if k in ("env", "volumes")}
        }
        subflow = await run_deployment(
            name="module-docker-subflow-deployment",
            parameters={"module_path": module_path, "params": params, "upstream_data": upstream_data},
            job_variables=job_vars,
            as_subflow=True,
        )
        finished = await wait_for_flow_run(subflow.id, log_states=True)
        return get_state_result(finished.state)
```
Key considerations
- Same Docker daemon: If your FastAPI/orchestrator and Docker worker use the same local Docker daemon, a local build (via docker SDK) is immediately usable by the worker.
- Remote workers: If the Docker worker runs on a different host, you must push the built image to a registry accessible by that worker and pass the registry tag in job_variables.
- Volume mounting vs builds: For local-only dev flows, mounting your code into a generic base image is simpler and very fast. For reproducible or remote execution, build/push images per module/version.
- Module_path with local code: for volume-mount approach, set PYTHONPATH to your mount (e.g., /app) and import dynamically inside docker_subflow just like in-process.
- Security: Only allow trusted Dockerfile paths and registries; avoid arbitrary inputs becoming build context without validation.
Your docker_subflow stays the same
- It only needs to import and run the requested module using the image/environment you provided; it does not need to know whether you mounted the code or baked it into the image.
Docs
- Job variables overview: Customize job variables
- Deployment creation pattern: Create deployments
If you can confirm:
- Is your Docker worker on the same machine as your FastAPI app?
- Do you want to rely on volume mounts (local dev) or prefer building/pushing images (reproducible/remote)?
I can tailor the code to exactly match your module config shape (e.g., auto-tag scheme, Dockerfile lookup like “<module_dir>/Dockerfile”, and default volumes/env).
Darius Tan
09/08/2025, 10:43 AM
Marvin
09/08/2025, 10:43 AM
Marvin
09/08/2025, 10:44 AM
Darius Tan
09/09/2025, 10:22 AM
job_variables = {
    "image": docker_config.get("image", "python:3.10-slim"),
    "env": {
        "PYTHONPATH": "/app",
        **docker_config.get("env", {})
    },
    "volumes": volumes,
    "pull_policy": "Never"  # Force use of local image
}

# Submit to Docker deployment (worker will create container)
subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri
    },
    job_variables=job_variables,  # This configures the container
    as_subflow=True,
)
I am currently facing this error:
Failed to submit flow run 'b1b444ce-9a3a-49a1-92a0-304bdd6bc9b9' to infrastructure.
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 275, in _raise_for_status
response.raise_for_status()
File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 1026, in raise_for_status
raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.49/images/create?tag=latest&fromImage=example_deploy_flow
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/workers/base.py", line 1262, in _submit_run_and_capture_errors
result = await self.run(
File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 457, in run
container, created_event = await run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
result = await anyio.to_thread.run_sync(
File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2405, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 914, in run
result = context.run(func, *args)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
return call()
File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 727, in _create_and_start_container
self._pull_image(docker_client, configuration)
File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 876, in _pull_image
return docker_client.images.pull(image, tag)
File "/usr/local/lib/python3.10/site-packages/docker/models/images.py", line 464, in pull
pull_log = self.client.api.pull(
File "/usr/local/lib/python3.10/site-packages/docker/api/image.py", line 429, in pull
self._raise_for_status(response)
File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 277, in _raise_for_status
raise create_api_error_from_http_exception(e) from e
File "/usr/local/lib/python3.10/site-packages/docker/errors.py", line 39, in create_api_error_from_http_exception
raise cls(e, response=response, explanation=explanation) from e
docker.errors.ImageNotFound: 404 Client Error for http+docker://localhost/v1.49/images/create?tag=latest&fromImage=example_deploy_flow: Not Found ("pull access denied for example_deploy_flow, repository does not exist or may require 'docker login': denied: requested access to the resource is denied")
Marvin
09/09/2025, 10:22 AM
Marvin
09/09/2025, 10:24 AM
1) Use the correct job variable key for the pull policy: it is image_pull_policy, not pull_policy.
job_variables = {
"image": docker_config.get("image", "python:3.10-slim"),
"env": {
"PYTHONPATH": "/app",
**docker_config.get("env", {})
},
"volumes": volumes,
"image_pull_policy": "Never" # correct key
}
subflow_run = await run_deployment(
name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
parameters={
"module_path": module_path,
"params": params,
"upstream_data": upstream_data,
"target_uri": target_uri
},
job_variables=job_variables,
as_subflow=True,
)
2) Confirm your deployment and work pool defaults
- If your deployment was created with a Docker image (e.g., via DockerImage(name="example_deploy_flow", tag="latest")), that’s the image you’re seeing.
- Overrides from run_deployment(job_variables=...) should still win, but if they aren’t, double-check:
- You’re calling the correct deployment (Flow/Deployment pair).
- You’re on Prefect 3.x with prefect-docker installed on the worker.
- Your Docker worker belongs to the work pool targeted by that deployment.
- You can inspect the deployment in the UI (Deployments tab) and also review the work pool’s default job variables.
3) If you truly want to use a local image on the worker
- Make sure the image exists on the worker’s Docker daemon with the exact name:tag you pass.
- With image_pull_policy="Never", the worker will not pull; if the image is missing locally, the run will fail. If you need “pull if missing,” use image_pull_policy="IfNotPresent".
4) If the worker is on a different host than your API/orchestrator
- “Local” means local to the worker’s Docker daemon. If you build images on another host, push them to a registry and reference the registry tag in job_variables["image"].
5) Quick debugging steps
- In the Prefect UI, open the failed flow run and check:
- Flow run > Details > Infrastructure: does it show image=python:3.10-slim and image_pull_policy=Never? If not, the override didn’t apply.
- Worker logs: they’ll print the chosen image and pull policy.
- If you can’t access the UI, you can also run:
- prefect deployment inspect "YourFlow/YourDeployment" (to see deployment defaults)
- Make sure your job_variables dict only uses keys the Docker worker understands: commonly image, image_pull_policy, env, volumes, (and optionally others like networks). See: Customize job variables
Why the error says “pull access denied…example_deploy_flow”
- That's the image configured on your deployment or pool, likely created during an earlier “build with DockerImage(name='example_deploy_flow')”.
Marvin
09/09/2025, 10:24 AM
Darius Tan
09/10/2025, 2:21 AM
Marvin
09/10/2025, 2:21 AM
Marvin
09/10/2025, 2:22 AM
Use get_run_logger inside your flows and tasks so their logs are shipped to the Prefect API and show up on the run in the UI:
from prefect import flow, task, get_run_logger
@task
def do_work():
    logger = get_run_logger()
    logger.info("Task is running inside the Docker container")
    # your code...

@flow
def docker_subflow(module_path: str, params: dict, upstream_data: str | None) -> str:
    logger = get_run_logger()
    logger.info("Subflow started in Docker")
    # your code...
    logger.info("Subflow completed")
    return "artifact://..."
- If your dynamic modules use Python’s standard logging, make sure their logs propagate so Prefect can capture them. Optionally set an env var to capture extra loggers in your job_variables:
job_variables = {
    "image": "...",
    "env": {
        "PYTHONPATH": "/app",
        # Capture non-Prefect loggers by name (e.g., your_pkg, uvicorn)
        "PREFECT_LOGGING_EXTRA_LOGGERS": "your_pkg,uvicorn",
        # Optional: set level
        "PREFECT_LOGGING_LEVEL": "INFO",
    },
    "volumes": [...],
}
- If you spawn subprocesses and want their output in Prefect logs, capture stdout/stderr and forward to the Prefect logger:
import subprocess
from prefect import get_run_logger

def run_and_stream(cmd: list[str]) -> None:
    logger = get_run_logger()
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) as p:
        for line in p.stdout:
            logger.info(line.rstrip())
        rc = p.wait()
        if rc != 0:
            raise RuntimeError(f"Command failed with exit code {rc}")
What gets where
- Prefect logger calls (get_run_logger) are shipped to the Prefect API and appear on the flow/task run in the UI.
- Standard Python loggers from your code can be captured if they propagate and you include them via PREFECT_LOGGING_EXTRA_LOGGERS.
- Bare prints/subprocess output do not automatically reach Prefect; capture and forward them as shown above.
- The Docker worker also emits “infrastructure” logs (container creation, image pulls). Those are separate from flow/task logs.
Gotchas
- Make sure your Docker subflow actually runs inside a Prefect flow/task context (i.e., the container is running your Prefect flow). Then get_run_logger will work and send logs to the API.
- If the container runs code outside Prefect (e.g., just a shell script), the worker may only see stdout/stderr as generic container logs. If you want them in the Prefect flow logs, capture and forward via a Prefect task/flow process as shown above.
- Ensure the environment inside the container includes the Prefect configuration the worker injects (the Docker worker handles this automatically for flow runs launched via run_deployment).
Darius Tan
09/10/2025, 2:50 AM
subflow_run = await run_deployment(
name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
parameters={
"module_path": module_path,
"params": params,
"upstream_data": upstream_data,
"target_uri": target_uri
},
job_variables=job_variables,
as_subflow=True,
)
# Wait for container execution to complete
<http://logger.info|logger.info>(f"Waiting for Docker subflow completion: {subflow_run.id}")
finished_subflow = await wait_for_flow_run(
subflow_run.id,
log_states=True
)
and before the subflow runs?
@flow(name=DOCKER_FLOW_NAME)
def docker_subflow(module_path: str, params: dict, upstream_data: str, target_uri: str) -> str:
    logger = get_run_logger()
    logger.info(f"Docker subflow executing in container for: {module_path}")
Currently the flow run crashes before any logs get displayed in the subflow, as per:
prefect-docker-worker_1 | 02:27:20.125 | INFO | prefect.flow_runs.worker - Worker 'DockerWorker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3' submitting flow run '41942f4a-4606-41dd-a631-e9c0117a7488'
prefect-docker-worker_1 | 02:27:20.207 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'tricky-skua'...
prefect-docker-worker_1 | 02:27:20.208 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'tricky-skua'...
prefect-docker-worker_1 | 02:27:20.218 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'created'
prefect-docker-worker_1 | 02:27:20.218 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' created successfully.
prefect-docker-worker_1 | 02:27:20.365 | INFO | prefect.flow_runs.worker - Completed submission of flow run '41942f4a-4606-41dd-a631-e9c0117a7488'
prefect-docker-worker_1 | 02:27:20.367 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'running'
prefect-docker-worker_1 | 02:27:26.342 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'exited'
prefect-docker-worker_1 | 02:27:26.345 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'exited'
prefect-docker-worker_1 | 02:27:26.380 | INFO | prefect.flow_runs.worker - Reported flow run '41942f4a-4606-41dd-a631-e9c0117a7488' as crashed: Flow run infrastructure exited with non-zero status code 1.
prefect-process-worker_1 | 02:27:28.203 | INFO | prefect.haimo_orchestration.flows - Waiting for Docker subflow completion: 41942f4a-4606-41dd-a631-e9c0117a7488
prefect-process-worker_1 | 02:27:28.225 | INFO | prefect - Flow run is in state 'Crashed'
prefect-process-worker_1 | 02:27:28.226 | ERROR | prefect.haimo_orchestration.flows - Error executing module ExampleDeploy: Docker subflow failed with state: Crashed('Flow run infrastructure exited with non-zero status code 1.')
Marvin
09/10/2025, 2:50 AM
Marvin
09/10/2025, 2:52 AM
Use an image that has Prefect installed so the container can start the flow engine. Minimal Dockerfile:
FROM python:3.11-slim
RUN pip install --no-cache-dir "prefect==3.*" "prefect-docker"
WORKDIR /app
ENV PYTHONPATH=/app
Or start from a Prefect image:
FROM prefecthq/prefect:3-latest
WORKDIR /app
ENV PYTHONPATH=/app
3) Ensure your code is present in the container at the path your deployment expects
- If your deployment entrypoint is something like app/pipeline.py:docker_subflow, then app/pipeline.py must exist inside the container at runtime.
- If you’re using your local codebase (not pulling from source), mount it and set PYTHONPATH:
- job_variables example:
job_variables = {
    "image": "prefecthq/prefect:3-latest",  # or your custom image with Prefect installed
    "env": {
        "PYTHONPATH": "/app",
        "PREFECT_LOGGING_LEVEL": "DEBUG",  # more verbosity
        # capture additional loggers if you want
        "PREFECT_LOGGING_EXTRA_LOGGERS": "uvicorn,your_pkg"
    },
    "volumes": [
        "/abs/path/to/your/project:/app:ro"
    ],
    # optional, helps relative imports if your code assumes /app
    "working_dir": "/app",
    # pulling policy depending on your workflow
    "image_pull_policy": "IfNotPresent"
}
- If your entrypoint path is different (e.g., monorepo subdir), align the mount and working_dir accordingly.
4) Verify you aren’t overriding the container command
- Let the Docker worker set the command needed to run the flow (it will run a Prefect command with your flow run ID).
- If you pass a “command” override in job_variables (or your image ENTRYPOINT/CMD conflicts), it can kill the run before Prefect starts. Remove custom command overrides while debugging.
5) Confirm your deployment target and overrides are actually applied
- In the Prefect UI, open the subflow run → Details → Infrastructure and verify:
- image is what you set in job_variables
- working_dir, env, volumes look correct
- If you still see an unexpected image (e.g., example_deploy_flow), you’re calling the wrong deployment or the override did not apply.
- Use the correct key name for pull policy: image_pull_policy (not pull_policy).
6) Turn up worker-side logs
- Run your Docker worker with PREFECT_LOGGING_LEVEL=DEBUG to get more infra detail.
- The worker logs will show the container command, env, and any errors in creating/starting the container.
7) Reproduce interactively to isolate the import issue
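One way to do this, sketched with the Docker SDK already used above (the image tag, project path, and module name are placeholders for your own values):
```
# Run the same image/volume/env you pass in job_variables and try the import by hand
import docker

client = docker.from_env()
output = client.containers.run(
    image="prefecthq/prefect:3-latest",
    command=["python", "-c", "import your_pkg.mod_a; print('import ok')"],
    volumes={"/abs/path/to/your/project": {"bind": "/app", "mode": "ro"}},
    environment={"PYTHONPATH": "/app"},
    working_dir="/app",
    remove=True,
)
print(output.decode())
```
If the import fails here, it will fail the same way when the worker launches the flow run container.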
Marvin
09/10/2025, 2:52 AM
Darius Tan
09/10/2025, 2:55 AM
docker logs tricky-skua
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 44, in wrapper
return fn(*args, **kwargs)
File "/usr/local/lib/python3.10/site-packages/prefect/cli/_types.py", line 156, in sync_fn
return asyncio.run(async_fn(*args, **kwargs))
File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
return future.result()
File "/usr/local/lib/python3.10/site-packages/prefect/cli/flow_run.py", line 412, in execute
await runner.execute_flow_run(id)
File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 571, in execute_flow_run
async with context:
File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1498, in __aenter__
await self._exit_stack.enter_async_context(
File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/usr/local/lib/python3.10/site-packages/prefect/runner/_observers.py", line 42, in __aenter__
self._events_subscriber = await self._exit_stack.enter_async_context(
File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
result = await _cm_type.__aenter__(cm)
File "/usr/local/lib/python3.10/site-packages/prefect/events/clients.py", line 555, in __aenter__
await self._reconnect()
File "/usr/local/lib/python3.10/site-packages/prefect/events/clients.py", line 566, in _reconnect
self._websocket = await self._connect.__aenter__()
File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/client.py", line 482, in __aenter__
return await self
File "/usr/local/lib/python3.10/site-packages/prefect/_internal/websockets.py", line 135, in _proxy_connect
proto = await self.__await_impl__()
File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/client.py", line 439, in __await_impl__
self.connection = await self.create_connection()
File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/client.py", line 365, in create_connection
_, connection = await loop.create_connection(factory, **kwargs)
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1036, in create_connection
infos = await self._ensure_resolved(
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1418, in _ensure_resolved
return await loop.getaddrinfo(host, port, family=family, type=type,
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 863, in getaddrinfo
return await self.run_in_executor(
File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
result = self.fn(*self.args, **self.kwargs)
File "/usr/local/lib/python3.10/socket.py", line 967, in getaddrinfo
for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known
An exception occurred.
Marvin
09/10/2025, 2:55 AM
Marvin
09/10/2025, 2:57 AM
The flow run container cannot resolve the hostname in PREFECT_API_URL. If your Prefect server runs in Docker Compose, attach the job container to the same Docker network and point PREFECT_API_URL at a hostname that resolves on that network, e.g.:
job_variables = {
"image": "...",
"env": {
"PYTHONPATH": "/app",
# optional for more logs:
"PREFECT_LOGGING_LEVEL": "DEBUG",
},
"volumes": [...],
"networks": ["your_compose_network_name"],
"image_pull_policy": "IfNotPresent",
# optionally set working_dir if needed
"working_dir": "/app",
}
- Also ensure the Docker worker service itself is attached to the same network; the worker will connect job containers to the listed networks.
- If you’re using Prefect Server running on your host (not containerized):
- Do not use localhost or 127.0.0.1 in PREFECT_API_URL — that points to the job container itself.
- On Mac/Windows Docker Desktop, you can use http://host.docker.internal:4200/api.
- On Linux, consider:
- Exposing the host on a reachable IP/hostname and use that in PREFECT_API_URL, or
- Mapping host.docker.internal via extra hosts on your image/daemon and using http://host.docker.internal:4200/api, or
- Running the job container with host networking (not generally recommended) if your environment permits it.
- If you’re using Prefect Cloud:
- PREFECT_API_URL should be the Cloud API URL and should resolve on the internet. If you see DNS failure here, check outbound DNS/firewall from the Docker host network.
3) Make sure you’re not wiping out the worker’s injected env
- The Docker worker injects Prefect env (PREFECT_API_URL, flow run ID, etc.). Your job_variables["env"] is merged, not replaced, in current releases — but if you’re on older versions or have custom code, confirm that the Prefect env vars are present in docker inspect.
4) Quick network test
- From the Docker host, run a test container on the same network to curl the API URL you plan to use:
- docker run --rm --network your_compose_network_name curlimages/curl:8.7.1 -v http://prefect-server:4200/api
- If that fails, it’s a network or naming issue, not Prefect.
5) Prefer consistent defaults
- Put the correct network(s) in your Docker work pool default job variables so you don’t have to pass them for every subflow.
- Set PREFECT_API_URL at the worker container level (env) to a value that works for all job containers; the worker will propagate it.
Likely fix for your setup
- You have both a process worker and a docker worker in Compose (“prefect-process-worker_1”, “prefect-docker-worker_1”). If your Prefect API is also in Compose, create/use a shared network (e.g., “prefect-net”), attach all three services to it, set the docker worker’s environment:
- PREFECT_API_URL=http://prefect-server:4200/api
- Then add the same network to job_variables["networks"] so the job container can resolve “prefect-server”.
Marvin
09/10/2025, 2:57 AM
services:
  prefect-server:
    image: prefecthq/prefect:3-latest
    command: server start
    ports: ["4200:4200"]
    networks: ["prefect-net"]

  prefect-docker-worker:
    image: prefecthq/prefect:3-latest
    command: >
      prefect worker start
      --type docker
      --pool local-docker-pool
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
      - PREFECT_LOGGING_LEVEL=DEBUG
    networks: ["prefect-net"]

  prefect-process-worker:
    image: prefecthq/prefect:3-latest
    command: >
      prefect worker start
      --type process
      --pool local-pool
    environment:
      - PREFECT_API_URL=http://prefect-server:4200/api
    networks: ["prefect-net"]

networks:
  prefect-net:
    driver: bridge
Then your job_variables should include:
job_variables = {
    "image": "prefecthq/prefect:3-latest",  # or your image with Prefect installed
    "env": {"PYTHONPATH": "/app"},
    "volumes": ["/abs/path/to/project:/app:ro"],
    "networks": ["prefect-net"],
    "image_pull_policy": "IfNotPresent",
    "working_dir": "/app",
}
Once the container can resolve and reach PREFECT_API_URL, your flow engine will start and you’ll see the first logger.info lines from docker_subflow.
If you share:
- Your PREFECT_API_URL value
- Whether your Prefect API is in Compose or on the host
- The network(s) your workers and API are attached to
…I can tailor the exact network and env settings for your case.
Darius Tan
09/10/2025, 3:07 AM
prefect-docker-worker_1 | 03:00:25.482 | INFO | prefect.flow_runs.worker - Worker 'DockerWorker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3' submitting flow run 'd5ea019d-dd70-4d4c-9bc0-bbf4ddf2270e'
prefect-docker-worker_1 | 03:00:25.563 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'golden-bittern'...
prefect-docker-worker_1 | 03:00:25.564 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'golden-bittern'...
prefect-docker-worker_1 | 03:00:25.574 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'golden-bittern' has status 'created'
prefect-docker-worker_1 | 03:00:25.574 | INFO | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'golden-bittern' created successfully.
prefect-docker-worker_1 | 03:00:25.721 | ERROR | prefect.flow_runs.worker - Failed to submit flow run 'd5ea019d-dd70-4d4c-9bc0-bbf4ddf2270e' to infrastructure.
prefect-docker-worker_1 | Traceback (most recent call last):
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 275, in _raise_for_status
prefect-docker-worker_1 | response.raise_for_status()
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 1026, in raise_for_status
prefect-docker-worker_1 | raise HTTPError(http_error_msg, response=self)
prefect-docker-worker_1 | requests.exceptions.HTTPError: 404 Client Error: Not Found for url: http+docker://localhost/v1.49/containers/2050a4c76c34e7f58126a7e49a52aef35e5d19ffb5a1b4a59db3f86d87224834/start
prefect-docker-worker_1 |
prefect-docker-worker_1 | The above exception was the direct cause of the following exception:
prefect-docker-worker_1 |
prefect-docker-worker_1 | Traceback (most recent call last):
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/prefect/workers/base.py", line 1262, in _submit_run_and_capture_errors
prefect-docker-worker_1 | result = await self.run(
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 457, in run
prefect-docker-worker_1 | container, created_event = await run_sync_in_worker_thread(
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
prefect-docker-worker_1 | result = await anyio.to_thread.run_sync(
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
prefect-docker-worker_1 | return await get_async_backend().run_sync_in_worker_thread(
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2405, in run_sync_in_worker_thread
prefect-docker-worker_1 | return await future
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 914, in run
prefect-docker-worker_1 | result = context.run(func, *args)
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
prefect-docker-worker_1 | return call()
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 753, in _create_and_start_container
prefect-docker-worker_1 | container.start()
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/docker/models/containers.py", line 420, in start
prefect-docker-worker_1 | return self.client.api.start(self.id, **kwargs)
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/docker/utils/decorators.py", line 19, in wrapped
prefect-docker-worker_1 | return f(self, resource_id, *args, **kwargs)
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/docker/api/container.py", line 1136, in start
prefect-docker-worker_1 | self._raise_for_status(res)
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 277, in _raise_for_status
prefect-docker-worker_1 | raise create_api_error_from_http_exception(e) from e
prefect-docker-worker_1 | File "/usr/local/lib/python3.10/site-packages/docker/errors.py", line 39, in create_api_error_from_http_exception
prefect-docker-worker_1 | raise cls(e, response=response, explanation=explanation) from e
prefect-docker-worker_1 | docker.errors.NotFound: 404 Client Error for http+docker://localhost/v1.49/containers/2050a4c76c34e7f58126a7e49a52aef35e5d19ffb5a1b4a59db3f86d87224834/start: Not Found ("failed to set up container networking: network my-network not found")
prefect-docker-worker_1 | 03:00:25.748 | INFO | prefect.flow_runs.worker - Reported flow run 'd5ea019d-dd70-4d4c-9bc0-bbf4ddf2270e' as crashed: Flow run could not be submitted to infrastructure:
prefect-docker-worker_1 | NotFound(HTTPError('404 Client Error: Not Found for url: http+docker://localhost/v1.49/containers/2050a4c76c34e7f58126a7e49a52aef35e5d19ffb5a1b4a59db3f86d87224834/start'))
Marvin
09/10/2025, 3:07 AM
Marvin
09/10/2025, 3:08 AM
The job container failed to start because Docker could not find the network you passed ("network my-network not found"). Docker Compose prefixes network names with the project name, so pin the network's actual name in your compose file and pass that exact name in job_variables:
services:
  prefect-server:
    ...
    networks: ["prefect-net"]
  prefect-docker-worker:
    ...
    networks: ["prefect-net"]
  prefect-process-worker:
    ...
    networks: ["prefect-net"]

networks:
  prefect-net:
    name: prefect-net  # pins the actual Docker network name
    driver: bridge
- Then use:
job_variables = {
    "image": "...",
    "env": {...},
    "volumes": [...],
    "networks": ["prefect-net"],  # must match docker network ls
    "image_pull_policy": "IfNotPresent",
    "working_dir": "/app",
}
Alternative (external network)
- Create the network once and mark it external so Compose won’t rename it:
# Create it once:
#   docker network create prefect-net
networks:
  prefect-net:
    external: true
    name: prefect-net
- Then attach your services to “prefect-net” and use that exact name in job_variables.
3) Sanity checks
- After changing compose config, re-create services: docker compose up -d --force-recreate
- Verify the network exists: docker network ls should show “prefect-net”.
- Verify the worker is on that network: docker inspect <docker-worker-container> → NetworkSettings.Networks keys include “prefect-net”.
- Try again; the job container should start and attach to the network.
Notes and tips
- If you omit job_variables["networks"], Docker will attach the job to the default “bridge” network. That won’t be able to resolve your Prefect server container by its compose service name. For compose-to-compose connectivity, attaching to the shared network is the right move.
- Make sure you’re using the correct key names in job variables; for pull policy use "image_pull_policy", not "pull_policy".
- You can set the default networks at the work pool level to avoid specifying them on every run_deployment. In the UI: Work Pools → your Docker pool → Default job variables.
Docs
- Customize job variables
If you share your docker-compose networks section and the exact value you're passing in job_variables["networks"], I can suggest the exact string to use.
Darius Tan
09/10/2025, 3:54 AM
Marvin
09/10/2025, 3:54 AM
Marvin
09/10/2025, 3:56 AM
- Mount a single shared data root into every Docker job at the same path, for example:
"volumes": ["/var/prefect/artifacts:/var/prefect/artifacts:rw"]
- Use absolute paths in your code/URIs:
- Save/load artifacts under a fixed root, e.g., file:///var/prefect/artifacts/{run_id}/...
- Do not rely on relative paths or working directory.
- Keep the same path across in-process and Docker:
- If an in-process module writes to /var/prefect/artifacts on the host, the Docker job must mount that same host path at /var/prefect/artifacts (not /app/data or some other path).
- Avoid mixing named volumes and bind mounts unintentionally:
- If you mount a named volume (e.g., data-vol:/data), that’s not the same as a bind mount to a host directory. If the previous module wrote to a host path, but the next module mounts a named volume, you’ll see an empty folder.
- Use :rw for data volumes:
- If you mounted your project code as read-only (:ro), that’s fine for code.
- For data/artifacts, use :rw so modules can create folders/files.
- Ensure the host directory exists before runs:
- Docker will create missing host paths as an empty directory for bind mounts; if you expected pre-existing content and see an empty dir, you may be mounting the wrong host path.
- Create the directory ahead of time with correct permissions.
- Permissions:
- If module A writes as root and module B runs as a non-root user, B may not see or access the files.
- Consider aligning users across images or set permissive directory permissions on the host directory.
- Consistent job_variables across runs:
- Every Docker subflow invocation must include the same volume mapping for the data root; otherwise the next container won’t see the previous output.
Is it a timing issue?
- If you are awaiting the previous run (wait_for_flow_run) before starting the next module, the previous process has fully exited. Standard file writes should be complete and visible to the host immediately after close. Timing is rarely the cause unless:
- Your code is writing asynchronously (threads/processes) and the flow returns before joins flush/close.
- You’re using an eventually consistent remote FS (not typical for local bind mounts).
- You wrote to a different path (e.g., /tmp) that isn’t a bind mount.
Recommended “always works” pattern
1) Choose a single data root on the Docker host, e.g., /var/prefect/artifacts.
2) Add this to your Docker work pool default job variables so every Docker job gets it automatically:
{
    "volumes": [
        "/var/prefect/artifacts:/var/prefect/artifacts:rw"
    ],
    "env": {
        "ARTIFACT_ROOT": "/var/prefect/artifacts"
    }
}
3) In your save/load helpers, always read/write under ARTIFACT_ROOT (fall back to /var/prefect/artifacts if not set). Return file:// URIs with absolute paths:
file:///var/prefect/artifacts/<flow_run_id>/<module_name>/...
4) For in-process runs, write to the same absolute host path (/var/prefect/artifacts) so the next Docker container sees the files unchanged.
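For local development, the save/load helpers could follow this minimal sketch (it assumes the shared bind mount and ARTIFACT_ROOT env var above; the pickle serialization and the run_id/name layout are illustrative choices, not part of the original code):
```
import os
import pickle
from pathlib import Path
from typing import Any, Optional

ARTIFACT_ROOT = os.environ.get("ARTIFACT_ROOT", "/var/prefect/artifacts")

def save_artifact(obj: Any, run_id: str, name: str) -> str:
    """Pickle obj under the shared artifact root and return a file:// URI."""
    out_dir = Path(ARTIFACT_ROOT) / run_id
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{name}.pkl"
    with open(out_path, "wb") as f:
        pickle.dump(obj, f)
    return f"file://{out_path}"

def load_artifact(uri: Optional[str]) -> Any:
    """Load an artifact previously written by save_artifact."""
    if uri is None:
        return None
    if not uri.startswith("file://"):
        raise ValueError(f"Unsupported artifact URI: {uri}")
    with open(uri[len("file://"):], "rb") as f:
        return pickle.load(f)
```
Because both the in-process worker and every Docker job see the same /var/prefect/artifacts mount, the file:// URIs stay valid from one module to the next.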
Quick debugging steps
- Confirm the run’s volumes in the UI:
- Open the flow run > Details > Infrastructure and verify the volumes mapping is exactly what you expect for that run.
- Inside the container, list the directory:
- Temporarily add a debug task in the docker_subflow:
```
import os, subprocess
from prefect import task, get_run_logger

@task
def debug_list(path: str):
    logger = get_run_logger()
    # log the directory contents so they show up on the flow run in the UI
    logger.info("Listing %s", path)
    logger.info(os.listdir(path) if os.path.isdir(path) else f"{path} does not exist")
```
Marvin
09/10/2025, 3:56 AM
Then call debug_list("/var/prefect/artifacts") and debug_list("/var/prefect/artifacts/<expected_subdir>").
- Verify you’re not using different container paths in different runs:
- If module A writes to /data (inside its container) but that’s not a bind mount to the host (or is mounted at a different host path), the next module won’t see it.
- Check your job_variables source:
- If you build the volumes list dynamically, log it before calling run_deployment to ensure it’s consistent between modules and points to the correct host path.
If you share:
- The exact volumes value you’re passing to job_variables for both the producer and consumer modules
- An example artifact URI returned by module A
- The code in your save_artifact/load_artifact helpers
…I can pinpoint the mismatch (host path vs container path vs named volume) and suggest a precise fix.
Darius Tan
09/10/2025, 6:37 AM
subflow_run = await run_deployment(
name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
parameters={
"module_path": module_path,
"params": params,
"upstream_data": upstream_data,
"target_uri": target_uri
},
job_variables=job_variables, # This configures the container
as_subflow=True,
)
# Wait for container execution to complete
<http://logger.info|logger.info>(f"Waiting for Docker subflow completion: {subflow_run.id}")
finished_subflow = await wait_for_flow_run(
subflow_run.id,
log_states=True
)
# Extract return value (artifact URI) from subflow
if not finished_subflow.state.is_completed():
raise RuntimeError(f"Docker subflow failed with state: {finished_subflow.state}")
then what?
Marvin
09/10/2025, 6:37 AM
Marvin
09/10/2025, 6:38 AM
from prefect.states import get_state_result, raise_state_exception
# Submit and wait as you already do...
subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri,
    },
    job_variables=job_variables,
    as_subflow=True,
)

logger.info(f"Waiting for Docker subflow completion: {subflow_run.id}")
finished_subflow = await wait_for_flow_run(subflow_run.id, log_states=True)

# Handle terminal state
if not finished_subflow.state.is_completed():
    # Re-raise the subflow's underlying exception with its original traceback
    raise_state_exception(finished_subflow.state)

# Retrieve the return value (e.g., your artifact URI string)
try:
    artifact_uri = get_state_result(finished_subflow.state)
except Exception as e:
    # Typically MissingResult if the flow returned a non-serializable object without a result store
    raise RuntimeError("Failed to read subflow result") from e

# Optional: sanity check type
if not isinstance(artifact_uri, str):
    raise TypeError(f"Expected subflow to return artifact URI string, got: {type(artifact_uri)}")

return artifact_uri
Important notes
- Make sure your docker_subflow actually returns the URI:
@flow(name=DOCKER_FLOW_NAME)
def docker_subflow(...) -> str:
    # compute and persist your artifact
    return out_uri
- Returning a small string (URI) is ideal; it’s easy for Prefect to serialize and retrieve. If you ever return large or complex objects, configure a result store/serializer or just stick to “return a URI, write the data to shared storage.”
- If you see MissingResult when calling get_state_result, it means the subflow did not (or could not) persist a result that can be retrieved. Return a basic type (string) or configure result storage.
Docs
- Create deployments: https://docs-3.prefect.io/v3/how-to-guides/deployments/create-deployments
- Customize job variables: https://docs-3.prefect.io/v3/how-to-guides/deployments/customize-job-variables
Darius Tan
09/10/2025, 7:05 AM
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Marvin
09/10/2025, 7:05 AM
Marvin
09/10/2025, 7:06 AM
- Import get_state_result from prefect.states (from prefect.states import get_state_result) and call it directly, without await.
- The MissingResult error means your subflow’s return value wasn’t persisted anywhere retrievable. By default, flow results are not persisted in Prefect 3.x. You have two good options:
Option 1 (simplest): Don’t fetch a return value at all
- You're already passing target_uri into the docker subflow. Have the subflow write to that URI and just use that known URI as the “result” in the parent. No need to call get_state_result.
- Parent code can skip result retrieval entirely:
subflow_run = await run_deployment(..., parameters={..., "target_uri": target_uri}, ...)
finished = await wait_for_flow_run(subflow_run.id, log_states=True)

from prefect.states import raise_state_exception
if not finished.state.is_completed():
    raise_state_exception(finished.state)

# Use the same target_uri you passed in
return target_uri
Option 2 (persist and fetch the returned value)
- Enable result persistence for the docker subflow and give it a result store. Then get_state_result(finished.state) will work.
Steps:
1) Pick a result store accessible to both the subflow container and the parent (process worker). For local dev, a shared host path bind-mounted into both containers works; for production, prefer a remote store (S3/GCS/Azure).
2) Configure the docker_subflow to persist results:
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

# Shared path mounted into both the Docker worker's job containers and your process worker
RESULTS_ROOT = "/var/prefect/results"
results_fs = LocalFileSystem(basepath=RESULTS_ROOT)

@flow(
    name=DOCKER_FLOW_NAME,
    persist_result=True,
    result_storage=results_fs,
    result_serializer=JSONSerializer(),
)
def docker_subflow(module_path: str, params: dict, upstream_data: str | None, target_uri: str) -> str:
    # compute & persist output at target_uri
    return target_uri  # small string is ideal
3) Mount the same path into both containers:
- In your Docker work pool defaults or per-run job_variables:
"volumes": [
"/host/results:/var/prefect/results:rw"
]
- In your process worker service (compose), also mount /host/results:/var/prefect/results:rw.
4) In the parent, retrieve the result:
from prefect.states import get_state_result, raise_state_exception
finished = await wait_for_flow_run(subflow_run.id, log_states=True)
if not finished.state.is_completed():
raise_state_exception(finished.state)
artifact_uri = get_state_result(finished.state) # no await
Notes and gotchas
- If you saw “coroutine” from get_state_result, double-check your import. It should be:
- from prefect.states import get_state_result
- And called without await.
- MissingResult means the state had no embedded data and no result store reference. Setting persist_result=True
and a result store fixes that.
- If you use LocalFileSystem for results, it must be a shared bind mount visible to the parent and job containers. Otherwise, prefer a remote store (S3/GCS) so Prefect can fetch results from anywhere.
- Keep the actual data in your artifact store and only return a small string (the URI). Large return values are not recommended.
If you want, share:
- Your current docker_subflow decorator
- The job_variables volumes for both the docker job and the process worker
I can validate the exact storage setup to ensure get_state_result can read the persisted return.
Darius Tan
09/10/2025, 7:09 AM
raise TypeError(f"Expected subflow to return artifact URI string, got: {type(artifact_uri)}")
prefect-process-worker_1 | TypeError: Expected subflow to return artifact URI string, got: <class 'coroutine'>
Marvin
09/10/2025, 7:09 AM
Marvin
09/10/2025, 7:10 AM
- Make sure you import from prefect.states import get_state_result and call it directly, without await.
- If get_state_result returns a coroutine, it means your docker_subflow returned a coroutine object (usually because you called an async function without awaiting it).
Fix: ensure docker_subflow returns a plain string
Handle both sync and async dynamic modules so you never return an awaitable.
Option A: Make docker_subflow async and await as needed
import importlib
import inspect
from typing import Any, Dict, Optional
from prefect import flow, get_run_logger
def dynamic_import(module_path: str):
# "pkg.module:callable"
mod_path, func_name = module_path.split(":")
module = importlib.import_module(mod_path)
return getattr(module, func_name)
def load_artifact(uri: Optional[str]) -> Any:
# your implementation
...
def save_artifact(obj: Any, target_uri: Optional[str] = None) -> str:
# your implementation; return the final URI
...
@flow(name="module-docker-subflow")
async def docker_subflow(
module_path: str,
params: Dict[str, Any],
upstream_data: Optional[str],
target_uri: Optional[str] = None,
) -> str:
logger = get_run_logger()
logger.info(f"Docker subflow executing: {module_path}")
func = dynamic_import(module_path)
input_obj = load_artifact(upstream_data)
# Call the dynamic module safely for both sync/async impls
if inspect.iscoroutinefunction(func):
result = await func(input_obj, **params)
else:
maybe = func(input_obj, **params)
result = await maybe if inspect.isawaitable(maybe) else maybe
out_uri = save_artifact(result, target_uri=target_uri)
return out_uri
Option B: Keep docker_subflow sync and run async functions explicitly
import asyncio
import inspect
@flow(name="module-docker-subflow")
def docker_subflow(...)->str:
func = dynamic_import(module_path)
input_obj = load_artifact(upstream_data)
if inspect.iscoroutinefunction(func):
result = asyncio.run(func(input_obj, **params))
else:
maybe = func(input_obj, **params)
if inspect.isawaitable(maybe):
result = asyncio.run(maybe)
else:
result = maybe
out_uri = save_artifact(result, target_uri=target_uri)
return out_uri
Then, in the parent:
- Do not await get_state_result.
- First, raise the original exception if the subflow failed; then read the return value.
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result, raise_state_exception
subflow_run = await run_deployment(
name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
parameters={
"module_path": module_path,
"params": params,
"upstream_data": upstream_data,
"target_uri": target_uri,
},
job_variables=job_variables,
as_subflow=True,
)
finished = await wait_for_flow_run(subflow_run.id, log_states=True)
if not finished.state.is_completed():
# surfaces the real error/traceback
raise_state_exception(finished.state)
artifact_uri = get_state_result(finished.state) # no await
if not isinstance(artifact_uri, str):
raise TypeError(f"Expected subflow to return artifact URI string, got {type(artifact_uri)}")
About the MissingResult error you saw
- That happens when Prefect tries to fetch a result from the API but there's nothing persisted. If you stick to returning a small string, Prefect will typically embed it in the state and get_state_result will succeed.
- If you ever need to return larger/complex objects, set persist_result=True and configure a result_storage on the docker_subflow so Prefect can fetch it later. Otherwise, the best practice is returning a small URI and writing the real data to your artifact store.
Marvin
09/10/2025, 7:10 AM
Tip: since you are already passing target_uri into the subflow, you can just use that value in the parent (have the subflow write to it) and skip get_state_result altogether. This avoids result persistence concerns entirely.
Darius Tan
09/10/2025, 7:16 AM
@flow(name="module-docker-subflow")
async def docker_subflow(
module_path: str,
params: Dict[str, Any],
upstream_data: Optional[str],
target_uri: Optional[str] = None,
) -> str:
logger = get_run_logger()
logger.info(f"Docker subflow executing: {module_path}")
Marvin
09/10/2025, 7:16 AM
Marvin
09/10/2025, 7:18 AM
import importlib
import inspect
import os
from typing import Any, Dict, Optional
from prefect import flow, get_run_logger
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
# Shared results root path inside containers and process worker
# Ensure this path is a bind-mount on BOTH your docker worker jobs AND your process worker
# e.g., host: /host/prefect/results -> container: /var/prefect/results
RESULTS_ROOT = os.getenv("PREFECT_RESULTS_ROOT", "/var/prefect/results")
RESULTS_FS = LocalFileSystem(basepath=RESULTS_ROOT)
def dynamic_import(module_path: str):
# "package.module:callable"
mod_path, func_name = module_path.split(":")
module = importlib.import_module(mod_path)
return getattr(module, func_name)
def load_artifact(uri: Optional[str]) -> Any:
if uri is None:
return None
if uri.startswith("file://"):
path = uri[len("file://"):]
with open(path, "rb") as f:
return f.read()
# Add other schemes (s3://, gs://) as needed
raise NotImplementedError(f"Unsupported artifact URI: {uri}")
def save_artifact(obj: Any, target_uri: Optional[str]) -> str:
if not target_uri:
raise ValueError("target_uri is required for docker_subflow")
if target_uri.startswith("file://"):
path = target_uri[len("file://"):]
os.makedirs(os.path.dirname(path), exist_ok=True)
# Choose your serialization; here we assume bytes or str
mode = "wb" if isinstance(obj, (bytes, bytearray)) else "w"
with open(path, mode) as f:
f.write(obj if isinstance(obj, (bytes, bytearray)) else str(obj))
return target_uri
# Add other schemes (s3://, gs://) as needed
raise NotImplementedError(f"Unsupported artifact URI: {target_uri}")
@flow(
name="module-docker-subflow",
# Persist the return value so parent can call get_state_result(...)
persist_result=True,
result_storage=RESULTS_FS,
result_serializer=JSONSerializer(),
)
async def docker_subflow(
module_path: str,
params: Dict[str, Any],
upstream_data: Optional[str],
target_uri: Optional[str] = None,
) -> str:
logger = get_run_logger()
logger.info(f"Docker subflow executing: {module_path}")
func = dynamic_import(module_path)
input_obj = load_artifact(upstream_data)
# Call module function; support both sync and async variants
if inspect.iscoroutinefunction(func):
result = await func(input_obj, **params)
else:
maybe = func(input_obj, **params)
result = await maybe if inspect.isawaitable(maybe) else maybe
out_uri = save_artifact(result, target_uri=target_uri)
logger.info(f"Wrote artifact to: {out_uri}")
# Return a small string; parent can read via get_state_result (with persist_result=True)
return out_uri
Parent side (recap)
- Do not await get_state_result.
Marvin
09/10/2025, 7:18 AM
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result, raise_state_exception
subflow_run = await run_deployment(
name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
parameters={
"module_path": module_path,
"params": params,
"upstream_data": upstream_data,
"target_uri": target_uri,
},
job_variables=job_variables,
as_subflow=True,
)
finished = await wait_for_flow_run(subflow_run.id, log_states=True)
if not finished.state.is_completed():
raise_state_exception(finished.state)
# Option A: just use target_uri you passed in (no result fetching needed)
# return target_uri
# Option B: read the subflow's returned string
artifact_uri = get_state_result(finished.state) # no await
if not isinstance(artifact_uri, str):
raise TypeError(f"Expected subflow to return artifact URI string, got {type(artifact_uri)}")
return artifact_uri
Mounts you’ll need
- Make sure both your process worker and the docker job containers mount the same host path at /var/prefect/results (or set PREFECT_RESULTS_ROOT to your chosen path):
- job_variables on the docker subflow runs:
- "volumes": ["/host/prefect/results/var/prefect/resultsrw"]
- docker-compose for your process worker service:
- volumes: ["./results/var/prefect/resultsrw"] or the same host path mapping
Common mistakes that cause “coroutine” or MissingResult
- Returning an awaitable from the subflow (fix by awaiting the dynamic module call as shown)
- Trying to get_state_result when the subflow didn’t persist its return (fix by using Option A or enabling persist_result with a result store)
Docs
- Create deployments: https://docs-3.prefect.io/v3/how-to-guides/deployments/create-deployments
- Customize job variables: https://docs-3.prefect.io/v3/how-to-guides/deployments/customize-job-variables
If you share your artifact loader/saver patterns (e.g., S3 vs file://), I can adapt the save/load helpers accordingly.
Darius Tan
09/10/2025, 7:18 AM
@flow(name="module-docker-subflow")
async def docker_subflow(
module_path: str,
params: Dict[str, Any],
upstream_data: Optional[str],
target_uri: Optional[str] = None,
) -> str:
logger = get_run_logger()
logger.info(f"Docker subflow executing: {module_path}")
sys.path.insert(0, "/app")
# call python module.py --upstream_artifact_path <upstream_data> --target_uri <target_uri> --kwargs key=value ...
command = [
"python", "/app/module.py",
"--module_path", module_path,
"--upstream_artifact_path", upstream_data or "",
"--target_uri", target_uri,
"--kwargs"
]
for k, v in params.items():
command.append(f"{k}={v}")
command = [str(c) for c in command] # Ensure all parts are strings
logger.info(f"Running command: {' '.join(command)}")
result = subprocess.run(command, capture_output=True, text=True, check=False)
Marvin
09/10/2025, 7:18 AM
Marvin
09/10/2025, 7:19 AM
Recommended approach:
- Have module.py write its output to the target_uri you pass in
- Return that same target_uri as a plain string
- In the parent, don't call `get_state_result`; just use the known target_uri
- If you do want to fetch the returned value with get_state_result, you must enable result persistence on the subflow and configure a result store shared between the parent and the Docker job
Concrete fixes for your current subprocess approach
1) Make your docker_subflow return a string and surface subprocess output
- Since your flow is async, avoid blocking the event loop. Run subprocess.run
in a worker thread and log stdout/stderr.
- Return target_uri
explicitly.
import os
import sys
import shlex
import subprocess
from typing import Any, Dict, Optional
from prefect import flow, get_run_logger
from prefect.utilities.asyncutils import run_sync_in_worker_thread
@flow(name="module-docker-subflow")
async def docker_subflow(
module_path: str,
params: Dict[str, Any],
upstream_data: Optional[str],
target_uri: Optional[str] = None,
) -> str:
logger = get_run_logger()
logger.info(f"Docker subflow executing: {module_path}")
if not target_uri:
raise ValueError("target_uri is required")
# Ensure your code path is importable if your module.py imports project code
sys.path.insert(0, "/app")
# Build the command
cmd = [
"python", "/app/module.py",
"--module_path", module_path,
"--upstream_artifact_path", upstream_data or "",
"--target_uri", target_uri,
"--kwargs",
]
for k, v in params.items():
cmd.append(f"{k}={v}")
logger.info(f"Running command: {shlex.join(map(str, cmd))}")
# Run synchronously in a worker thread so we can await it
def _run():
return subprocess.run(cmd, capture_output=True, text=True, cwd="/app", env=os.environ.copy())
cp = await run_sync_in_worker_thread(_run)
# Stream logs
if cp.stdout:
for line in cp.stdout.splitlines():
logger.info(line)
if cp.stderr:
for line in cp.stderr.splitlines():
logger.error(line)
if cp.returncode != 0:
raise RuntimeError(f"module.py exited with {cp.returncode}")
# Option A (recommended): return the known target_uri that module.py wrote to
return target_uri
2) Parent: do not fetch a result; use the same target_uri you passed
- This avoids MissingResult entirely.
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import raise_state_exception
# submit
subflow_run = await run_deployment(
name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
parameters={
"module_path": module_path,
"params": params,
"upstream_data": upstream_data,
"target_uri": target_uri,
},
job_variables=job_variables,
as_subflow=True,
)
# wait
finished = await wait_for_flow_run(subflow_run.id, log_states=True)
if not finished.state.is_completed():
raise_state_exception(finished.state)
# Just use the known target_uri
return target_uri
If you must read the subflow's return via get_state_result
- Keep the subflow returning target_uri as above, but also enable result persistence and a result store for the subflow so the parent can read it later.
- Add to the docker_subflow decorator (see the sketch below):
- persist_result=True
- result_storage=LocalFileSystem(basepath="/var/prefect/results") (bind-mount the same host path into BOTH the Docker job containers and your process worker), or use S3/GCS/Azure
- result_serializer=JSONSerializer()
- Then the parent can do:
- from prefect.states import get_state_result
Marvin
09/10/2025, 7:19 AM
- artifact_uri = get_state_result(finished.state) (no await)
- Without persistence, calling get_state_result after fetching the state from the API will raise MissingResult.
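A minimal sketch of that decorator configuration, assuming the shared /var/prefect/results bind mount described earlier (the flow body is elided here):
from typing import Any, Dict, Optional
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

@flow(
    name="module-docker-subflow",
    persist_result=True,                                              # persist the returned URI
    result_storage=LocalFileSystem(basepath="/var/prefect/results"),  # shared bind mount (or an S3/GCS/Azure block)
    result_serializer=JSONSerializer(),
)
async def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    target_uri: Optional[str] = None,
) -> str:
    ...  # run the module and write the artifact to target_uri, as shown above
    return target_uri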
Why you saw “coroutine” earlier
- That happens when the flow returned an awaitable (e.g., you called an async function and didn’t await it) and you then tried to treat it as a string. With the subprocess approach above, you return a plain string.
Other gotchas to double-check (a job_variables sketch follows below)
- Ensure /app/module.py exists inside the container; mount your project into the container consistently and set working_dir="/app" in job_variables if you rely on relative paths.
- Use volumes consistently for data/artifacts so the subprocess can write to the same host path the next module will read.
- Keep PREFECT_LOGGING_LEVEL=DEBUG temporarily to see more in logs.
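A rough sketch of how those gotchas translate into per-run job variables (the image name and host paths are placeholders for your setup):
docker_job_variables = {
    "image": "your-pipeline-image:latest",      # must contain /app/module.py and its dependencies
    "working_dir": "/app",                       # so relative paths inside module.py resolve
    "volumes": ["/host/data:/data:rw"],          # shared artifact path for upstream/target URIs
    "env": {"PREFECT_LOGGING_LEVEL": "DEBUG"},   # temporary, for more detailed logs
}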
Docs
- Create deployments
- Customize job variables
If you want, share your job_variables for the subflow run and how module.py writes to target_uri, and I can validate the mount/paths and tailor the snippet exactly.
Darius Tan
09/10/2025, 7:38 AM
Marvin
09/10/2025, 7:38 AM
Marvin
09/10/2025, 7:39 AM
- Your parent flow calls a task (run_module). That creates a task run in the parent flow.
- That task then triggers a separate flow run via run_deployment(..., as_subflow=True). That creates a child flow run (the Docker subflow) on your Docker worker.
So in the UI you’ll see:
- Parent flow run
- Task run: run_module (awaiting/monitoring)
- Child flow run: module-docker-subflow (running in the container)
If you prefer not to see an extra task node, you can call run_deployment
directly from the flow body instead of inside a task:
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import raise_state_exception, get_state_result
@flow
async def dynamic_pipeline_flow(...):
fr = await run_deployment(
name="module-docker-subflow-deployment-name",
parameters={...},
job_variables={...},
as_subflow=True,
)
finished = await wait_for_flow_run(fr.id, log_states=True)
if not finished.state.is_completed():
raise_state_exception(finished.state)
# artifact_uri = get_state_result(finished.state)
...
Trade-offs:
- Inside a task: you get task-level features (retry, timeout, caching, concurrency limits) and a clear node in the task graph.
- In the flow body: a cleaner graph (no extra task node) but you lose task-level controls for that step.
Both patterns are valid; choose based on how much task-level control/observability you want. A sketch of the task-wrapped variant follows below.
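A minimal sketch of the task-wrapped variant, assuming the flow and deployment names used earlier (the retry/timeout values are illustrative):
from typing import Any, Dict, Optional
from prefect import task
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import raise_state_exception

@task(retries=2, retry_delay_seconds=30, timeout_seconds=3600)  # task-level controls for this step
async def run_module_in_docker(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    target_uri: str,
) -> str:
    # Trigger the docker subflow deployment and wait for it to finish
    fr = await run_deployment(
        name="module-docker-subflow/module-docker-subflow-deployment",  # assumption: your deployment name
        parameters={
            "module_path": module_path,
            "params": params,
            "upstream_data": upstream_data,
            "target_uri": target_uri,
        },
        as_subflow=True,
    )
    finished = await wait_for_flow_run(fr.id, log_states=True)
    if not finished.state.is_completed():
        raise_state_exception(finished.state)
    return target_uri  # the subflow wrote its artifact here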
Docs
- Deployments: https://docs-3.prefect.io/v3/how-to-guides/deployments/create-deployments
- Job variables: https://docs-3.prefect.io/v3/how-to-guides/deployments/customize-job-variables
Darius Tan
09/10/2025, 10:18 AM
Marvin
09/10/2025, 10:19 AM
Marvin
09/10/2025, 10:20 AM
A) In-process mode (process worker)
# Process work pool default job variables (or per deployment call)
{
"env": {
"CUDA_VISIBLE_DEVICES": "0", # limit to GPU 0 for this run
"NVIDIA_VISIBLE_DEVICES": "0" # older runtimes sometimes pay attention to this too
}
}
- In your model code:
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
# or explicitly pick GPU 0 if you’ve set CUDA_VISIBLE_DEVICES appropriately
If your “in-process” mode literally runs inside your FastAPI process (not via a worker), that FastAPI container/host must be started with GPU access and CUDA libs too.
B) Docker subflow (Docker worker)
Prereqs on the Docker host:
- Install NVIDIA Container Toolkit so Docker can run GPU workloads: https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html
- Use a CUDA-enabled image for your module (e.g., nvidia/cuda:* or framework-specific GPU image). Prefect base images do NOT include CUDA.
Two reliable ways to expose GPUs to job containers:
1) Default runtime method (most robust, no job variables needed)
- Set Docker’s default runtime to nvidia on the host:
# /etc/docker/daemon.json
{
"runtimes": {
"nvidia": { "path": "nvidia-container-runtime", "runtimeArgs": [] }
},
"default-runtime": "nvidia"
}
- Restart Docker. Now every container (including Prefect job containers) can see GPUs automatically.
- You still control which GPUs per run with env:
job_variables = {
"image": "your-gpu-image:tag",
"env": { "CUDA_VISIBLE_DEVICES": "0" },
"volumes": [...],
"networks": ["prefect-net"],
"image_pull_policy": "IfNotPresent"
}
2) Per-run request method (job_variables)
- Depending on your prefect-docker version, the worker may accept GPU-related job variables and pass them to Docker, e.g.:
job_variables = {
"image": "your-gpu-image:tag",
"env": { "CUDA_VISIBLE_DEVICES": "0" },
"volumes": [...],
"networks": ["prefect-net"],
"image_pull_policy": "IfNotPresent",
"gpus": "all" # equivalent to `docker run --gpus all` if supported
}
Notes:
- Support for keys like gpus/device_requests can vary by version. If you see errors, prefer the “default runtime” method.
- You can always set default job variables with your Docker work pool in the UI so you don’t pass gpus/env each run: Work Pools → your pool → Default job variables.
- If the worker logs show it’s ignoring unknown keys, your version may not support per-run GPU keys; use the default runtime method instead.
C) Verify GPU visibility (both modes)
Add a quick check in your docker_subflow (and in-process flow) to confirm GPU presence:
```
import subprocess, shutil
from prefect import get_run_logger
logger = get_run_logger()
if shutil.which("nvidia-smi"):
    # log nvidia-smi output so the run shows which GPUs are visible
    logger.info(subprocess.run(["nvidia-smi"], capture_output=True, text=True).stdout)
else:
    logger.warning("nvidia-smi not found; no GPU visible in this environment")
```
Marvin
09/10/2025, 10:20 AM