<@ULVA73B9P> I need help fixing and completing my ...
# ask-marvin
d
@Marvin I need help fixing and completing my dynamic pipeline architecture built with Prefect and FastAPI. Goal is to have a system that can run a sequence of modules, where each can run either in the main process or in a separate Docker container. The in-process execution works, but the Docker execution logic is incomplete and broken. Below is a detailed summary of the current architecture followed by specific questions I need help with. Background: Architecture Dynamic pipeline that executes a sequence of "modules." The output of one module, called an "artifact" (referenced by a URI), is passed as input to the next. Key Components & Logic Flow:
dynamic_pipeline_flow
(Main Flow)
: This is the orchestrator. It loops through a list of module configurations and calls the
run_module
task for each one, passing the output artifact from the previous step to the next. •
run_module
(Task)
: This is a hybrid executor responsible for running a single module. ◦ In-Process Mode (Working): It dynamically imports and runs the specified Python module. It correctly handles loading input artifacts from a URI and saving the output artifact. ◦ Docker Mode (Broken): It attempts to trigger a separate Prefect deployment (
module-docker-subflow-deployment
) as a subflow. This implementation is incomplete. •
docker_subflow
(Subflow)
: This is a placeholder flow intended to execute a module inside a Docker container. It is currently unimplemented. • API & Deployments: A FastAPI endpoint (
/pipelines/run
) triggers new pipeline runs. ◦ On startup, the application creates two deployments: one for the main flow targeting a process-based worker pool (
local-pool
) and one for the Docker subflow targeting a Docker-based worker pool (
local-docker-pool
). ◦ The API endpoint currently always submits runs to the main process-based deployment Key Problems & Questions Based on the architecture described above, please provide code examples and explanations to solve the following problems: 1. Implementing the Docker Subflow Call: In the
run_module
task, the call to
run_deployment
for Docker execution is incomplete. How do I correctly pass all necessary parameters (
module_path
,
params
,
upstream_data
) to the subflow deployment, and how do I retrieve the resulting artifact URI from the completed subflow run to pass to the next module? 2.
docker_subflow
Implementation
: The
docker_subflow
is currently empty. What should its implementation look like? Specifically, how to: ◦ Receive the
module_path
and other parameters? ◦ Load the input artifact using the
upstream_data
URI from within its Docker container? ◦ Dynamically execute the specified module? ◦ Save the output as a new artifact and return its URI to the parent flow? 3. Correctly Using Deployments: The
/pipelines/run
endpoint hardcodes the main deployment. Is this the correct approach? Confirm if the intended pattern is for the
dynamic_pipeline_flow
to always be the entry point, which then correctly delegates tasks to different worker pools (process vs. Docker) via subflow calls.
m
thought for 390.1 seconds
Thanks for the detailed write-up — this is a great use case for Prefect 3.x. I’ll assume Prefect 3.x unless you tell me otherwise. High-level confirmation - Yes: your FastAPI endpoint should always trigger the main “orchestrator” deployment (the dynamic pipeline flow). That flow decides per-module whether to run in-process or spin up a Docker-based subflow via a separate deployment. This is the correct pattern in Prefect. Below are concrete code examples and explanations for each of your questions. 1) Implementing the Docker subflow call from run_module Use
prefect.deployments.run_deployment
to submit the subflow to the Docker pool, then wait for completion with
wait_for_flow_run
and pull the return value from the flow’s terminal state with
get_state_result
. Key imports: -
prefect.deployments.run_deployment
-
prefect.flow_runs.wait_for_flow_run
-
prefect.states.get_state_result
Example task (async) showing both in-process and Docker modes:
Copy code
from typing import Any, Dict, Optional
import importlib
from prefect import task, get_run_logger
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result

def dynamic_import(module_path: str):
    # module_path format: "package.module:callable_name"
    mod_path, func_name = module_path.split(":")
    module = importlib.import_module(mod_path)
    return getattr(module, func_name)

def load_artifact(uri: Optional[str]) -> Any:
    if uri is None:
        return None
    # Implement your artifact loader here (local, S3, etc.)
    # For example:
    # if uri.startswith("s3://"): ...
    # else: open(local_path) ...
    raise NotImplementedError("load_artifact is env-specific")

def save_artifact(obj: Any) -> str:
    # Implement your artifact writer and return a URI
    # For example: write to S3/local and return "s3://..." or "file://..."
    raise NotImplementedError("save_artifact is env-specific")

@task
async def run_module(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    mode: str = "process",
) -> str:
    """
    Returns a URI string for the output artifact produced by the module.
    """
    logger = get_run_logger()
    if mode == "process":
        <http://logger.info|logger.info>(f"Running module in-process: {module_path}")
        func = dynamic_import(module_path)
        input_obj = load_artifact(upstream_data)
        output_obj = func(input_obj, **params)
        out_uri = save_artifact(output_obj)
        return out_uri

    elif mode == "docker":
        <http://logger.info|logger.info>(f"Running module in Docker via deployment: {module_path}")
        # Submit to the Docker subflow deployment with all needed parameters
        subflow_run = await run_deployment(
            name="module-docker-subflow-deployment",
            parameters={
                "module_path": module_path,
                "params": params,
                "upstream_data": upstream_data,
            },
            as_subflow=True,  # keep parent/child relationship in UI
            # timeout=None means do not wait here; we'll wait using wait_for_flow_run below
        )

        # Wait for completion
        finished_subflow = await wait_for_flow_run(
            subflow_run.id, log_states=True
        )

        # Extract the subflow return value (artifact URI)
        artifact_uri = get_state_result(finished_subflow.state)
        if not isinstance(artifact_uri, str):
            raise ValueError("Docker subflow did not return a string artifact URI")
        return artifact_uri

    else:
        raise ValueError(f"Unknown mode: {mode}")
Notes - The parameters for
run_deployment
go in the
parameters
dict exactly as you’d call the subflow. -
wait_for_flow_run
blocks until the flow reaches a terminal state.
-
get_state_result(finished_subflow.state)
gives you the value returned by the subflow (i.e., your artifact URI). This requires the subflow result to be retrievable (more on this in section 2). References: - Create deployments - Customize job variables 2) Implementing docker_subflow This flow runs inside your Docker worker. It needs to: - Accept
module_path
,
params
,
upstream_data
- Load input data from the URI - Dynamically import and run the specified module - Save the output as an artifact and return the artifact URI (a string) Example:
Copy code
from typing import Any, Dict, Optional
import importlib
from prefect import flow, get_run_logger

def dynamic_import(module_path: str):
    mod_path, func_name = module_path.split(":")
    module = importlib.import_module(mod_path)
    return getattr(module, func_name)

def load_artifact(uri: Optional[str]) -> Any:
    if uri is None:
        return None
    # Same implementation contract as in-process
    raise NotImplementedError("load_artifact is env-specific in Docker too")

def save_artifact(obj: Any) -> str:
    # Same implementation contract as in-process
    raise NotImplementedError("save_artifact is env-specific in Docker too")

@flow(name="module-docker-subflow")
def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
) -> str:
    """
    Executes a single module inside a Docker container and returns the output artifact URI.
    """
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Docker subflow starting: {module_path}")

    # Ensure the container has access to the upstream_data storage (e.g., S3 creds, mounted volumes, etc.)
    input_obj = load_artifact(upstream_data)

    func = dynamic_import(module_path)
    output_obj = func(input_obj, **params)

    out_uri = save_artifact(output_obj)
    <http://logger.info|logger.info>(f"Docker subflow completed; artifact URI: {out_uri}")

    # Return the URI as the flow's return value; parent flow will fetch this via get_state_result
    return out_uri
Important considerations - Code availability: Your Docker image must contain your project code (so
importlib.import_module
works). With Prefect 3.x deployments from source, the Docker worker can clone/pull your repo automatically; otherwise bake code into the image. - Credentials and networking: Make sure the container can read
upstream_data
and write the new artifact (e.g., S3 credentials via environment variables; host volumes; network access). - Returning the URI: Returning a small string is ideal. The parent can reliably read this via
get_state_result
. If you return large objects, configure a result store; not necessary for a short URI string. 3) Correctly using deployments and the FastAPI entrypoint - Yes: the FastAPI endpoint should trigger the main pipeline deployment. The main flow orchestrates per-module execution and can call the Docker subflow deployment whenever a module is marked for Docker. Programmatically create both deployments on startup (recommended in Prefect 3.x): ``` from prefect import flow from prefect.flows import Flow # Assume both flows live in this repo and these entrypoints are correct # Adjust source/entrypoint to your code layout. def create_deployments(): # Main orchestrator deployment -> process pool Flow.from_source( source="https://github.com/your-org/your-repo", # or "." if local + remote storage entrypoint="app/pipeline.py:dynamic_pipeline_flow", ).deploy( name="dynamic-pipeline", work_pool_name="local-pool", # Optional: default parameters # parameters={"default_param": "value"}, # Optional: tags=["prod"], ) # Docker subflow deployment -> docker pool Flow.from_source( source="https://github.com/your-org/your-repo",
entrypoint="app/pipeline.py:docker_subflow", ).deploy( name="module-docker-subflow-deployment", work_pool_name="local-docker-pool", # job_variables are important for Docker workers (image, env, volumes, etc.) job_variables={ "image": "your-registry/your-image:latest", # "env": {"AWS_ACCESS_KEY_ID": "...", "AWS_SECRET_ACCESS_KEY": "..."}, # "volumes": ["host_path/container pathrw"], }, )
Copy code
FastAPI endpoint that triggers the main flow deployment:
from fastapi import FastAPI from pydantic import BaseModel from prefect.deployments import run_deployment app = FastAPI() class ModuleConfig(BaseModel): module_path: str params: dict mode: str # "process" or "docker" class RunRequest(BaseModel): modules: list[ModuleConfig] initial_artifact: str | None = None @app.on_event("startup") async def on_startup(): # Optionally: create_deployments() if you programmatically manage them at app startup pass @app.post("/pipelines/run") async def run_pipeline(req: RunRequest): # Important: use the full identifier "Flow Name/Deployment Name" if you have multiple fr = await run_deployment( name="dynamic_pipeline_flow/dynamic-pipeline", parameters={ "modules": [m.dict() for m in req.modules], "initial_artifact": req.initial_artifact, }, # as_subflow=False because this is a top-level run ) return {"flow_run_id": str(fr.id)}
Copy code
And your main flow should iterate the modules and call `run_module`:
from typing import Any, Dict, List, Optional from prefect import flow @flow(name="dynamic_pipeline_flow") def dynamic_pipeline_flow( modules: List[Dict[str, Any]], initial_artifact: Optional[str] = None, ) -> str: """ modules: [{"module_path": "...", "params": {...}, "mode": "process"|"docker"}, ...] Returns final artifact URI. """ current_uri = initial_artifact for i, mod in enumerate(modules): current_uri = run_module.submit( module_path=mod["module_path"], params=mod.get("params", {}), upstream_data=current_uri, mode=mod.get("mode", "process"), ).result() return current_uri ``` Common gotchas and tips - Use
as_subflow=True
in
run_deployment
calls for child runs so the UI shows parent/child relationships and logs get grouped. - Results: returning a small string from the Docker subflow is best. The parent uses
get_state_result
to read it. If you ever return larger data across containers, configure result storage appropriately. - Parameters must be JSON-serializable. Pass URIs and standard types — not complex objects. - Docker job variables: place everything the container needs there (image, env for credentials, volumes). - Source-of-truth: With
Flow.from_source(...).deploy(...)
, the worker will pull your repo at run time. Ensure entrypoints are correct and code is importable. Docs - Create deployments - Customize job variables If you can share your current
run_module
and any helper loaders/savers, I can tailor these snippets exactly to your code paths (e.g., S3 vs. local filesystem).
d
@Marvin what if the docker container / image can change with every submission of new dynamic pipeline with varying modules? how does the docker deployment work in this case?
m
thought for 161.5 seconds
Great question — you do not need a new deployment every time the Docker image changes. How to vary the Docker image per submission (per module) - Keep a single deployment for your Docker subflow (e.g., “module-docker-subflow-deployment”) bound to a Docker work pool. - At run time, override the image (and other Docker settings) by passing
job_variables
to
run_deployment
. This lets each module/run use a different container image without creating new deployments. Example: pass image/env/volumes per module
Copy code
# In your module config, allow docker overrides
{
  "module_path": "your_pkg.mod:run",
  "params": {"x": 1},
  "mode": "docker",
  "docker": {
    "image": "<http://ghcr.io/you/pipeline-mod-a:2025-09-08|ghcr.io/you/pipeline-mod-a:2025-09-08>",
    "env": {"EXTRA_FLAG": "1"},
    "volumes": ["host_path:/container_path:rw"]
  }
}
Copy code
# In run_module task
@task
async def run_module(module_path, params, upstream_data, mode="process", docker: dict | None = None) -> str:
    if mode == "process":
        ...
    elif mode == "docker":
        subflow_run = await run_deployment(
            name="module-docker-subflow-deployment",
            parameters={
                "module_path": module_path,
                "params": params,
                "upstream_data": upstream_data,
            },
            job_variables=(docker or {}),   # <-- override image/env/volumes per run
            as_subflow=True,
        )
        finished = await wait_for_flow_run(subflow_run.id, log_states=True)
        return get_state_result(finished.state)
    else:
        raise ValueError(f"Unknown mode: {mode}")
