We have a PubSub topic with events sinking into BigQuery (though the particular DB is almost irrelevant here). Events can arrive with new, previously unseen properties that should eventually end up as separate BigQuery columns.
So, basically I have two questions here:
- What is the right way to maintain global state within a Pipeline (in my case, the set of properties encountered so far)?
- What would be a good strategy for buffering/holding the stream of events from the moment a new property is encountered until the `ALTER TABLE` has been executed?
Right now I've tried the following (I'm using Spotify's scio):
```scala
rows
  .withFixedWindows(Duration.millis(duration))
  .withWindow[IntervalWindow]
  .swap                        // key each row by its window
  .groupByKey                  // collect all rows of a window together
  .map { case (window, rowsIterable) =>
    val newRows = findNewProperties(rowsIterable)
    mutateTableWith(newRows)   // ALTER TABLE before the rows are written
    rowsIterable
  }
  .flatMap(identity)           // flatten back to individual rows
  .saveAsBigQuery()
```
But this is terribly inefficient: at the very least we need to load the whole `rowsIterable` into memory and traverse it.
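The `groupByKey` above can often be avoided by checking each element individually against the set of already-known columns instead of materializing whole windows. A minimal, self-contained sketch of that per-row check (the `Row` model as a flat `Map` and the function names are hypothetical, not from the pipeline above):

```scala
// Model a row as a flat map of property name -> value (hypothetical).
type Row = Map[String, Any]

// Return the properties of `row` that do not yet have a BigQuery column.
def newProperties(known: Set[String], row: Row): Set[String] =
  row.keySet.diff(known)

// Example: with columns (id, name) known, a row carrying `country`
// reports exactly one new property.
val pending: Set[String] =
  newProperties(Set("id", "name"),
    Map("id" -> 1, "name" -> "a", "country" -> "PL"))
```

Only rows with a non-empty result would need to be held back (or routed to a side output) until the `ALTER TABLE` completes; everything else can stream through untouched.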
We're building the very same project and are following this approach with a refreshing side input containing the schemas (refreshed at intervals from BigQuery). So basically:
I have an example of a job using that refreshing side input approach here
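A rough sketch of what such a refreshing side input could look like in scio, built on Beam's `GenerateSequence` tick pattern. This is an assumption about the shape of the approach, not the linked job itself; `fetchSchemaFromBQ` and the 5-minute refresh interval are hypothetical:

```scala
import com.spotify.scio.ScioContext
import com.spotify.scio.values.SideInput
import org.apache.beam.sdk.io.GenerateSequence
import org.apache.beam.sdk.transforms.windowing.{AfterProcessingTime, GlobalWindows, Repeatedly, Window}
import org.joda.time.Duration

// Sketch: a side input that re-reads the table schema every few minutes.
// `fetchSchemaFromBQ()` is a hypothetical helper returning the current
// set of column names (e.g. via the BigQuery tables.get API).
def schemaSideInput(sc: ScioContext): SideInput[Set[String]] =
  sc.customInput("SchemaTicks",
      GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5)))
    .applyTransform(
      Window.into[java.lang.Long](new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .map(_ => fetchSchemaFromBQ()) // re-read the schema on every tick
    .asSingletonSideInput
```

The main stream would then consult the side input per element, e.g. via `rows.withSideInputs(schemaSideInput(sc))`, writing rows whose properties are all known straight to BigQuery and routing the rest for schema mutation.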