I use R to process zigbee2mqtt log files, which contain entries like this:
head(data)
[1] "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1,\"linkquality\":105,\"temperature\":17.25,\"voltage\":2500}'"
[2] "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{\"battery\":20,\"battery_low\":true,\"illuminance\":37914,\"illuminance_lux\":12369,\"led_control\":\"off\",\"linkquality\":69,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2600}'"
[3] "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{\"linkquality\":21,\"state\":\"OFF\"}'"
[4] "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{\"battery\":60,\"battery_low\":false,\"illuminance\":31370,\"illuminance_lux\":1371,\"led_control\":\"fault_only\",\"linkquality\":21,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17.06,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2800}'"
[5] "info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{\"linkquality\":21,\"state\":\"OFF\"}'"
[6] "info 2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{\"battery\":14,\"humidity\":54.87,\"linkquality\":21,\"temperature\":14.41,\"voltage\":2500}'"
I have:
data <- c("info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1,\"linkquality\":105,\"temperature\":17.25,\"voltage\":2500}'",
"info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/MotionBadOben', payload '{\"battery\":20,\"battery_low\":true,\"illuminance\":37914,\"illuminance_lux\":12369,\"led_control\":\"off\",\"linkquality\":69,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2600}'",
"info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
"info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/MotionBadEG', payload '{\"battery\":60,\"battery_low\":false,\"illuminance\":31370,\"illuminance_lux\":1371,\"led_control\":\"fault_only\",\"linkquality\":21,\"occupancy\":false,\"occupancy_timeout\":60,\"tamper\":false,\"temperature\":17.06,\"update\":{\"installed_version\":262147,\"latest_version\":262146,\"state\":\"idle\"},\"voltage\":2800}'",
"info 2024-03-11 14:08:22: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{\"linkquality\":21,\"state\":\"OFF\"}'",
"info 2024-03-11 14:08:44: MQTT publish: topic 'zigbee2mqtt/TempKucheUG', payload '{\"battery\":14,\"humidity\":54.87,\"linkquality\":21,\"temperature\":14.41,\"voltage\":2500}'")
and I do the following processing:
library(tidyverse)
library(jsonlite)
library(RColorBrewer)
library(ggthemes)
library(furrr)
plan(multisession, workers = availableCores() - 1)
log_data.start.timestamp <- Sys.time()
log_data <- data %>%
enframe(name = NULL, value = "line") %>%
mutate(timestamp = str_extract(line, "\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}"),
topic = str_extract(line, "(?<=topic ')[^']+(?=')"),
payload = str_extract(line, "(?<=payload ')[^']+(?=')")) %>%
mutate(topic = str_remove(topic, "zigbee2mqtt/")) %>%
distinct() %>%
mutate(payload = future_map(payload, ~{
tryCatch({
json <- fromJSON(.x, flatten = TRUE)
# Remove unwanted fields
json$data <- NULL
json$update <- NULL
json$type <- NULL
json$tamper <- NULL
json$power_on_behavior <- NULL
json$battery_low <- NULL
json$occupancy_timeout <- NULL
as_tibble(json)
}, error = function(e) {
tibble(value = .x)
})
})) %>%
select(-line) %>%
mutate(timestamp = as.POSIXct(timestamp, format = "%Y-%m-%d %H:%M:%S")) %>%
unnest(payload) %>%
mutate(date = as.Date(timestamp), time = format(timestamp, "%H:%M:%S")) %>%
distinct() %>%
arrange(timestamp)
Sys.time()-log_data.start.timestamp
and I get
head(log_data)
# A tibble: 6 × 15
timestamp topic battery humidity linkquality temperature voltage illuminance illuminance_lux led_control occupancy state action date time
<dttm> <chr> <int> <dbl> <int> <dbl> <int> <int> <int> <chr> <lgl> <chr> <chr> <date> <chr>
1 2024-03-11 14:08:01 TempBadEG 17 50.1 105 17.2 2500 NA NA NA NA NA NA 2024-03-11 14:08:01
2 2024-03-11 14:08:04 MotionBadOben 20 NA 69 17 2600 37914 12369 off FALSE NA NA 2024-03-11 14:08:04
3 2024-03-11 14:08:04 SwitchBadOben NA NA 21 NA NA NA NA NA NA OFF NA 2024-03-11 14:08:04
4 2024-03-11 14:08:22 MotionBadEG 60 NA 21 17.1 2800 31370 1371 fault_only FALSE NA NA 2024-03-11 14:08:22
5 2024-03-11 14:08:22 SwitchBadEG NA NA 21 NA NA NA NA NA NA OFF NA 2024-03-11 14:08:22
6 2024-03-11 14:08:44 TempKucheUG 14 54.9 21 14.4 2500 NA NA NA NA NA NA 2024-03-11 14:08:44
From there on I can do my analyses. However, for 5 days of logging (5 MB of input), this step takes 20 seconds despite using furrr. I suspect that the payload processing and JSON flattening cause the long processing time.
I cannot imagine how this would scale to a year's worth of data. Since the different sensors report different payloads, I need a generic approach that flattens the JSON payload without prespecifying the variables. This also allows adding or changing sensors that may report different variables.
Is there a way to make this more efficient and faster?
I didn't benchmark it, but I'd guess the parallel-processing overhead of moving every single (relatively small) JSON string to and from a worker costs more than you gain. If you need to parallelize, consider larger chunks, i.e. all daily or at least all hourly records per worker.
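A minimal sketch of that chunking idea, using two made-up log lines in the same shape as the question's data. The chunk boundaries (calendar days via `split()`) and the trick of wrapping each chunk's payloads in `[...]` so one `fromJSON()` call parses the whole batch are my assumptions, not something benchmarked:

```r
library(furrr)
library(stringr)
library(jsonlite)
plan(multisession, workers = 2)

# Two sample log lines standing in for the full log (same shape as in the question)
logs <- c(
  "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1}'",
  "info 2024-03-12 09:00:00: MQTT publish: topic 'zigbee2mqtt/SwitchBadEG', payload '{\"linkquality\":21,\"state\":\"OFF\"}'"
)

# Split by calendar day so each worker handles one large batch;
# within a batch, join the payloads into a single JSON array so that
# one fromJSON() call parses them all (missing fields become NA)
chunks <- split(logs, str_extract(logs, "\\d{4}-\\d{2}-\\d{2}"))
parsed <- future_map_dfr(chunks, function(lines) {
  payloads <- str_extract(lines, "(?<=payload ')[^']+(?=')")
  fromJSON(paste0("[", paste(payloads, collapse = ","), "]"), flatten = TRUE)
})
```

`future_map_dfr()` row-binds the per-chunk data frames, filling columns that a chunk lacks with `NA`, so heterogeneous sensor payloads still combine cleanly.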
Following is just a test of how `jsonlite::stream_in()` (with a default handler) might perform when combining all JSON strings into a single ND-JSON string first. I generated 25,000 lines (~5.3 MB on disk as text) for testing; processing took less than 6 seconds.
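The 25,000-line reprex itself isn't reproduced here, but the core of the approach might look like the following sketch, with two sample lines taken from the question standing in for the generated test data:

```r
library(jsonlite)
library(stringr)

# Two sample log lines from the question
logs <- c(
  "info 2024-03-11 14:08:01: MQTT publish: topic 'zigbee2mqtt/TempBadEG', payload '{\"battery\":17,\"humidity\":50.1,\"linkquality\":105,\"temperature\":17.25,\"voltage\":2500}'",
  "info 2024-03-11 14:08:04: MQTT publish: topic 'zigbee2mqtt/SwitchBadOben', payload '{\"linkquality\":21,\"state\":\"OFF\"}'"
)

# Pull out just the JSON payloads; each vector element is one ND-JSON line,
# so a text connection over the vector feeds stream_in() directly
payloads <- str_extract(logs, "(?<=payload ')[^']+(?=')")
parsed <- stream_in(textConnection(payloads), flatten = TRUE, verbose = FALSE)
```

`stream_in()` parses the records page-wise and binds them into one data frame, so no per-variable specification is needed; fields absent from a record come back as `NA`.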
Created on 2024-03-16 with reprex v2.1.0
System details: