i am using akka timer
i am using twitter streams and i am trying to get the number of tweets in 5 second here is my code
case class PerSecond(tweet:Tweet)
case class TweetPerSecondCount(tweet:Tweet)
class TweetPerSecondActor extends Actor with Timers{
var counter=0
def receive: PartialFunction[Any, Unit] = {
case PerSecond(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message PerSecond")
timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)
case TweetPerSecondCount(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
log.info("got the tweet {}",getCounter+"in 5 seconds")
case message =>
log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
unhandled(message)
}
}
in play framework's controller Action i am taking the tweets objects from the twitter stream (continuous stream without stooping it)
class Mycontroller extends Controller {
val tweetPerSecondActor = system.actorOf......//create actor
def tweetAveragePerSecond = Action {
log.debug("in the action tweetAveragePerSecond")
def getTweet: PartialFunction[StreamingMessage, Unit] = {
case tweet: Tweet =>
val future = ask(tweetPerSecondActor, PerSecond(tweet))
}
val streaming: Future[TwitterStream] = handleTwitterStreamClient.getStreamingCLient.sampleStatuses(stall_warnings = true)(getTweet)
Ok("tweet average per second")
}
}
when i hit the route the logs show
TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [ArteciateActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]
6:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-6] TweetPerSecondActor INFO - Actor TweetPerSecondActor recevied the message PerSecond
16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Cancel timer [perSecond] with generation [758]
16:42:44.335 28939 [TwitterActorSystem-akka.actor.default-dispatcher-5] TimerScheduler DEBUG - Start timer [perSecond] with generation [759]
and if i pass the dummy string values instead of running the stream and passing the twitter object the timer works fine like below case class PerSecond(tweet:String) case class TweetPerSecondCount(tweet:String)
class TweetPerSecondActor extends Actor with Timers{
var counter=0
def receive: PartialFunction[Any, Unit] = {
case PerSecond(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message PerSecond")
timers.startPeriodicTimer("perSecond", TweetPerSecondCount(tweet), 5.second)
case TweetPerSecondCount(tweet) =>
log.info("Actor TweetPerSecondActor recevied the message TweetPerSecondCount")
log.info("got the tweet {}",getCounter+"in 5 seconds")
case message =>
log.warn("Actor TweetPerSecondActor: Unhandled message received : {}", message)
unhandled(message)
}
}
class Mycontroller extends Controller {
val tweetPerSecondActor = system.actorOf......//create actor
def tweetAveragePerSecond = Action {
log.debug("in the action tweetAveragePerSecond")
val future = ask(tweetPerSecondActor, PerSecond("dummy value"))
Ok("tweet average per second")
}
}
Timers in Akka are keyed (in your case the key is
"perSecond") and the API guarantees thatSo every time the
getTweetfunction (given that it's called for effect not for value, I prefer "procedure" but that's perhaps idiosyncratic) is called, the timer will get canceled.Depending on what you're trying to accomplish solutions could include: