How do I create RSocket connections that don't share an underlying network connection in a Spring Boot client app?


I'd like to create multiple RSocket connections from a Spring Boot client app to a single server without sharing the same underlying network connection.

The use case is load testing: I want to simulate many web clients making RSocket connections to my server, and would like to create (say) 100 such connections from a single application instance, which is easier to operate and monitor than running 100 separate processes.

I'm doing something along these lines:

RSocketRequester requester = rSocketRequesterBuilder
   .dataMimeType(MediaType.APPLICATION_JSON)
   .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
   .rsocketStrategies(builder -> builder.encoder(new BearerTokenAuthenticationEncoder()))
   .websocket(serverUri);
requester.route("my.endpoint").retrieveFlux(byte[].class);

This works for making multiple connections to the server and retrieving data. However, websocket(), per its documentation, appears to reuse the same underlying shared connection, so it doesn't adequately simulate many independent connections putting load on the server.

Any idea how to do it?

I'm using Spring Boot 2.4.4.

Update

I now think I'm successfully making two connections, but my second connection request is somehow inheriting and appending to the metadata of the first connection instead of having its own separate metadata. The metadata I'm focusing on is a JWT.

Here is the log from the server side. It looks to me like the second SETUP frame contains two JWTs, each preceded by message/x.rsocket.authentication.bearer.v0: the first JWT followed by the second.

2021-05-03 17:52:20.501 DEBUG 19737 --- [ctor-http-nio-2] reactor.netty.transport.ServerTransport  : [id:98c2fa05, L:/0:0:0:0:0:0:0:0:7080] Bound new server
2021-05-03 17:52:20.506  INFO 19737 --- [           main] o.s.b.rsocket.netty.NettyRSocketServer   : Netty RSocket started on port(s): 7080
2021-05-03 17:52:20.520  INFO 19737 --- [           main] c.p.pano2.rx.RxServicesApplication       : Started RxServicesApplication in 2.616 seconds (JVM running for 3.152)
2021-05-03 17:52:25.799 DEBUG 19737 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] New http connection, requesting read
2021-05-03 17:52:25.800 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.transport.TransportConfig  : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.826 DEBUG 19737 --- [ctor-http-nio-3] r.n.http.server.HttpServerOperations     : [id:380596e7, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Increasing pending responses, now 1
2021-05-03 17:52:25.831 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Added decoder [PongHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, reactor.left.httpTrafficHandler, PongHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-05-03 17:52:25.832 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.http.server.HttpServer     : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Handler is being applied: io.rsocket.transport.netty.server.WebsocketServerTransport$$Lambda$1058/0x0000000800720c40@cb577eb
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Removed handler: reactor.left.httpTrafficHandler, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Non Removed handler: reactor.left.accessLogHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.837 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.ReactorNetty               : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:25.870 DEBUG 19737 --- [ctor-http-nio-3] reactor.netty.channel.FluxReceive        : [id:380596e7-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58851] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2021-05-03 17:52:25.893 DEBUG 19737 --- [ctor-http-nio-3] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b100000000 Length: 300
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 29 6d 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b |)message/x.rsock|
|00000010| 65 74 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f |et.authenticatio|
|00000020| 6e 2e 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 |n.bearer.v0...ey|
|00000030| 4a 68 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 |JhbGciOiJIUzI1Ni|
|00000040| 4a 39 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 |J9.eyJqdGkiOiJVU|
|00000050| 30 56 53 4d 53 49 73 49 6d 6c 68 64 43 49 36 4d |0VSMSIsImlhdCI6M|
|00000060| 54 59 79 4d 44 41 33 4f 44 63 30 4e 53 77 69 63 |TYyMDA3ODc0NSwic|
|00000070| 33 56 69 49 6a 6f 69 56 56 4e 46 55 6a 45 69 4c |3ViIjoiVVNFUjEiL|
|00000080| 43 4a 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 |CJpc3MiOiJmYWtlI|
|00000090| 47 6c 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 |Glzc3VlciIsImV4c|
|000000a0| 43 49 36 4d 54 59 79 4d 44 45 32 4e 54 45 30 4e |CI6MTYyMDE2NTE0N|
|000000b0| 58 30 2e 58 33 6e 79 62 6b 4a 44 51 4f 67 36 7a |X0.X3nybkJDQOg6z|
|000000c0| 53 7a 77 39 68 63 68 4d 54 4b 34 6e 71 48 63 34 |Szw9hchMTK4nqHc4|
|000000d0| 4c 6f 47 36 32 73 7a 73 6e 47 58 54 79 77       |LoG62szsnGXTyw  |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:25.929  INFO 19737 --- [ctor-http-nio-3] c.p.p.security.FakeReactiveJwtDecoder    : Decoding jwt 1040895620 subject USER1
2021-05-03 17:52:25.947 DEBUG 19737 --- [ctor-http-nio-3] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 22 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 05 04 74 65 73 74                      |.....test       |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:25.965  INFO 19737 --- [ctor-http-nio-3] c.p.pano2.rx.web.SystemController        : Test connection from USER1 null
2021-05-03 17:52:28.981 DEBUG 19737 --- [     parallel-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 30          |Hello USER1 0   |
+--------+-------------------------------------------------+----------------+
2021-05-03 17:52:30.736 DEBUG 19737 --- [ctor-http-nio-4] r.n.http.server.HttpServerOperations     : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] New http connection, requesting read
2021-05-03 17:52:30.736 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.transport.TransportConfig  : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Initialized pipeline DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] r.n.http.server.HttpServerOperations     : [id:d82b3f3a, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Increasing pending responses, now 1
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Added decoder [PongHandler] at the end of the user pipeline, full pipeline: [reactor.left.httpCodec, reactor.left.httpTrafficHandler, PongHandler, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.http.server.HttpServer     : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Handler is being applied: io.rsocket.transport.netty.server.WebsocketServerTransport$$Lambda$1058/0x0000000800720c40@cb577eb
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Removed handler: reactor.left.httpTrafficHandler, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Non Removed handler: reactor.left.accessLogHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.751 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.ReactorNetty               : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (PongHandler = io.rsocket.transport.netty.server.BaseWebsocketServerTransport$PongHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
2021-05-03 17:52:30.752 DEBUG 19737 --- [ctor-http-nio-4] reactor.netty.channel.FluxReceive        : [id:d82b3f3a-1, L:/127.0.0.1:7080 - R:/127.0.0.1:58855] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2021-05-03 17:52:30.755 DEBUG 19737 --- [ctor-http-nio-4] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 0 Type: SETUP Flags: 0b100000000 Length: 522
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 29 6d 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b |)message/x.rsock|
|00000010| 65 74 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f |et.authenticatio|
|00000020| 6e 2e 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 |n.bearer.v0...ey|
|00000030| 4a 68 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 |JhbGciOiJIUzI1Ni|
|00000040| 4a 39 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 |J9.eyJqdGkiOiJVU|
|00000050| 30 56 53 4d 53 49 73 49 6d 6c 68 64 43 49 36 4d |0VSMSIsImlhdCI6M|
|00000060| 54 59 79 4d 44 41 33 4f 44 63 30 4e 53 77 69 63 |TYyMDA3ODc0NSwic|
|00000070| 33 56 69 49 6a 6f 69 56 56 4e 46 55 6a 45 69 4c |3ViIjoiVVNFUjEiL|
|00000080| 43 4a 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 |CJpc3MiOiJmYWtlI|
|00000090| 47 6c 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 |Glzc3VlciIsImV4c|
|000000a0| 43 49 36 4d 54 59 79 4d 44 45 32 4e 54 45 30 4e |CI6MTYyMDE2NTE0N|
|000000b0| 58 30 2e 58 33 6e 79 62 6b 4a 44 51 4f 67 36 7a |X0.X3nybkJDQOg6z|
|000000c0| 53 7a 77 39 68 63 68 4d 54 4b 34 6e 71 48 63 34 |Szw9hchMTK4nqHc4|
|000000d0| 4c 6f 47 36 32 73 7a 73 6e 47 58 54 79 77 29 6d |LoG62szsnGXTyw)m|
|000000e0| 65 73 73 61 67 65 2f 78 2e 72 73 6f 63 6b 65 74 |essage/x.rsocket|
|000000f0| 2e 61 75 74 68 65 6e 74 69 63 61 74 69 6f 6e 2e |.authentication.|
|00000100| 62 65 61 72 65 72 2e 76 30 00 00 b0 65 79 4a 68 |bearer.v0...eyJh|
|00000110| 62 47 63 69 4f 69 4a 49 55 7a 49 31 4e 69 4a 39 |bGciOiJIUzI1NiJ9|
|00000120| 2e 65 79 4a 71 64 47 6b 69 4f 69 4a 56 55 30 56 |.eyJqdGkiOiJVU0V|
|00000130| 53 4d 69 49 73 49 6d 6c 68 64 43 49 36 4d 54 59 |SMiIsImlhdCI6MTY|
|00000140| 79 4d 44 41 33 4f 44 63 31 4d 43 77 69 63 33 56 |yMDA3ODc1MCwic3V|
|00000150| 69 49 6a 6f 69 56 56 4e 46 55 6a 49 69 4c 43 4a |iIjoiVVNFUjIiLCJ|
|00000160| 70 63 33 4d 69 4f 69 4a 6d 59 57 74 6c 49 47 6c |pc3MiOiJmYWtlIGl|
|00000170| 7a 63 33 56 6c 63 69 49 73 49 6d 56 34 63 43 49 |zc3VlciIsImV4cCI|
|00000180| 36 4d 54 59 79 4d 44 45 32 4e 54 45 31 4d 48 30 |6MTYyMDE2NTE1MH0|
|00000190| 2e 4a 33 63 63 52 68 72 31 43 4b 70 45 43 58 66 |.J3ccRhr1CKpECXf|
|000001a0| 45 51 53 31 5a 49 63 71 4f 6d 76 57 31 52 52 4d |EQS1ZIcqOmvW1RRM|
|000001b0| 43 38 67 53 76 4a 76 45 54 44 64 67             |C8gSvJvETDdg    |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:30.757  INFO 19737 --- [ctor-http-nio-4] c.p.p.security.FakeReactiveJwtDecoder    : Decoding jwt 1994845380 subject USER1
2021-05-03 17:52:30.757 DEBUG 19737 --- [ctor-http-nio-4] io.rsocket.FrameLogger                   : receiving -> 
Frame => Stream ID: 1 Type: REQUEST_STREAM Flags: 0b100000000 Length: 22 InitialRequestN: 9223372036854775807
Metadata:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| fe 00 00 05 04 74 65 73 74                      |.....test       |
+--------+-------------------------------------------------+----------------+
Data:

2021-05-03 17:52:30.758  INFO 19737 --- [ctor-http-nio-4] c.p.pano2.rx.web.SystemController        : Test connection from USER1 null
2021-05-03 17:52:31.967 DEBUG 19737 --- [     parallel-2] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 31          |Hello USER1 1   |
+--------+-------------------------------------------------+----------------+
2021-05-03 17:52:33.765 DEBUG 19737 --- [     parallel-4] io.rsocket.FrameLogger                   : sending -> 
Frame => Stream ID: 1 Type: NEXT Flags: 0b100000 Length: 19
Data:
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 20 55 53 45 52 31 20 30          |Hello USER1 0   |
+--------+-------------------------------------------------+----------------+
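
The token segments visible in the hex dumps above can be decoded to check which subject each one carries. The following is a minimal sketch using only the JDK; the class name JwtPayloadPeek is hypothetical, and the token string is copied from the ASCII column of the second SETUP frame (offsets 0x10c onward). It decodes the payload segment without verifying the signature, so it is only suitable for inspecting test tokens like these.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper for inspecting test tokens; it does NOT verify signatures. */
final class JwtPayloadPeek {

    /** Returns the JSON claims of a compact JWT (header.payload.signature). */
    static String claims(String jwt) {
        String[] parts = jwt.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected header.payload.signature");
        }
        // The payload is Base64URL-encoded JSON; padding is optional for the JDK decoder.
        byte[] json = Base64.getUrlDecoder().decode(parts[1]);
        return new String(json, StandardCharsets.UTF_8);
    }

    /** The second bearer token as it appears in the second SETUP frame of the log. */
    static String secondTokenFromLog() {
        return "eyJhbGciOiJIUzI1NiJ9"
                + ".eyJqdGkiOiJVU0VSMiIsImlhdCI6MTYyMDA3ODc1MCwic3ViIjoiVVNFUjIi"
                + "LCJpc3MiOiJmYWtlIGlzc3VlciIsImV4cCI6MTYyMDE2NTE1MH0"
                + ".J3ccRhr1CKpECXfEQS1ZIcqOmvW1RRMC8gSvJvETDdg";
    }

    public static void main(String[] args) {
        System.out.println(claims(secondTokenFromLog()));
    }
}
```

The decoded claims name USER2 as the subject, while the server log for the same connection reports USER1. That would be consistent with the server reading only the first bearer entry in the frame, though the log alone does not prove it.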

I'm having trouble matching this against the wiretap log on the client side. The frame logger's encoding there seems different and less human-readable, but the second request does look longer than the first. (I'm omitting that log to stay within the 30k character limit.)

The code I'm using to generate the two client requests is:

    private void testRequest(String user) {
        String jwt = FakeJwtUtils.createJwt(user, user, 24*60*60*1000L);
        URI uri = config.getUri();
        String baseUrl = uri.getHost() + ":" + uri.getPort();
        HttpClient httpClient = HttpClient.newConnection().baseUrl(baseUrl).wiretap(true);  // newConnection() to avoid shared connection
        WebsocketClientTransport transport = WebsocketClientTransport.create(httpClient, uri.getPath());
        try {
            log.info("Setting up rSocketRequesterBuilder for jwt sub {}: {}", FakeJwtUtils.parseClaimsAsMap(jwt).get("sub"), jwt);
        } catch (Exception ignore) { }
        // transport(...) returns an RSocketRequester, not a builder, so name it accordingly
        var requester = rSocketRequesterBuilder
                .dataMimeType(MediaType.APPLICATION_JSON)
                .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
                .rsocketStrategies(builder -> builder.encoder(new BearerTokenAuthenticationEncoder()))
                .transport(transport);
        requester.route("test").retrieveFlux(byte[].class).subscribe(bytes -> log.info("rcvd: {}", new String(bytes)));
    }

Any pointers would be much appreciated!


I'm answering my own question in case anyone else searching has a similar confusion to mine.

The root cause was that I was reusing the same RSocketRequester.Builder to create a requester for each request. This caused several problems, the worst being that the code above effectively called setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE) multiple times on the same builder, which appended a JWT entry each time rather than replacing the previous one as I had expected.
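
One way to confirm the appended entries is to walk the SETUP metadata by hand. The sketch below assumes the metadata uses the RSocket composite metadata layout, which matches the dumps in the question: a leading byte (0x29 there, meaning a 42-character custom MIME type; a set high bit would instead mark a well-known type id, as in the 0xfe routing entry), then a 3-byte big-endian payload length (00 00 b0), then the payload. The class CompositeMetadataWalker and its sample tokens are hypothetical, JDK-only stand-ins, not part of Spring or rsocket-java, and the walker does no validation of malformed input.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that lists the MIME type of each composite metadata entry. */
final class CompositeMetadataWalker {

    /** Returns one label per entry, in the order the entries appear. */
    static List<String> mimeTypes(byte[] metadata) {
        List<String> types = new ArrayList<>();
        int pos = 0;
        while (pos < metadata.length) {
            int first = metadata[pos++] & 0xFF;
            if ((first & 0x80) != 0) {
                // High bit set: the low 7 bits are a well-known MIME type id.
                types.add("well-known id 0x" + Integer.toHexString(first & 0x7F));
            } else {
                // High bit clear: the low 7 bits hold (MIME length - 1).
                int mimeLength = (first & 0x7F) + 1;
                types.add(new String(metadata, pos, mimeLength, StandardCharsets.US_ASCII));
                pos += mimeLength;
            }
            // 24-bit big-endian payload length, then skip over the payload.
            int payloadLength = ((metadata[pos] & 0xFF) << 16)
                    | ((metadata[pos + 1] & 0xFF) << 8)
                    | (metadata[pos + 2] & 0xFF);
            pos += 3 + payloadLength;
        }
        return types;
    }

    /** Encodes one entry with a custom (not well-known) MIME type. */
    static byte[] entry(String mime, String payload) {
        byte[] mimeBytes = mime.getBytes(StandardCharsets.US_ASCII);
        byte[] body = payload.getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(mimeBytes.length - 1);
        out.write(mimeBytes, 0, mimeBytes.length);
        out.write((body.length >> 16) & 0xFF);
        out.write((body.length >> 8) & 0xFF);
        out.write(body.length & 0xFF);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    /** Two bearer entries back to back, mimicking the second SETUP frame (placeholder tokens). */
    static byte[] sample() {
        String mime = "message/x.rsocket.authentication.bearer.v0";
        byte[] first = entry(mime, "token-for-USER1");
        byte[] second = entry(mime, "token-for-USER2");
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(first, 0, first.length);
        out.write(second, 0, second.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<String> types = mimeTypes(sample());
        System.out.println(types.size() + " entries: " + types);
    }
}
```

Feeding the real metadata bytes of a captured SETUP frame into mimeTypes should yield one bearer entry per connection once each connection has its own builder.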

The solution seems to be simply to create a new builder for each connection:

RSocketRequester.builder()
     .dataMimeType(MediaType.APPLICATION_JSON)
     .setupMetadata(jwt, BearerTokenMetadata.BEARER_AUTHENTICATION_MIME_TYPE)
     .rsocketStrategies(b -> b.encoder(new BearerTokenAuthenticationEncoder()))
     .websocket(config.getUri())
     .route("test").retrieveFlux(byte[].class).subscribe();