Cosimo Alfarano
09/16/2025, 9:43 AMMarvin
09/16/2025, 9:43 AMMarvin
09/16/2025, 9:45 AMPREFECT_API_AUTH_STRINGuser:pass//api/*Cosimo Alfarano
09/16/2025, 9:51 AMMarvin
09/16/2025, 9:51 AMMarvin
09/16/2025, 9:56 AMX-Auth-Request-UserX-Forwarded-Usertriggered_byuser:<username>{"actor": "<username>"}
POST /api/deployments/<deployment_id>/create_flow_run
Content-Type: application/json
Authorization: Basic <...>  # if using PREFECT_API_AUTH_STRING
{
  "name": "my-run",
  "parameters": { "triggered_by": "alice@example.com" },
  "tags": ["user:alice@example.com"],
  "context": { "actor": "alice@example.com" }
}
from prefect import flow, task, get_run_context
def get_triggering_user():
    """Return the identity that requested the current run, or ``"unknown"``.

    Sources are consulted in this order, and the first hit wins:

    1. the ``triggered_by`` entry of the run context's parameters,
    2. the ``actor`` entry of the flow run's ``context`` mapping,
    3. the first flow-run tag of the form ``user:<name>``.
    """
    # NOTE(review): when called from inside a task, the run context may be a
    # task-level context with different attributes -- confirm against Prefect.
    run_ctx = get_run_context()

    params = run_ctx.parameters
    if "triggered_by" in params:
        return params["triggered_by"]

    flow_run = getattr(run_ctx, "flow_run", None)
    if flow_run:
        if flow_run.context:
            actor = flow_run.context.get("actor")
            if actor:
                return actor
        if flow_run.tags:
            for tag in flow_run.tags:
                if tag.startswith("user:"):
                    # Keep everything after the first colon as the user name.
                    return tag.partition(":")[2]

    return "unknown"
@task
def do_work():
    """Task that prints who triggered the run it is part of."""
    print(f"Task running; triggered by {get_triggering_user()}")
@flow
def my_flow():
    """Flow that prints who triggered it and then runs ``do_work``."""
    user = get_triggering_user()
    print(f"Flow running; triggered by {user}")
    do_work()


# Start of the next snippet in the transcript; it was fused onto the line above.
from prefect.artifacts import create_markdown_artifact
create_markdown_artifact(f"Run requested by {get_triggering_user()}")
X-Auth-Request-Usertriggered_byuser:<name>Cosimo Alfarano
09/16/2025, 12:08 PMMarvin
09/16/2025, 12:08 PMMarvin
09/16/2025, 12:09 PMtriggered_byuser:alice@example.comcontext
from prefect import flow, task, get_run_context
def get_actor_from_runtime():
    """Return the actor recorded for the current run, or ``None`` if absent.

    Sources are consulted in this order, and the first hit wins:

    1. the ``triggered_by`` entry of the run context's parameters,
    2. the ``actor`` entry of the flow run's ``context`` mapping
       (only present if it was supplied when the run was created),
    3. the first flow-run tag of the form ``user:<name>``.
    """
    run_ctx = get_run_context()

    params = run_ctx.parameters
    if "triggered_by" in params:
        return params["triggered_by"]

    flow_run = getattr(run_ctx, "flow_run", None)
    if not flow_run:
        return None

    run_context = getattr(flow_run, "context", None)
    if run_context:
        actor = run_context.get("actor")
        if actor:
            return actor

    # Fall back to a "user:<name>" tag; keep the text after the first colon.
    user_tags = (
        tag.partition(":")[2]
        for tag in (flow_run.tags or ())
        if tag.startswith("user:")
    )
    return next(user_tags, None)
@task
def do_work():
    """Task that prints the run's actor, or "unknown" when none is recorded."""
    print(f"Task running; triggered by {get_actor_from_runtime() or 'unknown'}")
