Doug Balog

11/11/2021, 2:32 PM
Hi, I’m trying to get a Datadog agent running in k8s to collect application metrics from my Prefect Flows. Just wondering if anybody has this working and if they could share how they did it.

Kevin Kho

11/11/2021, 2:53 PM
I’ve only seen people adding Datadog handlers to Prefect flows. Haven’t seen this yet myself. Let’s see if the community chimes in.
👍 1

Billy McMonagle

11/12/2021, 12:25 AM
I'm curious what kinds of metrics you're looking for, @Doug Balog? We're using Datadog and I have set up Prefect logs to be ingested into Datadog, faceted on flow name, task name, etc. Happy to share info about that if you're interested.
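As a rough, hypothetical sketch of the kind of setup Billy describes, assuming the node-level Datadog agent is already collecting container logs: attach a JSON formatter to the prefect logger so every log line carries flow/task context that Datadog can facet on. The field names and handler wiring here are illustrative, not Billy's actual configuration.
import json
import logging

import prefect


class DatadogJsonFormatter(logging.Formatter):
    """Render each Prefect log record as one JSON line with flow/task context."""

    def format(self, record):
        payload = {
            "message": record.getMessage(),
            "level": record.levelname,
            "logger": record.name,
            # Prefect 1.x context keys; these are empty outside a flow run.
            "flow_name": prefect.context.get("flow_name"),
            "task_name": prefect.context.get("task_name"),
            "flow_run_id": prefect.context.get("flow_run_id"),
        }
        return json.dumps(payload)


handler = logging.StreamHandler()  # stderr, which the agent tails as container output
handler.setFormatter(DatadogJsonFormatter())
logging.getLogger("prefect").addHandler(handler)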

Kevin Kho

11/12/2021, 12:32 AM
Hey Billy, this is often requested and I'd love to archive it. Could you make a GitHub discussion about what you do?

Billy McMonagle

11/12/2021, 12:34 AM
Hey Kevin, absolutely. It relies a bit on some work done by our DevOps team (which isn't a secret, but some details are beyond my knowledge). I can share the general setup and how I'm doing the logging from my Prefect flows.

Kevin Kho

11/12/2021, 2:20 AM
Thank you!

Doug Balog

11/12/2021, 3:54 PM
@Billy McMonagle We have some data pipeline metrics, things like counts of new/updated/deleted records. I was trying to use the datadog Python library and create a few Gauges in my flow. This led me down the rabbit hole of trying to get DogStatsD running on each node in k8s and connecting to it from my Prefect job through a socket in a shared volume.

Billy McMonagle

11/12/2021, 8:29 PM
That's a cool idea! Sorry that I don't have anything to suggest. For DogStatsD, I believe we run some sort of datadog helm chart using FluxCD.

Doug Balog

11/23/2021, 5:43 PM
btw, I got it working with some help from Datadog. You need to use a custom job template and set:
flow.run_config = KubernetesRun(job_template_path="custom_job_template.yaml")
Here's the custom_job_template.yaml:
apiVersion: batch/v1
kind: Job
spec:
  template:
    spec:
      containers:
        - name: flow
          volumeMounts:
            - name: dsdsocket
              mountPath: /var/run/datadog
              readOnly: false
          env:
            - name: DD_DOGSTATSD_SOCKET
              value: /var/run/datadog/dsd.socket

      volumes:
        - hostPath:
            path: /var/run/datadog/
          name: dsdsocket
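For context, a minimal sketch of how a flow might point at that template with the Prefect 1.x API; the flow name, check_socket task, and image are placeholders, not Doug's actual project.
import os

import prefect
from prefect import Flow, task
from prefect.run_configs import KubernetesRun


@task
def check_socket():
    # DD_DOGSTATSD_SOCKET is injected by the custom job template above.
    prefect.context.get("logger").info(os.environ.get("DD_DOGSTATSD_SOCKET"))


with Flow("datadog-metrics-example") as flow:
    check_socket()

# Every flow-run Job the Kubernetes agent creates from this template will
# include the DogStatsD socket volume mount and env var.
flow.run_config = KubernetesRun(
    job_template_path="custom_job_template.yaml",
    image="my-registry/my-flow:latest",  # placeholder image
)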
And install the Datadog agent via the Datadog Helm chart. This is the values.yaml I used. I haven't had a chance to reduce it to the minimal working config yet.
---

datadog:
  apiKeyExistingSecret: datadog-secret
  appKeyExistingSecret: datadog-secret
  dogstatsd:
    originDetection: true
    useHostPort: true
    useHostPID: true
    targetSystem: linux
    useSocketVolume: true
    nonLocalTraffic: true
  apm:
    portEnabled: true
    socketEnabled: true
  env:
    - name: DD_DOGSTATSD_SOCKET
      value: /var/run/datadog/dsd.socket

  clusterAgent:
    enabled: true
  metricsProvider:
    enabled: true

agents:
  volumes:
    - hostPath:
        path: /var/run/datadog
      name: dsdsocket
  volumeMounts:
    - name: dsdsocket
      mountPath: /var/run/datadog
      readOnly: false
Hoping that this helps the next person who wants to send custom metrics from a flow to Datadog.
Almost 2 weeks ago I asked about sending custom application metrics from a Flow to DataDogStatsD on K8s. I was able to get it working. See thread for more info.

Anna Geller

11/23/2021, 6:25 PM
@Doug Balog that’s brilliant, thank you so much for sharing! 💯 Is it this helm chart you were referring to?

Doug Balog

11/23/2021, 6:52 PM
yes.

Billy McMonagle

11/23/2021, 6:55 PM
Very cool @Doug Balog! What does the app code that actually sends these metrics look like?

Doug Balog

11/23/2021, 10:49 PM
@Billy McMonagle Sorry, in marathon meetings today. I haven't figured out what the code will look like yet. I've been passing a dict with stats from one task to another, each task adds its own stats, and then a final task writes it to a bucket as JSON. I'll probably adapt that and start with something like this.
import os
from random import randint

import prefect
from prefect import task


@task
def report_to_data_dog(flow_config, stats):
    # This import needs to be inside the task to avoid cloudpickle issues with Prefect.
    from datadog import initialize, statsd

    logger = prefect.context.get("logger")
    logger.info("Reporting to DataDog")

    # DD_DOGSTATSD_SOCKET is set by the custom job template above.
    dogstatsd_socket = os.environ.get("DD_DOGSTATSD_SOCKET")
    initialize(statsd_socket_path=dogstatsd_socket)

    # config is a project-specific helper that builds Datadog tags from the Prefect context.
    tags = config.get_datadog_tags(prefect.context)

    # stats is a list of dicts holding various stats from previous tasks.
    # for s in stats:
    #     for k, v in s.items():
    #         statsd.gauge(k, v, tags=tags)

    statsd.gauge("test.gauge", randint(0, 100), tags=tags)
The test.gauge call was just to have the simplest flow to prove that I could get it to work. Also thinking about looking into the work you did with structured logging. Not sure if I should use logs or metrics. My first thought was metrics.
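A hypothetical sketch of how the report_to_data_dog task above might be wired into a flow; the load_records task and its counters are made up for illustration.
from prefect import Flow, task
from prefect.run_configs import KubernetesRun


@task
def load_records():
    # Each pipeline task returns a small dict of counters it wants reported.
    return {"records.new": 120, "records.updated": 34, "records.deleted": 2}


with Flow("pipeline-with-datadog-metrics") as flow:
    stats = load_records()
    # report_to_data_dog is the task from the message above; it takes a list
    # of stats dicts collected from upstream tasks. flow_config here is a
    # placeholder for whatever object supplies the Datadog tag helper.
    report_to_data_dog(flow_config=None, stats=[stats])

# Reuse the custom job template so the DogStatsD socket is mounted into the pod.
flow.run_config = KubernetesRun(job_template_path="custom_job_template.yaml")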