<@ULVA73B9P> i want to show my colleques what obje...
# ask-marvin
o
@Marvin i want to show my colleagues which objects persist across tasks and which do not, for all available runners. could you generate a flow for each task runner showing the limitations in object persistence / under-the-hood serialisation?
m
thought for 287.8 seconds
Great idea — object “persistence” really varies by task runner because of where serialization boundaries appear. In Prefect 3.x: - PrefectTaskRunner: in-process, sequential — no serialization between tasks - ThreadPoolTaskRunner: in-process, multi-thread — no serialization between tasks (but watch out for thread-safety) - ProcessPoolTaskRunner: multi-process — task inputs/outputs must be picklable - DaskTaskRunner/RayTaskRunner: distributed — task inputs/outputs must be serializable (cloudpickle/pickle) Below is a single script that defines the same demo for each runner to show what survives across tasks. It covers primitives, dataclass, Pydantic model, pandas DataFrame, and a couple of “tricky” objects (open file handle and sqlite3 connection). Copy/paste into a file and run the flows you want. For Dask/Ray, install extras first: - pip install "prefect[dask]" "dask[distributed]" - pip install "prefect[ray]" "ray" Code ``` from __future__ import annotations from dataclasses import dataclass import sqlite3 from tempfile import NamedTemporaryFile from typing import Any, Optional from prefect import flow, task from prefect.task_runners import PrefectTaskRunner, ThreadPoolTaskRunner, ProcessPoolTaskRunner # Optional runners (Dask/Ray) if installed try: from prefect_dask.task_runners import DaskTaskRunner except Exception: DaskTaskRunner = None try: from prefect_ray.task_runners import RayTaskRunner except Exception: RayTaskRunner = None # ------------------- # Sample object makers # ------------------- @dataclass class MyDataClass: x: int y: str class MyPydModelFallback: # Fallback if Pydantic is not available def __init__(self, a: int, b: str): self.a = a self.b = b def __repr__(self): return f"MyPydModelFallback(a={self.a}, b={self.b})" def make_pydantic_model(a: int, b: str) -> Any: try: from pydantic import BaseModel class MyModel(BaseModel): a: int b: str return MyModel(a=a, b=b) except Exception: return MyPydModelFallback(a=a, b=b) @task(name="make_primitives", 
log_prints=True) def make_primitives() -> dict[str, Any]: obj = { "int": 42, "list": [1, 2, 3], "str": "hello", "dict": {"k": "v"}, } print("Produced primitives:", obj) return obj @task(name="use_primitives", log_prints=True) def use_primitives(d: dict[str, Any]) -> None: print("Using primitives OK. Types:", {k: type(v).__name__ for k, v in d.items()}) @task(name="make_dataclass", log_prints=True) def make_dataclass() -> MyDataClass: obj = MyDataClass(x=1, y="dataclass") print("Produced dataclass:", obj) return obj @task(name="use_dataclass", log_prints=True) def use_dataclass(obj: MyDataClass) -> None: print("Using dataclass OK. Values:", obj.x, obj.y) @task(name="make_pydantic", log_prints=True) def make_pydantic() -> Any: obj = make_pydantic_model(10, "model") print("Produced pydantic-like object:", obj, type(obj).__name__) return obj @task(name="use_pydantic", log_prints=True) def use_pydantic(obj: Any) -> None: # Works for pydantic v2 or fallback a = getattr(obj, "a", None) b = getattr(obj, "b", None) print("Using pydantic-like OK. Values:", a, b) @task(name="make_dataframe", log_prints=True) def make_dataframe() -> Optional[Any]: try: import pandas as pd except Exception as e: print("pandas not installed, skipping DataFrame demo.", e) return None df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}) print("Produced DataFrame shape:", df.shape) return df @task(name="use_dataframe", log_prints=True) def use_dataframe(df: Optional[Any]) -> None: if df is None: print("Skipping DataFrame usage task.") return print("Using DataFrame OK. Head:\n", df.head(1).to_string(index=False)) @task(name="make_open_file_handle", log_prints=True)
def make_open_file_handle(): # Note: returning an open file handle will FAIL under ProcessPool/Dask/Ray (not picklable) f = NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8") f.write("hello from file handle\n") f.flush() print("Produced open file handle:", f.name) return f # return the open handle itself @task(name="use_open_file_handle", log_prints=True) def use_open_file_handle(fh) -> None: # Works in-process; unsafe/not picklable across processes/distributed. print("Attempting to read from open handle:", fh.name) fh.seek(0) print("File content:", fh.readline().strip()) @task(name="make_sqlite_connection", log_prints=True) def make_sqlite_connection(): # Allow cross-thread usage for ThreadPool; still not picklable across processes. conn = sqlite3.connect(":memory:", check_same_thread=False) cur = conn.cursor() cur.execute("create table t (x int)") cur.execute("insert into t values (1)") conn.commit() print("Produced sqlite3 connection (in-memory).") return conn @task(name="use_sqlite_connection", log_prints=True) def use_sqlite_connection(conn) -> None: cur = conn.cursor() cur.execute("select count(*) from t") print("Using sqlite connection OK. Row count:", cur.fetchone()[0]) # ------------------- # Runner-specific flows # ------------------- def run_demo_suite(): # Each pair demonstrates passing results across tasks. prim = make_primitives() use_primitives(prim) dc = make_dataclass() use_dataclass(dc) pyd = make_pydantic() use_pydantic(pyd) df = make_dataframe() use_dataframe(df) # These two will only work in in-process runners (PrefectTaskRunner, ThreadPoolTaskRunner) # and will FAIL (by design) with ProcessPool/Dask/Ray due to pickling/serialization. 
fh = make_open_file_handle() use_open_file_handle(fh) conn = make_sqlite_connection() use_sqlite_connection(conn) @flow(name="object-persistence-prefect", task_runner=PrefectTaskRunner(), log_prints=True) def demo_prefect_task_runner(): print("Runner: PrefectTaskRunner (sequential, in-process)") run_demo_suite() @flow(name="object-persistence-threads", task_runner=ThreadPoolTaskRunner(), log_prints=True) def demo_threadpool_task_runner(): print("Runner: ThreadPoolTaskRunner (threads, in-process)") run_demo_suite() @flow(name="object-persistence-processes", task_runner=ProcessPoolTaskRunner(), log_prints=True) def demo_processpool_task_runner(): print("Runner: ProcessPoolTaskRunner (processes; requires picklable objects)") run_demo_suite() if DaskTaskRunner is not None: @flow(name="object-persistence-dask", task_runner=DaskTaskRunner(), log_prints=True) def demo_dask_task_runner(): print("Runner: DaskTaskRunner (distributed; requires serializable objects)") run_demo_suite() else: def demo_dask_task_runner(): print("Dask not installed; install with: pip install 'prefect[dask]' 'dask[distributed]'") if RayTaskRunner is not None: @flow(name="object-persistence-ray", task_runner=RayTaskRunner(), log_prints=True) def demo_ray_task_runner(): print("Runner: RayTaskRunner (distributed; requires serializable objects)") run_demo_suite() else: def demo_ray_task_runner(): print("Ray not installed; install with: pip install 'prefect[ray]' 'ray'") if __name__ == "__main__": # Run one or more to see behavior differences: demo_prefect_task_runner() demo_threadpool_task_runner() demo_processpool_task_runner() # Optional if installed: demo_dask_task_runner() demo_ray_task_runner() ``` What you’ll observe - PrefectTaskRunner (sequential, in-process): Everything succeeds, including open file handle and sqlite3 connection. No serialization boundary between tasks.
- ThreadPoolTaskRunner (threads, in-process): Same as above; objects are shared in-memory across threads. Beware thread safety; we set sqlite3 check_same_thread=False to make this work. - ProcessPoolTaskRunner (processes): Primitives, dataclass, Pydantic model, pandas DataFrame work. Open file handle and sqlite3 connection FAIL because they are not picklable/meaningful across processes. - DaskTaskRunner/RayTaskRunner (distributed): Same expectations as ProcessPool — object arguments/results must be serializable. Open handles and live connections will fail. Notes on “persistence” and serialization - Between tasks: The runner dictates whether arguments/results must be serialized. In-process runners pass Python objects directly; process/distributed runners serialize arguments and results at every task boundary. - Persisting results to storage: Controlled separately via
persist_result
,
result_storage
, and
result_serializer
on tasks/flows. This affects storing results for later retrieval but does not change runner-level transport requirements between tasks. See serializers in code: prefect/serializers.py - Task runners API: prefect/task_runners.py - Prefect 3 release notes: Prefect 3.0 release notes Want me to add a variant that persists DataFrames to storage using
PickleSerializer
to show cross-run retrieval as well?
o
@Marvin you circumvented the limits. all is working. what we want to learn from the example is what works and what does not work. but the what-does-not-work part was not emphasized in your example. can you adjust the tasks to show what is not working?
m
thought for 156.4 seconds
Got it — let’s make failures loud and intentional. Below is a revised demo that: - Forces non-serializable objects to cross a serialization boundary where they will fail - Prints PASS/FAIL per case so your team can see exactly what breaks per runner - Uses the same flow body for each runner to keep comparisons apples-to-apples Key idea we use to provoke failures: - We create tricky objects in the flow body (main thread/process) and pass them into tasks. - PrefectTaskRunner (sequential, same thread): passes by reference → succeeds. - ThreadPoolTaskRunner (threads): passes to a worker thread → often succeeds (no serialization) but will fail for SQLite due to thread affinity. - ProcessPoolTaskRunner (processes), DaskTaskRunner, RayTaskRunner: arguments must be serialized → non-serializable objects fail immediately. Install extras for Dask/Ray if you want to run those flows: - pip install "prefect[dask]" "dask[distributed]" - pip install "prefect[ray]" "ray" Code ``` from __future__ import annotations from dataclasses import dataclass import sqlite3 from tempfile import NamedTemporaryFile from typing import Any, Optional from prefect import flow, task from prefect.task_runners import PrefectTaskRunner, ThreadPoolTaskRunner, ProcessPoolTaskRunner # Optional runners (Dask/Ray) try: from prefect_dask.task_runners import DaskTaskRunner except Exception: DaskTaskRunner = None try: from prefect_ray.task_runners import RayTaskRunner except Exception: RayTaskRunner = None # ------------------- # Helpers and tasks # ------------------- def check(label: str, fut): try: val = fut.result() # will raise on failure tname = type(val).__name__ if val is not None else "None" print(f"PASS: {label} -> {tname}") except Exception as e: print(f"FAIL: {label} -> {type(e).__name__}: {e!s}") @dataclass class MyData: x: int y: str @task(name="echo", log_prints=True) def echo(obj: Any) -> str: print("echo type:", type(obj).__name__) return f"got-{type(obj).__name__}" @task(name="use_open_file", log_prints=True) def 
use_open_file(fh) -> str: fh.seek(0) return fh.readline().strip() @task(name="use_sqlite", log_prints=True) def use_sqlite(conn) -> int: cur = conn.cursor() cur.execute("select 1") return cur.fetchone()[0] @task(name="use_callable", log_prints=True) def use_callable(fn) -> int: return fn(2) @task(name="use_generator", log_prints=True) def use_generator(gen) -> list: return [next(gen), next(gen)] @task(name="make_dataframe", log_prints=True) def make_dataframe() -> Optional[Any]: try: import pandas as pd except Exception as e: print("pandas not installed; skipping DataFrame return demo.", e) return None df = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]}) print("made DataFrame shape:", df.shape) return df # ------------------- # Core demo logic # ------------------- def run_demo(label: str): print(f"=== Runner: {label} ===") print("This demo forces serialization boundaries for inputs (flow -> task).") print("- Expect PrefectTaskRunner to pass almost everything.") print("- Expect ThreadPoolTaskRunner to FAIL on sqlite (thread-bound), others PASS.") print("- Expect ProcessPool/Dask/Ray to FAIL on non-serializable inputs (file handle, lambda, generator, sqlite connection).") # Safe things created in flow and passed to task primitives = {"a": 1, "b": [1, 2], "c": {"k": "v"}} dataclass_obj = MyData(1, "ok") # Tricky things created in flow (to force failure for non-inproc runners) # 1) open file handle: not pickleable (will fail for process/distributed), okay for threads fh = NamedTemporaryFile(mode="w+", delete=False, encoding="utf-8") fh.write("hello-from-fh\n") fh.flush() # 2) sqlite connection: by default thread-bound; also not pickleable conn = sqlite3.connect(":memory:") # check_same_thread=True by default
# 3) lambda: not pickleable lam = lambda x: x + 1 # noqa: E731 # 4) generator: not pickleable gen = (i for i in range(10)) # Also show a DataFrame being returned from a task (output serialization) df_future = make_dataframe() # Schedule tasks that consume these objects safe1 = echo(primitives) safe2 = echo(dataclass_obj) t_fh = use_open_file(fh) t_conn = use_sqlite(conn) t_lam = use_callable(lam) t_gen = use_generator(gen) # Now observe results (this is where failures are printed) check("input: primitives (flow->task)", safe1) check("input: dataclass (flow->task)", safe2) check("input: open file handle (flow->task)", t_fh) check("input: sqlite connection (flow->task)", t_conn) check("input: lambda (flow->task)", t_lam) check("input: generator (flow->task)", t_gen) # And observe output side for a commonly large type check("output: DataFrame (task->flow)", df_future) print("=== End ===\n") # ------------------- # Runner-specific flows # ------------------- @flow(name="object-persistence-prefect", task_runner=PrefectTaskRunner(), log_prints=True) def demo_prefect_task_runner(): run_demo("PrefectTaskRunner (sequential, in-process; no serialization boundary)") @flow(name="object-persistence-threads", task_runner=ThreadPoolTaskRunner(max_workers=4), log_prints=True) def demo_threadpool_task_runner(): run_demo("ThreadPoolTaskRunner (threads, in-process; no serialization, but thread-affinity matters)") @flow(name="object-persistence-processes", task_runner=ProcessPoolTaskRunner(max_workers=2), log_prints=True) def demo_processpool_task_runner(): run_demo("ProcessPoolTaskRunner (processes; input/output must be pickleable)") if DaskTaskRunner is not None: @flow(name="object-persistence-dask", task_runner=DaskTaskRunner(), log_prints=True) def demo_dask_task_runner(): run_demo("DaskTaskRunner (distributed; input/output must be serializable)") else: def demo_dask_task_runner(): print("Dask not installed; pip install 'prefect[dask]' 'dask[distributed]'") if RayTaskRunner is not 
None: @flow(name="object-persistence-ray", task_runner=RayTaskRunner(), log_prints=True) def demo_ray_task_runner(): run_demo("RayTaskRunner (distributed; input/output must be serializable)") else: def demo_ray_task_runner(): print("Ray not installed; pip install 'prefect[ray]' 'ray'") if __name__ == "__main__": demo_prefect_task_runner() demo_threadpool_task_runner() demo_processpool_task_runner() demo_dask_task_runner() demo_ray_task_runner() ``` What you’ll see (typical) - PrefectTaskRunner - PASS: open file, sqlite, lambda, generator (no serialization; same thread) - PASS: DataFrame return - ThreadPoolTaskRunner - PASS: open file handle, lambda, generator (no serialization; cross-thread is fine) - FAIL: sqlite connection (ProgrammingError: created in one thread, used in another) - PASS: DataFrame return - ProcessPoolTaskRunner, DaskTaskRunner, RayTaskRunner - FAIL: open file handle, sqlite connection, lambda, generator (not serializable/pickleable) - PASS: primitives, dataclass, DataFrame return Notes and links - Task runners: prefect/task_runners.py - Serializers (used for persisted results; separate from runner transport): prefect/serializers.py - Prefect 3.0 release notes: Prefect 3.0 release notes Want me to add an optional “between tasks” section that shows the same failures when chaining make_task -> use_task (to illustrate where the error appears: producer return vs consumer input)?
o
@Marvin is there a way to try/except a task that fails because of a non-serializable input object?
m
thought for 222.1 seconds
Short answer: you can catch it, but only around the task call in your flow. You generally cannot catch it inside the task itself because the failure happens before the task body runs. Why: In Prefect 3.x, task inputs may be serialized when the task is submitted/scheduled (and/or when computing cache keys). If the argument can’t be serialized (e.g., open file handles, DB connections, sockets), Prefect raises a
SerializationError
before invoking your task function. How to handle it - Catch in the flow around the task call:
Copy code
from prefect import flow, task, get_run_logger
from prefect.exceptions import SerializationError

