Андрій Демиденко
01/14/2022, 1:06 PMAnna Geller
import prefect
from prefect import task, Flow
from prefect.engine.state import Finished
def log_state(obj, old_state, new_state):
if isinstance(new_state, Finished):
logger = prefect.context.get("logger")
task_name = prefect.context.get("task_name")
<http://logger.info|logger.info>("Logging task run state for the task %s: %s", task_name, new_state)
@task(state_handlers=[log_state])
def say_hi():
print("Hi")
@task(state_handlers=[log_state])
def say_hello():
print("Hello")
with Flow("print_states_task_level") as flow:
t1 = say_hi()
t2 = say_hello()
if __name__ == "__main__":
state = flow.run()
This will log the task name and its final state as follows:
└── 15:08:14 | INFO | Logging task run state for the task say_hi: <Success: "Task run succeeded.">
Андрій Демиденко
01/14/2022, 2:16 PMAnna Geller
Андрій Демиденко
01/14/2022, 2:48 PMAnna Geller
Андрій Демиденко
01/14/2022, 2:56 PMАндрій Демиденко
01/14/2022, 3:03 PM