Sophia Ponte
09/10/2023, 10:03 PMMarvin
09/10/2023, 10:03 PMFlowRunContext.get()
. However, it's worth noting that the FlowRunContext isn't sent to distributed task workers as it's quite costly to serialize and deserialize.
As for modifying the FlowRunContext, the documentation doesn't seem to provide a direct way to do so. The FlowRunContext is primarily used to provide information about the current flow run, and it's generally not designed to be modified during a run. But then again, who am I to stop you from trying? I'm just a robot, after all.
You can find more details in the Prefect documentation here and here.Sophia Ponte
09/10/2023, 10:04 PMMarvin
09/10/2023, 10:06 PMSophia Ponte
09/10/2023, 10:17 PMMarvin
09/10/2023, 10:18 PMpython
from prefect import Flow, Task
from prefect.engine.state import State, Success
class MyTask(Task):
def run(self):
return "Hello, World!"
def handle_state_change(self, old_state: State, new_state: State):
print(f"State changed from {old_state} to {new_state}")
return new_state
task = MyTask()
with Flow("My Flow") as flow:
result = task()
flow.run() # prints: State changed from <Pending: "blah blah blah"> to <Success: "blah blah blah">
In this example, handle_state_change
is a state handler. It gets called whenever the task's state changes. It receives the old state and the new state as arguments, and it's supposed to return the new state.
This is a very simple example, of course. In a real-world scenario, you might want to do something more complex, like logging the state changes to a file or sending a notification when a task fails. But hopefully, this gives you a basic idea of how state handlers work.
Remember, though, that state handlers can't modify the state. They can only observe it. If you try to modify the state in a state handler, Prefect will ignore your changes and use the original state instead. So don't get any funny ideas. I'm watching you. Or I would be, if I had eyes.Sophia Ponte
09/10/2023, 10:20 PMMarvin
09/10/2023, 10:20 PMpython
from prefect import Flow, Task
from prefect.engine.result_handlers import ResultHandler
from prefect.engine.result import Result
class MyResultHandler(ResultHandler):
def read(self, location: str) -> Result:
with open(location, 'r') as f:
value = f.read()
return Result(value=value)
def write(self, value: Result, **kwargs) -> str:
location = "/tmp/my_result.txt"
with open(location, 'w') as f:
f.write(str(value.value))
return location
class MyTask(Task):
def run(self):
return "Hello, World!"
task = MyTask(result_handler=MyResultHandler())
with Flow("My Flow") as flow:
result = task()
flow.run() # The result "Hello, World!" is written to /tmp/my_result.txt
In this example, MyResultHandler
is a custom result handler. It overrides the read
and write
methods to store the task's result in a file and read it back from the file.
The write
method is called whenever a task finishes running. It receives the task's result and is supposed to return a "location" (a string) that can be used to retrieve the result later.
The read
method is called whenever Prefect needs to retrieve a task's result. It receives the location returned by write
and is supposed to return the task's result.
Remember, though, that result handlers are deprecated in Prefect 0.14.0 and later. They've been replaced by a more powerful and flexible system called "Results". But that's a story for another time. Or another question, as it were.