Parsing NSG Flowlogs in Azure Log Analytics Workspace to separate Public IP addresses


I have been updating a KQL query for reviewing NSG Flow Logs so that it separates out the columns for public/external IP addresses. However, each cell in those columns contains additional information that needs to be parsed out so that my Excel add-in can run NSLOOKUP against each cell and look for additional insights. I would like to use the parse operator (https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/parseoperator) to separate this information, and later determine what each external IP address belongs to through nslookup, Resolve-DnsName, whois, or other means.

Currently I am attempting to parse the column, but it is not comma-delimited; instead, the entries are separated by a single space and the fields within each entry by pipes. Below is my query. I would like to add a parse step to it that either produces a comma-delimited string in a single cell [for PublicIP (a combination of source and destination), PublicSourceIP, and PublicDestIP] or breaks the values out into multiple rows. How would parse best be used to separate this information, or is there a better operator for this?

For example, the content could look like this:
"20.xx.xx.xx|1|0|0|0|0|0 78.xxx.xxx.xxx|1|0|0|0|0|0"


AzureNetworkAnalytics_CL
| where SubType_s == 'FlowLog' and (FASchemaVersion_s == '1' or FASchemaVersion_s == '2')
| extend NSG = NSGList_s, Rule = NSGRule_s,Protocol=L4Protocol_s,  Hits = (AllowedInFlows_d + AllowedOutFlows_d + DeniedInFlows_d + DeniedOutFlows_d)
| project-away NSGList_s, NSGRule_s
| project TimeGenerated, NSG, Rule,  SourceIP = SrcIP_s, DestinationIP = DestIP_s, DestinationPort = DestPort_d, FlowStatus = FlowStatus_s, FlowDirection = FlowDirection_s, Protocol=L4Protocol_s, PublicIP=PublicIPs_s,PublicSourceIP = SrcPublicIPs_s,PublicDestIP=DestPublicIPs_s
// ## IP Address Filtering ##
| where isnotempty(PublicIP)
| parse kind = regex  PublicIP with * "|1|0|0|0|0|0" ipnfo ' ' *
| project ipnfo
// ## port filtering
| where DestinationPort == '443'

BEST ANSWER

This can be done with extract_all(), followed by either strcat_array() (one comma-delimited string per cell) or mv-expand (one row per IP address).

let AzureNetworkAnalytics_CL = datatable (RecordId:int, PublicIPs_s:string)
[
     1 ,"51.105.236.244|2|0|0|0|0|0 51.124.32.246|12|0|0|0|0|0 51.124.57.242|1|0|0|0|0|0"
    ,2 ,"20.44.17.10|6|0|0|0|0|0 20.150.38.228|1|0|0|0|0|0 20.150.70.36|2|0|0|0|0|0 20.190.151.9|2|0|0|0|0|0 20.190.151.134|1|0|0|0|0|0 20.190.154.137|1|0|0|0|0|0 65.55.44.109|2|0|0|0|0|0"
    ,3 ,"20.150.70.36|1|0|0|0|0|0 52.183.220.149|1|0|0|0|0|0 52.239.152.234|2|0|0|0|0|0 52.239.169.68|1|0|0|0|0|0"
];
// Option 1
AzureNetworkAnalytics_CL
| project RecordId, PublicIPs = strcat_array(extract_all("(?:^| )([^|]+)", PublicIPs_s),',');
// Option 2
AzureNetworkAnalytics_CL
| mv-expand with_itemindex=i PublicIP = extract_all("(?:^| )([^|]+)", PublicIPs_s) to typeof(string)
| project RecordId, i = i+1, PublicIP


Option 1

RecordId  PublicIPs
1         51.105.236.244,51.124.32.246,51.124.57.242
2         20.44.17.10,20.150.38.228,20.150.70.36,20.190.151.9,20.190.151.134,20.190.154.137,65.55.44.109
3         20.150.70.36,52.183.220.149,52.239.152.234,52.239.169.68
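The question also asks for PublicSourceIP and PublicDestIP. The following is only a sketch of how the Option 1 pattern might be applied to all three columns. It assumes that SrcPublicIPs_s and DestPublicIPs_s (column names taken from the question's query) use the same space-and-pipe layout as PublicIPs_s. The sample rows are hypothetical.

```kusto
// Hypothetical sample rows; only the column names come from the question.
let AzureNetworkAnalytics_CL = datatable (RecordId:int, PublicIPs_s:string, SrcPublicIPs_s:string, DestPublicIPs_s:string)
[
    1, "51.105.236.244|2|0|0|0|0|0 51.124.32.246|12|0|0|0|0|0", "51.105.236.244|2|0|0|0|0|0", "51.124.32.246|12|0|0|0|0|0",
    2, "20.44.17.10|6|0|0|0|0|0", "20.44.17.10|6|0|0|0|0|0", ""
];
AzureNetworkAnalytics_CL
// Same regex as Option 1: capture the text between the start of each
// space-separated entry and its first pipe, i.e. the IP address.
| project RecordId,
          PublicIP       = strcat_array(extract_all("(?:^| )([^|]+)", PublicIPs_s), ','),
          PublicSourceIP = strcat_array(extract_all("(?:^| )([^|]+)", SrcPublicIPs_s), ','),
          PublicDestIP   = strcat_array(extract_all("(?:^| )([^|]+)", DestPublicIPs_s), ',')
```

Rows where one of the columns is empty have nothing to extract, so the corresponding output cell is expected to come back empty rather than contain an address.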

Option 2

RecordId  i  PublicIP
1         1  51.105.236.244
1         2  51.124.32.246
1         3  51.124.57.242
2         1  20.44.17.10
2         2  20.150.38.228
2         3  20.150.70.36
2         4  20.190.151.9
2         5  20.190.151.134
2         6  20.190.154.137
2         7  65.55.44.109
3         1  20.150.70.36
3         2  52.183.220.149
3         3  52.239.152.234
3         4  52.239.169.68
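If the goal is to feed the addresses to nslookup or a similar tool, a deduplicated list may be more convenient than one row per occurrence. As a sketch building on Option 2 (the Records column name is just an illustration), the expanded rows can be aggregated per IP address:

```kusto
// Sample data copied from the datatable above.
let AzureNetworkAnalytics_CL = datatable (RecordId:int, PublicIPs_s:string)
[
     1 ,"51.105.236.244|2|0|0|0|0|0 51.124.32.246|12|0|0|0|0|0 51.124.57.242|1|0|0|0|0|0"
    ,2 ,"20.44.17.10|6|0|0|0|0|0 20.150.38.228|1|0|0|0|0|0 20.150.70.36|2|0|0|0|0|0 20.190.151.9|2|0|0|0|0|0 20.190.151.134|1|0|0|0|0|0 20.190.154.137|1|0|0|0|0|0 65.55.44.109|2|0|0|0|0|0"
    ,3 ,"20.150.70.36|1|0|0|0|0|0 52.183.220.149|1|0|0|0|0|0 52.239.152.234|2|0|0|0|0|0 52.239.169.68|1|0|0|0|0|0"
];
AzureNetworkAnalytics_CL
// One row per IP address, as in Option 2
| mv-expand PublicIP = extract_all("(?:^| )([^|]+)", PublicIPs_s) to typeof(string)
// Collapse duplicates and count how many source rows mention each address
| summarize Records = count() by PublicIP
| order by PublicIP asc
```

With this sample data, 20.150.70.36 appears in records 2 and 3, so it is the only address with a count above 1. Replacing the summarize line with `| distinct PublicIP` gives just the unique addresses.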

David answers your question. I would just like to add that I worked on the raw NSG Flow Logs and parsed them using KQL in this way:

The raw JSON:

{"records":[{"time":"2022-05-02T04:00:48.7788837Z","systemId":"x","macAddress":"x","category":"NetworkSecurityGroupFlowEvent","resourceId":"/SUBSCRIPTIONS/x/RESOURCEGROUPS/x/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/x","operationName":"NetworkSecurityGroupFlowEvents","properties":{"Version":2,"flows":[{"rule":"DefaultRule_DenyAllInBound","flows":[{"mac":"x","flowTuples":["1651463988,0.0.0.0,192.168.1.6,49944,8008,T,I,D,B,,,,"]}]}]}}]}

KQL parsing:

| mv-expand records
| evaluate bag_unpack(records)
| extend flows = properties.flows
| mv-expand flows
| evaluate bag_unpack(flows)
| mv-expand flows
| extend flowz = flows.flowTuples
| mv-expand flowz
| extend result=split(tostring(flowz), ",")
| extend source_ip=tostring(result[1])
| extend destination_ip=tostring(result[2])
| extend source_port=tostring(result[3])
| extend destination_port=tostring(result[4])
| extend protocol=tostring(result[5])
| extend traffic_flow=tostring(result[6])
| extend traffic_decision=tostring(result[7])
| extend flow_state=tostring(result[8])
| extend packets_src_to_dst=tostring(result[9])
| extend bytes_src_to_dst=tostring(result[10])
| extend packets_dst_to_src=tostring(result[11])
| extend bytes_dst_to_src=tostring(result[12])
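
The query above keeps every field as a string. As a possible follow-up, the sketch below converts a few of them to typed, readable values, using the sample tuple from the raw JSON. It assumes the version 2 tuple layout, in which the first field is a Unix timestamp in seconds and the single-letter codes mean T/U for TCP/UDP, I/O for inbound/outbound, and A/D for allowed/denied.

```kusto
// Sample tuple copied from the raw JSON above.
print flowz = "1651463988,0.0.0.0,192.168.1.6,49944,8008,T,I,D,B,,,,"
| extend result = split(flowz, ",")
// split() returns a dynamic array, so each element is cast to string
// before being converted to its final type.
| extend flow_time        = unixtime_seconds_todatetime(tolong(tostring(result[0]))),
         source_port      = toint(tostring(result[3])),
         destination_port = toint(tostring(result[4])),
         protocol         = case(tostring(result[5]) == "T", "TCP",
                                 tostring(result[5]) == "U", "UDP",
                                 tostring(result[5])),
         traffic_flow     = iff(tostring(result[6]) == "I", "Inbound", "Outbound"),
         traffic_decision = iff(tostring(result[7]) == "A", "Allowed", "Denied")
| project flow_time, source_port, destination_port, protocol, traffic_flow, traffic_decision
```

Typed ports allow numeric filters such as `destination_port == 443` without string comparison.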