I have a question regarding the alpakka_kafka + alpakka_s3 integration. The Alpakka S3 multipartUpload doesn't seem to upload files when I use an Alpakka Kafka source:
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
However, as soon as I add .take(100) after the kafkaSource, everything works fine:
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
Any help will be greatly appreciated. Thanks in advance!
Here's the complete code snippet:
// Source
val kafkaSource: Source[(CommittableOffset, Array[Byte]), Consumer.Control] = {
  Consumer
    .committableSource(consumerSettings, Subscriptions.topics(prefixedTopics))
    .map(committableMessage => (committableMessage.committableOffset, committableMessage.record.value))
    .watchTermination() { (mat, f: Future[Done]) =>
      f.foreach { _ =>
        log.debug("consumer source shutdown, consumerId={}, group={}, topics={}", consumerId, group, prefixedTopics.mkString(", "))
      }
      mat
    }
}
// Flow
val commitFlow: Flow[CommittableOffset, Done, NotUsed] = {
  Flow[CommittableOffset]
    .groupedWithin(batchingSize, batchingInterval)
    .map(group => group.foldLeft(CommittableOffsetBatch.empty) { (batch, elem) => batch.updated(elem) })
    .mapAsync(parallelism = 3) { msg =>
      log.debug("committing offset, msg={}", msg)
      msg.commitScaladsl().map { result =>
        log.debug("committed offset, msg={}", msg)
        result
      }
    }
}
private val kafkaMsgToByteStringFlow = Flow[KafkaMessage[Any]].map(x => ByteString(x.msg + "\n"))

private val kafkaMsgToOffsetFlow = {
  implicit val askTimeout: Timeout = Timeout(5.seconds)
  Flow[KafkaMessage[Any]].mapAsync(parallelism = 5) { elem =>
    Future(elem.offset)
  }
}
// Sink
val s3Sink = {
  val BUCKET = "test-data"
  s3Client.multipartUpload(BUCKET, "tmp/data.txt")
}

// Doesn't work... (no files are showing up in S3)
kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
// This one works...
kafkaSource.take(100) ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3Sink
bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore
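For reference, the Broadcast stage and the RunnableGraph wrapper are not shown in the snippet above, so here is a minimal sketch of how such a graph is typically wired with GraphDSL. The bcast arity, the element type (taken from kafkaMsgToByteStringFlow being a Flow[KafkaMessage[Any]]), and the scaffolding itself are approximations, not the exact code:

import akka.stream.ClosedShape
import akka.stream.scaladsl.{Broadcast, GraphDSL, RunnableGraph, Sink}

// hypothetical wiring; only the ~> lines come from the snippet above
val graph = RunnableGraph.fromGraph(GraphDSL.create(s3Sink) { implicit builder => s3 =>
  import GraphDSL.Implicits._

  // one input, two outputs: bytes for S3 and offsets for committing
  val bcast = builder.add(Broadcast[KafkaMessage[Any]](2))

  kafkaSource ~> kafkaSubscriber.serializer.deserializeFlow ~> bcast.in
  bcast.out(0) ~> kafkaMsgToByteStringFlow ~> s3
  bcast.out(1) ~> kafkaMsgToOffsetFlow ~> commitFlow ~> Sink.ignore

  ClosedShape
})

graph.run() // needs an implicit materializer in scope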
Actually, it does upload. The issue is that you need to send a completion request to S3 to finalize the upload; only then will the file be visible in the bucket. My bet is that since a Kafka source without take(n) never stops producing data downstream, the stream never completes, so the sink never sends that completion request to S3: it just keeps waiting for more data to upload.

There's no way to do what you want by uploading everything into a single file, so my tip is: group your kafkaSource messages and send the zipped Array[Byte] to the sink. The tricky part is that you have to create one sink per file instead of reusing a single sink.
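A rough sketch of what that could look like, reusing the flows from your question (the group size and interval, the per-file key naming, and the omission of compression are my assumptions; an implicit materializer and ExecutionContext are assumed to be in scope):

import java.util.UUID
import scala.concurrent.duration._
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString

kafkaSource
  .via(kafkaSubscriber.serializer.deserializeFlow)
  .groupedWithin(1000, 1.minute)                    // one group per file; sizes are illustrative
  .mapAsync(parallelism = 1) { group =>
    val key = s"tmp/data-${UUID.randomUUID()}.txt"  // hypothetical per-file key
    Source(group.toList)
      .map(x => ByteString(x.msg + "\n"))
      // a fresh multipart-upload sink per group, so each upload completes
      // and the object actually becomes visible in the bucket
      .runWith(s3Client.multipartUpload(BUCKET, key))
      .map(_ => group.map(_.offset))
  }
  .mapConcat(_.toList)
  .via(commitFlow)
  .runWith(Sink.ignore)

Keeping mapAsync at parallelism = 1 preserves ordering, so the offsets of a group are only committed after its upload has finished.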