So here is my websocket server implementation.
val route = get {
pathEndOrSingleSlash {
handleWebSocketMessages(websocketFlow)
}
}
def websocketFlow: Flow[Message, Message, Any] =
Flow[Message]
.collect { case TextMessage.Strict(textMessage) => protocol.hydrate(textMessage) }
.via(chatActorFlow(UUID.randomUUID()))
.map(event => TextMessage.Strict(protocol.serialize(event)))
def chatActorFlow(connectionId: UUID) : Flow[Protocol.Message, Protocol.Event, Any] = {
val sink = Flow[Protocol.Message]
.map(msg => Protocol.SignedMessage(connectionId, msg))
.to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId)))
val source = Source
.mapMaterializedValue {
actor : ActorRef => {
chatRef ! Protocol.OpenConnection(actor, connectionId)
}
}
Flow.fromSinkAndSource(sink, source)
}
I'm wondering if there is any way to close connection once message of type ConnectionClosed
is sent by chatRef
?
The solution below allows to drop connections from the server side by terminating the Actor materialized by the
Source.actorRef
stage. This is simply done by sending aPoisonPill
to it.Now, it is still not clear to me how you'd like to identify a "banned" client at connection time, so the example is - on purpose - very simple: the server drops any connection after a maximum amount of clients are connected. If you want to use any other strategy to kick out clients at any time, you can still apply the same logic and send
PoisonPill
to their own source actors.