I have been stuggling with a configuration using Netty to stream bytes to a ClamAV service. I am running in an Apache Camel route.
Using Netty, I am unable to intercept the "INSTREAM size limit exceeded" message.
INSTREAM It is mandatory to prefix this command with n or z. Scan a stream of data. The stream is sent to clamd in chunks, after INSTREAM, on the same socket on which the command was sent. This avoids the overhead of establishing new TCP connections and problems with NAT. The format of the chunk is: '' where is the size of the following data in bytes expressed as a 4 byte unsigned integer in network byte order and is the actual chunk. Streaming is terminated by sending a zero-length chunk. Note: do not exceed StreamMaxLength as defined in clamd.conf, otherwise clamd will reply with INSTREAM size limit exceeded and close the connection.
Using a straight synchronous socket connection I have no issues. Can anyone point me in the right direction for how I should be using Netty to do this? Or should I just stick with a synchronous socket connection.
Implementation using synchronous sockets. Credit to https://github.com/solita/clamav-java "Antti Virtanen".
private class UseSocket implements Processor{
@Override
public void process(Exchange exchange) throws Exception{
try (BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class));
Socket socket = new Socket("localhost", 3310);
BufferedOutputStream socketOutput = new BufferedOutputStream(socket.getOutputStream())){
byte[] command = "zINSTREAM\0".getBytes();
socketOutput.write(command);
socketOutput.flush();
byte[] chunk = new byte[2048];
int chunkSize;
try(BufferedInputStream socketInput = new BufferedInputStream(socket.getInputStream())){
for(chunkSize = message.read(chunk);chunkSize > -1;chunkSize = message.read(chunk)){
socketOutput.write(ByteBuffer.allocate(4).putInt(chunkSize).array());
socketOutput.write(chunk, 0, chunkSize);
socketOutput.flush();
if(processReply(socketInput, exchange)){
return;
}
}
socketOutput.write(ByteBuffer.allocate(4).putInt(0).array());
socketOutput.flush();
processReply(socketInput, exchange);
}
}
}
private boolean processReply(BufferedInputStream in, Exchange exchange) throws Exception{
if(in.available() > 0) {
logger.info("processing reply");
byte[] replyBytes = new byte[256];
int replySize = in.read(replyBytes);
if (replySize > 0) {
String reply = new String(replyBytes, 0, replySize, StandardCharsets.UTF_8);
String avStatus = "infected";
if ("stream: OK\0".equals(reply)) {
avStatus = "clean";
} else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
avStatus = "overflow";
}
exchange.getIn().setHeader("av-status", avStatus);
return true;
}
}
return false;
}
}
Implementation using Netty with inbound and outbound channel handlers.
private class UseNetty implements Processor{
@Override
public void process(Exchange exchange) throws Exception{
logger.info(CLASS_NAME + ": Creating Netty client");
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
try{
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.remoteAddress(new InetSocketAddress("localhost", 3310));
bootstrap.handler(new ClamAvChannelIntializer(exchange));
ChannelFuture channelFuture = bootstrap.connect().sync();
channelFuture.channel().closeFuture().sync();
}catch(Exception ex) {
logger.error(CLASS_NAME + ": ERROR", ex);
}
finally
{
eventLoopGroup.shutdownGracefully();
logger.info(CLASS_NAME + ": Netty client closed");
}
}
}
public class ClamAvChannelIntializer extends ChannelInitializer<SocketChannel> {
private Exchange exchange;
public ClamAvChannelIntializer(Exchange exchange){
this.exchange = exchange;
}
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClamAvClientWriter());
socketChannel.pipeline().addLast(new ClamAvClientHandler(exchange));
}
}
public class ClamAvClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
String CLASS_NAME;
Logger logger;
private Exchange exchange;
public static final int MAX_BUFFER = 2048;
public ClamAvClientHandler(Exchange exchange){
super();
CLASS_NAME = this.getClass().getName();
logger = LoggerFactory.getLogger(CLASS_NAME);
this.exchange = exchange;
}
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception{
logger.info(CLASS_NAME + ": Entering channelActive");
channelHandlerContext.write(exchange);
logger.info(CLASS_NAME + ": Exiting channelActive");
}
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause){
cause.printStackTrace();
channelHandlerContext.close();
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
logger.info(CLASS_NAME + ": Entering channelRead0");
String reply = byteBuf.toString(CharsetUtil.UTF_8);
logger.info(CLASS_NAME + ": Reply = " + reply);
String avStatus = "infected";
if ("stream: OK\0".equals(reply)) {
avStatus = "clean";
} else if ("INSTREAM size limit exceeded. ERROR\0".equals(reply)) {
avStatus = "overflow";
} else{
logger.warn("Infected or unknown reply = " + reply);
}
exchange.getIn().setHeader("av-status", avStatus);
logger.info(CLASS_NAME + ": Exiting channelRead0");
channelHandlerContext.close();
}
}
public class ClamAvClientWriter extends ChannelOutboundHandlerAdapter {
String CLASS_NAME;
Logger logger;
public static final int MAX_BUFFER = 64000;//2^16
public ClamAvClientWriter(){
CLASS_NAME = this.getClass().getName();
logger = LoggerFactory.getLogger(CLASS_NAME);
}
@Override
public void write(ChannelHandlerContext channelHandlerContext, Object o, ChannelPromise channelPromise) throws Exception{
logger.info(CLASS_NAME + ": Entering write");
Exchange exchange = (Exchange)o;
try(BufferedInputStream message = new BufferedInputStream(exchange.getIn().getBody(InputStream.class))){
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer("zINSTREAM\0".getBytes()));
byte[] chunk = new byte[MAX_BUFFER];
for(int i=message.read(chunk);i>-1;i=message.read(chunk)){
byte[] chunkSize = ByteBuffer.allocate(4).putInt(i).array();
channelHandlerContext.write(Unpooled.copiedBuffer(chunkSize));
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(chunk, 0, i));
}
channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(ByteBuffer.allocate(4).putInt(0).array()));
}
logger.info(CLASS_NAME + ": Exiting write");
}
}
I finally gave up on trying to use Netty for this. I created a new Camel Processor and packaged the socket stream in it. Code below in case anyone runs into a similar issue.
}