I would like to send data from a scalaz-stream into an external program and get the result for each item back roughly 100 ms later. I was able to do this with the code below by zipping the output stream Sink with the input stream Process and then throwing away the Sink side effect, but I feel this solution may be very brittle.
If the external program fails on one of the input items, everything after it will be out of sync. I feel the best bet would be to send some sort of incrementing ID into the external program, which it echoes back with the result, so that if an error occurs we can resync.
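A minimal sketch of that ID idea, written in plain Scala so it does not depend on any particular scalaz-stream version. The names Correlator, tag, resolve and outstanding are hypothetical, and the `id:payload` line format is an assumption; it relies on the external program echoing each line back unchanged, as echo.sh does.

```scala
import scala.collection.mutable
import scala.util.Try

// Hypothetical helper: correlates replies with requests through an echoed ID.
// Assumes each line written is "id:payload" and the program echoes it back as-is.
final class Correlator {
  private var nextId = 0L
  // Requests that were sent but not yet answered, in send order.
  private val pending = mutable.LinkedHashMap.empty[Long, String]

  /** Builds the line to write to the program's stdin and remembers the request. */
  def tag(payload: String): String = synchronized {
    val id = nextId
    nextId += 1
    pending(id) = payload
    s"$id:$payload"
  }

  /** Parses a line read from the program's stdout.
    * Returns (id, original request, reply body) when the ID is still pending,
    * and None for malformed lines or unknown IDs.
    */
  def resolve(line: String): Option[(Long, String, String)] = synchronized {
    line.split(":", 2) match {
      case Array(idStr, body) =>
        for {
          id      <- Try(idStr.toLong).toOption
          request <- pending.remove(id)
        } yield (id, request, body)
      case _ => None
    }
  }

  /** IDs still waiting for a reply, oldest first. */
  def outstanding: List[Long] = synchronized(pending.keys.toList)
}
```

With something like this, the source side would map each item through tag before the Sink, and the reading side would map each output line through resolve. A dropped or failed item then shows up as an ID left in outstanding instead of shifting every later result by one.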
The main trouble I am having is joining the result of sending data into the external program, Process[Task, Unit], with the output of the program, Process[Task, String]. I feel like I should be using something from wye, but I am not really sure.
import java.io.PrintStream
import scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process._
import scalaz.stream._

object Main extends App {
  /*
  # echo.sh just prints to stdout what it gets on stdin
  while read line; do
    sleep 0.1
    echo $line
  done
  */
  val p: java.lang.Process = Runtime.getRuntime.exec("/path/to/echo.sh")

  // Emits the current timestamp once per second
  val source: Process[Task, String] = Process.repeatEval(Task {
    Thread.sleep(1000)
    System.currentTimeMillis().toString
  })

  // Lines the external program writes to its stdout
  val linesR: stream.Process[Task, String] = stream.io.linesR(p.getInputStream)

  // Sink that writes lines to the external program's stdin.
  // autoFlush = true so each line is flushed on println instead of sitting in a buffer.
  val printLines: Sink[Task, String] = stream.io.printLines(new PrintStream(p.getOutputStream, true))

  val in: Process[Task, Unit] = source to printLines

  // Pairs the n-th write with the n-th line read; this is the brittle part
  val zip: Process[Task, (Unit, String)] = in.zip(linesR)
  val out: Process[Task, String] = zip.map(_._2) observe stream.io.stdOutLines

  out.run.run
}
After delving a little deeper into the more advanced types, it looks like Exchange does exactly what I want.