# ask-marvin
o
hi @Marvin how do i handle objects between tasks? i have for example a pytorch model that is instantiated in one task and a forward pass is invoked in another task. how do i pass the model object between the tasks? what are the restrictions on object passing between tasks?
m
thought for 163.3 seconds
Great question. Short answer: inside a single flow run, you can pass any Python object (including a PyTorch model) between tasks as long as everything stays in the same process. As soon as you cross a process boundary or need to persist results, the object must be serializable (pickle-able), which is often painful for large GPU-backed models.

Key concepts to keep in mind (Prefect 3.x):
- Task outputs are propagated via Prefect futures. Within a single process, Prefect keeps task results in memory by default (`cache_result_in_memory=True`), so downstream tasks can use the original Python object without serialization.
- If you use a process-based runner or need to persist/restore results, objects must be serialized with the configured serializer (pickle-based by default). Large or GPU-resident models can be slow to serialize or fail entirely.
- Prefect executes all tasks for a flow run in the same Python process by default unless you opt into a different task runner (e.g., ProcessPool). Subflows are separate flow runs and cannot share in-memory objects.

What this means for your PyTorch model
- Works well (no serialization):
  - Same flow run, same process, in-memory passing (default or ThreadPoolTaskRunner).
  - Keep `persist_result=False` for the task that returns the model.
- Risky/expensive (requires serialization):
  - ProcessPoolTaskRunner (child processes) or any cross-process boundary.
  - Persisting task results (`persist_result=True`) or resuming from persisted results.
  - Passing models into subflows (separate flow runs).

Recommended patterns

1) Easiest: keep it all in one task
- Load the model and run inference in the same task to avoid passing the model between tasks.
```
from prefect import flow, task

@task
def predict_batch(batch):
    import torch
    model = ...  # build/load once here
    model.eval()
    with torch.no_grad():
        return model(batch).cpu().numpy()

@flow
def main_flow(batches):
    return [predict_batch.submit(b) for b in batches]
```
2) If you want separate “load” and “predict” tasks, stay in one process
- Use thread-based execution and avoid persisting the model result.
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task(persist_result=False)
def load_model():
    import torch
    model = ...
    model.eval()
    return model  # stays in memory only

@task
def predict(model, batch):
    import torch
    with torch.no_grad():
        return model(batch).cpu().numpy()

@flow(task_runner=ThreadPoolTaskRunner())
def main_flow(batch):
    model = load_model()
    preds = predict(model, batch)
    return preds
```
Notes:
- ThreadPoolTaskRunner keeps tasks in the same process, so the model isn’t serialized.
- Avoid ProcessPoolTaskRunner for this pattern.

3) Singleton/lazy-init inside tasks (no passing at all)
- Initialize the model once per process and reuse it.
```
from functools import lru_cache
from prefect import flow, task

@lru_cache(maxsize=1)
def get_model():
    import torch
    m = ...
    m.eval()
    return m

@task
def predict(batch):
    import torch
    model = get_model()
    with torch.no_grad():
        return model(batch).cpu().numpy()

@flow
def main_flow(batches):
    return [predict.submit(b) for b in batches]
```
4) If you must cross processes/machines or want durability
- Save the model externally and pass a lightweight reference (path/URI). Load it where you need it (e.g., `torch.load` with `map_location='cpu'` or as appropriate). See the short sketch at the end of this message.
- This avoids passing giant Python objects around.

Restrictions and gotchas
- Process boundaries: Returning a model from a task running in a separate process requires pickling; this can fail for GPU tensors or be very slow/large.
- Persistence: If you set `persist_result=True` (or end up persisting for retries/resume), Prefect will serialize your model. Consider leaving it `False` for non-serializable objects.
- Subflows: Do not pass in-memory models to subflows; send references (paths) instead.
- Concurrency: If multiple threads use the same model instance, ensure your inference is thread-safe for your device configuration. Often fine for read-only inference, but evaluate for your workload.

Docs for deeper reading:
- Futures and result handling: Prefect Futures API
- Serializers: Prefect Serializers API

A couple of clarifying questions to tailor the advice:
- Are you running everything on a single machine/worker, and do you plan to use the default runner or a ProcessPool?
- Do you need to persist results for retries/resume, or is in-memory-only fine?
- Is your model on CPU or GPU during inference?
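Here is a minimal sketch of pattern 4, assuming a tiny stand-in `nn.Linear` model and a local `/tmp` path (both placeholders for your real model and storage); only the path string crosses the task boundary:
```
from pathlib import Path

from prefect import flow, task

@task
def save_model_to_disk() -> str:
    import torch
    import torch.nn as nn
    model = nn.Linear(4, 1)        # stand-in for your real model
    path = Path("/tmp/model.pt")   # placeholder location
    torch.save(model, path)        # or save a TorchScript / state_dict artifact instead
    return str(path)               # only the lightweight path is passed downstream

@task
def predict_from_path(model_path: str, batch: list[float]) -> list[float]:
    import torch
    # note: recent PyTorch versions may require weights_only=False for full-module pickles
    model = torch.load(model_path, map_location="cpu")
    model.eval()
    x = torch.as_tensor(batch, dtype=torch.float32).view(1, -1)
    with torch.no_grad():
        return model(x).squeeze(0).tolist()

@flow
def reference_passing_flow(batch: list[float]) -> list[float]:
    return predict_from_path(save_model_to_disk(), batch)
```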
o
@Marvin can you show an example which includes two machines/processes? where the model is loaded in one task and evaluated in another task, and in the same example loaded on one machine and evaluated on another machine?
m
thought for 243.9 seconds
Absolutely: the safest way to cross processes or machines is to materialize the model to shared storage and pass a lightweight reference (path/URI) between tasks. Below are two complete examples.

Example A: Two processes on the same machine (ProcessPoolTaskRunner)
- The “load” task builds/loads the model and saves it to a local file.
- The “predict” task receives the filepath and loads the model in a separate process.
- This avoids trying to pickle the model object between processes.
```
from pathlib import Path

from prefect import flow, task
from prefect.context import get_run_context
from prefect.task_runners import ProcessPoolTaskRunner

# Simple torch model for demo
def build_model():
    import torch
    import torch.nn as nn
    model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
    model.eval()
    return model

@task
def load_model_to_disk() -> str:
    import torch
    ctx = get_run_context()
    run_id = ctx.flow_run.id

    # Use a run-scoped path so concurrent runs don't collide
    out_dir = Path("/tmp") / f"prefect-model-{run_id}"
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / "model.pt"

    model = build_model()
    # Option 1 (simple, requires this code to be importable where you load):
    torch.save(model, path)  # pickles the full model object

    # Option 2 (portable): torch.jit.trace/script and save a TorchScript file
    # example:
    # example_input = torch.randn(1, 4)
    # ts_model = torch.jit.trace(model, example_input)
    # ts_path = out_dir / "model.ts"
    # ts_model.save(str(ts_path))
    # return str(ts_path)

    return str(path)

@task
def predict_from_disk(model_path: str, batch: list[float]) -> float:
    import torch
    import numpy as np

    # If you saved with torch.save(model, ...):
    model = torch.load(model_path, map_location="cpu")

    # If you saved a TorchScript file instead:
    # model = torch.jit.load(model_path, map_location="cpu")

    x = torch.tensor(np.array(batch, dtype=np.float32)).view(1, -1)
    with torch.no_grad():
        y = model(x).item()
    return y

@flow(task_runner=ProcessPoolTaskRunner())
def multiprocess_flow(batch: list[float]) -> float:
    model_path = load_model_to_disk()
    return predict_from_disk(model_path, batch)

if __name__ == "__main__":
    print(multiprocess_flow([1.0, 2.0, 3.0, 4.0]))
```
Notes:
- Using ProcessPoolTaskRunner runs tasks in separate processes on the same machine. Passing the raw model object would require pickling and will be slow/fragile for large models; passing a file path is robust.
- TorchScript (commented option) makes the artifact more portable and avoids Python pickling entirely.

Example B: Two different machines (DaskTaskRunner + shared storage like S3)
- The “load” task runs on one worker/machine, writes a model artifact to S3 (or any shared storage), and returns an `s3://...` URI.
- The “predict” task runs on another worker/machine, downloads and evaluates it.
- You’ll need a Dask cluster with multiple workers on different machines and `prefect[dask]` + `boto3` installed.
```
# pip/uv deps you’ll need:
# uv run --with 'prefect[dask]' --with boto3 python your_script.py

import os
from pathlib import Path
import uuid

from prefect import flow, task
from prefect_dask.task_runners import DaskTaskRunner

# Configure your S3 destination via env or hardcode for demo
S3_BUCKET = os.getenv("S3_BUCKET", "my-bucket")
S3_PREFIX = os.getenv("S3_PREFIX", "artifacts/models")

def build_model():
    import torch
    import torch.nn as nn
    model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
    model.eval()
    return model

@task
def load_model_to_s3() -> str:
    import torch
    import boto3

    # Create a unique key per run
    key = f"{S3_PREFIX}/model-{uuid.uuid4().hex}.pt"

    # Save locally first
    local_dir = Path("/tmp/prefect-model")
    local_dir.mkdir(parents=True, exist_ok=True)
    local_path = local_dir / Path(key).name

    model = build_model()
    torch.save(model, local_path)  # or use TorchScript as in Example A

    # Upload to S3
    s3 = boto3.client("s3")
    s3.upload_file(str(local_path), S3_BUCKET, key)
    return f"s3://{S3_BUCKET}/{key}"

@task
def predict_from_s3(model_uri: str, batch: list[float]) -> float:
    import boto3
    import torch
    import numpy as np
    from urllib.parse import urlparse

    # Parse s3://bucket/key
    u = urlparse(model_uri)
    bucket = u.netloc
    key = u.path.lstrip("/")

    # Download to local temp
    local_dir = Path("/tmp/prefect-model")
    local_dir.mkdir(parents=True, exist_ok=True)
    local_path = local_dir / Path(key).name
    s3 = boto3.client("s3")
    s3.download_file(bucket, key, str(local_path))

    # Load and predict
    model = torch.load(local_path, map_location="cpu")
    x = torch.tensor(np.array(batch, dtype=np.float32)).view(1, -1)
    with torch.no_grad():
        y = model(x).item()
    return y

@flow(
    # Point to your existing Dask scheduler (multi-machine cluster)
    task_runner=DaskTaskRunner(address="tcp://YOUR-DASK-SCHEDULER:8786")
)
def multi_machine_flow(batch: list[float]) -> float:
    model_uri = load_model_to_s3()
    return predict_from_s3(model_uri, batch)

if __name__ == "__main__":
    print(multi_machine_flow([1.0, 2.0, 3.0, 4.0]))
```
Notes:
- The Dask scheduler will place tasks on different workers; they may run on different machines. Exact placement is up to the scheduler.
- Using a shared store (S3 here) is the reliable way to move heavy artifacts between machines. You can use any shared storage (NFS, GCS, Azure Blob, etc.).
- Prefer TorchScript artifacts when you want portability between environments or Python versions (see the short sketch below).

Why not pass the model object directly?
- Cross-process/machine = serialization. For large GPU-backed models, pickle-based transfer is slow, error-prone, or fails. Persisting to a file/URI and passing a reference is both faster and more reliable.

Related links
- Dask integration: Prefect Dask integration
- Futures, results, and serialization background: Futures API, Serializers API

If you share a bit about your environment (CPU vs GPU, available shared storage, and whether you already use Dask/Ray), I can tailor the examples for your stack (e.g., TorchScript vs state_dict, S3 vs GCS, GPU map_location usage, etc.).
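On the TorchScript option mentioned in the notes, here is a minimal standalone sketch (illustrative only; the toy model and `/tmp` path are placeholders): trace once where the model is built, then `torch.jit.load` the saved file anywhere, without the original model class needing to be importable.
```
import torch
import torch.nn as nn

# Toy model standing in for your real one
model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1)).eval()
example_input = torch.randn(1, 4)

ts_model = torch.jit.trace(model, example_input)  # compile to TorchScript
ts_model.save("/tmp/model.ts")                    # portable artifact (no Python pickle)

loaded = torch.jit.load("/tmp/model.ts", map_location="cpu")
with torch.no_grad():
    print(loaded(example_input))
```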
o
@Marvin yeah but now you have only shown one flow, my question was to create one flow that is using one machine/process with in memory handling of the model and one flow across two machines in the same python file
m
thought for 60.0 seconds
Got it: here’s a single Python file that defines two flows side-by-side:
- local_in_memory_flow: one machine/process, passing the model in-memory between tasks
- distributed_cross_machine_flow: two machines via Dask, passing a reference (S3 URI) between tasks

Notes before you run:
- For the distributed flow, you’ll need a running Dask cluster and to set `address="tcp://<scheduler-host>:8786"`.
- Install extras: `prefect[dask]` and `boto3`.
- Set `S3_BUCKET` and (optionally) `S3_PREFIX` env vars for the S3 example.
```
# deps (examples):
# uv run --with 'prefect[dask]' --with boto3 python this_file.py

import os
import uuid
from pathlib import Path
from typing import List

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner  # ProcessPoolTaskRunner not used below, but handy

# -----------------------
# Shared utility: model
# -----------------------
def build_model():
    import torch
    import torch.nn as nn
    model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
    model.eval()
    return model

# -----------------------
# 1) Local: in-memory passing within a single process
# -----------------------
@task(persist_result=False)
def load_model_in_memory():
    # Returns the live Python object (no persistence / no pickling)
    return build_model()

@task
def predict_in_memory(model, batch: List[float]) -> float:
    import torch
    import numpy as np
    x = torch.tensor(np.array(batch, dtype=np.float32)).view(1, -1)
    with torch.no_grad():
        y = model(x).item()
    return y

@flow(name="local_in_memory_flow", task_runner=ThreadPoolTaskRunner())
def local_in_memory_flow(batch: List[float]) -> float:
    # ThreadPoolTaskRunner keeps tasks in the same process
    model = load_model_in_memory()
    return predict_in_memory(model, batch)

# -----------------------
# 2) Distributed: two machines via Dask + shared storage (S3)
# -----------------------
S3_BUCKET = os.getenv("S3_BUCKET", "my-bucket")
S3_PREFIX = os.getenv("S3_PREFIX", "artifacts/models")

@task
def load_model_to_s3() -> str:
    """Build the model on one worker/machine, save it to S3, and return the s3:// URI."""
    import torch
    import boto3

    key = f"{S3_PREFIX}/model-{uuid.uuid4().hex}.pt"
    tmp_dir = Path("/tmp/prefect-model")
    tmp_dir.mkdir(parents=True, exist_ok=True)
    local_path = tmp_dir / Path(key).name

    model = build_model()
    # Optionally use TorchScript for portability:
    # example_input = torch.randn(1, 4)
    # ts_model = torch.jit.trace(model, example_input)
    # ts_model.save(str(local_path))
    # Otherwise, pickle the full object:
    torch.save(model, local_path)

    s3 = boto3.client("s3")
    s3.upload_file(str(local_path), S3_BUCKET, key)
    return f"s3://{S3_BUCKET}/{key}"

@task
def predict_from_s3(model_uri: str, batch: List[float]) -> float:
    """Download the model on another worker/machine and run inference."""
    import boto3
    import torch
    import numpy as np
    from urllib.parse import urlparse

    u = urlparse(model_uri)
    bucket = u.netloc
    key = u.path.lstrip("/")

    tmp_dir = Path("/tmp/prefect-model")
    tmp_dir.mkdir(parents=True, exist_ok=True)
    local_path = tmp_dir / Path(key).name

    s3 = boto3.client("s3")
    s3.download_file(bucket, key, str(local_path))

    # If saved with torch.save(model, ...):
    model = torch.load(local_path, map_location="cpu")
    # If you saved TorchScript instead:
    # model = torch.jit.load(local_path, map_location="cpu")

    x = torch.tensor(np.array(batch, dtype=np.float32)).view(1, -1)
    with torch.no_grad():
        y = model(x).item()
    return y

# Define the distributed flow with DaskTaskRunner
# IMPORTANT: requires `prefect[dask]` installed and a reachable Dask scheduler
from prefect_dask.task_runners import DaskTaskRunner

@flow(
    name="distributed_cross_machine_flow",
    task_runner=DaskTaskRunner(address=os.getenv("DASK_SCHEDULER_ADDRESS", "tcp://YOUR-DASK-SCHEDULER:8786")),
)
def distributed_cross_machine_flow(batch: List[float]) -> float:
    model_uri = load_model_to_s3()
    return predict_from_s3(model_uri, batch)

# -----------------------
# Demo entrypoint
# -----------------------
if __name__ == "__main__":
    test_batch = [1.0, 2.0, 3.0, 4.0]

    print("Running local_in_memory_flow ...")
    print(local_in_memory_flow(test_batch))

    # Only run the distributed example if configured
    dask_addr = os.getenv("DASK_SCHEDULER_ADDRESS")
    if dask_addr and os.getenv("S3_BUCKET"):
        print(f"Running distributed_cross_machine_flow against {dask_addr} ...")
        print(distributed_cross_machine_flow(test_batch))
    else:
        print("Skipping distributed_cross_machine_flow. Set DASK_SCHEDULER_ADDRESS and S3_BUCKET to run it.")
```
Why this structure:
- The local flow keeps everything in one process; the model never leaves memory, so there’s no pickling/persistence overhead.
- The distributed flow uses S3 as a rendezvous point so different machines don’t have to exchange Python objects, just a URI. This is reliable and scalable.

Useful links:
- Dask integration: Prefect Dask integration
- Futures/serialization background: Futures API, Serializers API

If you want, I can adapt this to:
- TorchScript-only artifacts (more portable across Python versions)
- GCS/Azure/NFS instead of S3
- GPU inference (map_location and device handling)
o
@Marvin ok, but the model must be reusable in the distributed version. adjust it so that, after loading it from s3, you again have the regular in-memory utilization in a third task for the distributed flow
m
thought for 56.5 seconds
Perfect: below is one Python file with both flows:
- local_in_memory_flow: single machine/process, pass the model in memory between tasks
- distributed_cross_machine_flow: two machines via Dask; loads the model once from S3 into a Dask Actor (kept in memory on one worker), then a third task uses that in-memory actor repeatedly for predictions without reloading

Notes:
- For the distributed flow you need a Dask scheduler and workers; set the DASK_SCHEDULER_ADDRESS env var.
- Install extras: prefect[dask], boto3.
- Set S3_BUCKET (and optionally S3_PREFIX) for the shared model artifact.
- The Dask Actor ensures the model is loaded once and reused in memory across many predictions.
```
# deps (examples):
# uv run --with 'prefect[dask]' --with boto3 python this_file.py

import os
import uuid
from pathlib import Path
from typing import List

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

# =========================
# Local (single machine) flow
# =========================
def build_model():
    import torch
    import torch.nn as nn
    model = nn.Sequential(nn.Linear(4, 8), nn.ReLU(), nn.Linear(8, 1))
    model.eval()
    return model

@task(persist_result=False)
def load_model_in_memory():
    # Returns a live Python object; stays in-process
    return build_model()

@task
def predict_in_memory(model, batch: List[float]) -> float:
    import torch
    import numpy as np
    x = torch.tensor(np.array(batch, dtype=np.float32)).view(1, -1)
    with torch.no_grad():
        y = model(x).item()
    return y

@flow(name="local_in_memory_flow", task_runner=ThreadPoolTaskRunner())
def local_in_memory_flow(batch: List[float]) -> float:
    model = load_model_in_memory()
    return predict_in_memory(model, batch)

# =========================
# Distributed (multi-machine) flow with Dask
# =========================
S3_BUCKET = os.getenv("S3_BUCKET", "my-bucket")
S3_PREFIX = os.getenv("S3_PREFIX", "artifacts/models")

@task
def load_model_to_s3() -> str:
    """Build the model on one worker/machine, save it to S3, and return the s3:// URI."""
    import torch
    import boto3

    key = f"{S3_PREFIX}/model-{uuid.uuid4().hex}.pt"
    tmp_dir = Path("/tmp/prefect-model")
    tmp_dir.mkdir(parents=True, exist_ok=True)
    local_path = tmp_dir / Path(key).name

    model = build_model()
    # For portability you can swap to TorchScript instead of pickling:
    # example_input = torch.randn(1, 4)
    # ts_model = torch.jit.trace(model, example_input)
    # ts_model.save(str(local_path))
    torch.save(model, local_path)

    s3 = boto3.client("s3")
    s3.upload_file(str(local_path), S3_BUCKET, key)
    return f"s3://{S3_BUCKET}/{key}"

# A Dask Actor that keeps the model in memory on a single worker
class ModelActor:
    def __init__(self, model_uri: str):
        import boto3
        import torch
        from urllib.parse import urlparse

        # Download the model once to the actor's local filesystem
        u = urlparse(model_uri)
        bucket = u.netloc
        key = u.path.lstrip("/")

        tmp_dir = Path("/tmp/prefect-model-actor")
        tmp_dir.mkdir(parents=True, exist_ok=True)
        local_path = tmp_dir / Path(key).name

        s3 = boto3.client("s3")
        s3.download_file(bucket, key, str(local_path))

        # If you saved with torch.save(model, ...):
        self.model = torch.load(local_path, map_location="cpu")
        self.model.eval()
        # If you saved TorchScript:
        # self.model = torch.jit.load(local_path, map_location="cpu")

    def predict(self, batch: List[float]) -> float:
        import torch
        import numpy as np
        x = torch.tensor(np.array(batch, dtype=np.float32)).view(1, -1)
        with torch.no_grad():
            y = self.model(x).item()
        return y

@task(persist_result=False)
def create_model_actor(model_uri: str):
    """
    Create a Dask Actor holding the in-memory model on a worker.
    Returns an Actor handle (lightweight proxy), not the model bytes.
    """
    from dask.distributed import get_client
    client = get_client()
    actor_future = client.submit(ModelActor, model_uri, actor=True)
    # Return the actor handle so downstream tasks can reuse the same in-memory model
    return actor_future.result()

@task
def predict_with_actor(actor, batch: List[float]) -> float:
    """Use the in-memory model inside the actor to predict without reloading."""
    # Call the method on the actor; it runs on the actor's worker process
    fut = actor.predict(batch)
    return fut.result()

# DaskTaskRunner requires a reachable Dask scheduler
from prefect_dask.task_runners import DaskTaskRunner

@flow(
    name="distributed_cross_machine_flow",
    task_runner=DaskTaskRunner(address=os.getenv("DASK_SCHEDULER_ADDRESS", "tcp://YOUR-DASK-SCHEDULER:8786")),
)
def distributed_cross_machine_flow(batches: List[List[float]]) -> list[float]:
    """
    - Task 1: load_model_to_s3 -> model_uri (runs on one machine)
    - Task 2: create_model_actor(model_uri) -> actor (loads model into memory on a worker)
    - Task 3: predict_with_actor(actor, batch) for many batches (reuses the in-memory model)
    """
    model_uri = load_model_to_s3()
    actor = create_model_actor(model_uri)
    # Reuse the same in-memory model for multiple predictions
    preds = [predict_with_actor.submit(actor, b) for b in batches]
    return [p.result() for p in preds]

# =========================
# Demo entrypoint
# =========================
if __name__ == "__main__":
    test_batch = [1.0, 2.0, 3.0, 4.0]

    print("Running local_in_memory_flow ...")
    print(local_in_memory_flow(test_batch))

    dask_addr = os.getenv("DASK_SCHEDULER_ADDRESS")
    if dask_addr and os.getenv("S3_BUCKET"):
        print(f"Running distributed_cross_machine_flow against {dask_addr} ...")
        batch_list = [
            [1.0, 2.0, 3.0, 4.0],
            [0.5, 1.5, 2.5, 3.5],
            [2.0, 0.0, -1.0, 1.0],
        ]
        print(distributed_cross_machine_flow(batch_list))
    else:
        print("Skipping distributed_cross_machine_flow. Set DASK_SCHEDULER_ADDRESS and S3_BUCKET to run it.")
```
Why this meets your requirements:
- The distributed flow uses three tasks: load to S3 (Task 1), load once into memory on a worker via a Dask Actor (Task 2), and then perform predictions many times using that same in-memory model (Task 3). No reloading happens for each prediction.
- The actor handle is a lightweight proxy you can pass between tasks; the heavy model stays resident on the actor’s worker process.

References:
- Prefect Dask integration: Prefect Dask integration

If you prefer TorchScript-only artifacts, GCS/Azure/NFS instead of S3, or GPU inference (map_location/device handling), I can adapt this snippet accordingly.
o
@Marvin regarding your answers, which are helpful: which task runners can run distributed (multiple machines) and which task runners can only run on one machine?
m
thought for 65.2 seconds
Here’s the quick breakdown for Prefect 3.x task runners:

Single-machine (local) task runners
- ThreadPoolTaskRunner (default-ish choice for simple concurrency)
  - Concurrency via threads in a single process on one machine
  - Great for sharing in-memory objects between tasks
- ProcessPoolTaskRunner
  - Concurrency via multiple processes on one machine
  - Cross-process requires pickling; can’t share in-memory objects directly
- PrefectTaskRunner
  - Serial execution on one machine (primarily for debugging or very simple flows)

Distributed (can run across multiple machines)
- DaskTaskRunner (from prefect-dask)
  - Runs tasks on a Dask cluster; workers can span many machines
  - Provide an existing scheduler address (e.g., tcp://host:8786) or create a cluster
  - Link: Prefect Dask integration
- RayTaskRunner (from prefect-ray)
  - Runs tasks on a Ray cluster; nodes can span many machines
  - Connect with an address like ray://<head node ip>:10001 (or start local Ray); see the minimal sketch below
  - Link: Prefect Ray integration

Rules of thumb
- Need to share heavy Python objects in-memory between tasks? Use ThreadPoolTaskRunner (same process) or a cluster-side primitive (e.g., Dask Actors or Ray Actors) to keep the object resident on a worker and pass a handle.
- Need multi-machine scaling? Use DaskTaskRunner or RayTaskRunner and move heavy data/models via shared storage (S3/GCS/NFS) or via cluster-native object stores/actors.
- Avoid ProcessPoolTaskRunner for large non-serializable objects unless you persist to disk and pass paths.
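For completeness, a minimal RayTaskRunner sketch (assumptions: prefect-ray is installed and a Ray cluster is reachable at the address you pass; the address and the RAY_ADDRESS env var shown here are placeholders):
```
import os

from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner

@task
def square(x: int) -> int:
    return x * x

# Placeholder address; point this at your Ray head node (or omit address to start local Ray)
@flow(task_runner=RayTaskRunner(address=os.getenv("RAY_ADDRESS", "ray://<head-node-ip>:10001")))
def ray_flow(xs: list[int]) -> list[int]:
    futures = [square.submit(x) for x in xs]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(ray_flow([1, 2, 3]))
```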
o
@Marvin does the ThreadPoolTaskRunner have any limitations regarding docker container spawning? in other words, if i use ThreadPoolTaskRunner, does it allow creating multiple containers, one per thread, with their results shared in the process?
m
thought for 35.6 seconds
Short answer: No Prefect-imposed limitation. ThreadPoolTaskRunner runs all tasks as threads in a single Python process, so:
- You can start multiple Docker containers concurrently from different tasks/threads.
- Any Python objects you create/collect in those tasks (e.g., logs, IDs, parsed results, even client handles) can be passed to downstream tasks in-memory within the same flow run.
- There’s no cross-process serialization with ThreadPoolTaskRunner, so object sharing is straightforward.

Important caveats to keep in mind
- Access to Docker: Your flow’s process must be able to talk to the Docker daemon (e.g., /var/run/docker.sock mounted or remote TCP). If your flow itself runs inside a container (e.g., via a Docker work pool), you must mount the Docker socket or use DinD appropriately.
- Thread safety: The Python Docker SDK can be used from multiple threads; to be safe, create a client per task/thread (docker.from_env()) instead of sharing a single global client across threads.
- Cleanup: Make sure containers are stopped/removed, even on failure (use try/finally).
- CPU-bound work: Python threads won’t parallelize CPU-bound Python code due to the GIL, but Docker API calls and container workloads are external, so starting/waiting on containers is generally fine in threads.
- Single machine only: ThreadPoolTaskRunner will not distribute across machines. For multi-machine, use DaskTaskRunner or RayTaskRunner, or run separate flows via deployments/workers on different agents.

Minimal example: spawn multiple containers in parallel threads and share results in-memory
```
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def run_container(image: str, command: list[str]) -> dict:
    import docker
    client = docker.from_env()  # per-thread client
    container = client.containers.run(image, command, detach=True)
    try:
        result = container.wait()            # {'StatusCode': int}
        logs = container.logs().decode()
        return {"id": container.id, "status": result["StatusCode"], "logs": logs}
    finally:
        # cleanup
        try:
            container.remove(force=True)
        except Exception:
            pass

@task
def aggregate(results: list[dict]) -> dict:
    # All results are just Python objects; shared in the same process
    return {
        "count": len(results),
        "exit_codes": [r["status"] for r in results],
        "first_log": results[0]["logs"][:200] if results else "",
    }

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def docker_threaded_flow():
    futures = [
        run_container.submit("alpine:3.19", ["sh", "-c", "echo hi && sleep 1 && echo bye"]),
        run_container.submit("alpine:3.19", ["sh", "-c", "echo task2 && sleep 2"]),
        run_container.submit("alpine:3.19", ["sh", "-c", "echo task3 && sleep 3"]),
        run_container.submit("alpine:3.19", ["sh", "-c", "echo task4 && sleep 1"]),
    ]
    results = [f.result() for f in futures]
    return aggregate(results)

if __name__ == "__main__":
    print(docker_threaded_flow())
```
Bottom line: ThreadPoolTaskRunner is compatible with launching many containers in parallel, and results can be shared in-memory across tasks in the process. Just ensure Docker access, create per-thread clients, and handle cleanup. If you need multi-machine distribution of those container-launch tasks, switch to DaskTaskRunner or RayTaskRunner and use shared storage or cluster-native object stores/actors for coordination.
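For reference, a minimal sketch of that multi-machine variant (assumptions: prefect-dask is installed, a Dask scheduler is reachable at DASK_SCHEDULER_ADDRESS, each Dask worker machine can reach a Docker daemon, and the run_container / aggregate tasks from the example above are defined in the same file):
```
import os

from prefect import flow
from prefect_dask.task_runners import DaskTaskRunner

# run_container and aggregate are the tasks defined in the example above; their
# dict results are ordinary picklable objects, so Dask can ship them back from
# whichever worker machine ran each container.
@flow(task_runner=DaskTaskRunner(address=os.getenv("DASK_SCHEDULER_ADDRESS", "tcp://YOUR-DASK-SCHEDULER:8786")))
def docker_distributed_flow():
    futures = [
        run_container.submit("alpine:3.19", ["sh", "-c", "echo hi"]),
        run_container.submit("alpine:3.19", ["sh", "-c", "echo there"]),
    ]
    return aggregate([f.result() for f in futures])
```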