<@ULVA73B9P> how can I send a custom New Relic eve...
# ask-marvin
k
@Marvin how can I send a custom New Relic event from a Prefect flow?
m
thought for 297.2 seconds
Short answer: post to New Relic’s Event API from your flow (or use the New Relic Python agent’s record_custom_event). Below are two solid patterns for Prefect 3.x. Option A: Call New Relic’s Event API directly (recommended) - Store your New Relic Insert Key in a Prefect Secret block - Build a payload with flow metadata from prefect.runtime - POST it with httpx/requests (or via a Webhook block) Example using Secret + httpx:
Copy code
from datetime import datetime, timezone
import time
import httpx

from prefect import flow, task, get_run_logger
from prefect.runtime import flow_run, deployment
from prefect.blocks.system import Secret

NR_US_EVENTS_URL = "<https://insights-collector.newrelic.com/v1/accounts/{account_id}/events>"
# For EU: "<https://insights-collector.eu01.nr-data.net/v1/accounts/{account_id}/events>"

@task(retries=3, retry_delay_seconds=5)
def send_new_relic_event(account_id: int, event_type: str, attributes: dict):
    insert_key = Secret.load("new-relic-insert-key").get()

    # enrich with Prefect metadata
    payload = {
        "eventType": event_type,
        "timestamp": int(time.time() * 1000),  # ms epoch
        "flow_run_id": flow_run.id,
        "flow_name": flow_run.flow_name,
        "flow_run_name": flow_run.name,
        "deployment_name": deployment.name,
        "deployment_version": deployment.version,
        "parameters": flow_run.parameters,
        "tags": flow_run.tags,
        **attributes,
    }

    url = NR_US_EVENTS_URL.format(account_id=account_id)
    headers = {
        "Content-Type": "application/json",
        "X-Insert-Key": insert_key,  # Insert key header
    }

    with httpx.Client(timeout=10) as client:
        r = <http://client.post|client.post>(url, json=[payload], headers=headers)  # list = batch of events
        r.raise_for_status()
    return r.status_code

@flow
def my_flow():
    logger = get_run_logger()
    status = send_new_relic_event(
        account_id=1234567,
        event_type="PrefectFlowEvent",
        attributes={"stage": "prod", "custom_metric": 42},
    )
    <http://logger.info|logger.info>(f"New Relic event sent (HTTP {status})")

if __name__ == "__main__":
    my_flow()
Alternative using a Webhook block (no extra deps): - Create a Webhook block named new-relic-events with: - URL: US or EU event endpoint for your account - Headers (SecretDict): X-Insert-Key: <your-insert-key>, Content-Type: application/json - Then call it in your flow:
Copy code
from prefect import flow, task
from prefect.runtime import flow_run, deployment
from prefect.blocks.webhook import Webhook
import time

@task
def send_via_webhook():
    wh = Webhook.load("new-relic-events")
    payload = {
        "eventType": "PrefectFlowEvent",
        "timestamp": int(time.time() * 1000),
        "flow_run_id": flow_run.id,
        "flow_name": flow_run.flow_name,
        "deployment_name": deployment.name,
    }
    resp = wh.call(payload=[payload])
    resp.raise_for_status()

@flow
def my_flow():
    send_via_webhook()

if __name__ == "__main__":
    my_flow()
Option B: Use the New Relic Python agent (if you already run it) - Ensure the agent is initialized (license key/app name via config or env) - Use record_custom_event to emit an event with attributes
Copy code
import newrelic.agent
from prefect import flow
from prefect.runtime import flow_run

@flow
def my_flow():
    newrelic.agent.initialize()  # or rely on env/config
    newrelic.agent.record_custom_event(
        "PrefectFlowEvent",
        {
            "flow_run_id": flow_run.id,
            "flow_name": flow_run.flow_name,
            "custom_metric": 42,
        },
    )

if __name__ == "__main__":
    my_flow()
