I am trying to build a simple producer-consumer buffer problem using Skala Typed Actors. The code I am trying to run is:

package main

import akka.actor._

trait ProducerBehavior {
  def setBuffer(b:BufferBehavior)
  def run
}
trait ConsumerBehavior {
  def setBuffer(b:BufferBehavior)
  def run
}
trait BufferBehavior {
  def put(i:Int):Unit  //fire-forget
  def get():Int  //blocking send-request-reply
}

class Producer() extends ProducerBehavior {
  var buf:BufferBehavior = null
  def setBuffer(b:BufferBehavior) = buf = b 
  var i:Int = 1
  def run : Unit = {
    buf.put(i)
    i = i+1
    if(i<10)
      run
  }
}

class Consumer() extends ConsumerBehavior {
  var buf:BufferBehavior = null
  def setBuffer(b:BufferBehavior) = buf = b
  var sum:Int = 0
  @throws(classOf[Exception])
  def run : Unit = {
    if(buf == null) {
      println("Yes buf is null")
      throw new Exception
    }
    val j = buf.get()
    sum = sum + j
    println(sum)
    run
  }
}

class Buffer extends BufferBehavior {
  var value:Int = 0
  var full:Boolean = false
  def put(x:Int) : Unit  = {
    while(full) {
    }
    value = x
    full = true
  }
  def get() : Int = {
    while(!full) {
    }
    full = false
    value
  }
}

object CreolExampleUsingTypedActors extends App {
  val system = ActorSystem("creolTypedActors")
  val buffer: BufferBehavior = TypedActor(system).typedActorOf(TypedProps[Buffer]())

  val p1: ProducerBehavior = TypedActor(system).typedActorOf(TypedProps[Producer]())
  val c1: ConsumerBehavior = TypedActor(system).typedActorOf(TypedProps[Consumer]())
  p1.setBuffer(buffer)
  c1.setBuffer(buffer)
  p1.run
//  p2.run
  c1.run
}

Complete stack trace is:

[ERROR] [08/09/2016 13:50:35.965] [creolTypedActors-akka.actor.default-dispatcher-5] [akka://creolTypedActors/user/$c] null
java.lang.reflect.UndeclaredThrowableException
    at com.sun.proxy.$Proxy0.get(Unknown Source)
    at main.Consumer.run(CreolExampleUsingTypedActors.scala:40)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at akka.actor.TypedActor$MethodCall.apply(TypedActor.scala:145)
    at akka.actor.TypedActor$TypedActor$$anonfun$receive$1$$anonfun$applyOrElse$2.apply(TypedActor.scala:308)
    at akka.actor.TypedActor$TypedActor.withContext(TypedActor.scala:300)
    at akka.actor.TypedActor$TypedActor$$anonfun$receive$1.applyOrElse(TypedActor.scala:307)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
    at akka.actor.TypedActor$TypedActor.aroundReceive(TypedActor.scala:247)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
    at akka.actor.ActorCell.invoke(ActorCell.scala:494)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
    at scala.concurrent.Await$.result(package.scala:190)
    at akka.actor.TypedActor$TypedActorInvocationHandler.invoke(TypedActor.scala:434)
    ... 21 more

What is the workflow here? The code creates 3 typed-actors first: Producer, Consumer and Buffer. Producer and Consumer try to communicate via Buffer object. The buffer can store only one value at a time. Producer can't put new value unless previous value is "consumed" by the Consumer and vice versa.

0

There are 0 best solutions below