Hi I was trying to use Producer api like shown in Alpakka documentation. I'm able to consume record using Transactional source and Producer is created but not able to put message to topic Not able to Produce to topic using Transactional.Sink in Alpakka but I see idempotent producer is enabled. I see logs that it's comming into logic But it's not producing events to myTopic

[info] o.a.k.c.p.KafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c, transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c] the idempotent producer is enabled.

Could you please help me understand why it might not produce message to topic

I'm running my code locally using docker

Below is my code

``` Transactional.source(consumerSettings,
            Subscriptions.topics(topicNames))
            .mapMaterializedValue(innerControl = _)
            .map(consumerRecord => {

              handleBusiness(consumerRecord)
                .flatMap(res => Source.single(res)
                       .runWith(Transactional.sink(producerSettings, 
                                           UUID.randomUUID().toString)))

            })

        }
          source.runWith(Sink.ignore)

And my handleBusiness logics looks like below:

   ```
 private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]]  = {

       (conversion of consumedMessage ) map { message =>

            ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)

     }


 }```


1

There are 1 best solutions below

0
On

I could do it using one Flow also Transactional source needs to have one Sink/Flow like below

Transactional.source(consumerSettings,
                      Subscriptions.topics(topicNames))
                      .mapMaterializedValue(innerControl = _)
                      .mapAsync(5) { msg => business(msg)}
                      .via(Transactional.flow(producersettings, transactions-id))