KG
01/24/2024, 4:22 PM
Marvin
01/24/2024, 4:24 PM
```json
{
  "version": 1,
  "description": "Notify on three consecutive flow failures with a certain tag",
  "state_handlers": [
    {
      "type": "custom_consecutive_failure_handler",
      "config": {
        "tag_to_track": "critical-flow",
        "failure_threshold": 3,
        "notification_config": {
          "method": "webhook",
          "webhook_url": "https://hooks.example.com/alarms"
        }
      }
    }
  ]
}
```
In the example above, you would need to replace "critical-flow" with the actual tag you wish to monitor, and "https://hooks.example.com/alarms" with the URL of your notification service.
Unfortunately, the GitHub search did not return any issues directly related to this topic, so it seems we're crafting a solution from documentation and first principles.
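To make the "first principles" part concrete, here is a minimal sketch of the counting logic that something like the custom_consecutive_failure_handler in the JSON above would need. It is plain Python and does not use any Prefect API; the class name `ConsecutiveFailureTracker` and its `record` method are hypothetical names chosen for illustration, and the state names "Failed" and "Completed" are assumed to arrive as plain strings.

```python
from collections import defaultdict


class ConsecutiveFailureTracker:
    """Hypothetical helper: counts consecutive failures per tag."""

    def __init__(self, failure_threshold: int = 3):
        self.failure_threshold = failure_threshold
        self._streaks = defaultdict(int)  # tag -> current failure streak

    def record(self, tag: str, state_name: str) -> bool:
        """Record one finished run; return True when a notification is due."""
        if state_name == "Failed":
            self._streaks[tag] += 1
        else:
            # Any non-failed state breaks the streak.
            self._streaks[tag] = 0
        if self._streaks[tag] >= self.failure_threshold:
            # Reset so the next notification needs a fresh streak.
            self._streaks[tag] = 0
            return True
        return False


tracker = ConsecutiveFailureTracker(failure_threshold=3)
states = ["Failed", "Failed", "Completed", "Failed", "Failed", "Failed"]
results = [tracker.record("critical-flow", s) for s in states]
```

Only the final run in `states` would trigger a notification here, because the "Completed" run in the middle resets the streak. Sending the webhook and wiring this into actual flow run state changes is left out, since that part depends on your Prefect version.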
Please note that the actual implementation of the custom_consecutive_failure_handler is not detailed here and would require programming knowledge to develop according to Prefect's state handler API. You would also have to adjust the JSON schema based on your specific version of Prefect and your execution environment.
Marvin
01/24/2024, 4:24 PM
Nate
01/24/2024, 4:25 PM
Nate
01/24/2024, 6:40 PM
```python
from prefect import flow, tags
from prefect.events.schemas import DeploymentTrigger

TAG_NAME = "foobar"


@flow
def upstream():
    pass


@flow
def downstream():
    pass


if __name__ == "__main__":
    # Run the upstream flow three times with the tag applied.
    with tags(TAG_NAME):
        upstream()
        upstream()
        upstream()

    # Serve downstream with a trigger that expects Completed flow-run events
    # related to both the tag and the upstream flow, with a threshold of 3.
    downstream.serve(
        name="downstream-of-tagged-upstream",
        triggers=[
            DeploymentTrigger(
                expect={"prefect.flow-run.Completed"},
                match_related=[
                    {
                        "prefect.resource.role": "tag",
                        "prefect.resource.id": f"prefect.tag.{TAG_NAME}",
                    },
                    {
                        "prefect.resource.role": "flow",
                        "prefect.resource.name": "upstream",
                    },
                ],
                threshold=3,
            )
        ],
    )
```
Nate
01/24/2024, 6:40 PM