@task
def do_work(cfg):
    """Placeholder task that receives plain configuration data.

    Args:
        cfg: Configuration supplied by the caller. Intended to be simple,
            serializable data from which the task builds whatever heavy
            objects it needs.
    """
    # construct heavy objects here instead of passing them in
    ...

@flow
def main(non_serializable_obj):
    """Call ``do_work`` with serializable data and guard against serialization errors.

    Args:
        non_serializable_obj: Object exposing a ``host`` attribute. Only that
            attribute is forwarded to the task, never the object itself.

    Returns:
        Whatever ``do_work`` returns, or ``None`` when the fallback path runs.
    """
    logger = get_run_logger()
    try:
        # Prefer passing simple, serializable data (e.g., dicts, IDs)
        return do_work({"host": non_serializable_obj.host})
    except SerializationError:
        # NOTE(review): confirm which exception your Prefect version raises
        # for non-serializable task inputs; widen the except clause if needed.
        logger.warning("Task input could not be serialized; applying fallback")
        # fallback/skip logic here
        return None
- Don’t pass non-serializable objects. Pass IDs, paths, or small config dicts and construct the heavy object inside the task. - If your failure is on the RETURN value of the task (not the input), use a result serializer that can handle complex objects:
Copy code
from prefect import task
from prefect.serializers import PickleSerializer

@task(result_serializer=PickleSerializer())
def produce_complex_result(*args, **kwargs):
    """Placeholder task whose return value is handled by ``PickleSerializer``.

    The signature accepts any arguments so that this illustrative snippet is
    valid Python; replace it with the task's real parameters.
    """
    ...
