oleander-whirlwind - 10/09/2025, 12:31 PM
Marvin - 10/09/2025, 12:36 PM
You can set persist_result, result_storage, and result_serializer on tasks/flows (a minimal sketch follows the links below). This affects storing results for later retrieval but does not change runner-level transport requirements between tasks. See serializers in code: prefect/serializers.py
- Task runners API: prefect/task_runners.py
- Prefect 3 release notes: Prefect 3.0 release notes
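For illustration, a minimal sketch of these settings on a task; the local path, serializer choice, and task/flow names are assumptions for the example, not requirements:
import pathlib
from prefect import flow, task
from prefect.filesystems import LocalFileSystem
from prefect.serializers import JSONSerializer

# Hypothetical example: persist this task's return value as JSON to a local path
@task(
    persist_result=True,
    result_storage=LocalFileSystem(basepath=str(pathlib.Path("/tmp/prefect-results"))),  # assumed path
    result_serializer=JSONSerializer(),
)
def summarize(rows: list[dict]) -> dict:
    return {"count": len(rows)}

@flow(persist_result=True)
def pipeline():
    return summarize([{"a": 1}, {"a": 2}])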
Want me to add a variant that persists DataFrames to storage using PickleSerializer to show cross-run retrieval as well?
oleander-whirlwind - 10/09/2025, 1:42 PM
Marvin - 10/09/2025, 1:44 PM
oleander-whirlwind - 10/10/2025, 11:52 AM
Marvin - 10/10/2025, 11:55 AM
When a task input can't be serialized, you get a SerializationError before invoking your task function.
How to handle it
- Catch in the flow around the task call:
from prefect import flow, task, get_run_logger
from prefect.exceptions import SerializationError
@task
def do_work(cfg):
# construct heavy objects here instead of passing them in
...
@flow
def main(non_serializable_obj):
logger = get_run_logger()
try:
# Prefer passing simple, serializable data (e.g., dicts, IDs)
result = do_work({"host": non_serializable_obj.host})
except SerializationError:
logger.warning("Task input could not be serialized; applying fallback")
# fallback/skip logic here
- Don’t pass non-serializable objects. Pass IDs, paths, or small config dicts and construct the heavy object inside the task.
- If your failure is on the RETURN value of the task (not the input), use a result serializer that can handle complex objects:
from prefect import task
from prefect.serializers import PickleSerializer
@task(result_serializer=PickleSerializer())
def produce_complex_result(...):
...
Note: result_serializer controls how the task’s return value is stored, not how inputs are serialized.
- If you must pass a custom object, make it picklable (e.g., implement `__getstate__`/`__setstate__`) or convert it to a serializable proxy (dict/dataclass).
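For example, a minimal sketch of the `__getstate__`/`__setstate__` approach, using a sqlite connection as a stand-in for a non-picklable handle:
import sqlite3

class DBHandle:
    """Picklable wrapper: keeps only the path and reopens the connection after unpickling."""
    def __init__(self, path: str):
        self.path = path
        self.conn = sqlite3.connect(path)

    def __getstate__(self):
        # Drop the live connection; keep only what is needed to rebuild it
        return {"path": self.path}

    def __setstate__(self, state):
        self.path = state["path"]
        self.conn = sqlite3.connect(self.path)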
Useful references
- Serializers API: prefect.serializers
- Exceptions (includes SerializationError): prefect.exceptions source
If you can share the type of object you're passing and the stack trace snippet, I can suggest the cleanest pattern (e.g., how to refactor or a serializer workaround) for your case.
oleander-whirlwind - 10/10/2025, 11:58 AM
Marvin - 10/10/2025, 11:59 AM
Here's a decorator that pre-checks input serializability before the wrapped task or function is called:
from functools import wraps
from typing import Callable

from prefect import get_run_logger
from prefect.exceptions import SerializationError
from prefect.futures import PrefectFuture
from prefect.serializers import JSONSerializer, PickleSerializer

def _strip_futures(value):
    # Best-effort sketch: replace unresolved PrefectFuture objects with None so the
    # pre-check does not try to serialize them.
    if isinstance(value, PrefectFuture):
        return None
    if isinstance(value, dict):
        return {k: _strip_futures(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return type(value)(_strip_futures(v) for v in value)
    return value

def precheck_serializable(serializer="json", on_fail="skip", fallback=None, default=None):
    """
    Verify that call arguments are serializable before invoking/submitting the wrapped callable.
    on_fail controls what happens when they are not:
    - "skip": return `default`
    - "fallback": call fallback(*args, **kwargs) (can be a task or function)
    - "raise": raise SerializationError
    """
    if serializer in (None, "json"):
        ser = JSONSerializer()
    elif serializer == "pickle":
        ser = PickleSerializer()
    elif hasattr(serializer, "dumps"):
        ser = serializer
    else:
        raise ValueError("serializer must be 'json', 'pickle', or a Serializer instance")

    def decorator(callable_obj: Callable):
        @wraps(callable_obj)
        def wrapper(*args, **kwargs):
            payload = {"args": _strip_futures(args), "kwargs": _strip_futures(kwargs)}
            try:
                # Attempt to serialize inputs with the chosen serializer
                ser.dumps(payload)
            except Exception as exc:
                # Try to log using a run logger if we're in a flow/task context
                try:
                    logger = get_run_logger()
                    logger.warning(
                        f"Skipping call to {getattr(callable_obj, '__name__', str(callable_obj))} "
                        f"due to non-serializable input: {exc!r}"
                    )
                except Exception:
                    # Best-effort logging outside of run context
                    print(
                        f"[precheck_serializable] Non-serializable input, skipping "
                        f"{getattr(callable_obj, '__name__', str(callable_obj))}: {exc!r}"
                    )
                if on_fail == "skip":
                    return default
                elif on_fail == "fallback":
                    if fallback is None:
                        raise SerializationError(
                            f"Inputs failed pre-serialization and no fallback provided; original error: {exc}"
                        ) from exc
                    return fallback(*args, **kwargs)
                elif on_fail == "raise":
                    raise SerializationError("Inputs failed pre-serialization") from exc
            # Inputs passed the pre-check: invoke the wrapped callable as usual
            return callable_obj(*args, **kwargs)

        return wrapper

    return decorator
How to use it
- Wrap a Prefect task to prevent submission when inputs aren’t serializable:
from prefect import task, flow
@task
def do_work(cfg):
return "ok"
@task
def fallback_work(cfg):
return "used fallback"
# Wrap the task callable so the check happens BEFORE submission
do_work_checked = precheck_serializable(
serializer="json",
on_fail="fallback", # or "skip" or "raise"
fallback=fallback_work, # can be a task (returns a Future) or a function (returns a value)
)(do_work)
@flow
def main(cfg):
# If cfg isn't serializable, we won't submit do_work; we'll call fallback_work instead
fut = do_work_checked(cfg)
return fut
Notes and caveats
- If you choose on_fail="skip", the decorator returns `default` (a plain value). If downstream logic expects a PrefectFuture, prefer providing a fallback task so you still get a Future.
- This is a best-effort pre-check. Passing this check doesn’t guarantee that Prefect won’t encounter a different serialization issue later, but it catches common cases early.
- If you know you need to persist complex task results, set a `result_serializer` on the task (this only affects the return value):
from prefect.serializers import PickleSerializer
@task(result_serializer=PickleSerializer())
def returns_complex(...):
...
Docs
- Serializers: prefect.serializers
- Exceptions (includes SerializationError): prefect.exceptions source
If you tell me your preferred failure behavior (skip vs fallback vs raise) and which serializer to check against (json vs pickle), I can tailor the decorator or give you a ready-to-use snippet for your exact case.
oleander-whirlwind - 10/10/2025, 12:15 PM
Marvin - 10/10/2025, 12:16 PM
Apply your wrapper above (outside) the @task decorator so it runs before Prefect submits the task. Example:
- Good: @require_first_arg_serializable above @task
- Not good: wrapping the plain function then applying @task (the check would run inside the task, too late)
- Returning None means you will not get a PrefectFuture. If downstream code expects a future, consider returning a fallback task call instead.
Here’s a minimal version of your idea that checks the first positional arg with cloudpickle and skips calling the task if it’s not serializable:
import cloudpickle
from functools import wraps
from prefect import task, get_run_logger
def test_serializable(obj, label: str):
try:
cloudpickle.dumps(obj)
print(f"SERIALIZABLE: {label}")
return True
except Exception as e:
print(f"NOT SERIALIZABLE: {label} -> {type(e).__name__}: {e}")
return False
def require_first_arg_serializable(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not args:
print("No arguments provided, skipping function execution.")
return None
first_arg = args[0]
label = f"{func.__name__} first argument"
if test_serializable(first_arg, label):
return func(*args, **kwargs)
else:
# Optional: try to log via Prefect if in a run context
try:
logger = get_run_logger()
logger.warning(f"Skipping '{func.__name__}' due to unserializable first argument.")
except Exception:
print(f"Skipping execution of '{func.__name__}' due to unserializable first argument.")
# Return None; if you need a PrefectFuture, call a fallback Prefect task instead
return None
return wrapper
@require_first_arg_serializable # outermost so the check happens BEFORE Prefect submits the task
@task
def my_task(obj, label: str):
# task body
return f"processed {label}"
If you need a PrefectFuture even when skipping, add a small fallback task and call it instead of returning `None`:
@task
def noop(value=None):
return value
def require_first_arg_serializable(func):
@wraps(func)
def wrapper(*args, **kwargs):
if not args:
return noop(None) # returns a PrefectFuture
if test_serializable(args[0], f"{func.__name__} first argument"):
return func(*args, **kwargs)
else:
return noop(None) # returns a PrefectFuture instead of None
return wrapper
Important notes
- cloudpickle is a good “preflight” check, but Prefect may still perform other serialization steps (e.g., when computing cache keys or persisting results). If your object is picklable but not JSON-serializable and some part of your run uses JSON, you could still fail later. If needed, switch your task’s result serializer to pickle:
from prefect.serializers import PickleSerializer
@task(result_serializer=PickleSerializer())
def my_task(...):
...
Refs:
- Serializers: prefect.serializers
- Exceptions: prefect.exceptions source
If you share the type of object you're passing, I can help tailor the check (e.g., ignore Prefect futures, check kwargs, or use JSON vs pickle depending on your pipeline).
oleander-whirlwind - 10/10/2025, 12:25 PM
Marvin - 10/10/2025, 12:29 PM
oleander-whirlwind - 10/10/2025, 12:35 PM
Marvin - 10/10/2025, 12:37 PM
Open a connection inside each task rather than passing one in:
from prefect import task, flow
@task
def query_sqlite(db_path: str, sql: str, params=None):
import sqlite3
conn = sqlite3.connect(db_path) # same thread as the task
try:
cur = conn.execute(sql, params or ())
return cur.fetchall()
finally:
conn.close()
@flow
def main():
# Works with submit (parallel) or direct calls (sequential)
f1 = query_sqlite.submit("my.db", "select 1")
f2 = query_sqlite.submit("my.db", "select 2")
return f1.result(), f2.result()
If you absolutely must accept a connection-like object:
- Expect it to fail with .submit (thread boundary) and likely fail if Prefect tries to hash/serialize inputs.
- You can reduce risk by disabling input-based caching on that task:
from prefect import task
from prefect.cache_policies import NO_CACHE
@task(cache_policy=NO_CACHE)
def uses_conn(conn):
...
…but you’ll still be vulnerable to thread/process boundaries and any other feature that inspects parameters.
References
- Task runners and concurrency: Task runners
- Serializers: prefect.serializers
If you're seeing a specific error (e.g., the SQLite "same thread" error or a SerializationError), share the stack trace and I'll tailor the fix.
oleander-whirlwind - 10/10/2025, 12:39 PM
Marvin - 10/10/2025, 12:40 PM
- Result persistence: if you persist results containing NumPy arrays, use a pickle-based result serializer:
from prefect import task
from prefect.serializers import PickleSerializer
@task(result_serializer=PickleSerializer())
def compute(arr):
...
- Caching/hash: Prefect may hash inputs for caching. Hashing typically uses a pickle-based byte representation, so NumPy arrays are fine here. But if you introduce custom JSON serializers for parameters, expect failures.
Rules of thumb:
- For large arrays, prefer threads locally to avoid pickling overhead; use processes only if you need CPU-bound parallelism and accept the copy cost.
- For distributed/remote execution (e.g., workers, containers), always assume serialization is required; choose serializers that support your types or convert arrays to a portable form.
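As one example of a "portable form" (assuming plain NumPy arrays), you can ship dtype, shape, and raw bytes instead of the array object:
import numpy as np

def array_to_portable(arr: np.ndarray) -> dict:
    # JSON-friendly representation: dtype string, shape, and hex-encoded raw bytes
    return {"dtype": str(arr.dtype), "shape": list(arr.shape), "data": arr.tobytes().hex()}

def portable_to_array(payload: dict) -> np.ndarray:
    flat = np.frombuffer(bytes.fromhex(payload["data"]), dtype=payload["dtype"])
    return flat.reshape(payload["shape"]).copy()  # copy() makes the array writable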
Docs:
- Task runners: Task runners
- Serializers: prefect.serializers
If you share your exact setup (runner, array sizes, whether results are persisted), I can suggest the most performant pattern.
oleander-whirlwind - 10/10/2025, 12:42 PM
Marvin - 10/10/2025, 12:43 PM
GPU/device-bound objects
- CUDA tensors (move to CPU first via tensor.cpu()), device RNG states
- JAX device buffers (device-backed arrays), OpenCL contexts
Framework sessions/executors/runtimes
- TensorFlow 1.x Graph/Session; tf.data.Dataset iterators
- SparkSession, PySpark DataFrames/RDDs (pass identifiers; recreate in task)
- Event loops and async primitives: asyncio loop/Task, generators/coroutines
Iterators and streaming datasets
- Python generators/itertools, file iterators
- torch.utils.data.DataLoader iterators/workers
- Hugging Face streaming datasets (iterable datasets)
Concurrency primitives and system objects
- Thread/process pools, multiprocessing.Queue/Lock, threading.Lock/RLock
- Loggers with live handlers, tqdm progress bars with live output
Models and training state (use framework-native save/load instead)
- Keras/TensorFlow models: prefer `model.save()`/SavedModel over pickle
- PyTorch nn.Module: can often pickle, but prefer state_dict + `load_state_dict`; CUDA adds complexity
- XGBoost/LightGBM/CatBoost models: use `.save_model()`/`.load_model()` rather than Python pickle (version sensitive)
- spaCy pipelines: use nlp.to_bytes()/from_bytes(), not pickle
- Tokenizers: many are picklable, but safer to reload via pretrained name/path
Arrow/columnar and big tables
- PyArrow Dataset scanners, file-system handles; materialize to Table/RecordBatch first
- Dask/Ray futures/objects (framework-specific serializers; not plain pickle/JSON)
Guidelines that work well in Prefect 3.x
- Don't pass live handles; pass small, serializable config (paths, DSNs, model names) and construct inside the task (see the sketch after this list).
- For arrays/tensors:
- JSON path: convert to lists or a compact dtype+bytes representation
- Pickle path: prefer CPU numpy arrays or CPU torch tensors; set a pickle-based result serializer when returning them
from prefect.serializers import PickleSerializer
@task(result_serializer=PickleSerializer())
def compute(...): ...
- Avoid passing iterators/generators; pass parameters to recreate them or materialize to concrete data.
- If you must pass a custom object, implement __getstate__/__setstate__ or provide `.to_bytes()`/`.from_bytes()` helpers and wrap them.
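Putting the "construct inside the task" guideline into practice, a minimal sketch that passes only a model name; the transformers pipeline and model name are illustrative and require the transformers package:
from prefect import flow, task

@task
def classify(texts: list[str], model_name: str = "distilbert-base-uncased-finetuned-sst-2-english"):
    # Construct the heavy, non-serializable pipeline inside the task
    from transformers import pipeline  # local import so only the worker needs it
    clf = pipeline("sentiment-analysis", model=model_name)
    return clf(texts)

@flow
def main():
    return classify(["this was easier than expected"])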
Reference
- Prefect serializers: prefect.serializers
If you share which objects you're passing (e.g., CUDA tensors, sessions, DataLoader, Spark DF), I can suggest the safest conversion or a serializer strategy for your flow.
oleander-whirlwind - 10/10/2025, 12:55 PM
Marvin - 10/10/2025, 12:56 PM
1) Recommended: open a connection inside each task
from prefect import task, flow
@task
def run_query(db_path: str, sql: str, params=None):
import sqlite3
conn = sqlite3.connect(db_path)
try:
# Optional: improve concurrent access characteristics
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA busy_timeout=5000;")
cur = conn.execute(sql, params or ())
return cur.fetchall()
finally:
conn.close()
@flow
def main():
f1 = run_query.submit("my.db", "select 1")
f2 = run_query.submit("my.db", "select 2")
return f1.result(), f2.result()
2) If you must share a single connection across threads (not recommended)
- Create it with check_same_thread=False and serialize access with a lock. This is fragile and easy to get wrong; SQLite may still lock under write load.
import sqlite3, threading
conn = sqlite3.connect("my.db", check_same_thread=False)
lock = threading.Lock()
@task
def run_query_shared(sql: str, params=None):
with lock:
cur = conn.execute(sql, params or ())
return cur.fetchall()
Caveats: still a single-process solution; won’t survive process boundaries, and concurrency is effectively serialized by the lock.
3) If you’re using an in-memory DB
- Each connection gets its own DB by default. To share in-memory across connections, use a shared-cache URI:
db_path = "file:memdb1?mode=memory&cache=shared"
conn = sqlite3.connect(db_path, uri=True)
You still need to open per-task connections and remain in one process.
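A minimal sketch of that pattern (thread runner, single process; the keeper connection and table are illustrative):
import sqlite3
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

DB_URI = "file:memdb1?mode=memory&cache=shared"

# Keep one connection open for the life of the process so the shared
# in-memory database is not discarded when task connections close.
_keeper = sqlite3.connect(DB_URI, uri=True)
_keeper.execute("CREATE TABLE items (n INTEGER)")
_keeper.executemany("INSERT INTO items (n) VALUES (?)", [(1,), (2,), (3,)])
_keeper.commit()

@task
def count_items() -> int:
    conn = sqlite3.connect(DB_URI, uri=True)  # per-task connection, opened in the task's thread
    try:
        return conn.execute("SELECT COUNT(*) FROM items").fetchone()[0]
    finally:
        conn.close()

@flow(task_runner=ThreadPoolTaskRunner())
def main():
    return count_items.submit().result()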
4) If tasks must share a DB but you want to avoid contention
- Keep DB I/O in one task (sequential) and fan out CPU work in others.
- Or enforce concurrency=1 for DB-tagged tasks using concurrency limits (see the tag-based sketch after this list): Global concurrency limits
- Or just call the DB task without .submit (run sequentially).
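As a sketch of the concurrency-limit option, this uses tag-based task concurrency limits; the tag name is illustrative and assumes you created the limit beforehand, e.g. with `prefect concurrency-limit create sqlite-db 1`:
import sqlite3
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

# Tasks tagged "sqlite-db" run at most one at a time once the matching
# tag concurrency limit exists on the server/Cloud.
@task(tags=["sqlite-db"])
def run_query(db_path: str, sql: str):
    conn = sqlite3.connect(db_path)
    try:
        return conn.execute(sql).fetchall()
    finally:
        conn.close()

@flow(task_runner=ThreadPoolTaskRunner())
def main():
    futures = [run_query.submit("my.db", "select 1") for _ in range(4)]
    return [f.result() for f in futures]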
Related docs
- Task runners overview: Task runners
If you share your exact error text and how you create the connection (file vs in-memory), I can suggest the safest change (PRAGMA settings, connection-per-task, or a small refactor).
oleander-whirlwind - 10/10/2025, 12:59 PM
Marvin - 10/10/2025, 12:59 PM
1) PyTorch models and tensors (especially on GPU)
- ThreadPool: OK to share torch.nn.Module and torch.Tensor references across threads if your usage is thread-safe (often is for pure inference).
- ProcessPool: Fails because the live model/tensors and CUDA context are not safely picklable/transferable. You’ll see pickling errors or CUDA initialization/memory issues.
- Workaround for processes: pass a model path/config and load the model inside each process; move tensors to CPU or reconstruct them from bytes/np arrays inside the task.
2) Hugging Face transformers Pipeline (especially on GPU)
- ThreadPool: OK to share a loaded transformers.Pipeline across threads for inference.
- ProcessPool: Typically fails to pickle the pipeline (complex object graph with device-bound tensors, file handles, and non-picklable components).
- Workaround for processes: pass the model identifier and device config; re-create the pipeline inside the task process.
3) spaCy Language pipeline (nlp)
- ThreadPool: OK to use a loaded spacy.Language object across threads for inference (tokenization, NER, etc.). spaCy is generally thread-safe for read-only pipeline use.
- ProcessPool: Not picklable by default; sending a live nlp object to another process will error. spaCy recommends loading the pipeline per process.
- Workaround for processes: pass the model name/path (e.g., "en_core_web_sm") and call spacy.load(...) inside the task.
Key takeaway
- ThreadPoolTaskRunner: no inter-process serialization, so many complex in-memory objects can be shared by reference if the library is thread-safe and doesn’t enforce same-thread rules (sqlite does).
- ProcessPoolTaskRunner: every argument/return must be picklable; objects tied to GPU contexts, C++ handles, sockets, or complex Python graphs usually are not.
General patterns in Prefect 3.x
- For threads: you can pre-load heavy models once (module/global scope) and use them in tasks, but ensure thread-safety (see the preload sketch after this list).
- For processes: pass small, serializable parameters (paths/model names/config) and construct the heavy object inside each task.
- If you persist complex results, set a pickle-based result serializer on the task:
from prefect import task
from prefect.serializers import PickleSerializer
@task(result_serializer=PickleSerializer())
def infer(...):
...
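For the thread case specifically, a minimal preload sketch; it assumes the spaCy model en_core_web_sm is installed and that read-only inference is thread-safe for your pipeline:
from functools import lru_cache
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner

@lru_cache(maxsize=1)
def get_nlp():
    import spacy  # heavy import kept out of module import time
    return spacy.load("en_core_web_sm")  # loaded once per process, shared by threads

@task
def extract_entities(text: str):
    doc = get_nlp()(text)
    return [(ent.text, ent.label_) for ent in doc.ents]

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))
def main(texts: list[str]):
    return [f.result() for f in [extract_entities.submit(t) for t in texts]]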
Docs for context
- Task runners: Task runners
If you share your exact runner choice and objects (e.g., torch model + GPU, transformers pipeline, spaCy nlp), I can sketch the safest pattern (preload vs construct-in-task, locks, or per-process init).
oleander-whirlwind - 10/10/2025, 1:36 PM
Marvin - 10/10/2025, 1:36 PM
This happens when the task runner is passed as a string:
@flow(task_runner="ThreadPoolTaskRunner")  # <-- string
def my_flow(name): ...
Prefect tries to call .duplicate() on the task runner, which strings don’t have, hence: 'str' object has no attribute 'duplicate'.
Fix: pass an actual TaskRunner instance from prefect.task_runners.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
@task
def greet(who: str):
return f"Hello, {who}!"
@flow(task_runner=ThreadPoolTaskRunner(max_workers=4)) # correct: instance, not string
def my_flow(name: str):
# use .submit() if you want concurrency
fut = greet.submit(name)
return fut.result()
if __name__ == "__main__":
my_flow("World")
You can also switch runners without redecorating using with_options:
my_flow_proc = my_flow.with_options(task_runner=ProcessPoolTaskRunner(max_workers=2))
my_flow_proc("World")
Docs for context:
- Task runners overview: Task runners
If you share the exact decorator line you used, I can confirm the minimal change.
oleander-whirlwind - 10/10/2025, 1:39 PM
Marvin - 10/10/2025, 1:40 PM
1) Use a probe object that reports when it is pickled/unpickled
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
class Probe:
def __init__(self, tag): self.tag = tag
def __getstate__(self):
print(f"[PROBE] pickling {self.tag}")
with open(f"/tmp/probe_pickled_{self.tag}", "w") as f: f.write("pickled")
return {"tag": self.tag}
def __setstate__(self, state):
print(f"[PROBE] unpickling {state['tag']}")
with open(f"/tmp/probe_unpickled_{state['tag']}", "w") as f: f.write("unpickled")
self.tag = state["tag"]
@task(log_prints=True)
def echo_probe(p: Probe):
print(f"[TASK] tag={p.tag}, id={id(p)}")
return id(p)
@flow(log_prints=True, task_runner=ThreadPoolTaskRunner())
def thread_flow():
p = Probe("thread")
rid = echo_probe.submit(p).result()
print(f"[FLOW] thread result id={rid}")
@flow(log_prints=True, task_runner=ProcessPoolTaskRunner())
def process_flow():
p = Probe("process")
rid = echo_probe.submit(p).result()
print(f"[FLOW] process result id={rid}")
if __name__ == "__main__":
thread_flow()
process_flow()
Expected:
- Thread flow: no “[PROBE] pickling/unpickling” prints; no /tmp/probe_* files; object id in task likely equals the id in the flow (same object).
- Process flow: you’ll see “[PROBE] pickling …” in the parent, “[PROBE] unpickling …” in the worker, and the /tmp files will be created. The object id inside the task will differ (it’s a deserialized copy).
2) Show a non-picklable object works with threads but fails with processes
- Lambdas and many live handles aren’t picklable. With threads, they pass by reference; with processes, you get a pickling error.
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE # avoid hashing inputs for this demo
@task(log_prints=True, cache_policy=NO_CACHE)
def call_fn(fn):
print(f"fn result: {fn(2)}")
@flow(task_runner=ThreadPoolTaskRunner())
def thread_ok():
call_fn.submit(lambda x: x + 1).result() # OK
@flow(task_runner=ProcessPoolTaskRunner())
def process_fails():
# Will raise "Can't pickle local object ..." from the worker boundary
call_fn.submit(lambda x: x + 1).result()
if __name__ == "__main__":
thread_ok()
process_fails()
3) Compare object identity across runners
- The same object will have the same identity in threads; in processes it’ll be a different instance after unpickling. The first probe example prints ids to show this.
Extra tips
- Turn up logging to see runner details:
- Set env var PREFECT_LOGGING_LEVEL=DEBUG
- You’ll see logs like “Using ProcessPoolTaskRunner …” and task submission messages, but there isn’t a specific “serializing inputs now” log line.
- If you need rock-solid evidence beyond prints, you can write timestamps/markers to files in __getstate__/__setstate__ as shown, or use logging to a known file to prove pickling/unpickling occurs only under the process pool.
If you paste your current minimal example, I can adapt the probe pattern directly into it so you can run A/B with ThreadPool vs ProcessPool.
oleander-whirlwind - 10/10/2025, 1:53 PM
Marvin - 10/10/2025, 1:54 PM
Here's a more explicit probe that hooks __reduce_ex__ and prints process IDs so you can see exactly when pickling happens:
import os
from functools import wraps
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE
class Probe:
def __init__(self, tag):
self.tag = tag
def __reduce_ex__(self, protocol):
# This is called during pickling (process pool boundary)
print(f"[PROBE] __reduce_ex__ called for tag={self.tag} in pid={os.getpid()}")
with open(f"/tmp/probe_reduced_{self.tag}", "w") as f:
f.write(f"reduced in pid {os.getpid()}")
return (Probe._reconstruct, (self.tag,))
@staticmethod
def _reconstruct(tag):
print(f"[PROBE] reconstruct tag={tag} in pid={os.getpid()}")
return Probe(tag)
@task(log_prints=True, cache_policy=NO_CACHE)
def where_am_i(p: Probe):
print(f"[TASK] pid={os.getpid()} id(p)={id(p)} tag={p.tag}")
return id(p)
@flow(log_prints=True, task_runner=ThreadPoolTaskRunner())
def thread_flow():
p = Probe("thread")
print(f"[FLOW] THREAD pid={os.getpid()} id(p)={id(p)}")
rid = where_am_i.submit(p).result()
print(f"[FLOW] THREAD result id={rid}")
@flow(log_prints=True, task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_flow():
p = Probe("process")
print(f"[FLOW] PROCESS pid={os.getpid()} id(p)={id(p)}")
rid = where_am_i.submit(p).result()
print(f"[FLOW] PROCESS result id={rid}")
if __name__ == "__main__":
thread_flow()
process_flow()
What you should observe
- ThreadPoolTaskRunner:
- No “[PROBE] __reduce_ex__ …” or “reconstruct” prints
- No /tmp/probe_reduced_thread file
- [TASK] pid equals the flow’s pid (same process)
- id(p) in flow and task may match (same object by reference), or at least there is no pickling evidence
- ProcessPoolTaskRunner:
- You’ll see “[PROBE] __reduce_ex__ …” and “reconstruct …”
- /tmp/probe_reduced_process file is created
- [TASK] pid differs from the flow’s pid (different process)
- id(p) in task differs from flow (it’s a deserialized copy)
If you still don’t see the reduce prints in the process case:
- Double-check you are actually using ProcessPoolTaskRunner in the flow decorator and calling .submit (not a direct call).
- Ensure the task has cache_policy=NO_CACHE as above; otherwise, Prefect’s input hashing may pickle in both cases, confusing the signal.
- Set PREFECT_LOGGING_LEVEL=DEBUG to confirm “Using ProcessPoolTaskRunner…” appears in logs.
Why your earlier “both work” examples didn’t fail
- Prefect’s process pool uses cloudpickle, which can serialize lambdas, closures, many ML objects, etc. That’s why you didn’t get errors. The probe above does not rely on errors; it shows serialization via side effects and PID changes.
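You can see the difference directly (plain pickle rejects a lambda; cloudpickle serializes it):
import pickle
import cloudpickle

fn = lambda x: x + 1

try:
    pickle.dumps(fn)  # stdlib pickle cannot serialize lambdas
except Exception as exc:
    print(f"pickle failed: {type(exc).__name__}")

payload = cloudpickle.dumps(fn)       # cloudpickle handles lambdas/closures
print(cloudpickle.loads(payload)(2))  # -> 3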
If you want, paste your current snippet and I'll adapt the probe into it so you can run an apples-to-apples comparison.
oleander-whirlwind - 10/13/2025, 6:09 AM
Marvin - 10/13/2025, 6:09 AM
Here's a minimal example that fails with ProcessPoolTaskRunner because a threading.Lock can't be pickled:
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import threading
@task
def use_lock(lock):
# Just try to use the lock so it isn't optimized away
lock.acquire()
lock.release()
return "ok"
@flow(task_runner=ProcessPoolTaskRunner())
def my_flow():
lock = threading.Lock() # NOT pickleable
use_lock.submit(lock) # This will fail to serialize across processes
if __name__ == "__main__":
my_flow()
What you’ll see: a pickling error like “TypeError: cannot pickle '_thread.lock' object” — because arguments/results must be pickleable when using process-based execution.
Fixed version (create the resource inside the task, pass only pickleable data):
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import threading
@task
def make_and_use_lock():
lock = threading.Lock() # Created inside the process, no pickling needed
lock.acquire()
lock.release()
return "ok"
@flow(task_runner=ProcessPoolTaskRunner())
def my_flow():
make_and_use_lock.submit() # No non-pickleable args cross the boundary
if __name__ == "__main__":
my_flow()
Notes:
- With ProcessPoolTaskRunner, task functions, their inputs, and their outputs must be pickleable.
- If you switch to ThreadPoolTaskRunner, the first example will work because threads share memory and don’t require pickling.
- Always define task/flow functions at module top-level and avoid passing non-serializable objects (open files, locks, DB connections, sessions, etc.). Create those inside the task instead.
If you want, I can tailor this to your exact object (e.g., a DB client or session) and show the minimal fix for that case.
oleander-whirlwind - 10/13/2025, 6:11 AM
Marvin - 10/13/2025, 6:12 AM
Here's a side-by-side comparison: the same lock-passing task succeeds with ThreadPoolTaskRunner and fails with ProcessPoolTaskRunner.
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
import threading
@task
def use_lock(lock):
# Touch the lock so it's actually used
lock.acquire()
lock.release()
return "ok"
# 1) Works: threads share memory, no pickling needed
@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
lock = threading.Lock() # not pickleable, but fine in threads
fut = use_lock.submit(lock) # shared in-memory reference
print("ThreadPool result:", fut.result())
# 2) Fails: processes require pickleable args/results
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow():
lock = threading.Lock() # not pickleable
fut = use_lock.submit(lock) # will try to pickle and fail
print("ProcessPool result:", fut.result())
if __name__ == "__main__":
# Run one at a time to see the behavior
thread_flow() # expect: prints "ThreadPool result: ok"
# process_flow() # expect: TypeError: cannot pickle '_thread.lock' object
Fix for ProcessPoolTaskRunner (create unpickleable resources inside the task; pass only pickleable data across the boundary):
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import threading
@task
def make_and_use_lock():
lock = threading.Lock() # created inside the worker process
lock.acquire()
lock.release()
return "ok"
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fixed():
fut = make_and_use_lock.submit() # no unpickleable args cross processes
print("ProcessPool result:", fut.result())
if __name__ == "__main__":
process_flow_fixed() # expect: prints "ProcessPool result: ok"
Key takeaways:
- ThreadPoolTaskRunner: tasks execute in the same process; objects are shared by reference, no pickling.
- ProcessPoolTaskRunner: tasks execute in separate processes; function, args, and results must be pickleable.
References:
- prefect.task_runners source
- prefect.serializers source
If you have a specific object (e.g., a DB client or session) you want to pass, share that and I'll show the smallest safe pattern for each runner.
oleander-whirlwind - 10/13/2025, 6:16 AM
1. Provide a custom cache_key_fn
2. Disable caching by passing cache_policy=NO_CACHE
Marvin - 10/13/2025, 6:18 AM
Right - in the threaded case the failure comes from input caching: Prefect hashes task inputs by pickling them, and that fails on a _thread.lock.
Two minimal fixes for the threaded case:
- Disable caching for that task
- Or provide a cache_key_fn that ignores the unpickleable arg
Side‑by‑side comparison
1) ThreadPoolTaskRunner: works if you disable caching on the task
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.cache_policies import NO_CACHE
import threading
@task(cache_policy=NO_CACHE) # prevent hashing of inputs
def use_lock(lock):
lock.acquire()
lock.release()
return "ok"
@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
lock = threading.Lock() # not pickleable, but fine with threads
print(use_lock.submit(lock).result())
if __name__ == "__main__":
thread_flow() # prints "ok" with no cache key errors
Alternative for threads (custom cache key that ignores the lock):
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import threading
def no_cache_key(ctx, inputs):
return None # skip caching for this run
@task(cache_key_fn=no_cache_key)
def use_lock(lock):
lock.acquire()
lock.release()
return "ok"
@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
lock = threading.Lock()
print(use_lock.submit(lock).result())
2) ProcessPoolTaskRunner: even with caching disabled, passing the lock will fail due to the process boundary
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE
import threading
@task(cache_policy=NO_CACHE)
def use_lock(lock):
lock.acquire()
lock.release()
return "ok"
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow():
lock = threading.Lock() # not pickleable -> will still fail
print(use_lock.submit(lock).result())
if __name__ == "__main__":
process_flow() # expect: TypeError cannot pickle '_thread.lock' object
ProcessPool fixed version (create the resource inside the task; pass only pickleable data):
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
@task
def make_and_use_lock():
import threading
lock = threading.Lock()
lock.acquire()
lock.release()
return "ok"
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fixed():
print(make_and_use_lock.submit().result())
if __name__ == "__main__":
process_flow_fixed() # prints "ok"
Summary:
- ThreadPoolTaskRunner: same-process, shared memory. You must disable or customize caching if inputs are not serializable.
- ProcessPoolTaskRunner: separate processes. All task functions/inputs/outputs must be pickleable regardless of caching.
References:
- prefect.task_runners source
- prefect.cache_policies source
oleander-whirlwind - 10/13/2025, 6:20 AM
Marvin - 10/13/2025, 6:21 AM
Here's a single script that runs all three cases back to back:
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE
import threading
# Task that accepts a lock argument.
# For threads: disable caching so Prefect won't try to hash (serialize) the lock.
@task(cache_policy=NO_CACHE)
def use_lock(lock):
lock.acquire()
lock.release()
return "ok (used external lock)"
# Process-safe task: creates the lock inside the worker process, so no pickling of args.
@task
def make_and_use_lock():
lock = threading.Lock()
lock.acquire()
lock.release()
return "ok (lock created in task)"
@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow_ok():
print("\n[ThreadPoolTaskRunner] START (expect success)")
lock = threading.Lock() # not pickleable, but fine with threads
res = use_lock.submit(lock).result()
print("[ThreadPoolTaskRunner] RESULT:", res)
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fails():
print("\n[ProcessPoolTaskRunner] START (expect failure when passing lock)")
lock = threading.Lock() # not pickleable across processes
fut = use_lock.submit(lock)
try:
res = fut.result()
print("[ProcessPoolTaskRunner] RESULT:", res)
except Exception as e:
print("[ProcessPoolTaskRunner] ERROR TYPE:", type(e).__name__)
print("[ProcessPoolTaskRunner] ERROR MSG:", e)
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fixed():
print("\n[ProcessPoolTaskRunner - FIXED] START (expect success)")
res = make_and_use_lock.submit().result()
print("[ProcessPoolTaskRunner - FIXED] RESULT:", res)
if __name__ == "__main__":
# Run all three to see the contrast
thread_flow_ok() # works (caching disabled avoids hashing the lock)
process_flow_fails() # fails (cannot pickle '_thread.lock' object)
process_flow_fixed() # works (lock created inside the task)
What this demonstrates:
- ThreadPoolTaskRunner: same-process, shared memory. Passing a lock works as long as you disable or customize caching to avoid hashing non-serializable inputs.
- ProcessPoolTaskRunner: separate processes. Arguments/results must be pickleable. Passing a lock fails; creating it inside the task works.
References:
- prefect.task_runners source
- prefect.cache_policies source
oleander-whirlwind - 10/13/2025, 6:35 AM
Marvin - 10/13/2025, 6:37 AM
Here's a corrected demo: it uses .submit and prints process IDs and object ids so you can see the thread vs process difference directly.
import os
import threading
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE
@task(cache_policy=NO_CACHE, log_prints=True)
def inspect_arg(label, arg):
# Runs inside a thread or a process depending on the task runner
print(f"[{label}] pid={os.getpid()} "
f"thread={threading.current_thread().name} "
f"id(arg)={id(arg)} type={type(arg)}")
return id(arg)
@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
print(f"\n[thread_flow] parent pid={os.getpid()}")
data = {"k": "v", "nums": [1, 2, 3]}
print(f"[thread_flow] id(data) in parent={id(data)}")
# In threads, the same object is shared by reference
child_id = inspect_arg.submit("thread-data", data).result()
print(f"[thread_flow] same object across threads? {child_id == id(data)}") # expect True
# Generators are fine in threads if caching is disabled
gen = (i for i in range(3))
inspect_arg.submit("thread-gen", gen).result()
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow():
print(f"\n[process_flow] parent pid={os.getpid()}")
data = {"k": "v", "nums": [1, 2, 3]}
print(f"[process_flow] id(data) in parent={id(data)}")
# In processes, args are pickled/unpickled -> new object with a different id
child_id = inspect_arg.submit("process-data", data).result()
print(f"[process_flow] same object across processes? {child_id == id(data)}") # expect False
# Generators cannot be pickled -> this will fail
try:
gen = (i for i in range(3))
inspect_arg.submit("process-gen", gen).result()
except Exception as e:
print(f"[process_flow] passing generator failed as expected: {type(e).__name__}: {e}")
if __name__ == "__main__":
print(f"[main] pid={os.getpid()}")
# IMPORTANT: Do not use flow.fn(...) here; that bypasses orchestration and the task runner
thread_flow()
process_flow()
Why your original example misleads:
- @flow(task_runner=ProcessPoolTaskRunner) should be @flow(task_runner=ProcessPoolTaskRunner())
- say_hello(name) executes inline; use say_hello.submit(name) to schedule it on the runner
- Globals in a spawned process come from module import, not from parent memory; and string interning makes id(...) unreliable for this proof
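A quick illustration of the interning point (CPython behavior, shown in a single process):
a = "hello"
b = "hello"
# Identical literals are typically interned, so id() matches even though no
# copying or process boundary is involved - id() is not proof of sharing.
print(a is b, id(a) == id(b))  # usually: True True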
References:
- prefect.task_runners source
- prefect.cache_policies source