Summary
I am new to netty and plan to demonstrate a rtsp online desktop share application for practise . now i get puzzled that when i request the desktop share frame via rtsp , it should be decode by RtspDecoder
which is provide by netty native , but the other data would be decode too? my purpose is that if the data is rtsp then it decoded by rtspHandler
,if it not , it should fire subsequent handler ReactiveConnectionConsumer
, but now the reactive have not reaceive any data . could some one tell me , the correct usage of this ? i make mess on it .
rtps client side i send the data wrapper into rtsp via netty ByteBuf
public void configSendConsumer (Consumer<DefaultFullHttpRequest> consumer) {
new Thread(()-> {
while (true){
// recorder.get
if (bos.size() > 0){
ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();
buffer.writeBytes(bos.toByteArray());
DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(RtspVersions.RTSP_1_0,
RtspMethods.OPTIONS, "/live", buffer);
consumer.accept(defaultFullHttpRequest);
bos.reset();
}
}
}).start();
}
server builder ReactorTcpServer
public ReactiveServer init(InetSocketAddress address){
this.address = address;
server = TcpServer
.create()
.wiretap("tcp-server", LogLevel.INFO)
.port(address.getPort())
.doOnConnection(connection -> {
log.debug("connection has been established ");
connection.addHandlerLast(new ProtobufEncoder());
ProtoBufMessageLiteScanner.protobufDecoders()
.forEach(connection::addHandlerLast);
connection
.addHandlerLast(new RtspEncoder()) // i add this one to decode the rtsp protocol ,
.addHandlerLast(new RtspDecoder())
;
})
.handle(ReactiveHandlerSPI.wiredSpiHandler().handler())
;
log.info("startup netty on port {}",address.getPort());
return this;
}
comsumer data with reactor handler
@Slf4j
public class ReactiveConnectionConsumer extends ConnectionConsumer {
public ReactiveConnectionConsumer(){
super((nettyInbound, nettyOutbound) -> {
Flux<byte[]> handle = nettyInbound.receive().handle((byteBuf, sink) ->
nettyInbound.withConnection(connection -> {
log.debug("receive data ");
int i = byteBuf.readableBytes();
if (i > 0) {
try{
ByteBufProcessService.getInstance().process(connection,byteBuf);
}catch (Exception exception){
log.error("reactor netty occur error {} ", ExceptionUtil.stacktraceToString(exception));
sink.next(("occur error {} " + ExceptionUtil.stacktraceToString(exception)).getBytes());
}
}
sink.next("server has receive your data ".getBytes());
}));
var nettyOutbound1 = nettyOutbound.sendByteArray(Flux.concat(handle));
return nettyOutbound1.then();
});
}
@Override
public void accept(Connection c) {
super.accept(c);
}
}
here is project link github/pengpengon/Meeting , may be the whole design is in wrong way , could some one helps to demonstrate the general rtsp in netty best practise?