Note:
result_serializer
controls how the task’s return value is stored, not how inputs are serialized. - If you must pass a custom object, make it picklable (e.g., implement `__getstate__`/`__setstate__`) or convert it to a serializable proxy (dict/dataclass). Useful references - Serializers API: prefect.serializers - Exceptions (includes
SerializationError
): prefect.exceptions source If you can share the type of object you’re passing and the stack trace snippet, I can suggest the cleanest pattern (e.g., how to refactor or a serializer workaround) for your case.
o
@Marvin can you write me a decorator that tries to serialize the input objects before calling the task and, if they are not serializable, does not run the task but applies some control logic instead?
m
thought for 70.7 seconds
Absolutely — here’s a reusable decorator that “pre-checks” your task’s inputs for serializability before submitting the task. If the inputs aren’t serializable, the decorator applies control logic instead of calling the task. Key features: - Uses Prefect’s serializers (JSON by default; optional pickle) - Ignores Prefect futures during the check to avoid false positives - Flexible control on failure: skip (return default), call a fallback (function or task), or raise Code: ``` from functools import wraps from typing import Any, Callable, Optional, Iterable from prefect import get_run_logger from prefect.exceptions import SerializationError from prefect.serializers import JSONSerializer, PickleSerializer try: # PrefectFuture exists in Prefect 3.x from prefect.futures import PrefectFuture except Exception: PrefectFuture = tuple() # fallback placeholder; won't match anything def _strip_futures(obj: Any) -> Any: # Replace Prefect futures with placeholders so pre-checks don't fail if isinstance(obj, PrefectFuture): return "<PrefectFuture>" if isinstance(obj, (list, tuple, set)): t = type(obj) return t(_strip_futures(x) for x in obj) if isinstance(obj, dict): return {k: _strip_futures(v) for k, v in obj.items()} return obj def precheck_serializable( serializer: Optional[Any] = "json", # "json" | "pickle" | Serializer instance on_fail: str = "skip", # "skip" | "fallback" | "raise" fallback: Optional[Callable] = None, # callable (can be a Prefect task) if on_fail="fallback" default: Any = None, # return if on_fail="skip" ): """ Wrap a callable (including a Prefect task) so that inputs are pre-serialized before submission. If inputs cannot be serialized, apply control logic. - serializer: "json" (default), "pickle", or a Serializer instance - on_fail: - "skip": log and return
default
- "fallback": call
fallback(*args, **kwargs)
(can be a task or function) - "raise": raise SerializationError """ if serializer in (None, "json"): ser = JSONSerializer() elif serializer == "pickle": ser = PickleSerializer() elif hasattr(serializer, "dumps"): ser = serializer else: raise ValueError("serializer must be 'json', 'pickle', or a Serializer instance") def decorator(callable_obj: Callable): @wraps(callable_obj) def wrapper(*args, **kwargs): payload = {"args": _strip_futures(args), "kwargs": _strip_futures(kwargs)} try: # Attempt to serialize inputs with the chosen serializer ser.dumps(payload) except Exception as exc: # Try to log using a run logger if we're in a flow/task context try: logger = get_run_logger() logger.warning( f"Skipping call to {getattr(callable_obj, '__name__', str(callable_obj))} " f"due to non-serializable input: {exc!r}" ) except Exception: # Best-effort logging outside of run context print( f"[precheck_serializable] Non-serializable input, skipping " f"{getattr(callable_obj, '__name__', str(callable_obj))}: {exc!r}" ) if on_fail == "skip": return default elif on_fail == "fallback": if fallback is None: raise SerializationError( f"Inputs failed pre-serialization and no fallback provided; original error: {exc}" ) from exc return fallback(*args, **kwargs) elif on_fail == "raise": raise SerializationError("Inputs failed pre-serialization") from exc
else: raise ValueError("on_fail must be 'skip', 'fallback', or 'raise'") # If serialization succeeded, proceed to call the original callable return callable_obj(*args, **kwargs) return wrapper return decorator
Copy code
How to use it

- Wrap a Prefect task to prevent submission when inputs aren’t serializable:
from prefect import task, flow @task def do_work(cfg): return "ok" @task def fallback_work(cfg): return "used fallback" # Wrap the task callable so the check happens BEFORE submission do_work_checked = precheck_serializable( serializer="json", on_fail="fallback", # or "skip" or "raise" fallback=fallback_work, # can be a task (returns a Future) or a function (returns a value) )(do_work) @flow def main(cfg): # If cfg isn't serializable, we won't submit do_work; we'll call fallback_work instead fut = do_work_checked(cfg) return fut
Copy code
Notes and caveats
- If you choose on_fail="skip", the decorator returns `default` (a plain value). If downstream logic expects a PrefectFuture, prefer providing a fallback task so you still get a Future.
- This is a best-effort pre-check. Passing this check doesn’t guarantee that Prefect won’t encounter a different serialization issue later, but it catches common cases early.
- If you know you need to persist complex task results, set a `result_serializer` on the task (this only affects the return value):
from prefect.serializers import PickleSerializer @task(result_serializer=PickleSerializer()) def returns_complex(...): ... ``` Docs - Serializers: prefect.serializers - Exceptions (includes SerializationError): prefect.exceptions source If you tell me your preferred failure behavior (skip vs fallback vs raise) and which serializer to check against (json vs pickle), I can tailor the decorator or give you a ready-to-use snippet for your exact case.
o
@Marvin would this kind of wrapper for tasks work, to test the first object inserted in the task? def test_serializable(obj, label: str): try: cloudpickle.dumps(obj) print(color(f"SERIALIZABLE: {label}", "PASS")) return True except Exception as e: print(color(f"NOT SERIALIZABLE: {label} -> {type(e).__name__}: {e}", "FAIL")) return False def require_first_arg_serializable(func): @wraps(func) def wrapper(*args, **kwargs): if not args: print(color("No arguments provided, skipping function execution.", "FAIL")) return None first_arg = args[0] label = func.__name__ + " first argument" if test_serializable(first_arg, label): return func(*args, **kwargs) else: print(color(f"Skipping execution of '{func.__name__}' due to unserializable first argument.", "FAIL")) return None return wrapper
m
thought for 57.9 seconds
Yes, that pattern can work — with a couple of Prefect-specific caveats: - Put your wrapper OUTSIDE the
@task
decorator so it runs before Prefect submits the task. Example: - Good:
@require_first_arg_serializable
above
@task
- Not good: wrapping the plain function then applying
@task
(the check would run inside the task, too late) - Returning
None
means you will not get a
PrefectFuture
. If downstream code expects a future, consider returning a fallback task call instead. Here’s a minimal version of your idea that checks the first positional arg with cloudpickle and skips calling the task if it’s not serializable:
Copy code
import cloudpickle
from functools import wraps
from prefect import task, get_run_logger

def test_serializable(obj, label: str):
    """Report whether ``obj`` can be serialized with ``cloudpickle``.

    Args:
        obj: Object to probe.
        label: Human-readable name used in the printed report.

    Returns:
        ``True`` if ``cloudpickle.dumps(obj)`` succeeds, ``False`` otherwise.
    """
    try:
        # Only the dumps call is guarded, so a failure elsewhere is never
        # misreported as "not serializable".
        cloudpickle.dumps(obj)
    except Exception as e:
        # Deliberately broad: any failure while pickling counts as a "no".
        print(f"NOT SERIALIZABLE: {label} -> {type(e).__name__}: {e}")
        return False
    else:
        print(f"SERIALIZABLE: {label}")
        return True

def require_first_arg_serializable(func):
    """Decorate ``func`` so it only runs when its first positional argument pickles.

    The returned wrapper checks ``args[0]`` with ``test_serializable``. When
    the check passes, ``func`` is called with the original arguments and its
    result is returned. When there are no positional arguments, or the check
    fails, the call is skipped and ``None`` is returned.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # Guard: without a positional argument there is nothing to check.
        if not args:
            print("No arguments provided, skipping function execution.")
            return None

        if test_serializable(args[0], f"{func.__name__} first argument"):
            return func(*args, **kwargs)

        # Optional: prefer the Prefect run logger; fall back to stdout if
        # obtaining or using it fails (e.g. presumably outside a run context).
        try:
            get_run_logger().warning(
                f"Skipping '{func.__name__}' due to unserializable first argument."
            )
        except Exception:
            print(f"Skipping execution of '{func.__name__}' due to unserializable first argument.")

        # Return None; if you need a PrefectFuture, call a fallback Prefect task instead
        return None

    return wrapper

@require_first_arg_serializable  # outermost so the check happens BEFORE Prefect submits the task
@task
def my_task(obj, label: str):
    """Example task guarded by the first-argument serializability check.

    Args:
        obj: First positional argument; this is what the outer decorator checks.
        label: Text embedded in the returned message.

    Returns:
        The string ``"processed <label>"``.
    """
    # task body
    return f"processed {label}"
If you need a PrefectFuture even when skipping, add a small fallback task and call it instead of returning `None`:
Copy code
@task
def noop(value=None):
    """Pass-through task used as a stand-in when the real task is skipped.

    Args:
        value: Value to hand back unchanged (defaults to ``None``).

    Returns:
        ``value`` as given.
    """
    return value

def require_first_arg_serializable(func):
    """Variant of the decorator that routes skipped calls through the ``noop`` task.

    The wrapper checks ``args[0]`` with ``test_serializable``. When the check
    passes it calls ``func``; when there are no positional arguments or the
    check fails it calls ``noop(None)`` instead.

    NOTE(review): in Prefect 3.x a direct task call such as ``noop(None)``
    blocks and returns the task's plain result; only ``noop.submit(None)``
    returns a PrefectFuture. Confirm which one downstream code expects.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        if not args:
            return noop(None)  # direct call: yields noop's result, not a future (use .submit for a PrefectFuture)
        if test_serializable(args[0], f"{func.__name__} first argument"):
            return func(*args, **kwargs)
        else:
            return noop(None)  # direct call: yields noop's result, not a future (use .submit for a PrefectFuture)
    return wrapper
