Kate
09/29/2025, 10:42 AM
Marvin
09/29/2025, 10:42 AM
Marvin
09/29/2025, 10:47 AM
from datetime import datetime, timezone
import time
import httpx
from prefect import flow, task, get_run_logger
from prefect.runtime import flow_run, deployment
from prefect.blocks.system import Secret
NR_US_EVENTS_URL = "https://insights-collector.newrelic.com/v1/accounts/{account_id}/events"
# For EU: "https://insights-collector.eu01.nr-data.net/v1/accounts/{account_id}/events"

@task(retries=3, retry_delay_seconds=5)
def send_new_relic_event(account_id: int, event_type: str, attributes: dict):
    insert_key = Secret.load("new-relic-insert-key").get()
    # Enrich with Prefect metadata. The Event API expects flat string/number
    # values, so parameters and tags are stringified.
    payload = {
        "eventType": event_type,
        "timestamp": int(time.time() * 1000),  # ms epoch
        "flow_run_id": flow_run.id,
        "flow_name": flow_run.flow_name,
        "flow_run_name": flow_run.name,
        "deployment_name": deployment.name,
        "deployment_version": deployment.version,
        "parameters": str(flow_run.parameters),
        "tags": ",".join(flow_run.tags),
        **attributes,
    }
    # Drop unset values (deployment fields are None when the flow runs outside a deployment)
    payload = {k: v for k, v in payload.items() if v is not None}
    url = NR_US_EVENTS_URL.format(account_id=account_id)
    headers = {
        "Content-Type": "application/json",
        "X-Insert-Key": insert_key,  # Insert key header
    }
    with httpx.Client(timeout=10) as client:
        r = client.post(url, json=[payload], headers=headers)  # list = batch of events
        r.raise_for_status()
        return r.status_code

@flow
def my_flow():
    logger = get_run_logger()
    status = send_new_relic_event(
        account_id=1234567,
        event_type="PrefectFlowEvent",
        attributes={"stage": "prod", "custom_metric": 42},
    )
    logger.info(f"New Relic event sent (HTTP {status})")

if __name__ == "__main__":
    my_flow()
Alternative using a Webhook block (no extra deps):
- Create a Webhook block named new-relic-events with:
  - URL: US or EU event endpoint for your account
  - Headers (SecretDict): X-Insert-Key: <your-insert-key>, Content-Type: application/json
- Then call it in your flow:
from prefect import flow, task
from prefect.runtime import flow_run, deployment
from prefect.blocks.webhook import Webhook
import time
@task
def send_via_webhook():
    wh = Webhook.load("new-relic-events")
    payload = {
        "eventType": "PrefectFlowEvent",
        "timestamp": int(time.time() * 1000),
        "flow_run_id": flow_run.id,
        "flow_name": flow_run.flow_name,
        "deployment_name": deployment.name,
    }
    resp = wh.call(payload=[payload])
    resp.raise_for_status()

@flow
def my_flow():
    send_via_webhook()

if __name__ == "__main__":
    my_flow()
Option B: Use the New Relic Python agent (if you already run it)
- Ensure the agent is initialized (license key/app name via config or env)
- Use record_custom_event to emit an event with attributes
import newrelic.agent
from prefect import flow
from prefect.runtime import flow_run
@flow
def my_flow():
    newrelic.agent.initialize()  # or rely on env/config
    newrelic.agent.record_custom_event(
        "PrefectFlowEvent",
        {
            "flow_run_id": flow_run.id,
            "flow_name": flow_run.flow_name,
            "custom_metric": 42,
        },
    )

if __name__ == "__main__":
    my_flow()
Notes
- Choose the right endpoint for your region:
  - US: https://insights-collector.newrelic.com/v1/accounts/{account_id}/events
  - EU: https://insights-collector.eu01.nr-data.net/v1/accounts/{account_id}/events
- Query your events in New Relic with something like:
  - SELECT * FROM PrefectFlowEvent SINCE 1 hour ago
Marvin
09/29/2025, 10:47 AM
Kate
09/29/2025, 11:29 AM
Marvin
09/29/2025, 11:29 AM
Marvin
09/29/2025, 11:31 AM
import os
import newrelic.agent
from prefect import flow, get_run_logger
from prefect.runtime import flow_run, deployment
@flow
def my_flow():
    logger = get_run_logger()
    # Initialize if not already done by your runtime
    # newrelic.agent.initialize('/path/to/newrelic.ini')  # optional if using env vars
    app = newrelic.agent.register_application(timeout=10.0)
    if not app:
        logger.warning("New Relic app registration failed; custom events may not be sent.")
    else:
        newrelic.agent.record_custom_event(
            "PrefectFlowEvent",
            {
                "flow_run_id": str(flow_run.id),
                "flow_name": flow_run.flow_name,
                "flow_run_name": flow_run.name,
                "deployment_name": deployment.name,
            },
            application=app,
        )
    # Force a harvest/flush so short-lived Prefect runs don’t drop events
    newrelic.agent.shutdown_agent(timeout=10.0)
Common gotchas
- Process exits too fast: Without shutdown_agent, the agent may not harvest in time.
- Not registered: If register_application returns None, events won’t be sent.
- Disabled custom events: Check config for custom_insights_events.enabled = true.
- Invalid event type/attributes: Event type must start with a letter and be alphanumeric/underscore; keep attribute names simple and values serializable.
- Looking in the wrong place: Use NRQL Query builder to find your events.
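The event-type and attribute gotchas above can be checked before anything is sent. Here is a minimal sketch, assuming the naming rule stated above (a letter first, then letters, digits, or underscores). `clean_event` is a hypothetical helper, not part of the New Relic agent or Prefect:

```python
import json
import re

# Assumed rule, taken from the gotcha above: a letter first,
# then letters, digits, or underscores.
EVENT_TYPE_RE = re.compile(r"[A-Za-z][A-Za-z0-9_]*")


def clean_event(event_type: str, attributes: dict) -> dict:
    """Hypothetical helper: validate the event type and flatten attribute values."""
    if not EVENT_TYPE_RE.fullmatch(event_type):
        raise ValueError(f"Invalid event type: {event_type!r}")
    cleaned = {}
    for key, value in attributes.items():
        if value is None:
            continue  # skip unset values, e.g. deployment.name on local runs
        if isinstance(value, (str, int, float)):
            cleaned[key] = value  # strings and numbers pass through unchanged
        else:
            # Lists, dicts, and other objects become JSON strings
            cleaned[key] = json.dumps(value, default=str)
    return cleaned
```

The cleaned dict can then be passed as the second argument to record_custom_event. Whether nested values are better stringified or split into several flat attributes depends on how you plan to query them.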
If this doesn’t solve it:
- Enable agent debug logs to see what’s happening:
  - NEW_RELIC_LOG=stdout
  - NEW_RELIC_LOG_LEVEL=debug
- Share whether you’re running flows in Docker/Kubernetes or a local worker (helps decide where/how to call shutdown_agent).
Docs
- New Relic Python agent: Python agent docs
- New Relic Event API (for reference/alternative): Event API docs
Kate
09/29/2025, 11:39 AM
Marvin
09/29/2025, 11:39 AM
Marvin
09/29/2025, 11:40 AM
initialize, register_application, or shutdown_agent timing. That’s one less moving part in ephemeral jobs and parallel tasks.
- Minimal dependencies and simpler packaging: A single HTTP call (or a Prefect Webhook block) avoids shipping and configuring the agent in all worker images.
- Least-privilege credentials: Use an Insert (ingest) key scoped to events instead of a broad license key used by the agent. Easy to rotate via Prefect Secret/Webhook blocks.
- Better observability and control: You can log payloads and inspect HTTP responses, add custom retries, batch events, and handle rate limits explicitly.
- Consistent across environments: Works the same in local, Docker, K8s, and serverless; you can point to US/EU endpoints or route via proxies/VPC egress rules.
- Language/runtime flexibility: You can emit from any step (Python, shell, etc.) without coupling to NR agent behavior.
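As a small illustration of the US/EU point, the endpoint can be chosen from configuration. This is a sketch only: the URLs are the two listed earlier in this thread, and `events_url` is a hypothetical helper name:

```python
# Endpoints as listed earlier in this thread; `events_url` is a hypothetical helper.
EVENT_API_URLS = {
    "US": "https://insights-collector.newrelic.com/v1/accounts/{account_id}/events",
    "EU": "https://insights-collector.eu01.nr-data.net/v1/accounts/{account_id}/events",
}


def events_url(account_id: int, region: str = "US") -> str:
    """Return the Event API URL for the given account and region."""
    try:
        template = EVENT_API_URLS[region.upper()]
    except KeyError:
        raise ValueError(f"Unknown region: {region!r}") from None
    return template.format(account_id=account_id)
```

The region could come from a flow parameter or a Prefect variable so the same flow code runs against either data center.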
Trade-offs to be aware of:
- You own retries/backoff and batching strategy.
- No automatic correlation to APM transactions/spans unless you add linking attributes yourself (e.g., trace/span IDs).
- You must manage Insert keys securely and respect NR rate limits.
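To make the first trade-off concrete, here is a rough sketch of batching with exponential backoff. It assumes you supply a `send` callable that posts one batch and raises on failure. The function names, the batch size of 100, and the delay schedule are illustrative choices, not New Relic limits:

```python
import time
from typing import Callable, Iterator


def chunked(events: list[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive batches of at most `size` events."""
    for start in range(0, len(events), size):
        yield events[start:start + size]


def send_with_backoff(
    send: Callable[[list[dict]], None],
    events: list[dict],
    batch_size: int = 100,
    max_attempts: int = 4,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Send events in batches, retrying each failed batch with exponential backoff.

    Returns the number of events sent. Re-raises the last error once a batch
    has failed `max_attempts` times.
    """
    sent = 0
    for batch in chunked(events, batch_size):
        for attempt in range(1, max_attempts + 1):
            try:
                send(batch)
                break
            except Exception:
                if attempt == max_attempts:
                    raise
                # Delay doubles on each retry: base_delay, 2x, 4x, ...
                sleep(base_delay * 2 ** (attempt - 1))
        sent += len(batch)
    return sent
```

Inside a Prefect task, `send` could wrap an httpx call such as `client.post(url, json=batch, headers=headers).raise_for_status()`. If you rely on this loop, consider dropping the task-level `retries` so a failure is not retried at two layers.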
Docs for reference:
- Event API: New Relic Event API
- Insert (ingest) keys: New Relic API keys (Ingest/Insert)
- Python agent custom events (alternative approach): record_custom_event docs
If you’d like, I can share a small Prefect task helper that batches events with retries and uses a Webhook block or httpx under the hood.