Logstash [aggregate filter] to pass data between events


I am currently working on a log monitoring system built on the Elastic Stack. The logs I have to load are in a custom format, so I have to write my own Logstash configuration to parse them. One type of log has the date only on the first line of the file, while the timestamp on every other line has a time but no date. My goal is to extract the date from the first line and add it to all the following lines. After some research I found that the aggregate filter might help, but I can't get it to work. Here is my config file:

input
{
    file {
        path => "F:/ELK/data/testFile.txt"
        #path => "F:/ELK/data/*/request/*"
        start_position => "beginning"
        sincedb_path => "NUL"
    }
}
filter
{
    mutate {
        add_field => { "taskId" => "all" }
    }

        grok
        {
            match => {"message" => "-- %{NOTSPACE} %{NOTSPACE}: %{DAY}, %{MONTH:month} %{MONTHDAY:day}, %{YEAR:year}%{GREEDYDATA}"}
            tag_on_failure => ["not_date_line"]
        }

        
    
    if "not_date_line" not in [tags]
    {
        mutate{
            replace => {'taskId' => "%{day}/%{month}/%{year}"}
            remove_field => ["day","month","year"]
        }

        aggregate
        {
            task_id => "%{taskId}"
            code => "map['taskId'] = event.get('taskId')"
            map_action => "create"
        }
    }
    else
    {
        dissect
        {
            mapping => { message => "%{sequence_index}  %{time} %{pid}  %{puid} %{stack_level}  %{operation}    %{params}   %{op_type}  %{form_event}   %{op_duration}"}
        }

        aggregate {
            task_id => "%{taskId}"
            code => "event.set('taskId', map['taskId'])"
            map_action => "update"
            timeout => 0
        }
        mutate
        {
            strip => ["op_duration"]
            replace => {"time" => "%{taskId}-%{time}"}
        }
    }
    
    mutate
    {
        remove_field => ['@timestamp','host','@version','path','message','tags']
    }
}
output 
{
    stdout{}
}

The config reads the date correctly, but the value is not carried over to the other events:


{
    "taskId" => "22/October/2020"
}
{
               "pid" => "45",
    "sequence_index" => "10853799",
           "op_type" => "1",
              "time" => "all-16:23:29:629",
            "params" => "90",
       "stack_level" => "0",
       "op_duration" => "",
         "operation" => "10",
        "form_event" => "0",
            "taskId" => "all",
              "puid" => "1724"
}

I am using only one worker to ensure that the order of the events is preserved. If you know of any other way to achieve this, I'm open to suggestions. Thank you!

There are 2 answers below.

Best answer

Thanks to @Badger and another post he answered on the Elastic forum, I found a solution using a single ruby filter and an instance variable. I couldn't get it to work with the aggregate filter, but that is not an issue for me.

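ruby
{
    init => '@date = ""'
    code => "
        date = event.get('date')
        if date.nil? || date.empty?
            # No date on this line: reuse the last date seen, if any
            event.set('date', @date) unless @date.empty?
        else
            # Date line: remember its date for the following events
            @date = date
        end
    "
}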
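
The snippet above assumes that a date field already exists on the date line. Here is a minimal sketch of how it might sit in the rest of the filter section, reusing the grok pattern, the dissect mapping and the not_date_line tag from the question. The date field name and the final conditional are assumptions, and this has not been tested against the real log files:

```
filter {
    grok {
        match => { "message" => "-- %{NOTSPACE} %{NOTSPACE}: %{DAY}, %{MONTH:month} %{MONTHDAY:day}, %{YEAR:year}%{GREEDYDATA}" }
        tag_on_failure => ["not_date_line"]
    }

    if "not_date_line" not in [tags] {
        # Date line: build the date field that the ruby filter remembers
        mutate {
            replace => { "date" => "%{day}/%{month}/%{year}" }
            remove_field => ["day", "month", "year"]
        }
    } else {
        # Any other line: split it into fields, as in the question
        dissect {
            mapping => { "message" => "%{sequence_index}  %{time} %{pid}  %{puid} %{stack_level}  %{operation}    %{params}   %{op_type}  %{form_event}   %{op_duration}" }
        }
    }

    # Carry the last seen date forward onto events that have none
    ruby {
        init => '@date = ""'
        code => "
            date = event.get('date')
            if date.nil? || date.empty?
                event.set('date', @date) unless @date.empty?
            else
                @date = date
            end
        "
    }

    # Prefix the time with the date only once a date is actually known
    if "not_date_line" in [tags] and [date] {
        mutate {
            replace => { "time" => "%{date}-%{time}" }
        }
    }
}
```

Like the original config, this relies on events reaching the ruby filter in file order, so it only makes sense with a single pipeline worker.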

Another answer

For the lines which have a date you are setting the taskId to "%{day}/%{month}/%{year}"; for the rest of the lines it stays at "all". The aggregate filter will not aggregate across events with different task ids, so the map created for the date line is never visible to the other lines.

I suggest you use a constant taskId and store the date in some other field. Then, in a single aggregate filter, you can use something like:

code => '
    date = event.get("date")
    if date
        @date = date
    else
        event.set("date", @date)
    end
'

@date is an instance variable, so its scope is limited to that aggregate filter, but it is preserved across events. It is not shared with other aggregate filters (that would require a class variable or a global variable).
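
Putting the suggestion together, a minimal untested sketch of the filter section might look like the following. It assumes the grok pattern and the not_date_line tag from the question, keeps taskId at its constant value of "all", and uses date as the (assumed) name of the new field:

```
filter {
    # Constant task id: every event is handled by the same aggregate task
    mutate {
        add_field => { "taskId" => "all" }
    }

    grok {
        match => { "message" => "-- %{NOTSPACE} %{NOTSPACE}: %{DAY}, %{MONTH:month} %{MONTHDAY:day}, %{YEAR:year}%{GREEDYDATA}" }
        tag_on_failure => ["not_date_line"]
    }

    if "not_date_line" not in [tags] {
        # Date line: store the date in its own field instead of in taskId
        mutate {
            replace => { "date" => "%{day}/%{month}/%{year}" }
            remove_field => ["day", "month", "year"]
        }
    }

    # Single aggregate filter: remember the date, or copy it onto the event
    aggregate {
        task_id => "%{taskId}"
        code => '
            date = event.get("date")
            if date
                @date = date
            else
                event.set("date", @date)
            end
        '
    }
}
```

With a single worker, each non-date event should then carry the date of the most recent date line, and a later mutate can build the full timestamp with something like "%{date}-%{time}".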

Note that you require event order to be preserved, so you should set pipeline.workers to 1 (for example with pipeline.workers: 1 in logstash.yml, or -w 1 on the command line).