FuETL
03/10/2022, 12:59 PM
I am calling set_flow_run_state with Scheduled (I tried Pending, but that leaves the flow run idle and it never executes). I want to restart all the tasks.
client.set_flow_run_state(
    flow_run_id=flow_run_id,
    state=Scheduled()
)
Anna Geller
03/10/2022, 1:44 PM
Could you use conditional tasks (case), retries and triggers to automatically perform some recomputation of task runs, depending on what you try to accomplish? Check out those examples:
• https://discourse.prefect.io/t/how-can-i-trigger-downstream-tasks-based-on-upstream-task-s-state/106#prefect-10-2
• https://discourse.prefect.io/t/how-can-i-stop-the-task-run-based-on-a-custom-logic/83#prefect-10-2
• https://discourse.prefect.io/t/how-can-i-stop-a-flow-run-execution-based-on-a-condition/105#prefect-10-2
• https://discourse.prefect.io/t/how-to-build-a-conditional-flow-of-flows-i-e-trigger-a-different-child-flow-depending-on-the-upstream-flows-state/202
FuETL
03/10/2022, 2:20 PM
Anna Geller
03/10/2022, 2:37 PM
import random
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.tasks.notifications import SlackTask
from prefect.triggers import all_successful
from typing import cast


def post_to_slack_and_cancel_run_on_task_failure(task, old_state, new_state):
    if new_state.is_failed():
        flow_run_id = prefect.context.flow_run_id
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {flow_run_id} "
            f"with an exception {value}. Cancelling the flow run and manually resuming later."
        )
        SlackTask(message=msg).run()
        client = Client()
        client.cancel_flow_run(flow_run_id)
    return new_state


@task(state_handlers=[post_to_slack_and_cancel_run_on_task_failure])
def run_process_that_may_fail():
    if random.random() > 0.5:
        raise ValueError("Failing due to missing information")


@task(trigger=all_successful)
def run_if_success():
    print("Success")


with Flow("state_handler_ex") as flow:
    first_task = run_process_that_may_fail()
    run_if_success(upstream_tasks=[first_task])

if __name__ == "__main__":
    flow.run()
Note that:
• attaching the all_successful trigger explicitly is not needed, since it is the default
• cancelling the flow run within this state handler is not necessary either: the downstream task is triggered only on Success, so it wouldn't run if the flaky task fails. I just wanted to show it in case you need something like that in your logic
I wouldn't recommend rebuilding the Restart logic since this is hard to implement and frankly a bit overkill for the problem at hand. I think you can totally solve your issue using:
• retries
• triggers
• conditional tasks
• state handlers
• and optionally subflows