Mattia Sappa
12/04/2024, 3:53 PM
with get_client(sync_client=True) as client:
    client.set_task_run_state(
        task_run_id=failed_task.state_details.task_run_id,
        force=True,
        state=State(
            type=StateType.COMPLETED,
            name="Completed",
            message="Completed after retry",
        ),
    )
This allows me to update the state to Completed and it actually works. However, the flow still fails mentioning that there are failed tasks.
How can I trick the Flow into thinking everything is fine?
Nate
12/04/2024, 5:17 PM
retry_condition_fn?
Nate
12/04/2024, 5:17 PM
Mattia Sappa
12/04/2024, 5:29 PM
I want to run dbt build and then run dbt retry if the build fails.
I don't really care about having the task Completed, I am more interested in the flow.
The logic is: if dbt_build_task fails, then run dbt_retry_task. If dbt_retry_task is Completed, then the Flow is Completed.
The problem is that after dbt_build_task and dbt_retry_task I have many other tasks, and returning a Completed() state manually is not very convenient.
I am using Prefect 2.
So my "custom retry logic" is slightly different: it's more about allowing a task to fail than having custom retry logic 😄
Nate
12/04/2024, 5:30 PM
have you tried return_state on tasks? that way you can trap exceptions and decide whether it raises. you don't need to return literal state values like Completed
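To make "trap exceptions and decide whether it raises" concrete, here is a minimal sketch in plain Python with no Prefect imports. `Outcome`, `run_trapped`, and `build_then_retry` are hypothetical names invented for illustration; `Outcome` only stands in for the kind of state object that `return_state=True` hands back, and is not Prefect's `State` class.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Outcome:
    """Hypothetical stand-in for a returned task state; not a Prefect class."""
    name: str
    error: Optional[Exception] = None

    def is_failed(self) -> bool:
        return self.error is not None


def run_trapped(name: str, fn: Callable[[], object]) -> Outcome:
    """Run fn and capture any exception in an Outcome instead of raising it."""
    try:
        fn()
    except Exception as exc:
        return Outcome(name, exc)
    return Outcome(name)


def build_then_retry(build: Callable[[], object], retry: Callable[[], object]) -> str:
    """Run build; only if it fails, run retry and let a retry failure propagate."""
    if run_trapped("dbt_build", build).is_failed():
        retry()  # not trapped: an exception here fails the caller
        return "retry"
    return "build"
```

The design choice is that only the first step is trapped. A failure in the fallback step is deliberately left to raise, so the caller fails only when both steps fail.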
Nate
12/04/2024, 5:30 PM
Mattia Sappa
12/04/2024, 5:33 PM
dbt_command: State = trigger_dbt_cli_command.submit(return_state=True)
if dbt_command.is_failed():
    dbt_retry()
Mattia Sappa
12/04/2024, 5:33 PM
If dbt_command fails, the flow continues but eventually fails
Nate
12/04/2024, 5:34 PM
Mattia Sappa
12/04/2024, 5:35 PM
None probably
Nate
12/04/2024, 5:36 PM
to get Completed, return any non-None value
Nate
12/04/2024, 5:36 PM
Mattia Sappa
12/04/2024, 5:39 PM
After dbt_command and dbt_retry I have other tasks that may or may not run. I can't just return a constant value because the flow will always be Completed. I probably need to take into account all tasks and manually decide if it's Failed or Completed
Mattia Sappa
12/04/2024, 5:40 PM
what I wanted to do was a way to tell dbt_command to be ignored on the final check if failed
Mattia Sappa
12/04/2024, 5:41 PM
Nate
12/04/2024, 5:41 PM
"what I wanted to do was a way to tell dbt_command to be ignored on the final check if failed"
"I probably need to take into account all tasks and manually decide if it's Failed or Completed"
yeah. it seems like you could keep track of some results in a list or something and do some logic at the end of your flow to decide if you should return a literal value to produce a Completed state or to raise and fail
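As a rough illustration of that end-of-flow check, the sketch below decides the final outcome from a list of collected results while ignoring named tasks such as the initial dbt build. It is plain Python: `TaskOutcome`, `flow_succeeded`, and `final_result` are hypothetical names, and `TaskOutcome` is only a stand-in for whatever state objects the flow collects, not a Prefect class.

```python
from dataclasses import dataclass
from typing import Iterable, List


@dataclass(frozen=True)
class TaskOutcome:
    """Hypothetical stand-in for a task's final state; not a Prefect class."""
    task_name: str
    completed: bool


def flow_succeeded(outcomes: Iterable[TaskOutcome], ignorable: Iterable[str] = ()) -> bool:
    """True when every outcome whose task is not in `ignorable` completed."""
    skip = set(ignorable)
    return all(o.completed for o in outcomes if o.task_name not in skip)


def final_result(outcomes: Iterable[TaskOutcome], ignorable: Iterable[str] = ()) -> str:
    """Return a literal value on success, or raise listing the failed tasks."""
    collected: List[TaskOutcome] = list(outcomes)
    skip = set(ignorable)
    if flow_succeeded(collected, skip):
        return "ok"
    failed = [o.task_name for o in collected if not o.completed and o.task_name not in skip]
    raise RuntimeError(f"failed tasks: {failed}")
```

For example, `final_result(outcomes, ignorable=["dbt_build"])` returns "ok" when only the initial build failed, and raises when any other collected task failed.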
Mattia Sappa
12/04/2024, 5:44 PM
I tried set_task_run_state but it doesn't work. It works in the UI but the flow is still marked as Failed
Nate
12/04/2024, 5:47 PM
Mattia Sappa
12/04/2024, 5:49 PM
Nate
12/04/2024, 6:00 PM
from prefect import flow, task
from prefect.states import Failed, State


@task
def task_that_might_fail(should_fail: bool = False):
    if should_fail:
        raise ValueError("I failed!")


@task
def other_tasks():
    pass


@flow
def example_flow(should_fail: bool = False):
    states: list[State] = []
    states.append(task_that_might_fail(should_fail=should_fail, return_state=True))
    for _ in range(3):
        states.append(other_tasks.submit(return_state=True))
    return (
        "ok"
        if all([state.is_completed() for state in states])
        else Failed(message="I made an arbitrary choice to fail this")
    )


if __name__ == "__main__":
    example_flow(should_fail=True)
as it stands it's not clear to me why you cannot do something like this (2.x)
Mattia Sappa
12/04/2024, 6:06 PM
A failed task with return_state=False will raise an exception straight away.
So your suggestion is fine: I keep the list of the tasks with return_state=True and if I reach the end of the flow, it is because everything else Completed fine and I only need to check the list of states.
I think it works fine 👌
Nate
12/04/2024, 6:08 PM
Mattia Sappa
12/04/2024, 6:16 PM