Andrea Flores
08/01/2025, 5:54 PMMarvin
08/01/2025, 5:54 PMMarvin
08/01/2025, 5:56 PMurl
parameter and has various notification types available. When setting up through the UI, you'll just need to provide the webhook URL.
The most important things to remember are:
1. Set up the Slack Webhook block first (either via UI or code)
2. Create an automation in Prefect Cloud to trigger on flow run failures
3. The automation will handle everything automatically - no need to modify your flow code
Would you like me to provide more specific details about any part of this setup process?Andrea Flores
08/02/2025, 5:03 PMMarvin
08/02/2025, 5:03 PMMarvin
08/02/2025, 5:04 PMtimeout_seconds
parameter:
python
from prefect import flow
@flow(timeout_seconds=3600) # Set timeout to 1 hour
def my_long_running_flow():
# Your flow logic here
pass
2. Environment Variable Configuration
If you're experiencing timeouts related to database interactions (which are different from flow execution timeouts), you can modify these using environment variables:
bash
# For database interaction timeout (default 10 seconds)
export PREFECT_SERVER_DATABASE_TIMEOUT=60
# For database connection timeout (default 5 seconds)
export PREFECT_SERVER_DATABASE_CONNECTION_TIMEOUT=30
Important notes:
- If you don't specify timeout_seconds
, flows will run without a timeout limit
- The database timeouts mentioned above are different from flow execution timeouts - they control how long Prefect will wait for database operations
- Make sure to set timeouts that are appropriate for your workload - setting them too high might mask underlying issues, while setting them too low might cause unnecessary failures
Let me know if you need any clarification or have questions about implementing these timeout configurations!Andrea Flores
08/02/2025, 5:08 PMFlow run reached configured timeout
error: The flow run exceeded the configured timeout of 600 seconds, indicating a potential performance issue or a need to adjust the timeout settings."Marvin
08/02/2025, 5:08 PMMarvin
08/02/2025, 5:09 PMpython
from prefect import flow
@flow(timeout_seconds=3600) # Set to 1 hour (3600 seconds)
def my_long_running_flow():
# Your flow logic here
pass
2. For Deployments
When creating a deployment, you can specify the timeout in the deployment configuration:
python
from prefect import flow
from prefect.deployments import Deployment
@flow
def my_long_running_flow():
# Your flow logic here
pass
deployment = my_flow.from_source(
source="<https://github.com/your/repo>",
entrypoint="path/to/file.py:my_long_running_flow"
).set_timeout(3600) # Set to 1 hour
deployment.deploy(name="my-deployment")
3. For Existing Deployments
You can update the timeout for an existing deployment using the Prefect UI or API. In the UI, you can modify the deployment settings and update the timeout value.
Important Notes:
- If you don't specify a timeout, flows will run without a timeout limit
- Choose a timeout value that makes sense for your workload - if your flow typically takes 30 minutes, setting a timeout of 40-45 minutes might be reasonable
- Consider adding logging or progress updates in your flow to help diagnose where time is being spent if you're consistently hitting timeout limits
- If you're running into timeouts frequently, it might be worth profiling your flow to identify potential performance improvements
Would you like me to provide more specific guidance based on how you're running your flows (e.g., local execution, deployment, specific infrastructure)?Andrea Flores
08/07/2025, 4:30 PMMarvin
08/07/2025, 4:30 PMMarvin
08/07/2025, 4:30 PMpython
from prefect import flow
@flow(timeout_seconds=3600) # Set to 1 hour
def my_long_running_flow():
# Your flow logic here
pass
2. Update the Deployment Configuration
When creating or updating your deployment, set the timeout in your deployment code:
python
from prefect import flow
@flow
def my_long_running_flow():
pass
if __name__ == "__main__":
my_long_running_flow.from_source(
source="your-source",
entrypoint="path/to/file.py:my_long_running_flow"
).set_timeout(3600).deploy(name="my-deployment", work_pool_name="your-work-pool")
Since you're using a managed workflow, you'll need to redeploy your flow with the updated timeout settings. The UI itself doesn't provide direct controls for modifying timeout settings - these need to be specified in your flow or deployment code.
Would you like help with updating your specific flow or deployment with the new timeout settings?Andrea Flores
08/07/2025, 4:34 PMMarvin
08/07/2025, 4:34 PMMarvin
08/07/2025, 4:35 PMpython
from prefect import flow
@flow(timeout_seconds=3600) # Set to 1 hour
def my_long_running_flow():
# Your flow logic here
pass
if __name__ == "__main__":
my_long_running_flow.from_source(
source="your-source",
entrypoint="path/to/file.py:my_long_running_flow"
).deploy(name="my-deployment", work_pool_name="your-work-pool")
The key difference is that the timeout_seconds
parameter is set in the flow decorator, not in the deployment configuration. The set_timeout()
method I incorrectly suggested earlier doesn't exist.
If you already have a flow defined and want to modify its timeout, you'll need to:
1. Update the flow decorator to include the timeout
2. Redeploy the flow
For example, if your current flow looks like this:
python
@flow
def my_long_running_flow():
pass
Change it to:
python
@flow(timeout_seconds=3600) # or whatever timeout you need
def my_long_running_flow():
pass
Then redeploy the flow. The new timeout setting will apply to all future runs of this flow.
Would you like help with updating your specific flow code with the correct timeout setting?Andrea Flores
08/07/2025, 5:16 PMMarvin
08/07/2025, 5:16 PMMarvin
08/07/2025, 5:17 PMMarvin
08/07/2025, 5:22 PMMarvin
08/07/2025, 5:23 PMpush_action_api
task
2. Added task-level timeouts to both tasks
3. Added better error handling for task submissions
4. Maintained the flow-level timeout of 3600 seconds (1 hour)
Would you like me to explain any of these changes in more detail? Also, are you seeing any specific error messages when the timeout occurs?Andrea Flores
08/21/2025, 3:07 PMMarvin
08/21/2025, 3:07 PMMarvin
08/21/2025, 3:15 PMprefect.resource.name
for a deployment is flow-name/deployment-name, not just the deployment name
- Use a list for expect
(e.g., ["prefect.flow-run.Completed"]
)
- It helps to also include the resource role so you’re matching the upstream deployment specifically
Try this adjusted snippet:
from prefect import flow
from prefect.events import DeploymentEventTrigger
from prefect.runner.storage import GitRepository
from prefect_github import GitHubCredentials
if __name__ == "__main__":
github_repo = GitRepository(
url="<https://github.com/Minnesota-Nurses-Association/mna.git>",
credentials=GitHubCredentials.load("github"),
)
# Replace <UPSTREAM_FLOW_NAME> with the actual upstream flow's name
flow.from_source(
source=github_repo,
entrypoint="rETL/retl_steps/retl_new_remove.py:main",
).deploy(
name="new-remove-retl",
work_pool_name="mna_pool",
triggers=[
DeploymentEventTrigger(
expect=["prefect.flow-run.Completed"],
match_related={
"prefect.resource.role": "deployment",
"prefect.resource.name": "<UPSTREAM_FLOW_NAME>/new-remove-query-deployment",
},
# Optionally pass event into your flow params if you want
# parameters={"event": "{{ event }}"},
),
],
)
How to find the exact labels to match:
- In the UI, open a completed run of the upstream deployment, go to the Events tab, open the “prefect.flow-run.Completed” event, and inspect Related resources. Copy the exact labels (especially prefect.resource.name
) for the deployment. This is the most reliable way to get the right value.
- Alternatively, temporarily loosen the filter to confirm triggers fire:
DeploymentEventTrigger(
expect=["prefect.flow-run.Completed"],
match_related={"prefect.resource.role": "deployment"},
)
Once you see it triggering, tighten the match_related
to the exact name.
Other things to double‑check:
- Both deployments are in the same Prefect workspace.
- You have an active worker polling the mna_pool
work pool (the trigger schedules the downstream run; a worker still has to pick it up).
- You’re on Prefect 3.x (triggers are configured via flow.from_source(...).deploy(..., triggers=[...])
in 3.x, not via the old 2.x APIs).
Docs for reference:
- Creating deployment triggers: Creating deployment triggers
- Events/trigger classes exported in `prefect.events`: Deployment trigger schema
If you can share the upstream flow name and a screenshot of the Completed event’s Related resources, I can give you the exact match_related
filter to use.Andrea Flores
08/21/2025, 3:26 PM"account": "38952a0f-7f5e-4040-9b1a-91758ac48b18",
"event": "prefect.task-run.Completed",
"occurred": "2025-08-21T13:00:52.188Z",
"payload": {
"intended": {
"from": "RUNNING",
"to": "COMPLETED"
},
"initial_state": {
"type": "RUNNING",
"name": "Running",
"message": "",
"state_details": {}
},
"validated_state": {
"type": "COMPLETED",
"name": "Completed",
"message": "",
"state_details": {
"untrackable_result": true
},
"data": null
},
"task_run": {
"name": "run_new_remove_union-dcb",
"task_key": "run_new_remove_union-19ff5f9c",
"dynamic_key": "dcbc9548-f014-40ac-af44-b163f70e239a",
"empirical_policy": {
"max_retries": 0,
"retry_delay_seconds": 0,
"retries": 0,
"retry_delay": 0
},
"tags": [],
"labels": {},
"task_inputs": {},
"run_count": 1,
"flow_run_run_count": 1,
"expected_start_time": "2025-08-21T13:00:38.852241Z",
"start_time": "2025-08-21T13:00:38.859322Z",
"end_time": "2025-08-21T13:00:52.188045Z",
"total_run_time": 13.328723
}
},
"received": "2025-08-21T13:00:52.223Z",
"related": [
{
"prefect.resource.id": "prefect.flow-run.068a32f2-4a81-729b-8000-6ca24085b9f3",
"prefect.resource.role": "flow-run",
"prefect.resource.name": "rigorous-mustang"
},
{
"prefect.resource.id": "prefect.flow.01923fe6-34e8-4f3e-8db0-5c2f0e4128a7",
"prefect.resource.role": "flow",
"prefect.resource.name": "main"
},
{
"prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
"prefect.resource.role": "deployment",
"prefect.resource.name": "new-remove-query-deployment",
"prefect.deployment.version-id": "06891394-b9be-7153-8000-3923b78bfffd",
"prefect.deployment.version-type": "vcs:git",
"prefect.deployment.version": "c55a2069"
},
{
"prefect.resource.id": "prefect.work-queue.0611e979-d7ee-4839-be0d-75fe5bba41f6",
"prefect.resource.role": "work-queue",
"prefect.resource.name": "default"
},
{
"prefect.resource.id": "prefect.work-pool.0bdf1f64-8bd0-4d51-a772-a8ebb4ea2bcc",
"prefect.resource.role": "work-pool",
"prefect.resource.name": "mna_pool"
},
{
"prefect.resource.id": "prefect.tag.auto-scheduled",
"prefect.resource.role": "tag"
}
],
"resource": {
"prefect.resource.id": "prefect.task-run.0198ccb7-9044-7af5-8974-e6ca11ed188a",
"prefect.resource.name": "run_new_remove_union-dcb",
"prefect.run-count": "1",
"prefect.state-message": "",
"prefect.state-name": "Completed",
"prefect.state-timestamp": "2025-08-21T13:00:52.188045+00:00",
"prefect.state-type": "COMPLETED",
"prefect.orchestration": "client"
},
"workspace": "16fe9013-2669-404a-bbbb-a61ae0e696ed"
}
Marvin
08/21/2025, 3:26 PMMarvin
08/21/2025, 3:27 PMtriggers=[
DeploymentEventTrigger(
expect=["prefect.flow-run.Completed"],
match_related={
"prefect.resource.role": "deployment",
"prefect.resource.name": "new-remove-query-deployment",
# even better — match by id to avoid issues if the name changes:
# "prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
},
# parameters={"event": "{{ event }}"}, # optional: pass event into flow params
)
]
If you actually want to trigger as soon as that TASK completes
- Change expect to prefect.task-run.Completed
- Optionally add a match on the task-run itself and keep the deployment in match_related
Example:
triggers=[
DeploymentEventTrigger(
expect=["prefect.task-run.Completed"],
match={
"prefect.resource.role": "task-run",
"prefect.resource.name": "run_new_remove_union-dcb",
},
match_related={
"prefect.resource.role": "deployment",
"prefect.resource.name": "new-remove-query-deployment",
# or by id:
# "prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
},
)
]
Important nits
- Use a list for expect, not a set. In Python, {"prefect.flow-run.Completed"}
is a set; prefer ["prefect.flow-run.Completed"]
.
- Triggers live on the downstream deployment. After deploying, confirm in the UI that your deployment shows the trigger under the Triggers tab and that it’s enabled.
- Ensure a worker is running for the mna_pool work pool so the triggered run can be picked up.
Refs
- Creating deployment triggers: Creating deployment triggers
- Deployment trigger schema/API: Deployment trigger schema
If you can confirm whether you want to chain on the flow completion or that specific task completion, I can give you the exact trigger block you should drop into your deploy code.Andrea Flores
08/21/2025, 3:30 PM{
"id": "068a7188-44f4-74e1-8000-b37afb9c156e",
"account": "38952a0f-7f5e-4040-9b1a-91758ac48b18",
"event": "prefect.flow-run.Completed",
"occurred": "2025-08-21T13:00:52.309Z",
"payload": {
"intended": {
"from": "RUNNING",
"to": "COMPLETED"
},
"initial_state": {
"type": "RUNNING",
"name": "Running",
"timestamp": "2025-08-21T13:00:38.648723+00:00",
"id": "068a7187-6a60-7fd2-8000-11b92ef15424"
},
"validated_state": {
"type": "COMPLETED",
"name": "Completed",
"timestamp": "2025-08-21T13:00:52.309689+00:00",
"id": "068a7188-44f4-74e1-8000-b37afb9c156e"
}
},
"received": "2025-08-21T13:00:52.586Z",
"related": [
{
"prefect.resource.id": "prefect.flow.01923fe6-34e8-4f3e-8db0-5c2f0e4128a7",
"prefect.resource.role": "flow",
"prefect.resource.name": "main"
},
{
"prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
"prefect.resource.role": "deployment",
"prefect.resource.name": "new-remove-query-deployment",
"prefect.deployment.version-id": "06891394-b9be-7153-8000-3923b78bfffd",
"prefect.deployment.version-type": "vcs:git",
"prefect.deployment.version": "c55a2069"
},
{
"prefect.resource.id": "prefect.work-queue.0611e979-d7ee-4839-be0d-75fe5bba41f6",
"prefect.resource.role": "work-queue",
"prefect.resource.name": "default"
},
{
"prefect.resource.id": "prefect.work-pool.0bdf1f64-8bd0-4d51-a772-a8ebb4ea2bcc",
"prefect.resource.role": "work-pool",
"prefect.resource.name": "mna_pool",
"prefect.work-pool.type": "prefect:managed"
},
{
"prefect.resource.id": "prefect.tag.auto-scheduled",
"prefect.resource.role": "tag"
},
{
"prefect.resource.id": "prefect.schedule.5944df2f-907d-42a2-ba4f-8eba094ed9c2",
"prefect.resource.role": "creator",
"prefect.resource.name": "CronSchedule"
}
],
"resource": {
"prefect.resource.id": "prefect.flow-run.068a32f2-4a81-729b-8000-6ca24085b9f3",
"prefect.resource.name": "rigorous-mustang",
"prefect.run-count": "1",
"prefect.state-message": "",
"prefect.state-name": "Completed",
"prefect.state-timestamp": "2025-08-21T13:00:52.309689+00:00",
"prefect.state-type": "COMPLETED"
},
"workspace": "16fe9013-2669-404a-bbbb-a61ae0e696ed"
}
@Marvin sorry this is the right oneMarvin
08/21/2025, 3:31 PMMarvin
08/21/2025, 3:31 PM{"prefect.flow-run.Completed"}
. Use ["prefect.flow-run.Completed"]
.
2) Match the upstream deployment by id or name from the event you sent
From your event:
- deployment id: prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f
- deployment name: new-remove-query-deployment
I recommend matching by id to be unambiguous:
from prefect import flow
from prefect.events import DeploymentEventTrigger
from prefect.runner.storage import GitRepository
from prefect_github import GitHubCredentials
if __name__ == "__main__":
github_repo = GitRepository(
url="<https://github.com/Minnesota-Nurses-Association/mna.git>",
credentials=GitHubCredentials.load("github"),
)
flow.from_source(
source=github_repo,
entrypoint="rETL/retl_steps/retl_new_remove.py:main",
).deploy(
name="new-remove-retl",
work_pool_name="mna_pool",
triggers=[
DeploymentEventTrigger(
expect=["prefect.flow-run.Completed"],
match_related={
"prefect.resource.role": "deployment",
"prefect.resource.id": "prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f",
# or use name if you prefer:
# "prefect.resource.name": "new-remove-query-deployment",
},
# parameters={"event": "{{ event }}"}, # optional
)
],
)
3) Re-deploy, then test with a fresh upstream run
Triggers are not retroactive. After you deploy the downstream with the trigger, kick off a new run of the upstream deployment and confirm a downstream flow run is created.
4) Sanity checks
- UI: open the downstream deployment and confirm the trigger is listed and enabled.
- Worker: ensure a worker is polling the mna_pool so the triggered run gets picked up.
Docs for reference:
- Creating deployment triggers: Creating deployment triggers
- Trigger schema: Deployment trigger schema
If you redeploy with the snippet above and the new upstream run still doesn’t create a downstream run, ping me and I’ll help inspect the automation and event filters further.Andrea Flores
08/21/2025, 3:45 PMMarvin
08/21/2025, 3:45 PMMarvin
08/21/2025, 3:46 PMprefect.flow-run.Completed
- related deployment name: new-remove-query-deployment
- related deployment id: prefect.deployment.1633c415-c4c4-4926-8ae6-7240341eba1f
Make sure your trigger uses expect=["prefect.flow-run.Completed"]
and match_related
that matches one of those exactly. To test, start broad:
match_related={"prefect.resource.role": "deployment"}
Then tighten it to name or id.
- Upstream run wasn’t from a deployment. If you run the flow code locally (no deployment), there may be no related “deployment” resource, so a filter requiring a deployment will never match.
- No worker polling the downstream work pool, so the trigger fired but nothing picked up the run. Check the downstream deployment’s “Runs” tab.
- Overly specific version filters. If you matched on prefect.deployment.version-id
or prefect.deployment.version
, those change with new builds.
How to confirm:
- Downstream deployment > Triggers tab: ensure the trigger is listed and enabled.
- Automations/Triggers activity: look for evaluations/firings around the time of the upstream completion.
- Event feed for the upstream run: you already have the correct prefect.flow-run.Completed
event; your trigger should match it.
Docs:
- Creating deployment triggers: Creating deployment triggers
- Deployment trigger schema: Deployment trigger schema
If you confirm you Quick Ran the upstream deployment (not local flow code) and it still didn’t fire, share a screenshot of the downstream deployment’s Trigger config and I’ll help pinpoint the filter.Andrea Flores
08/21/2025, 3:54 PMMarvin
08/21/2025, 3:54 PMMarvin
08/21/2025, 3:56 PMmna_pool
) until a worker connected to that pool picks it up.
- If no worker is polling the pool, the run will be created but won’t start.
Notes for your setup:
- Your event shows prefect.work-pool.type: "prefect:managed"
. For a Managed work pool, Prefect Cloud provides the worker—so you do not need to start one yourself.
- If your downstream deployment is in a non-managed pool (e.g., Process, Docker, Kubernetes), you must have a worker running for that pool.
How to check your pool type and status
prefect work-pool inspect "mna_pool"
How to start a worker (only if your pool is not managed)
- Process/Docker/Kubernetes pools require a worker you run:
prefect worker start --pool "mna_pool" --name "mna-worker-1"
- If your pool type is Docker or Kubernetes, be sure the environment has the right extras installed before starting the worker (e.g., via your env or lockfile). If needed:
uv run --with 'prefect[docker]' prefect worker start --pool "mna_pool"
uv run --with 'prefect[kubernetes]' prefect worker start --pool "mna_pool"
Quick checklist when chaining with triggers
- Confirm the downstream deployment is assigned to the intended work pool (the one your worker or managed pool serves).
- Verify the trigger exists on the downstream deployment before the upstream run.
- After the upstream completes, check the downstream deployment’s Runs tab:
- If you see a run in Pending/Late, the trigger fired but no worker is picking it up.
- If there’s no run, the trigger filter didn’t match (or the trigger wasn’t present yet).