Hans Lellelid
05/11/2023, 5:44 PM
  File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/client/orchestration.py", line 460, in create_flow_run_from_deployment
    json=flow_run_create.dict(json_compatible=True),
         │               └ <function PrefectBaseModel.dict at 0xffffa59ff2e0>
         └ DeploymentFlowRunCreate(state=StateCreate(type=StateType.SCHEDULED, name='Scheduled', message=None, data=None, state_details=...
  File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/server/utilities/schemas.py", line 293, in dict
    return json.loads(self.json(*args, **kwargs))
           │    │     │    │     │       └ {}
           │    │     │    │     └ ()
           │    │     │    └ <function PrefectBaseModel.json at 0xffffa59ff250>
           │    │     └ DeploymentFlowRunCreate(state=StateCreate(type=StateType.SCHEDULED, name='Scheduled', message=None, data=None, state_details=...
           │    └ <function loads at 0xffffb5241090>
           └ <module 'json' from '/usr/local/lib/python3.10/json/__init__.py'>
  File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/server/utilities/schemas.py", line 247, in json
    return super().json(*args, **kwargs)
                         │       └ {}
                         └ ()
  File "/opt/executor/.venv/lib/python3.10/site-packages/pydantic/main.py", line 504, in json
    return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)
           │    │          │          │             │          └ {}
           │    │          │          │             └ functools.partial(<function custom_pydantic_encoder at 0xffffb440e4d0>, {<class 'pydantic.types.SecretField'>: <function Pref...
           │    │          │          └ {'state': {'type': StateType.SCHEDULED, 'name': 'Scheduled', 'message': None, 'data': None, 'state_details': {'flow_run_id': ...
           │    │          └ <function orjson_dumps at 0xffffa59fee60>
           │    └ <class 'pydantic.config.Config'>
           └ DeploymentFlowRunCreate(state=StateCreate(type=StateType.SCHEDULED, name='Scheduled', message=None, data=None, state_details=...
  File "/opt/executor/.venv/lib/python3.10/site-packages/prefect/server/utilities/schemas.py", line 126, in orjson_dumps
    return orjson.dumps(v, default=default).decode()
           │      │     │          └ functools.partial(<function custom_pydantic_encoder at 0xffffb440e4d0>, {<class 'pydantic.types.SecretField'>: <function Pref...
           │      │     └ {'state': {'type': StateType.SCHEDULED, 'name': 'Scheduled', 'message': None, 'data': None, 'state_details': {'flow_run_id': ...
           │      └ <built-in function dumps>
           └ <module 'orjson' from '/opt/executor/.venv/lib/python3.10/site-packages/orjson/__init__.py'>
TypeError: Type is not JSON serializable: WrappedFloat
In this case, I have a fairly complex graph of Pydantic objects, but the one causing the issue is defined something like this:
class WrappedFloat(float):
    ...

class MyObject(MyBaseModel):
    probability: WrappedFloat = WrappedFloat(1.0)
Obviously that is simplified, but removing the WrappedFloat value does fix the issue. Still, changing this just so that Prefect can serialize it is definitely a code-quality compromise. Pydantic itself does not complain about serializing this; subclasses of known types are supported (thanks to, I believe, https://github.com/pydantic/pydantic/pull/1291).
Are there other workarounds we should pursue? Thanks in advance!
import orjson
from pydantic import BaseModel


def orjson_dumps(v, *, default):
    # orjson.dumps returns bytes, to match standard json.dumps we need to decode
    return orjson.dumps(v, default=default).decode()


class WrappedFloat(float):
    def get(self):
        return float(self)


class NormalModel(BaseModel):
    probability: WrappedFloat = WrappedFloat(1.0)


class OrjsonModel(BaseModel):
    probability: WrappedFloat = WrappedFloat(1.0)

    class Config:
        orm_mode = True
        allow_population_by_field_name = True
        # There is room for performance improvement, but this isn't as flexible as json.dumps
        json_dumps = orjson_dumps
        json_loads = orjson.loads


if __name__ == "__main__":
    # This passes:
    m = NormalModel()
    print(m.json())
    # This fails:
    m = OrjsonModel()
    print(m.json())
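One possible workaround, sketched under the assumption that orjson hands an unsupported float subclass to the `default` hook (as the traceback above suggests): wrap the hook so such values are downcast to a plain float before the original encoder runs. The names `with_float_fallback` and `strict_default` are hypothetical and not part of Prefect, Pydantic, or orjson.

```python
from typing import Any, Callable


class WrappedFloat(float):
    """Stand-in for the float subclass from the example above."""


def with_float_fallback(default: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Wrap a JSON `default` hook so float subclasses become plain floats."""

    def wrapper(obj: Any) -> Any:
        if isinstance(obj, float):
            # A serializer only calls `default` for values it rejected, so a
            # float seen here is a subclass instance; downcast it.
            return float(obj)
        # Everything else goes to the original hook unchanged.
        return default(obj)

    return wrapper


def strict_default(obj: Any) -> Any:
    """Hypothetical encoder that knows no extra types and always refuses."""
    raise TypeError(f"Type is not JSON serializable: {type(obj).__name__}")


hook = with_float_fallback(strict_default)
converted = hook(WrappedFloat(0.25))
print(type(converted).__name__, converted)
```

In the repro above this could be wired in as `orjson.dumps(v, default=with_float_fallback(default)).decode()` inside `orjson_dumps`. Whether the same hook can be applied to Prefect's own models is a separate question, since the failing `orjson_dumps` in the traceback lives in Prefect's code.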
Nate
05/11/2023, 6:13 PM
Hans Lellelid
05/11/2023, 6:14 PM
Nate
05/11/2023, 6:15 PM
Hans Lellelid
05/11/2023, 6:17 PM
@flow(
    task_runner=ConcurrentTaskRunner(),  # This is also the default, just here for clarity.
    log_prints=True,
)
async def run_pipeline(
    job_id: uuid.UUID,
    complex_param_json: str,  # <-- workaround for prefect orjson serialization issues
):
    complex_param = ComplexParam.parse_raw(complex_param_json)
(I'll also add this comment to the ticket.)