James Phoenix
01/22/2021, 7:23 PM
from prefect import Task

def learning(task, old_state, new_state):
    if isinstance(new_state, state.Success):
        print("Yay")
    return new_state

class MyTask(Task):
    def run(self):
        return "hello"

task_1 = MyTask()
flow = Flow(name="my_flow", tasks=[task_1], state_handlers=[learning])
state = flow.run()
Zanie
With these two imports added:
from prefect.engine import state
from prefect import Flow
this runs as expected:
[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Beginning Flow run for 'my_flow'
/Users/michaeladkins/prefect/core/src/prefect/engine/flow_runner.py:235: UserWarning: prefect.engine.executors.LocalExecutor has been moved to `prefect.executors.LocalExecutor`, please update your imports
executor = prefect.engine.get_default_executor_class()()
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Scheduled to Running
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Pending to Running
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Calling task.run() method...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Running to Success
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Finished task run for task with final state: 'Success'
[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Running to Success
Yay
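The log above shows the handler firing on the Running to Success transition, which is where "Yay" gets printed. As a rough illustration of that calling convention, here is a dependency-free toy model. It is not Prefect's implementation: MiniRunner and the State, Scheduled, Running and Success classes are hypothetical stand-ins, and the only thing assumed from the thread is that a handler receives (obj, old_state, new_state) and returns a state.

```python
# Toy model of the state-handler calling convention; NOT Prefect's implementation.
# State, Scheduled, Running, Success and MiniRunner are hypothetical stand-ins.


class State:
    """Base class for the toy states."""


class Scheduled(State):
    pass


class Running(State):
    pass


class Success(State):
    pass


def learning(obj, old_state, new_state):
    # Same shape as the handler in the thread: react to Success, return the state.
    if isinstance(new_state, Success):
        print("Yay")
    return new_state


class MiniRunner:
    def __init__(self, name, state_handlers):
        self.name = name
        self.state_handlers = state_handlers
        self.state = Scheduled()

    def set_state(self, new_state):
        # Each handler sees (obj, old_state, new_state); in this sketch its
        # return value is what gets passed on to the next handler.
        for handler in self.state_handlers:
            new_state = handler(self, self.state, new_state)
        self.state = new_state
        return new_state

    def run(self):
        # Two transitions, mirroring the Scheduled -> Running -> Success log lines.
        self.set_state(Running())
        return self.set_state(Success())


runner = MiniRunner("my_flow", state_handlers=[learning])
final_state = runner.run()
print(type(final_state).__name__)
```

The result is bound to final_state rather than state on purpose. In the original snippet, state = flow.run() rebinds the name of the imported state module, so any later use of state.Success in the handler would no longer refer to the module.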
The state variable should contain the Prefect states with the result of your flow.
James Phoenix
01/23/2021, 1:31 PM