https://prefect.io logo
j

James Phoenix

01/22/2021, 7:23 PM
Copy code
from prefect import Task


def learning(task, old_state, new_state):
    if isinstance(new_state, state.Success):
        print("Yay")
    return new_state

class MyTask(Task):
    def run(self):
        return "hello"

task_1 = MyTask()
flow = Flow(name="my_flow", tasks=[task_1], state_handlers=[learning])
state = flow.run()
z

Zanie

01/22/2021, 7:31 PM
Hi James! With the addition of
from prefect.engine import state
and
from prefect import Flow
this runs as expected
Copy code
[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Beginning Flow run for 'my_flow'
/Users/michaeladkins/prefect/core/src/prefect/engine/flow_runner.py:235: UserWarning: prefect.engine.executors.LocalExecutor has been moved to `prefect.executors.LocalExecutor`, please update your imports
  executor = prefect.engine.get_default_executor_class()()
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Using executor type LocalExecutor
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Scheduled to Running
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Starting task run...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Pending to Running
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Calling task.run() method...
[2021-01-22 13:30:33] DEBUG - prefect.TaskRunner | Task 'MyTask': Handling state change from Running to Success
[2021-01-22 13:30:33] INFO - prefect.TaskRunner | Task 'MyTask': Finished task run for task with final state: 'Success'
[2021-01-22 13:30:33] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2021-01-22 13:30:33] DEBUG - prefect.FlowRunner | Flow 'my_flow': Handling state change from Running to Success
Yay
Although note you are overriding your global
state
variable which should contain the Prefect states with the result of your flow
j

James Phoenix

01/23/2021, 1:31 PM
Prefect, thanks @Zanie