Marwan Sarieddine
08/13/2022, 8:32 PM
from prefect import flow, task, get_run_logger
from pydantic import BaseModel
class Model(BaseModel):
    """Schema of the parameter accepted by the ``model_validator`` flow.

    Fields are declared with plain type annotations; the example call in
    this snippet passes ``{"a": 42, "b": 0, "c": 55}`` and the task log
    reports ``a=42, b=0.0, c='55'``, i.e. values are coerced to the
    annotated types.
    """

    a: int
    b: float
    c: str
@task
def parser(obj: Model) -> Model:
    """Log the three fields of ``obj`` and return it unchanged.

    Args:
        obj: The validated model instance handed over by the flow.

    Returns:
        The same ``obj`` that was passed in.
    """
    logger = get_run_logger()
    # ``{expr=}`` prints both the expression and its value (Python 3.8+).
    logger.info(f"{obj.a=}, {obj.b=}, {obj.c=}")
    # NOTE(review): the traceback posted with this snippet fails while the
    # task's return value is being cloudpickled, i.e. after this line runs.
    return obj
@task
def printer(obj: Model) -> None:
    """Log the runtime type and value of ``obj``.

    Args:
        obj: The object produced by the upstream ``parser`` task.
    """
    logger = get_run_logger()
    logger.info(f"Received a {type(obj)} with value {obj}")
@flow
def model_validator(model: Model) -> None:
    """Run ``parser`` on ``model`` and pass its result to ``printer``.

    Args:
        model: Flow parameter annotated as ``Model``. The example call in
            this snippet supplies a plain dict for it.
    """
    obj = parser(model)
    printer(obj=obj)
# Deliberately pass a raw dict whose values do not all match the annotated
# field types (``b`` is an int, ``c`` is an int) to exercise flow-level
# validation of the ``model`` parameter.
raw_params = {"a": 42, "b": 0, "c": 55}
model_validator(raw_params)
16:32:15.606 | INFO | prefect.engine - Created flow run 'sensible-nuthatch' for flow 'model-validator'
16:32:15.789 | INFO | Flow run 'sensible-nuthatch' - Created task run 'parser-dda0fc11-0' for task 'parser'
16:32:15.790 | INFO | Flow run 'sensible-nuthatch' - Executing 'parser-dda0fc11-0' immediately...
16:32:15.819 | INFO | Task run 'parser-dda0fc11-0' - obj.a=42, obj.b=0.0, obj.c='55'
16:32:15.820 | ERROR | Flow run 'sensible-nuthatch' - Encountered exception during execution:
Traceback (most recent call last):
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/marwansarieddine/demo-interests/orion/orion/flow_2.py", line 23, in model_validator
obj = parser(model)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/tasks.py", line 294, in __call__
return enter_task_run_engine(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
return await future._result()
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/task_runners.py", line 214, in submit
result = await run_fn(**run_kwargs)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 947, in begin_task_run
return await orchestrate_task_run(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1065, in orchestrate_task_run
terminal_state = await return_value_to_state(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/states.py", line 129, in return_value_to_state
return Completed(data=DataDocument.encode(serializer, result))
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
_pickle.PicklingError: Can't pickle <cyfunction int_validator at 0x1079a91e0>: it's not the same object as pydantic.validators.int_validator
16:32:15.828 | INFO | Task run 'parser-dda0fc11-0' - Crash detected! Execution was interrupted by an unexpected exception.
16:32:15.883 | ERROR | Flow run 'sensible-nuthatch' - Finished in state Failed('Flow run encountered an exception.')
Traceback (most recent call last):
File "/Users/marwansarieddine/demo-interests/orion/orion/flow_2.py", line 26, in <module>
model_validator({"a": 42, "b": 0, "c": 55})
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/flows.py", line 390, in __call__
return enter_flow_run_engine_from_flow_call(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 152, in enter_flow_run_engine_from_flow_call
return anyio.run(begin_run)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_core/_eventloop.py", line 70, in run
return asynclib.run(func, *args, **backend_options)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 292, in run
return native_run(wrapper(), debug=debug)
File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/asyncio/runners.py", line 44, in run
return loop.run_until_complete(main)
File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/asyncio/base_events.py", line 647, in run_until_complete
return future.result()
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 287, in wrapper
return await func(*args)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
return await fn(*args, **kwargs)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 228, in create_then_begin_flow_run
return state.result()
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 557, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(call, cancellable=True)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/Users/marwansarieddine/demo-interests/orion/orion/flow_2.py", line 23, in model_validator
obj = parser(model)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/tasks.py", line 294, in __call__
return enter_task_run_engine(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 687, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 446, in result
return self.__get_result()
File "/Users/marwansarieddine/.pyenv/versions/3.9.12/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
raise self._exception
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 806, in create_task_run_then_submit
return await future._result()
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/futures.py", line 220, in _result
return final_state.result(raise_on_failure=raise_on_failure)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
raise data
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/task_runners.py", line 214, in submit
result = await run_fn(**run_kwargs)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 947, in begin_task_run
return await orchestrate_task_run(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/engine.py", line 1065, in orchestrate_task_run
terminal_state = await return_value_to_state(
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/states.py", line 129, in return_value_to_state
return Completed(data=DataDocument.encode(serializer, result))
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/orion/schemas/data.py", line 42, in encode
blob = lookup_serializer(encoding).dumps(data, **kwargs)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/prefect/serializers.py", line 59, in dumps
data_bytes = cloudpickle.dumps(data)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 73, in dumps
cp.dump(obj)
File "/Users/marwansarieddine/.pyenv/versions/prefect2/lib/python3.9/site-packages/cloudpickle/cloudpickle_fast.py", line 633, in dump
return Pickler.dump(self, obj)
_pickle.PicklingError: Can't pickle <cyfunction int_validator at 0x1079a91e0>: it's not the same object as pydantic.validators.int_validator
Anna Geller
Marwan Sarieddine
08/13/2022, 9:13 PMAnna Geller
Marwan Sarieddine
08/13/2022, 9:17 PMAnna Geller
a given storage type that relies on cloudpickle
we deprecated cloudpickle for flow storage too
Marwan Sarieddine
08/13/2022, 9:18 PM
I would like to have flow-level validation with pydantic and to also use tasks in my flow. As you see from the example I shared, this raises an error. I am on the latest prefect version.
Anna Geller
@task
def parser(obj: Model) -> Model:
    """Log the three fields of ``obj`` without returning it.

    Variant of ``parser`` with the ``return`` statement disabled, so the
    task yields ``None`` even though the signature still says ``Model``.

    Args:
        obj: The validated model instance handed over by the flow.
    """
    logger = get_run_logger()
    logger.info(f"{obj.a=}, {obj.b=}, {obj.c=}")
    # NOTE(review): return left commented out on purpose, presumably so
    # no Model instance has to be pickled as the task result. Confirm.
    # return obj
Marwan Sarieddine
08/13/2022, 9:32 PMAnna Geller
Marwan Sarieddine
08/13/2022, 9:34 PMAnna Geller
Marwan Sarieddine
08/13/2022, 9:38 PMFelix Sonntag
08/23/2022, 12:06 PMAnna Geller
Oliver Mannion
09/24/2022, 1:30 AM