R foreach: run a for loop in parallel to read CSV files and write out JSON files

I have a large folder containing 10,000+ CSV files. I have constructed a for loop that reads in each file, manipulates it as needed via a series of subsetting, calculation and merging steps, and then writes it out as a new CSV file. After that, I have to turn each file into a heavily nested list before writing it out as a JSON file for delivery. The actual CSV handling is fairly easy, but the list nesting and JSON writing part is very slow: a 40 kB CSV file, once nested and written out as JSON, becomes roughly 9 MB.
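
For reference, each record in the delivered JSON ends up looking roughly like the sketch below. The field names match the row_to_list function in my script further down; the timestamp and price values here are made up:

library(jsonlite)

#one candlestick record (values are made up for illustration)
bar <- list(cs_startTimestamp = "2024-01-03 09:30:00",
            cs_endTimestamp   = "2024-01-03 09:35:00",
            open = 10.1, close = 10.2, high = 10.3, low = 10.0,
            volume = 1000)

#the delivery file contains thousands of these per trading day
cat(toJSON(list(bar, bar), pretty = TRUE, auto_unbox = TRUE))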

Running this in sequence would take forever, so I thought about using parallel processing with foreach. However, after reading some related posts, I'm still confused about this argument:

.combine = cbind    

Since all I want to do is read and write the files one by one, do I need to care about .combine at all?
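
My reading of the docs is that, with no .combine, foreach simply collects the per-iteration results into a list, which I would ignore anyway. A toy check of that understanding:

library(foreach)

#default: results come back as a list
res <- foreach(i = 1:3) %do% { i^2 }
str(res)    #List of 3: 1, 4, 9

#.combine only changes how those per-iteration results are stitched together
res2 <- foreach(i = 1:3, .combine = c) %do% { i^2 }
str(res2)   #num [1:3] 1 4 9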

I am testing my foreach attempt at the moment. It seems to be running, but it has been running for a long time and I have not seen any files appear in the output folder, so I have a very bad feeling.
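
Before another full run, a tiny self-contained smoke test I plan to try first (the file and directory names here are made up). If the four files appear, the backend and the output path work, and the slowness is in the real per-file work:

library(doParallel)
library(foreach)

cl <- makeCluster(2)
registerDoParallel(cl)

#absolute path so the workers and the master agree on where "output" is
out.dir <- file.path(getwd(), "output")
dir.create(out.dir, showWarnings = FALSE)

foreach(i = 1:4) %dopar% {
  writeLines(as.character(i), file.path(out.dir, paste0("smoke_", i, ".txt")))
  NULL   #nothing useful to return; the file writes are the point
}

stopCluster(cl)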

Below are the details. Apologies for not posting my actual script, as it is very long and tedious (bad programming skills), but it is basically this:

#packages used throughout (tidyverse covers dplyr, readr, purrr and stringr)
library(tidyverse)
library(data.table)
library(lubridate)
library(jsonlite)
library(here)

#find all the csv files I need to process
trades <- list.files(market.data.folder, pattern = "\\.csv$", full.names = TRUE)

#standard for loop to iterate through all of them one by one
for (i in seq_along(trades)) {

  name <- trades[i]

  #read in data
  data <- data.table::fread(name, header = TRUE)

  #some manipulation of the data using dplyr
  daily.5min.data <- 
    data %>% 
    filter(InstruStatus == "TRADE") %>% 
    mutate(Date = lubridate::ymd(Date),
           UpdateTime = lubridate::hms(UpdateTime)) %>% 
    mutate(TimeStamp = paste0(Date, UpdateTime),
           TimeStamp = lubridate::ymd_hms(TimeStamp)) %>% 
    dplyr::relocate(TimeStamp, .before = Date) %>% 
    #dropdown holds my pre-built interval break points (defined elsewhere)
    mutate(Time.Intervals = cut(TimeStamp, breaks = dropdown)) %>% 
    dplyr::relocate(Time.Intervals, .before = TimeStamp)

  #more manipulation to calculate some numbers
  hl.price <- 
      daily.5min.data %>% 
      select(c("Date", "Time.Intervals", "TimeStamp","SecurityID", 
               "PreCloPrice", "OpenPrice" ,  "HighPrice", "LowPrice","LastPrice","ClosePrice",
               "TradVolume", "Turnover")) %>% 
      group_by_at(c("Date", "Time.Intervals","SecurityID")) %>% 
      filter(LastPrice != 0) %>% 
      summarise(across(LastPrice, list(min = ~ min(., na.rm = TRUE),
                                       max = ~ max(., na.rm = TRUE)))) %>% 
      rename(HighPrice_max = LastPrice_max,
             LowPrice_min  = LastPrice_min) %>% 
      select(c("Date", "Time.Intervals", "SecurityID", "HighPrice_max", "LowPrice_min")) %>% 
      rename_with(~ str_remove(., '_min$|_max$'))

 #write the csv out, one file per input so iterations don't overwrite each other
 out.name <- tools::file_path_sans_ext(basename(name))
 write_csv(hl.price,
           file = file.path(here::here(), "output", paste0(out.name, "_hl.csv")))


 #next, define the function to arrange the designed list structure
 #(these field names follow the delivery spec rather than the hl.price
 # columns above; I have condensed the script here)
 row_to_list <- function(row) {
   list(
     cs_startTimestamp = row[["cs_startTimestamp"]],
     cs_endTimestamp   = row[["cs_endTimestamp"]],
     open   = row[["open"]],
     close  = row[["close"]],
     high   = row[["high"]],
     low    = row[["low"]],
     volume = row[["volume"]])
 }

 #split the dataframe by Date and map the function over each piece to build the nested lists

 updated_nested_lists <- 
   hl.price %>% 
   group_split(Date) %>% 
   map(row_to_list)

#unlist
final_list <- unlist(rapply(updated_nested_lists, unlist, how = "list", recursive = FALSE),
                     recursive = FALSE)

#write out the final json file, named after the source csv
cat(jsonlite::toJSON(final_list, pretty = TRUE, auto_unbox = TRUE),
    file = file.path(here::here(), "output",
                     paste0(out.name, "_15min_marketdata.json")))

}
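
In hindsight, the tidier shape would probably be to wrap all of the above into one function per input file, so that the sequential loop and the foreach version both just call it. A minimal sketch of that pattern (the function name and output-naming scheme are my own invention, and the manipulation steps are elided):

#hypothetical per-file wrapper: deriving the output name from the input name
#also means parallel iterations can never overwrite each other
process_one_file <- function(name, out.dir = file.path(here::here(), "output")) {
  data <- data.table::fread(name, header = TRUE)
  #... the same subsetting / calculating / nesting steps as above ...
  out.name <- tools::file_path_sans_ext(basename(name))
  #placeholder: my real code writes the summarised hl.price table here
  readr::write_csv(data, file.path(out.dir, paste0(out.name, "_hl.csv")))
  invisible(NULL)
}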

For now, I have just lumped the whole loop body into an R foreach loop to utilise the computing cluster at work (I launch RStudio via OpenOnDemand):

library(doSNOW)
library(foreach)
library(doParallel)

Then

#set up the parallel backend to use many processors
cores <- detectCores()
cl <- makeCluster(cores - 1)   #leave one core free so as not to overload the machine
registerDoParallel(cl)
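
A quick sanity check I can run before launching the big loop, just to confirm the backend actually registered (both helpers come from the foreach package):

getDoParName()      #name of the registered backend
getDoParWorkers()   #number of workers; should be cores - 1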



#find all the csv files I need to process
trades <- list.files(market.data.folder, pattern = "\\.csv$", full.names = TRUE)

#standard foreach loop; the workers need the packages loaded explicitly

foreach(i = seq_along(trades),
        .packages = c("tidyverse",   #covers dplyr, readr, purrr, stringr
                      "data.table",
                      "lubridate",
                      "jsonlite",
                      "here")) %dopar% {

  name <- trades[i]

  #read in data
  data <- data.table::fread(name, header = TRUE)

  #some manipulation of the data using dplyr
  daily.5min.data <- 
    data %>% 
    filter(InstruStatus == "TRADE") %>% 
    mutate(Date = lubridate::ymd(Date),
           UpdateTime = lubridate::hms(UpdateTime)) %>% 
    mutate(TimeStamp = paste0(Date, UpdateTime),
           TimeStamp = lubridate::ymd_hms(TimeStamp)) %>% 
    dplyr::relocate(TimeStamp, .before = Date) %>% 
    #dropdown (my pre-built break points) is auto-exported to the workers by foreach
    mutate(Time.Intervals = cut(TimeStamp, breaks = dropdown)) %>% 
    dplyr::relocate(Time.Intervals, .before = TimeStamp)

  #more manipulation to calculate some numbers
  hl.price <- 
      daily.5min.data %>% 
      select(c("Date", "Time.Intervals", "TimeStamp","SecurityID", 
               "PreCloPrice", "OpenPrice" ,  "HighPrice", "LowPrice","LastPrice","ClosePrice",
               "TradVolume", "Turnover")) %>% 
      group_by_at(c("Date", "Time.Intervals","SecurityID")) %>% 
      filter(LastPrice != 0) %>% 
      summarise(across(LastPrice, list(min = ~ min(., na.rm = TRUE),
                                       max = ~ max(., na.rm = TRUE)))) %>% 
      rename(HighPrice_max = LastPrice_max,
             LowPrice_min  = LastPrice_min) %>% 
      select(c("Date", "Time.Intervals", "SecurityID", "HighPrice_max", "LowPrice_min")) %>% 
      rename_with(~ str_remove(., '_min$|_max$'))

 #write the csv out, one file per input so parallel iterations don't overwrite each other
 out.name <- tools::file_path_sans_ext(basename(name))
 write_csv(hl.price,
           file = file.path(here::here(), "output", paste0(out.name, "_hl.csv")))


 #next, define the function to arrange the designed list structure
 #(these field names follow the delivery spec rather than the hl.price
 # columns above; I have condensed the script here)
 row_to_list <- function(row) {
   list(
     cs_startTimestamp = row[["cs_startTimestamp"]],
     cs_endTimestamp   = row[["cs_endTimestamp"]],
     open   = row[["open"]],
     close  = row[["close"]],
     high   = row[["high"]],
     low    = row[["low"]],
     volume = row[["volume"]])
 }

 #split the dataframe by Date and map the function over each piece to build the nested lists

 updated_nested_lists <- 
   hl.price %>% 
   group_split(Date) %>% 
   map(row_to_list)

#unlist
final_list <- unlist(rapply(updated_nested_lists, unlist, how = "list", recursive = FALSE),
                     recursive = FALSE)

#write out the final json file, named after the source csv
cat(jsonlite::toJSON(final_list, pretty = TRUE, auto_unbox = TRUE),
    file = file.path(here::here(), "output",
                     paste0(out.name, "_15min_marketdata.json")))

}

#shut the cluster down once the loop finishes
stopCluster(cl)