How to migrate a Prefect v1 flow-level state handl...
# ask-community
d
Hey, I’m struggling in 2.0 without a Flow State Handler and wondering what the best approach might be to replicate my 1.0 logic for a 2.0 flow. In 1.0, I had a flow state handler which did the following pseudo logic:
• if flow just started:
◦ send slack message
• if flow cancelled:
◦ make post request to an endpoint which does thing A
• if flow successful and some other params are true:
◦ make post request to an endpoint which does thing B
• once flow finished (failed or successful)
◦ post to slack with a custom message including task state counts for failed/success etc (queried from the backend)
I know there are Notifications in 2.0 for state changes, but they can’t do the custom logic I need, so I’m wondering if there are any suggestions? FWIW I really, really miss state change handlers - they were incredibly powerful customisation tools 🙂 I have a feeling you’ll tell me I need to create a subflow and put all this logic in the parent flow, but hoping to avoid that as it seems overkill… hoping someone has some ideas..! 🙏
a
Sending notifications is now easier than ever - once you have configured a notification block, you can configure an alert for when a flow run: #1 starts, #2 is cancelled, #3 completes, #4 finishes. For custom conditions, you can already implement that using any notification block and simple if/else logic handling the state. We are also working on custom Triggers allowing you to take some action based on specific state transitions.
• once flow finished (failed or successful)
◦ post to slack with a custom message including task state counts for failed/success etc (queried from the backend)
This is already the default state of notifications, see the image - if you want to make it even richer, we are totally open to your contribution. State handlers still exist, they are just not called state handlers because you would implement that in a normal function. Let me check if I can find some example for you:
from prefect import task, flow
from prefect.blocks.notifications import SlackWebhook


def send_slack_alert(message: str):
    slack_webhook_block = SlackWebhook.load("default")  # adjust to match your Block name
    slack_webhook_block.notify(message)


@task
def always_succeeds_task():
    return "I'm fail safe! ✅"


@flow
def flow_reacting_to_states():
    state = always_succeeds_task(return_state=True)
    if state.name == "Completed":
        send_slack_alert("Important task completed! 🎉")


if __name__ == "__main__":
    flow_reacting_to_states()
You can do the same for any state, and you can build a custom module to avoid repeating the logic, or you could build a custom block to do it. There are tons of options already, and there will only be more later with Triggers.
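A minimal sketch of the "custom module" idea, assuming the only thing needed from a Prefect state is its name (state.name, as in the example above). The names STATE_HANDLERS, react_to_state, on_completed and on_failed are hypothetical, and appending to a list stands in for the Slack or HTTP call so the snippet runs without Prefect installed.

```python
from typing import Callable, Dict, List

# Collected messages; a stand-in for real side effects such as
# send_slack_alert(...) or an HTTP request (assumption for illustration).
sent: List[str] = []


def on_completed(task_label: str) -> None:
    sent.append(f"{task_label} completed")


def on_failed(task_label: str) -> None:
    sent.append(f"{task_label} failed")


# Map state names (the value of state.name) to handler functions.
STATE_HANDLERS: Dict[str, Callable[[str], None]] = {
    "Completed": on_completed,
    "Failed": on_failed,
}


def react_to_state(state_name: str, task_label: str) -> bool:
    """Run the handler registered for state_name, if any.

    Returns True when a handler ran and False when none is registered.
    """
    handler = STATE_HANDLERS.get(state_name)
    if handler is None:
        return False
    handler(task_label)
    return True
```

Inside a flow this could be called as react_to_state(state.name, "always_succeeds_task") right after a call made with return_state=True, so the if/else logic lives in one place.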
d
Thanks Anna! Couple of follow ups:
• RE doing something like making a POST/GET request on a flow state change (e.g. flow cancelled), how would I do that? You mention "using any notification block" - not entirely sure what is meant by that, do you have an example? Or is that one where I’d write a custom block myself?
◦ Can I do this without the UI? It’s important that this solution is configured in code and not via the cloud UI
• Any (very rough) timelines on Triggers? Sounds v interesting!
• You mentioned "they are just not called state handlers because you would implement that in a normal function" - totally get that for tasks (have moved our task handler logic to plain python for tasks no prob), but for flows it seems less obvious..? The above example is handling task states, not flow states.
◦ What I had in mind is something like the below, though I can’t get it to wait for the tasks to finish executing (please excuse the pseudo-code!)
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task()
def sql_task(task_name: str):
    print(f"RUNNING: {task_name}") # some execution done here

def my_tasks():
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1])
    #... task_3000+

@flow(name="demo_7",task_runner=ConcurrentTaskRunner())
def demo_7():
    print("Started Flow (some start actions here)")
    flow_state = my_tasks() # executing the actual tasks in the flow
    # somehow make the above wait for all of the tasks to finish then take action based on the state...?
    if flow_state.state == "Cancelled":
        print('Failed actions here')
    elif flow_state.state == "...":
        pass # etc...

if __name__ == "__main__":
    demo_7()
e
@David Elliott How about something like this? The key difference is that we return the Prefect futures created on .submit() and then .wait() for their completion before looking at the resulting states:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task()
def sql_task(task_name: str):
    print(f"RUNNING: {task_name}") # some execution done here

def my_tasks():
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1])
    #... task_3000+
    # Return the task futures
    return [item for item in locals() if item.startswith("task_")]

@flow(name="demo_7",task_runner=ConcurrentTaskRunner())
def demo_7():
    print("Started Flow (some start actions here)")
    tasks = my_tasks() # executing the actual tasks in the flow
    # Wait for all tasks to finish
    states = [task.wait() for task in tasks]
    # Take action based on state
    if any([s.name == "Cancelled" for s in states]):
        print('Failed actions here')
    if any([s.name == "..." for s in states]):
        pass # etc...

if __name__ == "__main__":
    demo_7()
using any notification block
I think Anna means the various notification blocks: Slack, MS Teams, Email (docs)
Any (very rough) timelines on Triggers? sounds v interesting!
Not yet (to my knowledge at least). It’s a feature we’re super excited about, but it’ll take time to develop.
a
The only thing I can add is that for flow-level actions such as "cancel flow run if X", Triggers will indeed be better suited for your use case. Follow the #CKNSX5WG3 channel; our team is working on this with high priority. I can't promise an ETA, but if I were to guess, I'd say probably by the end of Nov (don't take my word for it, just to show you how high priority this is).
d
@Emil Christensen THANK YOU!! 🙏 That above snippet is exactly what I was failing to figure out (how to wait for the tasks in the flow function). 🌟 @Anna Geller thanks so much again, triggers sound great! I’ll use the above approach for now and will keep an eye on triggers once they come along. We’re under huge time pressure to get 2.0 live (we’ve hit the node limit in 1.0) but more than happy to update to use new functionality as + when it arrives! Slight error in the locals() line above so posting my working snippet below for anyone else who comes across it (and added type hints)
from prefect import flow, task, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from typing import List
from prefect.orion.schemas.states import State
from prefect.futures import PrefectFuture
import time

@task()
def sql_task(task_name: str):
    print(f"RUNNING: {task_name}") # some execution done here
    time.sleep(5)
    if task_name == "task_2":
        raise Exception("failed task")

def my_tasks() -> List[PrefectFuture]:
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_4 = sql_task.with_options(name="task_4").submit(task_name="task_4", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1, task_4])
    task_3 = sql_task.with_options(name="task_3").submit(task_name="task_3", wait_for=[allow_failure(task_2)])
    #... task_3000+
    # Return the task futures
    return [task_future for task_name, task_future in locals().items() if task_name.startswith("task_")]

@flow(name="demo_7",task_runner=ConcurrentTaskRunner())
def demo_7():
    print("Started Flow (some start actions here)")
    tasks = my_tasks() # executing the actual tasks in the flow
    # Wait for all tasks to finish
    states: List[State] = [task.wait() for task in tasks]
    # Take action based on state
    if any([s.name == "Failed" for s in states]):
        print('Failed actions here')
    if any([s.name == "Cancelled" for s in states]):
        pass # etc...

if __name__ == "__main__":
    demo_7()
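For the "task state counts" part of the original question, here is a rough sketch of turning the collected states into a one-line summary. It assumes only that each state exposes a name attribute, as in the snippet above. The function summarize_states and the message format are hypothetical, and SimpleNamespace objects stand in for real Prefect states so the example runs without a Prefect server.

```python
from collections import Counter
from types import SimpleNamespace
from typing import Any, Iterable


def summarize_states(states: Iterable[Any]) -> str:
    """Build a summary such as 'Completed: 2, Failed: 1' from objects with a .name."""
    counts = Counter(s.name for s in states)
    # Sort by state name so the message is stable between runs.
    return ", ".join(f"{name}: {n}" for name, n in sorted(counts.items()))


# Stand-ins for the states collected from the futures in the snippet above.
fake_states = [SimpleNamespace(name=n) for n in ("Completed", "Failed", "Completed")]
summary = summarize_states(fake_states)
print(summary)
```

In demo_7 the resulting string could be passed to whatever sends the final message (for example a Slack notification block) once all the futures have been waited on.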
e
@David Elliott Nice catch! Glad to hear this is what you were looking for. Hopefully we can get the allow_failure issues fixed ASAP. The team is aware of the priority and working on it.
a
@Emil Christensen, @David Elliott thanks for this thread! It helps me a lot! One question: how can I pass a return value between tasks? Imagine my task_2 needs the results of task_1 and task_4 as input parameters?