Several questions about netty's ChannelPoolMap


I am trying to use Netty as a client for a key-value database, and was glad to find that Netty 4.0.28.Final provides a native connection pool.

I have a simple connection pool like the following (based on the Netty 4.0.28.Final release notes):

QdbConnectionPool:

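import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.ChannelPoolMap;
import io.netty.channel.pool.FixedChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;

public class QdbConnectionPool {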

private ChannelPoolMap<InetSocketAddress,FixedChannelPool> poolMap;

public void init(){
    EventLoopGroup group = new NioEventLoopGroup();
    final Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);

    poolMap = new AbstractChannelPoolMap<InetSocketAddress, FixedChannelPool>() {
        @Override
        protected FixedChannelPool newPool(InetSocketAddress key) {
            bootstrap.remoteAddress(key);
            return new FixedChannelPool(bootstrap,new QdbPoolHandler(),100);
        }
    };

}

public QdbResult query(InetSocketAddress address,String bkey){
    final QdbResult result = new QdbResult(bkey);
    final CountDownLatch countDownLatch = new CountDownLatch(1);
    final FixedChannelPool pool = poolMap.get(address);
    Future<Channel> future = pool.acquire();
    future.addListener(new FutureListener<Channel>() {
        @Override
        public void operationComplete(Future<Channel> future) {
            if (future.isSuccess()) {
                Channel ch = future.getNow();
                System.out.println(ch.toString());
                ch.pipeline().addLast(new QdbClientHandler(result, countDownLatch));
                ch.pipeline().fireChannelActive();
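            } else {
                System.out.println("future not succ");
                // Release the latch so query() does not block forever on a failed acquire.
                countDownLatch.countDown();
            }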
        }
    });
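    try{
        countDownLatch.await();
    }catch (InterruptedException ex){
        // Restore the interrupt flag instead of swallowing the interruption.
        Thread.currentThread().interrupt();
    }
    // Only release a channel that was actually acquired.
    if (future.isSuccess()) {
        pool.release(future.getNow());
    }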
    return result;
}

public static void main(String[] args) throws Exception{
    InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8888);
    QdbConnectionPool pool = new QdbConnectionPool();
    pool.init();
    QdbResult result  = pool.query(address, "xxxxxx");        
}

}

QdbPoolHandler, which adds a decoder for the response content:

import io.netty.channel.Channel;
import io.netty.channel.pool.AbstractChannelPoolHandler;

public class QdbPoolHandler extends AbstractChannelPoolHandler {


@Override
public void channelCreated(Channel ch) throws Exception{
    ch.pipeline().addLast(new QdbDecoder());
}

}

Here is my understanding, followed by my questions:

1) When pool.acquire() is called, the pool will open the connection.

2) After the Future completes and the channel is acquired, I add a QdbClientHandler to handle sending and receiving messages (the QdbClientHandler removes itself from the pipeline in its own channelRead() method). Am I using it correctly?

3) I fire the request through channel.pipeline().fireChannelActive(). Am I using it correctly? Are there any other ways to fire the request? channel.pipeline().fireChannelActive() returns the ChannelPipeline; can I get a future from it?

4) I use the CountDownLatch to make sure the request has finished (the latch's countDown() method is called in QdbClientHandler's channelRead() or exceptionCaught() method). Am I using it correctly? Are there any other ways to make sure the request has finished?
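To make question 4 more concrete, here is a minimal sketch of the kind of alternative I have in mind, written with plain JDK classes only (no Netty). It assumes Java 8 for CompletableFuture. ResponseCallback, onResponse(), onError(), and awaitResponse() are hypothetical names standing in for QdbClientHandler's channelRead() and exceptionCaught(), and the single-thread executor is only a stand-in for the event loop. Instead of counting down a latch, the callback completes a future that carries either the value or the failure, and the caller waits with a timeout rather than indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class RequestFutureSketch {

    // Hypothetical stand-in for QdbClientHandler: completes a future
    // instead of calling countDown() on a latch.
    static final class ResponseCallback {
        private final CompletableFuture<String> result;

        ResponseCallback(CompletableFuture<String> result) {
            this.result = result;
        }

        // Plays the role of channelRead().
        void onResponse(String value) {
            result.complete(value);
        }

        // Plays the role of exceptionCaught().
        void onError(Throwable cause) {
            result.completeExceptionally(cause);
        }
    }

    // Waits for the response, but never longer than timeoutMillis.
    static String awaitResponse(CompletableFuture<String> result, long timeoutMillis)
            throws Exception {
        return result.get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the I/O thread that would deliver the response.
        ExecutorService ioThread = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> result = new CompletableFuture<>();
            ResponseCallback callback = new ResponseCallback(result);
            ioThread.execute(() -> callback.onResponse("value-for-xxxxxx"));
            System.out.println(awaitResponse(result, 1000));

            // A request that never gets a reply times out instead of blocking forever.
            try {
                awaitResponse(new CompletableFuture<>(), 50);
            } catch (TimeoutException expected) {
                System.out.println("timed out");
            }
        } finally {
            ioThread.shutdown();
        }
    }
}
```

Compared with the latch, the future reports a failure to the caller as an exception, and the timeout bounds how long query() can block. I don't know whether this is the idiomatic way to do it with Netty's own Future/Promise types, which is part of what I am asking.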
