Janet Carson
08/14/2024, 5:41 PM

Chris White
json) remains the same

Janet Carson
08/14/2024, 5:49 PM

Chris White
from prefect import task
from prefect.serializers import JSONSerializer

@task(result_storage_key="~/filename.json", result_serializer=JSONSerializer())
def my_json_task(**kwargs):
    ...
    return obj_that_is_json_serializable
now the filename is explicitly set on the task, and you can edit its contents after the task runs. For more in-depth information on how this sort of setup works in 3.0rc you can check out the docs on task caching.

Janet Carson
08/14/2024, 5:53 PM

Janet Carson
08/14/2024, 5:55 PM

Chris White
JSONSerializer. If you open an enhancement request requesting this change we can try to turn it around quickly!

Chris White
return pydantic_model.json()
or whatever the correct method call is on the pydantic model

Janet Carson
08/14/2024, 6:05 PM
from prefect import task
from pydantic_core import to_json

@task(result_storage_key="~/filename.json", result_serializer=to_json)
def my_json_task(checkpoint_data_model: MyCheckPointDataModel):
    ...
    return new_checkpoint_data_model_with_completed_task_info

Otherwise, it would be two steps: one to convert the model to a dict and another to serialize it to JSON.
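Roughly, that two-step version would look something like this (just a sketch; new_checkpoint_data_model stands in for the model the task returns):

import json

# step 1: convert the pydantic model to a plain Python dict
data = new_checkpoint_data_model.model_dump()

# step 2: serialize that dict to a JSON string
json_string = json.dumps(data)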
Chris White
The to_json utility won't conform to our serializer interface, but I'm happy to support some version of that (e.g., result_serializer="pydantic" could be a more explicit option too)

Janet Carson
08/14/2024, 6:16 PM
my_model.model_dump(), which converts the model to a Python dictionary.
Or, my_model.model_dump_json(), which converts the model to a single JSON string.
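For example, with a small pydantic v2 model (Checkpoint here is just a made-up illustration):

from pydantic import BaseModel

class Checkpoint(BaseModel):
    name: str
    step: int

cp = Checkpoint(name="demo", step=1)
cp.model_dump()       # {'name': 'demo', 'step': 1}
cp.model_dump_json()  # '{"name":"demo","step":1}'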
Nate
08/14/2024, 6:17 PM
from pydantic import BaseModel
from prefect import flow, task

class MyModel(BaseModel):
    name: str
    age: int
    address: str
    phone_number: str

@task(persist_result=True, result_serializer="json")
def make_model() -> MyModel:
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234")

@flow
def main():
    model = make_model()
    model_again = make_model()  # cached
    assert model == model_again

if __name__ == "__main__":
    main()
does that help in your case?

Janet Carson
08/14/2024, 6:22 PM
from pydantic import BaseModel
from prefect import flow, task

class MyModel(BaseModel):
    name: str
    age: int
    address: str
    phone_number: str

@task(persist_result=True, result_serializer="json")
def step_one() -> MyModel:
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234-with-bogus-character")

@task(persist_result=True, result_serializer="json")
def step_two(results_from_step_one: MyModel) -> MyModel:
    # imagine there is code in here that crashes due to bad phone number
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234")

@flow
def main():
    model = step_one()
    final_result = step_two(model)

if __name__ == "__main__":
    main()
Assume that step one returned OK, but step two catches the bogus data. I want to edit the model returned in step one and retry step two with the edited result (perhaps by editing that model as stored in a flat file somewhere).

Nate
08/14/2024, 6:30 PM
try/except? since step_two would be receiving that same model that you'd persist to that file wherever your result_storage is
@task(persist_result=True, result_serializer="json")
def step_two(results_from_step_one: MyModel) -> MyModel:
    try:
        code_that_might_crash(results_from_step_one)
    except SomeException:
        code_that_might_crash(edit_model(results_from_step_one))
    ...
as an aside, a Field(pattern=...) would make a bogus value impossible 🙂
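For instance (a sketch; the regex is just a guess at the phone format you actually want):

from pydantic import BaseModel, Field

class MyModel(BaseModel):
    name: str
    age: int
    address: str
    # pydantic will reject anything that isn't ###-#### at validation time
    phone_number: str = Field(pattern=r"^\d{3}-\d{4}$")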
08/14/2024, 6:32 PM
edit_model function in there.

Janet Carson
08/14/2024, 6:32 PM

Chris White

Chris White
@task(persist_result=True, result_serializer="json", result_storage_key="~/my_model1.json")
def step_one() -> MyModel:
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234-with-bogus-character")
and then you could edit the data living in ~/my_model1.json directly prior to the next run

Chris White
step_one to run but hopefully that doc I shared above can guide you to the right setup for your workflow

Nate
08/14/2024, 6:37 PM
wait_for_input so you could manually tweak the value as needed
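A rough sketch of that pattern, building on the step_one/step_two example above (CorrectedPhone and SomeException are made-up names, and the exact import paths may differ slightly between versions):

from prefect import flow
from prefect.flow_runs import pause_flow_run
from prefect.input import RunInput

class CorrectedPhone(RunInput):
    phone_number: str

@flow
def main():
    model = step_one()
    try:
        final_result = step_two(model)
    except SomeException:
        # pause the flow run until someone supplies a corrected value in the UI
        fixed = pause_flow_run(wait_for_input=CorrectedPhone)
        final_result = step_two(model.model_copy(update={"phone_number": fixed.phone_number}))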
Janet Carson
08/14/2024, 6:43 PM

Chris White
Setting a result_storage_key means the task will return the data in that file. So if you want to rerun the task often, you'll need to either delete the file manually as needed, or specify a cache policy that determines when it should rerun vs. load the data from the file.
These docs explain it better than I could here: https://docs-3.prefect.io/3.0rc/develop/task-caching
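For example, an input-based cache policy makes the task rerun whenever its inputs change and load the stored result otherwise (a sketch; source_file is a made-up parameter, and the available policies are listed in the caching docs above):

from prefect import task
from prefect.cache_policies import INPUTS

@task(
    persist_result=True,
    result_serializer="json",
    result_storage_key="~/my_model1.json",
    cache_policy=INPUTS,  # rerun when the inputs change, otherwise load the stored result
)
def step_one(source_file: str) -> MyModel:
    ...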
Janet Carson
08/14/2024, 6:49 PM

Chris White
Janet Carson
08/14/2024, 6:50 PM