FuETL

03/10/2022, 12:59 PM
Hey guys, what is the standard way to restart flows via the client (library)? I'm trying to use set_flow_run_state to set the flow run to Scheduled (I tried Pending, but that leaves the flow idle and it never gets executed). I want to restart all the tasks.
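from prefect import Client
from prefect.engine.state import Scheduled

client = Client()
# flow_run_id is the ID of the flow run to reschedule
client.set_flow_run_state(
    flow_run_id=flow_run_id,
    state=Scheduled()
)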

Anna Geller

03/10/2022, 1:44 PM
This is a bit more complicated. Restart is effectively a UI feature that does quite a lot. If you are interested in implementing this in your Client code, check out the PR that added Restarts to the UI repo: https://github.com/PrefectHQ/ui/pull/285/files
My understanding of the process: it queries for Failed task runs, queries for the downstream tasks of those failed task runs, restarts the task runs in the right order (respecting all the dependencies), and sets new task run states after completion. So it's really not a simple and straightforward process, because a Restart doesn't create a new flow run. Instead, it restarts the failed task runs and updates their states within the history of the original flow run.
Perhaps it would be easier for you to use retries and triggers to automatically recompute task runs, depending on what you are trying to accomplish?
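To make the ordering part of that process concrete, here is a minimal sketch in plain Python. It is not Prefect's or the UI's actual implementation and it makes no API calls; the function name tasks_to_restart, the upstream mapping, and the task names are all hypothetical. It only shows the idea of collecting the failed tasks plus everything downstream of them and ordering them so that dependencies come first.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+


def tasks_to_restart(upstream, failed):
    """Return the failed tasks plus all their downstream tasks, dependencies first.

    `upstream` maps each task name to the set of task names it depends on.
    `failed` is an iterable of task names whose runs failed.
    """
    # Invert the edges so we can walk from a task to the tasks that depend on it.
    downstream = {}
    for name, deps in upstream.items():
        for dep in deps:
            downstream.setdefault(dep, set()).add(name)

    # Collect the failed tasks and everything transitively downstream of them.
    affected = set()
    stack = list(failed)
    while stack:
        name = stack.pop()
        if name not in affected:
            affected.add(name)
            stack.extend(downstream.get(name, ()))

    # Sort the whole graph topologically, then keep only the affected tasks.
    order = TopologicalSorter(upstream).static_order()
    return [name for name in order if name in affected]


# Hypothetical flow: extract -> transform -> load, and extract -> report.
example_upstream = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"extract"},
}
print(tasks_to_restart(example_upstream, {"transform"}))
```

In this hypothetical flow, a failure in transform would mean rerunning transform and then load, while extract and report keep their existing states. The real Restart feature additionally has to update task run states through the API, which is the part that makes it hard to rebuild.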

FuETL

03/10/2022, 2:20 PM
Thanks for the reply, @Anna Geller. Basically, I have a process that can fail because of missing information, and when that happens we need a manual intervention in the database we are querying. The idea is that after a human applies the fix, the flow restarts and can now pick up the missing information. The easy way would be to start a new flow run, right?
Or could I run the same query that the UI does to achieve the same result?
Reading what you sent: instead of FAIL, could I raise a PAUSE state and then trigger a resume?

Anna Geller

03/10/2022, 2:37 PM
Here is how I would tackle this:
import random
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.tasks.notifications import SlackTask
from prefect.triggers import all_successful
from typing import cast


def post_to_slack_and_cancel_run_on_task_failure(task, old_state, new_state):
    if new_state.is_failed():
        flow_run_id = prefect.context.flow_run_id
        if isinstance(new_state.result, Exception):
            value = "```{}```".format(repr(new_state.result))
        else:
            value = cast(str, new_state.message)
        msg = (
            f"The task `{prefect.context.task_name}` failed "
            f"in a flow run {flow_run_id} "
            f"with an exception {value}. Cancelling the flow run and manually resuming later."
        )
        SlackTask(message=msg).run()
        client = Client()
        client.cancel_flow_run(flow_run_id)
    return new_state


@task(state_handlers=[post_to_slack_and_cancel_run_on_task_failure])
def run_process_that_may_fail():
    if random.random() > 0.5:
        raise ValueError("Failing due to missing information")


@task(trigger=all_successful)
def run_if_success():
    print("Success")


with Flow("state_handler_ex") as flow:
    first_task = run_process_that_may_fail()
    run_if_success(upstream_tasks=[first_task])

if __name__ == "__main__":
    flow.run()
Note that:
• attaching the all_successful trigger explicitly is not needed, since this is the default
• cancelling the flow run within this state handler is not necessary: because the downstream task is triggered only on Success, it wouldn't run anyway if the flaky task fails. But I wanted to show it in case you need something like that in your logic.
I wouldn't recommend rebuilding the Restart logic, since it is hard to implement and frankly a bit overkill for the problem at hand. I think you can totally solve your issue using:
• retries
• triggers
• conditional tasks
• state handlers
• and optionally subflows