oleander-whirlwind
10/13/2025, 6:58 AM
Marvin
10/13/2025, 6:58 AM
Marvin
10/13/2025, 7:00 AM
import os
import sys
import numpy as np
import torch
import torch.nn as nn
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
# Simple tiny model for demo
class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(d_in, d_out),
            nn.ReLU(),
        )
    def forward(self, x):
        return self.net(x)

# Task that expects a ready-to-use model and tensor
@task
def infer(model: nn.Module, x: torch.Tensor) -> float:
    with torch.no_grad():
        y = model(x)
    return float(y.sum().item())

# Workaround task: load model inside the process and reconstruct tensors from CPU-friendly inputs
@task
def load_and_infer_from_path(model_path: str, x_np: np.ndarray, device: str = "cuda") -> float:
    model = TinyNet().to(device)
    state = torch.load(model_path, map_location=device)
    model.load_state_dict(state)
    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        y = model(x)
    return float(y.sum().item())

# ThreadPool: OK (same process, pass-by-reference is fine)
@flow(name="thread_ok", task_runner=ThreadPoolTaskRunner(max_workers=4))
def thread_flow() -> list[float]:
    device = "cuda"
    model = TinyNet().to(device)
    x = torch.randn(32, 4, device=device)
    futures = [infer.submit(model, x) for _ in range(2)]
    return [f.result() for f in futures]

# ProcessPool: expected to FAIL (separate processes require pickling; live CUDA objects are not picklable)
@flow(name="process_fails", task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_flow_should_fail() -> list[float]:
    device = "cuda"
    model = TinyNet().to(device)
    x = torch.randn(32, 4, device=device)
    futures = [infer.submit(model, x) for _ in range(2)]
    return [f.result() for f in futures]

# ProcessPool workaround: pass CPU/serializable inputs and load model inside each process
@flow(name="process_workaround", task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_flow_workaround(model_path: str) -> list[float]:
    # Use CPU-friendly payloads (e.g., numpy arrays)
    x_np = np.random.randn(32, 4).astype("float32")
    futures = [load_and_infer_from_path.submit(model_path, x_np) for _ in range(2)]
    return [f.result() for f in futures]

if __name__ == "__main__":
    if not torch.cuda.is_available():
        print("CUDA is not available on this machine. This demo requires a GPU.")
        sys.exit(0)
    # Save a CPU state dict for the workaround
    model_cpu = TinyNet()
    model_path = "model.pt"
    torch.save(model_cpu.state_dict(), model_path)
    print("Running thread_flow (should succeed)...")
    thread_results = thread_flow()
    print("thread_flow results:", thread_results)
    print("\nRunning process_flow_should_fail (should raise due to pickling/CUDA issues)...")
    try:
        process_results = process_flow_should_fail()
        print("Unexpected success:", process_results)
    except Exception as e:
        print("Expected failure with ProcessPoolTaskRunner:", type(e).__name__, str(e)[:500])
    print("\nRunning process_flow_workaround (should succeed by loading model in each process)...")
    workaround_results = process_flow_workaround(model_path)
    print("process_flow_workaround results:", workaround_results)
    # Cleanup
    if os.path.exists(model_path):
        os.remove(model_path)
What this shows:
- ThreadPoolTaskRunner: passes the live nn.Module and torch.Tensor across threads; inference works.
- ProcessPoolTaskRunner: Submitting tasks with live CUDA objects triggers pickling errors or CUDA context issues (expected).
- Workaround: Pass a model path and CPU-friendly arrays; load the model and move tensors to CUDA inside each process.
References:
- Prefect 3.x task runners concept
- prefect.task_runners source
oleander-whirlwind
10/13/2025, 10:57 AM
Marvin
10/13/2025, 10:57 AM
Marvin
10/13/2025, 10:58 AM
import multiprocessing as mp, os, torch
print("flow pid:", os.getpid(), "start:", mp.get_start_method())
@task
def infer(model, x):
    print("task pid:", os.getpid(), "start:", mp.get_start_method(), "cuda initialized:", torch.cuda.is_initialized())
    with torch.no_grad():
        return float(model(x).sum().item())
Force a failure (or at least force true process isolation)
- Enforce spawn so arguments must be pickled:
if __name__ == "__main__":
    import multiprocessing as mp
    mp.set_start_method("spawn", force=True)
    process_flow_should_fail()
With spawn, you’re more likely to see pickling/initialization issues or massive overhead from shipping CUDA tensors/models to workers.
Recommended pattern for ProcessPool with CUDA
- Don’t pass live CUDA objects between processes. Instead:
- Pass a model path/config and load the model inside the task
- Pass CPU/serializable payloads (e.g., numpy arrays or bytes) and move to CUDA inside the task
- Alternatively, stick with ThreadPoolTaskRunner for single-process/multi-thread inference on one GPU
Minimal “safe” process example
@task
def load_and_infer(model_path: str, x_np):
    import torch, numpy as np
    device = "cuda"
    model = TinyNet().to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        return float(model(x).sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_ok(model_path: str):
    x_np = np.random.randn(32, 4).astype("float32")
    futures = [load_and_infer.submit(model_path, x_np) for _ in range(4)]
    return [f.result() for f in futures]
if __name__ == "__main__":
    import multiprocessing as mp, torch
    mp.set_start_method("spawn", force=True)  # ensure real process isolation
    model = TinyNet()
    torch.save(model.state_dict(), "model.pt")
    process_ok("model.pt")
References
- Prefect task runners in 3.x: Task runners concept
- Source for task runners: prefect.task_runners
- PyTorch multiprocessing and CUDA guidance (avoid fork; use spawn): PyTorch multiprocessing notes
oleander-whirlwind
10/13/2025, 11:03 AM
Marvin
10/13/2025, 11:03 AM
Marvin
10/13/2025, 11:04 AM
import os, multiprocessing as mp, torch
@task
def infer(model, x):
    print({
        "task_pid": os.getpid(),
        "cuda_initialized": torch.cuda.is_initialized(),
        "x_device": str(x.device),
        "x_ptr": x.data_ptr(),   # different per process
        "model_id": id(model),   # different per reconstruction
    })
    with torch.no_grad():
        return float(model(x).sum().item())

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)
    process_flow_should_fail()  # your flow
You should see different PIDs and different data_ptr() values across workers, showing new CUDA allocations per process.
Why this pattern is risky/inefficient
- GPU memory is duplicated per process; each task call serializes the model/tensor and re-allocates GPU memory in the worker.
- Significant overhead (GPU↔️CPU copies during pickling/unpickling), potential fragmentation, and instability under load.
- It may start failing when models/tensors are large or concurrency grows.
Safer process-based pattern: load per worker, pass CPU-friendly data
# per-worker cache
_worker = {"model": None, "device": None}

@task
def infer_cached(model_path: str, x_np, device: str = "cuda"):
    import torch, numpy as np
    if _worker["model"] is None or _worker["device"] != device:
        m = TinyNet().to(device).eval()
        m.load_state_dict(torch.load(model_path, map_location=device))
        _worker["model"] = m
        _worker["device"] = device
    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        return float(_worker["model"](x).sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_ok(model_path: str):
    x_np = np.random.randn(32, 4).astype("float32")
    futs = [infer_cached.submit(model_path, x_np) for _ in range(8)]
    return [f.result() for f in futs]
- Each worker initializes CUDA and loads the model once, then reuses it.
- Only CPU-serializable payloads are sent between processes.
When to use which:
- ThreadPoolTaskRunner: Best if you want to share a single GPU-resident model across concurrent tasks in the same process (pure inference is often thread-safe).
- ProcessPoolTaskRunner: Only if you really need separate processes; use per-worker init to avoid re-sending live CUDA objects.
References:
- Prefect 3.x task runners: Task runners concept
- PyTorch multiprocessing and CUDA: PyTorch multiprocessing notes
oleander-whirlwind
10/13/2025, 11:16 AM
Marvin
10/13/2025, 11:16 AM
Marvin
10/13/2025, 11:18 AM
You can store serialized model weights in Redis and load them in each worker with map_location="cuda" or .to("cuda").
- Caveats:
- Redis stores data in RAM. Large models will be costly and may hit Redis’s 512 MB per-string limit. Many modern models exceed this.
- You’ll pay serialization/deserialization costs on every load unless you cache per worker.
- Object storage (S3/GCS/Azure Blob) is generally a better fit for large binaries. Use Redis as a pointer/version registry, not the payload.
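A minimal sketch of that pointer-registry idea, assuming boto3 for the object store and illustrative key names (the Redis key and S3 path are not from the original):
import io
import boto3
import redis
import torch

r = redis.from_url("redis://localhost:6379/0")
s3 = boto3.client("s3")  # endpoint/credentials come from your environment

# Redis holds only a small pointer to the current weights, not the bytes themselves
pointer = r.get("model:tinynet:current")  # e.g. b"my-bucket/models/tinynet/v3.pt"
bucket, _, key = pointer.decode().partition("/")

# Fetch the actual payload from object storage and load it on CPU
buf = io.BytesIO()
s3.download_fileobj(bucket, key, buf)
buf.seek(0)
state = torch.load(buf, map_location="cpu")
This keeps Redis lookups tiny and version-controlled while the heavy bytes stay in object storage.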
Minimal pattern with redis-py (recommended over inventing block APIs)
- One-time “publish” to Redis:
# one-time setup (outside Prefect)
import io, torch, redis
# your model defined somewhere importable by workers
state = TinyNet().cpu().state_dict()
buf = io.BytesIO()
torch.save(state, buf) # CPU tensors; smaller and portable
r = redis.from_url("redis://localhost:6379/0")
r.set("model:tinynet:v1", buf.getvalue())
- Prefect 3.x flow/tasks: lazy-load the model once per worker and reuse
import os, io, numpy as np, torch
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
# simple worker-local cache
_worker = {"model": None, "device": None}

class TinyNet(torch.nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(d_in, d_out), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer_from_redis(x_np, redis_url="redis://localhost:6379/0", key="model:tinynet:v1", device="cuda"):
    import redis
    if _worker["model"] is None or _worker["device"] != device:
        r = redis.from_url(redis_url)
        blob = r.get(key)
        if blob is None:
            raise RuntimeError(f"Missing key in Redis: {key}")
        state = torch.load(io.BytesIO(blob), map_location=device)
        m = TinyNet().to(device).eval()
        m.load_state_dict(state)
        _worker["model"] = m
        _worker["device"] = device
    x = torch.from_numpy(x_np).to(device)
    with torch.no_grad():
        return float(_worker["model"](x).sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_pool_infer():
    x_np = np.random.randn(32, 4).astype("float32")
    futures = [infer_from_redis.submit(x_np) for _ in range(4)]
    return [f.result() for f in futures]
if __name__ == "__main__":
    # If you want to force true process isolation on Unix:
    # import multiprocessing as mp; mp.set_start_method("spawn", force=True)
    print(process_pool_infer())
Notes
- Keep the saved weights on CPU when serializing; load with map_location="cuda" or transfer after load. This avoids embedding device-specific state in the payload.
- Cache the model per worker to avoid pulling from Redis on each task call.
Should I use Redis for this?
- For small models or as a fast cache: fine.
- For large models: use object storage (S3/GCS/Azure) for the bytes and Redis only for a pointer/version. Prefect has first-class blocks for these storages.
Prefect + Redis specifics
- Prefect 3.x includes a Redis block you can configure to store connection settings, but it does not run Redis for you.
- You still need a Redis server and typically the redis-py client installed in your environment.
Docs
- Prefect Redis integration: Redis integration
- Redis block API: prefect.blocks.redis
Marvin
10/13/2025, 11:18 AM
oleander-whirlwind
10/13/2025, 11:21 AM
Marvin
10/13/2025, 11:21 AM
Marvin
10/13/2025, 11:22 AM
import torch, numpy as np
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
class TinyNet(torch.nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(d_in, d_out), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer(model, x):
    with torch.no_grad():
        return float(model(x).sum().item())

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def gpu_infer_threadpool():
    device = "cuda"
    model = TinyNet().to(device).eval()
    batches = [torch.randn(32, 4, device=device) for _ in range(16)]
    futures = [infer.submit(model, x) for x in batches]
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(gpu_infer_threadpool())
2) Multi-GPU scaling
- Run one process per GPU. Each process loads its own model once and serves requests locally.
- With Prefect, create multiple runs/workers, each pinned to a GPU (e.g., set CUDA_VISIBLE_DEVICES per worker/pod/container). Inside each run, use ThreadPoolTaskRunner or a single-task batching loop.
- Avoid passing live CUDA objects across process boundaries. If you really need processes, use a per-worker cache: load model once per worker and only pass CPU-serializable inputs (numpy/bytes).
3) CPU-only or pure Python that does not release the GIL
- ProcessPoolTaskRunner can help if your workload is CPU-bound Python that doesn’t release the GIL. PyTorch ops usually release the GIL, so threads are typically fine.
Why not ProcessPool for GPU inference?
- Each process creates its own CUDA context and duplicates GPU memory.
- Passing models/tensors as task arguments triggers serialization and re-allocation in workers, which is expensive and can OOM at scale.
- Even with spawn, small demos may “work” but will not be efficient or stable under load.
Batching is king
- For throughput, do fewer Prefect tasks and more intra-task batching on the GPU.
- A common pattern: one task loads the model once and loops over a data iterator in mini-batches.
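A minimal sketch of that pattern, reusing the TinyNet toy model from the earlier examples (batch counts and sizes are illustrative):
import torch
from prefect import flow, task

@task
def batched_inference(n_batches: int = 100, batch_size: int = 64) -> list[float]:
    device = "cuda"
    model = TinyNet().to(device).eval()  # load once per task run
    out = []
    with torch.inference_mode():
        for _ in range(n_batches):  # iterate mini-batches inside a single task
            x = torch.randn(batch_size, 4, device=device)
            out.append(float(model(x).sum().item()))
    return out

@flow
def batched_flow() -> list[float]:
    # one task call instead of one Prefect task per batch
    return batched_inference(n_batches=100)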
If you need a distributed GPU scheduler
- Keep Prefect as the orchestrator and use a GPU-serving layer (e.g., Triton/TorchServe) or a GPU framework (e.g., Ray with GPU resources) inside a task. Prefect coordinates jobs; the service or framework handles GPU scheduling.
Docs
- Task runners overview: Prefect 3.x task runners concept
If you share a bit about your target throughput, number of GPUs, and deployment environment (local, k8s, VMs), I can sketch an architecture and a concrete Prefect deployment layout for it.
oleander-whirlwind
10/13/2025, 11:36 AM
Marvin
10/13/2025, 11:36 AM
Marvin
10/13/2025, 11:37 AM
1) Model server (FastAPI example)
# app.py
import torch
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class TinyNet(torch.nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(d_in, d_out), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

device = "cuda"
model = TinyNet().to(device).eval()
warmup = torch.randn(32, 4, device=device)
with torch.inference_mode(): model(warmup)  # warm-up

class Inp(BaseModel):
    x: list[list[float]]  # small example; in prod prefer binary/npz

@app.post("/infer")
def infer(inp: Inp):
    x = torch.tensor(inp.x, dtype=torch.float32, device=device)
    with torch.inference_mode():
        y = model(x)
    return {"sum": float(y.sum().item())}
Run it:
uvicorn app:app --host 0.0.0.0 --port 8000 --workers 1
- One worker per GPU is typical; use multiple threads if ops release the GIL. For conv nets, also consider torch.backends.cudnn.benchmark = True if input shapes are stable.
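The cuDNN toggle from the note above is a one-time setting at process startup; a minimal sketch (placement in app.py is an assumption):
import torch

# cuDNN autotunes convolution algorithms; only useful when input shapes stay constant
torch.backends.cudnn.benchmark = True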
2) Prefect flow calls the service
import httpx, numpy as np
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
@task
def call_model_server(payload):
    with httpx.Client(timeout=5.0) as client:
        resp = client.post("http://model-server:8000/infer", json=payload)
        resp.raise_for_status()
        return resp.json()

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def orchestrate_inference():
    batches = [np.random.randn(32, 4).astype("float32").tolist() for _ in range(8)]
    futures = [call_model_server.submit({"x": b}) for b in batches]
    return [f.result() for f in futures]
Notes and best practices
- Multi-GPU: Run one model-server container per GPU (set CUDA_VISIBLE_DEVICES per container). Have Prefect route work accordingly or use a load-balancer aware of GPU capacity.
- Batching: Let the server batch internally (critical for throughput). Triton/TorchServe do this out-of-the-box; you can implement request coalescing in FastAPI too.
- Inputs: Keep payloads small. For large tensors, stage them in object storage and send pointers, or use an internal network file/share.
- Warm start: Preload weights on startup, run a warm-up forward pass, and keep the process alive. Use liveness/readiness probes to ensure the GPU model is ready (see the probe sketch after these notes).
- RAM caching: If you have a CPU RAM cache (e.g., Redis, memcached) for weights, use it only at startup; don’t fetch per request. For large models, object storage is a better source of truth and Redis can store pointers/versions.
- Avoid ProcessPoolTaskRunner for GPU inference: it duplicates CUDA contexts/memory and adds serialization overhead. ThreadPoolTaskRunner is fine for client-side concurrency when calling a server.
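A minimal readiness-probe sketch for the warm-start note above, reusing app and model from app.py (the /healthz path is an illustrative choice, not from the original):
@app.get("/healthz")
def healthz():
    # model was loaded and warmed up at import time; report the device it lives on
    return {"ready": True, "device": str(next(model.parameters()).device)}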
References
- Prefect task runners overview: Task runners concept
Marvin
10/13/2025, 11:37 AM
oleander-whirlwind
10/13/2025, 12:01 PM
Marvin
10/13/2025, 12:01 PM
Marvin
10/13/2025, 12:02 PM
import multiprocessing as mp
import numpy as np
import torch
import torch.nn as nn
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
# Per-worker cache (each process has its own copy)
_worker = {"model": None}
class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(d_in, d_out), nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer_cpu_cached(x_np: np.ndarray) -> float:
    # Lazy init once per worker process
    if _worker["model"] is None:
        m = TinyNet().to("cpu").eval()
        # load weights if you have them, e.g., m.load_state_dict(torch.load("model.pt", map_location="cpu"))
        _worker["model"] = m
    x = torch.from_numpy(x_np).to("cpu")
    with torch.no_grad():
        y = _worker["model"](x)
    return float(y.sum().item())

@flow(task_runner=ProcessPoolTaskRunner(max_workers=4))
def cpu_infer_flow():
    batches = [np.random.randn(32, 4).astype("float32") for _ in range(16)]
    futs = [infer_cpu_cached.submit(b) for b in batches]
    return [f.result() for f in futs]

if __name__ == "__main__":
    mp.set_start_method("spawn", force=True)  # ensure real process isolation
    print(cpu_infer_flow())
Preferred patterns for GPU inference
- Single GPU: Keep a single long-lived process with the model resident on the GPU; use ThreadPoolTaskRunner or, better, run a model server (FastAPI/Triton/TorchServe) and have Prefect call it.
- Multi GPU: One process/container per GPU, model loaded once per container; coordinate with Prefect at the job level, not with a process pool inside a single run.
Rule of thumb
- GPU inference: use threads in one process or an external model server.
- Need process-based parallelism? Do it for CPU-only stages, isolation, or unstable code — not to share a single GPU across multiple worker processes.
Refs
- Prefect task runners overview: Task runners concept
- PyTorch multiprocessing guidance: PyTorch multiprocessing notes
oleander-whirlwind
10/13/2025, 12:06 PM
Marvin
10/13/2025, 12:06 PM
Marvin
10/13/2025, 12:07 PM
Pattern A: One process managing both GPUs with threads
import torch
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
class TinyNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(4, 2), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def infer(model, x):
    with torch.no_grad():
        return float(model(x).sum().item())

@flow(task_runner=ThreadPoolTaskRunner(max_workers=8))
def two_gpu_threads():
    m0 = TinyNet().to("cuda:0").eval()
    m1 = TinyNet().to("cuda:1").eval()
    # Build device-specific batches
    b0 = [torch.randn(32, 4, device="cuda:0") for _ in range(8)]
    b1 = [torch.randn(32, 4, device="cuda:1") for _ in range(8)]
    futures = [infer.submit(m0, x) for x in b0] + [infer.submit(m1, x) for x in b1]
    return [f.result() for f in futures]
When to use: Single service/process managing multiple GPUs is okay for inference; PyTorch handles targeting cuda:0 and cuda:1 cleanly. This avoids process-spawn cost and duplicate CUDA contexts.
Pattern B: One process per GPU (isolation; also common)
- Run one long-lived process per GPU, each loading the model once.
- In Prefect, either:
- Create two deployments (same code), pin each to a GPU via environment (CUDA_VISIBLE_DEVICES) and run them concurrently; or
- Use a parent flow with a ProcessPoolTaskRunner to start one task per GPU. Inside each task, set the GPU ID, load the model once, then process a list of requests. Optionally use plain Python threads inside the task for per-GPU concurrency.
Example (per-GPU worker tasks; no cross-process model passing)
import os, numpy as np, torch
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
class TinyNet(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.net = torch.nn.Sequential(torch.nn.Linear(4, 2), torch.nn.ReLU())
    def forward(self, x): return self.net(x)

@task
def gpu_worker(gpu_id: int, batches: list[np.ndarray]) -> list[float]:
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)  # pin process to a single GPU
    device = "cuda:0"  # becomes the visible GPU in this process
    model = TinyNet().to(device).eval()
    with torch.no_grad():
        out = []
        for b in batches:
            x = torch.from_numpy(b).to(device)
            out.append(float(model(x).sum().item()))
    return out

@flow(task_runner=ProcessPoolTaskRunner(max_workers=2))
def two_gpu_processes():
    b0 = [np.random.randn(32, 4).astype("float32") for _ in range(8)]
    b1 = [np.random.randn(32, 4).astype("float32") for _ in range(8)]
    f0 = gpu_worker.submit(0, b0)
    f1 = gpu_worker.submit(1, b1)
    return f0.result() + f1.result()
Notes:
- We only pass CPU-serializable arrays to processes; each process loads the model once and keeps it on its GPU. No pickling of live CUDA objects.
- If you want per-GPU concurrency, do it inside gpu_worker with a Python thread pool and a shared model.
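A sketch of that per-GPU threading, assuming the TinyNet class and imports from the example above (the thread count is illustrative):
from concurrent.futures import ThreadPoolExecutor

@task
def gpu_worker_threaded(gpu_id: int, batches: list[np.ndarray], threads: int = 4) -> list[float]:
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)  # pin this process to one GPU
    device = "cuda:0"
    model = TinyNet().to(device).eval()  # one shared model per process

    def run_one(b: np.ndarray) -> float:
        x = torch.from_numpy(b).to(device)
        with torch.no_grad():
            return float(model(x).sum().item())

    # plain Python threads inside the task; no nested Prefect task runner
    with ThreadPoolExecutor(max_workers=threads) as pool:
        return list(pool.map(run_one, batches))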
Is nesting ThreadPoolTaskRunner inside ProcessPoolTaskRunner dumb?
- Don’t try to “nest” task runners in one flow. Instead, structure as above:
- Either one flow with ThreadPoolTaskRunner managing both GPUs in-process, or
- A parent flow with ProcessPoolTaskRunner starting one worker task per GPU; each worker can use regular threads internally if needed.
- If your goal is serving, a cleaner approach is one model-serving container per GPU and Prefect orchestrates requests/jobs to each service.
Scaling beyond two GPUs
Marvin
10/13/2025, 12:07 PM
oleander-whirlwind
10/13/2025, 1:30 PM
Marvin
10/13/2025, 1:31 PM
Marvin
10/13/2025, 1:33 PM
Several parameters on @task help you get resilience, speed, and observability for training/inference. Here are the ones teams use most and why they matter:
Core reliability and performance
- retries, retry_delay_seconds, retry_jitter_factor
- Why: absorb transient failures (e.g., dataloader hiccups, brief GPU OOM/retry after GC, flaky network fetches).
- Example:
@task(retries=5, retry_delay_seconds=[1, 2, 4, 8, 16])
def fetch_batch(...): ...
- timeout_seconds
- Why: bound worst-case latency; fail-fast or reschedule if a step hangs (I/O stall, deadlock).
- Example:
@task(timeout_seconds=600)
def train_epoch(...): ...
- cache_key_fn, cache_expiration, refresh_cache, cache_result_in_memory
- Why: memoize deterministic work (tokenization, feature extraction, small inference outputs) to save time/money.
- Tips: only cache small/serializable results; set cache_result_in_memory=False if outputs are large to avoid RAM bloat.
- Example:
import hashlib, json, datetime as dt
def hash_inputs(ctx, params):
    keyable = {"model_ver": params["model_ver"], "inputs": params["inputs"]}
    return hashlib.sha256(json.dumps(keyable, sort_keys=True).encode()).hexdigest()
@task(cache_key_fn=hash_inputs, cache_expiration=dt.timedelta(hours=12), cache_result_in_memory=False)
def preprocess_or_infer(inputs, model_ver: str): ...
Results and artifacts
- persist_result, result_storage, result_storage_key, result_serializer
- Why: persist metrics, predictions, and small artifacts to object storage (S3/GCS/Azure) so downstream tasks/runs can fetch them.
- Recommended: persist small JSON/CSV metrics via Prefect results; store big tensors/models with your storage SDK (S3/GCS) directly in the task.
- Example:
from prefect.results import ResultStorage
from prefect.results.serializers import JSONSerializer
rs = ResultStorage.from_url("s3://my-bucket/prefix")  # configure creds via env/blocks
@task(persist_result=True, result_storage=rs, result_storage_key="predictions/{run_id}.json", result_serializer=JSONSerializer())
def write_preds(preds): return {"n": len(preds)}
Observability and control
- log_prints
- Why: capture training logs, batch/epoch metrics, and kernel warnings in Prefect logs.
- Example:
@task(log_prints=True)
def train_step(...): print("loss=", loss)
- tags, version, task_run_name
- Why: slice/filter in the UI and attach lineage to model/data versions.
- Examples:
@task(tags={"inference", "cuda"}, version="model=v2")
def infer(...): ...
@task(task_run_name=lambda p: f"infer-{p['model_ver']}-{p['batch_id']}")
def infer(...): ...
- on_completion, on_failure
- Why: notify, push metrics to MLflow/W&B, free external resources, or write checkpoints on failure.
- Example:
def notify_failure(state, obj):
    # post to Slack / record incident
    ...
@task(on_failure=[notify_failure])
def train_epoch(...): ...
Putting it together (inference-friendly example)
import datetime as dt, hashlib, json, numpy as np, torch
from prefect import task
def hash_payload(ctx, params):
    # Use a stable, small representation (IDs, hashes, or shapes), not raw tensors
    keyable = {"model_ver": params["model_ver"], "inputs": params["inputs"]}
    return hashlib.sha256(json.dumps(keyable, sort_keys=True).encode()).hexdigest()

@task(
    retries=3,
    retry_delay_seconds=[1, 2, 5],
    timeout_seconds=30,
    cache_key_fn=hash_payload,
    cache_expiration=dt.timedelta(hours=1),
    cache_result_in_memory=False,
    log_prints=True,
    tags={"inference", "cuda"},
    version="model=v2",
    task_run_name=lambda p: f"infer-v2-batch-{p['batch_id']}",
)
def run_inference(model, inputs, model_ver: str, batch_id: int):
    with torch.inference_mode():
        x = torch.tensor(inputs, dtype=torch.float32, device=next(model.parameters()).device)
        return float(model(x).sum().item())
Marvin
10/13/2025, 1:33 PM
- task_runner is configured on the @flow, not on @task.
- Avoid caching or persisting giant tensors via Prefect results; use object storage for heavy binaries and keep Prefect results to metadata.
Docs
- Caching: Caching in Prefect 3.x
- Results API: Results and serializers
- Task decorator source (parameter reference): @task source
oleander-whirlwind
10/13/2025, 1:42 PM
from prefect.results import ResultStorage
from prefect.results.serializers import JSONSerializer
rs = ResultStorage.from_url("s3://my-bucket/prefix")  # configure creds via env/blocks
@task(persist_result=True, result_storage=rs, result_storage_key="predictions/{run_id}.json", result_serializer=JSONSerializer())
def write_preds(preds): return {"n": len(preds)}
Marvin
10/13/2025, 1:42 PM
Marvin
10/13/2025, 1:43 PM
- Make sure you have prefect[aws] and prefect-aws installed.
- Replace bucket/prefix values as needed.
import io
import os
import json
import time
import numpy as np
import torch
import torch.nn as nn
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
# Prefect results to S3 (for small JSON-like objects)
from prefect.results import ResultStore
from prefect.serializers import JSONSerializer
# Direct S3 artifact handling (for big binaries)
from prefect_aws.s3 import S3Bucket
# ----- Model definition -----
class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(d_in, d_out), nn.ReLU())
    def forward(self, x): return self.net(x)
# ----- Configure result storage (small JSON) -----
# Store small results as JSON under s3://my-bucket/prefect-results/
RESULTS_BUCKET_URL = "s3://my-bucket/prefect-results/"
result_store = ResultStore(
    # ResultStore can take a RemoteFileSystem; using URL shorthand:
    result_storage=None,  # let Prefect resolve from URL below
    serializer=JSONSerializer(),
)
# When using the decorator, we’ll pass result_storage=ResultStore.from_url(...).
# Using from_url gives a quick shorthand:
result_store = ResultStore.from_url(RESULTS_BUCKET_URL, serializer=JSONSerializer())
# ----- Configure an S3Bucket block client for large artifacts -----
# You can also create and load a saved block by name, but direct instantiation works with env creds.
artifacts_bucket = S3Bucket(bucket_name="my-bucket", basepath="artifacts/")
# ----- Tasks -----
@task
def train_and_save_model(ckpt_key: str) -> dict:
    model = TinyNet().to("cpu").train()
    # Fake training
    for _ in range(2):
        x = torch.randn(16, 4)
        y = model(x).sum()
        y.backward()
        for p in model.parameters():
            p.data -= 0.001 * p.grad
            p.grad = None
    # Save CPU state_dict to bytes
    buf = io.BytesIO()
    torch.save(model.state_dict(), buf)
    buf.seek(0)
    # Upload checkpoint to S3
    # Example key: artifacts/models/run-123/model.pt
    artifacts_bucket.upload_from_file_object(file_object=buf, to_path=ckpt_key)
    return {
        "checkpoint_s3": f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{ckpt_key}",
        "sha": None,  # optionally compute hash
    }
@task
def generate_predictions(ckpt_key: str, n_batches: int = 3) -> tuple[list[list[float]], dict]:
    # Download checkpoint
    buf = io.BytesIO()
    artifacts_bucket.download_object(ckpt_key, buf)
    buf.seek(0)
    # Load model
    model = TinyNet().to("cpu").eval()
    state = torch.load(buf, map_location="cpu")
    model.load_state_dict(state)
    # Predict
    preds = []
    for _ in range(n_batches):
        x = torch.randn(32, 4)
        with torch.no_grad():
            y = model(x)
        preds.append(y.softmax(-1).numpy().tolist())
    # Example metrics
    metrics = {"mean_prob": float(np.mean([np.mean(b) for b in preds]))}
    return preds, metrics
@task
def upload_numpy_artifact(arr: np.ndarray, key: str) -> str:
    # Save ndarray to npy bytes and upload
    buf = io.BytesIO()
    np.save(buf, arr)
    buf.seek(0)
    artifacts_bucket.upload_from_file_object(buf, key)
    return f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{key}"
# Persist small JSON results via Prefect Results to S3
@task(
    persist_result=True,
    result_storage=result_store,
    result_storage_key="predictions/{run_id}.json",
    result_serializer=JSONSerializer(),
)
def persist_small_predictions(preds: list[list[float]]) -> dict:
    return {"n": len(preds)}
Marvin
10/13/2025, 1:43 PM
- Small results: persist_result=True with ResultStore.from_url("s3://...") and JSONSerializer() writes task returns to S3 as JSON.
- Large artifacts: Use prefect-aws S3Bucket to upload/download bytes directly; tasks return only URIs/metadata.
- You can mix and match: keep small metadata in Prefect’s results, large binaries in S3.
Useful links
- Prefect results overview: Results API
- Serializers: Serializers
- prefect-aws S3 utilities and block: prefect-aws S3
oleander-whirlwind
10/13/2025, 1:44 PM
Marvin
10/13/2025, 1:44 PM
Marvin
10/13/2025, 1:46 PM
Use an S3Bucket block configured for MinIO (endpoint + creds), then load the block in code.
Prep
- Create a Prefect block: S3Bucket named minio-artifacts with:
  - Bucket name: your MinIO bucket (e.g., my-minio-bucket)
  - Basepath: artifacts/ (optional)
  - Credentials: MinIO credentials
  - Endpoint URL: your MinIO endpoint (e.g., `http://localhost:9000`)
- Ensure these environment variables are set where your flow runs:
  - AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_DEFAULT_REGION=us-east-1
  - AWS_ENDPOINT_URL=http://localhost:9000 (your MinIO endpoint)
Example
import io
import os
import json
import time
import numpy as np
import torch
import torch.nn as nn
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
# Prefect Results to MinIO via S3-compatible URL
from prefect.results import ResultStore
from prefect.serializers import JSONSerializer
# MinIO artifacts via Prefect AWS S3 bucket block
from prefect_aws.s3 import S3Bucket
# ----- Ensure MinIO env is present (or set in your deployment/worker env) -----
# os.environ["AWS_ACCESS_KEY_ID"] = "minio"
# os.environ["AWS_SECRET_ACCESS_KEY"] = "miniosecret"
# os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
# os.environ["AWS_ENDPOINT_URL"] = "http://localhost:9000"
# ----- Model -----
class TinyNet(nn.Module):
    def __init__(self, d_in=4, d_out=2):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(d_in, d_out), nn.ReLU())
    def forward(self, x): return self.net(x)
# ----- Prefect Results store (small JSON) to MinIO -----
# IMPORTANT: s3fs/boto will target MinIO using AWS_ENDPOINT_URL
RESULTS_URL = "s3://my-minio-bucket/prefect-results/"
result_store = ResultStore.from_url(RESULTS_URL, serializer=JSONSerializer())
# ----- S3Bucket block (large artifacts) configured for MinIO in Prefect UI -----
artifacts_bucket = S3Bucket.load("minio-artifacts")
@task
def train_and_save_model(ckpt_key: str) -> dict:
    model = TinyNet().to("cpu").train()
    # Fake "training"
    for _ in range(2):
        x = torch.randn(16, 4)
        y = model(x).sum()
        y.backward()
        for p in model.parameters():
            p.data -= 0.001 * p.grad
            p.grad = None
    # Save CPU state_dict to bytes
    buf = io.BytesIO()
    torch.save(model.state_dict(), buf)
    buf.seek(0)
    # Upload to MinIO via S3Bucket block
    artifacts_bucket.upload_from_file_object(file_object=buf, to_path=ckpt_key)
    return {
        "checkpoint_s3": f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{ckpt_key}",
    }
@task
def generate_predictions(ckpt_key: str, n_batches: int = 3) -> tuple[list[list[float]], dict]:
    # Download checkpoint
    buf = io.BytesIO()
    artifacts_bucket.download_object(ckpt_key, buf)
    buf.seek(0)
    # Load model and run inference on CPU for demo
    model = TinyNet().to("cpu").eval()
    state = torch.load(buf, map_location="cpu")
    model.load_state_dict(state)
    preds = []
    for _ in range(n_batches):
        x = torch.randn(32, 4)
        with torch.no_grad():
            y = model(x)
        preds.append(y.softmax(-1).numpy().tolist())
    metrics = {"mean_prob": float(np.mean([np.mean(b) for b in preds]))}
    return preds, metrics
@task
def upload_numpy_artifact(arr: np.ndarray, key: str) -> str:
    buf = io.BytesIO()
    np.save(buf, arr)
    buf.seek(0)
    artifacts_bucket.upload_from_file_object(buf, key)
    return f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{key}"
Marvin
10/13/2025, 1:46 PM
- Small results: Prefect Results write JSON to s3://my-minio-bucket/prefect-results/ — MinIO is picked up via AWS_ENDPOINT_URL.
- Large binaries (checkpoints, arrays) use the S3Bucket block that’s explicitly configured for MinIO (endpoint + creds). No special code is required beyond S3Bucket.load("minio-artifacts").
Helpful links
- prefect-aws (S3, MinIO creds): prefect-aws
- Serializers and results: Serializers and Results API
If you prefer not to rely on environment variables for Results, we can switch to storing “results” via the S3Bucket block as JSON files as well — just say the word and I’ll show that version.
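A minimal sketch of that alternative, pushing small JSON payloads through the same S3Bucket block used above (the task name and key convention are illustrative):
import io
import json
from prefect import task
from prefect_aws.s3 import S3Bucket

artifacts_bucket = S3Bucket.load("minio-artifacts")

@task
def persist_json_via_block(payload: dict, key: str) -> str:
    # serialize the small result to JSON bytes and upload via the MinIO-backed block
    buf = io.BytesIO(json.dumps(payload).encode("utf-8"))
    artifacts_bucket.upload_from_file_object(buf, key)
    return f"s3://{artifacts_bucket.bucket_name}/{artifacts_bucket.basepath}{key}"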
oleander-whirlwind
10/22/2025, 7:04 AM
Marvin
10/22/2025, 7:04 AM
Marvin
10/22/2025, 7:12 AM
1) Install the integration:
pip install "prefect-aws"
2) Create and save an S3Bucket block configured for MinIO:
from prefect_aws.s3 import S3Bucket
from prefect_aws.credentials import AwsCredentials
from prefect_aws.client_parameters import AwsClientParameters
# MinIO credentials (use your values)
creds = AwsCredentials(
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)

# Point the AWS client at MinIO
client_params = AwsClientParameters(
    endpoint_url="http://localhost:9000",  # or https://your-minio.example.com
    region_name="us-east-1",
    # If you're using self-signed certs locally, you may need:
    # verify=False
    # Some MinIO setups also need path-style addressing; if required in your env,
    # set the botocore config accordingly in AwsClientParameters (addressing_style='path').
)

s3_block = S3Bucket(
    bucket_name="my-bucket",
    credentials=creds,
    aws_client_parameters=client_params,
)
s3_block.save("minio-bucket", overwrite=True)
3) Use it in your flows (no code changes needed when you later update the block’s endpoint):
from prefect import flow
from prefect_aws.s3 import S3Bucket
@flow
def example():
    s3 = S3Bucket.load("minio-bucket")
    # Upload a file
    s3.upload_from_path("local/path/file.parquet", "artifacts/file.parquet")
    # Download a file
    s3.download_object_to_path("artifacts/file.parquet", "local/path/file.parquet")

When you’re ready to switch from local MinIO to another endpoint (or to AWS S3 later), just edit the saved block (in the UI or via code) to change endpoint_url and credentials — your flow code that loads S3Bucket.load("minio-bucket") stays the same.
Notes and tips:
- For MinIO, you often need path-style addressing; if your setup requires it, configure the client to use addressing_style='path'.
- If you’re using TLS with a self-signed cert locally, set verify=False (or provide a CA bundle path) in the client parameters.
- Prefect also exposes a MinIOCredentials block if you prefer that over AwsCredentials, but S3Bucket + AwsCredentials + endpoint_url is a common, simple setup.
If your goal is to store flow code for deployments in MinIO as well, you can use this same S3Bucket block as your deployment storage so changing the endpoint later is just a block update. If that’s your plan, I can share a minimal deployment example tailored to your setup.
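A rough sketch of that deployment setup, assuming Prefect 3.x flow.from_source accepts the saved S3Bucket block, with placeholder entrypoint and work pool names:
from prefect import flow
from prefect_aws.s3 import S3Bucket

if __name__ == "__main__":
    storage = S3Bucket.load("minio-bucket")  # the MinIO-backed block saved above
    flow.from_source(
        source=storage,                         # flow code is pulled from MinIO at run time
        entrypoint="flows/example.py:example",  # placeholder path:function
    ).deploy(
        name="example-minio",
        work_pool_name="my-work-pool",          # placeholder work pool
    )
Switching MinIO endpoints later remains a block update; the deployment definition does not change.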
Docs:
- Prefect AWS integration (v3)
- Blocks in Prefect 3.x
- Store flow code in remote storage
Quick check: are you planning to use MinIO just for files/artifacts during runs, or also as storage for your deployment’s flow code? If you share that (and whether your MinIO uses TLS), I’ll provide the exact config you need.
oleander-whirlwind
10/22/2025, 7:15 AM
Marvin
10/22/2025, 7:15 AM
Marvin
10/22/2025, 7:19 AM
ResultStore doesn’t have a from_url method in Prefect 3.x. Instead, construct a ResultStore with a filesystem block (local or remote) and a serializer.
Here are working patterns you can use.
Local (good for “I’m local for now”):
pip install "prefect>=3" # if not installed
from prefect.results import ResultStore
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
store = ResultStore(
    result_storage=LocalFileSystem(basepath="/tmp/prefect-results"),
    serializer=JSONSerializer(),
)
key = store.write({"x": 1})
rec = store.read(key)
print(rec.value) # {'x': 1}
MinIO via S3-compatible endpoint (switchable later by updating a saved block):
- Install s3fs (fsspec driver for s3://)
pip install s3fs
Create and save a RemoteFileSystem block pointing at MinIO:
from prefect.filesystems import RemoteFileSystem
rfs = RemoteFileSystem(
    basepath="s3://my-bucket/results/",  # still use s3://, just point the client at MinIO
    settings={
        "key": "minio",        # your MinIO access key
        "secret": "minio123",  # your MinIO secret key
        "client_kwargs": {
            "endpoint_url": "http://localhost:9000"  # or https://minio.example.com
        },
        "use_ssl": False,  # set True if using TLS
        # If your MinIO needs path-style addressing, uncomment:
        # "config_kwargs": {"s3": {"addressing_style": "path"}},
    },
)
rfs.save("minio-results", overwrite=True)
Use that block in your code (no URL parsing needed):
from prefect.results import ResultStore
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer
store = ResultStore(
    result_storage=RemoteFileSystem.load("minio-results"),
    serializer=JSONSerializer(),
)
key = store.write({"ok": True})
print(store.read(key).value)

Later, to move from local MinIO to another endpoint or AWS S3, just update the saved RemoteFileSystem block (change endpoint_url, creds, SSL) — your flow code that loads the block by name doesn’t change.
Notes:
- If you prefer, you can set result storage on tasks/flows directly:
from prefect import task
rfs = RemoteFileSystem.load("minio-results")
@task(result_storage=rfs, result_serializer="json")
def do_work():
    return {"status": "done"}
- Artifacts and results are different in Prefect 3.x: results are data returned by tasks/flows; artifacts are UI-visible outputs. If you meant artifacts, I can show how to create them too.
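A small sketch of the artifact side, using create_markdown_artifact from prefect.artifacts (the metric values and key name are illustrative):
from prefect import flow
from prefect.artifacts import create_markdown_artifact

@flow
def report_flow():
    metrics = {"accuracy": 0.93, "n_samples": 1024}  # illustrative numbers
    # the artifact shows up in the Prefect UI; the returned dict is the flow's result
    create_markdown_artifact(
        key="inference-report",
        markdown=f"## Inference report\n- accuracy: {metrics['accuracy']}\n- samples: {metrics['n_samples']}",
        description="Human-readable summary for the UI",
    )
    return metrics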
Docs:
- Results (Prefect 3.x)
- Filesystems API
- Serializers API
If you share your exact MinIO setup (TLS or not, path-style needed or not), I’ll tailor the RemoteFileSystem.settings for you.
oleander-whirlwind
10/22/2025, 8:03 AM
Marvin
10/22/2025, 8:03 AM
Marvin
10/22/2025, 8:04 AM
ResultStore.read() expects the key to be a string. Your key (or less likely, your basepath) is bytes, which triggers this when Prefect tries to do urlpath.lstrip('/').
What to check/fix:
- Ensure the key you pass to store.read(key) is a str, not bytes.
- If you persisted the key to disk or received it as bytes, decode it: key = key.decode() before calling read.
- Do not pass key=b"..." to store.write(...). Keys must be strings.
- If you use a custom storage_key_fn, make sure it returns a str.
- Ensure your filesystem block’s basepath is a str (e.g., "s3://my-bucket/results/"), not bytes.
Quick diagnostic:
print(type(key), repr(key)) # should be <class 'str'>
If it prints bytes, decode it:
if isinstance(key, (bytes, bytearray)):
    key = key.decode("utf-8")
Minimal working example (local):
from prefect.results import ResultStore
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer
store = ResultStore(
    result_storage=LocalFileSystem(basepath="/tmp/prefect-results"),
    serializer=JSONSerializer(),
)
key = store.write({"x": 1})
print(type(key), key) # should be str
rec = store.read(key)
print(rec.value) # {'x': 1}
MinIO (via s3fs) example:
# pip install s3fs
from prefect.results import ResultStore
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer
rfs = RemoteFileSystem(
    basepath="s3://my-bucket/results/",
    settings={
        "key": "minio",
        "secret": "minio123",
        "client_kwargs": {"endpoint_url": "http://localhost:9000"},
        "use_ssl": False,
        # If needed:
        # "config_kwargs": {"s3": {"addressing_style": "path"}},
    },
)
store = ResultStore(result_storage=rfs, serializer=JSONSerializer())
key = store.write({"ok": True})
print(type(key), key) # should be str
print(store.read(key).value)
Common patterns that cause this error:
- Using key=b"...bytes..." when calling store.write(...)
- Loading a saved key from a binary file or a message queue without decoding
- Returning bytes from a custom storage_key_fn
If you can share the few lines where key is created (and how it’s stored/passed into read), I can pinpoint the exact fix.
Docs:
- Results (Prefect 3.x)
oleander-whirlwind
10/22/2025, 8:15 AM
Marvin
10/22/2025, 8:15 AM
Marvin
10/22/2025, 8:16 AM
ResultStore.write(...) returns None. It does not return the key. You need to generate (or supply) a key yourself, pass it to write, then use that same key to read.
Correct pattern:
from uuid import uuid4
from prefect.results import ResultStore
from prefect.filesystems import RemoteFileSystem
from prefect.serializers import JSONSerializer
rfs = RemoteFileSystem(
    basepath="s3://my-bucket/results/",
    settings={
        "key": "minio",
        "secret": "minio123",
        "client_kwargs": {"endpoint_url": "http://localhost:9000"},
        "use_ssl": False,
        # "config_kwargs": {"s3": {"addressing_style": "path"}},  # if your MinIO needs it
    },
)
store = ResultStore(result_storage=rfs, serializer=JSONSerializer())
# Generate a key (you can use your own convention)
key = f"results/{uuid4()}.json" # or: key = store.storage_key_fn()
store.write({"ok": True}, key=key) # write returns None
rec = store.read(key)
print(rec.value) # {'ok': True}
If you want an automatic key generator:
- Use store.storage_key_fn() to generate a key each time, or
- Provide a custom storage_key_fn when constructing the `ResultStore`:
from uuid import uuid4
store = ResultStore(
    result_storage=rfs,
    serializer=JSONSerializer(),
    storage_key_fn=lambda: f"results/{uuid4()}.json",
)
key = store.storage_key_fn()
store.write({"ok": True}, key=key)
print(store.read(key).value)
Quick sanity checks:
- print(type(key), key) should show a string (not None, not bytes).
- Ensure your basepath is a string like "s3://my-bucket/results/".
Docs:
- Results (Prefect 3.x)
- Filesystems API
- Serializers API
If you share your current snippet (how you build store and the call to write), I’ll adapt it precisely.
oleander-whirlwind
10/22/2025, 8:43 AM
Marvin
10/22/2025, 8:43 AM
Marvin
10/22/2025, 8:51 AM
- For tasks: use the result_storage_key parameter on the @task decorator. It supports runtime templating with flow/task run info.
- For flows: there’s no result_storage_key parameter. If you need a specific key, write to the result store yourself inside the flow using the runtime context.
Task example (using flow run id + task run id):
from prefect import task
from prefect.filesystems import RemoteFileSystem
rfs = RemoteFileSystem.load("prefect-results-minio-s3")
@task(
    result_storage=rfs,
    result_serializer="json",
    # basepath is s3://prefect-test/results/, so don’t repeat "results/" here
    result_storage_key="{flow_run.id}/{task_run.id}.json",
)
def do_work():
    return {"status": "work completed"}
Flow example (manual write using the run context):
from prefect import flow
from prefect.context import get_run_context
@flow(result_storage=rfs, result_serializer="json")
def my_flow():
    res = do_work()
    # manually persist something under a key with the flow run id
    ctx = get_run_context()
    key = f"{ctx.flow_run.id}/flow-summary.json"
    ctx.result_store.write({"status": "flow completed"}, key=key)
    return {"status": "flow completed"}  # the flow’s own persisted return uses an auto key
Notes:
- Don’t include angle brackets in your config values (use s3://prefect-test/results/ and http://host.docker.internal:9900).
- You can also use other fields, e.g. {task_run.name}, {flow_run.name}, or task parameters: {parameters.some_param} (see the parameter-keyed sketch after these notes).
- Since your basepath is s3://prefect-test/results/, the final object keys will look like s3://prefect-test/results/<flow_run_id>/<task_run_id>.json
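A small sketch of a parameter-based key, assuming a batch_id task parameter (the parameter and task names are illustrative):
@task(
    result_storage=rfs,
    result_serializer="json",
    # rendered at runtime, e.g. "batch-42/output.json" under the block's basepath
    result_storage_key="batch-{parameters.batch_id}/output.json",
)
def process_batch(batch_id: int) -> dict:
    return {"batch_id": batch_id, "status": "done"}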
Docs:
- Results (Prefect 3.x)
- Runtime context