I am implementing the OpenTelemetry Collector in Kubernetes, with Fluent Bit set up for log forwarding. Logs are forwarded to the Collector's `fluentforward` receiver endpoint, and I need to parse the log data and export it to Loki. **The goal:** add labels to the log data and then export it to Loki.
**The problem:** each log record has a `data` attribute of type Map. I need to copy the fields of this map into new top-level attributes. I have not been able to accomplish this with the following OTel Collector setup.
receivers:
  fluentforward:
    endpoint: 0.0.0.0:8006
exporters:
  logging:
    verbosity: detailed
  loki:
    endpoint: "http://loki.monitoring.svc.cluster.local:3100/loki/api/v1/push"
    headers:
      "X-Scope-OrgID": otel
processors:
  batch:
  transform:
    log_statements:
      - context: log
        statements:
          - merge_maps(attributes, attributes["data"]["kubernetes"], "upsert") # Fails with an error saying unexpected token [
          - merge_maps(attributes, ParseJSON(attributes["data"]), "upsert") # Fails saying the expected value is a String but received pcommons.Map
          - set(attributes["container_name"], attributes["data"].kubernetes.container_name) # Sets foo to data instead of data.kubernetes
  attributes:
    actions:
      - action: insert
        key: loki.attribute.labels
        value: namespace_name, container_name
      - action: insert
        key: container_name
        from_attribute: data.kubernetes.container_name # Just container_name also does not work
      - action: insert
        key: namespace_name
        from_attribute: namespace_name # Sets it to attributes["data"]
  resource:
    attributes:
      - action: insert
        key: loki.resource.labels
        value: host_name, pod_name
      - action: insert
        key: host_name
        value: guarana
      - action: insert
        key: pod_name
        value: guarana-pod-01
  cumulativetodelta:
  k8sattributes:
    auth_type: "serviceAccount"
    passthrough: false
    filter:
      node_from_env_var: KUBE_NODE_NAME
    pod_association:
      - sources:
          - from: resource_attribute
            name: k8s.pod.ip
    extract:
      metadata:
        - k8s.pod.name
        - k8s.pod.uid
        - k8s.deployment.name
        - k8s.namespace.name
        - k8s.node.name
        - k8s.pod.start_time
  resourcedetection:
    detectors: [env, aks, azure]
    timeout: 2s
    override: false
extensions:
  memory_ballast:
  health_check:
  zpages:
    endpoint: :55679
service:
  telemetry:
    logs:
      level: "debug"
  extensions: [zpages, health_check, memory_ballast]
  pipelines:
    logs:
      receivers: [fluentforward]
      processors: [attributes, resource, transform]
      exporters: [loki, logging/debug]
Here is partial output from the OTel Collector:
Timestamp: 2023-04-06 05:02:48.73177434 +0000 UTC
SeverityText:
SeverityNumber: Unspecified(0)
Body: Empty()
Attributes:
-> data: Map({"kubernetes":{"annotations":{"checksum/config":"fff"},"container_hash":"fff","container_image":"image_name","container_name":"ingester","docker_id":"234","host":"host1","namespace_name":"monitoring","pod_id":"654","pod_name":"some_pod"},"logtag":"F","message":"info log","stream":"stderr","time":"2023-04-06T05:02:48.730945334Z"})
-> type: Str(logs)
-> agg: Str(logs)
-> fluent.tag: Str(sometag.log)
-> loki.attribute.labels: Str(http_status_code, container_name, namespace_name)
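
For reference, given the `data` map above, what I am trying to achieve could be sketched roughly as follows. This is only a sketch under assumptions, not a confirmed fix: it assumes a Collector Contrib release whose OTTL accepts chained map indexing such as `attributes["data"]["kubernetes"]["container_name"]` (the version I am running rejects it with the `unexpected token [` error), and it places `transform` ahead of `attributes` in the pipeline so the nested fields are copied first.

```yaml
processors:
  transform:
    log_statements:
      - context: log
        statements:
          # Assumption: the OTTL version in use supports chained map indexing.
          # Copy two nested fields of the "data" map to top-level log attributes.
          - set(attributes["container_name"], attributes["data"]["kubernetes"]["container_name"])
          - set(attributes["namespace_name"], attributes["data"]["kubernetes"]["namespace_name"])
  attributes:
    actions:
      # Hint for the Loki exporter: promote these attributes to labels.
      # "insert" leaves the hint untouched if it is already present on the record.
      - action: insert
        key: loki.attribute.labels
        value: namespace_name, container_name
service:
  pipelines:
    logs:
      receivers: [fluentforward]
      # transform runs first so the copied attributes exist for later processors.
      processors: [transform, attributes]
      exporters: [loki, logging]
```

If this worked, I would expect `container_name` and `namespace_name` to show up as top-level attributes next to `data` in the output above, where the `loki.attribute.labels` hint could pick them up.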