I wrote a small data-processing pipeline with FS2 (my first attempt). It works, but it is pretty clunky. The input data are flight records and the output is a Map. Recommendations for improving it would be great (besides creating new data types so that I don't have to sum tuples). I would also appreciate any recommendations on performance optimization.
My code is below:
import cats.effect.{IO, IOApp}
import fs2.{Stream, text}
import fs2.io.file.{Files, Path}
import Utils._
import cats.Semigroup
import cats.syntax.all._
import cats.instances.map._
import cats.instances.vector._
import fs2.Chunk
/** Streams a flight-records CSV with FS2, groups the (air time, distance) samples by
  * (origin, destination) route, sums them per route and prints the number of routes.
  */
object FS2Implementation extends IOApp.Simple {

  /** The fields of one CSV record that this pipeline uses.
    *
    * `distance` and `airtime` are optional; rows missing either one are skipped by
    * [[processStream]].
    */
  final case class SpeedRow(
      recordDate: Date,
      origin: String,
      destination: String,
      distance: Option[Double],
      airtime: Option[Double]
  )

  /** Location of the input CSV, relative to the working directory. */
  val file: String = "src/main/FlightData/2018.csv"

  /** Splits one CSV line on commas, trims every field and converts it to a [[SpeedRow]].
    *
    * @param string a single, non-empty line of the CSV file
    * @return the parsed row, as produced by `castToSpeedRow`
    */
  def stringToRows(string: String): SpeedRow =
    // A limit of -1 keeps trailing empty columns. The default split discards them, which
    // shortens the field vector of any row that ends in blank fields.
    castToSpeedRow(string.split(",", -1).map(_.trim).toVector)

  /** Reads the file at `path` line by line and parses every data line.
    *
    * @param path path of the CSV file to read
    * @return a stream with one [[SpeedRow]] per non-blank line after the first
    */
  def getFileStream(path: String): Stream[IO, SpeedRow] =
    Files[IO]
      .readUtf8Lines(Path(path))
      .drop(1) // skip the first line (presumably the CSV header)
      // Drop blank lines instead of unconditionally dropping the last line: a file that
      // does not end with a newline would otherwise lose its final record.
      .filter(_.trim.nonEmpty)
      .map(stringToRows)

  /** Groups the rows of `stream` by route, one map per chunk of up to 10000 usable rows.
    *
    * Rows without an air time or without a distance are ignored.
    *
    * @param stream parsed rows
    * @return a stream of maps from (origin, destination) to the (airtime, distance) pairs
    *         seen for that route within the chunk, in encounter order
    */
  def processStream(
      stream: Stream[IO, SpeedRow]
  ): Stream[IO, Map[(String, String), Vector[(Double, Double)]]] =
    stream
      // Pattern matching selects complete rows and unwraps both options in one step,
      // so no Option.get is needed.
      .collect { case SpeedRow(_, origin, destination, Some(distance), Some(airtime)) =>
        ((origin, destination), (airtime, distance))
      }
      .chunkN(10000, allowFewer = true)
      .map(_.foldLeft(Map.empty[(String, String), Vector[(Double, Double)]]) {
        case (acc, (route, sample)) =>
          // Append in place rather than building and merging a one-entry map per row.
          acc.updatedWith(route)(previous => Some(previous.fold(Vector(sample))(_ :+ sample)))
      })

  /** Runs the pipeline on [[file]] and prints how many distinct routes were found. */
  override def run: IO[Unit] =
    processStream(getFileStream(file)).compile.foldMonoid // merge chunk maps as they arrive
      .map { samplesByRoute =>
        // Per route: (total airtime, total distance).
        samplesByRoute.view
          .mapValues(_.foldLeft((0.0, 0.0)) { case ((airtime, distance), (a, d)) =>
            (airtime + a, distance + d)
          })
          .toMap
      }
      .flatMap(totals => IO.println(totals.size))
}
/** Builds a `SpeedRow` from the already split and trimmed fields of one CSV record.
  *
  * Fields are read by position: 0 = record date (`yyyy-MM-dd`), 3 = origin,
  * 4 = destination, 20 = air time, 21 = distance.
  *
  * @param row the fields of one record
  * @return the row; `airtime` / `distance` are `None` when the field is absent or is not
  *         a valid number
  * @throws java.text.ParseException if field 0 is not a `yyyy-MM-dd` date
  * @throws IndexOutOfBoundsException if the record has fewer than five fields
  */
def castToSpeedRow(row: Vector[String]): SpeedRow = {
  // NOTE(review): a new formatter is built on every call. java.text.SimpleDateFormat is
  // documented as not synchronized, so hoist it only if all calls stay on one thread.
  val format = SimpleDateFormat("yyyy-MM-dd")
  val date = format.parse(row(0))
  val origin = row(3)
  val destination = row(4)
  // `lift` turns a missing column into None instead of an IndexOutOfBoundsException.
  val airTime = row.lift(20).flatMap(_.toDoubleOption)
  val distance = row.lift(21).flatMap(_.toDoubleOption)
  SpeedRow(date, origin, destination, distance, airTime)
}