@flow
def my_flow():
    actor = get_actor_from_runtime() or "unknown"
    print(f"Flow running; triggered by {actor}")
    do_work()POST /api/deployments/<deployment_id>/create_flow_run
{
  "name": "run-with-actor",
  "parameters": {"triggered_by": "alice@example.com"},
  "tags": ["user:alice@example.com"],
  "context": {"actor": "alice@example.com"}
}
get_run_context().flow_run.idCosimo Alfarano
09/16/2025, 12:16 PMMarvin
09/16/2025, 12:16 PMMarvin
09/16/2025, 12:19 PMX_AUTHORISED_USERX_AUTHORISED_USER
import os
import httpx
from fastapi import FastAPI, Request, HTTPException
from prefect.events import emit_event
PREFECT_API_URL = os.environ["PREFECT_API_URL"]
# If you're using PREFECT_API_AUTH_STRING (Basic Auth), expose it as an env var
PREFECT_API_AUTH = os.environ.get("PREFECT_API_AUTH")  # e.g. "Basic <base64>"
app = FastAPI()
@app.post("/trigger/{deployment_id}")
async def trigger_flow(deployment_id: str, request: Request):
    """Create a flow run for ``deployment_id`` on behalf of the calling user.

    The user is read from the ``X_AUTHORISED_USER`` request header and is
    attached to the new flow run as a parameter, a tag and a context entry.
    A ``myorg.flow-run.requested`` event tied to the flow run is then emitted.

    Args:
        deployment_id: ID of the deployment to run (path parameter).
        request: Incoming request; only its headers are read.

    Returns:
        ``{"flow_run_id": <id>}`` for the created flow run.

    Raises:
        HTTPException: 401 when the ``X_AUTHORISED_USER`` header is missing.
        httpx.HTTPStatusError: when the Prefect API answers with an error.
    """
    # NOTE(review): this header is trusted as-is; presumably the upstream proxy
    # sets it and strips any client-supplied value -- confirm.
    user = request.headers.get("X_AUTHORISED_USER")
    if not user:
        raise HTTPException(status_code=401, detail="Missing X_AUTHORISED_USER")

    # PREFECT_API_URL conventionally already ends in "/api"; only append it
    # when it is missing so the request never goes to ".../api/api/...".
    api_base = PREFECT_API_URL.rstrip("/")
    if not api_base.endswith("/api"):
        api_base = f"{api_base}/api"

    # 1) Create the flow run and attach user info so the flow can read it directly
    create_url = f"{api_base}/deployments/{deployment_id}/create_flow_run"
    body = {
        "name": f"requested-by-{user}",
        "parameters": {"triggered_by": user},  # easy to read inside the flow
        "tags": [f"user:{user}"],  # useful for filtering
        "context": {"actor": user},  # optional
    }
    headers = {"Authorization": PREFECT_API_AUTH} if PREFECT_API_AUTH else {}
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.post(create_url, json=body, headers=headers)
        resp.raise_for_status()
        flow_run_id = resp.json()["id"]

    # 2) Emit an event associated with the flow run.
    # The resource id format "prefect.flow-run.<UUID>" is what associates the
    # event with the flow run.
    flow_run_resource = {"prefect.resource.id": f"prefect.flow-run.{flow_run_id}"}
    user_resource = {"prefect.resource.id": f"myorg.user.{user}"}  # your own namespace
    emit_event(
        event="myorg.flow-run.requested",
        resource=flow_run_resource,
        related=[user_resource],
        payload={"user": user},
    )
    return {"flow_run_id": flow_run_id}


# Start of the next snippet in the transcript; it was fused onto the line above.
from prefect import flow, task, get_run_context
def get_actor_from_runtime():
    """Return the actor recorded for the current run, or ``None`` if absent.

    Sources are consulted in this order, and the first hit wins:

    1. the ``triggered_by`` entry of the run context's parameters,
    2. the first flow-run tag of the form ``user:<name>``,
    3. the ``actor`` entry of the flow run's ``context`` mapping
       (only present if ``context={"actor": "..."}`` was set at creation).
    """
    run_ctx = get_run_context()

    params = run_ctx.parameters
    if "triggered_by" in params:
        return params["triggered_by"]

    flow_run = getattr(run_ctx, "flow_run", None)
    if not flow_run:
        return None

    if getattr(flow_run, "tags", None):
        for tag in flow_run.tags:
            if tag.startswith("user:"):
                # Keep everything after the first colon as the user name.
                return tag.partition(":")[2]

    run_context = getattr(flow_run, "context", None)
    if run_context:
        return run_context.get("actor")
    return None
@task
def do_work():
    """Task that prints the run's actor, or "unknown" when none is recorded."""
    print(f"Task running; triggered by {get_actor_from_runtime() or 'unknown'}")
@flow
def my_flow():
    """Flow that prints the run's actor ("unknown" if none) and runs ``do_work``."""
    user = get_actor_from_runtime() or "unknown"
    print(f"Flow running; triggered by {user}")
    do_work()
myorg.flow-run.requestedMarvin
09/16/2025, 12:19 PMprefect.flow-run.<current_run_id>
import os
import httpx
from prefect.runtime import flow_run
def get_requesting_user_from_event():
    api = os.environ["PREFECT_API_URL"]
    auth = os.environ.get("PREFECT_API_AUTH")  # e.g., "Basic <...>"
    headers = {"Authorization": auth} if auth else {}
    body = {
        "limit": 1,
        "order": "DESC",
        "filter": {
            "name": {"any_": ["myorg.flow-run.requested"]},
            "resource": {"any_": [{"id": f"prefect.flow-run.{flow_run.id}"}]}
        }
    }
    r = <http://httpx.post|httpx.post>(f"{api}/api/events/read", json=body, headers=headers, timeout=20)
    r.raise_for_status()
    events = r.json().get("events", [])
    if not events:
        return None
    return events[0].get("payload", {}).get("user")resource={"prefect.resource.id": f"prefect.flow-run.{flow_run_id}"}/api/events/*relatedCosimo Alfarano
09/16/2025, 12:22 PMMarvin
09/16/2025, 12:22 PMMarvin
09/16/2025, 12:22 PMX-Authenticated-UserPREFECT_API_AUTH_STRING