I created 3 different versions of a data processing mini pipeline. One with Scala Views, one with FS2 and the one with ZIO Streams. The View and FS2 implementation both run and finishing pretty quickly (FS2 being much faster). However, my ZIO implementation compiles and runs but doesn't finish. I was wondering if someone has an idea why it doesn't finish (The data is about 900MB).
ZIO Implementation (compiles and runs but does not complete) :
`
val zeroTupple = (0.0, 0.0)
val monoid = Monoid[(Double, Double)]
val file = "src/main/FlightData/2018.csv"
val parseLine: String => SpeedRow = str =>
castToSpeedRow(str.split(",").map(_.trim).toVector) //Used in FS2 implementation
val parseCSV =
ZPipeline.utf8Decode >>>
ZPipeline.splitLines >>>
ZPipeline.drop(1) >>> //Dropping the header
ZPipeline.map(parseLine(_))
val filterSpeedRows
: ZPipeline[Any, CharacterCodingException, SpeedRow, SpeedRow] =
ZPipeline.filter(p => p.airtime.nonEmpty && p.distance.nonEmpty)
def getStream(filename: String) = ZStream.fromFileName(filename)
val transformationPipe
: ZPipeline[Any, Throwable, SpeedRow, (Double, Double)] =
ZPipeline.map(sr => (sr.distance.get, sr.airtime.get))
val groupedStream =
getStream(file)
.via(parseCSV >>> filterSpeedRows)
.groupBy(sr =>
ZIO.succeed {
((sr.origin, sr.destination), sr)
}
) { (k, s) =>
ZStream.fromZIO {
s.via(transformationPipe)
.runFold(zeroTupple)((sum, value) =>
monoid.combine(sum, value)
)
//.map(values => k -> values)
}
}
override def run: ZIO[Any & (ZIOAppArgs & Scope), Any, Any] =
groupedStream.run(ZSink.collectAll)`
FS2 Implementation as comparison (runs fine):
`
val file = "src/main/FlightData/2018.csv"
def stringToRows(string: String): SpeedRow =
val result = castToSpeedRow(string.split(",").map(_.trim).toVector)
result
def getFileStream(path: String): Stream[IO, SpeedRow] =
val filePath = Path(path)
Files[IO].readUtf8Lines(filePath).drop(1).dropLast.map(stringToRows)
def processStream(stream: Stream[IO, SpeedRow]) =
val monoidDD = Monoid[(Double, Double)]
val filtered =
stream.filter(p => p.airtime != None && p.distance != None)
val grouped = filtered.fold(
Map.empty[(String, String), List[(Double, Double)]]
)((m, sr) =>
m + (
(sr.origin, sr.destination) -> ((
sr.distance.get,
sr.airtime.get
) :: m
.getOrElse((sr.origin, sr.destination), Nil))
)
)
val foldedResult = grouped
.map(m => m.mapValues(e => monoidDD.combineAll(e)))
.map(m => m.toMap)
foldedResult
override def run: IO[Unit] =
val streamOfData =
processStream(getFileStream(file)).compile.toVector
streamOfData.myDebug.void`