How to iterate through aggregation buckets and send mail corresponding to each bucket using watcher action

I am trying to iterate through the aggregation bucket results. The aggregation response is:

"aggregations" : {
    "agg1" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "India",
          "doc_count" : 1,
          "agg2" : {
            "hits" : {
              "total" : {
                "value" : 2,
                "relation" : "eq"
              },
              "max_score" : 1.0,
              "hits" : [
                {
                  "_index" : "my-idx",
                  "_type" : "_doc",
                  "_id" : "yCREtoIB_SgE9GPnmqdM",
                  "_score" : 1.0,
                  "_source" : {
                    "agent" : {
                      "name" : "john"
                    },
                    "country": "India",
                    "@timestamp" : "2022-08-19T13:22:03.818Z"
                  }
                },
                {
                  "_index" : "my-idx",
                  "_type" : "_doc",
                  "_id" : "zIxEtoIBwzTD3EsaokUn",
                  "_score" : 1.0,
                  "_source" : {
                    "agent" : {
                      "name" : "jack"
                    },
                    "country": "India",
                    "@timestamp" : "2022-08-19T13:22:04.771Z"
                  }
                }
              ]
            }
          }
        },
        {
          "key" : "USA",
          "doc_count" : 1,
          "agg2" : {
            "hits" : {
              "total" : {
                "value" : 1,
                "relation" : "eq"
              },
              "max_score" : 1.0,
              "hits" : [
                {
                  "_index" : "my-idx",
                  "_type" : "_doc",
                  "_id" : "SgE9GPnmqdM",
                  "_score" : 1.0,
                  "_source" : {
                    "agent" : {
                      "name" : "harry"
                    },
                    "country": "USA",
                    "@timestamp" : "2022-08-19T13:22:03.818Z"
                  }
                }
              ]
            }
          }
        }
      ]
    }
}

The response above contains 2 buckets (one for India and one for USA), so 2 emails should be sent.

The body of the 1st email (for India) should contain that bucket's top hits aggregation results, such as:

agent.name: "john"
@timestamp: 
country: "India"

agent.name: "jack"
@timestamp: 
country: "India"

Similarly, the body of the 2nd email (for USA) should contain:

agent.name: "harry"
@timestamp: 
country: "USA"

I have tried with this watcher config.

{
  "trigger": {
    "schedule": {
      "interval": "30m"
    }
  },
  "input": {
    "search": {
      "request": {
        "search_type": "query_then_fetch",
        "indices": [
          "my-idx"
        ],
        "rest_total_hits_as_int": true,
        "body": {
        }
      }
    }
  },
  "transform" : {
      "script":
      """
        return [
          'test1': ctx.payload.aggregations.agg1.buckets.stream().map(a->a.agg2.hits)
              .collect(Collectors.toList())
          ]
      """
    },
  "actions": {
    "send_email": {
      "foreach": "ctx.payload.aggregations.agg1.buckets",
      "max_iterations": 100,
      "email": {
        "profile": "standard",
        "to": [
          "[email protected]"
        ],
        "subject": "Foreach Test",
        "body": {
          "html": """
          <html>
          <body>    
          <table style="background-color: #f9f9ff;">
          <tr>
          <td class="col1">{{ctx.payload.test1}}</td>
          </tr>
          </table> 
          </body>
          </html>
          """
        }
      }
    }
  }
}

The issue is that, with the transform script, ctx.payload.test1 holds the complete top hits aggregation response for both keys (India and USA), and all of it arrives in a single email. I added the foreach loop, but it is not working.

Can anyone please help me resolve this issue?

There are 1 best solutions below

TL;DR

The transform step completely replaces the current payload with the new one computed in the script.

... a watch transform to transform a payload into a new payload ...

So ctx.payload.aggregations.agg1.buckets no longer exists by the time the action runs.

You need to point foreach at ctx.payload.test1 to perform your iteration.
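
As a rough sketch of what that could look like, here is the "actions" section of the watch with foreach pointed at the transformed payload. The trigger, input, and transform are assumed to stay as in the question. It relies on Watcher exposing the current foreach element as ctx.payload._value; since the transform maps each bucket to a.agg2.hits, that element should be the top hits object, and its inner hits array can be looped over with a Mustache section. The row layout and field labels are only illustrative, and I have not run this against a cluster, so treat it as a starting point:

```json
"actions": {
  "send_email": {
    "foreach": "ctx.payload.test1",
    "max_iterations": 100,
    "email": {
      "profile": "standard",
      "to": [
        "[email protected]"
      ],
      "subject": "Foreach Test",
      "body": {
        "html": """
        <html>
        <body>
        {{! Assumption: ctx.payload._value is the current element of ctx.payload.test1 }}
        <table style="background-color: #f9f9ff;">
        {{#ctx.payload._value.hits}}
        <tr>
        <td class="col1">agent.name: {{_source.agent.name}}</td>
        <td class="col1">@timestamp: {{_source.@timestamp}}</td>
        <td class="col1">country: {{_source.country}}</td>
        </tr>
        {{/ctx.payload._value.hits}}
        </table>
        </body>
        </html>
        """
      }
    }
  }
}
```

With the sample response from the question, this should run the email action once per bucket, so one email for India (john and jack) and one for USA (harry). If you do not need the transform for anything else, another option would be to drop it, keep foreach on ctx.payload.aggregations.agg1.buckets, and loop over ctx.payload._value.agg2.hits.hits in the template instead.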