Riya Sinha
08/31/2025, 1:16 AM
Please check your network settings to ensure websocket connections to the API are allowed
@Marvin what is the websocket used for? is this important?
Marvin
08/31/2025, 1:17 AM
Marvin
08/31/2025, 1:18 AM
Riya Sinha
08/31/2025, 1:21 AM
prefect.events.clients - Unable to connect to '<wss://api.prefect.cloud/api/accounts[...]>'. so like the server is the prefect cloud. doesn't that mean the prefect ui should still realtime update since managed by cloud? why does the events.clients try to open a websocket? @Marvin
Marvin
08/31/2025, 1:21 AM
Marvin
08/31/2025, 1:23 AM
prefect.events.clients. This is used to stream Prefect Events (and subscribe when needed) for real-time features in the platform.
- That log you’re seeing is from the runtime side, not the UI: prefect.events.clients - Unable to connect to '<wss://api.prefect.cloud/api/accounts[...]>'.
What happens if the runtime’s WebSocket can’t connect?
- It’s non-fatal. Your flows still run, states are still set via HTTPS, and logs are still sent via HTTPS in batches.
- You may lose some “real-time” behavior from that process (e.g., immediate event emission/observation), but core functionality continues.
- The UI can still live-update as long as your browser can open its own WSS connection to Prefect Cloud. If your browser network allows WSS, the UI live updates; if it doesn’t, the UI won’t live update even though it’s cloud-hosted.
Networking to allow
- Allow outbound WSS (port 443) with WebSocket Upgrade to api.prefect.cloud.
- Many corporate proxies block or break WebSocket upgrades by default.
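To work out which host and port to put on an allowlist, it can help to derive the WebSocket target from the API URL. This is a minimal sketch, assuming the socket lives on the same host as PREFECT_API_URL with the scheme switched from https to wss; websocket_target is a hypothetical helper, not part of Prefect.

```python
from urllib.parse import urlsplit

# Hypothetical helper (not a Prefect API): derive the WebSocket scheme, host,
# and port that a firewall or proxy would need to allow for a given API URL.
# Assumption: the socket uses the same host, with http -> ws and https -> wss.
SCHEME_MAP = {"http": "ws", "https": "wss"}
DEFAULT_PORTS = {"ws": 80, "wss": 443}


def websocket_target(api_url: str) -> tuple[str, str, int]:
    parts = urlsplit(api_url)
    if parts.scheme not in SCHEME_MAP:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if parts.hostname is None:
        raise ValueError(f"no host in URL: {api_url!r}")
    ws_scheme = SCHEME_MAP[parts.scheme]
    # Fall back to the scheme's default port when the URL does not name one
    port = parts.port or DEFAULT_PORTS[ws_scheme]
    return ws_scheme, parts.hostname, port
```

For a Cloud API URL this yields wss on port 443 at api.prefect.cloud, which matches the rule above. The proxy still has to pass the WebSocket Upgrade header, which this helper cannot check.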
If you just want to quiet the log on your runtime, you can raise that logger’s level so that only errors and above are emitted:
import logging
logging.getLogger("prefect.events.clients").setLevel(logging.ERROR)
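To see what that setting does, here is a small self-contained demo using only the standard library. It assumes the connection message is logged below ERROR (for example at WARNING), which is worth confirming in your own logs; ListHandler and the demo messages are made up for illustration.

```python
import logging


class ListHandler(logging.Handler):
    """Collects messages in memory so the effect of the level is easy to see."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


logger = logging.getLogger("prefect.events.clients")
handler = ListHandler()
logger.addHandler(handler)

logger.setLevel(logging.ERROR)
logger.warning("Unable to connect (demo message)")  # dropped: WARNING is below ERROR
logger.error("Something worse (demo message)")      # kept: ERROR meets the threshold

print(handler.messages)
```

This only hides the message. It does not change whether the WebSocket can connect.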
If you share where you’re seeing the error (worker VM, container, or your browser dev console) and whether you’re behind a proxy/firewall, I can suggest exact allowlist rules or proxy settings.
Riya Sinha
08/31/2025, 1:27 AM
Marvin
08/31/2025, 1:28 AM
Marvin
08/31/2025, 1:32 AM
Option A: event subscriber over WebSocket (real-time)
import asyncio
from prefect.events.clients import get_events_subscriber

# Terminal flow-run outcomes you care about
TERMINAL_SUFFIXES = {"Completed", "Failed", "Crashed", "Cancelled"}

async def wait_for_flow_run_done(flow_run_id: str):
    # Uses PREFECT_API_URL and PREFECT_API_KEY from the environment
    async with get_events_subscriber() as subscriber:
        # The subscriber is itself an async iterator of events
        async for ev in subscriber:
            # ev.event is a string like "prefect.flow-run.Completed"
            # ev.resource.id is expected to look like "prefect.flow-run.<flow-run-id>";
            # verify the format against a real event on your Prefect version
            if ev.resource.id == f"prefect.flow-run.{flow_run_id}":
                suffix = ev.event.split(".")[-1]
                if suffix in TERMINAL_SUFFIXES:
                    print(f"Flow run {flow_run_id} finished: {ev.event}")
                    return ev  # you can inspect ev.payload if needed

# Example usage:
# asyncio.run(wait_for_flow_run_done("<your-flow-run-id>"))
Notes:
- If you prefer to listen per deployment, subscribe once and check any(r.id == f"prefect.deployment.{deployment_id}" for r in ev.related) together with the same terminal suffix test (related resource IDs are expected to carry a prefect.deployment. prefix). That will catch all runs of that deployment.
- Make sure your server has PREFECT_API_URL set to your Cloud workspace API URL and PREFECT_API_KEY set; the subscriber uses those for the WSS connection.
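The matching logic from these notes can be pulled into a plain function that is easy to unit test without a live connection. This is a sketch under one assumption: resource IDs carry a type prefix such as prefect.flow-run.<id> and prefect.deployment.<id>, which is worth confirming against a real event. The function name and parameters are hypothetical.

```python
from typing import Iterable, Optional

TERMINAL_SUFFIXES = {"Completed", "Failed", "Crashed", "Cancelled"}
FLOW_RUN_PREFIX = "prefect.flow-run."


def is_terminal_flow_run_event(
    event_name: str,
    resource_id: str,
    related_ids: Iterable[str] = (),
    *,
    flow_run_id: Optional[str] = None,
    deployment_id: Optional[str] = None,
) -> bool:
    """Return True for a terminal flow-run event matching the optional filters."""
    if not event_name.startswith(FLOW_RUN_PREFIX):
        return False
    if event_name[len(FLOW_RUN_PREFIX):] not in TERMINAL_SUFFIXES:
        return False
    # Assumed ID format: "prefect.flow-run.<id>" on the primary resource
    if flow_run_id is not None and resource_id != f"{FLOW_RUN_PREFIX}{flow_run_id}":
        return False
    # Assumed ID format: "prefect.deployment.<id>" among the related resources
    if deployment_id is not None:
        if f"prefect.deployment.{deployment_id}" not in set(related_ids):
            return False
    return True
```

Inside a subscriber loop you would pass ev.event, ev.resource.id, and the IDs of ev.related. Keeping the check separate from the connection code means it can be tested with plain strings.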
Option B — polling (no WebSocket required)
- Reliable behind strict firewalls; trades real-time for a small delay.
import asyncio
from prefect.client.orchestration import get_client

TERMINAL_TYPES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}

async def wait_for_flow_run_done_poll(flow_run_id: str, interval_seconds: int = 2):
    async with get_client() as client:
        while True:
            fr = await client.read_flow_run(flow_run_id)
            # State type may be an enum or string; normalize to string
            state_type = getattr(fr.state.type, "value", str(fr.state.type)).upper()
            if state_type in TERMINAL_TYPES:
                print(f"Flow run {flow_run_id} finished with state: {fr.state.name}")
                return fr
            await asyncio.sleep(interval_seconds)
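The loop above polls forever at a fixed interval. If you want an overall timeout and a gentler request rate for long runs, one possible variant is sketched below. It is independent of Prefect: read_state_type is a hypothetical callable you would supply (for example, a small wrapper around client.read_flow_run that returns the state type as a string), and the parameter names and defaults are illustrative.

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_TYPES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}


async def poll_until_terminal(
    read_state_type: Callable[[], Awaitable[str]],
    *,
    interval_seconds: float = 2.0,
    max_interval_seconds: float = 30.0,
    timeout_seconds: float = 3600.0,
) -> str:
    """Poll with exponential backoff until a terminal state or the timeout."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    delay = interval_seconds
    while True:
        state_type = (await read_state_type()).upper()
        if state_type in TERMINAL_TYPES:
            return state_type
        remaining = deadline - loop.time()
        if remaining <= 0:
            raise TimeoutError(f"no terminal state after {timeout_seconds} seconds")
        # Never sleep past the deadline; double the delay up to the cap
        await asyncio.sleep(min(delay, remaining))
        delay = min(delay * 2, max_interval_seconds)
```

Backoff trades a little latency on long runs for fewer API calls. If you need the result within a second or two of completion, keep the interval fixed and small.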
Option C — webhook callback (no long-lived connection on your side)
- Use a Prefect Cloud Automation: trigger on “Flow run enters Completed/Failed/Crashed/Cancelled” and action “Call webhook” to your server. Your server gets an HTTP POST when the run finishes.
When to choose which:
- Want immediate, push-style updates and can allow WSS: Option A
- Strict network/proxy or simplest reliability: Option B
- Prefer push without maintaining a subscriber process: Option C
If you tell me whether you have the flow_run_id at start time and whether WSS is allowed from your server, I can tailor the exact snippet.
Riya Sinha
08/31/2025, 1:37 AM
Marvin
08/31/2025, 1:37 AM
Marvin
08/31/2025, 1:38 AM
import asyncio
import contextlib
from fastapi import FastAPI
from prefect.events.clients import get_events_subscriber

app = FastAPI()
TERMINAL = {"Completed", "Failed", "Crashed", "Cancelled"}

async def subscriber_loop():
    # Uses PREFECT_API_URL / PREFECT_API_KEY from the environment
    async with get_events_subscriber() as subscriber:
        # The subscriber is itself an async iterator of events
        async for ev in subscriber:
            # flow-run terminal events look like "prefect.flow-run.Completed"
            if ev.event.startswith("prefect.flow-run."):
                suffix = ev.event.split(".")[-1]
                if suffix in TERMINAL:
                    print(f"[Prefect] Flow run {ev.resource.id} finished: {suffix}")

@app.on_event("startup")
async def start_subscriber():
    app.state.sub_task = asyncio.create_task(subscriber_loop())

@app.on_event("shutdown")
async def stop_subscriber():
    # Cancel the task instead of setting a flag: the loop may be blocked
    # waiting for the next event and would never get to check the flag
    app.state.sub_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await app.state.sub_task
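If request handlers need to wait for one specific run rather than just log every completion, one option is a small in-process registry that the subscriber loop resolves. This is a sketch using only asyncio; FlowRunWaiters is a hypothetical class, not a Prefect or FastAPI feature, and it only works within a single process.

```python
import asyncio
from typing import Dict, Optional


class FlowRunWaiters:
    """Hypothetical registry: flow run ID -> future resolved with the final state."""

    def __init__(self) -> None:
        self._futures: Dict[str, asyncio.Future] = {}

    def _future_for(self, flow_run_id: str) -> asyncio.Future:
        # Create the future lazily so wait() and resolve() can arrive in either order
        if flow_run_id not in self._futures:
            loop = asyncio.get_running_loop()
            self._futures[flow_run_id] = loop.create_future()
        return self._futures[flow_run_id]

    async def wait(self, flow_run_id: str, timeout: Optional[float] = None) -> str:
        """Wait until resolve() is called for this run; returns the final state."""
        future = self._future_for(flow_run_id)
        # shield() keeps the shared future usable if this one waiter times out
        return await asyncio.wait_for(asyncio.shield(future), timeout)

    def resolve(self, flow_run_id: str, final_state: str) -> None:
        """Call from the subscriber loop when a terminal event arrives."""
        future = self._future_for(flow_run_id)
        if not future.done():
            future.set_result(final_state)
```

The subscriber loop would call resolve() with the run ID and suffix where it currently prints, and a handler would await wait(). Entries are never removed in this sketch, so a long-lived server would need some cleanup policy.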
Example: receive a webhook instead (no WebSocket needed)
- Create a Cloud Automation: Trigger = “Flow run enters Completed/Failed/Crashed/Cancelled”, Action = “Call webhook”, set your URL.
- Optionally add an Authorization header and validate it in your server.
Minimal FastAPI handler:
from fastapi import FastAPI, Request, Header, HTTPException

app = FastAPI()
SECRET = "my-shared-secret"

@app.post("/prefect-callback")
async def prefect_callback(req: Request, authorization: str | None = Header(None)):
    if authorization != f"Bearer {SECRET}":
        raise HTTPException(status_code=401, detail="unauthorized")
    payload = await req.json()
    print("Received Prefect webhook:", payload)
    return {"ok": True}
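The handler above compares the header with !=, which can leak timing information about the secret. A common hardening step is a constant-time comparison from the standard library. This is a minimal sketch; is_authorized is a hypothetical helper and the Bearer scheme simply mirrors the handler above.

```python
import hmac
from typing import Optional


def is_authorized(authorization_header: Optional[str], secret: str) -> bool:
    """Constant-time check of an 'Authorization: Bearer <secret>' header."""
    if authorization_header is None:
        return False
    expected = f"Bearer {secret}"
    # Compare as bytes so non-ASCII input cannot raise a TypeError
    return hmac.compare_digest(authorization_header.encode(), expected.encode())
```

In the handler you would raise the 401 when is_authorized(authorization, SECRET) is false. Loading the secret from an environment variable rather than hard-coding it is also worth considering.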
Which route do you prefer (webhook vs subscriber), and what framework are you using? I can tailor the snippet to filter by a specific flow_run_id or deployment.
Nate
08/31/2025, 1:33 PM
Yes: if you use a Cloud Automation with “Call webhook”, your server does not need WebSocket access. Prefect Cloud will POST to your endpoint over HTTPS when the run finishes.
marvin is broadly incorrect here
Nate
08/31/2025, 1:33 PM
Nate
08/31/2025, 1:34 PM
Nate
08/31/2025, 1:35 PM