David Elliott
10/24/2022, 4:08 PM
Anna Geller
• once flow finished (failed or successful)
◦ post to slack with a custom message including task state counts for failed/success etc (queried from the backend)
This is already the default state of notifications, see the image. If you want to make it even richer, we are totally open to your contribution. State handlers still exist; they are just not called state handlers, because you would implement that in a normal function. Let me check if I can find an example for you.
Anna Geller
from prefect import task, flow
from prefect.blocks.notifications import SlackWebhook

def send_slack_alert(message: str):
    slack_webhook_block = SlackWebhook.load("default")  # adjust to match your Block name
    slack_webhook_block.notify(message)

@task
def always_succeeds_task():
    return "I'm fail safe! ✅"

@flow
def flow_reacting_to_states():
    state = always_succeeds_task(return_state=True)
    if state.name == "Completed":
        send_slack_alert("Important task completed! 🎉")

if __name__ == "__main__":
    flow_reacting_to_states()
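For the richer message asked about above (counts of failed/successful task states), one option is to tally the state names in plain Python before handing the text to a notification block. This is only a minimal sketch: `build_summary_message` is a hypothetical helper, not part of Prefect, and it assumes you have already collected the `.name` of each task state into a list.

```python
from collections import Counter
from typing import Iterable


def build_summary_message(flow_name: str, state_names: Iterable[str]) -> str:
    """Build a one-line summary of task state counts.

    Hypothetical helper: `state_names` is assumed to hold the `.name` of each
    task state (e.g. "Completed", "Failed"), gathered however you like.
    """
    counts = Counter(state_names)
    if not counts:
        return f"{flow_name}: no task runs recorded"
    # Sort by state name so the message is stable between runs
    parts = [f"{count} {name}" for name, count in sorted(counts.items())]
    return f"{flow_name}: " + ", ".join(parts)


message = build_summary_message("my_flow", ["Completed", "Failed", "Completed"])
print(message)  # my_flow: 2 Completed, 1 Failed
```

The resulting string could then be passed to something like the `send_slack_alert` function above.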
David Elliott
10/24/2022, 8:46 PM
• "using any notification block" - not entirely sure what is meant by that, do you have an example? Or is that one where I’d write a custom block myself?
◦ Can I do this without the UI? It’s important that this solution is configured in code and not via the cloud UI
• Any (very rough) timelines on Triggers? Sounds v interesting!
• You mentioned "they are just not called state handlers because you would implement that in a normal function" - totally get that for tasks (have moved our task handler logic to plain python for tasks no prob), but for flows it seems less obvious..? The above example is state handling task states, not flow states.
◦ What I had in mind is something like the below, though I can’t get it to wait for the tasks to finish executing (please excuse the pseudo-code!)
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task()
def sql_task(task_name: str):
    print(f"RUNNING: {task_name}")  # some execution done here

def my_tasks():
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1])
    #... task_3000+

@flow(name="demo_7", task_runner=ConcurrentTaskRunner())
def demo_7():
    print("Started Flow (some start actions here)")
    flow_state = my_tasks()  # executing the actual tasks in the flow
    # somehow make the above wait for all of the tasks to finish then take action based on the state...?
    if flow_state.state == "Cancelled":
        print('Failed actions here')
    elif flow_state.state == "...":
        pass  # etc...

if __name__ == "__main__":
    demo_7()
Emil Christensen
10/25/2022, 4:19 PM
… .submit() and then .wait() for their completion before looking at the resulting states:
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner

@task()
def sql_task(task_name: str):
    print(f"RUNNING: {task_name}")  # some execution done here

def my_tasks():
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1])
    #... task_3000+
    # Return the task futures
    # Note: iterating locals() directly yields variable names (str), not the futures;
    # the working snippet later in the thread uses locals().items() instead
    return [item for item in locals() if item.startswith("task_")]

@flow(name="demo_7", task_runner=ConcurrentTaskRunner())
def demo_7():
    print("Started Flow (some start actions here)")
    tasks = my_tasks()  # executing the actual tasks in the flow
    # Wait for all tasks to finish
    states = [task.wait() for task in tasks]
    # Take action based on state
    if any([s.name == "Cancelled" for s in states]):
        print('Failed actions here')
    if any([s.name == "..." for s in states]):
        pass  # etc...

if __name__ == "__main__":
    demo_7()
Emil Christensen
10/25/2022, 4:20 PM
"using any notification block"
I think Anna means the various notification blocks: Slack, MS Teams, Email (docs)
Emil Christensen
10/25/2022, 4:21 PM
"Any (very rough) timelines on Triggers? sounds v interesting!"
Not yet (to my knowledge at least). It’s a feature we’re super excited about, but it’ll take time to develop.
David Elliott
10/25/2022, 4:42 PM
… locals() line above so posting my working snippet below for anyone else who comes across it (and added type hints)
from prefect import flow, task, allow_failure
from prefect.task_runners import ConcurrentTaskRunner
from typing import List
from prefect.orion.schemas.states import State
from prefect.futures import PrefectFuture
import time

@task()
def sql_task(task_name: str):
    print(f"RUNNING: {task_name}")  # some execution done here
    time.sleep(5)
    if task_name == "task_2":
        raise Exception("failed task")

def my_tasks() -> List[PrefectFuture]:
    task_1 = sql_task.with_options(name="task_1").submit(task_name="task_1", wait_for=[])
    task_4 = sql_task.with_options(name="task_4").submit(task_name="task_4", wait_for=[])
    task_2 = sql_task.with_options(name="task_2").submit(task_name="task_2", wait_for=[task_1, task_4])
    task_3 = sql_task.with_options(name="task_3").submit(task_name="task_3", wait_for=[allow_failure(task_2)])
    #... task_3000+
    # Return the task futures (the values, not the variable names)
    return [task_future for task_name, task_future in locals().items() if task_name.startswith("task_")]

@flow(name="demo_7", task_runner=ConcurrentTaskRunner())
def demo_7():
    print("Started Flow (some start actions here)")
    tasks = my_tasks()  # executing the actual tasks in the flow
    # Wait for all tasks to finish
    states: List[State] = [task.wait() for task in tasks]
    # Take action based on state
    if any([s.name == "Failed" for s in states]):
        print('Failed actions here')
    if any([s.name == "Cancelled" for s in states]):
        pass  # etc...

if __name__ == "__main__":
    demo_7()
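For anyone wondering why the `locals()` line needed changing: iterating a dict yields its keys, so the first version returned variable names (strings) rather than futures, and strings have no `.wait()`. Below is a minimal plain-Python sketch of the difference, with no Prefect involved. `FakeFuture` is a hypothetical stand-in for a task future, and the explicit-list variant is just one alternative that avoids relying on variable naming.

```python
from typing import List


class FakeFuture:
    """Hypothetical stand-in for a task future; it only records a name."""

    def __init__(self, name: str) -> None:
        self.name = name


def names_from_locals() -> List[str]:
    task_1 = FakeFuture("task_1")
    task_2 = FakeFuture("task_2")
    # Iterating a dict yields its keys, so this returns variable names (str)
    return [item for item in locals() if item.startswith("task_")]


def futures_from_locals() -> List[FakeFuture]:
    task_1 = FakeFuture("task_1")
    task_2 = FakeFuture("task_2")
    # .items() gives (name, value) pairs, so the objects themselves are returned
    return [value for key, value in locals().items() if key.startswith("task_")]


def futures_from_list() -> List[FakeFuture]:
    # Explicit alternative: append each future to a list as it is created
    futures: List[FakeFuture] = []
    for name in ("task_1", "task_2"):
        futures.append(FakeFuture(name))
    return futures


print(names_from_locals())
print([f.name for f in futures_from_locals()])
print([f.name for f in futures_from_list()])
```

With thousands of tasks, appending to a list (or a dict keyed by task name) as each one is submitted may be easier to follow than filtering `locals()`.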
Emil Christensen
10/25/2022, 5:08 PM
… allow_failure issues fixed ASAP. The team is aware of the priority and working on it
Aleksandr Liadov
12/16/2022, 1:47 PM