Why this works - Deployments define the flow entrypoint and default infra. Work pools + job variables control the “how to run.” In Prefect 3.x you can override those per flow run submission via
run_deployment(..., job_variables=...)
. No redeploy needed. - This is ideal when modules have different dependencies or when you build/push an image per submission. Where to put the code and dependencies - If your modules live in your repo: use a generic base image (Python + system deps), deploy the flow with
Flow.from_source(...).deploy(...)
, and let Prefect pull your repo at run time. Then you can still override the image to pick different Python/system stacks for different modules. - If modules change outside the repo: build/push a purpose-built image per submission (CI or on-demand) and pass its tag via
job_variables["image"]
. Operational tips - Make sure your Docker work pool’s worker can pull images from your registry. - Keep
job_variables
small and focused: commonly
image
,
env
,
volumes
. Defaults can be set on the work pool or deployment; overrides at run time take precedence. - Security: if you don’t want arbitrary images at run time, lock down your pool/deployment defaults and only allow specific overrides. End-to-end pattern recap - FastAPI endpoint triggers the main orchestrator deployment (correct approach). - The main flow iterates over modules. For each module in Docker mode, it: - Passes
module_path
,
params
,
upstream_data
to the subflow deployment. - Overrides Docker
job_variables
with the module’s desired image/env/volumes. - Waits for completion and reads the returned artifact URI. Docs - Customize job variables - Create deployments If you want, share how you plan to select/build the image per module (e.g., naming/tagging scheme), and I can suggest a concrete structure for your module config and CI step that pairs cleanly with the
job_variables
override.
d
@Marvin what if i am currently using local codebase instead of pulling from github source? where if inprocess run it is straightforward via the modules, but if docker it will take image from the docker config, if available locally use the image directly, if not either pull or read the dockerfile from the module_path and build based on the config.
m
thought for 60.1 seconds
Short answer: you can keep your “single” Docker subflow deployment and decide the image at run time. For local development with a local codebase, you have two workable patterns: - Easiest locally: base image + volume mount your project into the container per run - More controlled: build or pull the image on-demand before you call run_deployment, then pass the chosen tag in job_variables Important constraint: the image you choose must exist on the Docker host where the Docker worker runs. If your worker runs on the same machine (same Docker daemon) as your FastAPI app, building locally is fine. If workers run elsewhere, you’ll need to push the image to a registry and reference that tag. Option A: Base image + volume mount (best for local dev) - Keep a single image with Python + system deps (no app code inside). - Mount your project directory into the container so your modules import from your local files. - Pass volumes and env in job_variables per run. Your module imports will “just work” in the container since your code is mounted. Example:
Copy code
# Module config
{
  "module_path": "your_pkg.mod_a:run",
  "params": {"x": 1},
  "mode": "docker",
  "docker": {
    "image": "python:3.11-slim",
    "volumes": ["/abs/path/to/your/project:/app:ro"],
    "env": {"PYTHONPATH": "/app"},
    # Optional: set working dir via env or your flow code
  }
}
Copy code
# In run_module
@task
async def run_module(..., docker: dict | None = None) -> str:
    if mode == "docker":
        job_vars = (docker or {})
        subflow = await run_deployment(
            name="module-docker-subflow-deployment",
            parameters={
                "module_path": module_path,
                "params": params,
                "upstream_data": upstream_data,
            },
            job_variables=job_vars,      # volumes/env/image passed here
            as_subflow=True,
        )
        finished = await wait_for_flow_run(subflow.id, log_states=True)
        return get_state_result(finished.state)
Pros - No image builds per run - Always uses your current local code Cons - Works only when the Docker worker is on the same machine and can mount your host path Option B: Build/pull image dynamically per run - Before calling run_deployment, ensure the desired image exists on the worker’s Docker host: - If image tag provided and present locally, use it - If not present: - Pull it (if allowed), or - Build it from a Dockerfile in your module’s directory - If the worker is remote, push the image to a registry and reference the registry tag Minimal utilities using docker SDK (local build/pull) ``` # pip install docker import docker from docker.errors import ImageNotFound _docker = docker.from_env() def image_exists_locally(tag: str) -> bool: try: _docker.images.get(tag) return True except ImageNotFound: return False def pull_image(tag: str): _docker.images.pull(tag) def build_image(context_dir: str, dockerfile: str, tag: str, buildargs: dict | None = None, platform: str | None = None): _docker.images.build( path=context_dir, dockerfile=dockerfile, tag=tag, buildargs=buildargs or {}, platform=platform, ) def ensure_image_available(docker_cfg: dict) -> str: """ docker_cfg accepts: - image: str | None - pull: bool (default True) - dockerfile: str | None - context: str | None - build_args: dict | None - platform: str | None - push: bool (default False) # requires registry auth if True - push_tag: str | None # registry tag to push to and use Returns an image tag that exists locally (or in registry for remote workers). """ image = docker_cfg.get("image") pull_ok = docker_cfg.get("pull", True) dockerfile = docker_cfg.get("dockerfile") context_dir = docker_cfg.get("context") build_args = docker_cfg.get("build_args") platform = docker_cfg.get("platform")
do_push = docker_cfg.get("push", False) push_tag = docker_cfg.get("push_tag") if image: if not image_exists_locally(image): if pull_ok: pull_image(image) elif dockerfile and context_dir: build_image(context_dir, dockerfile, image, build_args, platform) else: raise RuntimeError(f"Image {image} not available and no pull/build path supplied") # Optional: push to registry if worker is remote if do_push and push_tag and image != push_tag: _docker.images.get(image).tag(push_tag) for line in _docker.images.push(push_tag, stream=True, decode=True): pass return push_tag return image # No image provided; require Dockerfile path + context to build if not (dockerfile and context_dir): raise ValueError("Either 'image' or both 'dockerfile' and 'context' must be provided in docker config") # Choose a tag (e.g., module-derived) auto_tag = docker_cfg.get("auto_tag") or "local/module-auto:latest" build_image(context_dir, dockerfile, auto_tag, build_args, platform) if do_push and push_tag: _docker.images.get(auto_tag).tag(push_tag) for line in _docker.images.push(push_tag, stream=True, decode=True): pass return push_tag return auto_tag
Copy code
Use it before calling the Docker subflow:
@task async def run_module(..., docker: dict | None = None) -> str: if mode == "docker": # Prepare or resolve the image to use image_to_use = ensure_image_available(docker or {}) job_vars = { "image": image_to_use, # forward additional runtime settings **{k: v for k, v in (docker or {}).items() if k in ("env", "volumes")} } subflow = await run_deployment( name="module-docker-subflow-deployment", parameters={"module_path": module_path, "params": params, "upstream_data": upstream_data}, job_variables=job_vars, as_subflow=True, ) finished = await wait_for_flow_run(subflow.id, log_states=True) return get_state_result(finished.state) ``` Key considerations - Same Docker daemon: If your FastAPI/orchestrator and Docker worker use the same local Docker daemon, a local build (via docker SDK) is immediately usable by the worker. - Remote workers: If the Docker worker runs on a different host, you must push the built image to a registry accessible by that worker and pass the registry tag in job_variables. - Volume mounting vs builds: For local-only dev flows, mounting your code into a generic base image is simpler and very fast. For reproducible or remote execution, build/push images per module/version. - Module_path with local code: for volume-mount approach, set PYTHONPATH to your mount (e.g., /app) and import dynamically inside docker_subflow just like in-process. - Security: Only allow trusted Dockerfile paths and registries; avoid arbitrary inputs becoming build context without validation. Your docker_subflow stays the same - It only needs to import and run the requested module using the image/environment you provided; it does not need to know whether you mounted the code or baked it into the image. Docs - Job variables overview: Customize job variables - Deployment creation pattern: Create deployments If you can confirm: - Is your Docker worker on the same machine as your FastAPI app? - Do you want to rely on volume mounts (local dev) or prefer building/pushing images (reproducible/remote)? I can tailor the code to exactly match your module config shape (e.g., auto-tag scheme, Dockerfile lookup like “<module_dir>/Dockerfile”, and default volumes/env).
d
@Marvin for option B would using prefect-docker make more sense?
m
thought for 68.9 seconds
Short answer: It can, but it depends on what you want to optimize. What prefect-docker helps with - If you’re using a Docker work pool (as you are), you already rely on the prefect-docker worker on the machine that runs the containers. That’s the right infra for Option B. - If you also want to orchestrate “build/push/pull” steps with Prefect tasks/blocks (instead of your own Docker SDK/CLI calls), adding the prefect-docker integration to your orchestrator can make those steps cleaner and more standardized. Where prefect-docker fits vs. your Option B - Per-run dynamic images: Prefect does not automatically build an image when you call run_deployment. You still need to ensure the image exists on the Docker worker host (or in a registry it can pull from) before you submit the run with job_variables["image"]. - You can achieve that in three ways: 1) CI-first (recommended for reliability): Build/push images in CI and pass an immutable tag at runtime via job_variables. Easiest to operate; zero “build” logic in your flow code. 2) Pre-step in your flow (local worker): Run a small “prepare image” task that builds the image locally (docker SDK/CLI) and then call run_deployment with that tag. This works only when the orchestrator and the Docker worker share the same Docker daemon. 3) Pre-step that builds and pushes: If the worker is remote, your pre-step must push to a registry. Then pass the registry tag via job_variables. - Using prefect-docker is most helpful for (2) and (3), because it provides Prefect-native tasks/blocks to manage Docker operations and credentials instead of hand-rolled docker SDK/CLI calls. It won’t change the core pattern: you still build/push/pull first, then run_deployment with job_variables["image"]. When I’d choose prefect-docker - You want Docker operations (build, push, pull) as first-class Prefect tasks with retries/logging/visibility. - You manage registry credentials as Prefect blocks and prefer not to write Docker SDK code. - You plan to reuse the pattern across many flows and want a standardized approach. When I’d skip it - Local-only development, same Docker daemon as the worker, and a simple “docker build/pull” subprocess or docker SDK call is sufficient. - You move all builds to CI and only ever pass stable tags at runtime (no building from your flow). Operational reminder - Regardless of approach, you’ll still pass the chosen image per run using job_variables on run_deployment, e.g.: - image: the tag you built/pulled/pushed - env/volumes: any runtime mounts or credentials - If your worker is remote, prefer CI-built images pushed to a registry; it’s more predictable and faster at run time. Docs - Customize job variables: Customize job variables - Create deployments and use DockerImage during deployment builds: Create deployments (note: DockerImage here is for deployment-time builds, not per-run building) If you share your exact preference (CI-built tags vs. flow-built, local vs. remote worker), I can sketch a minimal “prepare_image” step using either prefect-docker tasks or a small subprocess/docker SDK helper, including how to wire its output tag into the subsequent run_deployment call.
d
@Marvin given the setup described above, with the following being called for a docker subflow in a docker worker:
Copy code
job_variables = {
                "image": docker_config.get("image", "python:3.10-slim"),
                "env": {
                    "PYTHONPATH": "/app",
                    **docker_config.get("env", {})
                },
                "volumes": volumes,
                "pull_policy": "Never"  # Force use of local image
            }
            
            # Submit to Docker deployment (worker will create container)
            subflow_run = await run_deployment(
                name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
                parameters={
                    "module_path": module_path,
                    "params": params,
                    "upstream_data": upstream_data,
                    "target_uri": target_uri
                },
                job_variables=job_variables,  # This configures the container
                as_subflow=True,
            )