Important notes - cloudpickle is a good “preflight” check, but Prefect may still perform other serialization steps (e.g., when computing cache keys or persisting results). If your object is picklable but not JSON-serializable and some part of your run uses JSON, you could still fail later. If needed, switch your task’s result serializer to pickle:
Copy code
from prefect.serializers import PickleSerializer

@task(result_serializer=PickleSerializer())
def my_task(*args, **kwargs):
    """Placeholder task whose return value is handled by ``PickleSerializer``.

    The signature accepts any arguments so that this illustrative snippet is
    valid Python; replace it with the task's real parameters.
    """
    ...
Refs: - Serializers: prefect.serializers - Exceptions: prefect.exceptions source If you share the type of object you’re passing, I can help tailor the check (e.g., ignore Prefect futures, check kwargs, or use JSON vs pickle depending on your pipeline).
o
@Marvin so one moment, why use .submit instead of regular decorated task execution?
m
thought for 251.3 seconds
Great question — you don’t have to use .submit. Use it only when you need concurrency or a future. - my_task(...): runs synchronously and returns the actual result. Good for simple, sequential logic. - my_task.submit(...): submits the task to the task runner and returns a PrefectFuture immediately. Use this for parallelism (fan-out/fan-in), passing unresolved results to other tasks, or when you want to manage task state/futures explicitly. Why I mentioned .submit for your wrapper - If the rest of your flow uses .submit and expects PrefectFutures, a “skip” path that returns None can break downstream code. In that case, the wrapper should return a fallback submitted task (or a no-op task) so callers still get a PrefectFuture. - If you call tasks directly (no .submit), returning a plain value (or None) is fine and simpler. This applies in Prefect 3.x where direct task calls are blocking and .submit enables concurrency. More on task runners and futures: - Task runners: Task runners - Concurrency patterns blog: Scaling to thousands of parallel tasks If you tell me whether your flow uses futures or runs sequentially, I can tweak the wrapper to return either a plain value or a PrefectFuture on the “skip” path.
o
@Marvin does task or task submit behave differently for sqlite connection as input?
m
thought for 107.5 seconds
Short answer: passing a sqlite3.Connection as a task argument is unsafe in both cases. It can “seem” to work with a direct call, but it’s brittle; with .submit it’s more likely to fail. Why - Serialization: sqlite3.Connection objects are not picklable/JSON-serializable. Prefect may serialize or hash inputs for caching/parameter storage, which can raise errors in either call style. - Threads: if you use .submit with a threaded task runner, the task runs in another thread. SQLite enforces “same thread” by default, so you’ll likely get “SQLite objects created in a thread can only be used in that same thread”. A direct call runs inline in the same thread, so it might appear to work, but any later feature that serializes/inspects inputs can still break. Recommended pattern - Don’t pass the connection. Pass the DB path/DSN, and open/close the connection inside the task.
Copy code
from prefect import task, flow

@task
def query_sqlite(db_path: str, sql: str, params=None):
    """Run one SQL statement against a SQLite database and return all rows.

    The connection is opened and closed inside the call, so only plain data
    (path, SQL text, parameters) has to be passed in.

    Args:
        db_path: Path handed to ``sqlite3.connect``.
        sql: Statement to execute.
        params: Optional bind parameters; a falsy value means "no parameters".

    Returns:
        The list of rows from ``fetchall()``.
    """
    import sqlite3

    bindings = params if params else ()
    connection = sqlite3.connect(db_path)  # opened in the same thread that runs this call
    try:
        rows = connection.execute(sql, bindings).fetchall()
    finally:
        # Always release the connection, even when the statement fails.
        connection.close()
    return rows

@flow
def main():
    """Submit two SQLite queries and return both results as a tuple.

    Each query is submitted via ``query_sqlite.submit``; all submissions are
    made before any result is awaited.
    """
    # Works with submit (parallel) or direct calls (sequential)
    statements = ("select 1", "select 2")
    pending = [query_sqlite.submit("my.db", statement) for statement in statements]
    return tuple(future.result() for future in pending)
If you absolutely must accept a connection-like object: - Expect it to fail with .submit (thread boundary) and likely fail if Prefect tries to hash/serialize inputs. - You can reduce risk by disabling input-based caching on that task:
Copy code
from prefect import task
from prefect.cache_policies import NO_CACHE

@task(cache_policy=NO_CACHE)
def uses_conn(conn):
    """Placeholder for a task that accepts a live connection-like object.

    The decorator sets ``cache_policy=NO_CACHE`` on this task.

    Args:
        conn: Connection-like object supplied by the caller.
    """
    ...
…but you’ll still be vulnerable to thread/process boundaries and any other feature that inspects parameters. References - Task runners and concurrency: Task runners - Serializers: prefect.serializers If you’re seeing a specific error (e.g., the SQLite “same thread” error or a SerializationError), share the stack trace and I’ll tailor the fix.
o
@Marvin for me it stays unclear why ProcessPoolTaskRunner and ThreadPoolTaskRunner can both cope with numpy objects
m
thought for 31.0 seconds
They “both work” with NumPy because of how the data gets to the task: - ThreadPoolTaskRunner (threads): runs tasks in the same process. Your Python objects (including NumPy arrays) stay in memory—no inter-process serialization is needed to call the task. So NumPy “just works.” - ProcessPoolTaskRunner (processes): uses multiprocessing, which pickles arguments/results to move them between processes. NumPy arrays implement the pickle protocol, so they can be serialized/deserialized and passed across processes. It works, but copies data and can be slow for large arrays. A few important caveats: - Serialization cost: With processes, large arrays incur CPU/memory overhead to pickle/unpickle and copy between processes. With threads, there’s no IPC copy, so it’s cheaper. - Persistence/JSON: If you persist task results or otherwise use a JSON-only path, NumPy arrays will not serialize with JSON. Use a pickle-based result serializer or cast to a JSON-friendly form:
Copy code
from prefect import task
from prefect.serializers import PickleSerializer


@task(result_serializer=PickleSerializer())
def compute(arr):
    """Placeholder task whose result is serialized with pickle instead of JSON."""
    ...
- Caching/hash: Prefect may hash inputs for caching. Hashing typically uses a pickle-based byte representation, so NumPy arrays are fine here. But if you introduce custom JSON serializers for parameters, expect failures. Rules of thumb: - For large arrays, prefer threads locally to avoid pickling overhead; use processes only if you need CPU-bound parallelism and accept the copy cost. - For distributed/remote execution (e.g., workers, containers), always assume serialization is required; choose serializers that support your types or convert arrays to a portable form. Docs: - Task runners: Task runners - Serializers: prefect.serializers If you share your exact setup (runner, array sizes, whether results are persisted), I can suggest the most performant pattern.
o
@Marvin what datatypes in machine learning and ai are typically non-serialisable?
m
thought for 49.2 seconds
Common culprits in ML/AI that are not (or are poorly) serializable JSON-unsafe (and often pickle-unsafe) live resources - Database handles: sqlite3.Connection, SQLAlchemy Engine/Session, psycopg2 connections - Network clients: requests.Session, aiohttp.ClientSession, boto3 clients/resources, Kafka producers/consumers - Open resources: file handles, h5py.File, cv2.VideoCapture, memory-mapped files GPU/device-bound objects - CUDA contexts/streams, cuBLAS/cuDNN handles, TensorRT engines/contexts - PyTorch CUDA tensors (move to CPU before pickling:
tensor.cpu()
), device RNG states - JAX device buffers (device-backed arrays), OpenCL contexts Framework sessions/executors/runtimes - TensorFlow 1.x Graph/Session; tf.data.Dataset iterators - SparkSession, PySpark DataFrames/RDDs (pass identifiers; recreate in task) - Event loops and async primitives: asyncio loop/Task, generators/coroutines Iterators and streaming datasets - Python generators/itertools, file iterators - torch.utils.data.DataLoader iterators/workers - Hugging Face streaming datasets (iterable datasets) Concurrency primitives and system objects - Thread/process pools, multiprocessing.Queue/Lock, threading.Lock/RLock - Loggers with live handlers, tqdm progress bars with live output Models and training state (use framework-native save/load instead) - Keras/TensorFlow models: prefer `model.save()`/SavedModel over pickle - PyTorch nn.Module: can often pickle, but prefer
state_dict
+ `load_state_dict`; CUDA adds complexity - XGBoost/LightGBM/CatBoost models: use `.save_model()`/`.load_model()` rather than Python pickle (version sensitive) - spaCy pipelines: use
nlp.to_bytes()/from_bytes()
, not pickle - Tokenizers: many are picklable, but safer to reload via pretrained name/path Arrow/columnar and big tables - PyArrow Dataset scanners, file-system handles; materialize to Table/RecordBatch first - Dask/Ray futures/objects (framework-specific serializers; not plain pickle/JSON) Guidelines that work well in Prefect 3.x - Don’t pass live handles; pass small, serializable config (paths, DSNs, model names) and construct inside the task. - For arrays/tensors: - JSON path: convert to lists or a compact dtype+bytes representation - Pickle path: prefer CPU numpy arrays or CPU torch tensors; set a pickle-based result serializer when returning them
Copy code
from prefect import task
from prefect.serializers import PickleSerializer


