I have a large folder containing 10,000+ CSV files. I have written a for loop that reads in each file, manipulates it through a series of subsetting, calculation and merging steps, and then writes the result out as a new CSV file. After that, each result has to be turned into a heavily nested list before being written to a JSON file for delivery. The actual CSV handling is fairly easy, but the list nesting and JSON writing part is very slow: a 40 kB CSV file, once nested and written out, produces a JSON file of roughly 9 MB.
If I run this sequentially it will take forever, so I thought about using parallel processing with foreach. However, after reading a couple of related posts, I'm still confused about the argument:
.combine = cbind
Since all I want to do is read and write the files one by one, do I need to care about the
.combine
argument at all?
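For context, this is my (possibly wrong) understanding of what .combine does, based on a toy example: it only controls how the values returned by each iteration are put together, and with the default you simply get a list back.
library(foreach)
#default: the per-iteration results come back as a list
res_list <- foreach(i = 1:3) %do% { i^2 }
#.combine = cbind: the results are column-bound into a matrix instead
res_mat <- foreach(i = 1:3, .combine = cbind) %do% { i^2 }
Since I write the files inside the loop and never use the returned value, my guess is that the default is fine, but please correct me if I'm wrong.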
I am currently testing my foreach attempt. It seems to be running, but it has been running for a long time and I have not seen any files appear in the output folder yet, so I have a bad feeling about it.
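To check whether the loop produces any output at all, my plan is to first run a tiny sequential test (a rough sketch, not my actual script, using the trades vector defined below) on just two files with %do%, so that errors show up immediately rather than disappearing on the workers:
library(foreach)
foreach(i = 1:2) %do% {
  #read one file and return its row count as a stand-in for the real processing
  data <- data.table::fread(trades[i], header = TRUE)
  nrow(data)
}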
Below are the details. Apologies for not posting my actual script, as it is very long and tedious (bad programming skills), but it basically looks like this:
#find all the csv files I need to process
trades <- list.files(market.data.folder, pattern = "*.csv$", full.names = TRUE)
#standard for loop to iterate through all of them one by one
for (i in 1:length(trades)) {
name <- trades[i]
#read in data
data <- data.table::fread(name, header = TRUE)
#some manipulation of the data using dplyr
daily.5min.data<-
data %>%
filter(InstruStatus == "TRADE") %>%
mutate(Date = lubridate::ymd(Date),
UpdateTime = lubridate::hms(UpdateTime)) %>%
mutate(TimeStamp = paste0(Date, UpdateTime),
TimeStamp = lubridate::ymd_hms(TimeStamp)) %>%
dplyr::relocate(TimeStamp, .before = Date) %>%
mutate(Time.Intervals = cut(TimeStamp, breaks = dropdown)) %>%
dplyr::relocate(Time.Intervals, .before = TimeStamp)
#more manipulation to calculate some numbers
hl.price <-
daily.5min.data %>%
select(c("Date", "Time.Intervals", "TimeStamp","SecurityID",
"PreCloPrice", "OpenPrice" , "HighPrice", "LowPrice","LastPrice","ClosePrice",
"TradVolume", "Turnover")) %>%
group_by_at(c("Date", "Time.Intervals","SecurityID")) %>%
filter(!LastPrice==0) %>%
summarise(across(c("LastPrice" ), list(min = ~ min(., na.rm = TRUE),
max = ~ max(., na.rm = TRUE)))) %>%
rename(HighPrice_max=LastPrice_max,
LowPrice_min = LastPrice_min) %>%
select(c("Date", "Time.Intervals", "SecurityID", "HighPrice_max","LowPrice_min")) %>%
rename_with(~str_remove(., '_min|_max$'))
#write the csv out
write_csv(hl.price, file =
            file.path(here::here(), "output", "hl.csv"))
#Next def the function to arrange the designed list structure
row_to_list <- function(row) {
return(list(
cs_startTimestamp = row[["cs_startTimestamp"]],
cs_endTimestamp = row[["cs_endTimestamp"]],
open = row[["open"]],
close = row[["close"]],
high = row[["high"]],
low = row[["low"]],
volume = row[["volume"]]))
}
#split the dataframe into lists and map the function to each list to generate the nested lists
updated_nested_lists<-
hl.price %>%
group_split(Date)%>%
map(row_to_list)
#unlist
final_list <- unlist(rapply(updated_nested_lists, unlist, how = "list", recursive = F),
recursive = F)
#write out the final json file
cat(jsonlite::toJSON(final_list, pretty=TRUE,auto_unbox = TRUE),
file = file.path(here::here(), "output",
"15min_marketdata.json"))
}
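Just to make the target structure concrete, this is roughly the shape the nesting and JSON step is meant to produce for a single made-up row (the field values here are hypothetical), serialised the same way as in the loop:
library(jsonlite)
#hypothetical one-row example of the nested structure row_to_list() builds
example_row <- list(cs_startTimestamp = "2023-01-03 09:30:00",
                    cs_endTimestamp = "2023-01-03 09:35:00",
                    open = 10.1, close = 10.2, high = 10.3,
                    low = 10.0, volume = 1500)
cat(toJSON(list(example_row), pretty = TRUE, auto_unbox = TRUE))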
Now I try to simply lump all of this into an R foreach loop and use the computing cluster at work. I launch RStudio via OpenOnDemand:
library(doSNOW)
library(foreach)
library(doParallel)
Then
#setup parallel backend to use many processors
cores=detectCores()
cl <- makeCluster(cores[1]-1) #not to overload your computer
registerDoParallel(cl)
#find all the csv files I need to process
trades <- list.files(market.data.folder, pattern = "*.csv$", full.names = TRUE)
#standard foreach loop and include the packages
foreach(i = 1:length(trades), .packages = c("tidyverse",
"data.table",
"lubridate",
"dplyr",
"jsonlite"))%dopar% {
name <- trades[i]
#read in data
data <- data.table::fread(name, header = TRUE)
#some manipulation of the data using dplyr
daily.5min.data<-
data %>%
filter(InstruStatus == "TRADE") %>%
mutate(Date = lubridate::ymd(Date),
UpdateTime = lubridate::hms(UpdateTime)) %>%
mutate(TimeStamp = paste0(Date, UpdateTime),
TimeStamp = lubridate::ymd_hms(TimeStamp)) %>%
dplyr::relocate(TimeStamp, .before = Date) %>%
mutate(Time.Intervals = cut(TimeStamp, breaks = dropdown)) %>%
dplyr::relocate(Time.Intervals, .before = TimeStamp)
#more manipulation to calculate some numbers
hl.price <-
daily.5min.data %>%
select(c("Date", "Time.Intervals", "TimeStamp","SecurityID",
"PreCloPrice", "OpenPrice" , "HighPrice", "LowPrice","LastPrice","ClosePrice",
"TradVolume", "Turnover")) %>%
group_by_at(c("Date", "Time.Intervals","SecurityID")) %>%
filter(!LastPrice==0) %>%
summarise(across(c("LastPrice" ), list(min = ~ min(., na.rm = TRUE),
max = ~ max(., na.rm = TRUE)))) %>%
rename(HighPrice_max=LastPrice_max,
LowPrice_min = LastPrice_min) %>%
select(c("Date", "Time.Intervals", "SecurityID", "HighPrice_max","LowPrice_min")) %>%
rename_with(~str_remove(., '_min|_max$'))
#write the csv out
write_csv(hl.price, file =
            file.path(here::here(), "output", "hl.csv"))
#Next def the function to arrange the designed list structure
row_to_list <- function(row) {
return(list(
cs_startTimestamp = row[["cs_startTimestamp"]],
cs_endTimestamp = row[["cs_endTimestamp"]],
open = row[["open"]],
close = row[["close"]],
high = row[["high"]],
low = row[["low"]],
volume = row[["volume"]]))
}
#split the dataframe into lists and map the function to each list to generate the nested lists
updated_nested_lists<-
hl.price %>%
group_split(Date)%>%
map(row_to_list)
#unlist
final_list <- unlist(rapply(updated_nested_lists, unlist, how = "list", recursive = F),
recursive = F)
#write out the final json file
cat(jsonlite::toJSON(final_list, pretty=TRUE,auto_unbox = TRUE),
file = file.path(here::here(), "output",
"15min_marketdata.json"))
}
#shut down the cluster once the loop has finished
stopCluster(cl)