# marvin-ai
r
Please check your network settings to ensure websocket connections to the API are allowed
@Marvin what is the websocket used for? is this important?
m
Short answer: it’s for real-time updates in the Prefect UI. In Prefect 3.x, the browser opens a WebSocket to the API so the UI can:
- Stream logs live (including the “Live console”)
- Show immediate state changes for flow/task runs
- Update other real-time UI elements (e.g., queue/run counts, events)

Is it required? No. Flows and workers run via normal HTTPS REST calls and do not depend on WebSockets. If WebSockets are blocked:
- Your runs will still execute
- The UI won’t live-update; you may need to refresh to see new states
- Logs won’t stream in real time

Networking notes:
- Prefect Cloud: allow outbound WSS (WebSockets over TLS) to api.prefect.cloud on port 443. Some corporate proxies block the WebSocket “Upgrade” by default or break it via SSL inspection.
- Self-hosted: ensure your reverse proxy forwards WebSocket upgrades to the API (e.g., in Nginx, pass the `Upgrade` and `Connection: upgrade` headers through).

Docs for reference:
- WebSocket subscriptions (server API)
- Self-hosting guide (proxy considerations)

If you want help with exact firewall/proxy rules, let me know whether you’re on Prefect Cloud or self-hosted and what proxy/firewall you’re using.
r
i guess my question is that the other part of the log entry is `prefect.events.clients - Unable to connect to 'wss://api.prefect.cloud/api/accounts[...]'`. so like the server is the prefect cloud. doesn't that mean the prefect ui should still realtime update since it's managed by cloud? why does the events.clients try to open a websocket? @Marvin
m
Great question. There are two separate WebSocket connections involved:

1) Browser/UI WebSocket
- Your browser opens a WSS connection to Prefect Cloud so the UI can live-update (states, logs, etc.).
- This is independent of anything your workers/flow runs do.

2) Runtime (worker/flow) Events WebSocket
- The Python runtime also tries to open a WSS connection to Prefect Cloud via `prefect.events.clients`. This is used to stream Prefect Events (and subscribe when needed) for real-time features in the platform.
- That log you’re seeing is from the runtime side, not the UI: `prefect.events.clients - Unable to connect to 'wss://api.prefect.cloud/api/accounts[...]'`.

What happens if the runtime’s WebSocket can’t connect?
- It’s non-fatal. Your flows still run, states are still set via HTTPS, and logs are still sent via HTTPS in batches.
- You may lose some “real-time” behavior from that process (e.g., immediate event emission/observation), but core functionality continues.
- The UI can still live-update as long as your browser can open its own WSS connection to Prefect Cloud. If your browser network allows WSS, the UI live updates; if it doesn’t, the UI won’t live update even though it’s cloud-hosted.

Networking to allow
- Allow outbound WSS (port 443) with WebSocket Upgrade to api.prefect.cloud.
- Many corporate proxies block or break WebSocket upgrades by default.

If you just want to quiet the log on your runtime, you can lower that logger’s level:
import logging
logging.getLogger("prefect.events.clients").setLevel(logging.ERROR)
References
- prefect.events.clients (API)
- Server WebSocket subscriptions (API)

If you share where you’re seeing the error (worker VM, container, or your browser dev console) and whether you’re behind a proxy/firewall, I can suggest exact allowlist rules or proxy settings.
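The log line shows a `wss://` address even though `PREFECT_API_URL` is normally an `https://` URL. As a rough illustration of why allowlisting the same host on port 443 should be enough, here is a minimal sketch that derives a WebSocket address from an API URL. It assumes the client only swaps the URL scheme; `to_websocket_url` and the placeholder account/workspace IDs are hypothetical and not part of Prefect's API.

```python
from urllib.parse import urlsplit, urlunsplit


def to_websocket_url(api_url: str) -> str:
    """Swap http/https for ws/wss, leaving host, port, and path untouched."""
    parts = urlsplit(api_url)
    scheme = {"http": "ws", "https": "wss"}.get(parts.scheme)
    if scheme is None:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    # Drop a trailing slash so a suffix can be appended cleanly later
    path = parts.path.rstrip("/")
    return urlunsplit((scheme, parts.netloc, path, parts.query, parts.fragment))


# Hypothetical workspace URL, for illustration only
example = "https://api.prefect.cloud/api/accounts/ACCOUNT_ID/workspaces/WORKSPACE_ID"
print(to_websocket_url(example))
```

Since only the scheme changes, a firewall rule that already permits HTTPS to the API host usually needs just the WebSocket Upgrade handshake to be allowed as well.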
r
@Marvin gotcha. ok so if my server kicks off a flow run, how does it listen for when that is done? do i subscribe per flow run? or per deployment? give me an example of just like printing in my server when the prefect cloud has finished the job
m
You’ve got a few options. The simplest pattern if your server already creates the run is to subscribe to events and filter by that specific `flow_run_id`.

Option A: subscribe to events (WebSocket) and wait for that flow run to finish
- Works in real time if your server can open WSS to Prefect Cloud.
- You subscribe once and filter on the `flow_run_id` you just created.
import asyncio

from prefect.events.clients import get_events_subscriber

# Terminal flow-run outcomes you care about
TERMINAL_SUFFIXES = {"Completed", "Failed", "Crashed", "Cancelled"}

async def wait_for_flow_run_done(flow_run_id: str):
    # Uses PREFECT_API_URL and PREFECT_API_KEY from the environment
    # Assumption: event resource IDs look like "prefect.flow-run.<uuid>"
    resource_id = f"prefect.flow-run.{flow_run_id}"

    async with get_events_subscriber() as subscriber:
        # The subscriber is itself an async iterator of ReceivedEvent objects
        async for ev in subscriber:
            # ev.event is a string like "prefect.flow-run.Completed"
            if ev.resource.id == resource_id:
                suffix = ev.event.split(".")[-1]
                if suffix in TERMINAL_SUFFIXES:
                    print(f"Flow run {flow_run_id} finished: {ev.event}")
                    return ev  # you can inspect ev.payload if needed

# Example usage:
# asyncio.run(wait_for_flow_run_done("<your-flow-run-id>"))
Notes:
- If you prefer to listen per deployment, subscribe once and check `any(r.id == f"prefect.deployment.{deployment_id}" for r in ev.related)` along with the same terminal suffix test. That will catch all runs of that deployment.
- Make sure your server has `PREFECT_API_URL` set to your Cloud workspace API URL and `PREFECT_API_KEY` set; the subscriber uses those for the WSS connection.

Option B: polling (no WebSocket required)
- Reliable behind strict firewalls; trades real-time for a small delay.
import asyncio
from prefect.client.orchestration import get_client

TERMINAL_TYPES = {"COMPLETED", "FAILED", "CANCELLED", "CRASHED"}

async def wait_for_flow_run_done_poll(flow_run_id: str, interval_seconds: int = 2):
    async with get_client() as client:
        while True:
            fr = await client.read_flow_run(flow_run_id)
            # State type may be an enum or string; normalize to string
            state_type = getattr(fr.state.type, "value", str(fr.state.type)).upper()
            if state_type in TERMINAL_TYPES:
                print(f"Flow run {flow_run_id} finished with state: {fr.state.name}")
                return fr
            await asyncio.sleep(interval_seconds)
Option C: webhook callback (no long-lived connection on your side)
- Use a Prefect Cloud Automation: trigger on “Flow run enters Completed/Failed/Crashed/Cancelled” and action “Call webhook” to your server. Your server gets an HTTP POST when the run finishes.

When to choose which:
- Want immediate, push-style updates and can allow WSS: Option A
- Strict network/proxy or simplest reliability: Option B
- Prefer push without maintaining a subscriber process: Option C

References
- get_events_subscriber (events client API)
- Server WebSocket subscriptions API

If you tell me whether you have the `flow_run_id` at start time and whether WSS is allowed from your server, I can tailor the exact snippet.
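Whichever option delivers the event, the "is this run finished?" decision is the same check on an event name and a resource ID. A minimal sketch of that check as a pure function, which makes it easy to unit test without a network connection. It assumes flow-run resource IDs take the form `prefect.flow-run.<uuid>`; `terminal_state_for` and its constants are hypothetical names, not Prefect API.

```python
from typing import Optional

TERMINAL_STATES = frozenset({"Completed", "Failed", "Crashed", "Cancelled"})
FLOW_RUN_PREFIX = "prefect.flow-run."  # assumed prefix for event names and resource IDs


def terminal_state_for(
    event_name: str, resource_id: str, flow_run_id: str
) -> Optional[str]:
    """Return the terminal state name if this event ends the given run, else None."""
    # Ignore events about other resources
    if resource_id != f"{FLOW_RUN_PREFIX}{flow_run_id}":
        return None
    # Ignore events that are not flow-run state changes
    if not event_name.startswith(FLOW_RUN_PREFIX):
        return None
    state = event_name[len(FLOW_RUN_PREFIX):]
    return state if state in TERMINAL_STATES else None


# Placeholder run ID, for illustration only
print(terminal_state_for("prefect.flow-run.Completed", "prefect.flow-run.abc", "abc"))
print(terminal_state_for("prefect.flow-run.Running", "prefect.flow-run.abc", "abc"))
```

Inside a subscriber loop or a webhook handler, a non-`None` result would be the signal to print or otherwise act on the finished run.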
r
@Marvin ok, so if i use a webhook then i dont need to enable the websocket for live events? also can the event subscriber run while i also have my server running generally?
m
Short answers
- Yes: if you use a Cloud Automation with “Call webhook”, your server does not need WebSocket access. Prefect Cloud will POST to your endpoint over HTTPS when the run finishes.
- Yes: an event subscriber can run alongside your server. Run it as a background async task (or a separate thread/process) and it won’t block your HTTP handlers.

Notes and links
- Webhook route: outbound HTTP from Prefect Cloud to your server; no WSS needed on your side.
- Event subscriber: your server opens a WSS to Prefect Cloud; needed only if you want push-style updates without using webhooks.
- API refs if you go the subscriber route:
  - get_events_subscriber
  - Server subscriptions

Example: run the event subscriber in the background with a FastAPI app
import asyncio
import contextlib

from fastapi import FastAPI
from prefect.events.clients import get_events_subscriber

app = FastAPI()
TERMINAL = {"Completed", "Failed", "Crashed", "Cancelled"}

async def subscriber_loop():
    # Uses PREFECT_API_URL / PREFECT_API_KEY from the environment
    async with get_events_subscriber() as subscriber:
        # The subscriber is itself an async iterator of events
        async for ev in subscriber:
            # flow-run terminal events look like "prefect.flow-run.Completed"
            if ev.event.startswith("prefect.flow-run."):
                suffix = ev.event.split(".")[-1]
                if suffix in TERMINAL:
                    print(f"[Prefect] Flow run {ev.resource.id} finished: {suffix}")

@app.on_event("startup")
async def start_subscriber():
    # Run the subscriber as a background task so HTTP handlers are not blocked
    app.state.sub_task = asyncio.create_task(subscriber_loop())

@app.on_event("shutdown")
async def stop_subscriber():
    # Cancel rather than set a flag: the loop may be blocked waiting for the next event
    app.state.sub_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await app.state.sub_task
Example: receive a webhook instead (no WebSocket needed)
- Create a Cloud Automation: Trigger = “Flow run enters Completed/Failed/Crashed/Cancelled”, Action = “Call webhook”, set your URL.
- Optionally add an Authorization header and validate it in your server.

Minimal FastAPI handler:
from fastapi import FastAPI, Request, Header, HTTPException

app = FastAPI()
SECRET = "my-shared-secret"

@app.post("/prefect-callback")
async def prefect_callback(req: Request, authorization: str | None = Header(None)):
    if authorization != f"Bearer {SECRET}":
        raise HTTPException(status_code=401, detail="unauthorized")
    payload = await req.json()
    print("Received Prefect webhook:", payload)
    return {"ok": True}
Which route do you prefer (webhook vs subscriber), and what framework are you using? I can tailor the snippet to filter by a specific flow_run_id or deployment.
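The handler above compares the `Authorization` header with `!=`. For shared secrets, a constant-time comparison is the usual practice, since it avoids leaking how many leading characters matched. A minimal sketch using only the standard library; `is_authorized` is a hypothetical helper, and the `Bearer` scheme simply mirrors the handler above.

```python
import hmac
from typing import Optional


def is_authorized(authorization: Optional[str], secret: str) -> bool:
    """Check a 'Bearer <secret>' header value in constant time."""
    if authorization is None:
        return False
    expected = f"Bearer {secret}"
    # compare_digest takes time independent of where the inputs first differ
    return hmac.compare_digest(authorization.encode(), expected.encode())


print(is_authorized("Bearer my-shared-secret", "my-shared-secret"))
print(is_authorized("Bearer wrong", "my-shared-secret"))
```

In the FastAPI handler, the check would become `if not is_authorized(authorization, SECRET): raise HTTPException(...)`, with `SECRET` ideally read from the environment rather than hard-coded.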
n
Yes: if you use a Cloud Automation with “Call webhook”, your server does not need WebSocket access. Prefect Cloud will POST to your endpoint over HTTPS when the run finishes.
marvin is broadly incorrect here
the SDK emits events via websocket client
the UI is just one client that reads from the server to see what's going on. you need to be able to create websocket clients to emit events where your work happens, so that
• the server can track what work happened
• other clients can then read what work happened from the server