@task(result_serializer=PickleSerializer())
def compute(arr):
    """Placeholder task that returns an array/tensor using a pickle-based result serializer."""
    ...
- Avoid passing iterators/generators; pass parameters to recreate them or materialize to concrete data. - If you must pass a custom object, implement
__getstate__/__setstate__
or provide `.to_bytes()`/`.from_bytes()` helpers and wrap them. Reference - Prefect serializers: prefect.serializers If you share which objects you’re passing (e.g., CUDA tensors, sessions, DataLoader, Spark DF), I can suggest the safest conversion or a serializer strategy for your flow.
o
@Marvin actually handing over a sqlite conn object doesn't work for ThreadPoolTaskRunner
m
thought for 29.3 seconds
You’re right — passing a sqlite3.Connection to a task won’t work with ThreadPoolTaskRunner. Why it fails - Python’s sqlite3 enforces “same thread” by default: connections must be used in the thread where they were created. ThreadPoolTaskRunner executes the task in a different thread, so you’ll hit “SQLite objects created in a thread can only be used in that same thread”. - Even aside from threading, connection objects are live resources that shouldn’t be passed around tasks. Recommended patterns 1) Open the connection inside the task (best) - Pass the DB path/URI (a simple string), not the connection.
Copy code
from prefect import task, flow

@task
def run_query(db_path: str, sql: str, params=None):
    """Run `sql` against the SQLite database at `db_path` and return all rows.

    The connection is opened and closed inside the task, so only the path,
    the SQL text and the optional bound parameters are passed in.
    """
    import sqlite3

    connection = sqlite3.connect(db_path)
    try:
        # Optional: improve concurrent access characteristics
        for pragma in ("PRAGMA journal_mode=WAL;", "PRAGMA busy_timeout=5000;"):
            connection.execute(pragma)
        bound_params = params or ()
        rows = connection.execute(sql, bound_params).fetchall()
    finally:
        # Always release the connection, even when the query raised.
        connection.close()
    return rows

@flow
def main():
    """Submit two independent queries, then return both result sets as a tuple."""
    futures = [run_query.submit("my.db", sql) for sql in ("select 1", "select 2")]
    first_rows, second_rows = [future.result() for future in futures]
    return first_rows, second_rows
2) If you must share a single connection across threads (not recommended) - Create it with
check_same_thread=False
and serialize access with a lock. This is fragile and easy to get wrong; SQLite may still lock under write load.
Copy code
import sqlite3
import threading

from prefect import task

# One connection shared by every task thread. check_same_thread=False turns off
# sqlite3's same-thread check, so all access is serialized through `lock`.
conn = sqlite3.connect("my.db", check_same_thread=False)
lock = threading.Lock()


@task
def run_query_shared(sql: str, params=None):
    """Run `sql` on the shared module-level connection while holding `lock`.

    Returns all rows; `params` defaults to no bound parameters.
    """
    with lock:
        cur = conn.execute(sql, params or ())
        return cur.fetchall()
Caveats: still a single-process solution; won’t survive process boundaries, and concurrency is effectively serialized by the lock. 3) If you’re using an in-memory DB - Each connection gets its own DB by default. To share in-memory across connections, use a shared-cache URI: db_path = "file:memdb1?mode=memory&cache=shared" conn = sqlite3.connect(db_path, uri=True) You still need to open per-task connections and remain in one process. 4) If tasks must share a DB but you want to avoid contention - Keep DB I/O in one task (sequential) and fan out CPU work in others. - Or enforce concurrency=1 for DB-tagged tasks using global concurrency limits: Global concurrency limits - Or just call the DB task without .submit (run sequentially). Related docs - Task runners overview: Task runners If you share your exact error text and how you create the connection (file vs in-memory), I can suggest the safest change (PRAGMA settings, connection-per-task, or a small refactor).
o
@Marvin you beat around the bush: at first you say that threading allows for all types of objects, but now you are saying that threading does not support conn. objects. Give 3 examples that will work with threading and won't with ProcessPoolTaskRunner in the context of machine learning.
m
thought for 38.3 seconds
You’re right to call that out — I overgeneralized. Threads do not “magically” accept all objects; they just avoid inter-process pickling. Some objects still have thread-affinity rules (like sqlite3 connections), so they can’t cross threads either. Here’s what you asked for: three concrete ML/AI examples that typically work with ThreadPoolTaskRunner (same process, pass-by-reference) but fail with ProcessPoolTaskRunner (separate processes, requires pickling). 1) PyTorch model on GPU (cuda) and live CUDA tensors - ThreadPool: OK for inference within one process. You can share a loaded
torch.nn.Module
and
torch.Tensor
references across threads if your usage is thread-safe (often is for pure inference). - ProcessPool: Fails because the live model/tensors and CUDA context are not safely picklable/transferable. You’ll see pickling errors or CUDA initialization/memory issues. - Workaround for processes: pass a model path/config and load the model inside each process; move tensors to CPU or reconstruct them from bytes/np arrays inside the task. 2) Hugging Face transformers Pipeline (especially on GPU) - ThreadPool: OK to share a loaded
transformers.Pipeline
across threads for inference. - ProcessPool: Typically fails to pickle the pipeline (complex object graph with device-bound tensors, file handles, and non-picklable components). - Workaround for processes: pass the model identifier and device config; re-create the pipeline inside the task process. 3) spaCy Language pipeline (nlp) - ThreadPool: OK to use a loaded
spacy.Language
object across threads for inference (tokenization, NER, etc.). spaCy is generally thread-safe for read-only pipeline use. - ProcessPool: Not picklable by default; sending a live
nlp
object to another process will error. spaCy recommends loading the pipeline per process. - Workaround for processes: pass the model name/path (e.g., "en_core_web_sm") and call
spacy.load(...)
inside the task. Key takeaway - ThreadPoolTaskRunner: no inter-process serialization, so many complex in-memory objects can be shared by reference if the library is thread-safe and doesn’t enforce same-thread rules (sqlite does). - ProcessPoolTaskRunner: every argument/return must be picklable; objects tied to GPU contexts, C++ handles, sockets, or complex Python graphs usually are not. General patterns in Prefect 3.x - For threads: you can pre-load heavy models once (module/global scope) and use them in tasks, but ensure thread-safety. - For processes: pass small, serializable parameters (paths/model names/config) and construct the heavy object inside each task. - If you persist complex results, set a pickle-based result serializer on the task:
Copy code
from prefect import task
from prefect.serializers import PickleSerializer

@task(result_serializer=PickleSerializer())
def infer(...):
    ...
Docs for context - Task runners: Task runners If you share your exact runner choice and objects (e.g., torch model + GPU, transformers pipeline, spaCy nlp), I can sketch the safest pattern (preload vs construct-in-task, locks, or per-process init).
o
@Marvin what does it want from me? t/object_persistence/limits_of_object_presence.py", line 26, in <module> my_flow("World") ~~~~~~~^^^^^^^^^ File "/home/labor/.local/lib/python3.13/site-packages/prefect/flows.py", line 1702, in __call__ return run_flow( flow=self, ...<2 lines>... return_type=return_type, ) File "/home/labor/.local/lib/python3.13/site-packages/prefect/flow_engine.py", line 1552, in run_flow ret_val = run_flow_sync(**kwargs) File "/home/labor/.local/lib/python3.13/site-packages/prefect/flow_engine.py", line 1394, in run_flow_sync with engine.run_context(): ~~~~~~~~~~~~~~~~~~^^ File "/usr/local/lib/python3.13/contextlib.py", line 141, in __enter__ return next(self.gen) File "/home/labor/.local/lib/python3.13/site-packages/prefect/flow_engine.py", line 772, in run_context with self.setup_run_context(): ~~~~~~~~~~~~~~~~~~~~~~^^ File "/usr/local/lib/python3.13/contextlib.py", line 141, in __enter__ return next(self.gen) File "/home/labor/.local/lib/python3.13/site-packages/prefect/flow_engine.py", line 612, in setup_run_context task_runner = stack.enter_context(self.flow.task_runner.duplicate()) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AttributeError: 'str' object has no attribute 'duplicate'
m
thought for 20.9 seconds
The error means your flow’s task_runner is a string, but Prefect expects a TaskRunner object. You likely did something like:
Copy code
@flow(task_runner="ThreadPoolTaskRunner")  # <-- string
def my_flow(name): ...
Prefect tries to call
.duplicate()
on the task runner, which strings don’t have, hence: 'str' object has no attribute 'duplicate'. Fix: pass an actual TaskRunner instance from prefect.task_runners.
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner

@task
def greet(who: str):
    """Build the greeting text for `who`."""
    greeting = f"Hello, {who}!"
    return greeting

@flow(task_runner=ThreadPoolTaskRunner(max_workers=4))  # correct: instance, not string
def my_flow(name: str):
    """Greet `name` through a submitted task and return the greeting."""
    # use .submit() if you want concurrency
    greeting_future = greet.submit(name)
    greeting = greeting_future.result()
    return greeting

