https://prefect.io logo
Title
p

Philip MacMenamin

11/07/2022, 9:28 PM
previously I had something along the lines of
with Flow(
    "...",
    state_handlers=[utils.notify_completion, utils.notify_running],...
with
def notify_running(flow: Flow, old_state, new_state) -> State:
And I could ask what the new state is doing. eg if
if new_state.is_running()
I'm reading https://docs.prefect.io/concepts/states/to try to work out how to capture these flow level state changes and am struggling. Any pointers?
1
z

Zanie

11/07/2022, 9:31 PM
We haven’t added hooks for notifications on state transitions client-side yet, I think we’ll probably add a callback in our internal engine that works similar to
state_handlers
.
Are you planning on using Cloud or the open source server?
p

Philip MacMenamin

11/07/2022, 9:32 PM
I have to use the OS server.
z

Zanie

11/07/2022, 9:33 PM
You could add an orchestration rule that fires on every state transition then and send the notifications server-side
p

Philip MacMenamin

11/07/2022, 9:33 PM
(Well, for production I have to.)
z

Zanie

11/07/2022, 9:33 PM
That’d be the “proper” way to do it
We haven’t really exposed dynamic adding of user orchestration rules yet either though, it’s a more advanced use-case.
p

Philip MacMenamin

11/07/2022, 9:35 PM
i guess... i could just put a task at the top and bottom of the flow to kinda get the job done for now.
z

Zanie

11/07/2022, 9:35 PM
😄 that’s one way to do it
p

Philip MacMenamin

11/07/2022, 9:36 PM
yeah, i have to just hammer out this move to Prefect2. OK...
z

Zanie

11/07/2022, 9:36 PM
You can also decorate your function with start / end notifications?
p

Philip MacMenamin

11/07/2022, 9:36 PM
decorate the flow function?
oh, as in write my own decorator to do something at the start and end of a flow? Hmm.
z

Zanie

11/07/2022, 9:38 PM
Like…
from prefect import flow
from functools import wraps 


def notify(fn):
    @wraps(fn)
    def wrapper(*args, **kwargs):
        print("Starting!")
        result = fn()
        print("Ending!")
        return result
    return wrapper


@flow
@notify
def foo():
    return "hi!"

if __name__ == "__main__":
    foo()
❯ PREFECT_DEBUG_MODE=1 python example.py     
15:39:01.803 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////Users/mz/.prefect/orion.db
15:39:01.853 | INFO    | prefect.engine - Created flow run 'gorgeous-waxbill' for flow 'foo'
15:39:01.854 | DEBUG   | Flow run 'gorgeous-waxbill' - Starting 'ConcurrentTaskRunner'; submitted tasks will be run concurrently...
15:39:01.855 | DEBUG   | prefect.task_runner.concurrent - Starting task runner...
15:39:01.855 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////Users/mz/.prefect/orion.db
15:39:01.987 | DEBUG   | Flow run 'gorgeous-waxbill' - Executing flow 'foo' for flow run 'gorgeous-waxbill'...
15:39:01.987 | DEBUG   | Flow run 'gorgeous-waxbill' - Executing foo()
Starting!
Ending!
15:39:02.000 | DEBUG   | prefect.task_runner.concurrent - Shutting down task runner...
15:39:02.000 | INFO    | Flow run 'gorgeous-waxbill' - Finished in state Completed(message=None, type=COMPLETED, result='hi!')
15:39:02.001 | DEBUG   | prefect.client - Using ephemeral application with database at sqlite+aiosqlite:////Users/mz/.prefect/orion.db
p

Philip MacMenamin

11/07/2022, 9:39 PM
right right.
z

Zanie

11/07/2022, 9:39 PM
That’ll be called multiple times on retry though
p

Philip MacMenamin

11/07/2022, 9:39 PM
yeah, that's reasonably elegant.
Not stressed about retry ATM really
z

Zanie

11/07/2022, 9:40 PM
👍 Yeah I think that’s the cleanest path for now
p

Philip MacMenamin

11/07/2022, 9:40 PM
as in, when the flow is retried in its entirety, right?
z

Zanie

11/07/2022, 9:40 PM
Yep
I’m guessing we’ll have hooks for callbacks on state changes in the next couple months as well
p

Philip MacMenamin

11/07/2022, 9:41 PM
yeah, that's not a big thing for us right now. Thanks Michael!