How to flatten SCollection[SCollection[SomeType]] into SCollection[SomeType]

I'm using Beam (and Scio, though feel free to answer this question for PCollections too) to read from multiple tables in BigQuery. Because I'm reading multiple datasets from a dynamically generated list (it is itself an SCollection[String], where the String specifies the table name essentially), I wind up with an SCollection[SCollection[MyCoolDataType]].

Is there any way to flatten (union) these SCollection objects into one? I've tried:

doubleCollection.reduce((col1, col2) => col1.union(col2))

and

sc.unionAll(doubleCollection)

but unfortunately an SCollection is not itself an iterable, so I think I may need to get more creative about mapping elements.

1 Answer

Flattening SCollection[SCollection[T]] isn't supported in Scio or the underlying Beam model: the shape of the pipeline graph is fixed at construction time, before any elements are available.

If you were using FileIO, you could use FileIO.matchAll() followed by FileIO.readMatches() to accept a collection of file patterns and then read the matched files into a single PCollection.
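A minimal sketch of that FileIO route in Scio, assuming plain text files and a Scio version that can supply coders for the intermediate Beam types. The object name `ReadPatterns` and the helper `readAll` are hypothetical, not part of Scio:

```scala
import com.spotify.scio.values.SCollection
import org.apache.beam.sdk.io.{FileIO, TextIO}

object ReadPatterns {
  // Expands each file pattern at run time and reads every matched file line by line.
  def readAll(patterns: SCollection[String]): SCollection[String] =
    patterns
      .applyTransform(FileIO.matchAll())    // pattern -> metadata of matching files
      .applyTransform(FileIO.readMatches()) // metadata -> readable file handles
      .applyTransform(TextIO.readFiles())   // file handle -> lines of text
}
```

Because the patterns are ordinary elements, the list can be computed inside the same pipeline, which is exactly what the BigQuery source does not allow.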

For BigQuery, however, this is not currently supported by Beam (nor Scio). What you can do instead is use Scio's taps to materialize the dynamic list in a first pipeline, then use the result to construct a second pipeline. For example:

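// First pipeline: compute the dynamic list of queries and materialize it.
val (sc1, _) = ContextAndArgs(cmdlineArgs)
val queriesTap: ClosedTap[String] =
  sc1.typedBigQuery[Row]()
     .map(_.table_to_query)
     .materialize

// Block until the first pipeline finishes so the tap can be read.
val sr: ScioResult = sc1.run().waitUntilDone()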
val (sc2, _) = ContextAndArgs(cmdlineArgs)

val queries: List[SCollection[TableRow]] =
  sr.tap(queriesTap)
    .value
    .toList
    .map(q => sc2.bigQuerySelect(Query(q)))

if (queries.nonEmpty) {
  val (head :: tail) = queries
  val unioned: SCollection[TableRow] =
    head.transform("unionQueries") { h =>
      // foldLeft passes the accumulator first, then the next element
      tail.foldLeft(h) { case (acc, t) => acc.union(t) }
    }
}
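
Since queries is an ordinary Scala List built on the driver rather than an SCollection, the sc.unionAll call from the question should also apply at this point. A small sketch, where `UnionHelper` and `unionOrNone` are hypothetical names:

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.coders.Coder
import com.spotify.scio.values.SCollection

object UnionHelper {
  // Returns None for an empty list so the caller decides what "no inputs" means.
  def unionOrNone[T: Coder](sc: ScioContext, cols: List[SCollection[T]]): Option[SCollection[T]] =
    if (cols.isEmpty) None else Some(sc.unionAll(cols))
}
```

With these names, UnionHelper.unionOrNone(sc2, queries) would stand in for the head/tail fold. Either way, the second pipeline still has to be started with sc2.run().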