Alpakka s3 `multipartUpload` doesn't upload files


I have a question regarding the alpakka_kafka + alpakka_s3 integration. Alpakka S3 multipartUpload doesn't seem to upload files when I use an Alpakka Kafka source.

kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
    bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
    bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

However, as soon as I add .take(100) after the kafkaSource, everything works fine.

kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
    bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
    bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

Any help will be greatly appreciated. Thanks in advance!

Here's the complete code snippet:

// Source
val kafkaSource: Source[(CommittableOffset, Array[Byte]), Consumer.Control] = {
    Consumer
      .committableSource(consumerSettings, Subscriptions.topics(prefixedTopics))
      .map(committableMessage => (committableMessage.committableOffset, committableMessage.record.value))
      .watchTermination() { (mat, f: Future[Done]) =>
        f.foreach { _ =>
          log.debug("consumer source shutdown, consumerId={}, group={}, topics={}", consumerId, group,     prefixedTopics.mkString(", "))
        }

        mat
      }
  }

// Flow
val commitFlow: Flow[CommittableOffset, Done, NotUsed] = {
    Flow[CommittableOffset]
      .groupedWithin(batchingSize, batchingInterval)
      .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
      .mapAsync(parallelism = 3) { msg =>
        log.debug("committing offset, msg={}", msg)

        msg.commitScaladsl().map { result =>
          log.debug("committed offset, msg={}", msg)
          result
        }
      }
  }

private val kafkaMsgToByteStringFlow = Flow[KafkaMessage[Any]].map(x => ByteString(x.msg + "\n"))

private val kafkaMsgToOffsetFlow = {
    implicit val askTimeout: Timeout = Timeout(5.seconds)
    Flow[KafkaMessage[Any]].mapAsync(parallelism = 5) { elem =>
      Future(elem.offset)
    }
  }


// Sink

val s3Sink = {
  val BUCKET = "test-data"
  s3Client.multipartUpload(BUCKET, s"tmp/data.txt")
}

// Doesn't work... (no files show up in S3)
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
        bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
        bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

// This one works...
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
        bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
        bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

There are 2 answers below.

Answer 1:

private def running: Receive = {
    case Subscribe(subscriberId) =>
      val kafkaSubscriber = new KafkaSubscriber(
        serviceName = "akka_kafka_subscriber",
        group = kafkaConfig.group,
        topics = kafkaConfig.subscriberTopics,
        system = system,
        configurationProperties = Seq(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest")
      )

      RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
        import GraphDSL.Implicits._
        val bcast = builder.add(Broadcast[KafkaMessage[Any]](2))

        kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> kafkaSubscriber.filterTypeFlow[Any] ~> bcast.in
        // Batch the messages and ask the actor to upload each batch as its own file
        // (an implicit ask Timeout is assumed to be in scope for `self ?`).
        bcast.out(0) ~> kafkaMsgToStringFlow
          .groupedWithin(BATCH_SIZE, BATCH_DURATION)
          .map(group => group.foldLeft(new StringBuilder()) { (batch, elem) => batch.append(elem) })
          .mapAsync(parallelism = 3) { data =>
            self ? ReadyForUpload(ByteString(data.toString()), UUID.randomUUID().toString, subscriberId)
          } ~> Sink.ignore
        bcast.out(1) ~> kafkaMsgToOffsetFlow ~> kafkaSubscriber.commitFlow ~> Sink.ignore
        ClosedShape
      }).withAttributes(ActorAttributes.supervisionStrategy(decider)).run()
      sender() ! "subscription started"

    case ready: ReadyForUpload =>
      println("==========================Got ReadyForUpload: " + ready.fileName)
      val BUCKET = "S3_BUCKET"
      // Each batch runs as its own bounded stream with its own multipartUpload sink,
      // so every upload is finalized and the file appears in the bucket.
      Source.single(ready.data).runWith(s3Client.multipartUpload(BUCKET, s"tmp/${ready.fileName}_${ready.subscriberId}.txt"))
      sender() ! "Done"
  }
Answer 2:

Actually, it does upload. The issue is that you need to send a completion request to S3 in order to finalize your upload; only then will the file be visible inside the bucket. My bet is that since a Kafka source without take(n) never stops producing data downstream, the flow never actually completes, so the sink never sends the completion request to S3: it is always expecting more data to upload before finishing the request.
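
A minimal sketch of that behaviour, reusing the names from the question (s3Client, kafkaSource, kafkaSubscriber, kafkaMsgToByteStringFlow) and the same multipartUpload call shown above: the sink materializes a Future that completes only when the upstream finishes, which is exactly when the completion request goes out to S3.

// Sketch only: names come from the question; an implicit materializer and
// execution context are assumed to be in scope, as in the original code.
val upload =
  kafkaSource
    .take(100)                                      // bounded, so the stream completes...
    .via(kafkaSubscriber.serializer.deserializeFlow)
    .via(kafkaMsgToByteStringFlow)
    .runWith(s3Client.multipartUpload("test-data", "tmp/data.txt"))

// ...and only then does the materialized Future complete, after S3 has
// acknowledged the completion of the multipart upload. Without take(n)
// the stream never completes and this Future never resolves.
upload.foreach(result => log.debug("upload finalized, result={}", result))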

There's no way to do what you want by uploading everything to a single file, so my tip is: group your kafkaSource messages and send the zipped Array[Byte] to the sink. The tricky part is that you have to create one sink per file instead of reusing a single sink; a sketch of that follows below.
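
A hedged sketch of that approach, again reusing the names from the question; the batch size, interval, and UUID-based file names are placeholders, and compression and the offset-commit branch are left out for brevity.

import java.util.UUID
import scala.concurrent.duration._

// Sketch only: one bounded inner stream (and therefore one multipartUpload
// sink) per batch, so every upload is finalized and shows up in the bucket.
kafkaSource
  .via(kafkaSubscriber.serializer.deserializeFlow)
  .via(kafkaMsgToByteStringFlow)
  .groupedWithin(1000, 30.seconds)                  // placeholder batching
  .mapAsync(parallelism = 1) { chunk =>
    Source(chunk.toList)
      .runWith(s3Client.multipartUpload("test-data", s"tmp/data_${UUID.randomUUID()}.txt"))
  }
  .runWith(Sink.ignore)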