Why is there no output in Scala + Akka Typed project?

63 Views Asked by At

The app is pretend to be a simple console chat, but it's not working and I don't know why. Please, tell me where I'm wrong

RoomActor:

package Chat

import Chat.UserActor.{JoinRoomNotification, UserCommand}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}

import java.time.LocalDateTime
import scala.collection.mutable

object RoomActor {
    sealed trait RoomCommand

    case class CreateRoom(user: ActorRef[UserCommand], roomName: String) extends RoomCommand
    case class LeaveRoom(user: ActorRef[UserCommand], roomName: String) extends RoomCommand
    case class JoinRoom(user: ActorRef[UserCommand], roomName: String) extends RoomCommand
    case class BroadcastMessage(user: ActorRef[UserCommand], content: String) extends RoomCommand
    case class Room(roomName: String, owner: ActorRef[UserCommand], creationDate: LocalDateTime, users: mutable.Set[ActorRef[UserCommand]])

    def apply(): Behavior[RoomCommand] = roomBehavior(Map.empty)

    private def roomBehavior(rooms: Map[String, Room]): Behavior[RoomCommand] =
        Behaviors.receive { (context, message) =>
            message match {
                case CreateRoom(user, roomName) =>
                    val room = Room(roomName, user, LocalDateTime.now(), mutable.Set(user))
                    context.log.info(s"$user created room: $roomName")
                    roomBehavior(rooms + (roomName -> room))
                case LeaveRoom(user, roomName) =>
                    rooms.get(roomName).foreach { room =>
                        room.users -= user
                        if(user == room.owner) {
                            if(room.users.isEmpty) {
                                context.log.info(s"$user was the owner and the last user in room: $roomName. Removing the room.")
                                roomBehavior(rooms - roomName)
                            } else {
                                val newOwner = room.users.head
                                context.log.info(s"$user was the owner. Assigning the new owner: $newOwner")
                                val updatedRoom = room.copy(owner = newOwner)
                                roomBehavior(rooms + (roomName -> updatedRoom))
                            }
                        } else {
                            context.log.info(s"$user left the room: $roomName")
                            for (elem <- room.users) {
                                elem.tell(UserActor.LeftRoomNotification(user, roomName))
                            }
                            Behaviors.same
                        }
                    }
                    context.log.info(s"$user left room: $roomName")
                    Behaviors.same
                case JoinRoom(user, roomName) =>
                    rooms.get(roomName).foreach {room =>
                        room.users += user
                        user ! JoinRoomNotification(user, roomName)
                    }
                    Behaviors.same

                case BroadcastMessage(sender, content) =>
                    rooms.values.foreach{room =>
                        room.users.foreach{user =>
                            if(user != sender) {
                                user ! UserActor.ReceiveMessage(sender, content)
                            }
                        }
                    }
                    context.log.info(s"Broadcast message from $sender: $content")
                    Behaviors.same

                case _ =>
                    throw new IllegalArgumentException("Unknown message for RoomActor")
                    Behaviors.same
            }
        }
}

UserActor:

package Chat

import Chat.RoomActor.RoomCommand
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

import java.util.UUID

object UserActor {
    sealed trait UserCommand
    case class CreateUser(name: String, surname: String, age: Int, userActor: ActorRef[UserCommand]) extends UserCommand
    case class User(name: String, surname: String, age: Int, userId: String, userActor: ActorRef[UserCommand])
    case class SendMessageToRoom(room: ActorRef[RoomCommand], content: String, userName: String) extends UserCommand
    case class ReceiveMessage(sender: ActorRef[UserCommand], content: String) extends UserCommand

    case class LeftRoomNotification(user: ActorRef[UserCommand], roomName: String) extends UserCommand
    case class JoinRoomNotification(user: ActorRef[UserCommand], roomName: String) extends UserCommand

    def apply(): Behavior[UserCommand] = userBehavior(Map.empty)

    private def userBehavior(users: Map[String, User]): Behavior[UserCommand] =
        Behaviors.receive{(context, message) =>
            message match {
                case ReceiveMessage(sender, content) =>
                    context.log.info(s"User ${context.self} received message from $sender: $content")
                    Behaviors.same
                case SendMessageToRoom(room, content, userName) =>
                    users.get(userName) match {
                        case Some(user) =>
                            room ! RoomActor.BroadcastMessage(user.userActor, content)
                        case None =>
                            throw new Exception(s"Unknown User $userName!")
                    }
                    Behaviors.same
                case CreateUser(name, surname, age, userActor) =>
                    val userId = UUID.randomUUID().toString
                    val newUser = User(name, surname, age, userId, userActor)
                    context.log.info(s"User $newUser added in user list")
                    context.log.info(s"Created user: $userId")
                    users + (userId -> newUser)
                    Behaviors.same
                case LeftRoomNotification(user, roomName) =>
                    context.log.info(s"User $user received LeftRoomNotification for room: $roomName")
                    Behaviors.same
                case JoinRoomNotification(user, roomName) =>
                    context.log.info(s"User $user received JoinRoomNotification for room: $roomName")
                    Behaviors.same
                case _ =>
                    context.log.warn("Unknown message for UserActor")
                    Behaviors.same
            }
        }
}