Notes - Choose the right endpoint for your region: - US: https://insights-collector.newrelic.com/v1/accounts/{account_id}/events - EU: https://insights-collector.eu01.nr-data.net/v1/accounts/{account_id}/events - Query your events in New Relic with something like: - SELECT * FROM PrefectFlowEvent SINCE 1 hour ago
- Keep secrets in Prefect Secret or Webhook blocks to avoid logging keys. - Retry your POSTs to handle transient network issues. - New Relic Event API docs: Event API docs Quick question to tailor this: do you prefer using the Event API directly, or are you already running the New Relic agent in the same process as your flows? Also, which region (US or EU) is your account in?
k
@Marvin I'm already running New Relic agent but see no events in the NR interface when I try to record_custom_event
m
thought for 110.4 seconds
Got it — since you’re using the New Relic Python agent and don’t see your custom events, here’s the most common fix path for Prefect flows (short-lived processes are the usual culprit): Checklist 1) Initialize and register the app before recording: - Ensure the agent is initialized and the application is registered; otherwise events can be dropped. - Wait briefly for registration. 2) Record with an application handle: - Pass the registered application to record_custom_event. 3) Flush before the process exits: - Prefect flow processes tend to be short-lived; call shutdown_agent to force a harvest/flush. 4) Verify config: - custom_insights_events.enabled must be true (default is true). - If using EU region, your agent must be configured for EU (but if you see APM data already, region is fine). 5) Query correctly: - Custom events won’t show in APM screens — use NRQL: SELECT count(*) FROM PrefectFlowEvent SINCE 30 minutes ago Minimal example in a Prefect 3.x flow
Copy code
import os
import newrelic.agent
from prefect import flow, get_run_logger
from prefect.runtime import flow_run, deployment

@flow
def my_flow():
    logger = get_run_logger()

    # Initialize if not already done by your runtime
    # newrelic.agent.initialize('/path/to/newrelic.ini')  # optional if using env vars
    app = newrelic.agent.register_application(timeout=10.0)
    if not app:
        logger.warning("New Relic app registration failed; custom events may not be sent.")
    else:
        newrelic.agent.record_custom_event(
            "PrefectFlowEvent",
            {
                "flow_run_id": str(flow_run.id),
                "flow_name": flow_run.flow_name,
                "flow_run_name": flow_run.name,
                "deployment_name": deployment.name,
            },
            application=app,
        )

    # Force a harvest/flush so short-lived Prefect runs don’t drop events
    newrelic.agent.shutdown_agent(timeout=10.0)
Common gotchas - Process exits too fast: Without shutdown_agent, the agent may not harvest in time. - Not registered: If register_application returns None, events won’t be sent. - Disabled custom events: Check config for custom_insights_events.enabled = true. - Invalid event type/attributes: Event type must start with a letter and be alphanumeric/underscore; keep attribute names simple and values serializable. - Looking in the wrong place: Use NRQL Query builder to find your events. If this doesn’t solve it: - Enable agent debug logs to see what’s happening: - NEW_RELIC_LOG=stdout - NEW_RELIC_LOG_LEVEL=debug - Share whether you’re running flows in Docker/Kubernetes or a local worker (helps decide where/how to call shutdown_agent). Docs - New Relic Python agent: Python agent docs - New Relic Event API (for reference/alternative): Event API docs
k
@Marvin Why using New Relic’s Event API directly is a recommended option?
m
thought for 24.6 seconds
Great question. For Prefect flows, using New Relic’s Event API directly is often recommended because: - Reliability in short-lived runs: Prefect flows frequently run as short-lived processes/containers. The NR agent buffers and harvests on intervals; events can be dropped if the process exits before a harvest. With the Event API, you control the POST, get a status code, and can retry/backoff deterministically. - No agent lifecycle management: You don’t need to worry about
initialize
,
register_application
, or
shutdown_agent
timing. That’s one less moving part in ephemeral jobs and parallel tasks. - Minimal dependencies and simpler packaging: A single HTTP call (or a Prefect Webhook block) avoids shipping and configuring the agent in all worker images. - Least-privilege credentials: Use an Insert (ingest) key scoped to events instead of a broad license key used by the agent. Easy to rotate via Prefect Secret/Webhook blocks. - Better observability and control: You can log payloads and inspect HTTP responses, add custom retries, batch events, and handle rate limits explicitly. - Consistent across environments: Works the same in local, Docker, K8s, and serverless; you can point to US/EU endpoints or route via proxies/VPC egress rules. - Language/runtime flexibility: You can emit from any step (Python, shell, etc.) without coupling to NR agent behavior. Trade-offs to be aware of: - You own retries/backoff and batching strategy. - No automatic correlation to APM transactions/spans unless you add linking attributes yourself (e.g., trace/span IDs). - You must manage Insert keys securely and respect NR rate limits. Docs for reference: - Event API: New Relic Event API - Insert (ingest) keys: New Relic API keys (Ingest/Insert) - Python agent custom events (alternative approach): record_custom_event docs If you’d like, I can share a small Prefect task helper that batches events with retries and uses a Webhook block or
httpx
under the hood.