My client opens a WebSocket connection, upon which I subscribe to an MQTT topic and fetch the messages that I need to publish to this client. The client may disconnect in the meantime, in which case the message I need to send can no longer be delivered, so I write it back to MQTT. Since I use Play Framework for my WebSocket server, I have an Actor that handles this. Here it is:

    class OCPPActor(sink: ActorRef, chargingStationId: String, isPersistentConn: Boolean = false) extends Actor with Timers {

      // This will schedule to send the KeepAlive for WebSocket connections
      timers.startTimerWithFixedDelay("KeepAliveKey", "KeepAlive", 50.seconds)

      // Subscribe to dead letters to handle message sending failures
      context.system.eventStream.subscribe(self, classOf[DeadLetter])

      private val mqttConfig = bindings.appConfig.mqttConfig

      override def receive: Receive = {
        case jsValue: JsValue =>
          logger.info(s"Received OCPPCallRequest: \n ${Json.prettyPrint(jsValue)}")
          jsValue.validate[OCPPCallRequest].asEither match {
            case Right(ocppCall) => handleOCPPCallRequest(chargingStationId, ocppCall).onComplete {
              case Failure(fail) => sink ! JsError(s"${fail.getMessage}")
              case Success(succ) => sink ! Json.toJson(succ)
            }
            case Left(errors) =>
              logger.error(s"Errors occurred when validating OCPPCallRequest: \n $errors")
              sink ! Json.toJson(s"error -> ${errors.head._2}") // TODO: Work on this issue here on how we want to propagate errors
          }

        // For case where we pickup messages from the MQTT to be sent to the CS
        case csmsMessage: MqttCSMSMessage =>
          logger.info(s"Received CSMSMessage:\n${Json.prettyPrint(Json.toJson(csmsMessage))}")
          // Attempt to send CSMSMessage to the Charging Station
          sink ! csmsMessage

        case DeadLetter(msg, _, _) if msg.isInstanceOf[MqttCSMSMessage] =>
          logger.error("Failed to send CSMSMessage due to CS disconnection")
          val str = msg.asInstanceOf[MqttCSMSMessage].toString // TODO: This should be a Json String
          MqttClientFactory.publish(mqttConfig, chargingStationId, MqttQos.EXACTLY_ONCE, str)

        case "KeepAlive" =>
          logger.info("Received message KeepAlive .........")
          // TODO: How to deal with msgTypeId. I just choose a random value
          sink ! Json.toJson(heartbeatResponse(2,"HeartbeatRequest"))

        case msg: Any =>
          logger.warn(s"Received unknown message ${msg.getClass.getTypeName} that cannot be handled, " +
            s"eagerly closing websocket connection")
          timers.cancel("KeepAliveKey")
          self ! PoisonPill
      }
    }

As you can see, the DeadLetter case handles the scenario where the write to the client over the open WebSocket fails. Is this a good approach? What pitfalls should I expect?
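
The dead-letter handling can also be looked at in isolation. The following is a minimal sketch, assuming classic Akka actors. `CsmsMessage` is a hypothetical stand-in for `MqttCSMSMessage`, and `republish` is a hypothetical callback standing in for `MqttClientFactory.publish`. Because the event stream delivers every `DeadLetter` in the actor system to every subscriber, this variant also matches on the recipient, so that an actor only reacts to messages that were addressed to its own `sink`. Whether that filter is needed in the real application is an assumption.

```scala
import akka.actor.{Actor, ActorRef, DeadLetter}

// Hypothetical stand-in for MqttCSMSMessage.
final case class CsmsMessage(payload: String)

// `republish` is a hypothetical callback standing in for MqttClientFactory.publish.
class DeadLetterAwareActor(sink: ActorRef, republish: CsmsMessage => Unit) extends Actor {

  // Subscribe once the actor has started; dead letters are published system-wide.
  override def preStart(): Unit =
    context.system.eventStream.subscribe(self, classOf[DeadLetter])

  // Explicitly drop the subscription when the actor stops.
  override def postStop(): Unit =
    context.system.eventStream.unsubscribe(self)

  override def receive: Receive = {
    // Forward the message to the WebSocket sink.
    case msg: CsmsMessage =>
      sink ! msg

    // React only to dead letters that were addressed to this actor's own sink.
    case DeadLetter(msg: CsmsMessage, _, recipient) if recipient == sink =>
      republish(msg)

    // Ignore dead letters that belong to other actors or carry other message types.
    case _: DeadLetter =>
  }
}
```

This sketch only works while the actor itself is still alive when the dead letter is published. If the actor is stopped together with the WebSocket connection, it will not receive the `DeadLetter` at all.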