Applying custom functions to parquet files loaded with open_dataset


Apologies if this has been asked and answered somewhere else, but I haven't been able to find a solution that works.

I have some data, spread across 4,232 parquet files, that aggregates to roughly 6,745,697 rows and 30,486 columns once the schema is fully parsed via open_dataset.

The rows need to be collapsed into participant-level summaries, with roughly 3,000 unique participants after grouping.

Some columns are demographic data (mixtures of strings, integers, and booleans) whose values are mostly repeated across each participant's rows. As such, I decided to just collapse each of these variables to its most frequent value (its mode) per participant.

The workflow looks something like this:

mode <- function(v) {
  uniqv <- unique(v)
  uniqv[which.max(tabulate(match(v, uniqv)))]
}

library(arrow)
library(dplyr)

DOI <- open_dataset(sources = "<path_to_directory_of_parquet_files>", unify_schemas = TRUE)

Demographics <- DOI |> 
  filter(treatment_flag == 0, comparator_flag == 0) |> 
  group_by(Participant) |> 
  summarize(across(DOI$schema$names[!grepl("VOI_1|VOI_2|Participant", DOI$schema$names)], mode)) |> 
  collect() |> 
  as.data.frame()

But this throws the following error:

Error: Error in summarize_eval(names(exprs)[i], exprs[[i]], ctx, length(.data$group_by_vars) >  : 
  Expression mode(Dataset) is not an aggregate expression or is not supported in Arrow
Call collect() first to pull data into R.

Based on other threads, I assume this is due to the limited set of functions that arrow can translate and run on the dataset itself.
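
(A minimal sketch of the distinction, using arrow_table(mtcars) as a stand-in for the real dataset: an aggregate that arrow has a binding for, such as mean(), runs inside summarize() without collecting, whereas the custom mode() above does not.)

library(dplyr)
library(arrow)

tbl <- arrow_table(mtcars)  # stand-in for the real open_dataset() object

# mean() has an arrow binding, so this aggregates before anything is pulled into R
tbl |>
  group_by(cyl) |>
  summarize(mean_mpg = mean(mpg)) |>
  collect()

# arrow has no binding for the custom mode() defined above, so this would
# fail with the "not an aggregate expression or is not supported in Arrow"
# error shown above
# tbl |>
#   group_by(cyl) |>
#   summarize(gear_mode = mode(gear)) |>
#   collect()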

Is there a way I'm missing to accomplish this via map, apply, or summarize without first pulling the data into R via collect()? Or is it locked to the functions arrow can translate?

Thank you in advance.

1 Answer

I don't know when window functions or row_number() will be supported in arrow. Until then, here's a hack that will get you the numbers you want. How efficient and acceptable it is depends on the size of your data, the number of discrete values within each variable, and how much you're willing to pull into R at a time. Up front: instead of pulling in one row per group, we're pulling in up to n_distinct() rows per group for each variable.

I'll use mtcars and, grouping by cyl, find the most frequent value of the other discrete variables vs, am, gear, and carb. Since you're determining the names by grep-ing on DOI$schema$names, I'll do the same.

I will first confirm what we expect (by collecting early):

mode <- function(v) {
  uniqv <- unique(v)
  uniqv[which.max(tabulate(match(v, uniqv)))]
}

library(dplyr)
DOI <- arrow::arrow_table(mtcars)
collect(DOI) %>%
  # filter(...) %>%
  # whatever you need here
  group_by(cyl) %>%
  summarize(across(grep("vs|am|gear|carb", DOI$schema$names, value=TRUE), ~ mode(.)))
# # A tibble: 3 × 5
#     cyl    vs    am  gear  carb
#   <dbl> <dbl> <dbl> <dbl> <dbl>
# 1     4     1     1     4     2
# 2     6     1     0     4     4
# 3     8     0     0     3     4

My proposal iterates over each of the variables you need, counts their values per group, and pulls just those counts into R, after which we can filter to the max-count (most frequent) level.

lapply(grep("vs|am|gear|carb", DOI$schema$names, value = TRUE), function(nm) {
  nm <- sym(nm)
  DOI %>% 
    # filter(...) %>% # whatever you need here
    group_by(cyl) %>%
    count({{ nm }}) %>%                   # per-group counts of each level, computed by arrow
    collect() %>%                         # pull only the (cyl, level, n) counts into R
    slice_max(n = 1, order_by = n) %>%    # keep the most frequent level(s) per group
    select(-n) %>%
    ungroup()
}) |>
  Reduce(function(a, b) left_join(a, b, by = "cyl"), x = _)   # join the per-variable modes by cyl
# # A tibble: 3 × 5
#     cyl    vs    am  gear  carb
#   <dbl> <dbl> <dbl> <dbl> <dbl>
# 1     4     1     1     4     2
# 2     6     1     0     4     4
# 3     8     0     0     3     4

You may want/need slice_max(..., with_ties = FALSE), depending on your tolerance for either (a) more than one row per group, or (b) not knowing there are multiple equal-count levels.
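
Translated back to the setup in your question (a sketch only, not tested against your data; Participant, treatment_flag, comparator_flag, and the VOI_* pattern are taken from your code above, and with_ties = FALSE forces exactly one row per participant), it would look something like:

vars <- DOI$schema$names[!grepl("VOI_1|VOI_2|Participant", DOI$schema$names)]

Demographics <- lapply(vars, function(nm) {
  nm <- sym(nm)
  DOI |>
    filter(treatment_flag == 0, comparator_flag == 0) |>
    group_by(Participant) |>
    count({{ nm }}) |>
    collect() |>
    slice_max(n = 1, order_by = n, with_ties = FALSE) |>
    select(-n) |>
    ungroup()
}) |>
  Reduce(function(a, b) left_join(a, b, by = "Participant"), x = _) |>
  as.data.frame()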