I am trying to test my SSE API in the way explained in the following tutorial: https://docs.spring.io/spring/docs/current/spring-framework-reference/testing.html#webtestclient-stream

Unfortunately, it doesn't work for me. To generate the next event, some action has to be performed that triggers it. I couldn't do that while waiting for the response from WebTestClient (I couldn't find a way to add such a handler).

I found a workaround: a separate thread that triggers event generation periodically, but it is not elegant. Is there a better way to do this?

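// Workaround: a background timer that keeps triggering events while the test waits.
TimerTask task = new TimerTask() {
        @Override
        public void run() {
                // code that triggers generating an event
        }
};
Timer timer = new Timer("Timer", true); // daemon thread, so it cannot keep the JVM alive
long delay = 1000L;
long period = 1000L; // assumed interval between triggers
timer.schedule(task, delay, period);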

FluxExchangeResult<MyEvent> result = client.get().uri("/events")
        .accept(TEXT_EVENT_STREAM)
        .exchange()
        .expectStatus().isOk()
        .returnResult(MyEvent.class); 

Flux<MyEvent> eventFlux = result.getResponseBody();

StepVerifier.create(eventFlux)
        .expectNext(person)
        .expectNextCount(4)
        .consumeNextWith(p -> ...)
        .thenCancel()
        .verify();

There are 2 best solutions below

I had a similar issue. If I understand you correctly, you want to publish some sort of message and then check whether you receive it. If so, you should call subscribe and provide a consumer. Here is an example.

// subscribe(...) returns a reactor.core.Disposable, not a FluxExchangeResult
Disposable subscription = client.get().uri("/events")
    .accept(TEXT_EVENT_STREAM)
    .exchange()
    .expectStatus().isOk()
    .returnResult(MyEvent.class)
    .getResponseBody()
    .subscribe(new Consumer<MyEvent>() {
        @Override
        public void accept(MyEvent event) {
            // inspect or assert on the received event here
        }
    });
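
The snippet above leaves open how the test waits for the event after triggering it. Here is a minimal, JDK-only sketch of that pattern (subscribe first, then trigger, then wait with a timeout). It is not Spring or Reactor code: SubmissionPublisher only stands in for the SSE stream, and the class name, the collect helper, the event names, and the 2-second timeout are all made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: SubmissionPublisher is a stand-in for the real event stream.
class SubscribeThenTrigger {

    /** Subscribes, runs the trigger, then waits until expectedEvents have arrived. */
    static List<String> collect(SubmissionPublisher<String> stream,
                                Runnable trigger,
                                int expectedEvents) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(expectedEvents);

        // 1. Subscribe before anything is published so no event is missed.
        stream.consume(event -> {
            received.add(event);
            latch.countDown();
        });

        // 2. Perform the action that makes the stream emit events.
        trigger.run();

        // 3. Block the test thread until the events arrive or the timeout expires.
        try {
            if (!latch.await(2, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out, received " + received);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return received;
    }

    public static void main(String[] args) {
        try (SubmissionPublisher<String> stream = new SubmissionPublisher<>()) {
            List<String> events = collect(stream, () -> {
                stream.submit("user-created");
                stream.submit("user-updated");
            }, 2);
            System.out.println(events);
        }
    }
}
```

In a real test the trigger would be whatever call makes your server emit an event, and the consumer would be the one passed to subscribe on the response body. The latch is only one way to make the test thread wait; it is used here because it needs nothing beyond the JDK.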

Maybe a bit late, but I found a smarter solution. My problem is a bit different, but you can probably adapt my solution. My flux returns a list of user events that are triggered by some other endpoint. So right after connecting to the stream, I need to invoke something else so that the flux emits something.

First of all I inject a controller-spy:

@SpyBean
private UserAsyncController userController;

Within my test code, I prepare the spy to activate the flux:

doAnswer(new Answer<Flux<?>>() {
  @Override
  public Flux<?> answer(InvocationOnMock invocation) throws Throwable {
      // [...] do something like user creation right before returning the flux.
      return (Flux<?>) invocation.callRealMethod();
  }
}).when(userController).getUserUpdates();

// [...] WebTestClient[...]exchange[...] and StepVerifier.

It is not really a "new" thread, because you are spying around the actual controller method and are inside the server thread, but it works for me and does not need any initial Timer. If you need something asynchronous (for example, something repeated once per second), you can start it at this point.

From an architectural point of view, the method interception can be called an "on-incoming-request-spy".