Hi <@ULVA73B9P> I am looking to set up a prefect a...
# ask-marvin
m
Hi @Marvin I am looking to set up a prefect automation (on self-hosted prefect) that will survey for any flow runs moving into crashed state and then trigger a deployment run for me which will do various alerting tasks, let's call this deployment "alerting_deployment" This all seems to be working well, except for the risk that if the "alerting_deployment" flow run crashes, it might continually trigger itself in an infinite loop. I would like to exclude "alerting_deployment" from the automation trigger to avoid this, but can't seem to find a way to do this- it seems like you can match based on tags but not exclude. Is there any way of doing this, or alternatively any other recommended solution to avoid this infinite loop?
m
thought for 498.6 seconds
Great question, and you’re right to guard against a self-trigger loop.

Short answer: Yes, you can exclude your alerting deployment from the Automation. In Prefect 3.x, Automations support exclusion filters on events. For flow-run events, you can filter by resource labels like deployment name and by related resources like tags. The UI supports “must not include” label filters.

What to do

1) Exclude the alerting deployment by label (recommended)
- In your Automation:
  - Trigger on event name: `prefect.flow-run.Crashed`
  - Add a Resource filter → “Labels must not include”
    - Key: `prefect.deployment.name`
    - Value: `alerting_deployment`
- This prevents crashes from your alerting deployment from matching the trigger.

2) Or exclude by tag (also works)
- Add a distinctive tag to your alerting deployment’s flow runs, e.g. `internal-alerting`
- In the Automation, add a filter to “must not include” the tag. Tags are attached to flow-run events as related/tag resources, so in the UI you can exclude the tag as a label/related resource (e.g., `prefect.tag.internal-alerting`).
- If your UI doesn’t show a negative tag filter, invert the logic by using a whitelist tag strategy: add a tag like `monitoring-target` to every deployment you want monitored and filter your Automation to “must include” this tag. Do not apply the tag to `alerting_deployment`.

3) Add a defensive guard in the alerting flow
Even with the filter, it’s good practice to make the alerting flow no-op if it’s triggered by its own crash. You can pass the triggering event into the run and check it in code:
- In the Automation action (Run deployment), set a parameter like:
  - `event`: `{{ event }}`
- In your alerting flow:
from prefect import flow

@flow
def alerting_flow(event: dict | None = None):
    # Defensive guard: if this event is from our own deployment, exit
    deployment_name = (event or {}).get("resource", {}).get("labels", {}).get("prefect.deployment.name")
    if deployment_name == "alerting_deployment":
        return  # avoid a loop

    # ... your alerting logic here ...
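The guard above can also be factored into a small helper and checked against hand-written payloads before wiring it into the flow. This is a minimal standalone sketch in plain Python (no Prefect import). The payload shape, including the `prefect.deployment.name` label, is assumed from the discussion above rather than taken from a real event, and `is_self_triggered` is a hypothetical name.

```python
from typing import Any, Optional

ALERTING_DEPLOYMENT = "alerting_deployment"


def is_self_triggered(event: Optional[dict[str, Any]]) -> bool:
    """Return True if the event appears to come from the alerting deployment itself."""
    # Tolerate a missing event, a missing resource, or missing labels
    labels = ((event or {}).get("resource") or {}).get("labels") or {}
    return labels.get("prefect.deployment.name") == ALERTING_DEPLOYMENT


# Hand-written sample payloads (assumed shape, not real Prefect events)
own_crash = {"resource": {"labels": {"prefect.deployment.name": "alerting_deployment"}}}
other_crash = {"resource": {"labels": {"prefect.deployment.name": "data_pipeline_a"}}}
```

Inside the flow, the first statement would then be `if is_self_triggered(event): return`. It is worth confirming against a real crash event from your own server that the label is actually where this sketch expects it.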
Notes and context
- Flow-run events include labels like `prefect.deployment.name`, `prefect.deployment.id`, and `prefect.flow.name`. Tags are exposed as related resources (usable in filters).
- The event-name filter supports negative matching (e.g., exclude_name), and resource filters support positive and negative label sets; the UI exposes “must include” and “must not include” label conditions.
- If your specific self-hosted UI build doesn’t expose a negative filter for the field you want, the whitelist tag strategy is a safe alternative.

Docs for deeper reference
- Event triggers and automations: Event triggers
- Filters API reference: Event filters API
- Detect zombies / crash handling patterns: Detect zombie flows

Quick confirmation
- Are you on Prefect 3.x and configuring this in the UI? If so, I can give exact click-path steps to add the “must not include” filter for `prefect.deployment.name = alerting_deployment`. If you’d prefer to do this via code or YAML, I can share that pattern too.
m
@Marvin I'm on Prefect 3.4.14, but I am configuring the automation through the Python SDK, not through the UI. I am using `prefect.automations.Automation` and `prefect.events.schemas.automations.EventTrigger`. How would I configure the exclusion by label or by tag when defining an automation this way?
m
thought for 557.2 seconds
Thanks. Since you’re using the Python SDK on Prefect 3.4.14, here are two concrete ways to define your automation trigger so it won’t self-trigger on your alerting deployment.

Option A (recommended): Whitelist by tag (no negatives required)
- Tag every deployment you want monitored with a tag like monitoring-target.
- Do not tag your alerting deployment with monitoring-target.
- Filter your Automation to only trigger when the flow run has that tag.

Code (focus on the trigger’s match filters):
from prefect.automations import Automation, EventTrigger, Posture
from prefect.events.filters import EventFilter, EventNameFilter, EventRelatedFilter
from prefect.events.actions import RunDeployment

automation = Automation(
    name="crash-alerts",
    trigger=EventTrigger(
        posture=Posture.Reactive,
        match=EventFilter(
            event=EventNameFilter(name=["prefect.flow-run.Crashed"]),
            # Only events that have the tag 'monitoring-target'
            related=EventRelatedFilter(role=["tag"], id=["monitoring-target"]),
        ),
    ),
    # Actions are defined on the Automation itself, not on the trigger
    actions=[
        # Use deployment_id or deployment_name depending on your setup
        RunDeployment(
            deployment_name="alerting_deployment",
            parameters={"event": "{{ event }}"},
        )
    ],
)
Option B: Explicitly exclude the alerting deployment by label
Flow-run events include deployment labels like prefect.deployment.name and prefect.deployment.id. You can attach a negative label condition to the resource labels in your filter.

Code (negative match on prefect.deployment.name):
from prefect.automations import Automation, EventTrigger, Posture
from prefect.events.filters import EventFilter, EventNameFilter, EventResourceFilter
from prefect.events.schemas.events import ResourceSpecification
from prefect.events.actions import RunDeployment

match = EventFilter(
    event=EventNameFilter(name=["prefect.flow-run.Crashed"]),
    resource=EventResourceFilter(
        labels=ResourceSpecification({
            # Exclude events whose resource has deployment name 'alerting_deployment'
            "prefect.deployment.name": {
                "negative": { "simple": ["alerting_deployment"] }
            }
        })
    ),
)

automation = Automation(
    name="crash-alerts",
    trigger=EventTrigger(
        posture=Posture.Reactive,
        match=match,
    ),
    # Actions are defined on the Automation itself, not on the trigger
    actions=[
        RunDeployment(
            deployment_name="alerting_deployment",
            parameters={"event": "{{ event }}"},
        )
    ],
)
Notes
- Keys you can rely on:
  - Deployment name: prefect.deployment.name
  - Deployment id: prefect.deployment.id
- Tags are exposed as related resources with role="tag" and id equal to the tag string (e.g., internal-alerting or monitoring-target).
- If you prefer excluding by tag instead, add a negative label spec for tags; however, the whitelist tag approach (Option A) is simpler and avoids negative logic entirely.
- Defensive guard in your alerting flow: still a good idea to no-op if it detects it was triggered by its own crash. Pass the event into parameters as shown and check the deployment name inside the flow.

References
- Event triggers: Event triggers
- Filters API: Event filters API
- Detect zombie/crashed flows: Detect zombie flows

If you share how you’re currently instantiating the Automation (e.g., how you’re creating/saving it), I can tailor the snippet exactly to your pattern and verify the action field (deployment_name vs deployment_id) for your setup.
m
@Marvin when looking to implement Option B, I get typehint errors in the `ResourceSpecification` object as included below. Is this something that can be safely ignored or is the syntax that you have suggested incorrect?
Argument of type "dict[str, dict[str, dict[str, list[str]]]]" cannot be assigned to parameter "root" of type "Dict[str, str | List[str]]" in function "__init__"
m
thought for 153.4 seconds
Great catch. That typehint error is valid. My earlier “negative labels” example for ResourceSpecification was incorrect for Prefect 3.4.x.

What’s actually supported
- ResourceSpecification expects a simple mapping of label key -> value(s), i.e. Dict[str, str | List[str]].
- There is no negative/exclude syntax for labels in the Python SDK’s EventResourceFilter in 3.4.x.
- EventNameFilter supports exclude_name/exclude_prefix, but that doesn’t help for deployment labels.
- EventRelatedFilter (e.g., for tags) is also positive-only.

Recommended patterns (with correct code)

A) Whitelist by tag (preferred)
Tag all deployments you want monitored with a tag like monitoring-target and omit that tag from your alerting deployment. Then filter on that tag:
from prefect.automations import Automation, EventTrigger, Posture, RunDeployment
from prefect.events.filters import EventFilter, EventNameFilter, EventRelatedFilter

automation = Automation(
    name="crash-alerts",
    trigger=EventTrigger(
        posture=Posture.Reactive,
        match=EventFilter(
            event=EventNameFilter(name=["prefect.flow-run.Crashed"]),
            related=EventRelatedFilter(
                role=["tag"],
                id=["monitoring-target"],  # only fire when this tag is present
            ),
        ),
    ),
    # Actions are defined on the Automation itself, not on the trigger
    actions=[
        RunDeployment(
            deployment_name="alerting_deployment",
            parameters={"event": "{{ event }}"},
        )
    ],
)
B) Positive include by deployment label (workaround for “exclude”)
If you really want to do this by deployment label instead of tags, you can explicitly include the set of deployments you want monitored. This is brittle (you must keep the list up-to-date) but it works:
from prefect.automations import Automation, EventTrigger, Posture, RunDeployment
from prefect.events.filters import EventFilter, EventNameFilter, EventResourceFilter
from prefect.events.schemas.events import ResourceSpecification

automation = Automation(
    name="crash-alerts",
    trigger=EventTrigger(
        posture=Posture.Reactive,
        match=EventFilter(
            event=EventNameFilter(name=["prefect.flow-run.Crashed"]),
            resource=EventResourceFilter(
                labels=ResourceSpecification({
                    "prefect.deployment.name": [
                        "data_pipeline_a",
                        "data_pipeline_b",
                        # ... all deployments to monitor (exclude alerting_deployment by omission)
                    ]
                })
            ),
        ),
    ),
    # Actions are defined on the Automation itself, not on the trigger
    actions=[
        RunDeployment(
            deployment_name="alerting_deployment",
            parameters={"event": "{{ event }}"},
        )
    ],
)
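To see why a positive list acts as exclusion by omission, here is a small standalone sketch of positive-only label matching in plain Python. It illustrates the idea only and is not Prefect's actual matching code; `labels_match`, `LabelSpec`, and the sample label dicts are hypothetical.

```python
from typing import Dict, List, Union

# Same shape as the mapping in the type error above: label key -> value or list of values
LabelSpec = Dict[str, Union[str, List[str]]]


def labels_match(spec: LabelSpec, labels: Dict[str, str]) -> bool:
    """Every key in spec must be present in labels with one of the allowed values."""
    for key, allowed in spec.items():
        allowed_values = [allowed] if isinstance(allowed, str) else allowed
        if labels.get(key) not in allowed_values:
            return False
    return True


spec: LabelSpec = {"prefect.deployment.name": ["data_pipeline_a", "data_pipeline_b"]}

monitored = {"prefect.deployment.name": "data_pipeline_a"}
alerting = {"prefect.deployment.name": "alerting_deployment"}
```

With this spec, `labels_match(spec, monitored)` is true and `labels_match(spec, alerting)` is false, because the alerting deployment is simply not on the list. An empty spec matches everything, which is why a trigger with no label filter also fires on the alerting deployment's own crashes.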
C) Add a defensive guard inside the alerting flow
Even with the filter, I recommend making the alerting flow a no-op if it was triggered by its own crash:
from prefect import flow

@flow
def alerting_flow(event: dict | None = None):
    deployment_name = (event or {}).get("resource", {}).get("labels", {}).get("prefect.deployment.name")
    if deployment_name == "alerting_deployment":
        return  # avoid self-trigger loops
    # ... alerting logic ...
Why you saw the error
- ResourceSpecification in 3.4.x expects a simple dict of label keys to value(s). Nested “negative” structures aren’t supported there, so the type checker is correctly complaining.

If you’d like, I can help you:
- Switch to the tag-whitelist pattern quickly by showing how to add a tag to your target deployments.
- Enumerate the exact label keys available in your events (e.g., deployment/work-pool/project) so you can positively include on those dimensions.

References
- Event triggers: Event triggers
- Filters API reference: Event filters API
m
@Marvin Hmmm, it feels annoying to have to tag over a hundred deployments just for the purpose of excluding a single one. On the other hand, I feel like it would be insufficient to simply add a defensive guard within the alerting flow, as if it crashed before hitting that guard it wouldn't stop the infinite loop.
1. Is there any way this could be resolved by using another automation in tandem, perhaps some clever use of state change?
2. You mentioned that "exclude" filtering is not available in 3.4.x. Is there a Prefect version in which it is available, and do you know if it is in any future plans?
m
thought for 172.8 seconds
Totally fair concerns. Short answers:
- There isn’t a negative/exclude labels filter in the Python SDK on 3.4.14 today.
- You can avoid tagging 100+ deployments by either:
  - Moving the alerting deployment to its own project/work pool and positively filtering on your “main” project/pool, or
  - Using a tandem-automations pattern that pauses the main automation if your alerting deployment crashes.

Details and code below.

1) Tandem automations pattern (pause on alerting crash, resume on recovery)
Set up three automations:
- A: Crash detector (runs your alerting deployment on any crash)
- B: Breaker (pauses A if the alerting deployment itself crashes)
- C: Healer (resumes A when the alerting deployment completes/succeeds)

You’ll create A first, capture its id, then create B and C referencing A’s id. This prevents an infinite loop. In the worst case, a single self-trigger might slip through on the same crash event, but A gets paused immediately so it won’t repeat.

Code sketch (fill in your action fields accordingly; you already know how you’re persisting automations):
from prefect.automations import Automation, EventTrigger, Posture
from prefect.events.filters import EventFilter, EventNameFilter, EventResourceFilter
from prefect.events.schemas.events import ResourceSpecification
from prefect.events.actions import RunDeployment, PauseAutomation, ResumeAutomation, SendNotification

# A) Crash detector: run alerting_deployment on any flow-run crash
automation_a = Automation(
    name="crash-detector",
    trigger=EventTrigger(
        posture=Posture.Reactive,
        match=EventFilter(
            event=EventNameFilter(name=["prefect.flow-run.Crashed"]),
        ),
    ),
    # Actions are defined on the Automation itself, not on the trigger
    actions=[
        RunDeployment(
            deployment_name="alerting_deployment",
            parameters={"event": "{{ event }}"},
        )
    ],
)
# persist A and capture its id
# automation_a_id = <id after creation>

# B) Breaker: if alerting_deployment crashes, pause A immediately
automation_b = Automation(
    name="breaker-alerting-crash",
    trigger=EventTrigger(
        posture=Posture.Reactive,
        match=EventFilter(
            event=EventNameFilter(name=["prefect.flow-run.Crashed"]),
            resource=EventResourceFilter(
                labels=ResourceSpecification({
                    "prefect.deployment.name": "alerting_deployment"
                })
            ),
        ),
    ),
    actions=[
        PauseAutomation(
            # Prefer automation_id if you have it; some builds may support name
            automation_id="<automation_a_id>",
        ),
        # Optional: also notify
        # SendNotification(message="Alerting deployment crashed; paused crash-detector.")
    ],
)

# C) Healer: when alerting_deployment completes, resume A
automation_c = Automation(
    name="healer-alerting-recovery",
    trigger=EventTrigger(
        posture=Posture.Reactive,
        match=EventFilter(
            # Completed is the success state; there is no separate "succeeded" event
            event=EventNameFilter(name=["prefect.flow-run.Completed"]),
            resource=EventResourceFilter(
                labels=ResourceSpecification({
                    "prefect.deployment.name": "alerting_deployment"
                })
            ),
        ),
    ),
    actions=[
        ResumeAutomation(
            automation_id="<automation_a_id>",
        )
    ],
)
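The interaction between the three automations above can be sanity-checked with a toy simulation in plain Python. This is only a sketch of the intended behaviour, not Prefect code: `AutomationSim` is a hypothetical class, and it assumes the worst case mentioned earlier, where A reacts to a crash event before B has paused it.

```python
from dataclasses import dataclass, field
from typing import List

ALERTING = "alerting_deployment"


@dataclass
class AutomationSim:
    """Toy model of automations A (detector), B (breaker), and C (healer)."""

    detector_enabled: bool = True
    alert_runs_started: int = 0
    log: List[str] = field(default_factory=list)

    def on_crash(self, deployment: str) -> None:
        # A: any crash starts an alerting run while the detector is enabled
        if self.detector_enabled:
            self.alert_runs_started += 1
            self.log.append(f"A: alert run started for crash of {deployment}")
        # B: a crash of the alerting deployment itself pauses A
        if deployment == ALERTING:
            self.detector_enabled = False
            self.log.append("B: detector paused")

    def on_completed(self, deployment: str) -> None:
        # C: a completed alerting run resumes A
        if deployment == ALERTING:
            self.detector_enabled = True
            self.log.append("C: detector resumed")


sim = AutomationSim()
sim.on_crash("data_pipeline_a")  # ordinary crash: alert run 1 starts
sim.on_crash(ALERTING)           # alert run 1 crashes: one extra run slips through, then A is paused
sim.on_crash(ALERTING)           # the extra run crashes too: nothing new starts
runs_while_broken = sim.alert_runs_started
sim.on_completed(ALERTING)       # a later (e.g. manually started) alerting run completes: A resumes
```

In this model the loop stops after one extra alerting run. Note that while A is paused, crashes in other deployments go unalerted until an alerting run completes, so the breaker (B) is a natural place to attach a notification.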
Notes:
- Create A first, get its id, then use that id in B and C. If your environment supports pausing by name instead of id, you can use that, but id is safest.
- This design prevents any ongoing loop. At most, a single extra trigger might occur on the same crash before B pauses A.

2) Avoid tagging 100+ deployments
If you can move only the alerting deployment to a separate project or work pool, then filter A to include only your “main” project/work pool. That’s a one-time change to just the alerting deployment. For example, if you filter positively by work pool:
from prefect.events.filters import EventFilter, EventNameFilter, EventRelatedFilter

match = EventFilter(
    event=EventNameFilter(name=["prefect.flow-run.Crashed"]),
    related=EventRelatedFilter(
        role=["work-pool"],
        id=["main-work-pool"],  # include only your standard pool
    ),
)
This relies on those related resources being present on your events. If you’re unsure, I can help you peek at a sample event payload to confirm the available labels/related resources in your self-hosted build.

3) Why a guard inside the alerting flow isn’t enough
You’re right: if the alerting flow crashes before your guard runs, it won’t stop the loop. That’s why the breaker/healer automations are helpful: they act at the automation level to cut off the trigger.

4) Exclude filters availability and plans
- In 3.4.14 via the Python SDK, negative/exclude filtering on resource labels is not supported.
- EventNameFilter does have exclude fields, but that doesn’t help for “exclude a specific deployment.”
- I don’t have a committed release to point you to where negative label filters are available in the SDK. If this would meaningfully simplify your setup, I recommend opening a feature request so we can track it: Prefect feature request.

If you can share:
- Whether work pool or project labels appear in your flow-run events, or
- The exact way you currently create/persist automations,

I can tailor the code to your environment, including the correct identifiers for PauseAutomation/ResumeAutomation actions.

Links for reference:
- Event triggers: Event triggers
- Filters API: Event filters API