
Marwan Sarieddine

08/13/2022, 8:32 PM
Hey folks, a question about Prefect 2.0 and pydantic. I certainly appreciate using pydantic at the flow level to allow for complex parameter types and to enable type validation/conversion out of the box. However, it seems that pydantic support starts to fail at the task level. More specifically, pickling pydantic models with cloudpickle seems to be troublesome. See the thread for a small example. I am curious whether this will remain a known issue with certain storage types going forward…
Here is the flow code - I took the code from here and added one additional task
from prefect import flow, task, get_run_logger
from pydantic import BaseModel

class Model(BaseModel):
    a: int
    b: float
    c: str

@task
def parser(obj: Model) -> Model:
    logger = get_run_logger()
    logger.info(f"{obj.a=}, {obj.b=}, {obj.c=}")
    return obj

@task
def printer(obj: Model) -> None:
    logger = get_run_logger()
    logger.info(f"Received a {type(obj)} with value {obj}")


@flow
def model_validator(model: Model) -> None:
    obj = parser(model)
    printer(obj=obj)

model_validator({"a": 42, "b": 0, "c": 55})
If I run this, I get this error:
16:32:15.606 | INFO    | prefect.engine - Created flow run 'sensible-nuthatch' for flow 'model-validator'
16:32:15.789 | INFO    | Flow run 'sensible-nuthatch' - Created task run 'parser-dda0fc11-0' for task 'parser'
16:32:15.790 | INFO    | Flow run 'sensible-nuthatch' - Executing 'parser-dda0fc11-0' immediately...
16:32:15.819 | INFO    | Task run 'parser-dda0fc11-0' - obj.a=42, obj.b=0.0, obj.c='55'
16:32:15.820 | ERROR   | Flow run 'sensible-nuthatch' - Encountered exception during execution:
Traceback (most recent call last):
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/marwansarieddine/demo-interests/orion/orion/flow_2.py", line 23, in model_validator
    obj = parser(model)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
    return await future._result()
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/futures.py", line 220, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/task_runners.py", line 214, in submit
    result = await run_fn(**run_kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 947, in begin_task_run
    return await orchestrate_task_run(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1065, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/states.py", line 129, in return_value_to_state
    return Completed(data=DataDocument.encode(serializer, result))
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/data.py", line 42, in encode
    blob = lookup_serializer(encoding).dumps(data, **kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/serializers.py", line 59, in dumps
    data_bytes = cloudpickle.dumps(data)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
_pickle.PicklingError: Can't pickle <cyfunction int_validator at 0x1079a91e0>: it's not the same object as pydantic.validators.int_validator
16:32:15.828 | INFO    | Task run 'parser-dda0fc11-0' - Crash detected! Execution was interrupted by an unexpected exception.
16:32:15.883 | ERROR   | Flow run 'sensible-nuthatch' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
  File "/Users/marwansarieddine/demo-interests/orion/orion/flow_2.py", line 26, in <module>
    model_validator({"a": 42, "b": 0, "c": 55})
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
    return enter_flow_run_engine_from_flow_call(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
    return anyio.run(begin_run)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
    return asynclib.run(func, *args, **backend_options)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
    return native_run(wrapper(), debug=debug)
  File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
    return future.result()
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
    return await func(*args)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
    return state.result()
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/Users/marwansarieddine/demo-interests/orion/orion/flow_2.py", line 23, in model_validator
    obj = parser(model)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
    return await future._result()
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/futures.py", line 220, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/task_runners.py", line 214, in submit
    result = await run_fn(**run_kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 947, in begin_task_run
    return await orchestrate_task_run(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1065, in orchestrate_task_run
    terminal_state = await return_value_to_state(
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/states.py", line 129, in return_value_to_state
    return Completed(data=DataDocument.encode(serializer, result))
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/data.py", line 42, in encode
    blob = lookup_serializer(encoding).dumps(data, **kwargs)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/serializers.py", line 59, in dumps
    data_bytes = cloudpickle.dumps(data)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
    cp.dump(obj)
  File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
    return Pickler.dump(self, obj)
_pickle.PicklingError: Can't pickle <cyfunction int_validator at 0x1079a91e0>: it's not the same object as pydantic.validators.int_validator
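The "it's not the same object as" wording comes from pickle's by-reference check: a function is stored as a module name plus attribute name, and pickling is refused if looking that name up yields a different object. Below is a minimal standard-library sketch of that check. The module name `fake_validators` and the helper `make_validator` are made up for illustration; this does not reproduce the pydantic/cloudpickle interaction itself, only the kind of identity mismatch the message describes.

```python
import pickle
import sys
import types


def make_validator():
    """Build a stand-in for a function that lives in a library module."""
    def int_validator(value):
        return int(value)
    # Pretend the function was defined at the top level of `fake_validators`.
    int_validator.__module__ = "fake_validators"
    int_validator.__qualname__ = "int_validator"
    return int_validator


# Hypothetical module, registered so pickle can import it by name.
fake_module = types.ModuleType("fake_validators")
sys.modules["fake_validators"] = fake_module

original = make_validator()
fake_module.int_validator = original

# Identity matches, so pickle stores only a reference to the function.
payload = pickle.dumps(original)
assert pickle.loads(payload) is original

# Rebind the module attribute to a different (but equivalent) function object.
fake_module.int_validator = make_validator()


def try_pickle(obj):
    """Return the PicklingError raised while pickling obj, or None."""
    try:
        pickle.dumps(obj)
    except pickle.PicklingError as exc:
        return exc
    return None


error = try_pickle(original)
print(type(error).__name__)
```

After the rebinding, `original` is no longer the object found at `fake_validators.int_validator`, so pickling it is rejected even though the two functions behave identically.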

Anna Geller

08/13/2022, 9:10 PM
you can pass validate_parameters=False to disable validation

Marwan Sarieddine

08/13/2022, 9:13 PM
I saw this, but it is problematic for two reasons: 1. It does not resolve the error, since the input now remains a dictionary and doesn't get converted into a pydantic Model. 2. We lose the parameter validation at the flow level.
Is the suggestion that, if one wants to use tasks with a storage type that relies on cloudpickle, pydantic validation is not supported for now?

Anna Geller

08/13/2022, 9:15 PM
we don't have pydantic validation on a task level
that must be a major source of confusion for you
we do it only for flows to validate input parameters to the flow
feel free to open a GitHub issue with a feature request to add task-level validation and we can discuss it as a team, but this won't work at the moment

Marwan Sarieddine

08/13/2022, 9:17 PM
No, I understand there is no pydantic validation on the task level; that is not what I am requesting. I would like to have flow-level validation with pydantic and to also use tasks in my flow.

Anna Geller

08/13/2022, 9:17 PM
"a given storage type that relies on cloudpickle"
we deprecated cloudpickle for flow storage too

Marwan Sarieddine

08/13/2022, 9:18 PM
"I would like to have flow-level validation with pydantic and to also use tasks in my flow"
As you can see from the example I shared, this raises an error. I am on the latest Prefect version.

Anna Geller

08/13/2022, 9:19 PM
ok sorry, let me investigate a bit more before telling you something wrong
ok, the thing is: you can totally use pydantic models in your tasks and call such tasks in your flow; the problem is with returning a pydantic model from a task. For example, changing your task this way will work:
@task
def parser(obj: Model) -> Model:
    logger = get_run_logger()
    logger.info(f"{obj.a=}, {obj.b=}, {obj.c=}")
    # return obj
Do you have to return it in your use case? If not, this should fix your issue.

Marwan Sarieddine

08/13/2022, 9:32 PM
yea so I can’t pass pydantic Model instances between tasks - this is unfortunate
thanks for confirming this is the case, thought I was missing something

Anna Geller

08/13/2022, 9:33 PM
do you need this functionality?

Marwan Sarieddine

08/13/2022, 9:34 PM
potentially yes, still in the early phases of exploring migrating to prefect 2

Anna Geller

08/13/2022, 9:35 PM
this is more related to pydantic than to Prefect: https://github.com/cloudpipe/cloudpickle/issues/408
I believe you shouldn't worry too much about this: Michael is currently working on making task run persistence more configurable, so soon you should be able to avoid checkpointing/pickling such pydantic objects and thus pass them between tasks
🚀 1
making task run result persistence more configurable is a high-priority roadmap item at the moment, so I would expect this to be tackled within the next 2 weeks
🚀 1

Marwan Sarieddine

08/13/2022, 9:38 PM
ok will keep an eye out for this update - thanks for providing the support and update, much appreciated
🙌 2

Felix Sonntag

08/23/2022, 12:06 PM
Also 👍🏻 for that, currently facing issues with serializing Keras models 🙂

Anna Geller

08/23/2022, 1:40 PM
Thanks to you both for proving the urgency of this, I'll ping the team to highlight the importance of this issue to prioritize
🙌 1

Oliver Mannion

09/24/2022, 1:30 AM
Thank you for the link! I ran into this and resolved it by moving my model into a separate file from my flow as per this comment.
👍 1
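One plausible reason the separate-file workaround helps: cloudpickle serializes classes defined in `__main__` by value (it has to walk the whole class body), while classes from an importable module are stored by reference, as a module name plus class name. The sketch below illustrates only the by-reference case, using the standard-library pickle and a plain class in place of a pydantic model. The module name `my_models` and its contents are hypothetical, and the behaviour with Prefect and pydantic is assumed, not verified here.

```python
import importlib
import pathlib
import pickle
import sys
import tempfile

# Hypothetical module contents, written to a temporary directory so that
# the class lives in an importable module rather than in __main__.
MODULE_SOURCE = '''
class Model:
    def __init__(self, a, b, c):
        self.a, self.b, self.c = a, b, c
'''

tmp_dir = tempfile.mkdtemp()
pathlib.Path(tmp_dir, "my_models.py").write_text(MODULE_SOURCE)
sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()
my_models = importlib.import_module("my_models")

instance = my_models.Model(a=42, b=0.0, c="55")
payload = pickle.dumps(instance)

# The payload names the module and class instead of embedding the class body.
print(b"my_models" in payload, b"Model" in payload)

# Unpickling re-imports the module and looks the class up by name.
restored = pickle.loads(payload)
print(type(restored) is my_models.Model, restored.a, restored.b, restored.c)
```

Because only the names travel in the payload, the class internals (for pydantic, the compiled validators) never need to be pickled; the receiving side just needs to be able to import the same module.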