Akka Streams flow to handle paginated results doesn't complete

1.2k Views Asked by At

I'd like to implement a Flow to handle paginated results (e.g., underlying service returns some results, but also indicates that more results are available by making another request, passing in e.g. a cursor).

Things I've done so far:

  1. I have implemented the following flow and test, but the flow doesn't complete.

    object AdditionalRequestsFlow {
    
      private def keepRequest[Request, Response](flow: Flow[Request, Response, NotUsed]): Flow[Request, (Request, Response), NotUsed] = {
        Flow.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
          import GraphDSL.Implicits._
          val in = builder.add(Flow[Request])
    
          val bcast = builder.add(Broadcast[Request](2))
          val merge = builder.add(Zip[Request, Response]())
    
          in ~> bcast         ~> merge.in0
                bcast ~> flow ~> merge.in1
    
          FlowShape(in.in, merge.out)
        })
      }
    
      def flow[Request, Response, Output](
        inputFlow: Flow[Request, Response, NotUsed],
        anotherRequest: (Request, Response) => Option[Request],
        extractOutput: Response => Output,
        mergeOutput: (Output, Output) => Output
      ): Flow[Request, Output, NotUsed] = {
        Flow.fromGraph(GraphDSL.create() { implicit b =>
          import GraphDSL.Implicits._
    
          val start = b.add(Flow[Request])
          val merge = b.add(Merge[Request](2))
          val underlying = b.add(keepRequest(inputFlow))
          val unOption = b.add(Flow[Option[Request]].mapConcat(_.toList))
          val unzip = b.add(UnzipWith[(Request, Response), Response, Option[Request]] { case (req, res) =>
            (res, anotherRequest(req, res))
          })
          val finish = b.add(Flow[Response].map(extractOutput)) // this is wrong as we don't keep to 1 Request -> 1 Output, but first let's get the flow to work
    
          start ~> merge ~> underlying ~> unzip.in
                                          unzip.out0            ~>  finish
                   merge <~ unOption   <~ unzip.out1
    
          FlowShape(start.in, finish.out)
        })
      }       
    }
    

    The test:

        import akka.NotUsed
        import akka.actor.ActorSystem
        import akka.stream.ActorMaterializer
        import akka.stream.scaladsl.{Flow, Sink, Source}
        import org.scalatest.FlatSpec
        import org.scalatest.Matchers._
        import cats.syntax.option._
        import org.scalatest.concurrent.ScalaFutures.whenReady
    
        class AdditionalRequestsFlowSpec extends FlatSpec {
          implicit val system = ActorSystem()
          implicit val materializer = ActorMaterializer()
    
          case class Request(max: Int, batchSize: Int, offset: Option[Int] = None)
          case class Response(values: List[Int], nextOffset: Option[Int])
    
          private val flow: Flow[Request, Response, NotUsed] = {
            Flow[Request]
              .map { request =>
                val start = request.offset.getOrElse(0)
                val end = Math.min(request.max, start + request.batchSize)
                val nextOffset = if (end == request.max) None else Some(end)
                val result = Response((start until end).toList, nextOffset)
                result
              }
          }
    
          "AdditionalRequestsFlow" should "collect additional responses" in {
            def anotherRequest(request: Request, response: Response): Option[Request] = {
              response.nextOffset.map { nextOffset => request.copy(offset = nextOffset.some) }
            }
    
            def extract(x: Response): List[Int] = x.values
            def merge(a: List[Int], b: List[Int]): List[Int] = a ::: b
    
            val requests =
              Request(max = 35, batchSize = 10) ::
              Request(max = 5, batchSize = 10) ::
              Request(max = 100, batchSize = 1) ::
              Nil
    
            val expected = requests.map { x =>
              (0 until x.max).toList
            }
    
            val future = Source(requests)
              .via(AdditionalRequestsFlow.flow(flow, anotherRequest, extract, merge))
              .runWith(Sink.seq)
    
            whenReady(future) { x =>
              x shouldEqual expected
            }
          }
        }
    
  2. Implemented the same flow in a terrible, blocking way to illustrate what I'm trying to achieve:

       def uglyHackFlow[Request, Response, Output](
        inputFlow: Flow[Request, Response, NotUsed],
        anotherRequest: (Request, Response) => Option[Request],
        extractOutput: Response => Output,
        mergeOutput: (Output, Output) => Output
      ): Flow[Request, Output, NotUsed] = {
        implicit val system = ActorSystem()
        implicit val materializer = ActorMaterializer()
    
        Flow[Request]
          .map { x =>
            def grab(request: Request): Output = {
              val response = Await.result(Source.single(request).via(inputFlow).runWith(Sink.head), 10.seconds) // :(
              val another = anotherRequest(request, response)
              val output = extractOutput(response)
              another.map { another =>
                mergeOutput(output, grab(another))
              } getOrElse output
            }
    
            grab(x)
          }
      }
    

    This works (but we should not be materializing anything / Await-ing at this point).

  3. Reviewed http://doc.akka.io/docs/akka/2.4/scala/stream/stream-graphs.html#Graph_cycles__liveness_and_deadlocks which I believe contains the answer, however I cannot seem to find it there. In my case, I would expect the cycle should contain one element at most times so neither buffer overflow nor complete starvation should occur - but evidently does.

  4. Tried to debug the stream using .withAttributes(Attributes(LogLevels(...))) however it doesn't result in any output despite seemingly correctly configured loggers.

