KQL - My query only seems to work with limited data sets and I can't get my query to show me old data

249 Views Asked by At

This is probably a pretty basic issue, but I'm new to KQL so I can't make sense of the resources online.

Basically, I need a query that can read IPs on a watchlist and return results that include an IP that has not had any activity in 30 days or more.

This is the base of my query:

_GetWatchlist('Testing_Watchlist')
| join 
(
    CommonSecurityLog
    | summarize arg_max(TimeGenerated, *) by TimeGenerated
) on $left.IPAddress == $right.SourceIP
| project-keep LastUpdatedTimeUTC, TimeGenerated, IPAddress, DeviceAction
| project-reorder TimeGenerated, IPAddress, DeviceAction, LastUpdatedTimeUTC

Currently the test watchlist has 4 IP on it and it returns those, without any problem. When I added more IPs, 6 in this case, I started getting errors regarding my query consuming too many resources. Is there something in there that could be made more efficient? I set it so I only get the last written log for each IP because that's really the only information relevant to the objective of this query.

The second issue is regardless of how many IPs I have in the watchlist

When I add

| where TimeGenerated <= 30days

Nothing comes up, the query runs, but I don't get any results. I manually when through the common security log for each IP and I verified there is data that is older than 30 days so it should be working? But maybe my line is incorrect?

I tried >= that only returns data within 30 days.

I did some reading and someone suggested that I clear my cache after updating the watchlist with more IPs and running the query, that didn't help.

1

There are 1 best solutions below

1
On

I have used sample data to get the result. Assuming _GetWatchlist() function is returning a list of IPs for which you wanted the records from the table CommonSecurityLog. For same I have created a table with name listIps. I have created another sample table named CommonSecurityLog with the columns primarily used in the query you shared.

Query Explanation: -

  • Step1: - First, I used arg_max() on CommonSecurityLog for TimeGenerated column by SourceIP, which will return the latest record per IP based on the values of TimeGenerated column from the CommonSecurityLog table.
    The arg_max() criteria which you used would have returned you the same count of records as the original table. The arg_max() criteria which has been used in below code should significantly reduce the number of records as the output of this should be one record per IP. It should significantly improve the performance.

  • Step2: - Then used |where TimeGenerated < ago(30d) condition to get IPs which has no activities in the last 30 days. This where clause is on a set of data which has the last record of an IP. Thereby, ensuring that the IP chosen doesn't have any record with in the last 30 days.

  • Step3: - Finally using Inner Join on these datasets which has data of greater than 30 days and IPs of ListIps on IPAddress, we get the list of latest records of those particular selected IPs.

Code

let  ListIps = datatable (IPAddress : string)
[
"192.158.1.38",
"192.158.1.40",
"192.158.1.45"
];
let  CommonSecurityLog = datatable (TimeGenerated : datetime ,SourceIP: string)
[
datetime(2023-03-16 07:43:44.757),"192.158.1.38",
datetime(2023-03-10 07:43:44.067),"192.158.1.15",
datetime(2023-02-28 07:43:44.757),"192.158.1.42",
datetime(2023-02-17 07:43:44.757),"192.158.1.38",
datetime(2023-02-22 07:43:44.857),"192.158.1.38",
datetime(2023-02-16 07:43:44.757),"192.158.1.40",
datetime(2023-02-18 07:43:44.757),"192.158.1.38",
datetime(2023-02-14 07:43:44.757),"192.158.1.42",
datetime(2023-02-06 07:43:44.757),"192.158.1.50",
datetime(2023-01-30 07:43:44.757),"192.158.1.41",
];
CommonSecurityLog
|summarize arg_max(TimeGenerated,*) by SourceIP
| where  TimeGenerated < ago(30d)
| join kind=inner ListIps  on  $left.SourceIP == $right.IPAddress

Result

enter image description here

If there is still performance issue, then you can use hint.shufflekey = key in the summarize and join command by rewriting the query as below.

let listIps = datatable (IPAddress : string)
[
    "192.158.1.38",
    "192.158.1.40",
    "192.158.1.45"
];
let CommonSecurityLog = datatable (TimeGenerated : datetime ,SourceIP: string)
[
    datetime(2023-03-16 07:43:44.757),"192.158.1.38",
    datetime(2023-03-10 07:43:44.067),"192.158.1.15",
    datetime(2023-02-28 07:43:44.757),"192.158.1.42",
    datetime(2023-02-17 07:43:44.757),"192.158.1.38",
    datetime(2023-02-22 07:43:44.857),"192.158.1.38",
    datetime(2023-02-16 07:43:44.757),"192.158.1.40",
    datetime(2023-02-18 07:43:44.757),"192.158.1.38",
    datetime(2023-02-14 07:43:44.757),"192.158.1.42",
    datetime(2023-02-06 07:43:44.757),"192.158.1.50",
    datetime(2023-01-30 07:43:44.757),"192.158.1.41",
];
CommonSecurityLog
|summarize hint.shufflekey = SourceIP arg_max(TimeGenerated,*) by SourceIP 
| where TimeGenerated < ago(30d)
| join kind=inner hint.shufflekey = SourceIP listIps on $left.SourceIP == $right.IPAddress

For more detail on improving query performance using hint strategy please refer.