Data manipulation with ZIO Stream -- Compiles and runs but does not finish

208 Views Asked by At

I created 3 different versions of a data processing mini pipeline. One with Scala Views, one with FS2 and the one with ZIO Streams. The View and FS2 implementation both run and finishing pretty quickly (FS2 being much faster). However, my ZIO implementation compiles and runs but doesn't finish. I was wondering if someone has an idea why it doesn't finish (The data is about 900MB).

ZIO Implementation (compiles and runs but does not complete) :

`

val zeroTupple = (0.0, 0.0)
val monoid = Monoid[(Double, Double)]

val file = "src/main/FlightData/2018.csv"

val parseLine: String => SpeedRow = str =>
    castToSpeedRow(str.split(",").map(_.trim).toVector) //Used in FS2 implementation

val parseCSV =
    ZPipeline.utf8Decode >>>
        ZPipeline.splitLines >>>
        ZPipeline.drop(1) >>>   //Dropping the header
        ZPipeline.map(parseLine(_))

val filterSpeedRows
    : ZPipeline[Any, CharacterCodingException, SpeedRow, SpeedRow] =
    ZPipeline.filter(p => p.airtime.nonEmpty && p.distance.nonEmpty)

def getStream(filename: String) = ZStream.fromFileName(filename)

val transformationPipe
    : ZPipeline[Any, Throwable, SpeedRow, (Double, Double)] =
    ZPipeline.map(sr => (sr.distance.get, sr.airtime.get))

val groupedStream =
    getStream(file)
        .via(parseCSV >>> filterSpeedRows)
        .groupBy(sr =>
            ZIO.succeed {
                ((sr.origin, sr.destination), sr)
            }
        ) { (k, s) =>
            ZStream.fromZIO {
                s.via(transformationPipe)
                    .runFold(zeroTupple)((sum, value) =>
                        monoid.combine(sum, value)
                    )
                    //.map(values => k -> values)
            }
        }

override def run: ZIO[Any & (ZIOAppArgs & Scope), Any, Any] =
    groupedStream.run(ZSink.collectAll)`

FS2 Implementation as comparison (runs fine):

`

val file = "src/main/FlightData/2018.csv"

def stringToRows(string: String): SpeedRow =
    val result = castToSpeedRow(string.split(",").map(_.trim).toVector)
    result

def getFileStream(path: String): Stream[IO, SpeedRow] =
    val filePath = Path(path)
    Files[IO].readUtf8Lines(filePath).drop(1).dropLast.map(stringToRows)

def processStream(stream: Stream[IO, SpeedRow]) =
    val monoidDD = Monoid[(Double, Double)]
    val filtered =
        stream.filter(p => p.airtime != None && p.distance != None)
    val grouped = filtered.fold(
      Map.empty[(String, String), List[(Double, Double)]]
    )((m, sr) =>
        m + (
          (sr.origin, sr.destination) -> ((
            sr.distance.get,
            sr.airtime.get
          ) :: m
              .getOrElse((sr.origin, sr.destination), Nil))
        )
    )
    val foldedResult = grouped
        .map(m => m.mapValues(e => monoidDD.combineAll(e)))
        .map(m => m.toMap)
    foldedResult

override def run: IO[Unit] =
    val streamOfData =
        processStream(getFileStream(file)).compile.toVector
    streamOfData.myDebug.void`
0

There are 0 best solutions below