# ask-marvin
a
@Marvin what syntax do I use to trigger a slack block notification upon the failure of a run
m
thought for 72.0 seconds
This confirms that the SlackWebhook block accepts a `url` parameter and has various notification types available. When setting up through the UI, you'll just need to provide the webhook URL. The most important things to remember are:
1. Set up the Slack Webhook block first (either via the UI or code)
2. Create an automation in Prefect Cloud that triggers on flow run failures
3. The automation handles the notification automatically - no need to modify your flow code
Would you like me to provide more specific details about any part of this setup process?
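For context on the "slack block" part of the question: Slack webhooks accept messages in Slack's Block Kit JSON format. The sketch below builds an illustrative failure-notification payload of that shape; the flow name, state, and URL are placeholder values, and the exact fields an automation sends are up to you.

```python
import json

def build_failure_blocks(flow_name: str, state: str, run_url: str) -> dict:
    """Build an illustrative Slack Block Kit payload for a failed flow run.

    The layout is an example only; a Slack incoming webhook accepts any
    valid Block Kit "blocks" array posted as JSON.
    """
    return {
        "blocks": [
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f":red_circle: Flow run *{flow_name}* entered state *{state}*",
                },
            },
            {
                "type": "section",
                # run_url is a placeholder, not a real Prefect Cloud link
                "text": {"type": "mrkdwn", "text": f"<{run_url}|View the run>"},
            },
        ]
    }

payload = build_failure_blocks("my-etl", "Failed", "https://example.invalid/run")
print(json.dumps(payload, indent=2))
```

Posting this dict as the JSON body of the webhook request is all the Slack side needs; the Prefect automation supplies the trigger.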
a
@Marvin how do I stop flows from timing out after 10 minutes
m
thought for 55.4 seconds
Let me provide a clear answer on how to extend or modify flow timeouts in Prefect 3.x. There are two main ways to handle flow timeouts:

1. Flow-Level Timeout
You can set a custom timeout when defining your flow using the `timeout_seconds` parameter:
```python
from prefect import flow

@flow(timeout_seconds=3600)  # Set timeout to 1 hour
def my_long_running_flow():
    # Your flow logic here
    pass
```
2. Environment Variable Configuration
If you're experiencing timeouts related to database interactions (which are different from flow execution timeouts), you can modify these using environment variables:
```bash
# For database interaction timeout (default 10 seconds)
export PREFECT_SERVER_DATABASE_TIMEOUT=60

# For database connection timeout (default 5 seconds)
export PREFECT_SERVER_DATABASE_CONNECTION_TIMEOUT=30
```
Important notes:
- If you don't specify `timeout_seconds`, flows run without a timeout limit
- The database timeouts above are different from flow execution timeouts - they control how long Prefect waits for database operations
- Set timeouts that are appropriate for your workload - too high might mask underlying issues, too low might cause unnecessary failures
Let me know if you need any clarification or have questions about implementing these timeout configurations!
a
@Marvin "Failed due to a(n) `Flow run reached configured timeout` error: The flow run exceeded the configured timeout of 600 seconds, indicating a potential performance issue or a need to adjust the timeout settings."
m
thought for 63.2 seconds
Based on the error message you're seeing and my research, I can now provide a more specific solution to your timeout issue. The 600-second (10-minute) timeout you're encountering can be modified in several ways depending on how you're running your flow:

1. For Flow Definitions
```python
from prefect import flow

@flow(timeout_seconds=3600)  # Set to 1 hour (3600 seconds)
def my_long_running_flow():
    # Your flow logic here
    pass
```
2. For Deployments
When creating a deployment, you can specify the timeout in the deployment configuration:
```python
from prefect import flow

@flow
def my_long_running_flow():
    # Your flow logic here
    pass

deployment = my_long_running_flow.from_source(
    source="https://github.com/your/repo",
    entrypoint="path/to/file.py:my_long_running_flow"
).set_timeout(3600)  # Set to 1 hour
deployment.deploy(name="my-deployment")
```
3. For Existing Deployments
You can update the timeout for an existing deployment using the Prefect UI or API. In the UI, modify the deployment settings and update the timeout value.
Important Notes:
- If you don't specify a timeout, flows run without a timeout limit
- Choose a timeout value that makes sense for your workload - if your flow typically takes 30 minutes, a timeout of 40-45 minutes might be reasonable
- Consider adding logging or progress updates in your flow to help diagnose where time is being spent if you're consistently hitting timeout limits
- If you're running into timeouts frequently, it might be worth profiling your flow to identify potential performance improvements
Would you like me to provide more specific guidance based on how you're running your flows (e.g., local execution, deployment, specific infrastructure)?
a
@Marvin where in the UI am I able to specify the timeout? I am using a Prefect managed workflow
m
thought for 38.3 seconds
I need to correct my previous response - after researching the UI settings specifically, I've found that timeout settings are not directly configurable through the Prefect UI for managed workflows. Instead, you'll need to modify the timeout in one of these ways:

1. Update the Flow Code
This is the most straightforward approach. Modify your flow definition to include the timeout:
```python
from prefect import flow

@flow(timeout_seconds=3600)  # Set to 1 hour
def my_long_running_flow():
    # Your flow logic here
    pass
```
2. Update the Deployment Configuration
When creating or updating your deployment, set the timeout in your deployment code:
```python
from prefect import flow

@flow
def my_long_running_flow():
    pass

if __name__ == "__main__":
    my_long_running_flow.from_source(
        source="your-source",
        entrypoint="path/to/file.py:my_long_running_flow"
    ).set_timeout(3600).deploy(name="my-deployment", work_pool_name="your-work-pool")
```
Since you're using a managed workflow, you'll need to redeploy your flow with the updated timeout settings. The UI itself doesn't provide direct controls for modifying timeout settings - these need to be specified in your flow or deployment code. Would you like help with updating your specific flow or deployment with the new timeout settings?
a
@Marvin AttributeError: 'Flow' object has no attribute 'set_timeout'
m
thought for 76.5 seconds
I apologize for my previous incorrect response. Let me provide the correct way to set a timeout for your flow in Prefect 3.x. The timeout needs to be set at the flow level, not in the deployment configuration. Here's the correct way to do it:
```python
from prefect import flow

@flow(timeout_seconds=3600)  # Set to 1 hour
def my_long_running_flow():
    # Your flow logic here
    pass

if __name__ == "__main__":
    my_long_running_flow.from_source(
        source="your-source",
        entrypoint="path/to/file.py:my_long_running_flow"
    ).deploy(name="my-deployment", work_pool_name="your-work-pool")
```
The key difference is that the `timeout_seconds` parameter is set in the flow decorator, not in the deployment configuration. The `set_timeout()` method I incorrectly suggested earlier doesn't exist. If you already have a flow defined and want to modify its timeout, you'll need to:
1. Update the flow decorator to include the timeout
2. Redeploy the flow
For example, if your current flow looks like this:
```python
@flow
def my_long_running_flow():
    pass
```
Change it to:
```python
@flow(timeout_seconds=3600)  # or whatever timeout you need
def my_long_running_flow():
    pass
```
Then redeploy the flow. The new timeout setting will apply to all future runs of this flow. Would you like help with updating your specific flow code with the correct timeout setting?
a
@Marvin none of these things are working
m
thought for 67.3 seconds
I apologize for the confusion in my previous responses. Let's try a different approach. Could you please tell me:
1. Are you running this flow as part of a deployment?
2. Can you share a simplified version of your flow code?
3. Are you using a work pool, and if so, what type (Kubernetes, Docker, Process, etc.)?
This information will help me provide a more accurate solution. The fact that none of the previous suggestions worked suggests we might need to look at infrastructure-specific timeout settings or work pool configurations. Also, just to verify - when you say you're using a "prefect managed workflow", are you using Prefect Cloud with managed work pools? This would help me understand which specific timeout settings apply in your case.
thought for 76.1 seconds
Since you're using a managed work pool, you don't need to configure it manually. However, you might want to check your work pool settings in the Prefect Cloud UI to ensure it's configured for longer-running tasks. The key changes here are:
1. Increased HTTP timeout in the `push_action_api` task
2. Added task-level timeouts to both tasks
3. Added better error handling for task submissions
4. Maintained the flow-level timeout of 3600 seconds (1 hour)
Would you like me to explain any of these changes in more detail? Also, are you seeing any specific error messages when the timeout occurs?
a
@Marvin why isn't this working to trigger?
```python
from prefect import flow
from prefect.events import DeploymentEventTrigger
from prefect.runner.storage import GitRepository
from prefect_github import GitHubCredentials

if __name__ == "__main__":
    github_repo = GitRepository(
        url="https://github.com/Minnesota-Nurses-Association/mna.git",
        credentials=GitHubCredentials.load("github"),
    )

    # Downstream deployment: run after the upstream deployment completes
    flow.from_source(
        source=github_repo,
        entrypoint="rETL/retl_steps/retl_new_remove.py:main",
    ).deploy(
        name="new-remove-retl",
        work_pool_name="mna_pool",
        triggers=[
            DeploymentEventTrigger(
                expect={"prefect.flow-run.Completed"},
                # Match the upstream deployment by name
                match_related={"prefect.resource.name": "new-remove-query-deployment"},
                # (optional) You can also pass parameters:
                # parameters={"event": "{{ event }}"},
            ),
        ],
    )
```
m
thought for 494.1 seconds
Short answer: your trigger's filter likely isn't matching the upstream event. A couple of common gotchas with DeploymentEventTrigger in Prefect 3.x:
- `prefect.resource.name` for a deployment is flow-name/deployment-name, not just the deployment name
- Use a list for `expect` (e.g., `["prefect.flow-run.Completed"]`)
- It helps to also include the resource role so you're matching the upstream deployment specifically
Try this adjusted snippet:
```python
from prefect import flow
from prefect.events import DeploymentEventTrigger
from prefect.runner.storage import GitRepository
from prefect_github import GitHubCredentials

if __name__ == "__main__":
    github_repo = GitRepository(
        url="https://github.com/Minnesota-Nurses-Association/mna.git",
        credentials=GitHubCredentials.load("github"),
    )

    # Replace <UPSTREAM_FLOW_NAME> with the actual upstream flow's name
    flow.from_source(
        source=github_repo,
        entrypoint="rETL/retl_steps/retl_new_remove.py:main",
    ).deploy(
        name="new-remove-retl",
        work_pool_name="mna_pool",
        triggers=[
            DeploymentEventTrigger(
                expect=["prefect.flow-run.Completed"],
                match_related={
                    "prefect.resource.role": "deployment",
                    "prefect.resource.name": "<UPSTREAM_FLOW_NAME>/new-remove-query-deployment",
                },
                # Optionally pass event into your flow params if you want
                # parameters={"event": "{{ event }}"},
            ),
        ],
    )
```
How to find the exact labels to match:
- In the UI, open a completed run of the upstream deployment, go to the Events tab, open the "prefect.flow-run.Completed" event, and inspect Related resources. Copy the exact labels (especially `prefect.resource.name`) for the deployment. This is the most reliable way to get the right value.
- Alternatively, temporarily loosen the filter to confirm triggers fire:
```python
DeploymentEventTrigger(
    expect=["prefect.flow-run.Completed"],
    match_related={"prefect.resource.role": "deployment"},
)
```
Once you see it triggering, tighten the `match_related` to the exact name. Other things to double-check:
- Both deployments are in the same Prefect workspace.
- You have an active worker polling the `mna_pool` work pool (the trigger schedules the downstream run; a worker still has to pick it up).
- You're on Prefect 3.x (triggers are configured via `flow.from_source(...).deploy(..., triggers=[...])` in 3.x, not via the old 2.x APIs).
Docs for reference:
- Creating deployment triggers
- Deployment trigger schema (trigger classes are exported in `prefect.events`)
If you can share the upstream flow name and a screenshot of the Completed event's Related resources, I can give you the exact `match_related` filter to use.
a
@Marvin
{
  "account": "38952a0f-7f5e-4040-9b1a-91758ac48b18",
  "event": "prefect.task-run.Completed",
  "occurred": "2025-08-21T13:00:52.188Z",
  "payload": {
    "intended": {
      "from": "RUNNING",
      "to": "COMPLETED"
    },
    "initial_state": {
      "type": "RUNNING",
      "name": "Running",
      "message": "",
      "state_details": {}
    },
    "validated_state": {
      "type": "COMPLETED",
      "name": "Completed",
      "message": "",
      "state_details": {
        "untrackable_result": true
      },
      "data": null
    },
    "task_run": {
      "name": "run_new_remove_union-dcb",
      "task_key": "run_new_remove_union-19ff5f9c",
      "dynamic_key": "dcbc9548-f014-40ac-af44-b163f70e239a",
      "empirical_policy": {
        "max_retries": 0,
        "retry_delay_seconds": 0,
        "retries": 0,
        "retry_delay": 0
      },
      "tags": [],
      "labels": {},
      "task_inputs": {},
      "run_count": 1,
      "flow_run_run_count": 1,
      "expected_start_time": "2025-08-21T13:00:38.852241Z",
      "start_time": "2025-08-21T13:00:38.859322Z",
      "end_time": "2025-08-21T13:00:52.188045Z",
      "total_run_time": 13.328723
    }
  },
  "received": "2025-08-21T13:00:52.223Z",
  "related": [
    {
      "prefect.resource.id": "prefect.flow-run.068a32f2-4a81-729b-8000-6ca24085b9f3",
      "prefect.resource.role": "flow-run",
      "prefect.resource.name": "rigorous-mustang"
    },
    {
      "prefect.resource.id": "prefect.flow.01923fe6-34e8-4f3e-8db0-5c2f0e4128a7",
      "prefect.resource.role": "flow",
      "prefect.resource.name": "main"
    },
    {
      "prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
      "prefect.resource.role": "deployment",
      "prefect.resource.name": "new-remove-query-deployment",
      "prefect.deployment.version-id": "06891394-b9be-7153-8000-3923b78bfffd",
      "prefect.deployment.version-type": "vcs:git",
      "prefect.deployment.version": "c55a2069"
    },
    {
      "prefect.resource.id": "prefect.work-queue.0611e979-d7ee-4839-be0d-75fe5bba41f6",
      "prefect.resource.role": "work-queue",
      "prefect.resource.name": "default"
    },
    {
      "prefect.resource.id": "prefect.work-pool.0bdf1f64-8bd0-4d51-a772-a8ebb4ea2bcc",
      "prefect.resource.role": "work-pool",
      "prefect.resource.name": "mna_pool"
    },
    {
      "prefect.resource.id": "prefect.tag.auto-scheduled",
      "prefect.resource.role": "tag"
    }
  ],
  "resource": {
    "prefect.resource.id": "prefect.task-run.0198ccb7-9044-7af5-8974-e6ca11ed188a",
    "prefect.resource.name": "run_new_remove_union-dcb",
    "prefect.run-count": "1",
    "prefect.state-message": "",
    "prefect.state-name": "Completed",
    "prefect.state-timestamp": "2025-08-21T13:00:52.188045+00:00",
    "prefect.state-type": "COMPLETED",
    "prefect.orchestration": "client"
  },
  "workspace": "16fe9013-2669-404a-bbbb-a61ae0e696ed"
}
m
thought for 54.6 seconds
Thanks for sharing the event - that explains it. What you posted is a prefect.task-run.Completed event. Your trigger (as written earlier) is waiting for prefect.flow-run.Completed, so it won't fire on the task-run event. Here's how to make it work based on the labels in your event.

If you want to trigger when the upstream FLOW finishes successfully:
- Use expect=["prefect.flow-run.Completed"]
- Match the upstream deployment via related resources (you can match by name or, more robustly, by id)
Example:
```python
triggers=[
    DeploymentEventTrigger(
        expect=["prefect.flow-run.Completed"],
        match_related={
            "prefect.resource.role": "deployment",
            "prefect.resource.name": "new-remove-query-deployment",
            # even better - match by id to avoid issues if the name changes:
            # "prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
        },
        # parameters={"event": "{{ event }}"},  # optional: pass event into flow params
    )
]
```
If you actually want to trigger as soon as that TASK completes:
- Change expect to prefect.task-run.Completed
- Optionally add a match on the task-run itself and keep the deployment in match_related
Example:
```python
triggers=[
    DeploymentEventTrigger(
        expect=["prefect.task-run.Completed"],
        match={
            "prefect.resource.role": "task-run",
            "prefect.resource.name": "run_new_remove_union-dcb",
        },
        match_related={
            "prefect.resource.role": "deployment",
            "prefect.resource.name": "new-remove-query-deployment",
            # or by id:
            # "prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
        },
    )
]
```
Important nits:
- Use a list for expect, not a set. In Python, `{"prefect.flow-run.Completed"}` is a set; prefer `["prefect.flow-run.Completed"]`.
- Triggers live on the downstream deployment. After deploying, confirm in the UI that your deployment shows the trigger under the Triggers tab and that it's enabled.
- Ensure a worker is running for the mna_pool work pool so the triggered run can be picked up.
Refs:
- Creating deployment triggers
- Deployment trigger schema/API
If you can confirm whether you want to chain on the flow completion or that specific task completion, I can give you the exact trigger block you should drop into your deploy code.
a
{
  "id": "068a7188-44f4-74e1-8000-b37afb9c156e",
  "account": "38952a0f-7f5e-4040-9b1a-91758ac48b18",
  "event": "prefect.flow-run.Completed",
  "occurred": "2025-08-21T13:00:52.309Z",
  "payload": {
    "intended": {
      "from": "RUNNING",
      "to": "COMPLETED"
    },
    "initial_state": {
      "type": "RUNNING",
      "name": "Running",
      "timestamp": "2025-08-21T13:00:38.648723+00:00",
      "id": "068a7187-6a60-7fd2-8000-11b92ef15424"
    },
    "validated_state": {
      "type": "COMPLETED",
      "name": "Completed",
      "timestamp": "2025-08-21T13:00:52.309689+00:00",
      "id": "068a7188-44f4-74e1-8000-b37afb9c156e"
    }
  },
  "received": "2025-08-21T13:00:52.586Z",
  "related": [
    {
      "prefect.resource.id": "prefect.flow.01923fe6-34e8-4f3e-8db0-5c2f0e4128a7",
      "prefect.resource.role": "flow",
      "prefect.resource.name": "main"
    },
    {
      "prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
      "prefect.resource.role": "deployment",
      "prefect.resource.name": "new-remove-query-deployment",
      "prefect.deployment.version-id": "06891394-b9be-7153-8000-3923b78bfffd",
      "prefect.deployment.version-type": "vcs:git",
      "prefect.deployment.version": "c55a2069"
    },
    {
      "prefect.resource.id": "prefect.work-queue.0611e979-d7ee-4839-be0d-75fe5bba41f6",
      "prefect.resource.role": "work-queue",
      "prefect.resource.name": "default"
    },
    {
      "prefect.resource.id": "prefect.work-pool.0bdf1f64-8bd0-4d51-a772-a8ebb4ea2bcc",
      "prefect.resource.role": "work-pool",
      "prefect.resource.name": "mna_pool",
      "prefect.work-pool.type": "prefect:managed"
    },
    {
      "prefect.resource.id": "prefect.tag.auto-scheduled",
      "prefect.resource.role": "tag"
    },
    {
      "prefect.resource.id": "prefect.schedule.5944df2f-907d-42a2-ba4f-8eba094ed9c2",
      "prefect.resource.role": "creator",
      "prefect.resource.name": "CronSchedule"
    }
  ],
  "resource": {
    "prefect.resource.id": "prefect.flow-run.068a32f2-4a81-729b-8000-6ca24085b9f3",
    "prefect.resource.name": "rigorous-mustang",
    "prefect.run-count": "1",
    "prefect.state-message": "",
    "prefect.state-name": "Completed",
    "prefect.state-timestamp": "2025-08-21T13:00:52.309689+00:00",
    "prefect.state-type": "COMPLETED"
  },
  "workspace": "16fe9013-2669-404a-bbbb-a61ae0e696ed"
}
@Marvin sorry this is the right one
m
thought for 44.2 seconds
Perfect - thanks. That event will match, so a couple of concrete fixes:
1) Use a list for expect (not a set). Your earlier snippet used a set: `{"prefect.flow-run.Completed"}`. Use `["prefect.flow-run.Completed"]`.
2) Match the upstream deployment by id or name from the event you sent. From your event:
- deployment id: `prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f`
- deployment name: `new-remove-query-deployment`
I recommend matching by id to be unambiguous:
```python
from prefect import flow
from prefect.events import DeploymentEventTrigger
from prefect.runner.storage import GitRepository
from prefect_github import GitHubCredentials

if __name__ == "__main__":
    github_repo = GitRepository(
        url="https://github.com/Minnesota-Nurses-Association/mna.git",
        credentials=GitHubCredentials.load("github"),
    )

    flow.from_source(
        source=github_repo,
        entrypoint="rETL/retl_steps/retl_new_remove.py:main",
    ).deploy(
        name="new-remove-retl",
        work_pool_name="mna_pool",
        triggers=[
            DeploymentEventTrigger(
                expect=["prefect.flow-run.Completed"],
                match_related={
                    "prefect.resource.role": "deployment",
                    "prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
                    # or use name if you prefer:
                    # "prefect.resource.name": "new-remove-query-deployment",
                },
                # parameters={"event": "{{ event }}"},  # optional
            )
        ],
    )
```
3) Re-deploy, then test with a fresh upstream run. Triggers are not retroactive: after you deploy the downstream with the trigger, kick off a new run of the upstream deployment and confirm a downstream flow run is created.
4) Sanity checks:
- UI: open the downstream deployment and confirm the trigger is listed and enabled.
- Worker: ensure a worker is polling the mna_pool so the triggered run gets picked up.
Docs for reference:
- Creating deployment triggers
- Deployment trigger schema
If you redeploy with the snippet above and the new upstream run still doesn't create a downstream run, ping me and I'll help inspect the automation and event filters further.
a
@Marvin is it true that a trigger won't go based on a quick run?
m
thought for 40.2 seconds
Short answer: no - a Quick Run should still fire a deployment trigger. Triggers react to events from any flow run (scheduled, Quick Run, API, etc.) as long as the event matches your filter and the trigger existed before the run. If your Quick Run didn't trigger, it's usually one of these:
- Trigger wasn't created yet (triggers aren't retroactive). Re-deploy with the trigger, then run upstream again.
- Filter mismatch. Your Completed event shows:
  - event: `prefect.flow-run.Completed`
  - related deployment name: `new-remove-query-deployment`
  - related deployment id: `prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f`
  Make sure your trigger uses `expect=["prefect.flow-run.Completed"]` and a `match_related` that matches one of those exactly. To test, start broad:
```python
match_related={"prefect.resource.role": "deployment"}
```
Then tighten it to the name or id.
- Upstream run wasn't from a deployment. If you run the flow code locally (no deployment), there may be no related "deployment" resource, so a filter requiring a deployment will never match.
- No worker polling the downstream work pool, so the trigger fired but nothing picked up the run. Check the downstream deployment's Runs tab.
- Overly specific version filters. If you matched on `prefect.deployment.version-id` or `prefect.deployment.version`, those change with new builds.
How to confirm:
- Downstream deployment > Triggers tab: ensure the trigger is listed and enabled.
- Automations/Triggers activity: look for evaluations/firings around the time of the upstream completion.
- Event feed for the upstream run: you already have the correct `prefect.flow-run.Completed` event; your trigger should match it.
Docs:
- Creating deployment triggers
- Deployment trigger schema
If you confirm you Quick Ran the upstream deployment (not local flow code) and it still didn't fire, share a screenshot of the downstream deployment's trigger config and I'll help pinpoint the filter.
a
@Marvin what does this mean worker polling the downstream work pool
m
thought for 101.1 seconds
Great question. In Prefect 3.x:
- A work pool is a "mailbox" for flow runs.
- A worker is a process that continuously polls a work pool for new runs and, when it finds one, starts the flow run on the appropriate infrastructure.
Why this matters for your trigger:
- Your DeploymentEventTrigger creates a new flow run for the downstream deployment.
- That run sits in the downstream work pool (here: `mna_pool`) until a worker connected to that pool picks it up.
- If no worker is polling the pool, the run will be created but won't start.
Notes for your setup:
- Your event shows `prefect.work-pool.type: "prefect:managed"`. For a managed work pool, Prefect Cloud provides the worker, so you do not need to start one yourself.
- If your downstream deployment is in a non-managed pool (e.g., Process, Docker, Kubernetes), you must have a worker running for that pool.
How to check your pool type and status:
```bash
prefect work-pool inspect "mna_pool"
```
How to start a worker (only if your pool is not managed; Process/Docker/Kubernetes pools require a worker you run):
```bash
prefect worker start --pool "mna_pool" --name "mna-worker-1"
```
If your pool type is Docker or Kubernetes, be sure the environment has the right extras installed before starting the worker (e.g., via your env or lockfile). If needed:
```bash
uv run --with 'prefect[docker]' prefect worker start --pool "mna_pool"
uv run --with 'prefect[kubernetes]' prefect worker start --pool "mna_pool"
```
Quick checklist when chaining with triggers:
- Confirm the downstream deployment is assigned to the intended work pool (the one your worker or managed pool serves).
- Verify the trigger exists on the downstream deployment before the upstream run.
- After the upstream completes, check the downstream deployment's Runs tab:
  - If you see a run in Pending/Late, the trigger fired but no worker is picking it up.
  - If there's no run, the trigger filter didn't match (or the trigger wasn't present yet).
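The pool/worker relationship above can be made concrete with a toy simulation. This is not Prefect code, just an illustration of the polling model: runs submitted to a pool sit there until some worker polls the pool and claims them.

```python
from collections import deque

class WorkPool:
    """Toy model of a work pool: a queue of scheduled flow runs.
    Illustrates the polling concept only; not Prefect's API."""

    def __init__(self, name: str):
        self.name = name
        self.queue = deque()

    def submit(self, run_name: str) -> None:
        # e.g. a DeploymentEventTrigger creating a downstream run
        self.queue.append(run_name)

    def poll(self):
        # a worker asks: "any runs waiting for me?"
        return self.queue.popleft() if self.queue else None

pool = WorkPool("mna_pool")
pool.submit("new-remove-retl run")  # trigger fired, run created

# With no worker polling, the run just sits in the pool (Pending/Late):
print(len(pool.queue))  # 1

# A worker polls the pool and picks the run up:
claimed = pool.poll()
print(claimed)  # new-remove-retl run
print(len(pool.queue))  # 0
```

In real Prefect, a managed pool's "worker" is run for you by Prefect Cloud, while for Process/Docker/Kubernetes pools the `prefect worker start` command above plays the polling role.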