Akka websocket - how to close connection by server?

1.9k Views Asked by At

So here is my websocket server implementation.

// Upgrade GET requests on the root path ("" or "/") to a WebSocket connection
// whose messages are handled by `websocketFlow`.
val route = (get & pathEndOrSingleSlash) {
  handleWebSocketMessages(websocketFlow)
}

/** Message pipeline for a single WebSocket connection.
  *
  * Strict text frames are decoded with `protocol.hydrate`, routed through a
  * chat flow bound to a freshly generated connection id, and every resulting
  * event is serialized back into a strict text frame. Frames that are not
  * `TextMessage.Strict` do not match the `collect` pattern and are dropped.
  */
def websocketFlow: Flow[Message, Message, Any] = {
  val connectionId = UUID.randomUUID()

  // Decode step: only strict text frames get through.
  val decoded = Flow[Message].collect {
    case TextMessage.Strict(payload) => protocol.hydrate(payload)
  }

  decoded
    .via(chatActorFlow(connectionId))
    .map { event =>
      val serialized = protocol.serialize(event)
      TextMessage.Strict(serialized)
    }
}


/** Bridges one WebSocket connection to `chatRef`.
  *
  * @param connectionId identifier attached to every message of this connection
  * @return a flow whose inbound side forwards signed messages to `chatRef` and
  *         whose outbound side emits whatever is sent to the materialized actor
  */
def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {

  // Inbound side: tag each message with the connection id and hand it to chatRef.
  // The second argument of Sink.actorRef is the message delivered to chatRef
  // when the inbound stream completes.
  val sink = Flow[Protocol.Message]
    .map(msg => Protocol.SignedMessage(connectionId, msg))
    .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

  // Outbound side: a source backed by an actor. `mapMaterializedValue` only
  // exists on a Source instance, so the `actorRef` stage must be created first.
  // NOTE(review): buffer size 16 and the `fail` overflow strategy are assumed
  // values — confirm against the intended configuration.
  val source = Source
    .actorRef[Protocol.Event](16, akka.stream.OverflowStrategy.fail)
    .mapMaterializedValue { actor: ActorRef =>
      // Register the materialized actor so chatRef can push events to this connection.
      chatRef ! Protocol.OpenConnection(actor, connectionId)
    }

  Flow.fromSinkAndSource(sink, source)
}

I'm wondering if there is any way to close the connection once a message of type ConnectionClosed is sent by chatRef?

1

There is 1 best solution below

2
On BEST ANSWER

The solution below allows you to drop connections from the server side by terminating the Actor materialized by the Source.actorRef stage. This is done simply by sending it a PoisonPill.

It is still not clear to me how you'd like to identify a "banned" client at connection time, so the example is deliberately very simple: the server drops any new connection once the maximum number of clients is connected. If you want to use a different strategy to kick out clients at any time, you can apply the same logic and send a PoisonPill to their respective source actors.

/** Minimal Akka HTTP chat server showing how the server side can drop a
  * WebSocket connection.
  *
  * Each connection gets an actor-backed source; sending `PoisonPill` to that
  * actor is how this example drops the connection. Here the rule is simply
  * "reject every connection beyond `maximumClients`".
  */
object ChatApp extends App {

  import scala.util.{Failure, Success}

  implicit val system: ActorSystem = ActorSystem("chat")
  implicit val executor: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // GET on the root path is upgraded to a WebSocket handled by `websocketFlow`.
  val route = get {
    pathEndOrSingleSlash {
      handleWebSocketMessages(websocketFlow)
    }
  }

  /** Maximum number of simultaneously registered clients. */
  val maximumClients = 1

  /** Registry actor: tracks the source actor of every accepted connection and
    * echoes each signed message back to the connection it came from.
    */
  class ChatRef extends Actor {
    import Protocol._

    override def receive: Receive = withClients(Map.empty[UUID, ActorRef])

    /** Behavior parameterized by the currently registered clients.
      *
      * @param clients source actor of each accepted connection, keyed by connection id
      */
    def withClients(clients: Map[UUID, ActorRef]): Receive = {
      // Direct key lookup; messages from unknown (e.g. rejected) connections are ignored.
      case SignedMessage(uuid, msg) =>
        clients.get(uuid).foreach(_ ! msg)

      // At (or above) capacity: stop the connection's source actor instead of
      // registering it. `>=` keeps the limit enforced even if the size ever
      // overshoots the configured maximum.
      case OpenConnection(ar, _) if clients.size >= maximumClients =>
        ar ! PoisonPill

      case OpenConnection(ar, uuid) =>
        context.become(withClients(clients.updated(uuid, ar)))

      case CloseConnection(uuid) =>
        context.become(withClients(clients - uuid))
    }
  }

  /** Messages exchanged between the stream stages and [[ChatRef]]. */
  object Protocol {
    /** A text payload tagged with the id of the connection it arrived on. */
    final case class SignedMessage(uuid: UUID, msg: String)
    /** Announces the source actor of a newly materialized connection. */
    final case class OpenConnection(actor: ActorRef, uuid: UUID)
    /** Sent when a connection's inbound stream completes. */
    final case class CloseConnection(uuid: UUID)
  }

  val chatRef = system.actorOf(Props[ChatRef])

  /** Message pipeline for one WebSocket connection: text frames (strict or
    * streamed) are collapsed to a `String`, passed through the chat flow, and
    * replies are wrapped back into text frames. A binary frame fails the stream.
    */
  def websocketFlow: Flow[Message, Message, Any] =
    Flow[Message]
      .mapAsync(1) {
        case TextMessage.Strict(s) => Future.successful(s)
        // Streamed frames arrive in chunks; concatenate them into one string.
        case TextMessage.Streamed(s) => s.runFold("")(_ + _)
        // Signal the failure through the Future rather than throwing inside the stage.
        case _: BinaryMessage =>
          Future.failed[String](new Exception("Binary message cannot be handled"))
      }.via(chatActorFlow(UUID.randomUUID()))
      .map(TextMessage(_))

  /** Bridges one connection to `chatRef`.
    *
    * @param connectionId identifier attached to every message of this connection
    * @return a flow that forwards inbound strings to `chatRef` and emits
    *         whatever is sent to the connection's materialized source actor
    */
  def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = {

    // Inbound side: sign each message; CloseConnection is delivered to chatRef
    // when the inbound stream completes.
    val sink = Flow[String]
      .map(msg => Protocol.SignedMessage(connectionId, msg))
      .to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))

    // Outbound side: actor-backed source with an explicit element type
    // (left unannotated it would be inferred as Nothing).
    val source = Source.actorRef[String](16, OverflowStrategy.fail)
      .mapMaterializedValue {
        actor : ActorRef => {
          // Register (or get rejected by) the chat actor as soon as the stream materializes.
          chatRef ! Protocol.OpenConnection(actor, connectionId)
        }
      }

    Flow.fromSinkAndSource(sink, source)
  }

  // Report the outcome of the bind instead of silently dropping a failure;
  // on failure the actor system is shut down so the process can exit.
  Http().bindAndHandle(route, "0.0.0.0", 8080).onComplete {
    case Success(_) =>
      println("Started server...")
    case Failure(cause) =>
      Console.err.println(s"Failed to start server: ${cause.getMessage}")
      system.terminate()
  }

}