i am currently facing this error
Copy code
Failed to submit flow run 'b1b444ce-9a3a-49a1-92a0-304bdd6bc9b9' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 275, in _raise_for_status
    response.raise_for_status()
  File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 1026, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 404 Client Error: Not Found for url: <http+docker://localhost/v1.49/images/create?tag=latest&fromImage=example_deploy_flow>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/workers/base.py", line 1262, in _submit_run_and_capture_errors
    result = await self.run(
  File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 457, in run
    container, created_event = await run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
    result = await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2405, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 914, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
    return call()
  File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 727, in _create_and_start_container
    self._pull_image(docker_client, configuration)
  File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 876, in _pull_image
    return docker_client.images.pull(image, tag)
  File "/usr/local/lib/python3.10/site-packages/docker/models/images.py", line 464, in pull
    pull_log = self.client.api.pull(
  File "/usr/local/lib/python3.10/site-packages/docker/api/image.py", line 429, in pull
    self._raise_for_status(response)
  File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 277, in _raise_for_status
    raise create_api_error_from_http_exception(e) from e
  File "/usr/local/lib/python3.10/site-packages/docker/errors.py", line 39, in create_api_error_from_http_exception
    raise cls(e, response=response, explanation=explanation) from e
docker.errors.ImageNotFound: 404 Client Error for <http+docker://localhost/v1.49/images/create?tag=latest&fromImage=example_deploy_flow>: Not Found ("pull access denied for example_deploy_flow, repository does not exist or may require 'docker login': denied: requested access to the resource is denied")
m
thought for 122.0 seconds
Thanks for the stack trace — it tells us exactly what’s happening. What’s going on - The Docker worker is trying to pull the image named “example_deploy_flowlatest” - fromImage=example_deploy_flow, tag=latest - That means the worker did not honor your intended image override (“python:3.10-slim”) and fell back to an image configured on the deployment or the work pool defaults. - It also ignored your pull policy because the key you passed is not recognized; the worker attempted a pull and got a 404. Likely causes - Wrong key for the pull policy: use image_pull_policy (not pull_policy). - Your job_variables override did not apply (e.g., wrong deployment name, older package versions, or a subtle mismatch). - Your deployment or work pool default job variables specify image=example_deploy_flow and the worker used that because the override wasn’t applied. How to fix 1) Use the correct job variable keys and ensure the override is applied - Replace pull_policy with image_pull_policy. - Verify the image is exactly what you want (include a tag). - After you submit the run, open the flow run in the UI and check “Infrastructure” — it should show the resolved image and job variables. If it still shows example_deploy_flow, the override is not being applied. Updated snippet:
Copy code
job_variables = {
    "image": docker_config.get("image", "python:3.10-slim"),
    "env": {
        "PYTHONPATH": "/app",
        **docker_config.get("env", {})
    },
    "volumes": volumes,
    "image_pull_policy": "Never"  # correct key
}

subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri
    },
    job_variables=job_variables,
    as_subflow=True,
)
2) Confirm your deployment and work pool defaults - If your deployment was created with a Docker image (e.g., via DockerImage(name="example_deploy_flow", tag="latest")), that’s the image you’re seeing. - Overrides from run_deployment(job_variables=...) should still win, but if they aren’t, double-check: - You’re calling the correct deployment (Flow/Deployment pair). - You’re on Prefect 3.x with prefect-docker installed on the worker. - Your Docker worker belongs to the work pool targeted by that deployment. - You can inspect the deployment in the UI (Deployments tab) and also review the work pool’s default job variables. 3) If you truly want to use a local image on the worker - Make sure the image exists on the worker’s Docker daemon with the exact name:tag you pass. - With image_pull_policy="Never", the worker will not pull; if the image is missing locally, the run will fail. If you need “pull if missing,” use image_pull_policy="IfNotPresent". 4) If the worker is on a different host than your API/orchestrator - “Local” means local to the worker’s Docker daemon. If you build images on another host, push them to a registry and reference the registry tag in job_variables["image"]. 5) Quick debugging steps - In the Prefect UI, open the failed flow run and check: - Flow run > Details > Infrastructure: does it show image=python:3.10-slim and image_pull_policy=Never? If not, the override didn’t apply. - Worker logs: they’ll print the chosen image and pull policy. - If you can’t access the UI, you can also run: - prefect deployment inspect "YourFlow/YourDeployment" (to see deployment defaults) - Make sure your job_variables dict only uses keys the Docker worker understands: commonly image, image_pull_policy, env, volumes, (and optionally others like networks). See: Customize job variables Why the error says “pull access denied…example_deploy_flow” - That’s the image configured on your deployment or pool, likely created during an earlier “build with DockerImage(name='example_deploy_flow')”.
- Since the override wasn’t applied and the pull policy default allowed pulling, the worker tried to pull that name from a registry and failed. If this doesn’t resolve it, please share: - The exact deployment name you pass to run_deployment - A screenshot or JSON from the UI showing the flow run’s Infrastructure section - Versions of prefect and prefect-docker on your worker - Whether the worker runs on the same machine as your FastAPI app I can then pinpoint whether it’s an override issue, a pool/deployment default, or a version mismatch.
d
@Marvin from within the newly created container by the run_deployment, how do i pass logging information back into the worker?
m
thought for 43.8 seconds
Short answer: log with Prefect’s logger inside the container. Those logs are sent to the Prefect API and show up in the UI and (depending on worker config) alongside worker/infrastructure logs. How to do it - In flows and tasks, use get_run_logger:
Copy code
from prefect import flow, task, get_run_logger

@task
def do_work():
    logger = get_run_logger()
    <http://logger.info|logger.info>("Task is running inside the Docker container")
    # your code...

@flow
def docker_subflow(module_path: str, params: dict, upstream_data: str | None) -> str:
    logger = get_run_logger()
    <http://logger.info|logger.info>("Subflow started in Docker")
    # your code...
    <http://logger.info|logger.info>("Subflow completed")
    return "artifact://..."
- If your dynamic modules use Python’s standard logging, make sure their logs propagate so Prefect can capture them. Optionally set an env var to capture extra loggers in your job_variables:
Copy code
job_variables = {
  "image": "...",
  "env": {
    "PYTHONPATH": "/app",
    # Capture non-Prefect loggers by name (e.g., your_pkg, uvicorn)
    "PREFECT_LOGGING_EXTRA_LOGGERS": "your_pkg,uvicorn",
    # Optional: set level
    "PREFECT_LOGGING_LEVEL": "INFO",
  },
  "volumes": [...],
}
- If you spawn subprocesses and want their output in Prefect logs, capture stdout/stderr and forward to the Prefect logger:
Copy code
import subprocess
from prefect import get_run_logger

def run_and_stream(cmd: list[str]) -> None:
    logger = get_run_logger()
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1) as p:
        for line in p.stdout:
            <http://logger.info|logger.info>(line.rstrip())
    rc = p.wait()
    if rc != 0:
        raise RuntimeError(f"Command failed with exit code {rc}")
