how to remove the filtered lines from the current streaming file using fs2 and get the count of filtered lines as the return type?
ex: If the old.txt contains strings separated by a newline (\n):
john
sam
chen
yval
....
and val myList = List("chen","yval").
def converter[F[_]](implicit F: Sync[F]): F[Unit] =
io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
.through(text.utf8Decode)
.through(text.lines)
.filter(s => myList.contains(s))//remove this from the old file and write to new file
.intersperse("\n")
.through(text.utf8Encode)
.through(io.file.writeAll(Paths.get("testdata/new.txt")))
.compile.drain
// at the end of the universe...
val u: Unit = converter[IO].unsafeRunSync()
You can use the observe method of the
Streamclass.You are looking for a function
def converter[F[_]: Sync]: F[Int], which produces a computationF[Int]whose result (of typeInt) is the number of filtered lines, and whose effect is to write those lines to the output file. To follow a plumbing analogy, you want to feed your filtered stream to two outputs, one for the result, and one for the effect. You can do this using the function observe, defined asA
Sink[F,O]is an alias for a functionStream[F, O] => Stream[F, Unit]. In your case, the sink is the part of your code that writes your filtered stream into the output file:The other output is to reduce, or rather fold,
Note: for this solution you need to restrict the type-class of
Fto beEffect, and you need to use anExecutionContext. Thefoldis defined in theToEffectclass.