Spring Cloud : Supplier continuously publishing Kafka events instead of one?


My Spring Cloud Stream Supplier keeps publishing Kafka events continuously. How can I publish only one?

  public static HashMap<String, Ticker> transactionsOfAccount = new HashMap<>(0);
    public LinkedList<Ticker> lists = new LinkedList<>();

Producer.class

@Bean
public Supplier<Message<Ticker>> messageSupplier() {

    return () -> {
        if (tickerPublisher.lists.peek() != null) {
            Message<Ticker> msg = MessageBuilder
                    .withPayload(tickerPublisher.lists.peek())
                    .build();
            log.info("Total Size is {}",tickerPublisher.lists.size());
            log.info("Message: {}", msg.getPayload());
            tickerPublisher.lists.get(0).setStatus(Status.SUCESS);
            return  msg;
        } else {
            return null;
        }
    };
}

RestController.class

@GetMapping(value = "/quote-mono", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Mono<Ticker> getQuoteMono(@RequestParam("symbol") String symbol) {
    tickerPublisher.publisherMono(symbol);
    return mono;
}

PublisherService.class

 public void publisherMono(String ticker) {
        String path = ticker.toUpperCase() + "/prices/realtime?api_key=" + apiKey;
        this.webClient
                .get()
                .uri(path)
                .retrieve()
                .bodyToMono(Ticker.class)
                .flatMap(data -> sendToKafka(ticker, data))
                .doOnNext(data-> {
                    log.info("next events from published : {}", data);
                    if (transactionsOfAccount.containsKey(ticker) && !lists.isEmpty()) {
                        log.info("list is clear now ");
                        transactionsOfAccount.clear();
                    }
                })
                .subscribe(
                        data -> {
                            log.info("data is {}", data);
                            this.sinkMono.tryEmitValue((Ticker) data);
                        },
                        (err) -> log.info(String.valueOf(err)),
                        () -> {
                            log.info("Completed");
                        }
                )
        ;
        log.info(lists.toString());
    }

The issue is that the Supplier continuously publishes duplicate events to the Kafka broker.


I need help finding the cause. How can we publish a single event from a Mono when the response is a single object?

The REST response is:

{"last_price":245.3,"last_time":"2022-12-09T22:44:53.000Z","last_size":null,"bid_price":244.98,"bid_size":100,"ask_price":250.0,"ask_size":96,"open_price":246.4,"close_price":null,"high_price":248.2,"low_price":244.37,"exchange_volume":1168680,"market_volume":null,"updated_on":"2022-12-09T22:59:58.183Z","source":"bats_delayed","security":{"id":"sec_XaL6mg","ticker":"MSFT","exchange_ticker":"MSFT:UW","figi":"BBG000BPHFS9","composite_figi":"BBG000BPH459"}}

The expectation is to publish only one event when using WebClient.


There are 2 best solutions below

Best answer

The supplier programming model in Spring Cloud Stream is intended for polling: the framework invokes the supplier once per configured interval and publishes whatever it returns. By default, it polls every second. You can change that interval through configuration. See the Spring Cloud Stream reference documentation for more details.

However, in your case, this may not work. You might not want a supplier and should publish programmatically using something like a StreamBridge.
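
The polling behavior can be reproduced without Spring. The following is only a sketch in plain Java: the class name PollingDemo, the simulatePoller loop, and the String payloads are hypothetical stand-ins, and the loop merely imitates a poller that calls the supplier repeatedly and skips null results. It shows why a supplier that reads the head of the list with peek(), as in the question, hands back the same element on every poll, while a supplier that removes the head with poll() hands it back once.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java simulation; no Spring classes are involved.
class PollingDemo {

    // Hypothetical stand-in for the framework's poller: invokes the supplier
    // `polls` times and collects every non-null result as if it were published.
    static List<String> simulatePoller(Supplier<String> supplier, int polls) {
        List<String> published = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            String message = supplier.get();
            if (message != null) {
                published.add(message);
            }
        }
        return published;
    }

    // Mirrors the question's supplier: peek() reads the head without removing it.
    static List<String> publishWithPeek(int polls) {
        LinkedList<String> queue = new LinkedList<>();
        queue.add("MSFT");
        return simulatePoller(queue::peek, polls);
    }

    // Variant that drains the list: poll() removes the head, so it is returned only once.
    static List<String> publishWithPoll(int polls) {
        LinkedList<String> queue = new LinkedList<>();
        queue.add("MSFT");
        return simulatePoller(queue::poll, polls);
    }

    public static void main(String[] args) {
        System.out.println("peek: " + publishWithPeek(3)); // peek: [MSFT, MSFT, MSFT]
        System.out.println("poll: " + publishWithPoll(3)); // poll: [MSFT]
    }
}
```

Draining the list would stop the duplicates, but the supplier would still be invoked on every polling interval, so publishing on demand with StreamBridge is probably the more direct fit for a one-shot REST call.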

Second answer

In your case, the best choice would probably be to use StreamBridge:

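  // Field injection cannot target a final field; either drop `final`
  // (as here) or inject StreamBridge through the constructor.
  @Autowired
  private StreamBridge streamBridge;

  // message(t), bindingName() and exception(msg) are helper methods of the
  // enclosing class and are not shown here.
  @Override
  public void send(T t) {
    Message<T> message = message(t);
    // send(...) publishes once per call and returns whether the send succeeded.
    boolean success = streamBridge.send(bindingName(), message);
    if (success) {
      log.info("send message to kafka {}", message);
    } else {
      String msg = "Message was not sent to stream (kafka producer): " + message;
      log.warn(msg);
      throw exception(msg);
    }
  }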