What gets where - Prefect logger calls (get_run_logger) are shipped to the Prefect API and appear on the flow/task run in the UI. - Standard Python loggers from your code can be captured if they propagate and you include them via PREFECT_LOGGING_EXTRA_LOGGERS. - Bare prints/subprocess output do not automatically reach Prefect; capture and forward them as shown above. - The Docker worker also emits “infrastructure” logs (container creation, image pulls). Those are separate from flow/task logs. Gotchas - Make sure your Docker subflow actually runs inside a Prefect flow/task context (i.e., the container is running your Prefect flow). Then get_run_logger will work and send logs to the API. - If the container runs code outside Prefect (e.g., just a shell script), the worker may only see stdout/stderr as generic container logs. If you want them in the Prefect flow logs, capture and forward via a Prefect task/flow process as shown above. - Ensure the environment inside the container includes the Prefect configuration the worker injects (the Docker worker handles this automatically for flow runs launched via run_deployment).
d
@Marvin how to debug problems after i call
Copy code
subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri
    },
    job_variables=job_variables,
    as_subflow=True,
)

# Wait for container execution to complete
<http://logger.info|logger.info>(f"Waiting for Docker subflow completion: {subflow_run.id}")
finished_subflow = await wait_for_flow_run(
    subflow_run.id,
    log_states=True
)
and before the subflow runs?
Copy code
@flow(name=DOCKER_FLOW_NAME)
def docker_subflow(module_path: str, params: dict, upstream_data: str, target_uri: str) -> str:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Docker subflow executing in container for: {module_path}")
currently the flow run crashes before any logs gets displayed in the subflow as per:
Copy code
prefect-docker-worker_1   | 02:27:20.125 | INFO    | prefect.flow_runs.worker - Worker 'DockerWorker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3' submitting flow run '41942f4a-4606-41dd-a631-e9c0117a7488'
prefect-docker-worker_1   | 02:27:20.207 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'tricky-skua'...
prefect-docker-worker_1   | 02:27:20.208 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'tricky-skua'...
prefect-docker-worker_1   | 02:27:20.218 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'created'
prefect-docker-worker_1   | 02:27:20.218 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' created successfully.
prefect-docker-worker_1   | 02:27:20.365 | INFO    | prefect.flow_runs.worker - Completed submission of flow run '41942f4a-4606-41dd-a631-e9c0117a7488'
prefect-docker-worker_1   | 02:27:20.367 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'running'
prefect-docker-worker_1   | 02:27:26.342 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'exited'
prefect-docker-worker_1   | 02:27:26.345 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'tricky-skua' has status 'exited'
prefect-docker-worker_1   | 02:27:26.380 | INFO    | prefect.flow_runs.worker - Reported flow run '41942f4a-4606-41dd-a631-e9c0117a7488' as crashed: Flow run infrastructure exited with non-zero status code 1.
prefect-process-worker_1  | 02:27:28.203 | INFO    | prefect.haimo_orchestration.flows - Waiting for Docker subflow completion: 41942f4a-4606-41dd-a631-e9c0117a7488
prefect-process-worker_1  | 02:27:28.225 | INFO    | prefect - Flow run is in state 'Crashed'
prefect-process-worker_1  | 02:27:28.226 | ERROR   | prefect.haimo_orchestration.flows - Error executing module ExampleDeploy: Docker subflow failed with state: Crashed('Flow run infrastructure exited with non-zero status code 1.')
m
thought for 105.5 seconds
This symptom (“container created → running → exited quickly, state: Crashed; no subflow logs”) almost always means the process inside the container failed before Prefect could start your flow engine and logger. In practice, that’s usually one of: - Prefect is not installed in the image (e.g., using python:3.10-slim as-is) - Your flow code is not present/visible in the container (bad mount/entrypoint path) - The container’s working directory/PYTHONPATH doesn’t match the deployment’s entrypoint - You’ve overridden CMD/entrypoint in a way that prevents Prefect from launching the flow Here’s a concrete, step-by-step way to debug between run_deployment and the first subflow log line: 1) Inspect the container itself after the crash - The worker logs show the container name (e.g., ‘tricky-skua’). On the same Docker host: - docker logs tricky-skua - docker inspect tricky-skua - You’ll usually see a clear Python traceback in docker logs (e.g., ModuleNotFoundError: No module named 'prefect' or your module). - docker inspect will show the command the worker tried to run and the environment injected. That tells you exactly what Prefect tried to execute. 2) Confirm Prefect is installed in the image - If you’re using a base image like python:3.10-slim, your container will exit immediately with a ModuleNotFoundError for prefect. - Quick check: - docker run --rm -it <your-image> python -c "import prefect; print(prefect.version)" - If this fails, bake Prefect into your image or use a Prefect base image. - Minimal Dockerfile for local dev:
Copy code
FROM python:3.11-slim
  RUN pip install --no-cache-dir "prefect==3.*" "prefect-docker"
  WORKDIR /app
  ENV PYTHONPATH=/app
Or start from a Prefect image:
Copy code
FROM prefecthq/prefect:3-latest
  WORKDIR /app
  ENV PYTHONPATH=/app
3) Ensure your code is present in the container at the path your deployment expects - If your deployment entrypoint is something like app/pipeline.py:docker_subflow, then app/pipeline.py must exist inside the container at runtime. - If you’re using your local codebase (not pulling from source), mount it and set PYTHONPATH: - job_variables example:
Copy code
job_variables = {
      "image": "prefecthq/prefect:3-latest",  # or your custom image with Prefect installed
      "env": {
        "PYTHONPATH": "/app",
        "PREFECT_LOGGING_LEVEL": "DEBUG",  # more verbosity
        # capture additional loggers if you want
        "PREFECT_LOGGING_EXTRA_LOGGERS": "uvicorn,your_pkg"
      },
      "volumes": [
        "/abs/path/to/your/project:/app:ro"
      ],
      # optional, helps relative imports if your code assumes /app
      "working_dir": "/app",
      # pulling policy depending on your workflow
      "image_pull_policy": "IfNotPresent"
    }
