<@ULVA73B9P> Make a minimal example ThreadPoolTas...
# ask-marvin
o
@Marvin Make a minimal example that works with ThreadPoolTaskRunner (same process, pass-by-reference) but fails with ProcessPoolTaskRunner (separate processes, requires pickling).
1) PyTorch model on GPU (cuda) and live CUDA tensors
- ThreadPool: OK for inference within one process. You can share a loaded torch.nn.Module and torch.Tensor references across threads if your usage is thread-safe (it often is for pure inference).
- ProcessPool: Fails because the live model/tensors and CUDA context are not safely picklable/transferable. You’ll see pickling errors or CUDA initialization/memory issues.
- Workaround for processes: pass a model path/config and load the model inside each process; move tensors to CPU or reconstruct them from bytes/np arrays inside the task.
m
thought for 97.6 seconds
Got it — here’s a minimal, self-contained Prefect 3.x example that:
- Succeeds with ThreadPoolTaskRunner (same process; pass-by-reference works)
- Fails with ProcessPoolTaskRunner (separate processes; pickling of live CUDA tensors/model fails)
- Shows a working process-based workaround (load model inside each process, pass CPU/serializable data)
Run this as a script (the main guard is needed for process pools).
Copy code
import os
import sys
import numpy as np
import torch
import torch.nn as nn

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner


# Simple tiny model for demo
class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_in, d_out),
            nn.ReLU(),
        )

    def forward(self, x):
        return self.net(x)


# Task that expects a ready-to-use model and tensor
@task
def infer(model: nn.Module, x: torch.Tensor) -> float:
    with torch.no_grad():
        y = model(x)
        return float(y.sum().item())


# Workaround task: load model inside the process and reconstruct tensors from CPU-friendly inputs
@task
def load_and_infer_from_path(model_path: str, x_np: np.ndarray, device: str = "cuda") -> float:
    model = TinyNet().to(device)
    state = torch.load(model_path, map_location=device)
    model.load_state_dict(state)
    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        y = model(x)
        return float(y.sum().item())


# ThreadPool: OK (same process, pass-by-reference is fine)
@flow(name="thread_ok", task_runner=ThreadPoolTaskRunner(max_workers=4))
def thread_flow() -> list[float]:
    device = "cuda"
    model = TinyNet().to(device)
    x = torch.randn(32, 4, device=device)
    futures = [infer.submit(model, x) for _ in range(2)]
    return [f.result() for f in futures]


# ProcessPool: expected to FAIL (separate processes require pickling; live CUDA objects are not picklable)
@flow(name="process_fails", task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_flow_should_fail() -> list[float]:
    device = "cuda"
    model = TinyNet().to(device)
    x = torch.randn(32, 4, device=device)
    futures = [infer.submit(model, x) for _ in range(2)]
    return [f.result() for f in futures]


# ProcessPool workaround: pass CPU/serializable inputs and load model inside each process
@flow(name="process_workaround", task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_flow_workaround(model_path: str) -> list[float]:
    # Use CPU-friendly payloads (e.g., numpy arrays)
    x_np = np.random.randn(32, 4).astype("float32")
    futures = [load_and_infer_from_path.submit(model_path, x_np) for _ in range(2)]
    return [f.result() for f in futures]


if __name__ == "__main__":
    if not torch.cuda.is_available():
        print("CUDA is not available on this machine. This demo requires a GPU.")
        sys.exit(0)

    # Save a CPU state dict for the workaround
    model_cpu = TinyNet()
    model_path = "<http://model.pt|model.pt>"
    torch.save(model_cpu.state_dict(), model_path)

    print("Running thread_flow (should succeed)...")
    thread_results = thread_flow()
    print("thread_flow results:", thread_results)

    print("\nRunning process_flow_should_fail (should raise due to pickling/CUDA issues)...")
    try:
        process_results = process_flow_should_fail()
        print("Unexpected success:", process_results)
    except Exception as e:
        print("Expected failure with ProcessPoolTaskRunner:", type(e).__name__, str(e)[:500])

    print("\nRunning process_flow_workaround (should succeed by loading model in each process)...")
    workaround_results = process_flow_workaround(model_path)
    print("process_flow_workaround results:", workaround_results)

    # Cleanup
    if os.path.exists(model_path):
        os.remove(model_path)
What this shows:
- ThreadPoolTaskRunner: The flow shares a live CUDA `nn.Module` and `torch.Tensor` across threads; inference works.
- ProcessPoolTaskRunner: Submitting tasks with live CUDA objects triggers pickling errors or CUDA context issues (expected).
- Workaround: Pass a model path and CPU-friendly arrays; load the model and move tensors to CUDA inside each process.
References:
- Prefect 3.x task runners concept
- prefect.task_runners source
o
@Marvin why does this not fail?
@flow(name="process_fails", task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_flow_should_fail() -> list[float]:
    device = "cuda"
    model = TinyNet().to(device)
    x = torch.randn(32, 4, device=device)
    futures = [infer.submit(model, x) for _ in range(25)]
    return [f.result() for f in futures]
m
thought for 66.8 seconds
Short answer: because on your system the process pool likely isn’t forcing pickling, or PyTorch is successfully pickling/unpickling your objects — so it “works,” even though it’s not a safe pattern. Here are the two common reasons this doesn’t fail:
- Linux fork semantics: On Linux, ProcessPoolExecutor typically uses the “fork” start method. Forked children inherit the parent’s memory, so your live CUDA model/tensor references can appear to “just work” without being pickled. This is unsafe/unsupported with CUDA and can lead to hangs or crashes under load.
- PyTorch can pickle CUDA tensors/modules: Even under spawn, simple nn.Module and CUDA tensors can sometimes be serialized and reconstructed in the worker. It’s inefficient (copies GPU data per task/process) and still risky for real workloads, but it may not error for small examples.
How to confirm what’s happening
- Check the start method and PIDs:
Copy code
import multiprocessing as mp, os, torch
from prefect import task
print("flow pid:", os.getpid(), "start:", mp.get_start_method())

@task
def infer(model, x):
    print("task pid:", os.getpid(), "start:", mp.get_start_method(), "cuda initialized:", torch.cuda.is_initialized())
    with torch.no_grad():
        return float(model(x).sum().item())
Force a failure (or at least force true process isolation)
- Enforce spawn so arguments must be pickled:
Copy code
if __name__ == "__main__":
    import multiprocessing as mp
    mp.set_start_method("spawn", force=True)
    process_flow_should_fail()
With spawn, you’re more likely to see pickling/initialization issues or massive overhead from shipping CUDA tensors/models to workers.
Recommended pattern for ProcessPool with CUDA
- Don’t pass live CUDA objects between processes. Instead:
  - Pass a model path/config and load the model inside the task
  - Pass CPU/serializable payloads (e.g., numpy arrays or bytes) and move to CUDA inside the task
- Alternatively, stick with ThreadPoolTaskRunner for single-process/multi-thread inference on one GPU
Minimal “safe” process example
Copy code
@task
def load_and_infer(model_path: str, x_np):
    import torch, numpy as np
    device = "cuda"
    model = TinyNet().to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        return float(model(x).sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_ok(model_path: str):
    x_np = np.random.randn(32, 4).astype("float32")
    futures = [load_and_infer.submit(model_path, x_np) for _ in range(4)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    import multiprocessing as mp, numpy as np, torch  # numpy is used by the flow above
    mp.set_start_method("spawn", force=True)  # ensure real process isolation
    model = TinyNet()
    torch.save(model.state_dict(), "<http://model.pt|model.pt>")
    process_ok("<http://model.pt|model.pt>")
References - Prefect task runners in 3.x: Task runners concept - Source for task runners: prefect.task_runners - PyTorch multiprocessing and CUDA guidance (avoid fork; use spawn): PyTorch multiprocessing notes
o
@Marvin also with mp.set_start_method("spawn", force=True) ProcessPoolTaskRunner does not fail...
m
thought for 50.4 seconds
Totally fair observation — it doesn’t have to fail. What’s happening:
- With ProcessPoolTaskRunner + spawn, Prefect pickles your task arguments and ships them to worker processes.
- PyTorch can pickle nn.Module objects and CUDA tensors; in the worker, they’re reconstructed and a fresh CUDA context is created for that process.
- So you aren’t “sharing” a live GPU model/tensor; you’re copying/serializing and re-creating them per task call in each worker. That’s expensive but not inherently an error, so small demos often “work.”
How to verify it’s copy/recreate (not shared)
Copy code
import os, multiprocessing as mp, torch
from prefect import task

@task
def infer(model, x):
    print({
        "task_pid": os.getpid(),
        "cuda_initialized": torch.cuda.is_initialized(),
        "x_device": str(x.device),
        "x_ptr": x.data_ptr(),  # different per process
        "model_id": id(model),  # different per reconstruction
    })
    with torch.no_grad():
        return float(model(x).sum().item())

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    process_flow_should_fail()  # your flow
You should see different PIDs and different `data_ptr()` values across workers, showing new CUDA allocations per process.
Why this pattern is risky/inefficient
- GPU memory is duplicated per process; each task call serializes the model/tensor and re-allocates GPU memory in the worker.
- Significant overhead (GPU↔️CPU copies during pickling/unpickling), potential fragmentation, and instability under load.
- It may start failing when models/tensors are large or concurrency grows.
Safer process-based pattern: load per worker, pass CPU-friendly data
Copy code
# per-worker cache
_worker = {"model": None, "device": None}

@task
def infer_cached(model_path: str, x_np, device: str = "cuda"):
    import torch, numpy as np
    if _worker["model"] is None or _worker["device"] != device:
        m = TinyNet().to(device).eval()
        m.load_state_dict(torch.load(model_path, map_location=device))
        _worker["model"] = m
        _worker["device"] = device
    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        return float(_worker<x|"model">.sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_ok(model_path: str):
    x_np = np.random.randn(32, 4).astype("float32")
    futs = [infer_cached.submit(model_path, x_np) for _ in range(8)]
    return [f.result() for f in futs]
- Each worker initializes CUDA and loads the model once, then reuses it.
- Only CPU-serializable payloads are sent between processes.
When to use which:
- ThreadPoolTaskRunner: Best if you want to share a single GPU-resident model across concurrent tasks in the same process (pure inference is often thread-safe).
- ProcessPoolTaskRunner: Only if you really need separate processes; use per-worker init to avoid re-sending live CUDA objects.
References:
- Prefect 3.x task runners: Task runners concept
- PyTorch multiprocessing and CUDA: PyTorch multiprocessing notes
o
@Marvin what about loading the model into Redis and then accessing it and putting it onto the GPU? Does Prefect come with Redis or not?
m
thought for 103.7 seconds
Short answer
- Yes, you can stash a serialized model in Redis and load it inside each task/process, then move it to CUDA.
- Prefect does not “come with” a Redis server. You need to run your own Redis and install a Redis client. Prefect 3.x has a Redis block you can use to store connection details, but it doesn’t bundle or manage Redis for you.
When this makes sense (and caveats)
- Works: storing a CPU state_dict (or full model) as bytes in Redis and loading it inside tasks. Then move to GPU with `map_location="cuda"` or `.to("cuda")`.
- Caveats:
  - Redis stores data in RAM. Large models will be costly and may hit Redis’s 512 MB per-string limit. Many modern models exceed this.
  - You’ll pay serialization/deserialization costs on every load unless you cache per worker.
  - Object storage (S3/GCS/Azure Blob) is generally a better fit for large binaries. Use Redis as a pointer/version registry, not the payload.
Minimal pattern with redis-py (recommended over inventing block APIs)
- One-time “publish” to Redis:
Copy code
# one-time setup (outside Prefect)
import io, torch, redis

# your model defined somewhere importable by workers
state = TinyNet().cpu().state_dict()

buf = io.BytesIO()
torch.save(state, buf)  # CPU tensors; smaller and portable
r = redis.from_url("<redis://localhost:6379/0>")
r.set("model:tinynet:v1", buf.getvalue())
- Prefect 3.x flow/tasks: lazy-load the model once per worker and reuse
Copy code
import os, io, numpy as np, torch
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

# simple worker-local cache
_worker = {"model": None, "device": None}

class TinyNet(torch.nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(d_in, d_out), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer_from_redis(x_np, redis_url="redis://localhost:6379/0", key="model:tinynet:v1", device="cuda"):
    import redis
    if _worker["model"] is None or _worker["device"] != device:
        r = redis.from_url(redis_url)
        blob = r.get(key)
        if blob is None:
            raise RuntimeError(f"Missing key in Redis: {key}")
        state = torch.load(io.BytesIO(blob), map_location=device)
        m = TinyNet().to(device).eval()
        m.load_state_dict(state)
        _worker["model"] = m
        _worker["device"] = device

    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        return float(_worker["model"](x).sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_pool_infer():
    x_np = np.random.randn(32, 4).astype("float32")
    futures = [infer_from_redis.submit(x_np) for _ in range(4)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    # If you want to force true process isolation on Unix:
    # import multiprocessing as mp; mp.set_start_method("spawn", force=True)
    print(process_pool_infer())
Notes
- Keep the saved weights on CPU when serializing; load with `map_location="cuda"` or transfer after load. This avoids embedding device-specific state in the payload.
- Cache the model per worker to avoid pulling from Redis on each task call.
Should I use Redis for this?
- For small models or as a fast cache: fine.
- For large models: use object storage (S3/GCS/Azure) for the bytes and Redis only for a pointer/version (a small sketch of that follows below). Prefect has first-class blocks for these storages.
Prefect + Redis specifics
- Prefect 3.x includes a Redis block you can configure to store connection settings, but it does not run Redis for you.
- You still need a Redis server and typically the redis-py client installed in your environment.
Docs
- Prefect Redis integration: Redis integration
- Redis block API: prefect.blocks.redis
- Prefect storage blocks (S3/GCS/Azure): Integrations catalog
- PyTorch multiprocessing notes (map_location, spawn vs fork): PyTorch multiprocessing notes
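If you go the pointer route, here is a minimal sketch of the "Redis as pointer, object storage as payload" idea. The key name and URI are hypothetical, and it assumes a running Redis plus the redis-py client:
```
import json
import redis

r = redis.from_url("redis://localhost:6379/0")

# Publisher side: record where the current weights live and which version they are.
r.set("model:tinynet:current", json.dumps({
    "uri": "s3://my-bucket/models/tinynet/v7/state_dict.pt",  # payload lives in object storage
    "version": "v7",
}))

# Consumer side (inside a task): resolve the pointer, then fetch the bytes with your
# storage client (boto3, a Prefect storage block, ...) and torch.load them onto the GPU.
pointer = json.loads(r.get("model:tinynet:current"))
print(pointer["uri"], pointer["version"])
```
This keeps Redis payloads tiny and lets you swap model versions by overwriting a single key.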
o
@Marvin ok, from what I see it is not useful to use infer with "ProcessPoolTaskRunner". Can you tell me what the preferred pattern for model eval is?
m
thought for 37.0 seconds
Great question. For GPU model inference, ProcessPoolTaskRunner is rarely the right choice. Preferred patterns:
1) Single-GPU, high throughput (most common)
- Load the model once in a single process and reuse it.
- Use ThreadPoolTaskRunner to run multiple inference tasks concurrently in the same process. Threads can share the live CUDA model/tensors safely for pure inference.
- Or, even better for throughput: keep inference inside one task/flow and batch internally to minimize orchestration overhead.
Minimal pattern with a shared model across threads
Copy code
import torch, numpy as np
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

class TinyNet(torch.nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(d_in, d_out), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer(model, x):
    with torch.no_grad():
        return float(model(x).sum().item())

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def gpu_infer_threadpool():
    device = "cuda"
    model = TinyNet().to(device).eval()
    batches = [torch.randn(32, 4, device=device) for _ in range(16)]
    futures = [infer.submit(model, x) for x in batches]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(gpu_infer_threadpool())
2) Multi-GPU scaling
- Run one process per GPU. Each process loads its own model once and serves requests locally.
- With Prefect, create multiple runs/workers, each pinned to a GPU (e.g., set CUDA_VISIBLE_DEVICES per worker/pod/container). Inside each run, use ThreadPoolTaskRunner or a single-task batching loop.
- Avoid passing live CUDA objects across process boundaries. If you really need processes, use a per-worker cache: load model once per worker and only pass CPU-serializable inputs (numpy/bytes).
3) CPU-only or pure Python that does not release the GIL
- ProcessPoolTaskRunner can help if your workload is CPU-bound Python that doesn’t release the GIL. PyTorch ops usually release the GIL, so threads are typically fine.
Why not ProcessPool for GPU inference?
- Each process creates its own CUDA context and duplicates GPU memory.
- Passing models/tensors as task arguments triggers serialization and re-allocation in workers, which is expensive and can OOM at scale.
- Even with spawn, small demos may “work” but will not be efficient or stable under load.
Batching is king
- For throughput, do fewer Prefect tasks and more intra-task batching on the GPU.
- A common pattern: one task loads the model once and loops over a data iterator in mini-batches (a minimal sketch follows at the end of this answer).
If you need a distributed GPU scheduler
- Keep Prefect as the orchestrator and use a GPU-serving layer (e.g., Triton/TorchServe) or a GPU framework (e.g., Ray with GPU resources) inside a task. Prefect coordinates jobs; the service or framework handles GPU scheduling.
Docs
- Task runners overview: Prefect 3.x task runners concept
If you share a bit about your target throughput, number of GPUs, and deployment environment (local, k8s, VMs), I can sketch an architecture and a concrete Prefect deployment layout for it.
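Building on the batching point above, a minimal sketch of the "one task, many mini-batches" shape. TinyNet and the random batches are stand-ins for your real model and data loader:
```
import torch
from prefect import flow, task

class TinyNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(4, 2), torch.nn.ReLU())
    def forward(self, x):
        return self.net(x)

@task
def batched_inference(n_batches: int = 100, batch_size: int = 256) -> float:
    # One task owns the model for its whole lifetime: load once, loop over mini-batches.
    device = "cuda"
    model = TinyNet().to(device).eval()
    total = 0.0
    with torch.inference_mode():
        for _ in range(n_batches):
            x = torch.randn(batch_size, 4, device=device)  # replace with your data iterator
            total += float(model(x).sum().item())
    return total

@flow
def batch_inside_one_task():
    # Few Prefect tasks, lots of GPU work per task: minimal orchestration overhead.
    return batched_inference.submit().result()
```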
o
@Marvin so for me it is very important that i have my model available fast not slow. the model is ideally already in the ram in non ephemeral container, the workflow only should call the model and retrieve the results with only one load into the GPU from ram, is this a common pattern?
m
thought for 57.8 seconds
Yes — that’s a very common pattern. In low-latency inference, you generally want a long-lived process/container that:
- Loads the model once at startup (from disk or RAM cache)
- Moves it to the GPU once and keeps it resident
- Serves requests over HTTP/gRPC or a queue without reloading weights
How to do this with Prefect
- Use Prefect to orchestrate, not to host the model. Run a dedicated model-serving service and have Prefect tasks/flows call it.
- Good options for the serving tier:
  - NVIDIA Triton Inference Server (dynamic batching, multi-model, metrics)
  - TorchServe
  - A lightweight FastAPI/uvicorn service that loads the model at startup
- Prefect tasks just call the service (HTTP/gRPC), pass inputs, get outputs. The service keeps the model warm on GPU.
Why this is preferred
- No per-request serialization/deserialization of weights
- No per-run GPU context/model re-init
- Predictable, low tail latency and better throughput (especially with batching)
Minimal shape of a setup
1) Model server (FastAPI)
Copy code
# app.py
import torch
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class TinyNet(torch.nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(d_in, d_out), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

device = "cuda"
model = TinyNet().to(device).eval()
warmup = torch.randn(32, 4, device=device)
with torch.inference_mode(): model(warmup)  # warm-up

class Inp(BaseModel):
    x: list[list[float]]  # small example; in prod prefer binary/npz

@app.post("/infer")
def infer(inp: Inp):
    x = torch.tensor(inp.x, dtype=torch.float32, device=device)
    with torch.inference_mode():
        y = model(x)
    return {"sum": float(y.sum().item())}
Run it:
Copy code
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1
- One worker per GPU is typical; use multiple threads if ops release the GIL. For conv nets, also consider `torch.backends.cudnn.benchmark = True` if input shapes are stable.
2) Prefect flow calls the service
Copy code
import httpx, numpy as np
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@task
def call_model_server(payload):
    with httpx.Client(timeout=5.0) as client:
        resp = client.post("http://model-server:8000/infer", json=payload)
        resp.raise_for_status()
        return resp.json()

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def orchestrate_inference():
    batches = [np.random.randn(32, 4).astype("float32").tolist() for _ in range(8)]
    futures = [call_model_server.submit({"x": b}) for b in batches]
    return [f.result() for f in futures]
Notes and best practices
- Multi-GPU: Run one model-server container per GPU (set CUDA_VISIBLE_DEVICES per container). Have Prefect route work accordingly or use a load-balancer aware of GPU capacity. A small launcher sketch follows after these notes.
- Batching: Let the server batch internally (critical for throughput). Triton/TorchServe do this out-of-the-box; you can implement request coalescing in FastAPI too.
- Inputs: Keep payloads small. For large tensors, stage them in object storage and send pointers, or use an internal network file/share.
- Warm start: Preload weights on startup, run a warm-up forward pass, and keep the process alive. Use liveness/readiness probes to ensure the GPU model is ready.
- RAM caching: If you have a CPU RAM cache (e.g., Redis, memcached) for weights, use it only at startup; don’t fetch per request. For large models, object storage is a better source of truth and Redis can store pointers/versions.
- Avoid ProcessPoolTaskRunner for GPU inference: it duplicates CUDA contexts/memory and adds serialization overhead. ThreadPoolTaskRunner is fine for client-side concurrency when calling a server.
References
- Prefect task runners overview: Task runners concept
- NVIDIA Triton: Triton Inference Server
- TorchServe: TorchServe
- PyTorch perf tips: Performance tuning guide
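The per-GPU launcher mentioned in the notes could look roughly like the sketch below. It assumes the app.py above, uvicorn on the PATH, and illustrative port numbers:
```
# Start one uvicorn model server per GPU, each pinned via CUDA_VISIBLE_DEVICES.
import os
import subprocess

def launch_servers(num_gpus: int = 2, base_port: int = 8000) -> list[subprocess.Popen]:
    procs = []
    for gpu in range(num_gpus):
        env = os.environ.copy()
        env["CUDA_VISIBLE_DEVICES"] = str(gpu)  # each server only sees "its" GPU
        procs.append(subprocess.Popen(
            ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", str(base_port + gpu), "--workers", "1"],
            env=env,
        ))
    return procs

if __name__ == "__main__":
    servers = launch_servers()
    # Prefect tasks can then round-robin requests across ports 8000, 8001, ...
    for p in servers:
        p.wait()
```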
o
@Marvin when we say we want to avoid processpooltaskrunner, is there any use case connected to model inference that may be a good fit to processpooltaskrunner?
m
thought for 67.4 seconds
Short version: For GPU inference, ProcessPoolTaskRunner is almost never the best fit. But there are a few inference-adjacent cases where it can make sense.
When ProcessPoolTaskRunner can be useful around inference
- CPU-only inference that is GIL-bound: If your pipeline spends most time in pure-Python steps (custom feature engineering, tokenization, data munging) that do not release the GIL, processes can outscale threads. Load the model once per worker and cache it.
- Isolation and stability: If some pre/post-processing code or a vendor lib occasionally leaks memory, deadlocks under threads, or crashes the interpreter, processes give you fault isolation. A crashed worker doesn’t take down your flow process.
- BLAS/OpenMP oversubscription control: Some CPU libs spawn their own threads (MKL/OpenBLAS, XGBoost). Running them in separate processes (with OMP/MKL threads pinned low) can avoid thread pool thrashing in a threaded runner. A thread-pinning sketch appears at the end of this answer.
- Multi-tenant sandboxing: If you run untrusted models/pipelines, per-process boundaries provide a minimal sandbox and clearer resource accounting.
Caveats
- Don’t pass live GPU objects between processes. Each process creates its own CUDA context and duplicates GPU memory.
- Pickling/serialization adds latency; large weights or tensors will hurt throughput.
- Workers are reused but not sticky to a specific task; use per-worker lazy caches, not per-task globals.
A safe pattern (CPU-only inference, per-worker cache)
Copy code
import multiprocessing as mp
import numpy as np
import torch
import torch.nn as nn

from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

# Per-worker cache (each process has its own copy)
_worker = {"model": None}

class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(d_in, d_out), nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer_cpu_cached(x_np: np.ndarray) -> float:
    # Lazy init once per worker process
    if _worker["model"] is None:
        m = TinyNet().to("cpu").eval()
        # load weights if you have them, e.g., m.load_state_dict(torch.load("model.pt", map_location="cpu"))
        _worker["model"] = m

    x = torch.from_numpy(x_np).to("cpu")
    with torch.no_grad():
        y = _worker<x|"model">
        return float(y.sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def cpu_infer_flow():
    batches = [np.random.randn(32, 4).astype("float32") for _ in range(16)]
    futs = [infer_cpu_cached.submit(b) for b in batches]
    return [f.result() for f in futs]

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)  # ensure real process isolation
    print(cpu_infer_flow())
Preferred patterns for GPU inference
- Single GPU: Keep a single long-lived process with the model resident on the GPU; use ThreadPoolTaskRunner or, better, run a model server (FastAPI/Triton/TorchServe) and have Prefect call it.
- Multi GPU: One process/container per GPU, model loaded once per container; coordinate with Prefect at the job level, not with a process pool inside a single run.
Rule of thumb
- GPU inference: use threads in one process or an external model server.
- Need process-based parallelism? Do it for CPU-only stages, isolation, or unstable code — not to share a single GPU across multiple worker processes.
Refs
- Prefect task runners overview: Task runners concept
- PyTorch multiprocessing guidance: PyTorch multiprocessing notes
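For the BLAS/OpenMP oversubscription point above, a small thread-pinning sketch, assuming each worker process should stick to a single CPU thread (adjust the counts to your core budget):
```
# Keep per-process CPU thread pools small so MKL/OpenBLAS/OpenMP don't oversubscribe cores
# when several worker processes run side by side. Set the env vars before heavy numeric imports.
import os

for var in ("OMP_NUM_THREADS", "MKL_NUM_THREADS", "OPENBLAS_NUM_THREADS"):
    os.environ.setdefault(var, "1")

import torch

torch.set_num_threads(1)          # intra-op parallelism for this process
torch.set_num_interop_threads(1)  # inter-op parallelism for this process
```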
o
@Marvin but I have multiple GPUs. How do I do it for two GPUs? Do I nest a ThreadPoolTaskRunner inside a ProcessPoolTaskRunner? Or is this dumb?
m
thought for 70.8 seconds
Great question. For two GPUs, don’t nest task runners inside each other. Pick one of these two patterns:
Pattern A: Single process drives both GPUs (simple, fast)
- Load one model per device in a single process.
- Use ThreadPoolTaskRunner to fan out requests; route each request to the right model/device.
- No process boundaries, so no pickling; fastest startup and lowest overhead.
Example
Copy code
import torch
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

class TinyNet(torch.nn.Module):
    def __init__(self): 
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(4, 2), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer(model, x):
    with torch.no_grad():
        return float(model(x).sum().item())

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def two_gpu_threads():
    m0 = TinyNet().to("cuda:0").eval()
    m1 = TinyNet().to("cuda:1").eval()
    # Build device-specific batches
    b0 = [torch.randn(32, 4, device="cuda:0") for _ in range(8)]
    b1 = [torch.randn(32, 4, device="cuda:1") for _ in range(8)]
    futures = [infer.submit(m0, x) for x in b0] + [infer.submit(m1, x) for x in b1]
    return [f.result() for f in futures]
When to use: Single service/process managing multiple GPUs is okay for inference; PyTorch handles targeting cuda:0 and cuda:1 cleanly. This avoids process-spawn cost and duplicate CUDA contexts.
Pattern B: One process per GPU (isolation; also common)
- Run one long-lived process per GPU, each loading the model once.
- In Prefect, either:
  - Create two deployments (same code), pin each to a GPU via environment (CUDA_VISIBLE_DEVICES) and run them concurrently; or
  - Use a parent flow with a ProcessPoolTaskRunner to start one task per GPU. Inside each task, set the GPU ID, load the model once, then process a list of requests. Optionally use plain Python threads inside the task for per-GPU concurrency.
Example (per-GPU worker tasks; no cross-process model passing)
Copy code
import os, numpy as np, torch
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

class TinyNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(4, 2), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def gpu_worker(gpu_id: int, batches: list[np.ndarray]) -> list[float]:
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)  # pin process to a single GPU
    device = "cuda:0"  # becomes the visible GPU in this process
    model = TinyNet().to(device).eval()
    with torch.no_grad():
        out = []
        for b in batches:
            x = torch.from_numpy(b).to(device)
            out.append(float(model(x).sum().item()))
        return out

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def two_gpu_processes():
    b0 = [np.random.randn(32, 4).astype("float32") for _ in range(8)]
    b1 = [np.random.randn(32, 4).astype("float32") for _ in range(8)]
    f0 = gpu_worker.submit(0, b0)
    f1 = gpu_worker.submit(1, b1)
    return f0.result() + f1.result()
Notes:
- We only pass CPU-serializable arrays to processes; each process loads the model once and keeps it on its GPU. No pickling of live CUDA objects.
- If you want per-GPU concurrency, do it inside gpu_worker with a Python thread pool and a shared model (a sketch follows below).
Is nesting ThreadPoolTaskRunner inside ProcessPoolTaskRunner dumb?
- Don’t try to “nest” task runners in one flow. Instead, structure as above:
  - Either one flow with ThreadPoolTaskRunner managing both GPUs in-process, or
  - A parent flow with ProcessPoolTaskRunner starting one worker task per GPU; each worker can use regular threads internally if needed.
- If your goal is serving, a cleaner approach is one model-serving container per GPU and Prefect orchestrates requests/jobs to each service.
Scaling beyond two GPUs
- Replicate Pattern B: one process/container per GPU (set CUDA_VISIBLE_DEVICES per container). Prefect schedules runs across them.
- Or use a GPU-aware serving layer (NVIDIA Triton, TorchServe) with one instance per GPU and let Prefect call the service.
References
- Prefect 3.x task runners: Task runners concept
- PyTorch multiprocessing and CUDA guidance: PyTorch multiprocessing notes
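And the per-GPU thread pool mentioned in the notes, sketched as a plain helper you could call from inside gpu_worker. Names are illustrative; the model is loaded once per process and shared by the threads for inference only:
```
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import torch

def infer_many(model: torch.nn.Module, batches: list, device: str, threads: int = 4) -> list[float]:
    # Shared, GPU-resident model; each thread converts its batch and runs a forward pass.
    def one(b: np.ndarray) -> float:
        x = torch.from_numpy(b).to(device)
        with torch.no_grad():
            return float(model(x).sum().item())

    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(one, batches))

# Inside gpu_worker from the example above, after loading the model:
#     return infer_many(model, batches, device)
```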
o
@Marvin what ML-relevant parameters can I pass to @task? Which are often used in training and inference contexts, and what is the gain?
m
thought for 117.0 seconds
Great question. In Prefect 3.x, the most ML-relevant knobs on `@task` help you get resilience, speed, and observability for training/inference. Here are the ones teams use most and why they matter:
Core reliability and performance
- retries, retry_delay_seconds, retry_jitter_factor
  - Why: absorb transient failures (e.g., dataloader hiccups, brief GPU OOM/retry after GC, flaky network fetches).
  - Example:
Copy code
@task(retries=5, retry_delay_seconds=[1, 2, 4, 8, 16])
def fetch_batch(...): ...
- timeout_seconds
  - Why: bound worst-case latency; fail-fast or reschedule if a step hangs (I/O stall, deadlock).
  - Example:
Copy code
@task(timeout_seconds=600)
def train_epoch(...): ...
- cache_key_fn, cache_expiration, refresh_cache, cache_result_in_memory
  - Why: memoize deterministic work (tokenization, feature extraction, small inference outputs) to save time/money.
  - Tips: only cache small/serializable results; set cache_result_in_memory=False if outputs are large to avoid RAM bloat.
  - Example:
Copy code
import hashlib, json, datetime as dt

def hash_inputs(ctx, params):
    keyable = {"model_ver": params["model_ver"], "inputs": params["inputs"]}
    return hashlib.sha256(json.dumps(keyable, sort_keys=True).encode()).hexdigest()

@task(cache_key_fn=hash_inputs, cache_expiration=dt.timedelta(hours=12), cache_result_in_memory=False)
def preprocess_or_infer(inputs, model_ver: str): ...
Results and artifacts
- persist_result, result_storage, result_storage_key, result_serializer
  - Why: persist metrics, predictions, and small artifacts to object storage (S3/GCS/Azure) so downstream tasks/runs can fetch them.
  - Recommended: persist small JSON/CSV metrics via Prefect results; store big tensors/models with your storage SDK (S3/GCS) directly in the task.
  - Example:
Copy code
from prefect.results import ResultStorage
from prefect.results.serializers import JSONSerializer

rs = ResultStorage.from_url("s3://my-bucket/prefix")  # configure creds via env/blocks

@task(persist_result=True, result_storage=rs, result_storage_key="predictions/{run_id}.json", result_serializer=JSONSerializer())
def write_preds(preds): return {"n": len(preds)}
Observability and control
- log_prints
  - Why: capture training logs, batch/epoch metrics, and kernel warnings in Prefect logs.
  - Example:
Copy code
@task(log_prints=True)
def train_step(...): print("loss=", loss)
- tags, version, task_run_name
  - Why: slice/filter in the UI and attach lineage to model/data versions.
  - Examples:
Copy code
@task(tags={"inference", "cuda"}, version="model=v2")
def infer(...): ...

@task(task_run_name=lambda p: f"infer-{p['model_ver']}-{p['batch_id']}")
def infer(...): ...
- on_completion, on_failure
  - Why: notify, push metrics to MLflow/W&B, free external resources, or write checkpoints on failure.
  - Example:
Copy code
def notify_failure(state, obj):
    # post to Slack / record incident
    ...

@task(on_failure=[notify_failure])
def train_epoch(...): ...
Putting it together (inference-friendly example)
```
import datetime as dt, hashlib, json, numpy as np, torch
from prefect import task

def hash_payload(ctx, params):
    # Use a stable, small representation (IDs, hashes, or shapes), not raw tensors
    keyable = {"model_ver": params["model_ver"], "inputs": params["inputs"]}
    return hashlib.sha256(json.dumps(keyable, sort_keys=True).encode()).hexdigest()

@task(
    retries=3,
    retry_delay_seconds=[1, 2, 5],
    timeout_seconds=30,
    cache_key_fn=hash_payload,
    cache_expiration=dt.timedelta(hours=1),
    cache_result_in_memory=False,
    log_prints=True,
    tags={"inference", "cuda"},
    version="model=v2",
    task_run_name=lambda p: f"infer-v2-batch-{p['batch_id']}",
)
def run_inference(model, inputs, model_ver: str, batch_id: int):
    with torch.inference_mode():
        x = torch.tensor(inputs, dtype=torch.float32, device=next(model.parameters()).device)
        y = model(x)
        out = y.softmax(-1).cpu().numpy()
    print("batch", batch_id, "mean_prob", float(out.mean()))
    return out.tolist()
```
When to use which (training vs inference)
- Training
  - Use retries and timeouts around data loading/remote IO; log_prints for progress; hooks for reporting; persist_result for metrics checkpoints; cache preprocessing if deterministic.
  - For large artifacts (checkpoints), write directly to object storage inside the task and return only metadata/URIs via Prefect results.
- Inference
  - Prefer a long-lived model server kept warm on GPU; Prefect tasks call it. If calling the model directly in a flow, keep the model in-process and use ThreadPoolTaskRunner; leverage caching for deterministic pre/post steps and persist small outputs when needed.
Notes
- `task_runner` is configured on the `@flow`, not on `@task`.
- Avoid caching or persisting giant tensors via Prefect results; use object storage for heavy binaries and keep Prefect results to metadata.
Docs
- Caching: Caching in Prefect 3.x
- Results API: Results and serializers
- Task decorator source (parameter reference): @task source
o
@Marvin write an example which stores artifacts and results to S3.
Results and artifacts
- persist_result, result_storage, result_storage_key, result_serializer
  - Why: persist metrics, predictions, and small artifacts to object storage (S3/GCS/Azure) so downstream tasks/runs can fetch them.
  - Recommended: persist small JSON/CSV metrics via Prefect results; store big tensors/models with your storage SDK (S3/GCS) directly in the task.
  - Example:
Copy code
from prefect.results import ResultStorage
from prefect.results.serializers import JSONSerializer

rs = ResultStorage.from_url("s3://my-bucket/prefix")  # configure creds via env/blocks

@task(persist_result=True, result_storage=rs, result_storage_key="predictions/{run_id}.json", result_serializer=JSONSerializer())
def write_preds(preds): return {"n": len(preds)}
m
thought for 74.3 seconds
Here’s a complete Prefect 3.x example that:
- Persists small results (metrics/predictions) to S3 using Prefect’s results system
- Uploads large artifacts (a model checkpoint and a NumPy array) directly to S3 via prefect-aws
- Returns only lightweight metadata/URIs to Prefect
Assumptions
- You have AWS credentials available (env/instance profile) and have `prefect[aws]` and `prefect-aws` installed.
- Replace bucket/prefix values as needed.
```
import io
import os
import json
import time

import numpy as np
import torch
import torch.nn as nn

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

# Prefect results to S3 (for small JSON-like objects)
from prefect.results import ResultStore
from prefect.serializers import JSONSerializer

# Direct S3 artifact handling (for big binaries)
from prefect_aws.s3 import S3Bucket


# ----- Model definition -----
class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(d_in, d_out), nn.ReLU())

    def forward(self, x):
        return self.net(x)


# ----- Configure result storage (small JSON) -----
# Store small results as JSON under s3://my-bucket/prefect-results/
RESULTS_BUCKET_URL = "s3://my-bucket/prefect-results/"
result_store = ResultStore(
    # ResultStore can take a RemoteFileSystem; using URL shorthand:
    result_storage=None,  # let Prefect resolve from URL below
    serializer=JSONSerializer(),
)
# When using the decorator, we’ll pass result_storage=ResultStore.from_url(...).
# Using from_url gives a quick shorthand:
result_store = ResultStore.from_url(RESULTS_BUCKET_URL, serializer=JSONSerializer())

# ----- Configure an S3Bucket block client for large artifacts -----
# You can also create and load a saved block by name, but direct instantiation works with env creds.
artifacts_bucket = S3Bucket(bucket_name="my-bucket", basepath="artifacts/")


# ----- Tasks -----
@task
def train_and_save_model(ckpt_key: str) -> dict:
    model = TinyNet().to("cpu").train()
    # Fake training
    for _ in range(2):
        x = torch.randn(16, 4)
        y = model(x).sum()
        y.backward()
        for p in model.parameters():
            p.data -= 0.001 * p.grad
            p.grad = None

    # Save CPU state dict to bytes
    buf = io.BytesIO()
    torch.save(model.state_dict(), buf)
    buf.seek(0)

    # Upload checkpoint to S3
    # Example key: artifacts/models/run-123/model.pt
    artifacts_bucket.upload_from_file_object(file_object=buf, to_path=ckpt_key)
    return {
        "checkpoint_s3": f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{ckpt_key}",
        "sha": None,  # optionally compute hash
    }


@task
def generate_predictions(ckpt_key: str, n_batches: int = 3) -> tuple[list[list[float]], dict]:
    # Download checkpoint
    buf = io.BytesIO()
    artifacts_bucket.download_object(ckpt_key, buf)
    buf.seek(0)

    # Load model
    model = TinyNet().to("cpu").eval()
    state = torch.load(buf, map_location="cpu")
    model.load_state_dict(state)

    # Predict
    preds = []
    for _ in range(n_batches):
        x = torch.randn(32, 4)
        with torch.no_grad():
            y = model(x)
        preds.append(y.softmax(-1).numpy().tolist())

    # Example metrics
    metrics = {"mean_prob": float(np.mean([np.mean(b) for b in preds]))}
    return preds, metrics


@task
def upload_numpy_artifact(arr: np.ndarray, key: str) -> str:
    # Save ndarray to npy bytes and upload
    buf = io.BytesIO()
    np.save(buf, arr)
    buf.seek(0)
    artifacts_bucket.upload_from_file_object(buf, key)
    return f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{key}"


# Persist small JSON results via Prefect Results to S3
@task(
    persist_result=True,
    result_storage=result_store,
    result_storage_key="predictions/{run_id}.json",
    result_serializer=JSONSerializer(),
)
def persist_small_predictions(preds: list[list[float]]) -> dict:
    # Return a small structure; Prefect writes JSON to S3
    return {"n_batches": len(preds), "n_rows": sum(len(b) for b in preds)}


@task(
    persist_result=True,
    result_storage=result_store,
    result_storage_key="metrics/{run_id}.json",
    result_serializer=JSONSerializer(),
)
def persist_metrics(metrics: dict) -> dict:
    return metrics


# ----- Flow -----
@flow(name="s3-artifacts-and-results", task_runner=ThreadPoolTaskRunner(max_workers=4))
def main():
    run_id = str(int(time.time()))

    # Where to put the model checkpoint and an example large array
    ckpt_key = f"models/run-{run_id}/model.pt"
    arr_key = f"arrays/run-{run_id}/embeddings.npy"

    # 1) Train and store large artifact (checkpoint) to S3
    ckpt_meta = train_and_save_model.submit(ckpt_key).result()

    # 2) Run inference using the stored checkpoint
    preds, metrics = generate_predictions.submit(ckpt_key).result()

    # 3) Persist small results via Prefect results (JSON) — lands under s3://my-bucket/prefect-results/
    pred_meta = persist_small_predictions.submit(preds).result()
    metrics_meta = persist_metrics.submit(metrics).result()

    # 4) Store a large numpy artifact directly to S3
    embeddings = np.random.randn(10_000, 128).astype("float32")
    embeddings_uri = upload_numpy_artifact.submit(embeddings, arr_key).result()

    return {
        "checkpoint_uri": ckpt_meta["checkpoint_s3"],
        "predictions_manifest": pred_meta,   # stored by Prefect in S3
        "metrics_manifest": metrics_meta,    # stored by Prefect in S3
        "embeddings_uri": embeddings_uri,    # direct S3
        "run_id": run_id,
    }


if __name__ == "__main__":
    out = main()
    print(json.dumps(out, indent=2))
```
How this works
- Small JSON results: `persist_result=True` with `ResultStore.from_url("s3://...")` and `JSONSerializer()` writes task returns to S3 as JSON.
- Large artifacts: Use `prefect-aws` S3Bucket to upload/download bytes directly; tasks return only URIs/metadata.
- You can mix and match: keep small metadata in Prefect’s results, large binaries in S3.
Useful links
- Prefect results overview: Results API
- Serializers: Serializers
- prefect-aws S3 utilities and block: prefect-aws S3
o
@Marvin no, I use MinIO as the S3 bucket
m
thought for 101.8 seconds
Got it — here’s a MinIO-friendly example that:
- Persists small JSON results to your MinIO bucket using Prefect’s Results
- Stores large artifacts (model checkpoint, NumPy arrays) directly to MinIO via a Prefect S3Bucket block
- Avoids pickling GPU objects and only returns lightweight metadata/URIs
Notes
- Prefect’s Results use s3fs/boto under the hood. Set your MinIO endpoint and credentials via environment variables so the S3 URL resolves correctly.
- For large binaries, use a Prefect `S3Bucket` block configured for MinIO (endpoint + creds), then `load` the block in code.
Prep
- Create a Prefect block: S3Bucket named `minio-artifacts` with:
  - Bucket name: your MinIO bucket (e.g., `my-minio-bucket`)
  - Basepath: `artifacts/` (optional)
  - Credentials: MinIO credentials
  - Endpoint URL: your MinIO endpoint (e.g., `http://localhost:9000`)
- Ensure these environment variables are set where your flow runs:
  - `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_DEFAULT_REGION=us-east-1`
  - `AWS_ENDPOINT_URL=http://localhost:9000` (your MinIO endpoint)
Example
```
import io
import os
import json
import time

import numpy as np
import torch
import torch.nn as nn

from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

# Prefect Results to MinIO via S3-compatible URL
from prefect.results import ResultStore
from prefect.serializers import JSONSerializer

# MinIO artifacts via Prefect AWS S3 bucket block
from prefect_aws.s3 import S3Bucket

# ----- Ensure MinIO env is present (or set in your deployment/worker env) -----
# os.environ["AWS_ACCESS_KEY_ID"] = "minio"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "miniosecret"
# os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
# os.environ["AWS_ENDPOINT_URL"] = "http://localhost:9000"


# ----- Model -----
class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(d_in, d_out), nn.ReLU())

    def forward(self, x):
        return self.net(x)


# ----- Prefect Results store (small JSON) to MinIO -----
# IMPORTANT: s3fs/boto will target MinIO using AWS_ENDPOINT_URL
RESULTS_URL = "s3://my-minio-bucket/prefect-results/"
result_store = ResultStore.from_url(RESULTS_URL, serializer=JSONSerializer())

# ----- S3Bucket block (large artifacts) configured for MinIO in Prefect UI -----
artifacts_bucket = S3Bucket.load("minio-artifacts")


@task
def train_and_save_model(ckpt_key: str) -> dict:
    model = TinyNet().to("cpu").train()
    # Fake "training"
    for _ in range(2):
        x = torch.randn(16, 4)
        y = model(x).sum()
        y.backward()
        for p in model.parameters():
            p.data -= 0.001 * p.grad
            p.grad = None

    # Save CPU state_dict to bytes
    buf = io.BytesIO()
    torch.save(model.state_dict(), buf)
    buf.seek(0)

    # Upload to MinIO via S3Bucket block
    artifacts_bucket.upload_from_file_object(file_object=buf, to_path=ckpt_key)
    return {
        "checkpoint_s3": f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{ckpt_key}",
    }


@task
def generate_predictions(ckpt_key: str, n_batches: int = 3) -> tuple[list[list[float]], dict]:
    # Download checkpoint
    buf = io.BytesIO()
    artifacts_bucket.download_object(ckpt_key, buf)
    buf.seek(0)

    # Load model and run inference on CPU for demo
    model = TinyNet().to("cpu").eval()
    state = torch.load(buf, map_location="cpu")
    model.load_state_dict(state)

    preds = []
    for _ in range(n_batches):
        x = torch.randn(32, 4)
        with torch.no_grad():
            y = model(x)
        preds.append(y.softmax(-1).numpy().tolist())

    metrics = {"mean_prob": float(np.mean([np.mean(b) for b in preds]))}
    return preds, metrics


@task
def upload_numpy_artifact(arr: np.ndarray, key: str) -> str:
    buf = io.BytesIO()
    np.save(buf, arr)
    buf.seek(0)
    artifacts_bucket.upload_from_file_object(buf, key)
    return f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{key}"


# Persist small JSON results via Prefect Results (to MinIO)
@task(
    persist_result=True,
    result_storage=result_store,
    result_storage_key="predictions/{run_id}.json",
    result_serializer=JSONSerializer(),
)
def persist_small_predictions(preds: list[list[float]]) -> dict:
    return {"n_batches": len(preds), "n_rows": sum(len(b) for b in preds)}


@task(
    persist_result=True,
    result_storage=result_store,
    result_storage_key="metrics/{run_id}.json",
    result_serializer=JSONSerializer(),
)
def persist_metrics(metrics: dict) -> dict:
    return metrics


@flow(name="minio-artifacts-and-results", task_runner=ThreadPoolTaskRunner(max_workers=4))
def main():
    run_id = str(int(time.time()))
    ckpt_key = f"models/run-{run_id}/model.pt"
    arr_key = f"arrays/run-{run_id}/embeddings.npy"

    ckpt_meta = train_and_save_model.submit(ckpt_key).result()
    preds, metrics = generate_predictions.submit(ckpt_key).result()

    pred_manifest = persist_small_predictions.submit(preds).result()
    metrics_manifest = persist_metrics.submit(metrics).result()

    embeddings = np.random.randn(10_000, 128).astype("float32")
    embeddings_uri = upload_numpy_artifact.submit(embeddings, arr_key).result()

    return {
        "checkpoint_uri": ckpt_meta["checkpoint_s3"],
        "predictions_manifest": pred_manifest,   # JSON in MinIO via Prefect Results
        "metrics_manifest": metrics_manifest,    # JSON in MinIO via Prefect Results
        "embeddings_uri": embeddings_uri,        # Binary in MinIO via S3Bucket block
        "run_id": run_id,
    }


if __name__ == "__main__":
    out = main()
    print(json.dumps(out, indent=2))
```
Key points
- Results (small JSON) go to `s3://my-minio-bucket/prefect-results/` — MinIO is picked up via `AWS_ENDPOINT_URL`.
- Large binaries (checkpoints, arrays) use the `S3Bucket` block that’s explicitly configured for MinIO (endpoint + creds). No special code is required beyond `S3Bucket.load("minio-artifacts")`.
Helpful links
- prefect-aws (S3, MinIO creds): prefect-aws
- Serializers and results: Serializers and Results API
If you prefer not to rely on environment variables for Results, we can switch to storing “results” via the S3Bucket block as JSON files as well — just say the word and I’ll show that version.
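For reference, that alternative might look like the sketch below. It leans on the block’s generic `write_path` method and the `minio-artifacts` block name from above, so treat the details as assumptions to verify against your prefect-aws version:
```
import json

from prefect import flow, task
from prefect_aws.s3 import S3Bucket

@task
def persist_metrics_via_block(metrics: dict, key: str) -> str:
    # Write small JSON payloads through the same MinIO-configured S3Bucket block,
    # instead of relying on AWS_* environment variables for Prefect Results.
    bucket = S3Bucket.load("minio-artifacts")
    bucket.write_path(key, json.dumps(metrics).encode())
    return key

@flow
def persist_demo():
    return persist_metrics_via_block.submit({"mean_prob": 0.42}, "metrics/demo.json").result()
```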
o
@Marvin I do not have AWS, can I use another block?
# MinIO artifacts via Prefect AWS S3 bucket block
from prefect_aws.s3 import S3Bucket
Also I am local for now and want to change later just the endpoint.
m
thought for 462.6 seconds
Yes — you don’t need an AWS account. The S3Bucket block works with any S3‑compatible service (including MinIO). You just point the block at your MinIO endpoint and use your MinIO access/secret keys. Later, you can switch endpoints by updating the saved block without touching your flow code. Quick setup (Prefect 3.x): 1) Install the integration:
Copy code
pip install "prefect-aws"
2) Create and save an S3Bucket block configured for MinIO:
Copy code
from prefect_aws.s3 import S3Bucket
from prefect_aws.credentials import AwsCredentials
from prefect_aws.client_parameters import AwsClientParameters

# MinIO credentials (use your values)
creds = AwsCredentials(
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)

# Point the AWS client at MinIO
client_params = AwsClientParameters(
    endpoint_url="<http://localhost:9000>",  # or <https://your-minio.example.com>
    region_name="us-east-1",
    # If you're using self-signed certs locally, you may need:
    # verify=False
    # Some MinIO setups also need path-style addressing; if required in your env,
    # set the botocore config accordingly in AwsClientParameters (addressing_style='path').
)

s3_block = S3Bucket(
    bucket_name="my-bucket",
    credentials=creds,
    aws_client_parameters=client_params,
)

s3_block.save("minio-bucket", overwrite=True)
3) Use it in your flows (no code changes needed when you later update the block’s endpoint):
Copy code
from prefect import flow
from prefect_aws.s3 import S3Bucket

@flow
def example():
    s3 = S3Bucket.load("minio-bucket")
    # Upload a file
    s3.upload_from_path("local/path/file.parquet", "artifacts/file.parquet")
    # Download a file
    s3.download_object_to_path("artifacts/file.parquet", "local/path/file.parquet")
When you’re ready to switch from local MinIO to another endpoint (or to AWS S3 later), just edit the saved block (in the UI or via code) to change `endpoint_url` and credentials — your flow code that loads `S3Bucket.load("minio-bucket")` stays the same.
Notes and tips:
- For MinIO, you often need path‑style addressing; if your setup requires it, configure the client to use `addressing_style='path'`.
- If you’re using TLS with a self‑signed cert locally, set `verify=False` (or provide a CA bundle path) in the client parameters.
- Prefect also exposes a `MinIOCredentials` block if you prefer that over `AwsCredentials`, but `S3Bucket` + `AwsCredentials` + `endpoint_url` is a common, simple setup (a MinIOCredentials sketch follows below).
If your goal is to store flow code for deployments in MinIO as well, you can use this same S3Bucket block as your deployment storage so changing the endpoint later is just a block update. If that’s your plan, I can share a minimal deployment example tailored to your setup.
Docs:
- Prefect AWS integration (v3)
- Blocks in Prefect 3.x
- Store flow code in remote storage
Quick check: are you planning to use MinIO just for files/artifacts during runs, or also as storage for your deployment’s flow code? If you share that (and whether your MinIO uses TLS), I’ll provide the exact config you need.
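A sketch of the `MinIOCredentials` variant mentioned above; the field names follow prefect-aws, and the endpoint and keys are placeholders, so double-check against your installed version:
```
from prefect_aws import MinIOCredentials
from prefect_aws.client_parameters import AwsClientParameters
from prefect_aws.s3 import S3Bucket

minio_creds = MinIOCredentials(
    minio_root_user="minio",
    minio_root_password="minio123",
)

s3_block = S3Bucket(
    bucket_name="my-bucket",
    credentials=minio_creds,
    aws_client_parameters=AwsClientParameters(endpoint_url="http://localhost:9000"),
)
s3_block.save("minio-bucket-minio-creds", overwrite=True)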
o
@Marvin
Traceback (most recent call last):
  File "/workflow-orchestrator-example-prefect/results_and_artifacts/0_results_number_one", line 36, in <module>
    result_store = ResultStore.from_url(RESULTS_URL, serializer=JSONSerializer())
                   ^^^^^^^^^^^^^^^^^^^^
  File "/home/labor/.local/lib/python3.12/site-packages/pydantic/_internal/_model_construction.py", line 289, in __getattr__
    raise AttributeError(item)
AttributeError: from_url. Did you mean: 'from_orm'?
m
thought for 203.4 seconds
You’re hitting that error because `ResultStore` doesn’t have a `from_url` method in Prefect 3.x. Instead, construct a `ResultStore` with a filesystem block (local or remote) and a serializer. Here are working patterns you can use.
Local (good for “I’m local for now”):
Copy code
pip install "prefect>=3"  # if not installed
Copy code
from prefect.results import ResultStore
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

store = ResultStore(
    result_storage=LocalFileSystem(basepath="/tmp/prefect-results"),
    serializer=JSONSerializer(),
)

key = store.write({"x": 1})
rec = store.read(key)
print(rec.value)  # {'x': 1}
MinIO via S3-compatible endpoint (switchable later by updating a saved block):
- Install s3fs (fsspec driver for s3://)
Copy code
pip install s3fs
Create and save a RemoteFileSystem block pointing at MinIO:
Copy code
from prefect.filesystems import RemoteFileSystem

rfs = RemoteFileSystem(
    basepath="<s3://my-bucket/results/>",  # still use s3://, just point the client at MinIO
    settings={
        "key": "minio",                # your MinIO access key
        "secret": "minio123",          # your MinIO secret key
        "client_kwargs": {
            "endpoint_url": "<http://localhost:9000>"  # or <https://minio.example.com>
        },
        "use_ssl": False,              # set True if using TLS
        # If your MinIO needs path-style addressing, uncomment:
        # "config_kwargs": {"s3": {"addressing_style": "path"}},
    },
)
rfs.save("minio-results", overwrite=True)
Use that block in your code (no URL parsing needed):
Copy code
from prefect.results import ResultStore
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer

store = ResultStore(
    result_storage=RemoteFileSystem.load("minio-results"),
    serializer=JSONSerializer(),
)

key = store.write({"ok": True})
print(store.read(key).value)
Later, to move from local MinIO to another endpoint or AWS S3, just update the saved `RemoteFileSystem` block (change `endpoint_url`, creds, SSL) — your flow code that loads the block by name doesn’t change.
Notes:
- If you prefer, you can set result storage on tasks/flows directly:
Copy code
from prefect import task
from prefect.filesystems import RemoteFileSystem

  rfs = RemoteFileSystem.load("minio-results")

  @task(result_storage=rfs, result_serializer="json")
  def do_work():
      return {"status": "done"}
- Artifacts and results are different in Prefect 3.x: results are data returned by tasks/flows; artifacts are UI-visible outputs. If you meant artifacts, I can show how to create them too (a small sketch follows below).
Docs:
- Results (Prefect 3.x)
- Filesystems API
- Serializers API
If you share your exact MinIO setup (TLS or not, path-style needed or not), I’ll tailor the `RemoteFileSystem.settings` for you.
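For completeness, a small sketch of UI artifacts using the built-in helpers in `prefect.artifacts`; the key names and URI are just examples:
```
from prefect import flow, task
from prefect.artifacts import create_link_artifact, create_markdown_artifact

@task
def report_metrics(metrics: dict, checkpoint_uri: str) -> None:
    # Artifacts show up in the Prefect UI on the run page; they are separate from results.
    create_markdown_artifact(
        key="eval-metrics",
        markdown=f"## Eval metrics\n\n- mean_prob: {metrics['mean_prob']:.4f}",
        description="Human-readable summary of the evaluation run",
    )
    create_link_artifact(
        key="model-checkpoint",
        link=checkpoint_uri,  # e.g. an s3:// URI in MinIO
        link_text="Model checkpoint",
    )

@flow
def artifacts_demo():
    report_metrics.submit({"mean_prob": 0.42}, "s3://my-minio-bucket/models/run-1/model.pt").result()
```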
o
@Marvin ile "/uvd-workflow-orchestrator-example-prefect/results_and_artifacts/0_create_bucket_s3fs.py", line 35, in <module> print(store.read(key).value) ^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/results.py", line 631, in read return self._read(key=key, holder=holder, _sync=True) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 347, in coroutine_wrapper return run_coro_as_sync(ctx_call()) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 207, in run_coro_as_sync return call.result() ^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 365, in result return self.future.result(timeout=timeout) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 192, in result return self.__get_result() ^^^^^^^^^^^^^^^^^^^ File "/usr/local/lib/python3.12/concurrent/futures/_base.py", line 401, in __get_result raise self._exception File "/home/labor/.local/lib/python3.12/site-packages/prefect/_internal/concurrency/calls.py", line 441, in _run_async result = await coro ^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 188, in coroutine_wrapper return await task ^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/utilities/asyncutils.py", line 341, in ctx_call result = await async_fn(*args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/results.py", line 601, in _read content = await call_explicitly_async_block_method( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/_internal/compatibility/blocks.py", line 21, in call_explicitly_async_block_method return await getattr(block, method).aio(block, *args, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/filesystems.py", line 506, in read_path path = self._resolve_path(path) ^^^^^^^^^^^^^^^^^^^^^^^^ File "/home/labor/.local/lib/python3.12/site-packages/prefect/filesystems.py", line 425, in _resolve_path return f"{self.basepath.rstrip('/')}/{urlpath.lstrip('/')}" ^^^^^^^^^^^^^^^^^^^ TypeError: a bytes-like object is required, not 'str'
m
thought for 73.1 seconds
Thanks for the stack trace: that error comes from mixing bytes and strings in the path handling. In Prefect 3.x, `ResultStore.read()` expects the key to be a string. Your key (or your `basepath`) is bytes, which triggers this when Prefect builds the storage path in `_resolve_path`.
What to check/fix:
- Ensure the key you pass to `store.read(key)` is a str, not bytes.
- If you persisted the key to disk or received it as bytes, decode it with `key = key.decode()` before calling `read`.
- Do not pass `key=b"..."` to `store.write(...)`. Keys must be strings.
- If you use a custom `storage_key_fn`, make sure it returns a str.
- Ensure your filesystem block's `basepath` is a str (e.g., `"s3://my-bucket/results/"`), not bytes; a quick check for this follows the snippets below.
Quick diagnostic:
Copy code
print(type(key), repr(key))  # should be <class 'str'>
If it prints bytes, decode it:
Copy code
if isinstance(key, (bytes, bytearray)):
    key = key.decode("utf-8")
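The same check applies to the block's `basepath` (last bullet above). A minimal sketch, assuming your block is saved under the placeholder name "minio-results":
Copy code
from prefect.filesystems import RemoteFileSystem

rfs = RemoteFileSystem.load("minio-results")
# basepath should be a plain str such as 's3://my-bucket/results/'
print(type(rfs.basepath), repr(rfs.basepath))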
Minimal working example (local):
Copy code
from prefect.results import ResultStore
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

store = ResultStore(
    result_storage=LocalFileSystem(basepath="/tmp/prefect-results"),
    serializer=JSONSerializer(),
)

key = store.write({"x": 1})
print(type(key), key)           # should be str
rec = store.read(key)
print(rec.value)                # {'x': 1}
MinIO (via s3fs) example:
Copy code
# pip install s3fs
from prefect.results import ResultStore
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer

rfs = RemoteFileSystem(
    basepath="<s3://my-bucket/results/>",
    settings={
        "key": "minio",
        "secret": "minio123",
        "client_kwargs": {"endpoint_url": "<http://localhost:9000>"},
        "use_ssl": False,
        # If needed:
        # "config_kwargs": {"s3": {"addressing_style": "path"}},
    },
)

store = ResultStore(result_storage=rfs, serializer=JSONSerializer())

key = store.write({"ok": True})
print(type(key), key)  # should be str
print(store.read(key).value)
Common patterns that cause this error:
- Using `key=b"...bytes..."` when calling `store.write(...)`
- Loading a saved key from a binary file or a message queue without decoding
- Returning bytes from a custom `storage_key_fn` (a sketch of a str-returning one follows below)
If you can share the few lines where `key` is created (and how it's stored/passed into `read`), I can pinpoint the exact fix.
Docs:
- Results (Prefect 3.x)
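To illustrate the last bullet, here is a minimal sketch of a custom `storage_key_fn` that always returns a str; the key prefix and the local storage path are placeholders, not your MinIO setup:
Copy code
from uuid import uuid4
from prefect.results import ResultStore
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

def make_key() -> str:
    # Always return a str; returning bytes here reproduces the TypeError above
    return f"debug/{uuid4()}.json"

store = ResultStore(
    result_storage=LocalFileSystem(basepath="/tmp/prefect-results"),  # placeholder storage
    serializer=JSONSerializer(),
    storage_key_fn=make_key,
)

print(type(store.storage_key_fn()))  # <class 'str'>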
o
@Marvin print(type(key), key) results in <class 'NoneType'> None
m
thought for 51.5 seconds
Ah, that explains it. In Prefect 3.x, `ResultStore.write(...)` returns None; it does not return the key. You need to generate (or supply) a key yourself, pass it to `write`, then use that same key to `read`. Correct pattern:
Copy code
from uuid import uuid4
from prefect.results import ResultStore
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer

rfs = RemoteFileSystem(
    basepath="<s3://my-bucket/results/>",
    settings={
        "key": "minio",
        "secret": "minio123",
        "client_kwargs": {"endpoint_url": "<http://localhost:9000>"},
        "use_ssl": False,
        # "config_kwargs": {"s3": {"addressing_style": "path"}},  # if your MinIO needs it
    },
)

store = ResultStore(result_storage=rfs, serializer=JSONSerializer())

# Generate a key (you can use your own convention)
key = f"results/{uuid4()}.json"  # or: key = store.storage_key_fn()

store.write({"ok": True}, key=key)  # write returns None
rec = store.read(key)
print(rec.value)  # {'ok': True}
If you want an automatic key generator:
- Use `store.storage_key_fn()` to generate a key each time, or
- Provide a custom `storage_key_fn` when constructing the `ResultStore`:
Copy code
from uuid import uuid4

store = ResultStore(
    result_storage=rfs,
    serializer=JSONSerializer(),
    storage_key_fn=lambda: f"results/{uuid4()}.json",
)

key = store.storage_key_fn()
store.write({"ok": True}, key=key)
print(store.read(key).value)
Quick sanity checks:
- `print(type(key), key)` should show a string (not None, not bytes).
- Ensure your `basepath` is a string like `"s3://my-bucket/results/"`.
Docs:
- Results (Prefect 3.x)
- Filesystems API
- Serializers API
If you share your current snippet (how you build `store` and the call to `write`), I'll adapt it precisely.
o
@Marvin so this just dumps the results in the store, can I name them with the flow run id or task id string?

from prefect.filesystems import RemoteFileSystem

rfs = RemoteFileSystem(
    basepath="s3://prefect-test/results/",  # still use s3://, just point the client at MinIO
    settings={
        "key": "um_minio",     # your MinIO access key
        "secret": "um_minio",  # your MinIO secret key
        "client_kwargs": {
            "endpoint_url": "http://host.docker.internal:9900"  # or https://minio.example.com
        },
        "use_ssl": False,  # set True if using TLS
        # If your MinIO needs path-style addressing, uncomment:
        # "config_kwargs": {"s3": {"addressing_style": "path"}},
    },
)
rfs.save("prefect-results-minio-s3", overwrite=True)

#####

from prefect.results import ResultStore
from prefect.filesystems import RemoteFileSystem
from prefect import flow, task

rfs = RemoteFileSystem.load("prefect-results-minio-s3")

@task(result_storage=rfs, result_serializer="json")
def do_work():
    return {"status": "work completed"}

@flow(result_storage=rfs, result_serializer="json")
def my_flow():
    do_work()
    return {"status": "flow completed"}

my_flow()
m
thought for 450.3 seconds
Yes! You can control the filename/key used for persisted results.
- For tasks: use the `result_storage_key` parameter on the `@task` decorator. It supports runtime templating with flow/task run info.
- For flows: there's no `result_storage_key` parameter. If you need a specific key, write to the result store yourself inside the flow using the runtime context.
Task example (using flow run id + task run id):
Copy code
from prefect import task
from prefect.filesystems import RemoteFileSystem

rfs = RemoteFileSystem.load("prefect-results-minio-s3")

@task(
    result_storage=rfs,
    result_serializer="json",
    # basepath is s3://prefect-test/results/, so don't repeat "results/" here
    result_storage_key="{flow_run.id}/{task_run.id}.json",
)
def do_work():
    return {"status": "work completed"}
Flow example (manual write using the run context):
Copy code
from prefect import flow
from prefect.context import get_run_context

@flow(result_storage=rfs, result_serializer="json")
def my_flow():
    res = do_work()
    # manually persist something under a key with the flow run id
    ctx = get_run_context()
    key = f"{ctx.flow_run.id}/flow-summary.json"
    ctx.result_store.write({"status": "flow completed"}, key=key)
    return {"status": "flow completed"}  # the flow’s own persisted return uses an auto key
Notes:
- Don't include angle brackets in your config values (use `s3://prefect-test/results/` and `http://host.docker.internal:9900`).
- You can also use other fields, e.g. `{task_run.name}` or `{flow_run.name}` (see the sketch after this list), or task parameters: `{parameters.some_param}`.
- Since your `basepath` is `s3://prefect-test/results/`, the final object keys will look like `s3://prefect-test/results/<flow_run_id>/<task_run_id>.json`.
Docs:
- Results (Prefect 3.x)
- Runtime context
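To make the "other fields" note concrete, here is a hedged sketch that keys task results by run names instead of ids; it assumes the `{flow_run.name}`/`{task_run.name}` templating works as described above and reuses the block saved earlier:
Copy code
from prefect import flow, task
from prefect.filesystems import RemoteFileSystem

rfs = RemoteFileSystem.load("prefect-results-minio-s3")

@task(
    result_storage=rfs,
    result_serializer="json",
    # resolves to e.g. s3://prefect-test/results/<flow-run-name>/<task-run-name>.json
    result_storage_key="{flow_run.name}/{task_run.name}.json",
)
def do_work():
    return {"status": "work completed"}

@flow(result_storage=rfs, result_serializer="json")
def my_flow():
    do_work()

if __name__ == "__main__":
    my_flow()
Run names are readable but less collision-proof than ids, so stick with `{flow_run.id}`/`{task_run.id}` if keys must never clash.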