if __name__ == "__main__":
    my_flow("World")
You can also switch runners without redecorating using with_options:
Copy code
my_flow_proc = my_flow.with_options(task_runner=ProcessPoolTaskRunner(max_workers=2))
my_flow_proc("World")
Docs for context: - Task runners overview: Task runners If you share the exact decorator line you used, I can confirm the minimal change.
o
how can i show that there is serialisation happening between processes? is there some form of debug view that allows me to see that serialisation is happening in ProcessPoolTaskRunner and not in ThreadPoolTaskRunner
@Marvin how can i show that there is serialisation happening between processes? is there some form of debug view that allows me to see that serialisation is happening in ProcessPoolTaskRunner and not in ThreadPoolTaskRunner
m
thought for 40.8 seconds
Short answer: there isn’t a built-in “serialization viewer” in Prefect, but you can prove it very clearly. ProcessPoolTaskRunner must pickle arguments/results to move them between processes; ThreadPoolTaskRunner stays in-process and does not pickle. Three practical ways to show it: 1) Use a probe object that logs when it’s pickled/unpickled - Define a class with __getstate__/__setstate__ that prints and writes a marker file. You’ll see those fire only with ProcessPool.
Copy code
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner

class Probe:
    """Tagged object that reports when it is pickled or unpickled.

    Each event is printed and also recorded as a marker file under /tmp,
    named after the probe's tag.
    """

    def __init__(self, tag):
        self.tag = tag

    def __getstate__(self):
        # Called by pickle when the instance is serialized.
        print(f"[PROBE] pickling {self.tag}")
        marker_path = f"/tmp/probe_pickled_{self.tag}"
        with open(marker_path, "w") as marker:
            marker.write("pickled")
        return {"tag": self.tag}

    def __setstate__(self, state):
        # Called by pickle when the instance is rebuilt from its state dict.
        tag = state["tag"]
        print(f"[PROBE] unpickling {tag}")
        marker_path = f"/tmp/probe_unpickled_{tag}"
        with open(marker_path, "w") as marker:
            marker.write("unpickled")
        self.tag = tag

@task(log_prints=True)
def echo_probe(p: Probe):
    """Print the probe's tag and identity as seen inside the task; return that identity."""
    probe_id = id(p)
    print(f"[TASK] tag={p.tag}, id={probe_id}")
    return probe_id

@flow(log_prints=True, task_runner=ThreadPoolTaskRunner())
def thread_flow():
    """Send a Probe through `echo_probe` on the thread-pool runner and print the id it reports."""
    probe = Probe("thread")
    future = echo_probe.submit(probe)
    rid = future.result()
    print(f"[FLOW] thread result id={rid}")

@flow(log_prints=True, task_runner=ProcessPoolTaskRunner())
def process_flow():
    """Send a Probe through `echo_probe` on the process-pool runner and print the id it reports."""
    probe = Probe("process")
    future = echo_probe.submit(probe)
    rid = future.result()
    print(f"[FLOW] process result id={rid}")

if __name__ == "__main__":
    thread_flow()
    process_flow()
Expected: - Thread flow: no “[PROBE] pickling/unpickling” prints; no /tmp/probe_* files; object id in task likely equals the id in the flow (same object). - Process flow: you’ll see “[PROBE] pickling …” in the parent, “[PROBE] unpickling …” in the worker, and the /tmp files will be created. The object id inside the task will differ (it’s a deserialized copy). 2) Show a non-picklable object works with threads but fails with processes - Lambdas and many live handles aren’t picklable. With threads, they pass by reference; with processes, you get a pickling error.
Copy code
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE  # avoid hashing inputs for this demo

@task(log_prints=True, cache_policy=NO_CACHE)
def call_fn(fn):
    """Call `fn` with the argument 2 and print what it returns."""
    outcome = fn(2)
    print(f"fn result: {outcome}")

@flow(task_runner=ThreadPoolTaskRunner())
def thread_ok():
    """Pass a lambda to `call_fn` on the thread-pool runner and wait for it to finish."""
    future = call_fn.submit(lambda x: x + 1)
    future.result()  # OK

@flow(task_runner=ProcessPoolTaskRunner())
def process_fails():
    # Intended to raise "Can't pickle local object ..." at the worker boundary.
    # NOTE(review): that expectation assumes stdlib pickle. If the runner
    # serializes with cloudpickle, lambdas can be serialized and this call
    # succeeds instead of failing; confirm against your Prefect version.
    call_fn.submit(lambda x: x + 1).result()

if __name__ == "__main__":
    thread_ok()
    process_fails()
3) Compare object identity across runners - The same object will have the same identity in threads; in processes it’ll be a different instance after unpickling. The first probe example prints ids to show this. Extra tips - Turn up logging to see runner details: - Set env var PREFECT_LOGGING_LEVEL=DEBUG - You’ll see logs like “Using ProcessPoolTaskRunner …” and task submission messages, but there isn’t a specific “serializing inputs now” log line. - If you need rock-solid evidence beyond prints, you can write timestamps/markers to files in __getstate__/__setstate__ as shown, or use logging to a known file to prove pickling/unpickling occurs only under the process pool. If you paste your current minimal example, I can adapt the probe pattern directly into it so you can run A/B with ThreadPool vs ProcessPool.
o
@Marvin both work without any error.
m
thought for 33.9 seconds
Got it — the reason both of your tests “work” is that Prefect’s ProcessPoolTaskRunner uses cloudpickle under the hood. cloudpickle can serialize many things that the stdlib pickle cannot (e.g., lambdas, local functions), so you won’t see failures there. That doesn’t mean there’s no serialization; it just means serialization succeeds. How to prove serialization is happening in ProcessPool but not in ThreadPool - Key idea: intercept pickling via __reduce_ex__ and show: - It is NOT called with threads - It IS called with processes - Also print PIDs to show the task ran in a different process. Use this minimal, robust probe. Note: we disable task input caching to avoid Prefect hashing the input (which may also pickle) even in the thread case.
Copy code
import os
from functools import wraps
from prefect import task, flow
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE

class Probe:
    """Tagged object that reports every time it is pickled and rebuilt.

    Pickling prints a line and writes a marker file under /tmp; rebuilding
    prints a line naming the process that performed it.
    """

    def __init__(self, tag):
        self.tag = tag

    def __reduce_ex__(self, protocol):
        # Called by pickle whenever the instance is serialized.
        pid = os.getpid()
        print(f"[PROBE] __reduce_ex__ called for tag={self.tag} in pid={pid}")
        with open(f"/tmp/probe_reduced_{self.tag}", "w") as marker:
            marker.write(f"reduced in pid {pid}")
        rebuild_args = (self.tag,)
        return (Probe._reconstruct, rebuild_args)

    @staticmethod
    def _reconstruct(tag):
        # Called by pickle on load to build the replacement instance.
        print(f"[PROBE] reconstruct tag={tag} in pid={os.getpid()}")
        return Probe(tag)

@task(log_prints=True, cache_policy=NO_CACHE)
def where_am_i(p: Probe):
    """Print the executing pid plus the probe's identity and tag; return the identity."""
    current_pid = os.getpid()
    probe_id = id(p)
    print(f"[TASK] pid={current_pid} id(p)={probe_id} tag={p.tag}")
    return probe_id

@flow(log_prints=True, task_runner=ThreadPoolTaskRunner())
def thread_flow():
    """Print pid/id of a Probe in the flow, then the id reported by a thread-pool task."""
    probe = Probe("thread")
    flow_pid = os.getpid()
    print(f"[FLOW] THREAD pid={flow_pid} id(p)={id(probe)}")
    task_side_id = where_am_i.submit(probe).result()
    print(f"[FLOW] THREAD result id={task_side_id}")

@flow(log_prints=True, task_runner=ProcessPoolTaskRunner(max_workers=2))
def process_flow():
    """Print pid/id of a Probe in the flow, then the id reported by a process-pool task."""
    probe = Probe("process")
    flow_pid = os.getpid()
    print(f"[FLOW] PROCESS pid={flow_pid} id(p)={id(probe)}")
    task_side_id = where_am_i.submit(probe).result()
    print(f"[FLOW] PROCESS result id={task_side_id}")

if __name__ == "__main__":
    thread_flow()
    process_flow()
What you should observe - ThreadPoolTaskRunner: - No “[PROBE] __reduce_ex__ …” or “reconstruct” prints - No /tmp/probe_reduced_thread file - [TASK] pid equals the flow’s pid (same process) - id(p) in flow and task may match (same object by reference), or at least there is no pickling evidence - ProcessPoolTaskRunner: - You’ll see “[PROBE] __reduce_ex__ …” and “reconstruct …” - /tmp/probe_reduced_process file is created - [TASK] pid differs from the flow’s pid (different process) - id(p) in task differs from flow (it’s a deserialized copy) If you still don’t see the reduce prints in the process case: - Double-check you are actually using ProcessPoolTaskRunner in the flow decorator and calling .submit (not a direct call). - Ensure the task has cache_policy=NO_CACHE as above; otherwise, Prefect’s input hashing may pickle in both cases, confusing the signal. - Set PREFECT_LOGGING_LEVEL=DEBUG to confirm “Using ProcessPoolTaskRunner…” appears in logs. Why your earlier “both work” examples didn’t fail - Prefect’s process pool uses cloudpickle, which can serialize lambdas, closures, many ML objects, etc. That’s why you didn’t get errors. The probe above does not rely on errors; it shows serialization via side effects and PID changes. If you want, paste your current snippet and I’ll adapt the probe into it so you can run an apples-to-apples comparison.
o
@Marvin But still this example does not show the serialisation under processpooltaskrunner ... make it simpler
m
thought for 43.6 seconds
Got it — here’s the simplest possible demo that shows the serialization boundary with ProcessPoolTaskRunner. Example that FAILS (passing a non-pickleable object):
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import threading