- If your entrypoint path is different (e.g., monorepo subdir), align the mount and working_dir accordingly. 4) Verify you aren’t overriding the container command - Let the Docker worker set the command needed to run the flow (it will run a Prefect command with your flow run ID). - If you pass a “command” override in job_variables (or your image ENTRYPOINT/CMD conflicts), it can kill the run before Prefect starts. Remove custom command overrides while debugging. 5) Confirm your deployment target and overrides are actually applied - In the Prefect UI, open the subflow run → Details → Infrastructure and verify: - image is what you set in job_variables - working_dir, env, volumes look correct - If you still see an unexpected image (e.g., example_deploy_flow), you’re calling the wrong deployment or the override did not apply. - Use the correct key name for pull policy: image_pull_policy (not pull_policy). 6) Turn up worker-side logs - Run your Docker worker with PREFECT_LOGGING_LEVEL=DEBUG to get more infra detail. - The worker logs will show the container command, env, and any errors in creating/starting the container. 7) Reproduce interactively to isolate the import issue
- Once you know the command/environment from docker inspect, you can reproduce interactively: - docker run -it --env <each env var from inspect> -v /abs/path:/app -w /app <image> bash - Inside, try: - python -c "import prefect; import importlib; importlib.import_module('your_pkg'); print('ok')" - Make sure importlib.import_module('path.module') matches your module_path Most likely root cause in your case - Your example earlier defaulted to "python:3.10-slim". That image does not include Prefect, so the container would exit immediately with ModuleNotFoundError before your flow could start — hence no subflow logs. Switching to an image that includes Prefect (prefecthq/prefect:3-latest or your own image with pip install prefect) plus ensuring your code is available (volume or baked in) usually resolves this exact pattern. Once the engine starts, your get_run_logger calls in docker_subflow will show up as normal flow logs. Until then, the only clues are in the container’s stdout/stderr and the worker’s infrastructure logs. Docs - Customize job variables (image/env/volumes/working_dir etc.): Customize job variables - Create deployments: Create deployments If you paste: - The deployment entrypoint you used to create the docker_subflow deployment - The job_variables you’re passing - The output of docker logs/inspect for the failed container …I can zero in on the exact fix (path vs. import vs. missing Prefect).
d
@Marvin here are the logs:
Copy code
docker logs tricky-skua
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/cli/_utilities.py", line 44, in wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/cli/_types.py", line 156, in sync_fn
    return asyncio.run(async_fn(*args, **kwargs))
  File "/usr/local/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/usr/local/lib/python3.10/site-packages/prefect/cli/flow_run.py", line 412, in execute
    await runner.execute_flow_run(id)
  File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 571, in execute_flow_run
    async with context:
  File "/usr/local/lib/python3.10/site-packages/prefect/runner/runner.py", line 1498, in __aenter__
    await self._exit_stack.enter_async_context(
  File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.10/site-packages/prefect/runner/_observers.py", line 42, in __aenter__
    self._events_subscriber = await self._exit_stack.enter_async_context(
  File "/usr/local/lib/python3.10/contextlib.py", line 619, in enter_async_context
    result = await _cm_type.__aenter__(cm)
  File "/usr/local/lib/python3.10/site-packages/prefect/events/clients.py", line 555, in __aenter__
    await self._reconnect()
  File "/usr/local/lib/python3.10/site-packages/prefect/events/clients.py", line 566, in _reconnect
    self._websocket = await self._connect.__aenter__()
  File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/client.py", line 482, in __aenter__
    return await self
  File "/usr/local/lib/python3.10/site-packages/prefect/_internal/websockets.py", line 135, in _proxy_connect
    proto = await self.__await_impl__()
  File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/client.py", line 439, in __await_impl__
    self.connection = await self.create_connection()
  File "/usr/local/lib/python3.10/site-packages/websockets/asyncio/client.py", line 365, in create_connection
    _, connection = await loop.create_connection(factory, **kwargs)
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1036, in create_connection
    infos = await self._ensure_resolved(
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 1418, in _ensure_resolved
    return await loop.getaddrinfo(host, port, family=family, type=type,
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 863, in getaddrinfo
    return await self.run_in_executor(
  File "/usr/local/lib/python3.10/concurrent/futures/thread.py", line 58, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/usr/local/lib/python3.10/socket.py", line 967, in getaddrinfo
    for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
socket.gaierror: [Errno -2] Name or service not known
An exception occurred.
m
thought for 69.9 seconds
Thanks for the logs — they pinpoint the issue. Root cause - Inside the job container, Prefect is trying to open a websocket to the Prefect API/events service and fails with DNS resolution: - socket.gaierror: [Errno -2] Name or service not known - This almost always means the PREFECT_API_URL the container is using points to a host that is not resolvable/reachable from inside the container (e.g., “localhost”, a host-only name, or a service on a different Docker network). What to check and fix 1) Inspect the failing container to see exactly what it tried to use - Look at the environment injected by the worker: - docker inspect tricky-skua | jq '.[0].Config.Env' - Find PREFECT_API_URL (and PREFECT_API_KEY if you’re on Cloud). - Also check the container’s command (Config.Cmd) to confirm it is running the Prefect runner. 2) Ensure the API URL is reachable from inside containers - If you’re using Prefect Server in Docker Compose: - Put your Prefect API (server), your Docker worker, and job containers on the same user-defined Docker network. - Set PREFECT_API_URL to the service name on that network, e.g. http://prefect-server:4200/api (replace with your service name/port). - Attach the job container to that network via job_variables:
Copy code
job_variables = {
      "image": "...",
      "env": {
        "PYTHONPATH": "/app",
        # optional for more logs:
        "PREFECT_LOGGING_LEVEL": "DEBUG",
      },
      "volumes": [...],
      "networks": ["your_compose_network_name"],
      "image_pull_policy": "IfNotPresent",
      # optionally set working_dir if needed
      "working_dir": "/app",
    }
- Also ensure the Docker worker service itself is attached to the same network; the worker will connect job containers to the listed networks. - If you’re using Prefect Server running on your host (not containerized): - Do not use localhost or 127.0.0.1 in PREFECT_API_URL — that points to the job container itself. - On Mac/Windows Docker Desktop, you can use http://host.docker.internal:4200/api. - On Linux, consider: - Exposing the host on a reachable IP/hostname and use that in PREFECT_API_URL, or - Mapping host.docker.internal via extra hosts on your image/daemon and using http://host.docker.internal:4200/api, or - Running the job container with host networking (not generally recommended) if your environment permits it. - If you’re using Prefect Cloud: - PREFECT_API_URL should be the Cloud API URL and should resolve on the internet. If you see DNS failure here, check outbound DNS/firewall from the Docker host network. 3) Make sure you’re not wiping out the worker’s injected env - The Docker worker injects Prefect env (PREFECT_API_URL, flow run ID, etc.). Your job_variables["env"] is merged, not replaced, in current releases — but if you’re on older versions or have custom code, confirm that the Prefect env vars are present in docker inspect. 4) Quick network test - From the Docker host, run a test container on the same network to curl the API URL you plan to use: - docker run --rm --network your_compose_network_name curlimages/curl:8.7.1 -v http://prefect-server:4200/api - If that fails, it’s a network or naming issue, not Prefect. 5) Prefer consistent defaults - Put the correct network(s) in your Docker work pool default job variables so you don’t have to pass them for every subflow. - Set PREFECT_API_URL at the worker container level (env) to a value that works for all job containers; the worker will propagate it. Likely fix for your setup - You have both a process worker and a docker worker in Compose (“prefect-process-worker_1”, “prefect-docker-worker_1”). If your Prefect API is also in Compose, create/use a shared network (e.g., “prefect-net”), attach all three services to it, set the docker worker’s environment: - PREFECT_API_URL=http://prefect-server:4200/api - Then add the same network to job_variables["networks"] so the job container can resolve “prefect-server”.
Example docker-compose.yml sketch
Copy code
services:
  prefect-server:
    image: prefecthq/prefect:3-latest
    command: server start
    ports: ["4200:4200"]
    networks: ["prefect-net"]

  prefect-docker-worker:
    image: prefecthq/prefect:3-latest
    command: >
      prefect worker start
      --type docker
      --pool local-docker-pool
    environment:
      - PREFECT_API_URL=<http://prefect-server:4200/api>
      - PREFECT_LOGGING_LEVEL=DEBUG
    networks: ["prefect-net"]

  prefect-process-worker:
    image: prefecthq/prefect:3-latest
    command: >
      prefect worker start
      --type process
      --pool local-pool
    environment:
      - PREFECT_API_URL=<http://prefect-server:4200/api>
    networks: ["prefect-net"]

networks:
  prefect-net:
    driver: bridge
Then your job_variables should include:
Copy code
job_variables = {
  "image": "prefecthq/prefect:3-latest",  # or your image with Prefect installed
  "env": {"PYTHONPATH": "/app"},
  "volumes": ["/abs/path/to/project:/app:ro"],
  "networks": ["prefect-net"],
  "image_pull_policy": "IfNotPresent",
  "working_dir": "/app",
}
Once the container can resolve and reach PREFECT_API_URL, your flow engine will start and you’ll see the first logger.info lines from docker_subflow. If you share: - Your PREFECT_API_URL value - Whether your Prefect API is in Compose or on the host - The network(s) your workers and API are attached to …I can tailor the exact network and env settings for your case.
d
@Marvin I am using the docker-compose method as reccomended, with shared network across the services. Added the job_variables as described, but now facing the following error:
Copy code
prefect-docker-worker_1   | 03:00:25.482 | INFO    | prefect.flow_runs.worker - Worker 'DockerWorker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3' submitting flow run 'd5ea019d-dd70-4d4c-9bc0-bbf4ddf2270e'
prefect-docker-worker_1   | 03:00:25.563 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'golden-bittern'...
prefect-docker-worker_1   | 03:00:25.564 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Creating Docker container 'golden-bittern'...
prefect-docker-worker_1   | 03:00:25.574 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'golden-bittern' has status 'created'
prefect-docker-worker_1   | 03:00:25.574 | INFO    | prefect.workers.docker.dockerworker 12e8a2ce-ec0f-46f6-a19b-aefcd13c32d3 - Docker container 'golden-bittern' created successfully.
prefect-docker-worker_1   | 03:00:25.721 | ERROR   | prefect.flow_runs.worker - Failed to submit flow run 'd5ea019d-dd70-4d4c-9bc0-bbf4ddf2270e' to infrastructure.
prefect-docker-worker_1   | Traceback (most recent call last):
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 275, in _raise_for_status
prefect-docker-worker_1   |     response.raise_for_status()
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/requests/models.py", line 1026, in raise_for_status
prefect-docker-worker_1   |     raise HTTPError(http_error_msg, response=self)
prefect-docker-worker_1   | requests.exceptions.HTTPError: 404 Client Error: Not Found for url: <http+docker://localhost/v1.49/containers/2050a4c76c34e7f58126a7e49a52aef35e5d19ffb5a1b4a59db3f86d87224834/start>
prefect-docker-worker_1   | 
prefect-docker-worker_1   | The above exception was the direct cause of the following exception:
prefect-docker-worker_1   | 
prefect-docker-worker_1   | Traceback (most recent call last):
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/prefect/workers/base.py", line 1262, in _submit_run_and_capture_errors
prefect-docker-worker_1   |     result = await self.run(
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 457, in run
prefect-docker-worker_1   |     container, created_event = await run_sync_in_worker_thread(
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 233, in run_sync_in_worker_thread
prefect-docker-worker_1   |     result = await anyio.to_thread.run_sync(
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 56, in run_sync
prefect-docker-worker_1   |     return await get_async_backend().run_sync_in_worker_thread(
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 2405, in run_sync_in_worker_thread
prefect-docker-worker_1   |     return await future
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 914, in run
prefect-docker-worker_1   |     result = context.run(func, *args)
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 243, in call_with_mark
prefect-docker-worker_1   |     return call()
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/prefect_docker/worker.py", line 753, in _create_and_start_container
prefect-docker-worker_1   |     container.start()
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/docker/models/containers.py", line 420, in start
prefect-docker-worker_1   |     return self.client.api.start(self.id, **kwargs)
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/docker/utils/decorators.py", line 19, in wrapped
prefect-docker-worker_1   |     return f(self, resource_id, *args, **kwargs)
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/docker/api/container.py", line 1136, in start
prefect-docker-worker_1   |     self._raise_for_status(res)
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/docker/api/client.py", line 277, in _raise_for_status
prefect-docker-worker_1   |     raise create_api_error_from_http_exception(e) from e
prefect-docker-worker_1   |   File "/usr/local/lib/python3.10/site-packages/docker/errors.py", line 39, in create_api_error_from_http_exception
prefect-docker-worker_1   |     raise cls(e, response=response, explanation=explanation) from e
prefect-docker-worker_1   | docker.errors.NotFound: 404 Client Error for <http+docker://localhost/v1.49/containers/2050a4c76c34e7f58126a7e49a52aef35e5d19ffb5a1b4a59db3f86d87224834/start>: Not Found ("failed to set up container networking: network my-network not found")
prefect-docker-worker_1   | 03:00:25.748 | INFO    | prefect.flow_runs.worker - Reported flow run 'd5ea019d-dd70-4d4c-9bc0-bbf4ddf2270e' as crashed: Flow run could not be submitted to infrastructure:
prefect-docker-worker_1   | NotFound(HTTPError('404 Client Error: Not Found for url: <http+docker://localhost/v1.49/containers/2050a4c76c34e7f58126a7e49a52aef35e5d19ffb5a1b4a59db3f86d87224834/start'))>
m
thought for 46.4 seconds
That error is coming from Docker: the worker tried to attach your job container to a network that does not exist on the Docker host: docker.errors.NotFound: ... failed to set up container networking: network my-network not found What this means - When you pass job_variables["networks"] = ["my-network"], the Docker worker expects an existing Docker network named exactly “my-network”. - With docker-compose, the actual network name is often prefixed with the compose project name (e.g., <project>_<network>). If you didn’t pin the name, “my-network” may not exist — instead it might be something like “myproj_my-network” or “myproj_default”. How to fix it 1) Find the real network name - On the Docker host, run: - docker network ls - Or inspect your worker container to see the networks it’s attached to: - docker inspect <docker-worker-container> | jq '.[0].NetworkSettings.Networks' - Use one of those exact network names in job_variables["networks"]. 2) Pin a stable network name in docker-compose - In your compose file, declare the network with a fixed name so it’s predictable:
Copy code
services:
  prefect-server:
    ...
    networks: ["prefect-net"]

  prefect-docker-worker:
    ...
    networks: ["prefect-net"]

  prefect-process-worker:
    ...
    networks: ["prefect-net"]

networks:
  prefect-net:
    name: prefect-net      # pins the actual Docker network name
    driver: bridge
- Then use:
Copy code
job_variables = {
  "image": "...",
  "env": {...},
  "volumes": [...],
  "networks": ["prefect-net"],   # must match docker network ls
  "image_pull_policy": "IfNotPresent",
  "working_dir": "/app",
}
Alternative (external network) - Create the network once and mark it external so Compose won’t rename it:
Copy code
# Create it once:
# docker network create prefect-net

networks:
  prefect-net:
    external: true
    name: prefect-net
- Then attach your services to “prefect-net” and use that exact name in job_variables. 3) Sanity checks - After changing compose config, re-create services: docker compose up -d --force-recreate - Verify the network exists: docker network ls should show “prefect-net”. - Verify the worker is on that network: docker inspect <docker-worker-container> → NetworkSettings.Networks keys include “prefect-net”. - Try again; the job container should start and attach to the network. Notes and tips - If you omit job_variables["networks"], Docker will attach the job to the default “bridge” network. That won’t be able to resolve your Prefect server container by its compose service name. For compose-to-compose connectivity, attaching to the shared network is the right move. - Make sure you’re using the correct key names in job variables; for pull policy use "image_pull_policy", not "pull_policy". - You can set the default networks at the work pool level to avoid specifying them on every
run_deployment
. In the UI: Work Pools → your Docker pool → Default job variables. Docs - Customize job variables If you share your docker-compose networks section and the exact value you’re passing in job_variables["networks"], I can suggest the exact string to use.
d
@Marvin network works now, but i am currently facing issues passing files over the shared mount from a previous module to the current module. the folder seems to not exist in the current module. could it be that the files have not been written yet as of the current module running? or why else would this not work?
m
thought for 72.9 seconds
Short answer: it’s rarely a “timing” issue if you’re awaiting the previous subflow’s completion. It’s almost always a path/volume mismatch, mount mode, or permissions problem. Checklist to make shared files work reliably between modules - Use the same absolute host path and the same container path every time: - Pick a single host directory for artifacts, e.g., /var/prefect/artifacts. - Mount it into every Docker job at the same container path, e.g., /var/prefect/artifacts. - Example default on your Docker work pool (or per-run job_variables):
Copy code
"volumes": ["/var/prefect/artifacts:/var/prefect/artifacts:rw"]
- Use absolute paths in your code/URIs: - Save/load artifacts under a fixed root, e.g., file:///var/prefect/artifacts/{run_id}/... - Do not rely on relative paths or working directory. - Keep the same path across in-process and Docker: - If an in-process module writes to /var/prefect/artifacts on the host, the Docker job must mount that same host path at /var/prefect/artifacts (not /app/data or some other path). - Avoid mixing named volumes and bind mounts unintentionally: - If you mount a named volume (e.g., data-vol:/data), that’s not the same as a bind mount to a host directory. If the previous module wrote to a host path, but the next module mounts a named volume, you’ll see an empty folder. - Use :rw for data volumes: - If you mounted your project code as read-only (:ro), that’s fine for code. - For data/artifacts, use :rw so modules can create folders/files. - Ensure the host directory exists before runs: - Docker will create missing host paths as an empty directory for bind mounts; if you expected pre-existing content and see an empty dir, you may be mounting the wrong host path. - Create the directory ahead of time with correct permissions. - Permissions: - If module A writes as root and module B runs as a non-root user, B may not see or access the files. - Consider aligning users across images or set permissive directory permissions on the host directory. - Consistent job_variables across runs: - Every Docker subflow invocation must include the same volume mapping for the data root; otherwise the next container won’t see the previous output. Is it a timing issue? - If you are awaiting the previous run (wait_for_flow_run) before starting the next module, the previous process has fully exited. Standard file writes should be complete and visible to the host immediately after close. Timing is rarely the cause unless: - Your code is writing asynchronously (threads/processes) and the flow returns before joins flush/close. - You’re using an eventually consistent remote FS (not typical for local bind mounts). - You wrote to a different path (e.g., /tmp) that isn’t a bind mount. Recommended “always works” pattern 1) Choose a single data root on the Docker host, e.g., /var/prefect/artifacts. 2) Add this to your Docker work pool default job variables so every Docker job gets it automatically:
Copy code
{
  "volumes": [
    "/var/prefect/artifacts:/var/prefect/artifacts:rw"
  ],
  "env": {
    "ARTIFACT_ROOT": "/var/prefect/artifacts"
  }
}
3) In your save/load helpers, always read/write under ARTIFACT_ROOT (fall back to /var/prefect/artifacts if not set). Return file:// URIs with absolute paths:
Copy code
file:///var/prefect/artifacts/<flow_run_id>/<module_name>/...
4) For in-process runs, write to the same absolute host path (/var/prefect/artifacts) so the next Docker container sees the files unchanged. Quick debugging steps - Confirm the run’s volumes in the UI: - Open the flow run > Details > Infrastructure and verify the volumes mapping is exactly what you expect for that run. - Inside the container, list the directory: - Temporarily add a debug task in the docker_subflow: ``` import os, subprocess from prefect import task, get_run_logger @task def debug_list(path: str): logger = get_run_logger()
logger.info(f"Checking path: {path}") logger.info(f"Exists? {os.path.exists(path)}") out = subprocess.run(["ls", "-la", path], capture_output=True, text=True) logger.info(out.stdout or out.stderr) ``` Then call
debug_list("/var/prefect/artifacts")
and
debug_list("/var/prefect/artifacts/<expected_subdir>")
. - Verify you’re not using different container paths in different runs: - If module A writes to /data (inside its container) but that’s not a bind mount to the host (or is mounted at a different host path), the next module won’t see it. - Check your job_variables source: - If you build the volumes list dynamically, log it before calling run_deployment to ensure it’s consistent between modules and points to the correct host path. If you share: - The exact volumes value you’re passing to job_variables for both the producer and consumer modules - An example artifact URI returned by module A - The code in your save_artifact/load_artifact helpers …I can pinpoint the mismatch (host path vs container path vs named volume) and suggest a precise fix.
d
@Marvin how to properly get the results from the docker container?
Copy code
subflow_run = await run_deployment(
                name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
                parameters={
                    "module_path": module_path,
                    "params": params,
                    "upstream_data": upstream_data,
                    "target_uri": target_uri
                },
                job_variables=job_variables,  # This configures the container
                as_subflow=True,
            )
            
            # Wait for container execution to complete
            <http://logger.info|logger.info>(f"Waiting for Docker subflow completion: {subflow_run.id}")
            finished_subflow = await wait_for_flow_run(
                subflow_run.id,
                log_states=True
            )
            
            # Extract return value (artifact URI) from subflow
            if not finished_subflow.state.is_completed():
                raise RuntimeError(f"Docker subflow failed with state: {finished_subflow.state}")
then what?
m
thought for 37.1 seconds
You’re 95% there. After you’ve waited for the subflow to finish, read its return value from the terminal state. Use: - prefect.states.get_state_result to pull the returned value - prefect.states.raise_state_exception to re-raise the original error if the subflow failed Example:
Copy code
from prefect.states import get_state_result, raise_state_exception

# Submit and wait as you already do...
subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri,
    },
    job_variables=job_variables,
    as_subflow=True,
)

