How to skip a specific handler in Netty, or what is the correct handler usage with reactor-netty?


Summary

I am new to Netty and am building an RTSP online desktop-sharing application for practice. I am puzzled: when I request a desktop-share frame via RTSP, it should be decoded by RtspDecoder, which Netty provides out of the box, but will all the other data be decoded by it too? My goal is that RTSP data is decoded by the RTSP handler, and anything else is passed on to the subsequent handler, ReactiveConnectionConsumer. At the moment the reactive handler does not receive any data. Could someone tell me the correct usage? I have made a mess of it.
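To make the intended routing concrete: every handler added in `doOnConnection` sits on the same pipeline, so each inbound buffer passes through all of the decoders unless something decides up front which protocol the connection speaks. The following is a minimal sketch of that decision step only, in plain Java. `RtspSniffer` and `looksLikeRtsp` are hypothetical names, not part of Netty or of this project. In Netty, a check like this would typically be called from a `ByteToMessageDecoder` placed first in the pipeline, which peeks at the buffered bytes without consuming them and then adds the RTSP codec or removes itself (an approach similar to Netty's port unification example). That wiring is an assumption and is not shown here.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: guesses whether the first bytes of a connection are RTSP. */
public final class RtspSniffer {

    private RtspSniffer() {}

    /**
     * Returns true when the first line looks like an RTSP request line
     * ("METHOD uri RTSP/1.0") or an RTSP status line ("RTSP/1.0 200 OK").
     * Only the bytes received so far are inspected; nothing is consumed.
     */
    public static boolean looksLikeRtsp(byte[] head) {
        String text = new String(head, StandardCharsets.US_ASCII);
        int eol = text.indexOf("\r\n");
        // If the line terminator has not arrived yet, inspect what is available.
        String firstLine = eol >= 0 ? text.substring(0, eol) : text;
        return firstLine.startsWith("RTSP/") || firstLine.contains(" RTSP/");
    }
}
```

With a check like this, a connection whose first line is `OPTIONS /live RTSP/1.0` would be routed to the RTSP handlers, while a protobuf-framed connection would skip them and reach the reactive handler unchanged. The heuristic is deliberately loose; a real server might instead use separate ports per protocol.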

On the RTSP client side, I send the data wrapped in an RTSP request via a Netty ByteBuf:


public void configSendConsumer (Consumer<DefaultFullHttpRequest> consumer) {
        new Thread(()-> {
            while (true){
//                recorder.get
                if (bos.size() > 0){
                    ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer();

                    buffer.writeBytes(bos.toByteArray());

                    DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(RtspVersions.RTSP_1_0,
                            RtspMethods.OPTIONS, "/live", buffer);
                    consumer.accept(defaultFullHttpRequest);

                    bos.reset();
                }

            }
        }).start();
    }

On the server side, I build the Reactor Netty TcpServer:

  public ReactiveServer init(InetSocketAddress address){
        this.address = address;
        server = TcpServer
                .create()
                .wiretap("tcp-server", LogLevel.INFO)
                .port(address.getPort())
                .doOnConnection(connection -> {
                    log.debug("connection has been established ");

                    connection.addHandlerLast(new ProtobufEncoder());

                    ProtoBufMessageLiteScanner.protobufDecoders()
                                    .forEach(connection::addHandlerLast);
                    connection
                            .addHandlerLast(new RtspEncoder())   // I added these two handlers to encode/decode the RTSP protocol
                            .addHandlerLast(new RtspDecoder())
                            ;
                })
                .handle(ReactiveHandlerSPI.wiredSpiHandler().handler())
        ;

        log.info("startup netty  on port {}",address.getPort());

        return this;
    }

Consuming the data with the reactor handler:


@Slf4j
public class ReactiveConnectionConsumer extends ConnectionConsumer {

    public ReactiveConnectionConsumer(){
        super((nettyInbound, nettyOutbound) -> {

            Flux<byte[]> handle = nettyInbound.receive().handle((byteBuf, sink) ->

                nettyInbound.withConnection(connection -> {

                log.debug("receive data ");

                int i = byteBuf.readableBytes();

                if (i > 0) {

                    try{

                        ByteBufProcessService.getInstance().process(connection,byteBuf);

                    }catch (Exception exception){

                        log.error("reactor netty occur error {} ", ExceptionUtil.stacktraceToString(exception));

                        sink.next(("occur error {} " + ExceptionUtil.stacktraceToString(exception)).getBytes());

                    }

                }

                sink.next("server has receive your data ".getBytes());
                }));
            var nettyOutbound1 = nettyOutbound.sendByteArray(Flux.concat(handle));
            return nettyOutbound1.then();

        });

    }

    @Override
    public void accept(Connection c) {

        super.accept(c);

    }
}

Here is the project link: github/pengpengon/Meeting. Maybe the whole design is wrong; could someone help demonstrate the general best practice for RTSP in Netty?
