Jai P
06/01/2022, 6:01 PM
prefect 2.0, pydantic
Jai P
06/01/2022, 6:01 PM
from prefect import flow, task
from prefect.logging import get_run_logger
from pydantic import BaseModel, Extra

class Something(BaseModel):
    a: int

    class Config:
        extra = Extra.allow  # accept fields that are not declared on the model

@task
def my_task(model: Something):
    logger = get_run_logger()
    logger.info(f"in my_task: {model.dict()}")

def my_regular_function(model: Something):
    logger = get_run_logger()
    logger.info(f"in my_regular_function: {model.dict()}")

@flow
def my_flow():
    model = Something(a=1, b=2)  # b is an extra field
    logger = get_run_logger()
    logger.info(f"in my_flow: {model.dict()}")
    my_regular_function(model)  # plain call: the extra field b is still there
    my_task(model)  # task call: the extra field b is missing inside the task

if __name__ == '__main__':
    my_flow()
Jai P
06/01/2022, 6:01 PM
11:00:32.593 | INFO    | prefect.engine - Created flow run 'interesting-otter' for flow 'my-flow'
11:00:32.593 | INFO    | Flow run 'interesting-otter' - Using task runner 'ConcurrentTaskRunner'
11:00:32.645 | WARNING | Flow run 'interesting-otter' - No default storage is configured on the server. Results from this flow run will be stored in a temporary directory in its runtime environment.
11:00:32.707 | INFO    | Flow run 'interesting-otter' - in my_flow: {'a': 1, 'b': 2}
11:00:32.707 | INFO    | Flow run 'interesting-otter' - in my_regular_function: {'a': 1, 'b': 2}
11:00:32.765 | INFO    | Flow run 'interesting-otter' - Created task run 'my_task-ec9685da-0' for task 'my_task'
11:00:32.857 | INFO    | Task run 'my_task-ec9685da-0' - in my_task: {'a': 1}
11:00:32.908 | INFO    | Task run 'my_task-ec9685da-0' - Finished in state Completed()
11:00:32.953 | INFO    | Flow run 'interesting-otter' - Finished in state Completed('All states completed.')
Jai P
06/01/2022, 6:04 PM
my_regular_function: {'a': 1, 'b': 2}
my_task: {'a': 1}
Jai P
06/01/2022, 6:07 PM
Jai P
06/01/2022, 6:29 PM
    elif (
        # Recurse into Pydantic models but do _not_ do so for states/datadocs
        isinstance(expr, pydantic.BaseModel)
        and not isinstance(expr, prefect.orion.schemas.states.State)
        and not isinstance(expr, prefect.orion.schemas.data.DataDocument)
    ):
        values = await gather(
            *[visit_nested(getattr(expr, field)) for field in expr.__fields__]
        )
        result = {field: value for field, value in zip(expr.__fields__, values)}
        return typ(**result) if return_data else None
Kevin Kho
Marvin
06/01/2022, 6:42 PM
Jai P
06/01/2022, 6:42 PM
Kevin Kho
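
The difference between the two outputs in this thread can likely be reproduced with pydantic alone, without running a flow. The sketch below assumes the `Something` model from the first message. `rebuild_declared_only` mimics the branch quoted at 6:29 PM, which rebuilds the model from `__fields__` only. `rebuild_with_extras` is a hypothetical alternative and not Prefect's code. Both helper names are made up for illustration.

```python
from pydantic import BaseModel


class Something(BaseModel):
    a: int

    class Config:
        # String form of Extra.allow from the thread; accepts undeclared fields.
        extra = "allow"


def rebuild_declared_only(model):
    """Mimic the quoted branch: copy only the fields declared on the class."""
    cls = type(model)
    values = {name: getattr(model, name) for name in cls.__fields__}
    return cls(**values)


def rebuild_with_extras(model):
    """Hypothetical alternative: round-trip through .dict(), which keeps extras."""
    return type(model)(**model.dict())


original = Something(a=1, b=2)  # b is an extra field

declared_only = rebuild_declared_only(original).dict()
with_extras = rebuild_with_extras(original).dict()

print(declared_only)  # {'a': 1}
print(with_extras)    # {'a': 1, 'b': 2}
```

Note that `.dict()` also turns nested models into plain dicts, so the second helper is only a sketch for flat models like this one, not a drop-in replacement for the quoted branch.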
