Thomas Fredriksen
09/10/2021, 8:10 AM
Flow class so that all tasks will report state to an external service. From what I understand, subclassing the Flow class may not be a good idea, as the Prefect server will not use the subclass when managing the flows.
I like that Prefect has support for state change hooks (state_handlers), and I was wondering if I can use such a hook to modify all tasks of a flow, adding task state handlers that report each task's state changes.
Something like this:
def task_state_handler(task, old_state, new_state):
    # report_state is my own reporting function (not shown here)
    report_state(task, old_state, new_state)
    return new_state

def flow_state_handler(flow, old_state, new_state):
    was_pending = (old_state.is_queued() or old_state.is_scheduled()
                   or old_state.is_pending() or old_state.is_submitted())
    if was_pending and new_state.is_running():
        # attach the reporting handler to every task as the flow starts running
        for task in flow.tasks:
            task.state_handlers.append(task_state_handler)
    return new_state
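
For comparison, here is a minimal sketch of the same idea applied once when the flow is built, instead of from inside a flow state handler. It is an illustration under stated assumptions, not a confirmed Prefect recipe: attach_task_handlers and the reported list are hypothetical names, report_state is a placeholder for the real call to the external service, and the SimpleNamespace objects only stand in for a real flow and its tasks (anything exposing .tasks and .state_handlers) so that the snippet runs without Prefect installed.

```python
from types import SimpleNamespace

reported = []  # hypothetical stand-in for the external service


def report_state(task, old_state, new_state):
    # Placeholder reporter: a real one would call the external service here.
    reported.append((task.name, old_state, new_state))


def task_state_handler(task, old_state, new_state):
    report_state(task, old_state, new_state)
    return new_state  # hand the state back unchanged


def attach_task_handlers(flow, handler):
    """Append handler to every task in flow, skipping tasks that already have it."""
    for task in flow.tasks:
        if handler not in task.state_handlers:
            task.state_handlers.append(handler)


# Stand-in objects (assumption): only the attributes used above are modelled.
flow = SimpleNamespace(tasks=[
    SimpleNamespace(name="extract", state_handlers=[]),
    SimpleNamespace(name="load", state_handlers=[]),
])

attach_task_handlers(flow, task_state_handler)
attach_task_handlers(flow, task_state_handler)  # second call adds nothing new
```

Checking for the handler before appending keeps the helper safe to call more than once, which matters if the same flow object is registered or run repeatedly in one process.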
Kevin Kho
Thomas Fredriksen
09/10/2021, 4:50 PM