<http://logger.info|logger.info>(f"Waiting for Docker subflow completion: {subflow_run.id}")
finished_subflow = await wait_for_flow_run(subflow_run.id, log_states=True)

# Handle terminal state
if not finished_subflow.state.is_completed():
    # Re-raise the subflow's underlying exception with its original traceback
    raise_state_exception(finished_subflow.state)

# Retrieve the return value (e.g., your artifact URI string)
try:
    artifact_uri = get_state_result(finished_subflow.state)
except Exception as e:
    # Typically MissingResult if the flow returned a non-serializable object without a result store
    raise RuntimeError("Failed to read subflow result") from e

# Optional: sanity check type
if not isinstance(artifact_uri, str):
    raise TypeError(f"Expected subflow to return artifact URI string, got: {type(artifact_uri)}")

return artifact_uri
Important notes - Make sure your docker_subflow actually returns the URI:
Copy code
@flow(name=DOCKER_FLOW_NAME)
  def docker_subflow(...)->str:
      # compute and persist your artifact
      return out_uri
- Returning a small string (URI) is ideal; it’s easy for Prefect to serialize and retrieve. If you ever return large or complex objects, configure a result store/serializer or just stick to “return a URI, write the data to shared storage.” - If you see MissingResult when calling get_state_result, it means the subflow did not (or could not) persist a result that can be retrieved. Return a basic type (string) or configure result storage. Docs - Create deployments: https://docs-3.prefect.io/v3/how-to-guides/deployments/create-deployments - Customize job variables: https://docs-3.prefect.io/v3/how-to-guides/deployments/customize-job-variables
d
@Marvin should get_state_result be awaited? i added await as i was getting a coroutine return instead of a string, then with await added it led to the following:
Copy code
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
m
thought for 95.6 seconds
Short answers - No, do not await get_state_result. It’s synchronous: import it like
from prefect.states import get_state_result
and call it directly. - The MissingResult error means your subflow’s return value wasn’t persisted anywhere retrievable. By default, flow results are not persisted in Prefect 3.x. You have two good options: Option 1 (simplest): Don’t fetch a return value at all - You’re already passing
target_uri
into the docker subflow. Have the subflow write to that URI and just use that known URI as the “result” in the parent. No need to call get_state_result. - Parent code can skip result retrieval entirely:
Copy code
subflow_run = await run_deployment(..., parameters={..., "target_uri": target_uri}, ...)
finished = await wait_for_flow_run(subflow_run.id, log_states=True)
from prefect.states import raise_state_exception
if not finished.state.is_completed():
    raise_state_exception(finished.state)

# Use the same target_uri you passed in
return target_uri
Option 2 (persist and fetch the returned value) - Enable result persistence for the docker subflow and give it a result store. Then
get_state_result(finished.state)
will work. Steps: 1) Pick a result store accessible to both the subflow container and the parent (process worker). For local dev, a shared host path bind-mounted into both containers works; for production, prefer a remote store (S3/GCS/Azure). 2) Configure the docker_subflow to persist results:
Copy code
from prefect import flow
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

# Shared path mounted into both the Docker worker's job containers and your process worker
RESULTS_ROOT = "/var/prefect/results"
results_fs = LocalFileSystem(basepath=RESULTS_ROOT)

@flow(
    name=DOCKER_FLOW_NAME,
    persist_result=True,
    result_storage=results_fs,
    result_serializer=JSONSerializer(),
)
def docker_subflow(module_path: str, params: dict, upstream_data: str | None, target_uri: str) -> str:
    # compute & persist output at target_uri
    return target_uri  # small string is ideal
3) Mount the same path into both containers: - In your Docker work pool defaults or per-run job_variables:
Copy code
"volumes": [
  "/host/results:/var/prefect/results:rw"
]
- In your process worker service (compose), also mount
/host/results:/var/prefect/results:rw
. 4) In the parent, retrieve the result:
Copy code
from prefect.states import get_state_result, raise_state_exception

finished = await wait_for_flow_run(subflow_run.id, log_states=True)
if not finished.state.is_completed():
    raise_state_exception(finished.state)

