Hello! I am writing a custom retry logic. When a ...
# ask-community
m
Hello! I am writing a custom retry logic. When a task fails, I run a second task that does a retry. If the retry succeeds, I want the entire flow to succeed. For this reason, I am trying to update the state of the failed task to Completed with this:
with get_client(sync_client=True) as client:
    client.set_task_run_state(
        task_run_id=failed_task.state_details.task_run_id,
        force=True,
        state=State(
            type=StateType.COMPLETED,
            name="Completed",
            message="Completed after retry",
        ),
    )
This allows me to update the state to Completed and it actually works. However, the flow still fails mentioning that there are failed tasks. How can I trick the Flow into thinking everything is fine?
n
hi @Mattia Sappa
> I am writing a custom retry logic
are you using `retry_condition_fn`?
m
No, I am not using it. I am using prefect-dbt to first run `dbt build` and then run `dbt retry` if the build fails. I don't really care about having the task Completed, I am more interested in the flow. The logic is: if `dbt_build_task` fails, then run `dbt_retry_task`. If `dbt_retry_task` is Completed, then the flow is Completed. The problem is that after `dbt_build_task` and `dbt_retry_task` I have many other tasks, and returning a `Completed()` state manually is not very convenient. I am using Prefect 2. So my "custom retry logic" is slightly different: it's more about allowing a task to fail than having custom retry logic 😄
n
are you familiar with using `return_state` on tasks? that way you can trap exceptions and decide whether it raises. you don't need to return literal state values like `Completed`
m
Yes I am doing something like this
dbt_command: State = trigger_dbt_cli_command.submit(return_state=True)

if dbt_command.is_failed():
    dbt_retry()
if `dbt_command` fails, the flow continues but eventually fails
n
what is your flow's ultimate return value?
m
Nothing, `None` probably
n
right, so in prefect 2 if you don't have a return value, the final state is based on all the states of the stuff from your flow. so if you want a final state of `Completed`, return any non-`None` value
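The rule described above can be pictured with a small toy model. This is plain Python, not Prefect code: `toy_final_state` and its arguments are hypothetical names, and the sketch deliberately ignores other cases such as returning a state object from the flow.

```python
from typing import Any, Iterable, Optional


def toy_final_state(return_value: Optional[Any], task_state_names: Iterable[str]) -> str:
    """Toy model of the rule above; not how Prefect is implemented."""
    if return_value is not None:
        # A plain, non-None return value marks the flow run Completed.
        return "Completed"
    # No return value: any failed task run fails the whole flow run.
    if any(name == "Failed" for name in task_state_names):
        return "Failed"
    return "Completed"
```

Under this model, a flow that returns nothing and has one failed task ends up Failed, while the same flow returning `"ok"` ends up Completed.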
m
I see, my trouble is that after `dbt_command` and `dbt_retry` I have other tasks that might happen or not. I can't just return a constant value because the flow will always be `Completed`. I probably need to take into account all tasks and manually decide if it's Failed or Completed
what I wanted to do was a way to tell `dbt_command` to be ignored on the final check if failed
n
> what I wanted to do was a way to tell `dbt_command` to be ignored on the final check if failed
> I probably need to take into account all tasks and manually decide if it's Failed or Completed
yeah. it seems like you could keep track of some results in a list or something and do some logic at the end of your flow to decide if you should return a literal value to produce a Completed state or to raise and fail
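A minimal sketch of that end-of-flow check, assuming the tracked objects expose `is_completed()` the way Prefect states do. `FakeState` is a stand-in used only so the snippet runs without Prefect, and `decide_outcome` is a hypothetical helper name, not part of any library.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass
class FakeState:
    """Stand-in for a Prefect state; only mimics is_completed()."""
    completed: bool

    def is_completed(self) -> bool:
        return self.completed


def decide_outcome(states: Iterable[FakeState]) -> str:
    """Return a literal value if every tracked state completed, else raise."""
    tracked: List[FakeState] = list(states)
    not_completed = [s for s in tracked if not s.is_completed()]
    if not_completed:
        # Raising inside a flow fails the flow run.
        raise RuntimeError(
            f"{len(not_completed)} of {len(tracked)} tracked task runs did not complete"
        )
    # Any non-None return value produces a Completed flow run.
    return "ok"
```

In a real flow, the last line would be something like `return decide_outcome(states)`, where `states` holds the values collected from calls made with `return_state=True`.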
m
Yes ok, I wanted to force it via `set_task_run_state` but it doesn't work. It works in the UI but the flow is still marked as Failed
n
the sdk incorporates the rules like "fail the flow if any tasks failed and no return value" but the API itself doesn't know about that, so that's why you can update the task run state and the flow doesn't know
i am pretty sure there's a fairly normal way to do this with the SDK that does not involve using the client to force states
m
It's just that, assuming I have 100 tasks, it would be easier to tell one specific task to be ignored rather than listing the other 99 to check that all 100 are as you want them to be
n
can you make an MRE showing how the prefect mechanics are getting in the way here?
from prefect import flow, task
from prefect.states import Failed, State

@task
def task_that_might_fail(should_fail: bool = False):
    if should_fail:
        raise ValueError("I failed!")

@task
def other_tasks():
    pass

@flow
def example_flow(should_fail: bool = False):
    states: list[State] = []
    states.append(task_that_might_fail(should_fail=should_fail, return_state=True))
    for _ in range(3):
        states.append(other_tasks.submit(return_state=True))

    return (
        "ok"
        if all([state.is_completed() for state in states])
        else Failed(message="I made an arbitrary choice to fail this")
    )

if __name__ == "__main__":
    example_flow(should_fail=True)
as it stands it's not clear to me why you cannot do something like this (2.x)
m
You are right, I was mostly worried about having to handle all the tasks of a flow, but I imagine the tasks with `return_state=False` will raise an exception straight away. So your suggestion is fine, I keep the list of the tasks with `return_state=True` and if I reach the end of the flow, it is because everything else Completed fine and I only need to check the list of states. I think it works fine 👌
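For the build-then-retry case discussed earlier in the thread, one way to make a failed build "not count" is to track only the state that should matter: the build's state if it did not fail, otherwise the retry's. The sketch below is an assumption-laden illustration, not prefect-dbt code: `FakeState` stands in for a Prefect state, and `state_that_counts` and `run_retry` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class FakeState:
    """Stand-in for a Prefect state; mimics is_failed() and is_completed()."""
    name: str

    def is_failed(self) -> bool:
        return self.name == "Failed"

    def is_completed(self) -> bool:
        return self.name == "Completed"


def state_that_counts(
    build_state: FakeState, run_retry: Callable[[], FakeState]
) -> FakeState:
    """Return the build state if it did not fail, otherwise the retry's state."""
    if build_state.is_failed():
        # Only run the retry when the build actually failed.
        return run_retry()
    return build_state


# Build failed, retry succeeded: only the retry's state gets tracked.
states = [state_that_counts(FakeState("Failed"), lambda: FakeState("Completed"))]
print(all(s.is_completed() for s in states))
```

In a real flow, `run_retry` would be a callable that runs the retry task with `return_state=True`, and the returned state would be appended to the tracked list in place of the failed build's state.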
n
:catjam: good to hear! let us know if something about that pattern becomes unwieldy and we can chat more on a convenience util or something
m
Thank you very much for your help @Nate 😄 I am happy now 🟢 😉