# ask-marvin
s
@Marvin I encounter an issue when running a task with an input and/or a result that extends the pydantic "BaseModel" class and has optional unset fields. Here is the error I get:
```
TypeError: 'MockValSer' object cannot be converted to 'SchemaSerializer'
```
Is there any solution to work around this? It looks to be an issue in the pickle serializer when dealing with pydantic object serialization when the "exclude_unset" parameter is set to True. https://github.com/pydantic/pydantic/issues/7713
m
thought for 628.6 seconds
Thanks for sharing the Pydantic issue link. That error aligns with a known Pydantic v2/pydantic-core serialization edge case that can surface when objects with unset optionals are serialized and certain serializers get involved.
What's happening
- In Prefect 3.x, the default result serializer is pickle (via cloudpickle). Some Pydantic BaseModel objects (especially when exclude_unset behavior is involved) can trigger pydantic-core internals that don't play nicely with pickling, yielding errors like "MockValSer … cannot be converted to SchemaSerializer".
- With background tasks (`.delay()` + `.result_async()`), you typically shouldn't hit serialization unless results are being persisted or sent through a boundary that requires serialization. Still, this can surface depending on your configuration and versions.
Workarounds you can try
1) Switch your results to JSON serialization
- Prefect's JSON serializer uses a Prefect-aware encoder that handles Pydantic BaseModel cleanly.
- Per-task:
```python
from prefect import task
from prefect.serializers import JSONSerializer

@task(result_serializer=JSONSerializer())
def my_task(m: MyModel) -> MyModel:
    ...
```
- Per-flow (for all tasks without their own override):
```python
from prefect import flow
from prefect.serializers import JSONSerializer

@flow(result_serializer=JSONSerializer())
def my_flow(...):
    ...
```
- Globally (recommended if you want this everywhere): set the env var `PREFECT_RESULTS_DEFAULT_SERIALIZER=json`, or in code:
```python
from prefect.settings import PREFECT_RESULTS_DEFAULT_SERIALIZER, temporary_settings

# settings objects (not plain strings) are used as keys
with temporary_settings({PREFECT_RESULTS_DEFAULT_SERIALIZER: "json"}):
    ...
```
Docs: Prefect 3 Results and Serializer API
2) If you must use pickle, try stdlib pickle instead of cloudpickle
- This sometimes avoids the pydantic-core interaction:
```python
from prefect import task
from prefect.serializers import PickleSerializer

@task(result_serializer=PickleSerializer(picklelib="pickle"))
def my_task(m: MyModel):
    ...
```
3) Pass plain dicts across task boundaries, then reconstruct
- Keep your exclude_unset behavior and avoid serializer pitfalls (a flow-level usage sketch follows the code below):
```python
from prefect import task

@task
def to_dict(m: MyModel) -> dict:
    return m.model_dump(exclude_unset=True)

@task
def from_dict(d: dict) -> MyModel:
    return MyModel(**d)
```
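A minimal sketch of wiring these together in a flow, assuming a hypothetical `MyModel` defined at module level:
```python
from prefect import flow

@flow
def my_flow(m: MyModel):
    d = to_dict(m)     # only a plain dict crosses the task boundary
    m2 = from_dict(d)  # the model is rebuilt inside the next task
    return m2
```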
4) Upgrade Pydantic / pydantic-core to latest
- Several related issues have been addressed in recent releases of Pydantic v2 and pydantic-core. If you're not on the latest, please upgrade and see if the error disappears.
5) If the error happens during parameter validation
- As a diagnostic step (not a long-term recommendation), try disabling flow-level parameter validation:
```python
from prefect import flow

@flow(validate_parameters=False)
def my_flow(...):
    ...
```
If this removes the error, it points to the validation layer's interaction with Pydantic. Prefer the JSON serializer or a Pydantic upgrade as the proper fix.
Notes specific to your setup
- Dynamic task naming via `.with_options(name=...)` won't affect serialization.
- Background tasks (`.delay()` + `await future.result_async()`) generally don't pickle unless you're persisting results or crossing process boundaries. If you've enabled result persistence, `JSONSerializer` is the most robust path for Pydantic models.
Could you share:
- Your Pydantic and pydantic-core versions
- Whether you've enabled result persistence and/or a custom `result_serializer`
- A minimal example (model + task) that reproduces the error
I can tailor the fix precisely with those details. In the meantime, switching the task/flow/global serializer to JSON is the most reliable Prefect-side workaround.
Links
- Pydantic issue you cited: pydantic#7713
- Prefect 3 Results: How results are stored/serialized
- Serializer API: prefect.serializers
s
@Marvin I tried with the json serializer, but I get the following error:
```
SerializationError: Failed to serialize object of type 'dict' with serializer 'json'. You can try a different serializer (e.g. result_serializer="json") or disabling persistence (persist_result=False) for this flow or task.
```
Is the JSONSerializer compatible with `dict` and pydantic `BaseModel` objects?
m
thought for 102.8 seconds
Short answer: yes. Prefect's JSONSerializer can handle plain dicts and Pydantic BaseModel objects. That error means something inside your dict isn't JSON-safe with the default encoder.
What's likely happening
- JSONSerializer uses Prefect's `prefect_json_object_encoder` to convert non-JSON-native types (e.g., datetimes, UUIDs, Paths, BaseModel) before `json.dumps`.
- If your dict contains a value the encoder doesn't know how to convert (e.g., a custom class, set, Decimal, bytes, or a Pydantic object that needs special dumping semantics), you'll see "Failed to serialize object of type 'dict'".
Quick ways to fix/diagnose
1) Force BaseModel to a JSON-safe dict yourself
- Especially if you rely on exclude_unset, use Pydantic v2's JSON mode:
```python
from prefect import task
from prefect.serializers import JSONSerializer

@task(result_serializer=JSONSerializer())
def my_task(m: MyModel) -> dict:
    # mode="json" makes nested values JSON-friendly
    return m.model_dump(mode="json", exclude_unset=True)
```
2) Add a custom encoder for your specific types
- If your dict contains custom/unhandled types, extend Prefect's encoder:
```python
from pydantic import BaseModel
from prefect.serializers import JSONSerializer, prefect_json_object_encoder

def my_encoder(obj):
    if isinstance(obj, BaseModel):
        # Ensure exclude_unset and JSON-friendly conversion
        return obj.model_dump(mode="json", exclude_unset=True)
    # Add other custom cases as needed, e.g.:
    # if isinstance(obj, Decimal): return float(obj)
    # if isinstance(obj, set): return list(obj)
    return prefect_json_object_encoder(obj)

json_ser = JSONSerializer(object_encoder="path.to.module.my_encoder")
```
Use it on the task/flow or globally:
```python
@task(result_serializer=json_ser)
def my_task(...): ...
```
3) Pinpoint the offending value
- Try serializing locally to see what fails:
```python
from prefect.serializers import JSONSerializer

JSONSerializer().dumps(your_object)  # will raise on the first unknown type
```
4) If you don’t need persistence, disable it
```python
from prefect import task

@task(persist_result=False)
def my_task(...): ...
```
This avoids result serialization entirely.
Notes
- Dicts are supported, but every contained value must be JSON-serializable after the encoder runs.
- BaseModel is supported, but Prefect's default encoder does not enforce `exclude_unset=True`; do it yourself or use a custom encoder as shown above.
- Given your earlier pickle error with Pydantic, JSON + `model_dump(mode="json", exclude_unset=True)` tends to be the most reliable path.
If you can share a minimal example of the dict content (or the model fields/types), I can suggest the exact encoder tweaks you'll need.
Links
- Prefect Results: How results are stored/serialized
- Serializer API: prefect.serializers
s
@Marvin when I run serialization and deserialization of a pydantic object outside of a task execution, it works well with both the Pickle and JSON serializers. However, when it runs in a task worker, it fails. What could cause that?
m
thought for 123.7 seconds
Great question: this pattern usually points to differences between your local "pure Python" test and what happens inside a Prefect worker run. A few common causes:
- Result persistence kicks in on the worker
  - Locally, you may not be persisting results, so serialization isn't exercised. On a worker (especially via a deployment), Prefect often persists task/flow results to storage, triggering serialization.
  - Quick check: set `@task(persist_result=False)` on the task that returns the model/dict. If the error goes away, it's a serialization/persistence issue.
- Different environment inside the worker
  - Container/VM may have different versions of pydantic/pydantic-core/cloudpickle/Python than your dev environment. Mismatches can expose Pydantic v2 edge cases only in the worker.
  - Action: ensure the worker image/venv has the same versions you tested locally (especially pydantic and pydantic-core).
- The object you return from the task is not JSON-safe in that environment
  - JSONSerializer can handle dicts and BaseModel, but every nested value must be encodable. If your dict or model includes unsupported types (e.g., numpy arrays, Decimal, Path objects without conversion, sets, custom classes), JSON will fail.
  - This can differ between local test and worker if, for example, your task returns more fields or different values at runtime.
- Pickle specifics (if you use PickleSerializer on the worker)
  - Pickle re-imports classes by module path; if your model is defined in `__main__`/a notebook or not importable in the worker, pickle can fail.
  - If you must use pickle, try `PickleSerializer(picklelib="pickle")`; it sometimes avoids pydantic-core/cloudpickle interactions.
Recommended fixes/tests
1) Prove it's result persistence
- Temporarily disable for the problematic task:
```python
from prefect import task

@task(persist_result=False)
def my_task(...):
    ...
```
- If that fixes it, add an explicit serializer and/or convert to JSON-safe data before returning.
2) Return JSON-safe data from the task
- For Pydantic v2 models, force JSON mode and exclude unset:
```python
from prefect import task
from prefect.serializers import JSONSerializer

@task(result_serializer=JSONSerializer())
def my_task(m: MyModel) -> dict:
    return m.model_dump(mode="json", exclude_unset=True)
```
- If you need to return the model object itself, use a custom encoder that enforces JSON-safe dumping:
```python
from prefect import task
from prefect.serializers import JSONSerializer, prefect_json_object_encoder
from pydantic import BaseModel

def my_encoder(obj):
    if isinstance(obj, BaseModel):
        return obj.model_dump(mode="json", exclude_unset=True)
    # Add other conversions as needed (Decimal -> float, set -> list, etc.)
    return prefect_json_object_encoder(obj)

json_ser = JSONSerializer(object_encoder="path.to.module.my_encoder")

@task(result_serializer=json_ser)
def my_task(m: MyModel):
    return m
```
3) Instrument inside the task to locate the offending value
- Add a quick try/except before returning:
```python
from prefect import task
from prefect.serializers import JSONSerializer

@task(result_serializer=JSONSerializer())
def my_task(value):
    ser = JSONSerializer()
    try:
        ser.dumps(value)
    except Exception:
        # Log problematic types/fields here before re-raising
        print(f"failed to serialize value of type {type(value)}")
        raise
    return value
```
- This will confirm exactly what the JSON encoder cannot handle in the worker.
4) Align environments
- Pin the same versions of Python, pydantic, and pydantic-core in your worker image/venv as you used locally.
5) If you must use pickle
- Try stdlib pickle:
```python
from prefect import task
from prefect.serializers import PickleSerializer

@task(result_serializer=PickleSerializer(picklelib="pickle"))
def my_task(...):
    ...
```
If you can share:
- Your worker type (process/docker/k8s), deployment config, and whether persistence is enabled
- pydantic and pydantic-core versions on both local and worker
- A minimal model + task that reproduces the failure on the worker
- The full traceback from the worker logs
…I can pinpoint the exact fix. For many Pydantic v2 cases like yours, the most robust path has been: explicitly set `JSONSerializer` on the task and return `model_dump(mode="json", exclude_unset=True)` so the payload is guaranteed JSON-safe before Prefect persists it.
References
- Prefect Results: How results are stored/serialized
- Serializer API: prefect.serializers
s
@Marvin I'm using workers running docker containers, with pydantic version 2.11.7 and pydantic-core version 2.33.2
m
thought for 64.3 seconds
Thanks, that helps narrow it down.
Why it works locally but fails in the Docker worker
- In a worker run, Prefect always serializes task results to report state back to the API (even if you aren't persisting to external storage). So serialization is exercised more often in workers than in a quick local round-trip.
- The worker environment may differ: Python, cloudpickle, or how your model's module is imported. Pickle depends on the class being importable by module path inside the container; JSON depends on every nested value being JSON-encodable with the object encoder.
- Under real task execution, your model/dict may contain values that aren't present in your local test (e.g., sets, Decimal, numpy types, bytes, or a custom object), which JSON can't encode by default.
Given your versions (pydantic 2.11.7 / pydantic-core 2.33.2), here's what I recommend.
Do this first (most robust with Pydantic v2)
- Ensure results are JSON-safe at the task boundary:
```python
from prefect import task
from prefect.serializers import JSONSerializer

@task(result_serializer=JSONSerializer())
def my_task(m: MyModel) -> dict:
    # Force JSON-compatible payload and drop unset optionals
    return m.model_dump(mode="json", exclude_unset=True)
```
- If you need to return the model object itself, give JSONSerializer a custom encoder that enforces JSON-safe dumping for BaseModel (and anything else you use like Decimal/sets):
```python
from decimal import Decimal

from prefect import task
from prefect.serializers import JSONSerializer, prefect_json_object_encoder
from pydantic import BaseModel

def my_encoder(obj):
    if isinstance(obj, BaseModel):
        return obj.model_dump(mode="json", exclude_unset=True)
    if isinstance(obj, Decimal):
        return float(obj)
    if isinstance(obj, set):
        return list(obj)
    # fall back to Prefect's defaults (handles datetime, UUID, Path, etc.)
    return prefect_json_object_encoder(obj)

json_ser = JSONSerializer(object_encoder="your_package.your_module.my_encoder")

@task(result_serializer=json_ser)
def my_task(m: MyModel):
    return m
```
- If parameters (inputs) are the source of failure, pass `m.model_dump(mode="json", exclude_unset=True)` into `.delay()` and reconstruct the model inside the task with `MyModel(**d)`, as sketched below.
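A minimal sketch of that parameter-passing pattern, assuming a hypothetical `MyModel` and `process` task:
```python
from typing import Optional

from prefect import task
from pydantic import BaseModel

class MyModel(BaseModel):
    name: str
    count: Optional[int] = None  # optional field that may stay unset

@task
def process(d: dict) -> dict:
    m = MyModel(**d)  # rebuild the model inside the task
    # ... work with m ...
    return m.model_dump(mode="json", exclude_unset=True)

# only a JSON-safe dict crosses the .delay() boundary
future = process.delay(
    MyModel(name="example").model_dump(mode="json", exclude_unset=True)
)
```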
If you must use pickle
- Prefer stdlib pickle in the worker to avoid cloudpickle/pydantic-core interactions:
```python
from prefect import task
from prefect.serializers import PickleSerializer

@task(result_serializer=PickleSerializer(picklelib="pickle"))
def my_task(...):
    ...
```
- Make sure the model class is importable by full module path in the container (not defined in `__main__` or an interactive context), and that the code is installed/available in the image.
Align environments
- Verify the worker's image has the same versions of Python, Prefect, pydantic, pydantic-core, and cloudpickle as your local env. A mismatch in cloudpickle especially can surface these issues (see the version-check sketch after the diagnostic below).
Quick diagnostic inside the worker
- Add a pre-return check to pinpoint the offending value:
```python
from prefect import task
from prefect.serializers import JSONSerializer

@task(result_serializer=JSONSerializer())
def my_task(value):
    ser = JSONSerializer()
    try:
        ser.dumps(value)
    except Exception:
        # Log types/structure to identify the non-serializable member
        if isinstance(value, dict):
            for k, v in value.items():
                print(f"field {k!r} has type {type(v)}")
        raise
    return value
```
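To compare environments, here is a small version-check snippet: run it both locally and inside the worker container, then diff the output (cloudpickle is assumed to be installed alongside Prefect):
```python
import sys

import cloudpickle
import prefect
import pydantic
import pydantic_core

# Print the versions that matter for pickle/JSON result serialization
print("python:", sys.version)
print("prefect:", prefect.__version__)
print("pydantic:", pydantic.VERSION)
print("pydantic-core:", pydantic_core.__version__)
print("cloudpickle:", cloudpickle.__version__)
```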
If you share:
- Whether you've enabled result persistence in your deployment/worker
- The exact model definition and a minimal example of the dict/result that fails in the worker
- The full traceback from the worker logs
…I can zero in on the exact field/type causing the JSON failure and suggest the precise encoder tweaks.
References
- Prefect results and serializers: How results are stored/serialized
- Serializer API: prefect.serializers
- Related Pydantic issue: MockValSer / SchemaSerializer