I have a system that is clustered deploy on k8s, it will have multiple instances when it's deployed. My code sample like below
import akka.actor.typed.pubsub.Topic
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem}
object ClusterSystem extends App {
private val logger: Logger = LoggerFactory.getLogger(getClass)
ActorSystem(
Behaviors.setup[Unit] { context =>
implicit val system: ActorSystem[_] = context.system
implicit val ex: ExecutionContext = system.executionContext
implicit val clusterSharding: ClusterSharding = ClusterSharding(system)
val pubSubRef: ActorRef[Topic.Command[Message]] = context.spawn(Topic[VendorData]. ("cluster-message"), "ClusterMessage")
val publishService = new PublisherService(pubSubRef)
val subscribeService = new SubscribeService(pubSubRef)
Behaviors.empty
},
"ClusterSystem"
)
}
PublisherService:
import akka.actor.typed.pubsub.Topic
class PublisherService(pubSubRef:ActorRef[Topic.Command[Message]]){
def publish():Future[Done]={
for{
_<- Future.unit
_ = pubSubRef ! Topic.Publish(Message("someRandomData"))
} yield Done
}
}
SubscribeService:
import akka.actor.typed.pubsub.Topic
import akka.stream.typed.scaladsl.PubSub
class SubscribeService(pubSubRef:ActorRef[Topic.Command[Message]]){
def subscribe():Source[Message, NotUsed]= PubSub.source(pubSubRef, bufferSize = 10, overflowStrategy = OverflowStrategy.dropHead)
}
What I expecting is I can get all the message from the subscribe API, but right now, I only can get the message that published on each node. Example: instance-1 published message : M-1, M-2 instance-2 published message : M-3, M-4 Expecting: connect to the instance-1's subscribe api, and I got message: M-1, M-2, M-3, M-4