Joe
09/12/2023, 9:12 PMNate
09/12/2023, 10:19 PMfrom prefect import flow, task, unmapped
from pydantic import BaseModel, Field
class DataModel(BaseModel):
foo: str
bar: int
baz: list = Field(default_factory=list)
@task
async def call_my_service(item, client):
pass
@flow(log_prints=True)
async def ai_workflow(input_data: DataModel):
print(f"got {input_data}")
client = get_some_service_client()
await call_my_service.map(input_data.baz, unmapped(client))
if __name__ == "__main__":
import asyncio
asyncio.run(
ai_workflow(
{"foo": "marvin", "bar": 42, "baz": [1, 2, 3]}
)
)
where you're sharing the client instance across tasks and deserializing via the model given to the flow signature
not exactly sure how pydantic plays with binary formats as you mention, but perhaps this could be a starting placeJoe
09/13/2023, 12:15 AMNate
09/13/2023, 12:20 AMsince I know there's data science folks about, I thought they might have some lessons learned in specific applicationsfeel free to ask if you've specific questions about prefect / infra stuff