Main:

package Chat

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.language.postfixOps

object Main {
    def main(args: Array[String]): Unit = {
        implicit val timeout: Timeout = 3 seconds

        sealed trait MainActor

        val guardianActor: Behavior[Any] = Behaviors.receive { (context, message) =>
            val userActor = context.spawn(UserActor(), "userActor")
            val roomActor = context.spawn(RoomActor(), "roomActor")
            implicit val ec: ExecutionContext = context.executionContext
            implicit val scheduler = context.system.scheduler
            implicit val timeout: Timeout = Timeout(3.seconds)

            message match {
                case _ =>
                    // Send a message to userActor to create a user
                    val newUser = userActor.ask(UserActor.CreateUser("John", "Doe", 30, _))
                    newUser.onComplete { _ =>
                        roomActor ! RoomActor.CreateRoom(userActor, "General")

                        userActor ! UserActor.SendMessageToRoom(roomActor, "Hello, everyone!", "John")

                        context.system.terminate()
                    }

                    roomActor ! RoomActor.CreateRoom(userActor, "General")
                    userActor ! UserActor.SendMessageToRoom(roomActor, "Hello, everyone!", "John")

                    Behaviors.same
            }
        }
        val system: ActorSystem[Nothing] = ActorSystem(guardianActor, "ChatSystem")
    }
}

build.gradle:

plugins {
    id 'scala'
    id 'application'
}

repositories {
    mavenCentral()
}

def versions = [
    ScalaBinary: "2.13",
    AkkaVersion: "2.6.17" // Update to the latest Akka version
]

dependencies {
    implementation "com.typesafe.akka:akka-actor-typed_2.13:2.6.17"
    implementation "com.typesafe.akka:akka-stream_${versions.ScalaBinary}:${versions.AkkaVersion}"
    implementation "com.typesafe.akka:akka-protobuf_${versions.ScalaBinary}:${versions.AkkaVersion}"
    implementation "com.typesafe.akka:akka-http_2.13:10.5.2"
    implementation "com.typesafe.akka:akka-actor-testkit-typed_${versions.ScalaBinary}:${versions.AkkaVersion}"
    testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.2'
    testRuntimeOnly 'org.junit.jupiter:junit-jupiter-engine:5.8.2'
    implementation "ch.qos.logback:logback-classic:1.2.13"
    implementation group: 'com.typesafe.akka', name: 'akka-slf4j_2.13', version: '2.6.17'
}

application {
    mainClassName = 'Chat.Main'
}

compileScala {
    scalaCompileOptions.additionalParameters = [
        "-deprecation",
        "-feature",
        "-unchecked",
        "-language:postfixOps",
        "-language:implicitConversions"
    ]
}

tasks.withType(JavaCompile) {
    options.encoding = 'UTF-8'
}

The output is:

14:40:29.284 [ChatSystem-akka.actor.default-dispatcher-3] INFO  akka.event.slf4j.Slf4jLogger - Slf4jLogger started
SLF4J: A number (1) of logging calls during the initialization phase have been intercepted and are
SLF4J: now being replayed. These are subject to the filtering rules of the underlying logging system.
SLF4J: See also http://www.slf4j.org/codes.html#replay

So, the program not even stop

I guess, the problem is in ask pattern in Main, but I don't know where's problem actually and don't know how to fix it

1

There are 1 best solutions below

1
Muhammadsiddiq On BEST ANSWER

The issue in your code lies in the usage of the ask pattern (userActor.ask(UserActor.CreateUser("John", "Doe", 30, _))) within your guardianActor. The ask pattern (?) is not recommended for usage within an actor because it blocks the current thread until a response is received, which can lead to deadlocks in some situations, especially when combined with other blocking operations or when used within the same actor's behavior.

Instead of using the ask pattern, you should use tell (!) to send messages asynchronously between actors. Here's how you can refactor your guardianActor to fix this issue:

val guardianActor: Behavior[Any] = Behaviors.setup { context =>
  val userActor = context.spawn(UserActor(), "userActor")
  val roomActor = context.spawn(RoomActor(), "roomActor")

  // Create user
  userActor ! UserActor.CreateUser("John", "Doe", 30, context.self)

  // Create room
  roomActor ! RoomActor.CreateRoom(userActor, "General")

  // Send message to room
  userActor ! UserActor.SendMessageToRoom(roomActor, "Hello, everyone!", "John")

  Behaviors.empty
}

In the guardianActor, I've used Behaviors.setup to initialize the actors. Then, I've replaced the ask pattern with ! to send messages to userActor and roomActor. Additionally, I've used context.self as a parameter in CreateUser message to enable the UserActor to send a response back to the guardianActor. You should handle this response message in the UserActor and then terminate the system accordingly.

This should resolve the issue you're facing.