Basically I trying to get data from database using akka http. if i pass (EmployeeRepo.findAll()) directly in api then it show all data but while using actor it shows cast error.... here problem for data fetching only please solve it

this is my EmployeeRepo-----------------------------------

    package org.repo

import org.data.Employee
import org.db.DbConfig
import org.mongodb.scala.MongoCollection
import org.utils.JsonUtils
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.FindOneAndUpdateOptions
import org.mongodb.scala.model.Updates.{combine, set}

import scala.concurrent.Future

object EmployeeRepo extends JsonUtils{
private val employeeDoc:MongoCollection[Employee]=DbConfig.employees

def createCollection()={
DbConfig.database.createCollection("employee").subscribe(
  (result)=>println(s"$result"),
  e=>println(e.getLocalizedMessage),
()=>println("complete")
)
}


  def insertData(emp:Employee)={
    employeeDoc.insertOne(emp).toFuture()
  }
  def findAll()= employeeDoc.find().toFuture()

  def update(emp:Employee,id:String)=
    employeeDoc
      .findOneAndUpdate(equal("_id", id),
        setBsonValue(emp),
        FindOneAndUpdateOptions().upsert(true)).toFuture()

  def delete(id:String)=
    employeeDoc.deleteOne(equal("_id",id)).toFuture()
  private def setBsonValue(emp:Employee)={
    combine(
      set("name",emp.name),
      set("dateOfBirth",emp.dateOfBirth)
    )
  }
}

============EMployeeService------------ package org.service

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.UUID

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.data.Employee
import org.domain.EmployeeRequest
import org.repo.EmployeeRepo

import scala.concurrent.Future

class EmployeeService {

implicit val system=ActorSystem("Employeee")
  implicit val materializer=ActorMaterializer
  import system.dispatcher

def saveEmployeeData= (employeeRequest:EmployeeRequest) => {
val employeeDoc:Employee=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.insertData(employeeDoc)
}
def findAll={
  EmployeeRepo.findAll()
}

  def update(employeeRequest: EmployeeRequest,id:String)={
    val employeeDoc=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.update(emp = employeeDoc,id)
  }
  def delete(id:String)=EmployeeRepo.delete(id)

  private def employeeMapperWithNewId(employee:EmployeeRequest)={
Employee(name=employee.name,dateOfBirth = LocalDate.parse(employee.dateOfBirth, DateTimeFormatter.ISO_DATE),
_id=UUID.randomUUID.toString)
  }
}

-----------------EmployeeActor------

package org.actor

import akka.actor.{Actor, ActorLogging}
import org.actor.EmployeeActor.{Delete, Save, SearchAll, Update}
import org.data.Employee
import org.domain.EmployeeRequest
import org.service.EmployeeService

object EmployeeActor {

  case class Save(emp: EmployeeRequest)

  case object SearchAll

  case class Update(emp: EmployeeRequest, id: String)

  case class Delete(id: String)

}

class
EmployeeActor extends Actor with ActorLogging {
  private val employeeService: EmployeeService = new EmployeeService()

  override def receive: Receive = {
    case Save(emp) =>
      log.info(s"recevied msg saved with employee :$emp")
      sender() ! employeeService.saveEmployeeData(emp)

    case SearchAll =>
      log.info("received msg find ALL")
      sender() ! employeeService.findAll

    case Update(emp, id) =>
      log.info(s"received update msg for id $id and employee $emp")
      sender() ! employeeService.update(emp, id)
    case Delete(id) =>
      log.info(s"received msg for deleting employee of id $id")
      sender() ! employeeService.delete(id)
    case _ =>
      log.debug("Unhandled msg")


  }
}

---------------EmployeeRoute----

    package org
    
    import akka.actor.{ActorSystem, Props}
    import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
    import akka.stream.ActorMaterializer
    import org.actor.EmployeeActor
    import org.utils.{JsonUtils, TimeUtils}
    import akka.http.scaladsl.server.Directives._
    import akka.{NotUsed, util}
    import akka.util.{ByteString, Timeout}
    import org.data.Employee
    import org.domain.EmployeeRequest
    import akka.pattern.{Patterns, ask}
    import akka.stream.scaladsl.Source
    import org.actor.EmployeeActor.{Delete, Save, SearchAll, Update}
    import org.service.EmployeeService
    
    import scala.concurrent.duration._
    import spray.json._
    
    import scala.concurrent.{Await, Future}
    
    class EmployeeRoute extends JsonUtils{
    
      implicit val system=ActorSystem("Employee")
      implicit val materializer=ActorMaterializer
      import system.dispatcher
      val actor=system.actorOf(Props[EmployeeActor],"employeeActor")
      val employeeService=new EmployeeService()
    
      implicit val timeOut=Timeout(5.seconds)
    
      val getRoute={
        pathPrefix("employee"){
    
          (pathEndOrSingleSlash & get){
            complete((actor ? SearchAll).mapTo[Seq[EmployeeRequest]])
  }~
          ( path("update") & put &  parameter("id".as[String])){id=>
            entity(as[EmployeeRequest]){employee=>
              complete((actor ? Update(employee,id)).map(_=>StatusCodes.OK))
            }
          }~
          post{
            entity(as[EmployeeRequest]) { employee =>
              complete((actor ? Save(employee)).map(_ => StatusCodes.OK))
            }
          }~
            delete{
              (path(Segment) |parameter("id".as[String])){id=>
                complete((actor ? Delete(id)).map(_=>StatusCodes.OK))
              }
            }
    
    
    
        }
      }
    }

=================Error===============

[ERROR] [09/09/2020 19:46:48.551] [web-app-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(web-app)] Error during processing of request: 'Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.lang.ClassCastException: Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq
    at java.base/java.lang.Class.cast(Class.java:3734)
    at scala.concurrent.Future.$anonfun$mapTo$1(Future.scala:464)
    at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:430)
    at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:164)
    at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:392)
    at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:299)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:249)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:242)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:615)
    at org.actor.EmployeeActor$$anonfun$receive$1.applyOrElse(EmployeeActor.scala:32)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at org.actor.EmployeeActor.aroundReceive(EmployeeActor.scala:22)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
    at akka.actor.ActorCell.invoke(ActorCell.scala:547)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)

please help me out..

1

There are 1 best solutions below

1
On

You analyzed almost everything. EmployeeRepo.findAll is your problem. You should not use Futures in akka actors. pipeTo should be used instead.

Please try to update EmployeeActor

case SearchAll =>
  log.info("received msg find ALL")
  employeeService.findAll.pipeTo(sender())