Elapsed filter joining elapsed_time from other dir files, resulting in false time duration


https://stackoverflow.com/a/51825609/16120054

Hi All,

Based on the solution in the link above, does this have to be implemented with pipeline.workers set to 1 in the conf settings?

Can anyone please advise?

There are 3 answers below.

Answer (score 7):

The aggregate filter needs pipeline.workers set to 1 to work correctly, and while the elapsed filter documentation does not say anything about the number of workers, it is also recommended to set pipeline.workers to 1.

Both of these filters need the events to pass through the pipeline on the same thread, and to guarantee that you need pipeline.workers set to 1.
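For reference, a minimal way to pin this down in the settings file (a sketch; apply it globally in logstash.yml or per pipeline in pipelines.yml):

```yaml
# logstash.yml (or per-pipeline in pipelines.yml)
# A single worker keeps all events on one thread, which both the
# aggregate and elapsed filters rely on for correct event ordering.
pipeline.workers: 1
```

You can also pass it on the command line with `bin/logstash -w 1`.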

Answer (score 1):
input {
    file {
        path => "/home/dev*/status.log*"
        exclude => "status.log.10"
        start_position => "beginning"
        sincedb_path => "/dev/null"
#       sincedb_path => "/home/dev/db/devdb"
        file_sort_by => "path"
        file_sort_direction => "desc"
    }
}



output {
    stdout { codec => rubydebug }
}

filter {
    if [path] =~ "dev1" {
        mutate { replace => { "host" => "dev1" } }
    } else if [path] =~ "dev2" {
        mutate { replace => { "host" => "dev2" } }
    } else if [path] =~ "dev3" {
        mutate { replace => { "host" => "dev3" } }
    } else if [path] =~ "dev4" {
        mutate { replace => { "host" => "dev4" } }
    }

    if [message] =~ "devManager" {
        grok {
            match => { "message" => "(?<logtime>%{DAY} %{MONTH} %{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND}).*= %{BASE10NUM:status}" }
        }
        date {
            match => [ "logtime", "EEE MMM dd HH:mm:ss.SSS" ]
        }

        if [status] == "0" {
            mutate { update => { "status" => "down" } }
        } else if [status] == "1" {
            mutate { update => { "status" => "up" } }
        }

        mutate {
            add_tag => [ "%{status}" ]
        }

        elapsed {
            start_tag => "up"
            end_tag => "down"
            unique_id_field => "host"
            timeout => 86400
        }

        elapsed {
            start_tag => "down"
            end_tag => "up"
            unique_id_field => "host"
            timeout => 86400
        }

        if "up" in [tags] and [host] {
            mutate { add_field => { "host_down" => "%{elapsed_time}" } }
            mutate { convert => { "host_down" => "float" } }
        } else if "down" in [tags] and [host] {
            mutate { add_field => { "host_up" => "%{elapsed_time}" } }
            mutate { convert => { "host_up" => "float" } }
        }

        mutate {
            rename => {
                "status"    => "%{host}_status"
                "host_up"   => "%{host}_up"
                "host_down" => "%{host}_down"
            }
            remove_field => [ "info", "@version" ]
        }
    } else {
        drop { }
    }
}
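As an aside, the conditional chain above only maps dev1 through dev4. If the dev* glob really spans dev1 to dev12, a single grok against the path could replace all the branches — a sketch, assuming every directory name matches dev followed by a number:

```
filter {
    # Pull "dev<N>" out of the file path into [host], covering
    # dev1..dev12 without one mutate branch per directory.
    grok {
        match => { "path" => "(?<host>dev\d+)" }
    }
}
```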

Here is the conf file I am using, with one worker. The path "dev*" covers the dev1 to dev12 folders, which are to be read from.

A log sample is below:

/dev/status.log
Wed Jun 09 22:26:37.296  devManager: status = 1
Wed Jun 09 23:09:40.191  devManager: status = 0
Wed Jun 09 23:10:17.064  devManager: status = 0
Wed Jun 09 23:11:14.692  devManager: status = 1

@leandrojmp

Answer (score 0):

I am thinking of another way. It may take a little extra step to bring all my devs' data into order for one pipeline. Here's the process:

  1. Make a pipeline to capture all the status = 0/1 events from all devices; let's name it grabber.conf.
  2. grabber.conf will do the grok and output to a status.log on disk. This status.log will hold the status data from all devs.
  3. grabber.conf will have input { exec { command => "sort...." } }. This sort will be directed at status.log to order all the dates and save the file in place (the interval for exec is trivial).
  4. Make another pipeline that just runs the elapsed filter on all the statuses from the devs, tagging them as start and end. Let's name it durations.conf.
  5. So, no matter what order the logs are gathered and saved to status.log in, the sort command will arrange them chronologically and make them ready for durations.conf to do its elapsed calculations.
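The two stages above could be wired up in pipelines.yml along these lines (the pipeline ids and config paths are illustrative, not from the original):

```yaml
# pipelines.yml — grabber normalizes events and writes status.log,
# durations reads the sorted file and runs the elapsed filter
- pipeline.id: grabber
  path.config: "/etc/logstash/conf.d/grabber.conf"
- pipeline.id: durations
  pipeline.workers: 1          # elapsed needs a single worker
  path.config: "/etc/logstash/conf.d/durations.conf"
```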

Another risk, duplication of data, can be avoided by adding a fingerprint filter to durations.conf.
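A fingerprint-based dedupe in durations.conf might look like this sketch (the target field name is an assumption, not from the original config):

```
filter {
    # Derive a stable ID from the raw line, so the same line read
    # twice produces the same fingerprint.
    fingerprint {
        source => "message"
        target => "[@metadata][fingerprint]"
    }
}
```

Using %{[@metadata][fingerprint]} as the document_id in an elasticsearch output would then make a re-ingested duplicate overwrite itself instead of creating a second document.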

Let me know if this could be an alternative solution to my query.