@task
def use_lock(lock):
    """Acquire and immediately release `lock`, then return "ok".

    The lock is only touched so that the argument is genuinely used.
    """
    # `with` releases the lock even if something raises while it is held.
    with lock:
        pass
    return "ok"

@flow(task_runner=ProcessPoolTaskRunner())
def my_flow():
    """Pass a threading.Lock to a process-pool task to provoke the serialization error."""
    lock = threading.Lock()  # NOT pickleable
    # Resolve the future so the task's outcome (including any serialization
    # error) surfaces inside the flow instead of being dropped when it returns.
    use_lock.submit(lock).result()  # This will fail to serialize across processes

if __name__ == "__main__":
    my_flow()
What you’ll see: a pickling error like “TypeError: cannot pickle '_thread.lock' object” — because arguments/results must be pickleable when using process-based execution. Fixed version (create the resource inside the task, pass only pickleable data):
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import threading

@task
def make_and_use_lock():
    """Create a lock inside the task, acquire and release it once, and return "ok"."""
    local_lock = threading.Lock()  # Created inside the process, no pickling needed
    with local_lock:
        pass
    return "ok"

@flow(task_runner=ProcessPoolTaskRunner())
def my_flow():
    """Run `make_and_use_lock` on the process-pool runner and wait for it to finish."""
    # Resolve the future so the flow does not return before the task has run.
    make_and_use_lock.submit().result()  # No non-pickleable args cross the boundary

if __name__ == "__main__":
    my_flow()
Notes: - With ProcessPoolTaskRunner, task functions, their inputs, and their outputs must be pickleable. - If you switch to ThreadPoolTaskRunner, the first example will work because threads share memory and don’t require pickling. - Always define task/flow functions at module top-level and avoid passing non-serializable objects (open files, locks, DB connections, sessions, etc.). Create those inside the task instead. If you want, I can tailor this to your exact object (e.g., a DB client or session) and show the minimal fix for that case.
o
@Marvin you need to compare it to ThreadPoolTaskRunner, otherwise the comparison is not a comparison
m
thought for 41.6 seconds
You’re right — here’s a minimal side-by-side that clearly shows the difference between ThreadPoolTaskRunner (no serialization boundary) and ProcessPoolTaskRunner (requires pickleable args/results). Assumes Prefect 3.x. Try these two flows separately:
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
import threading

@task
def use_lock(lock):
    """Acquire and immediately release `lock`, then return "ok"."""
    # Touch the lock so it's actually used; `with` guarantees the release.
    with lock:
        pass
    return "ok"

# 1) Works: threads share memory, no pickling needed
@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
    """Hand a threading.Lock to `use_lock` on the thread-pool runner and print the result."""
    shared_lock = threading.Lock()  # not pickleable, but fine in threads
    future = use_lock.submit(shared_lock)  # shared in-memory reference
    outcome = future.result()
    print("ThreadPool result:", outcome)

# 2) Fails: processes require pickleable args/results
@flow(task_runner=ProcessPoolTaskRunner())
def process_flow():
    """Hand a threading.Lock to `use_lock` on the process-pool runner and print the result."""
    unpicklable_lock = threading.Lock()  # not pickleable
    future = use_lock.submit(unpicklable_lock)  # will try to pickle and fail
    outcome = future.result()
    print("ProcessPool result:", outcome)

if __name__ == "__main__":
    # Run one at a time to see the behavior
    thread_flow()   # expect: prints "ThreadPool result: ok"
    # process_flow() # expect: TypeError: cannot pickle '_thread.lock' object
Fix for ProcessPoolTaskRunner (create unpickleable resources inside the task; pass only pickleable data across the boundary):
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
import threading

@task
def make_and_use_lock():
    """Create a lock inside the task, acquire and release it once, and return "ok"."""
    worker_lock = threading.Lock()  # created inside the worker process
    with worker_lock:
        pass
    return "ok"

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fixed():
    """Run `make_and_use_lock` on the process-pool runner and print its result."""
    outcome = make_and_use_lock.submit().result()  # no unpickleable args cross processes
    print("ProcessPool result:", outcome)

if __name__ == "__main__":
    process_flow_fixed()  # expect: prints "ProcessPool result: ok"
Key takeaways: - ThreadPoolTaskRunner: tasks execute in the same process; objects are shared by reference, no pickling. - ProcessPoolTaskRunner: tasks execute in separate processes; function, args, and results must be pickleable. References: - prefect.task_runners source - prefect.serializers source If you have a specific object (e.g., a DB client or session) you want to pass, share that and I’ll show the smallest safe pattern for each runner.
o
@Marvin the threaded version also leads to errors... Task run 'use_lock-2f6' - Error encountered when computing cache key - result will not be persisted. Traceback (most recent call last): File "/home/labor/.local/lib/python3.13/site-packages/prefect/cache_policies.py", line 374, in compute_key return hash_objects(hashed_inputs, raise_on_failure=True) File "/home/labor/.local/lib/python3.13/site-packages/prefect/utilities/hashing.py", line 89, in hash_objects raise HashError(msg) prefect.exceptions.HashError: Unable to create hash - objects could not be serialized. JSON error: Unable to serialize unknown type: <class '_thread.lock'> Pickle error: cannot pickle '_thread.lock' object The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/home/labor/.local/lib/python3.13/site-packages/prefect/task_engine.py", line 171, in compute_transaction_key key = self.task.cache_policy.compute_key( task_ctx=task_run_context, inputs=self.parameters or {}, flow_parameters=parameters or {}, ) File "/home/labor/.local/lib/python3.13/site-packages/prefect/cache_policies.py", line 214, in compute_key policy_key = policy.compute_key( task_ctx=task_ctx, ...<2 lines>... **kwargs, ) File "/home/labor/.local/lib/python3.13/site-packages/prefect/cache_policies.py", line 384, in compute_key raise ValueError(msg) from exc ValueError: Unable to create hash - objects could not be serialized. JSON error: Unable to serialize unknown type: <class '_thread.lock'> Pickle error: cannot pickle '_thread.lock' object This often occurs when task inputs contain objects that cannot be cached like locks, file handles, or other system resources. To resolve this, you can: 1. Exclude these arguments by defining a custom
cache_key_fn
2. Disable caching by passing
cache_policy=NO_CACHE
m
thought for 70.2 seconds
Great callout — that error is from Prefect’s caching layer, not from threading itself. Even with ThreadPoolTaskRunner, Prefect’s default cache policy hashes task inputs to compute a cache key; hashing tries to serialize the args and chokes on a
_thread.lock
. Two minimal fixes for the threaded case: - Disable caching for that task - Or provide a cache_key_fn that ignores the unpickleable arg Side‑by‑side comparison 1) ThreadPoolTaskRunner: works if you disable caching on the task
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
from prefect.cache_policies import NO_CACHE
import threading

@task(cache_policy=NO_CACHE)  # prevent hashing of inputs
def use_lock(lock):
    """Acquire and immediately release `lock`, then return "ok"."""
    # `with` releases the lock even if something raises while it is held.
    with lock:
        pass
    return "ok"

@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
    """Hand a threading.Lock to `use_lock` on the thread-pool runner and print the result."""
    shared_lock = threading.Lock()  # not pickleable, but fine with threads
    future = use_lock.submit(shared_lock)
    print(future.result())

if __name__ == "__main__":
    thread_flow()  # prints "ok" with no cache key errors
Alternative for threads (a custom cache_key_fn that returns None, so caching is skipped and the lock is never hashed):
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner
import threading

def no_cache_key(ctx, inputs):
    """Cache-key function that never produces a key; both arguments are ignored."""
    # Returning None means: skip caching for this run.
    return None

@task(cache_key_fn=no_cache_key)
def use_lock(lock):
    """Acquire and immediately release `lock`, then return "ok"."""
    # `with` releases the lock even if something raises while it is held.
    with lock:
        pass
    return "ok"

@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
    """Hand a threading.Lock to `use_lock` on the thread-pool runner and print the result."""
    shared_lock = threading.Lock()
    future = use_lock.submit(shared_lock)
    print(future.result())
2) ProcessPoolTaskRunner: even with caching disabled, passing the lock will fail due to the process boundary
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE
import threading

@task(cache_policy=NO_CACHE)
def use_lock(lock):
    """Acquire and immediately release `lock`, then return "ok"."""
    # `with` releases the lock even if something raises while it is held.
    with lock:
        pass
    return "ok"

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow():
    """Hand a threading.Lock to `use_lock` on the process-pool runner and print the result."""
    unpicklable_lock = threading.Lock()  # not pickleable -> will still fail
    future = use_lock.submit(unpicklable_lock)
    print(future.result())

if __name__ == "__main__":
    process_flow()  # expect: TypeError cannot pickle '_thread.lock' object
