How to process large amounts of data in Scala with fs2?


We have a Scala utility which reads data from a database and writes it to a text file in CSV format, using the fs2 library. It then does some processing on a few columns and creates the final file. So it is a two-step process:

  1. Read the data from the db and create a data_tmp CSV file.
  2. Process a few columns from the _tmp file and create the final data_final CSV file.

We use code similar to the example at this link: https://levelup.gitconnected.com/how-to-write-data-processing-application-in-fs2-2b6f84e3939c

import java.nio.file.Paths
import cats.effect.{Blocker, IO}
import fs2.{io, text, Stream}

Stream.resource(Blocker[IO]).flatMap { blocker =>
  val inResource  = getClass.getResource(in)  // data_tmp file location
  val outResource = getClass.getResource(out) // data_final file location
  io.file
    .readAll[IO](Paths.get(inResource.toURI), blocker, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    // ... our processing logic here ...
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(outResource.toURI), blocker))
}

Until now this worked fine, as we never had more than 5k records.

We now have a new requirement where we expect the db query to return between 50k and 1000k records.

So we want to create multiple data_final files: data_final_1, data_final_2, and so on.

Each output file should be no larger than a given size, say 2 MB.

In other words, the data_final output should be split into chunks of roughly 2 MB.

How can I modify the above code snippet so that we can create multiple output files from a single large data_tmp CSV file?
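
For reference, one possible approach (a sketch, not a drop-in solution): fs2 2.1+ ships io.file.writeRotate, which writes bytes to a file until a byte limit is reached and then switches to a freshly computed path. The sketch below assumes the same fs2 2.x / cats-effect 2 stack as the snippet above; the nextPath helper, the Ref-based counter, and the hard-coded file names are made up for illustration.

import java.nio.file.{Path, Paths}
import cats.effect.{Blocker, ExitCode, IO, IOApp}
import cats.effect.concurrent.Ref
import fs2.{io, text, Stream}

object SplitOutput extends IOApp {

  // Hypothetical helper: each call yields the next output path,
  // producing data_final_1.csv, data_final_2.csv, ...
  def nextPath(counter: Ref[IO, Int]): IO[Path] =
    counter.modify(n => (n + 1, n + 1)).map(n => Paths.get(s"data_final_$n.csv"))

  def run(args: List[String]): IO[ExitCode] =
    Stream
      .resource(Blocker[IO])
      .flatMap { blocker =>
        Stream.eval(Ref.of[IO, Int](0)).flatMap { counter =>
          io.file
            .readAll[IO](Paths.get("data_tmp.csv"), blocker, 4096)
            .through(text.utf8Decode)
            .through(text.lines)
            // ... processing logic here ...
            .intersperse("\n") // text.lines drops newlines; re-add them
            .through(text.utf8Encode)
            // rotate to a fresh file once 2 MB have been written
            .through(io.file.writeRotate(nextPath(counter), 2L * 1024 * 1024, blocker))
        }
      }
      .compile
      .drain
      .as(ExitCode.Success)
}

Note that writeRotate counts raw bytes, so a rotation can land in the middle of a CSV row. If every row must stay intact within a single file, you would instead batch complete lines yourself, e.g. accumulate lines until their encoded size approaches 2 MB and write each batch to its own numbered file with io.file.writeAll.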
