Tim Ricablanca
11/10/2022, 10:24 PMprefect-orion-1 | 21:53:34.590 | DEBUG | prefect.orion.services.flowrunnotifications - Got 3 notifications from queue.
However, it looks like the code intends for only one notification to be read off the queue in the first place: https://github.com/PrefectHQ/prefect/blob/main/src/prefect/orion/services/flow_run_notifications.py#L38-L42.Running
and Failed
in succession, only one of them gets read off the queue and the other is discarded.Running
state, then the Failed
notification makes it out of the queue. If there’s more time (I believe the polling time is 4 seconds) in the flow run between the Running and Failed notifications getting enqueued, then both seem to get read off appropriately.Zanie
prefect version
where you’re running your server? Curious if you’re using Postgres or SQLite since these queries are different for each backend.Carlo
11/10/2022, 10:50 PMTim Ricablanca
11/10/2022, 10:52 PMVersion: 2.6.4
API version: 0.8.2
Python version: 3.9.15
Git commit: 51e92dda
Built: Thu, Oct 20, 2022 3:11 PM
OS/Arch: linux/x86_64
Profile: default
Server type: hosted
psql (PostgreSQL) 14.1
Zanie
pip show sqlalchemy asyncpg
?Tim Ricablanca
11/10/2022, 11:27 PMName: SQLAlchemy
Version: 1.4.42
Summary: Database Abstraction Library
Home-page: <https://www.sqlalchemy.org>
Author: Mike Bayer
Author-email: <mailto:mike_mp@zzzcomputing.com|mike_mp@zzzcomputing.com>
License: MIT
Location: /usr/local/lib/python3.9/site-packages
Requires: greenlet
Required-by: alembic, prefect
---
Name: asyncpg
Version: 0.26.0
Summary: An asyncio PostgreSQL driver
Home-page: <https://github.com/MagicStack/asyncpg>
Author: MagicStack Inc
Author-email: <mailto:hello@magic.io|hello@magic.io>
License: Apache License, Version 2.0
Location: /usr/local/lib/python3.9/site-packages
Requires:
Required-by: prefect
Zanie
Tim Ricablanca
11/10/2022, 11:35 PMZanie
PREFECT_ORION_DATABASE_ECHO
Tim Ricablanca
11/10/2022, 11:43 PMprefect config set PREFECT_ORION_DATABASE_ECHO=True
— i’ll tail the logs and try to re-runZanie
LIMIT 1
from the code but apparently that’s not working for ya’llTim Ricablanca
11/10/2022, 11:46 PMZanie
Tim Ricablanca
11/10/2022, 11:47 PMprefect-orion-1 | 23:52:39.362 | INFO | sqlalchemy.engine.Engine - WITH queued_notifications AS
prefect-orion-1 | (DELETE FROM flow_run_notification_queue WHERE flow_run_notification_queue.id IN (SELECT flow_run_notification_queue.id
prefect-orion-1 | FROM flow_run_notification_queue ORDER BY flow_run_notification_queue.updated
prefect-orion-1 | LIMIT %s FOR UPDATE SKIP LOCKED) RETURNING flow_run_notification_queue.id, flow_run_notification_queue.flow_run_notification_policy_id, flow_run_notification_queue.flow_run_state_id)
prefect-orion-1 | SELECT queued_notifications.id AS queue_id, flow_run_notification_policy.id AS flow_run_notification_policy_id, flow_run_notification_policy.message_template AS flow_run_notification_policy_message_template, flow_run_notification_policy.block_document_id, flow.id AS flow_id, flow.name AS flow_name, flow_run.id AS flow_run_id, flow_run.name AS flow_run_name, flow_run.parameters AS flow_run_parameters, flow_run_state.type AS flow_run_state_type, flow_run_state.name AS flow_run_state_name, flow_run_state.timestamp AS flow_run_state_timestamp, flow_run_state.message AS flow_run_state_message
prefect-orion-1 | FROM queued_notifications JOIN flow_run_notification_policy ON queued_notifications.flow_run_notification_policy_id = flow_run_notification_policy.id JOIN flow_run_state ON queued_notifications.flow_run_state_id = flow_run_state.id JOIN flow_run ON flow_run_state.flow_run_id = flow_run.id JOIN flow ON flow_run.flow_id = flow.id
prefect-orion-1 | 2022-11-10 23:52:39,363 INFO sqlalchemy.engine.Engine [cached since 75.8s ago] (1,)
apprise
—
prefect-orion-1 | 23:53:27.396 | DEBUG | prefect.orion.services.flowrunnotifications - Got 2 notifications from queue.
prefect-orion-1 | 23:53:27.396 | DEBUG | prefect.orion.services.flowrunnotifications - Got 2 notifications from queue.
Zanie
LIMIT %s
👀Tim Ricablanca
11/10/2022, 11:57 PMZanie
(Pdb) print(notification_details_stmt.compile(compile_kwargs={"literal_binds": True}))
WITH queued_notifications AS
(DELETE FROM flow_run_notification_queue WHERE flow_run_notification_queue.id IN (SELECT flow_run_notification_queue.id
FROM flow_run_notification_queue ORDER BY flow_run_notification_queue.updated
LIMIT 1 FOR UPDATE) RETURNING flow_run_notification_queue.id, flow_run_notification_queue.flow_run_notification_policy_id, flow_run_notification_queue.flow_run_state_id)
SELECT queued_notifications.id AS queue_id, flow_run_notification_policy.id AS flow_run_notification_policy_id, flow_run_notification_policy.message_template AS flow_run_notification_policy_message_template, flow_run_notification_policy.block_document_id, flow.id AS flow_id, flow.name AS flow_name, flow_run.id AS flow_run_id, flow_run.name AS flow_run_name, flow_run.parameters AS flow_run_parameters, flow_run_state.type AS flow_run_state_type, flow_run_state.name AS flow_run_state_name, flow_run_state.timestamp AS flow_run_state_timestamp, flow_run_state.message AS flow_run_state_message
FROM queued_notifications JOIN flow_run_notification_policy ON queued_notifications.flow_run_notification_policy_id = flow_run_notification_policy.id JOIN flow_run_state ON queued_notifications.flow_run_state_id = flow_run_state.id JOIN flow_run ON flow_run_state.flow_run_id = flow_run.id JOIN flow ON flow_run.flow_id = flow.id
Tim Ricablanca
11/11/2022, 12:14 AMZanie
LIMIT :param_1
for Postgres and LIMIT %s
for SQLite — are you sure it’s running against postgres as intended?Tim Ricablanca
11/11/2022, 12:17 AMPREFECT_ORION_DATABASE_CONNECTION_URL: <postgresql+asyncpg://postgres>@...
Zanie
Tim Ricablanca
11/11/2022, 12:20 AM# docker-compose.yml
version: "3.9"
services:
database:
image: postgres:14.1-alpine
restart: always
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=MySuperSecretPassword
- POSTGRES_DB=orion
expose:
- 5432
volumes:
- /prefect/postgres:/var/lib/postgresql/data
orion:
image: prefecthq/prefect:2.6.4-python3.9
restart: always
volumes:
- /prefect/orion:/root/.prefect
- /prefect/monkeypatch/orion.py:/usr/local/lib/python3.9/site-packages/prefect/cli/orion.py # temporary logging fix, see: <https://github.com/PrefectHQ/prefect/compare/poc/fix-prefect-logging-uvicorn>
entrypoint: ["prefect", "orion", "start"]
environment:
- PREFECT_ORION_API_HOST=0.0.0.0
- PREFECT_ORION_API_PORT=80
- PREFECT_API_URL=<http://our-server-lives-here.com:80/api>
- PREFECT_API_KEY=<guid>
- PREFECT_ORION_DATABASE_CONNECTION_URL=<postgresql+asyncpg://postgres:MySuperSecretPassword@database:5432/orion>
- PREFECT_LOGGING_LEVEL=DEBUG # default: INFO
- PREFECT_LOGGING_SERVER_LEVEL=DEBUG # default: WARNING
- PREFECT_DEBUG_MODE=True
- PREFECT_LOGGING_ORION_ENABLED=True
- PREFECT_ORION_DATABASE_ECHO=True
ports:
- 80:80
depends_on:
- database
Zanie
Carlo
11/11/2022, 9:14 PMZanie
Tim Ricablanca
11/11/2022, 9:15 PMZanie
Tim Ricablanca
11/14/2022, 7:26 PM