class so that all tasks will report state to an external service. From what I understand, subclassing the Flow class may not be a good idea, as the Prefect server will not use the subclass when managing the flows.
I like that Prefect has support for state change hooks (state_handlers), and I was wondering whether I can use such a hook to modify all tasks of a flow, adding task state handlers that report each task's state change.
Something like this:
def task_state_handler(task, old_state, new_state):
    report_state(task, old_state, new_state)
    return new_state

def flow_state_handler(flow, old_state, new_state):
    was_pending = old_state.is_queued() or old_state.is_scheduled() or old_state.is_pending() or old_state.is_submitted()
    # When the flow starts running, attach the reporting handler to every task.
    if was_pending and new_state.is_running():
        for task in flow.tasks:
            task.state_handlers.append(task_state_handler)
    return new_state
Slack Conversation
Kevin Kho
09/10/2021, 2:31 PM
This is the same as the server post, right? We can just pick up the conversation there. There's no need to double post since we monitor both channels, but I personally think posting in community might be more visible for other Prefect users.