I need to implement a state machine in my Spark streaming application. After reading through some posts, I found that Akka comes with an FSM out of the box. I created a simple Akka FSM and I am able to run it locally, but I am not sure how to integrate this code into Spark Structured Streaming (mapGroupsWithState). I want to take this FSM and plug it into mapGroupsWithState so that the last state is stored even if my application goes down.
/** Types used by the `BuilderFSM` state machine: its set of states and its data record. */
object BuilderFSM {

  /** Closed family of states; the three case objects below are its only members. */
  sealed trait MachineState

  case object tuned extends MachineState

  case object trick extends MachineState

  case object nonTuned extends MachineState

  /**
   * Record made of three string fields.
   *
   * @param macId      MAC identifier (presumably of the device; confirm with the producer)
   * @param acctNumber account number
   * @param eventType  event type, kept as a free-form string
   */
  case class TuningEvent(macId: String, acctNumber: String, eventType: String)
}
/** Input messages, split into two sealed families: tune events and trick events. */
object InputEvent {

  /** Marker for tune-related inputs. */
  sealed trait tuneEvents

  /** Marker for trick-related inputs. */
  sealed trait trickEvents

  // Tune family.
  case object PLAY extends tuneEvents

  // Trick family.
  case object REPLAY extends trickEvents

  case object PLAY1 extends trickEvents
}
/**
 * Akka FSM actor whose state is a `MachineState` and whose data is a `TuningEvent`.
 *
 * It starts in `nonTuned`. While in `nonTuned`, a `PLAY` or a `REPLAY` message
 * prints a line to stdout and moves the machine to `tuned`. No other transitions
 * are defined here.
 */
class BuilderFSM extends FSM[MachineState, TuningEvent] {

  // Initial state plus a placeholder data record.
  startWith(nonTuned, TuningEvent("DEVICE", "ACCOUNT", "NOTHING"))

  when(nonTuned) {
    case Event(PLAY, _) =>
      println("I am in NON-TUNE -> Going to Tuned")
      goto(tuned)

    case Event(REPLAY, _) =>
      println("I am in NON-TUNED -> Going to Tuned")
      goto(tuned)
  }

  // Akka FSM requires every state that can be entered to be declared with `when`;
  // a `goto` to an undeclared state stops the actor with a failure. `tuned` is the
  // target of both transitions above, so it is registered with the no-op handler.
  // `trick` is registered as well so that all declared states are known to the FSM,
  // although nothing in this class transitions to it.
  when(tuned)(FSM.NullFunction)
  when(trick)(FSM.NullFunction)

  // Must be its own statement: written on the same line as the closing brace of a
  // `when { ... }` block it is parsed as a method call on that block's Unit result.
  initialize()
}