artifact_uri = get_state_result(finished.state)  # no await
Notes and gotchas - If you saw “coroutine” from get_state_result, double-check your import. It should be: -
from prefect.states import get_state_result
- And called without await. - MissingResult means the state had no embedded data and no result store reference. Setting
persist_result=True
and a result store fixes that. - If you use LocalFileSystem for results, it must be a shared bind mount visible to the parent and job containers. Otherwise, prefer a remote store (S3/GCS) so Prefect can fetch results from anywhere. - Keep the actual data in your artifact store and only return a small string (the URI). Large return values are not recommended. If you want, share: - Your current docker_subflow decorator - The job_variables volumes for both the docker job and the process worker I can validate the exact storage setup to ensure
get_state_result
can read the persisted return.
d
@Marvin when removing the await this is the error:
Copy code
raise TypeError(f"Expected subflow to return artifact URI string, got: {type(artifact_uri)}")
prefect-process-worker_1  | TypeError: Expected subflow to return artifact URI string, got: <class 'coroutine'>
m
thought for 65.1 seconds
You’re seeing a coroutine because the subflow is returning an awaitable, not a string. Two key points: - get_state_result is synchronous. Don’t await it. Import it with
from prefect.states import get_state_result
and call it directly. - If
get_state_result
returns a coroutine, it means your docker_subflow returned a coroutine object (usually because you called an async function without awaiting it). Fix: ensure docker_subflow returns a plain string Handle both sync and async dynamic modules so you never return an awaitable. Option A: Make docker_subflow async and await as needed
Copy code
import importlib
import inspect
from typing import Any, Dict, Optional
from prefect import flow, get_run_logger

def dynamic_import(module_path: str):
    # "pkg.module:callable"
    mod_path, func_name = module_path.split(":")
    module = importlib.import_module(mod_path)
    return getattr(module, func_name)

def load_artifact(uri: Optional[str]) -> Any:
    # your implementation
    ...

def save_artifact(obj: Any, target_uri: Optional[str] = None) -> str:
    # your implementation; return the final URI
    ...

@flow(name="module-docker-subflow")
async def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    target_uri: Optional[str] = None,
) -> str:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Docker subflow executing: {module_path}")

    func = dynamic_import(module_path)
    input_obj = load_artifact(upstream_data)

    # Call the dynamic module safely for both sync/async impls
    if inspect.iscoroutinefunction(func):
        result = await func(input_obj, **params)
    else:
        maybe = func(input_obj, **params)
        result = await maybe if inspect.isawaitable(maybe) else maybe

    out_uri = save_artifact(result, target_uri=target_uri)
    return out_uri
Option B: Keep docker_subflow sync and run async functions explicitly
Copy code
import asyncio
import inspect

@flow(name="module-docker-subflow")
def docker_subflow(...)->str:
    func = dynamic_import(module_path)
    input_obj = load_artifact(upstream_data)

    if inspect.iscoroutinefunction(func):
        result = asyncio.run(func(input_obj, **params))
    else:
        maybe = func(input_obj, **params)
        if inspect.isawaitable(maybe):
            result = asyncio.run(maybe)
        else:
            result = maybe

    out_uri = save_artifact(result, target_uri=target_uri)
    return out_uri
Then, in the parent: - Do not await get_state_result. - First, raise the original exception if the subflow failed; then read the return value.
Copy code
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result, raise_state_exception

subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri,
    },
    job_variables=job_variables,
    as_subflow=True,
)

finished = await wait_for_flow_run(subflow_run.id, log_states=True)

if not finished.state.is_completed():
    # surfaces the real error/traceback
    raise_state_exception(finished.state)

artifact_uri = get_state_result(finished.state)  # no await
if not isinstance(artifact_uri, str):
    raise TypeError(f"Expected subflow to return artifact URI string, got {type(artifact_uri)}")
About the MissingResult error you saw - That happens when Prefect tries to fetch a result from the API but there’s nothing persisted. If you stick to returning a small string, Prefect will typically embed it in the state and
get_state_result
will succeed. - If you ever need to return larger/complex objects, set
persist_result=True
and configure a
result_storage
on the docker_subflow so Prefect can fetch it later. Otherwise, the best practice is returning a small URI and writing the real data to your artifact store.
Alternative: avoid fetching the return entirely - Since you pass
target_uri
into the subflow, you can just use that value in the parent (have the subflow write to it) and skip
get_state_result
altogether. This avoids result persistence concerns entirely.
d
@Marvin
Copy code
@flow(name="module-docker-subflow")
async def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    target_uri: Optional[str] = None,
) -> str:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Docker subflow executing: {module_path}")
m
thought for 97.3 seconds
Great — here’s a complete docker_subflow you can drop in that: - Works with both sync and async module functions - Loads input from a URI, writes output to a target_uri on your shared mount - Returns a plain string (the artifact URI) - Optionally persists the return value so the parent can reliably read it with get_state_result Option A: return target_uri and skip result fetching in parent (simplest and recommended) - You’re already passing target_uri into the subflow. Have the subflow write there and return it. In the parent, just use target_uri without calling get_state_result — nothing else needed. Option B: parent reads the subflow’s return value with get_state_result - Add persist_result=True and a result store shared by the process worker and the Docker jobs (e.g., bind-mounted LocalFileSystem or a remote store like S3). Then the parent can call get_state_result(finished.state) to get the returned string. Below implements Option B (so both A and B will work). If you decide to follow Option A, you can remove the persist_result/result_storage bits.
Copy code
import importlib
import inspect
import os
from typing import Any, Dict, Optional

from prefect import flow, get_run_logger
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

# Shared results root path inside containers and process worker
# Ensure this path is a bind-mount on BOTH your docker worker jobs AND your process worker
# e.g., host: /host/prefect/results -> container: /var/prefect/results
RESULTS_ROOT = os.getenv("PREFECT_RESULTS_ROOT", "/var/prefect/results")
RESULTS_FS = LocalFileSystem(basepath=RESULTS_ROOT)

def dynamic_import(module_path: str):
    # "package.module:callable"
    mod_path, func_name = module_path.split(":")
    module = importlib.import_module(mod_path)
    return getattr(module, func_name)

def load_artifact(uri: Optional[str]) -> Any:
    if uri is None:
        return None
    if uri.startswith("file://"):
        path = uri[len("file://"):]
        with open(path, "rb") as f:
            return f.read()
    # Add other schemes (s3://, gs://) as needed
    raise NotImplementedError(f"Unsupported artifact URI: {uri}")

def save_artifact(obj: Any, target_uri: Optional[str]) -> str:
    if not target_uri:
        raise ValueError("target_uri is required for docker_subflow")
    if target_uri.startswith("file://"):
        path = target_uri[len("file://"):]
        os.makedirs(os.path.dirname(path), exist_ok=True)
        # Choose your serialization; here we assume bytes or str
        mode = "wb" if isinstance(obj, (bytes, bytearray)) else "w"
        with open(path, mode) as f:
            f.write(obj if isinstance(obj, (bytes, bytearray)) else str(obj))
        return target_uri
    # Add other schemes (s3://, gs://) as needed
    raise NotImplementedError(f"Unsupported artifact URI: {target_uri}")

@flow(
    name="module-docker-subflow",
    # Persist the return value so parent can call get_state_result(...)
    persist_result=True,
    result_storage=RESULTS_FS,
    result_serializer=JSONSerializer(),
)
async def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    target_uri: Optional[str] = None,
) -> str:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Docker subflow executing: {module_path}")

    func = dynamic_import(module_path)
    input_obj = load_artifact(upstream_data)

    # Call module function; support both sync and async variants
    if inspect.iscoroutinefunction(func):
        result = await func(input_obj, **params)
    else:
        maybe = func(input_obj, **params)
        result = await maybe if inspect.isawaitable(maybe) else maybe

    out_uri = save_artifact(result, target_uri=target_uri)
    <http://logger.info|logger.info>(f"Wrote artifact to: {out_uri}")

    # Return a small string; parent can read via get_state_result (with persist_result=True)
    return out_uri
Parent side (recap) - Do not await get_state_result.
- If you want to surface the original error/traceback from the subflow, call raise_state_exception first.
Copy code
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import get_state_result, raise_state_exception

subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri,
    },
    job_variables=job_variables,
    as_subflow=True,
)

finished = await wait_for_flow_run(subflow_run.id, log_states=True)
if not finished.state.is_completed():
    raise_state_exception(finished.state)

# Option A: just use target_uri you passed in (no result fetching needed)
# return target_uri

# Option B: read the subflow's returned string
artifact_uri = get_state_result(finished.state)  # no await
if not isinstance(artifact_uri, str):
    raise TypeError(f"Expected subflow to return artifact URI string, got {type(artifact_uri)}")
return artifact_uri
Mounts you’ll need - Make sure both your process worker and the docker job containers mount the same host path at /var/prefect/results (or set PREFECT_RESULTS_ROOT to your chosen path): - job_variables on the docker subflow runs: - "volumes": ["/host/prefect/results/var/prefect/resultsrw"] - docker-compose for your process worker service: - volumes: ["./results/var/prefect/resultsrw"] or the same host path mapping Common mistakes that cause “coroutine” or MissingResult - Returning an awaitable from the subflow (fix by awaiting the dynamic module call as shown) - Trying to get_state_result when the subflow didn’t persist its return (fix by using Option A or enabling persist_result with a result store) Docs - Create deployments: https://docs-3.prefect.io/v3/how-to-guides/deployments/create-deployments - Customize job variables: https://docs-3.prefect.io/v3/how-to-guides/deployments/customize-job-variables If you share your artifact loader/saver patterns (e.g., S3 vs file://), I can adapt the save/load helpers accordingly.
d
@Marvin i am doing the following instead, but the same error appears
Copy code
@flow(name="module-docker-subflow")
async def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    target_uri: Optional[str] = None,
) -> str:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Docker subflow executing: {module_path}")
sys.path.insert(0, "/app")
            # call python module.py --upstream_artifact_path <upstream_data> --target_uri <target_uri> --kwargs key=value ...
            command = [
                "python", "/app/module.py",
                "--module_path", module_path,
                "--upstream_artifact_path", upstream_data or "",
                "--target_uri", target_uri,
                "--kwargs"
            ]
            for k, v in params.items():
                command.append(f"{k}={v}")
            command = [str(c) for c in command]  # Ensure all parts are strings
            <http://logger.info|logger.info>(f"Running command: {' '.join(command)}")
            result = subprocess.run(command, capture_output=True, text=True, check=False)
m
thought for 66.1 seconds
You’re still seeing that error because your subflow isn’t returning a concrete string result, and/or you’re trying to fetch a result that wasn’t persisted. Since you’ve switched to running your module via a subprocess inside the Docker job, the most robust pattern is: - Have the subflow write to the
target_uri
you pass in - Return that same
target_uri
as a plain string - In the parent, don’t call `get_state_result`; just use the known
target_uri
- If you do want to fetch the returned value with
get_state_result
, you must enable result persistence on the subflow and configure a result store shared between the parent and the Docker job Concrete fixes for your current subprocess approach 1) Make your docker_subflow return a string and surface subprocess output - Since your flow is async, avoid blocking the event loop. Run
subprocess.run
in a worker thread and log stdout/stderr. - Return
target_uri
explicitly.
Copy code
import os
import sys
import shlex
import subprocess
from typing import Any, Dict, Optional

from prefect import flow, get_run_logger
from prefect.utilities.asyncutils import run_sync_in_worker_thread

