Access the keys of an attribute with Map type in OpenTelemetry Collector


I am deploying the OpenTelemetry Collector in Kubernetes. Fluent Bit is set up for log forwarding: logs are sent to the Collector's fluentforward endpoint, where I need to parse the log data and export it to Loki. **The goal:** add labels to the log data and then export it to Loki.

**The problem:** The log attributes include a data field of type Map. I need to assign the fields of this map to new attribute fields. I have not been able to accomplish this with the following OTel Collector setup.

receivers:
  fluentforward:
    endpoint: 0.0.0.0:8006
exporters:
  logging:
    verbosity: detailed
  loki:
    endpoint: "http://loki.monitoring.svc.cluster.local:3100/loki/api/v1/push"
    headers:
      "X-Scope-OrgID": otel
processors:
  batch:
  transform:
    log_statements:
      - context: log
        statements:
          - merge_maps(attributes, attributes["data"]["kubernetes"], "upsert") # Fails with an error saying unexpected token "["
          - merge_maps(attributes, ParseJSON(attributes["data"]), "upsert") # Fails saying the expected value is a String but received pcommon.Map
          - set(attributes["container_name"], attributes["data"].kubernetes.container_name) # Sets container_name to the whole data map instead of data.kubernetes.container_name

  attributes:
    actions:
    - action: insert
      key: loki.attribute.labels
      value: namespace_name, container_name
    - action: insert
      key: container_name
      from_attribute: data.kubernetes.container_name # Just container_name also does not work 
    - action: insert
      key: namespace_name
      from_attribute: namespace_name #Sets it to attributes["data"]
  resource:
    attributes:
    - action: insert
      key: loki.resource.labels
      value: host_name, pod_name
    - action: insert
      key: host_name
      value: guarana
    - action: insert
      key: pod_name
      value: guarana-pod-01
  cumulativetodelta:
  k8sattributes:
    auth_type: "serviceAccount"
    passthrough: false
    filter:
      node_from_env_var: KUBE_NODE_NAME
    pod_association:
      - sources:
        - from: resource_attribute
          name: k8s.pod.ip
    extract:
      metadata:
        - k8s.pod.name
        - k8s.pod.uid
        - k8s.deployment.name
        - k8s.namespace.name
        - k8s.node.name
        - k8s.pod.start_time
  resourcedetection:
    detectors: [env, aks, azure]
    timeout: 2s
    override: false
extensions:
  memory_ballast: 
  health_check:
  zpages:
    endpoint: :55679
service:
  telemetry:
    logs:
      level: "debug"
  extensions: [zpages, health_check, memory_ballast]
  pipelines:
    logs:
      receivers: [fluentforward]
      processors: [ attributes, resource, transform]
      exporters: [loki, logging]

Here is partial output from the OTel Collector's logging exporter:


Timestamp: 2023-04-06 05:02:48.73177434 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
     -> data: Map({"kubernetes":{"annotations":{"checksum/config":"fff"},"container_hash":"fff","container_image":"image_name","container_name":"ingester","docker_id":"234","host":"host1","namespace_name":"monitoring","pod_id":"654","pod_name":"some_pod"},"logtag":"F","message":"info log","stream":"stderr","time":"2023-04-06T05:02:48.730945334Z"})
     -> type: Str(logs)
     -> agg: Str(logs)
     -> fluent.tag: Str(sometag.log)
     -> loki.attribute.labels: Str(http_status_code, container_name, namespace_name)
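
A possible direction, offered as a minimal sketch rather than a verified fix: given the nested data map shown in the output above, the target values sit at data → kubernetes → container_name and data → kubernetes → namespace_name. The sketch assumes a newer Collector Contrib release whose OTTL accepts chained map keys in a path (the version in use here rejects the second "["), and it reuses only the attribute names and the logging exporter from the configuration above. It also assumes the attributes processor's from_attribute cannot address nested keys, so the copy is done in transform instead.

```yaml
processors:
  transform:
    log_statements:
      - context: log
        statements:
          # Assumption: the installed OTTL version supports chained map keys.
          - set(attributes["container_name"], attributes["data"]["kubernetes"]["container_name"])
          - set(attributes["namespace_name"], attributes["data"]["kubernetes"]["namespace_name"])
  attributes:
    actions:
      # Hint for the Loki exporter: promote these log attributes to labels.
      - action: insert
        key: loki.attribute.labels
        value: container_name, namespace_name

service:
  pipelines:
    logs:
      receivers: [fluentforward]
      # transform is listed first so later processors see the flattened attributes.
      processors: [transform, attributes, resource]
      exporters: [loki, logging]
```

Processors run in the order listed in the pipeline, so with the original order (attributes, resource, transform) any from_attribute copy would run before the transform statements had created the flattened fields.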