Thread
#prefect-community
    FuETL

    6 months ago
    Hey guys, what is the standard way to restart flows via the client (library)? I'm trying to use set_flow_run_state to set the flow run to Scheduled (I tried Pending, but that leaves the flow idle and it is never executed). I want to restart all the tasks.
    client.set_flow_run_state(
            flow_run_id=flow_run_id,
            state=Scheduled()
    )
    Anna Geller

    6 months ago
    This is a bit more complicated. Restart is effectively a UI feature that does quite a lot. If you are interested in implementing this in your Client code, check out the PR that added Restarts in the UI repo: https://github.com/PrefectHQ/ui/pull/285/files
    My understanding of the process: it queries for Failed task runs, queries for the downstream tasks of those failed task runs, restarts the task runs in the right order (respecting all dependencies), and sets new task run states after completion. So it's not a simple and straightforward process: a Restart doesn't create a new flow run. Instead, it restarts the failed task runs and updates their states after completion, within the history of the original flow run. Depending on what you are trying to accomplish, it may be easier to use retries and triggers to automatically recompute task runs.
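The selection and ordering steps described above (find Failed task runs, collect their downstream tasks, order them by dependency) can be sketched in plain Python. This is only an illustration under stated assumptions: the `edges` and `states` dictionaries and the function name `tasks_to_restart` are hypothetical, and nothing here calls the Prefect API or reproduces what the UI PR actually does.

```python
from collections import deque


def tasks_to_restart(edges, states):
    """Return failed tasks plus everything downstream, in dependency order.

    edges:  hypothetical dict mapping a task name to its downstream task names
    states: hypothetical dict mapping a task name to a state string
    """
    # Step 1: find the failed task runs.
    failed = {name for name, state in states.items() if state == "Failed"}

    # Step 2: collect every task downstream of a failed one (breadth-first).
    selected = set(failed)
    queue = deque(failed)
    while queue:
        current = queue.popleft()
        for child in edges.get(current, []):
            if child not in selected:
                selected.add(child)
                queue.append(child)

    # Step 3: order the selected tasks so each one comes after its
    # selected upstream tasks (Kahn's algorithm on the selected subgraph).
    indegree = {name: 0 for name in selected}
    for parent in selected:
        for child in edges.get(parent, []):
            if child in selected:
                indegree[child] += 1
    ready = deque(sorted(name for name, deg in indegree.items() if deg == 0))
    order = []
    while ready:
        current = ready.popleft()
        order.append(current)
        for child in sorted(edges.get(current, [])):
            if child in selected:
                indegree[child] -= 1
                if indegree[child] == 0:
                    ready.append(child)
    return order
```

A real implementation would still have to fetch the task runs and their dependencies from the backend and set each task run's state in that order, which is the part that makes Restart hard to rebuild.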
    FuETL

    6 months ago
    Thanks, @Anna Geller. Basically, I have a process that can fail because of missing information, and when that happens we need a manual intervention in the database we are querying. The idea is that after a human makes the fix, the flow restarts and can now get the missing information. The easy way would be to start a new flow run, right?
    Or could I run the same queries the UI does to achieve the same result?
    Reading what you sent: instead of FAIL, could I raise a PAUSE state
    and then trigger a resume?
    Anna Geller

    6 months ago
    Here is how I would tackle this:
    import random
    import prefect
    from prefect import task, Flow
    from prefect.client import Client
    from prefect.tasks.notifications import SlackTask
    from prefect.triggers import all_successful
    from typing import cast
    
    
    def post_to_slack_and_cancel_run_on_task_failure(task, old_state, new_state):
        if new_state.is_failed():
            flow_run_id = prefect.context.flow_run_id
            if isinstance(new_state.result, Exception):
                value = "```{}```".format(repr(new_state.result))
            else:
                value = cast(str, new_state.message)
            msg = (
                f"The task `{prefect.context.task_name}` failed "
                f"in a flow run {flow_run_id} "
                f"with an exception {value}. Cancelling the flow run and manually resuming later."
            )
            SlackTask(message=msg).run()
            client = Client()
            client.cancel_flow_run(flow_run_id)
        return new_state
    
    
    @task(state_handlers=[post_to_slack_and_cancel_run_on_task_failure])
    def run_process_that_may_fail():
        if random.random() > 0.5:
            raise ValueError("Failing due to missing information")
    
    
    @task(trigger=all_successful)
    def run_if_success():
        print("Success")
    
    
    with Flow("state_handler_ex") as flow:
        first_task = run_process_that_may_fail()
        run_if_success(upstream_tasks=[first_task])
    
    if __name__ == "__main__":
        flow.run()
    Note that:
    • attaching the all_successful trigger explicitly is not needed, since this is the default
    • cancelling the flow run within this state handler is not necessary: because the downstream task is triggered only on Success, it wouldn't be triggered if this flaky task fails. I just wanted to show it in case you need something like that in your logic.
    I wouldn't recommend rebuilding the Restart logic, since it is hard to implement and frankly a bit overkill for the problem at hand. I think you can solve your issue using:
    • retries
    • triggers
    • conditional tasks
    • state handlers
    • and optionally subflows