ProcessPool fixed version (create the resource inside the task; pass only pickleable data):
Copy code
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

@task
def make_and_use_lock():
    """Create a lock inside the task, acquire and release it once, and return "ok"."""
    import threading

    worker_lock = threading.Lock()
    with worker_lock:
        pass
    return "ok"

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fixed():
    """Run `make_and_use_lock` on the process-pool runner and print its result."""
    future = make_and_use_lock.submit()
    print(future.result())

if __name__ == "__main__":
    process_flow_fixed()  # prints "ok"
Summary: - ThreadPoolTaskRunner: same-process, shared memory. You must disable or customize caching if inputs are not serializable. - ProcessPoolTaskRunner: separate processes. All task functions/inputs/outputs must be pickleable regardless of caching. References: - prefect.task_runners source - prefect.cache_policies source
o
@Marvin in one script. the example must show both worlds.
m
thought for 23.3 seconds
Totally — here’s a single, self-contained script that shows both worlds side-by-side: - ThreadPoolTaskRunner: works when you disable caching (to avoid hashing the lock) - ProcessPoolTaskRunner: fails when passing a lock (serialization boundary) - ProcessPoolTaskRunner fixed: works when the lock is created inside the task
Copy code
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE
import threading

# Task that accepts a lock argument.
# For threads: disable caching so Prefect won't try to hash (serialize) the lock.
@task(cache_policy=NO_CACHE)
def use_lock(lock):
    """Acquire and immediately release the caller-supplied `lock`; return a status string."""
    # `with` releases the lock even if something raises while it is held.
    with lock:
        pass
    return "ok (used external lock)"

# Process-safe task: creates the lock inside the worker process, so no pickling of args.
@task
def make_and_use_lock():
    """Create a lock inside the task, acquire and release it once; return a status string."""
    worker_lock = threading.Lock()
    with worker_lock:
        pass
    return "ok (lock created in task)"

@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow_ok():
    """Thread-pool case: pass a lock created in the flow to `use_lock` and print the result."""
    print("\n[ThreadPoolTaskRunner] START (expect success)")
    shared_lock = threading.Lock()  # not pickleable, but fine with threads
    future = use_lock.submit(shared_lock)
    outcome = future.result()
    print("[ThreadPoolTaskRunner] RESULT:", outcome)

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fails():
    """Process-pool case: pass a lock created in the flow to `use_lock` and report the outcome.

    The call is expected to fail. Any exception is caught and its type and
    message are printed, so the script can continue to the next flow.
    """
    print("\n[ProcessPoolTaskRunner] START (expect failure when passing lock)")
    lock = threading.Lock()  # not pickleable across processes
    try:
        # Submit inside the try block: the error may be raised at submission
        # time or only when the result is awaited, and both must be reported.
        res = use_lock.submit(lock).result()
    except Exception as e:  # demo boundary: report whatever went wrong
        print("[ProcessPoolTaskRunner] ERROR TYPE:", type(e).__name__)
        print("[ProcessPoolTaskRunner] ERROR MSG:", e)
    else:
        print("[ProcessPoolTaskRunner] RESULT:", res)

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow_fixed():
    """Process-pool case without a lock argument: run `make_and_use_lock` and print the result."""
    print("\n[ProcessPoolTaskRunner - FIXED] START (expect success)")
    future = make_and_use_lock.submit()
    outcome = future.result()
    print("[ProcessPoolTaskRunner - FIXED] RESULT:", outcome)

if __name__ == "__main__":
    # Run all three to see the contrast
    thread_flow_ok()      # works (caching disabled avoids hashing the lock)
    process_flow_fails()  # fails (cannot pickle '_thread.lock' object)
    process_flow_fixed()  # works (lock created inside the task)
What this demonstrates: - ThreadPoolTaskRunner: same-process, shared memory. Passing a lock works as long as you disable or customize caching to avoid hashing non-serializable inputs. - ProcessPoolTaskRunner: separate processes. Arguments/results must be pickleable. Passing a lock fails; creating it inside the task works. References: - prefect.task_runners source - prefect.cache_policies source
o
@Marvin I do not get it: the memory id is always the same.
```
from prefect import flow, task
from prefect.task_runners import ProcessPoolTaskRunner

myObject = "Hello World! from the global scope"
print(f"The memory id of {myObject} is {id(myObject)}")
gen = (i for i in range(10))

@task(log_prints=True)
def say_hello(name: str):
    # Although you can access the objects here,
    # clarify for yourself that they are serialised and deserialised to
    # appear here in ProcessPoolTaskRunner
    print(myObject)
    print(f"The memory id of {myObject} is {id(myObject)}")
    print(gen)
    return f"Hello, {name}!"

@flow(task_runner=ProcessPoolTaskRunner)
def my_flow(name: str):
    # Run with full Prefect orchestration
    print(f"The memory id of {myObject} is {id(myObject)}")
    result = say_hello(name)
    print(result)

if __name__ == "__main__":
    # Decide mode from environment variable (default to prod)
    print(f"The memory id of {myObject} is {id(myObject)}")
    DEV_MODE = False
    if DEV_MODE:
        my_flow.fn("World")
    else:
        my_flow("World")
```
m
thought for 88.0 seconds
You’re seeing the same id because your example never actually leaves the main process:
- You passed the class instead of an instance: use ProcessPoolTaskRunner(), not ProcessPoolTaskRunner.
- You called the task directly (say_hello(name)) instead of scheduling it with .submit(...), so it runs in-process.
- Accessing globals doesn’t demonstrate serialization; child processes import the module and create their own globals. Strings may also be interned, making ids misleading.

Here’s a single script that shows both worlds correctly:
- ThreadPool: same process, same object id
- ProcessPool: separate process; the argument is pickled/unpickled, so its id differs
- ProcessPool + generator: fails to pickle
- Avoids cache-hashing issues with NO_CACHE
Copy code
import os
import threading
from prefect import flow, task
from prefect.task_runners import ThreadPoolTaskRunner, ProcessPoolTaskRunner
from prefect.cache_policies import NO_CACHE

@task(cache_policy=NO_CACHE, log_prints=True)
def inspect_arg(label, arg):
    """Print where the task is executing and the identity of ``arg``.

    The printed line is prefixed with ``label`` and carries the current
    process id, the current thread name, ``id(arg)`` and ``type(arg)``.

    Returns:
        ``id(arg)`` as seen inside the task.
    """
    # Runs inside a thread or a process depending on the task runner
    arg_id = id(arg)
    pid = os.getpid()
    thread_name = threading.current_thread().name
    print(f"[{label}] pid={pid} thread={thread_name} id(arg)={arg_id} type={type(arg)}")
    return arg_id

@flow(task_runner=ThreadPoolTaskRunner())
def thread_flow():
    """Submit a dict and a generator to ``inspect_arg`` on the thread-pool runner.

    Prints the parent pid and the dict's id, then whether the id reported by
    the task equals the id seen here. Finally submits a generator and waits
    for that task to finish.
    """
    print(f"\n[thread_flow] parent pid={os.getpid()}")
    payload = {"k": "v", "nums": [1, 2, 3]}
    parent_id = id(payload)
    print(f"[thread_flow] id(data) in parent={parent_id}")

    # In threads, the same object is shared by reference
    data_future = inspect_arg.submit("thread-data", payload)
    is_same = data_future.result() == parent_id
    print(f"[thread_flow] same object across threads? {is_same}")  # expect True

    # Generators are fine in threads if caching is disabled
    gen_future = inspect_arg.submit("thread-gen", (i for i in range(3)))
    gen_future.result()

@flow(task_runner=ProcessPoolTaskRunner())
def process_flow():
    """Submit a dict and a generator to ``inspect_arg`` on the process-pool runner.

    Prints the parent pid and the dict's id, then whether the id reported by
    the task equals the id seen here. The generator submission is expected to
    fail; any exception it raises is caught and printed.
    """
    print(f"\n[process_flow] parent pid={os.getpid()}")
    payload = {"k": "v", "nums": [1, 2, 3]}
    parent_id = id(payload)
    print(f"[process_flow] id(data) in parent={parent_id}")

    # In processes, args are pickled/unpickled -> new object with a different id
    data_future = inspect_arg.submit("process-data", payload)
    is_same = data_future.result() == parent_id
    print(f"[process_flow] same object across processes? {is_same}")  # expect False

    # Generators cannot be pickled -> this will fail
    numbers = (i for i in range(3))
    try:
        inspect_arg.submit("process-gen", numbers).result()
    except Exception as err:
        print(f"[process_flow] passing generator failed as expected: {type(err).__name__}: {err}")

if __name__ == "__main__":
    print(f"[main] pid={os.getpid()}")
    # IMPORTANT: Do not use flow.fn(...) here; that bypasses orchestration and the task runner
    # Thread-pool demo first, then the process-pool demo.
    for demo_flow in (thread_flow, process_flow):
        demo_flow()
Why your original example misleads:
- @flow(task_runner=ProcessPoolTaskRunner) should be @flow(task_runner=ProcessPoolTaskRunner()).
- say_hello(name) executes inline; use say_hello.submit(name) to schedule it on the runner.
- Globals in a spawned process come from the module import, not from the parent's memory, and string interning makes id(...) unreliable for this proof.

References:
- prefect.task_runners source
- prefect.cache_policies source