
Claire Herdeman

10/19/2022, 4:42 PM
Hi, probably a straightforward question on Prefect 1.x: how do you force a flow to fail if a task fails?
I've got an `AWSClientWait` task that fails (as desired) when a Batch job fails. All of the following tasks have `TriggerFailed`, but the final state of the flow is `SUCCESS`.
[2022-10-19 15:39:49+0000] INFO - prefect.TaskRunner | Task 'AWSClientWait': Starting task run...
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | FAIL signal raised: FAIL('AWS batch waiter \'JobComplete\' failed with: Waiter JobComplete failed: Waiter encountered a terminal failure state: For expression "jobs[].status" we matched expected path: "FAILED" at least once')
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | Task 'AWSClientWait': Finished task run for task with final state: 'Failed'
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | Task 'load_mapping_to_schema': Starting task run...
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | Task 'load_mapping_to_schema': Finished task run for task with final state: 'TriggerFailed'
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | Task 'transform_and_upsert': Starting task run...
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | Task 'transform_and_upsert': Finished task run for task with final state: 'TriggerFailed'
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | Task 'delete_schema': Starting task run...
[2022-10-19 15:44:50+0000] INFO - prefect.TaskRunner | Task 'delete_schema': Finished task run for task with final state: 'Skipped'
[2022-10-19 15:44:50+0000] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

Marwan Sarieddine

10/19/2022, 5:11 PM
Flow run SUCCESS: all reference tasks succeeded
As this log statement indicates, a flow's state is determined by its reference tasks. So if you want to ensure a flow fails when a given task fails, you need to make sure that task's failure path leads to a failure in a reference task.
You can set reference tasks explicitly by calling something like:
flow.set_reference_tasks([submitted])
See the docs here for more details.
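To make the rule concrete, here is a small plain-Python model of how the choice of reference tasks changes the flow's final state for the run logged above. It is a simplified sketch, not Prefect's own code: the `State` enum and the `flow_state` helper are hypothetical names, and it assumes that `delete_schema` is the flow's only terminal task (and therefore the default reference task) and that a `Skipped` state counts as successful.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "Success"
    SKIPPED = "Skipped"
    FAILED = "Failed"
    TRIGGER_FAILED = "TriggerFailed"


# States treated as failures in this model; Skipped is treated as a success.
FAILED_STATES = {State.FAILED, State.TRIGGER_FAILED}


def flow_state(task_states, reference_tasks):
    """Hypothetical helper: the flow fails only if a reference task failed."""
    if any(task_states[name] in FAILED_STATES for name in reference_tasks):
        return State.FAILED
    return State.SUCCESS


# Final task states taken from the log in the question.
task_states = {
    "AWSClientWait": State.FAILED,
    "load_mapping_to_schema": State.TRIGGER_FAILED,
    "transform_and_upsert": State.TRIGGER_FAILED,
    "delete_schema": State.SKIPPED,
}

# Assumption: delete_schema is the only terminal task, so it is the default reference task.
default_result = flow_state(task_states, ["delete_schema"])

# Making AWSClientWait a reference task lets its failure decide the flow state.
explicit_result = flow_state(task_states, ["AWSClientWait"])

print(default_result.value, explicit_result.value)
```

In the real flow, the equivalent change would be a call along the lines of `flow.set_reference_tasks([...])` with the `AWSClientWait` task object passed in the list.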

Claire Herdeman

10/19/2022, 5:43 PM
Knew it would be straightforward, thanks for the quick response!