# ask-community
j
I have a data pipeline I am moving to Prefect. At the moment, between stages, it writes its current state to a JSON file so that it's possible to resume processing by rereading the saved JSON file. My thought was to make this JSON file the output of a task so that it could be used to restart a failed task. Here's the question: sometimes the failure is external to the data pipeline, but sometimes it's something intrinsic to the job, like a corrupt file. In that case, I can edit the saved JSON file that serves as the save point, delete "corrupted_file.example" from the job, and resume from the edited JSON file rather than the original one. I know that I can save state and use that to restart a failed task in Prefect. Can I edit that saved state before retrying? (In other words, I want to change the output of step 1 or the input to step 2 on the fly to get my job to finish.)
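(For context, a minimal sketch of the save-point pattern described above, outside Prefect; the file name and the remaining_files key are hypothetical:)
Copy code
import json
from pathlib import Path

CHECKPOINT = Path("checkpoint.json")  # hypothetical save-point file


def save_state(state: dict) -> None:
    # between stages, write the pipeline's current state
    CHECKPOINT.write_text(json.dumps(state, indent=2))


def load_state() -> dict:
    # to resume, reread the saved JSON; the file can be edited by hand first,
    # e.g. removing "corrupted_file.example" from a remaining_files list
    return json.loads(CHECKPOINT.read_text())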
c
Hey Janet! Fantastic question: yes, this is possible - so long as the filename and the method of serialization (in your case, json) remain the same
j
Can you provide instructions on how you'd do it, please? It sounds like the filename would be passed around rather than the JSON content itself, so that the file would be editable on the side?
c
I'm sorry, I thought you already had a setup that I was agreeing would work. The way I would personally do this is by using 3.0rc and writing a task that looks like this:
Copy code
from prefect import task
from prefect.serializers import JSONSerializer


@task(result_storage_key="~/filename.json", result_serializer=JSONSerializer())
def my_json_task(**kwargs):
    ...
    return obj_that_is_json_serializable
now the filename is explicitly set on the task, and you can edit its contents after the task runs. For more in-depth information on how this sort of setup works in 3.0rc you can check out the docs on task caching.
j
No, I have a non-prefect data pipeline that I'm going to move into prefect
👍 1
When it says "return obj_that_is_json_serializable" does that mean I can just return a pydantic model instance? I actually use pydantic to dump my current json file...
c
good question - right now pydantic models are not json serializable but I agree they should be through our JSONSerializer. If you open an enhancement request requesting this change we can try to turn it around quickly!
so in the meantime, you'd need to do
Copy code
return pydantic_model.json()
or whatever the correct method call is on the pydantic model
j
Can I use pydantic's serializer?
Copy code
from prefect import task
from pydantic_core import to_json

@task(result_storage_key="~/filename.json", result_serializer=to_json)
def my_json_task(checkpoint_data_model: MyCheckPointDataModel):
    ...
    return new_checkpoint_data_model_with_completed_task_info
Otherwise, it would be two steps, one to convert the model to a dict and another to serialize to json.
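(That two-step version would look roughly like this, using pydantic v2's model_dump plus the standard json module; the Checkpoint model is illustrative:)
Copy code
import json

from pydantic import BaseModel


class Checkpoint(BaseModel):
    # stand-in for the real checkpoint model
    remaining_files: list[str]


ckpt = Checkpoint(remaining_files=["a.example", "b.example"])

as_dict = ckpt.model_dump()    # step 1: model -> plain Python dict
as_json = json.dumps(as_dict)  # step 2: dict -> JSON string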
c
at this moment that won't work as the to_json utility won't conform to our serializer interface, but I'm happy to support some version of that (e.g., result_serializer="pydantic" could be a more explicit option too)
j
OK - I logged the enhancement request #14938. In the meantime, would it make more sense to return
Copy code
my_model.model_dump()
which converts the model to a Python dictionary, or
Copy code
my_model.model_dump_json()
which converts the model to a single JSON string?
n
hey @Janet Carson - one small correction to add here. we should currently be able to serialize model instances fine with the json serializer
Copy code
from pydantic import BaseModel

from prefect import flow, task


class MyModel(BaseModel):
    name: str
    age: int
    address: str
    phone_number: str


@task(persist_result=True, result_serializer="json")
def make_model() -> MyModel:
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234")


@flow
def main():
    model = make_model()
    model_again = make_model() # cached

    assert model == model_again


if __name__ == "__main__":
    main()
does that help in your case?
💯 1
thank you 1
👏 1
j
Copy code
from pydantic import BaseModel

from prefect import flow, task


class MyModel(BaseModel):
    name: str
    age: int
    address: str
    phone_number: str


@task(persist_result=True, result_serializer="json")
def step_one() -> MyModel:
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234-with-bogus-character")

@task(persist_result=True, result_serializer="json")
def step_two(results_from_step_one: MyModel) -> MyModel:
    # imagine there is code in here that crashes due to bad phone number
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234")

@flow
def main():
    model = step_one()
    final_result = step_two(model)

if __name__ == "__main__":
    main()
Assume that step one returned OK, but step two catches the bogus data. I want to edit the model returned in step one and retry step two with the edited result (perhaps by editing that model as stored in a flat file somewhere)
n
hmm would you get away with just wrapping the code that might fail in try/except? since step_two would be receiving that same model that you'd persist to that file wherever your result_storage is
Copy code
@task(persist_result=True, result_serializer="json")
def step_two(results_from_step_one: MyModel) -> MyModel:
    try:
        code_that_might_crash(results_from_step_one)
    except SomeException:
        code_that_might_crash(edit_model(results_from_step_one))
    ...
as an aside, a Field(pattern=...) would make a bogus value impossible 🙂
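(A sketch of that aside; the regex is illustrative and assumes a simple 555-1234 style number:)
Copy code
from pydantic import BaseModel, Field


class MyModel(BaseModel):
    name: str
    age: int
    address: str
    # rejects values like "555-1234-with-bogus-character" at construction time
    phone_number: str = Field(pattern=r"^\d{3}-\d{4}$")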
j
Not really - I work with experiment data and every experiment is a little different and it's not easy to write that edit_model function in there.
But, I don't really want to start over, either
c
this dev pattern is achievable if you're comfortable using 3.0rc
the simplest version would look like this (assuming all data is stored on a local machine):
Copy code
@task(persist_result=True, result_serializer="json", result_storage_key="~/my_model1.json")
def step_one() -> MyModel:
    return MyModel(name="John", age=30, address="123 Main St", phone_number="555-1234-with-bogus-character")
and then you could edit the data living in ~/my_model1.json directly prior to the next run
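(A sketch of that edit step; it assumes the persisted file is the plain JSON written by the json serializer, as described in this thread, and that phone_number is the field to fix:)
Copy code
import json
from pathlib import Path

path = Path("~/my_model1.json").expanduser()

data = json.loads(path.read_text())
data["phone_number"] = "555-1234"  # hand-fix the bogus value
path.write_text(json.dumps(data))
# on the next flow run, step_one loads this edited result and step_two receives it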
there is some nuance about how often you want step_one to run, but hopefully that doc I shared above can guide you to the right setup for your workflow
n
not to overload with information, but just to throw it on the map as an option: if you happened to want to eyeball some changes as a human, you could also pause the flow and wait_for_input so you could manually tweak the value as needed
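(A rough sketch of that option, reusing step_one and step_two from above; the pause_flow_run and RunInput import paths follow the Prefect 3 docs and may differ slightly by version:)
Copy code
from prefect import flow
from prefect.flow_runs import pause_flow_run
from prefect.input import RunInput


class PhoneNumberFix(RunInput):
    # field a human fills in via the UI while the flow is paused
    phone_number: str


@flow
def main():
    model = step_one()
    # pause the run and wait for a human to supply a corrected value
    fix = pause_flow_run(wait_for_input=PhoneNumberFix)
    fixed = model.model_copy(update={"phone_number": fix.phone_number})
    step_two(fixed)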
j
@Nate that could be useful! @Chris White can you explain what you mean by "some nuance"?
c
the short version is that in 3.0 the presence of the file specified in result_storage_key means the task will return the data in that file. So if you want to rerun the task often, you'll need to either delete the file manually as needed, or specify a cache policy that determines when it should rerun vs. load the data from the file. These docs explain it better than I could here: https://docs-3.prefect.io/3.0rc/develop/task-caching
j
OK - I don't want to rerun step one once it's returned a file. The file output by step two won't exist unless the step completes successfully, right? So if I have to edit the output of step one, it will only rerun step two?
c
yup that's correct!
j
I think I've got it now, thanks for the help!
🫡 1
🙌 3