I'm looking for hints how to fix the flow method keeping the same signature and semantics (test would pass).

Or perhaps I'm doing something completely off-base here (e.g., there is an existing feature in, say, akka-stream-contrib which solves this)?

3

There are 3 best solutions below

2
On BEST ANSWER

I think it's much safer to use Source.unfold than to create custom graphs. Here is what I typically do (with minor variations depending on API).

  override def getArticles(lastTokenOpt: Option[String], filterIds: (Seq[Id]) => Seq[Id]): Source[Either[String, ImpArticle], NotUsed] = {

    val maxRows = 1000

    def getUri(cursor: String, count: Int) = s"/works?rows=$count&filter=type:journal-article&order=asc&sort=deposited&cursor=${URLEncoder.encode(cursor, "UTF-8")}"

    Source.unfoldAsync(lastTokenOpt.getOrElse("*")) { cursor =>

      println(s"Getting ${getUri(cursor, maxRows)}")
      if (cursor.nonEmpty) {
        sendGetRequest[CrossRefResponse[CrossRefList[JsValue]]](getUri(cursor, maxRows)).map {
          case Some(response) =>
            response.message match {
              case Left(list) if response.status == "ok" =>

                println(s"Got ${list.items.length} items")
                val items = list.items.flatMap { js =>
                  try {
                    parseArticle(js)
                  } catch {
                    case ex: Throwable =>
                      logger.error(s"Error on parsing: ${js.compactPrint}")
                      throw ex
                  }
                }

                list.`next-cursor` match {
                  case Some(nextCursor) =>
                    Some(nextCursor -> (items.map(Right.apply).toList ::: List(Left(nextCursor))))
                  case None =>
                    logger.error(s"`next-cursor` is missing when fetching from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
                    Some("" -> items.map(Right.apply).toList)
                }
              case Left(jsvalue) if response.status != "ok" =>
                logger.error(s"API error on fetching data from CrossRef [status ${response.status}][${getUri(cursor, maxRows)}]")
                None
              case Right(someError) =>
                val cause = someError.fold(errors => errors.map(_.message).mkString(", "), ex => ex.message)
                logger.error(s"API error on fetching data from CrossRef [status $cause}][${getUri(cursor, maxRows)}]")
                None
            }

          case None =>
            logger.error(s"Got error on fetching ${getUri(cursor, maxRows)} from CrossRef")
            None
        }
      } else
        Future.successful(None)
    }.mapConcat(identity)
  }

In your case you probably don't even need to push cursor to the stream. I do that because I store last successful cursor in database to be able to resume later in case of failure.

0
On

feels like this video covers the gist of what you're trying to do. They create a custom Graphstage that maintains state and sends it back to the server and the stream of responses depends on the state sent back, they have an event to signal completion as well(in your case it would be where you have this check

if (end == request.max) None

0
On

You can use Source.unfoldAsync.

I've prepared a simple project that involves sending requests to a REST API, retrieving the data and next_page_url from the response, and utilizing next_page_url to obtain next chunk of data. This process is iterated until the next_page_url is None. At each step, the stream accumulates data, ultimately combining all collected data into a single Seq.

The complete source code can be found over on GitHub

class CatsHttpClientImpl(implicit system: ActorSystem[_], ec: ExecutionContext) extends CatsHttpClient {
  private val logger: Logger = LoggerFactory.getLogger(classOf[CatsHttpClientImpl])
  private val start: Option[String] = Some("https://catfact.ninja/breeds")

  override def getAllBreads: Future[Seq[Cat]] = {
    Source
      .unfoldAsync(start) {
        case Some(next) =>
          val nextChunkFuture: Future[CatsResponse] = sendRequest(next)

          nextChunkFuture.map { resp =>
            resp.nextPageUrl match {
              case Some(url) => Some((Some(url), resp.data))
              case None => Some((None, resp.data))
            }
          }
        case None => Future.successful(None)
      }
      .runWith(Sink.fold(Seq(): Seq[Cat])(_ ++ _))
  }

  private def sendRequest(url: String): Future[CatsResponse] = {
    logger.info(s"CatsHttpClientImpl: Sending request $url")

    val request = HttpRequest(
      uri = Uri(url),
      headers = List(
        RawHeader("Accept", "application/json")
      )
    )
    Http(system).singleRequest(request).flatMap { response =>
      response.status match {
        case StatusCodes.OK =>
          logger.info("CatsHttpClientImpl: Received success")
          Unmarshal(response.entity).to[CatsResponse]

        case _ =>
          logger.error("CatsHttpClientImpl: Received error")
          throw new CatsHttpClientException()
      }
    }
  }
}