I'm trying to parse Suricata, Logstash and OpenSearch logs with the translate (dictionary) filter.
Here's part of my config:
input {
  file {
    path => "/opt/logs/opensearchTest/opensearch_server.json"
    codec => "json"
    type => "opensearch_log"
  }
}
filter {
  if [type] == "opensearch_log" {
    if [level] == "ERROR" {
      mutate {
        add_field => { "translated_values" => "********** " }
      }
    } else {
      translate {
        source => "message"
        target => "translated_field"
        dictionary => {
          "***" => "***"
          "**" => "**"
          "***" => "**********"
        }
        fallback => ""
      }
      mutate {
        add_field => { "translated_values" => "%{translated_field}" }
        remove_field => [ "message", "translated_field" ]
      }
    }
  }
}
output {
  if [type] == "logstash_log" or [type] == "suricata_log" or [type] == "opensearch_log" {
    if [translated_values] {
      http {
        http_method => "post"
        url => "http://************/"
        format => "form"
        headers => {
          "Authorization" => "Token *********"
          "Content-Type" => "application/json"
        }
        mapping => ["action_result", "True", "action_type", "add_audit", "comment", "%{translated_values}"]
      }
    }
  }
}
I've got the same pattern for the Logstash and Suricata logs, and it works perfectly, but with the OpenSearch logs I receive this error:
JSON parse error, original data now in message field {:message=>"incompatible json object type=java.lang.String , only hash map or arrays are supported", :exception=>LogStash::Json::ParserError, :data=>"\"type\": \"server\", \"timestamp\": \"2023-10-19T00:00:00,834+03:00\", \"level\": \"INFO\", \"component\": \"o.o.c.m.MetadataUpdateSettingsService\", \"cluster.name\": \"opensearch\", \"node.name\": \"astra-altar-wazuh\", \"message\": \"updating number_of_replicas to [0] for indices [wazuh-monitoring-2023.42w]\", \"cluster.uuid\": \"7wgF5XaTRuqjNBptO52m-g\", \"node.id\": \"JBRbEq1VTey6vZjMxcWPfQ\"
Here's an example of the OpenSearch logs:
{"type": "server", "timestamp": "2023-10-19T11:33:04,042+03:00", "level": "INFO", "component": "o.o.p.PluginsService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "PluginService:onIndexModule index:[wazuh-monitoring-2023.37w/mbsyMgM7STWGQ6zkR6FdHw]", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:33:04,045+03:00", "level": "INFO", "component": "o.o.p.PluginsService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "PluginService:onIndexModule index:[.kibana_1/0XetZAI8QqSmtSAOuOrsxg]", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:33:04,123+03:00", "level": "INFO", "component": "o.o.c.r.a.AllocationService", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[.kibana_1][0]]]).", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" } {"type": "server", "timestamp": "2023-10-19T11:34:25,979+03:00", "level": "INFO", "component": "o.o.n.Node", "cluster.name": "opensearch", "node.name": "astra-altar-wazuh", "message": "stopping ...", "cluster.uuid": "7wgF5XaTRuqjNBptO52m-g", "node.id": "JBRbEq1VTey6vZjMxcWPfQ" }
The error message you're encountering suggests that the data Logstash is trying to parse isn't valid JSON.
In your case, the OpenSearch logs you've shown contain multiple JSON objects per line, which is not valid JSON.
Logstash's json codec expects each line to contain a single JSON object, so you need to adjust the input so that each event holds exactly one object.
One way to achieve this is to use the multiline codec in Logstash to combine lines until a complete JSON object is formed, and then parse the result.
You can use the below configuration:
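A minimal sketch, reusing the path and type from your config; the multiline pattern is an assumption based on each object starting with "{":

input {
  file {
    path => "/opt/logs/opensearchTest/opensearch_server.json"
    type => "opensearch_log"
    # join every line that does NOT start with "{" onto the previous
    # line, so each event ends up holding one complete JSON object
    codec => multiline {
      pattern => "^\{"
      negate => true
      what => "previous"
    }
  }
}
filter {
  if [type] == "opensearch_log" {
    # parse the reassembled event explicitly with the json filter
    # instead of relying on the json codec at input time
    json {
      source => "message"
    }
  }
}

Your translate/mutate filters and the http output can then stay as they are, since they only depend on the parsed fields.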
In this configuration, the multiline codec combines lines until a complete JSON object is formed (a line starting with "{" marks the start of a new object), and a json filter then parses the combined event.