@flow(name="module-docker-subflow")
async def docker_subflow(
    module_path: str,
    params: Dict[str, Any],
    upstream_data: Optional[str],
    target_uri: Optional[str] = None,
) -> str:
    logger = get_run_logger()
    <http://logger.info|logger.info>(f"Docker subflow executing: {module_path}")

    if not target_uri:
        raise ValueError("target_uri is required")

    # Ensure your code path is importable if your module.py imports project code
    sys.path.insert(0, "/app")

    # Build the command
    cmd = [
        "python", "/app/module.py",
        "--module_path", module_path,
        "--upstream_artifact_path", upstream_data or "",
        "--target_uri", target_uri,
        "--kwargs",
    ]
    for k, v in params.items():
        cmd.append(f"{k}={v}")

    <http://logger.info|logger.info>(f"Running command: {shlex.join(map(str, cmd))}")

    # Run synchronously in a worker thread so we can await it
    def _run():
        return subprocess.run(cmd, capture_output=True, text=True, cwd="/app", env=os.environ.copy())

    cp = await run_sync_in_worker_thread(_run)

    # Stream logs
    if cp.stdout:
        for line in cp.stdout.splitlines():
            <http://logger.info|logger.info>(line)
    if cp.stderr:
        for line in cp.stderr.splitlines():
            logger.error(line)

    if cp.returncode != 0:
        raise RuntimeError(f"module.py exited with {cp.returncode}")

    # Option A (recommended): return the known target_uri that module.py wrote to
    return target_uri
2) Parent: do not fetch a result; use the same target_uri you passed - This avoids MissingResult entirely.
Copy code
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import raise_state_exception

# submit
subflow_run = await run_deployment(
    name=f"{DOCKER_FLOW_NAME}/{DOCKER_DEPLOYMENT_NAME}",
    parameters={
        "module_path": module_path,
        "params": params,
        "upstream_data": upstream_data,
        "target_uri": target_uri,
    },
    job_variables=job_variables,
    as_subflow=True,
)

# wait
finished = await wait_for_flow_run(subflow_run.id, log_states=True)
if not finished.state.is_completed():
    raise_state_exception(finished.state)

# Just use the known target_uri
return target_uri
If you must read the subflow’s return via get_state_result - Keep the subflow’s
return target_uri
as above, but also enable result persistence and a result store for the subflow so the parent can read it later: - Add to docker_subflow decorator: -
persist_result=True
-
result_storage=LocalFileSystem(basepath="/var/prefect/results")
(bind-mount the same host path into BOTH the Docker job containers and your process worker), or use S3/GCS/Azure -
result_serializer=JSONSerializer()
- Then the parent can do: -
from prefect.states import get_state_result
-
artifact_uri = get_state_result(finished.state)
(no await) - Without persistence, calling
get_state_result
after fetching the state from the API will raise
MissingResult
. Why you saw “coroutine” earlier - That happens when the flow returned an awaitable (e.g., you called an async function and didn’t await it) and you then tried to treat it as a string. With the subprocess approach above, you return a plain string. Other gotchas to double-check - Ensure
/app/module.py
exists inside the container; mount your project into the container consistently and set
working_dir="/app"
in job_variables if you rely on relative paths. - Use
volumes
consistently for data/artifacts so the subprocess can write to the same host path the next module will read. - Keep
PREFECT_LOGGING_LEVEL=DEBUG
temporarily to see more in logs. Docs - Create deployments - Customize job variables If you want, share your
job_variables
for the subflow run and how
module.py
writes to
target_uri
— I can validate the mount/paths and tailor the snippet exactly.
d
@Marvin seems like this pattern generates a subflow run and a task run at the same time for this docker module. is this expected?
m
thought for 44.8 seconds
Yes — that’s expected with your current pattern. - You’ve wrapped the Docker call in a Prefect task (
run_module
). That creates a task run in the parent flow. - That task then triggers a separate flow run via
run_deployment(..., as_subflow=True)
. That creates a child flow run (the Docker subflow) on your Docker worker. So in the UI you’ll see: - Parent flow run - Task run: run_module (awaiting/monitoring) - Child flow run: module-docker-subflow (running in the container) If you prefer not to see an extra task node, you can call
run_deployment
directly from the flow body instead of inside a task:
Copy code
from prefect import flow
from prefect.deployments import run_deployment
from prefect.flow_runs import wait_for_flow_run
from prefect.states import raise_state_exception, get_state_result

@flow
async def dynamic_pipeline_flow(...):
    fr = await run_deployment(
        name="module-docker-subflow-deployment-name",
        parameters={...},
        job_variables={...},
        as_subflow=True,
    )
    finished = await wait_for_flow_run(fr.id, log_states=True)
    if not finished.state.is_completed():
        raise_state_exception(finished.state)
    # artifact_uri = get_state_result(finished.state)
    ...
Trade-offs: - Inside a task: you get task-level features (retry, timeout, caching, concurrency limits) and a clear node in the task graph. - In the flow body: a cleaner graph (no extra task node) but you lose task-level controls for that step. Both patterns are valid; choose based on how much task-level control/observability you want. Docs - Deployments: https://docs-3.prefect.io/v3/how-to-guides/deployments/create-deployments - Job variables: https://docs-3.prefect.io/v3/how-to-guides/deployments/customize-job-variables
d
@Marvin how to enable gpu support for in-process and for docker flows?
m
thought for 81.8 seconds
Short answer - In-process (process worker): there is no Prefect-specific GPU toggle. Your code will use GPUs if the host/container where it runs has drivers + CUDA libs installed and your framework (PyTorch/TF/etc.) detects them. Control which GPU(s) via environment variables like CUDA_VISIBLE_DEVICES. - Docker subflow (Docker worker): ensure the Docker host has NVIDIA Container Toolkit; then either: 1) Configure Docker’s default runtime to nvidia so all job containers see GPUs, or 2) Pass GPU requests in job_variables (if supported by your prefect-docker version), and use a CUDA-enabled base image. Details and examples A) In-process (Process worker) Prereqs: - The machine running your process worker must have NVIDIA drivers + CUDA libs (or a framework image with CUDA bundled) installed. - If the process worker is itself running in Docker (via docker-compose), start it with GPU access: - Use a CUDA-enabled image for the worker container (e.g., nvidia/cuda:12.2.0-runtime-ubuntu22.04 plus pip install prefect) OR - Use compose’s gpus support or default nvidia runtime on the host (see below). Control GPU usage per run: - Set environment variables in your deployment or work pool default job variables for the process worker:
Copy code
# Process work pool default job variables (or per deployment call)
{
  "env": {
    "CUDA_VISIBLE_DEVICES": "0",   # limit to GPU 0 for this run
    "NVIDIA_VISIBLE_DEVICES": "0"  # older runtimes sometimes pay attention to this too
  }
}
- In your model code:
Copy code
import torch
device = "cuda" if torch.cuda.is_available() else "cpu"
# or explicitly pick GPU 0 if you’ve set CUDA_VISIBLE_DEVICES appropriately
If your “in-process” mode literally runs inside your FastAPI process (not via a worker), that FastAPI container/host must be started with GPU access and CUDA libs too. B) Docker subflow (Docker worker) Prereqs on the Docker host: - Install NVIDIA Container Toolkit so Docker can run GPU workloads: https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html - Use a CUDA-enabled image for your module (e.g., nvidia/cuda:* or framework-specific GPU image). Prefect base images do NOT include CUDA. Two reliable ways to expose GPUs to job containers: 1) Default runtime method (most robust, no job variables needed) - Set Docker’s default runtime to nvidia on the host:
Copy code
# /etc/docker/daemon.json
{
  "runtimes": {
    "nvidia": { "path": "nvidia-container-runtime", "runtimeArgs": [] }
  },
  "default-runtime": "nvidia"
}
- Restart Docker. Now every container (including Prefect job containers) can see GPUs automatically. - You still control which GPUs per run with env:
Copy code
job_variables = {
  "image": "your-gpu-image:tag",
  "env": { "CUDA_VISIBLE_DEVICES": "0" },
  "volumes": [...],
  "networks": ["prefect-net"],
  "image_pull_policy": "IfNotPresent"
}
2) Per-run request method (job_variables) - Depending on your prefect-docker version, the worker may accept GPU-related job variables and pass them to Docker, e.g.:
Copy code
job_variables = {
  "image": "your-gpu-image:tag",
  "env": { "CUDA_VISIBLE_DEVICES": "0" },
  "volumes": [...],
  "networks": ["prefect-net"],
  "image_pull_policy": "IfNotPresent",
  "gpus": "all"  # equivalent to `docker run --gpus all` if supported
}
Notes: - Support for keys like gpus/device_requests can vary by version. If you see errors, prefer the “default runtime” method. - You can always set default job variables with your Docker work pool in the UI so you don’t pass gpus/env each run: Work Pools → your pool → Default job variables. - If the worker logs show it’s ignoring unknown keys, your version may not support per-run GPU keys; use the default runtime method instead. C) Verify GPU visibility (both modes) Add a quick check in your docker_subflow (and in-process flow) to confirm GPU presence: ``` import subprocess, shutil from prefect import get_run_logger logger = get_run_logger() if shutil.which("nvidia-smi"):
out = subprocess.run(["nvidia-smi"], capture_output=True, text=True) logger.info(out.stdout or out.stderr) else: logger.warning("nvidia-smi not found in PATH") try: import torch logger.info(f"torch.cuda.is_available(): {torch.cuda.is_available()}") logger.info(f"torch.cuda.device_count(): {torch.cuda.device_count()}") except Exception as e: logger.warning(f"PyTorch check skipped: {e}") ``` D) Image selection - Use images that include GPU libraries: - PyTorch: pytorch/pytorch:2.4.0-cuda12.1-cudnn8-runtime - TensorFlow: tensorflow/tensorflow:2.16.1-gpu - Base CUDA: nvidia/cuda:12.4.1-runtime-ubuntu22.04 + pip install your deps + prefect - Keep your module images small but GPU-capable. E) Scheduling/concurrency (optional) - If multiple flows share a single GPU, add your own scheduling or use Prefect concurrency limits in Cloud to avoid oversubscription. - Alternatively, partition by CUDA_VISIBLE_DEVICES and route different runs to different GPU ids. Useful references - Customize job variables: https://docs-3.prefect.io/v3/how-to-guides/deployments/customize-job-variables - NVIDIA Container Toolkit: https://docs.nvidia.com/datacenter/cloud-native/container-toolkit/latest/install-guide.html - Docker “--gpus” docs: https://docs.docker.com/config/containers/resource_constraints/#gpu If you share: - Whether your process worker also needs GPU - Your docker-compose for the workers - The image you intend to use …I can provide exact compose snippets and job_variables to match your environment.