
Hans Lellelid

05/11/2023, 5:44 PM
Hey folks -- question about Pydantic & JSON. I'm adding Prefect into an application that makes pretty heavy use of Pydantic right now. We have some subtypes of builtins that serialize fine using our Pydantic setup but don't seem to work when passed as parameters to a Prefect deployment -- specifically, they fail in the orjson serializer. For our models, we are overriding some of the serializers, though not for this specific type (more below). Is there a way to configure Prefect not to use orjson, or to respect our serializers ... or something else? Here's the Prefect side of the stack trace I'm getting:
File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/client/orchestration.py", line 460, in create_flow_run_from_deployment
    json=flow_run_create.dict(json_compatible=True),
         │               └ <function PrefectBaseModel.dict at 0xffffa59ff2e0>
         └ DeploymentFlowRunCreate(state=StateCreate(type=StateType.SCHEDULED, name='Scheduled', message=None, data=None, state_details=...
  File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/server/utilities/schemas.py", line 293, in dict
    return json.loads(self.json(*args, **kwargs))
           │    │     │    │     │       └ {}
           │    │     │    │     └ ()
           │    │     │    └ <function PrefectBaseModel.json at 0xffffa59ff250>
           │    │     └ DeploymentFlowRunCreate(state=StateCreate(type=StateType.SCHEDULED, name='Scheduled', message=None, data=None, state_details=...
           │    └ <function loads at 0xffffb5241090>
           └ <module 'json' from '/usr/local/lib/python3.10/json/__init__.py'>
  File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/server/utilities/schemas.py", line 247, in json
    return super().json(*args, **kwargs)
                         │       └ {}
                         └ ()
  File "/opt/executor/.venv/lib/python3.10/site-packages/pydantic/main.py", line 504, in json
    return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
           │    │          │          │             │          └ {}
           │    │          │          │             └ functools.partial(<function custom_pydantic_encoder at 0xffffb440e4d0>, {<class 'pydantic.types.SecretField'>: <function Pref...
           │    │          │          └ {'state': {'type': StateType.SCHEDULED, 'name': 'Scheduled', 'message': None, 'data': None, 'state_details': {'flow_run_id': ...
           │    │          └ <function orjson_dumps at 0xffffa59fee60>
           │    └ <class 'pydantic.config.Config'>
           └ DeploymentFlowRunCreate(state=StateCreate(type=StateType.SCHEDULED, name='Scheduled', message=None, data=None, state_details=...
  File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/server/utilities/schemas.py", line 126, in orjson_dumps
    return orjson.dumps(v, default=default).decode()
           │      │     │          └ functools.partial(<function custom_pydantic_encoder at 0xffffb440e4d0>, {<class 'pydantic.types.SecretField'>: <function Pref...
           │      │     └ {'state': {'type': StateType.SCHEDULED, 'name': 'Scheduled', 'message': None, 'data': None, 'state_details': {'flow_run_id': ...
           │      └ <built-in function dumps>
           └ <module 'orjson' from '/opt/executor/.venv/lib/python3.10/site-packages/orjson/__init__.py'>

TypeError: Type is not JSON serializable: WrappedFloat
In this case, I have a fairly complex graph of Pydantic objects, but specifically the one causing issue is defined something like this:
class WrappedFloat(float):
  ...

class MyObject(MyBaseModel):
  probability: WrappedFloat = WrappedFloat(1.0)
Obviously that is simplified, but effectively removing the WrappedFloat value fixes the issue ... but it is definitely a code-quality compromise to change this just so that Prefect can serialize it. Pydantic itself does not complain about serializing this; subclasses of known types are supported (thanks to, I believe, https://github.com/pydantic/pydantic/pull/1291). Are there other workarounds we should pursue? Thanks in advance!
Here's a simple reproduce of the issue (w/o Prefect in the loop):
import orjson
from pydantic import BaseModel


def orjson_dumps(v, *, default):
    # orjson.dumps returns bytes, to match standard json.dumps we need to decode
    return orjson.dumps(v, default=default).decode()


class WrappedFloat(float):
    def get(self):
        return float(self)


class NormalModel(BaseModel):
    probability: WrappedFloat = WrappedFloat(1.0)


class OrjsonModel(BaseModel):

    probability: WrappedFloat = WrappedFloat(1.0)

    class Config:
        orm_mode = True
        allow_population_by_field_name = True
        # orjson is faster than the stdlib json, but not as flexible as json.dumps
        json_dumps = orjson_dumps
        json_loads = orjson.loads


if __name__ == "__main__":

    # This passes:
    m = NormalModel()
    print(m.json())

    # This fails:
    m = OrjsonModel()
    print(m.json())
Digging through the source, it looks like I can provide an encoder since kwargs get passed on to the superclass. (Well, if I want to do that I need to redefine the create_flow_run_from_deployment method.)
It sounds like I should probably just file a ticket for this. And maybe the answer is "we can't support pydantic classes that aren't orjson serializable" ....

Nate

05/11/2023, 6:13 PM
hey @Hans Lellelid can you show the code where you're interacting with Prefect? it looks like you're creating a flow run from a deployment, which I believe should work fine with your pydantic types as parameters. you should just have to pass a JSON serialized version of the parameter values to the endpoint - unless I'm missing an aspect of your question

Hans Lellelid

05/11/2023, 6:14 PM
I think it gets too confusing to show the full code, but: yes, Pydantic parameters work fine, just not ones that aren't serializable with orjson.
I created a ticket here that might do a better job explaining this: https://github.com/PrefectHQ/prefect/issues/9536

Nate

05/11/2023, 6:15 PM
ok thanks! 👀

Hans Lellelid

05/11/2023, 6:17 PM
As a workaround I am currently just passing this as a JSON string. I redefined my flow to look like:
import uuid

from prefect import flow
from prefect.task_runners import ConcurrentTaskRunner


@flow(
    task_runner=ConcurrentTaskRunner(),  # This is also the default, just here for clarity.
    log_prints=True,
)
async def run_pipeline(
    job_id: uuid.UUID,
    complex_param_json: str,  # <-- workaround for prefect orjson serialization issues
):
    complex_param = ComplexParam.parse_raw(complex_param_json)
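The caller side then does the serialization itself and hands the deployment a plain string (a minimal round-trip sketch; ComplexParam here is a stand-in, not the real model):

```python
from pydantic import BaseModel


class ComplexParam(BaseModel):
    # stand-in for the real, more complex parameter model
    probability: float = 1.0


# Caller: serialize with our own Pydantic machinery before submission.
payload = ComplexParam(probability=0.5).json()

# Inside the flow: rebuild the object from the string parameter.
complex_param = ComplexParam.parse_raw(payload)
print(complex_param.probability)
```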
(I'll also add this comment to ticket.)