<@ULVA73B9P>: How do I add to our pipelines in Pre...
# marvin-ai
j
@Marvin: How do I add to our pipelines in Prefect Cloud, to send OpenTelemetry traces/metric to our own collector, external to Prefect?
m
thought for 300.8 seconds
Great question — you can send OpenTelemetry traces (and metrics) from your Prefect 3.x flow runs to your own OTLP collector by configuring the flow-run environment and (optionally) initializing the OpenTelemetry SDK early in your code. High-level steps: 1) Make sure your flow runtime has OpenTelemetry packages - Add these to your image/venv:
Copy code
pip install "opentelemetry-sdk>=1.20" "opentelemetry-exporter-otlp>=1.20"
2) Point flow runs at your collector via environment variables - Set the standard OTEL env vars on the work pool job template so every flow run inherits them. Example
prefect.yaml
snippet:
Copy code
deployments:
- name: send-otel
  work_pool:
    name: k8s-pool  # or docker/process/etc.
  job_variables:
    env:
      OTEL_EXPORTER_OTLP_ENDPOINT: "<http://otel-collector.default.svc:4317>"
      OTEL_EXPORTER_OTLP_PROTOCOL: "grpc"            # or "http/protobuf"
      OTEL_SERVICE_NAME: "prefect-flows"
      OTEL_RESOURCE_ATTRIBUTES: "service.namespace=data-platform,telemetry.distro=prefect"
      # Optional auth/header(s)
      # OTEL_EXPORTER_OTLP_HEADERS: "authorization=Bearer YOUR_TOKEN"
Docs on customizing job variables: Customize job variables - This ensures the variables propagate regardless of worker type (Kubernetes/Docker/Process). If you prefer, you can also set these on the worker process itself so worker-level traces/metrics are exported too. 3) (Recommended) Initialize the OpenTelemetry SDK early so Prefect’s spans use your exporter - Prefect uses OpenTelemetry under the hood. Setting the global tracer/meter provider at process start guarantees Prefect’s internal spans and your custom spans go to your collector. - Put this at the top of your flow module (before defining/running flows), or import from a small `otel_setup.py`:
Copy code
# Requires: opentelemetry-sdk, opentelemetry-exporter-otlp

import os
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

# Set resource attributes; add your own org/team identifiers
resource = Resource.create({
    "service.name": os.getenv("OTEL_SERVICE_NAME", "prefect-flows"),
    "service.namespace": "data-platform",
})

# Tracing
tracer_provider = TracerProvider(resource=resource)
tracer_provider.add_span_processor(
    BatchSpanProcessor(
        OTLPSpanExporter(
            endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "<http://otel-collector:4317>"),
            insecure=True  # set accordingly for your endpoint
        )
    )
)
trace.set_tracer_provider(tracer_provider)

# Metrics (optional)
metric_reader = PeriodicExportingMetricReader(
    OTLPMetricExporter(
        endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "<http://otel-collector:4317>"),
        insecure=True
    )
)
meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader])
metrics.set_meter_provider(meter_provider)
4) Add custom spans/metrics where helpful - Custom spans inside a flow:
Copy code
from prefect import flow, task
from opentelemetry import trace

@task
def work():
    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("expensive-step") as span:
        # annotate spans with useful attributes
        span.set_attribute("component", "feature_x")
        # do work...
        return 1

@flow
def my_flow():
    return work()

if __name__ == "__main__":
    my_flow()
- Custom metrics: ``` from opentelemetry import metrics meter = metrics.get_meter("prefect.custom")
items_processed = meter.create_counter("items_processed", unit="1", description="Items processed") # Use inside your task/flow code items_processed.add(1, {"component": "feature_x"}) ``` Notes and references: - Prefect 3.x telemetry API (internal): prefect.telemetry.bootstrap and prefect.telemetry.run_telemetry - Settings/how to set env via deployments/work pools: Customize job variables A few practical tips: - Protocol/port: gRPC uses 4317; HTTP/protobuf uses 4318. Match your collector. - Headers/auth: set
OTEL_EXPORTER_OTLP_HEADERS
if your collector requires auth. - Service naming: keep
OTEL_SERVICE_NAME
stable so your backend aggregates correctly; add
service.namespace
via
OTEL_RESOURCE_ATTRIBUTES
. - Workers vs flows: If you also want worker process telemetry, set the same env vars on the worker’s environment (e.g., the Kubernetes Deployment or Docker service that runs the worker). Questions so I can tailor examples: - Which work pool type are you using (Kubernetes, Docker, Process, ECS, etc.)? - Which collector/backend and protocol (gRPC vs HTTP) do you prefer? - Do you want telemetry from just flows/tasks, or also from the worker processes?
j
@Marvin: Would I need to disable Prefect’s own OLTP?
m
thought for 159.2 seconds
Do you mean OTLP (OpenTelemetry Protocol)? If so: no—there’s nothing you need to “disable” in Prefect. Prefect 3.x uses OpenTelemetry APIs internally, but it does not ship traces/metrics to any OTLP endpoint by default. When you configure an exporter (via OTEL_* environment variables or by initializing the SDK in your code), Prefect’s spans will flow to your collector. Just make sure you only set one global tracer/meter provider to avoid duplicate exports. References: - Customize env on work pools/deployments: Customize job variables - Telemetry internals: prefect.telemetry.run_telemetry If you were instead asking about Prefect’s product analytics (separate from OTLP), let me know and I’ll share how to turn that off as well.
j
@Marvin: I mean PREFECT_CLOUD_ENABLE_ORCHESTRATION_TELEMETRY?
m
thought for 74.9 seconds
Short answer: No—you don’t need to change that for OTLP.
PREFECT_CLOUD_ENABLE_ORCHESTRATION_TELEMETRY
controls Prefect Cloud’s own orchestration telemetry (internal to Prefect Cloud). It does not configure or interfere with OpenTelemetry/OTLP export to your collector. - Default:
true
- Effect: Enables/disables Prefect Cloud’s internal orchestration telemetry - Not related to OTLP exporters; your OTEL_* env vars and SDK setup will still send traces/metrics to your collector regardless of this flag. If you do want to turn it off:
Copy code
PREFECT_CLOUD_ENABLE_ORCHESTRATION_TELEMETRY=false
Docs: Settings reference (see CloudSettings:
enable_orchestration_telemetry
) If you share which work pool type you’re using, I can show exactly where to set this env var (and your OTEL_* vars) so it propagates to flow runs.