I am trying to write a mqttserver using netty using below code
public class MqttServer {
private final int port;
public MqttServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
MQTTMessageHandler mqttMessageHandler = new MQTTMessageHandler();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new MqttDecoder());
ch.pipeline().addLast(MqttEncoder.INSTANCE);
ch.pipeline().addLast(mqttMessageHandler);
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true)
.childOption(ChannelOption.SO_KEEPALIVE, true);;
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 1883; // Change the port if needed
new MqttServer(port).run();
}
}
Not below is how I have written mqtthandler which handle publish and subscribe
@ChannelHandler.Sharable
public class MQTTMessageHandler extends SimpleChannelInboundHandler<MqttMessage> {
Map<String, List<Channel>> channelMap = new HashMap<>();
@Override
protected void channelRead0(ChannelHandlerContext ctx, MqttMessage message) throws Exception {
// Handle different types of MQTT messages
switch (message.fixedHeader().messageType()) {
case CONNECT:
handleConnectMessage(ctx, (MqttConnectMessage) message);
break;
case PUBLISH:
handlePublishMessage(ctx, (MqttPublishMessage) message);
break;
case SUBSCRIBE:
handleSubscribeMessage(ctx, (MqttSubscribeMessage) message);
break;
case UNSUBSCRIBE:
handleUnsubscribeMessage(ctx, (MqttUnsubscribeMessage) message);
break;
case DISCONNECT:
handleDisconnectMessage(ctx);
break;
default:
// Handle unsupported message types or other cases
break;
}
}
private void handleConnectMessage(ChannelHandlerContext ctx, MqttConnectMessage msg) {
// Process the CONNECT message
MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK,false, MqttQoS.AT_LEAST_ONCE, false,0);
MqttConnAckVariableHeader mqttConnAckVariableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, false);
ctx.write(new MqttConnAckMessage(mqttFixedHeader , mqttConnAckVariableHeader));
ctx.flush();
System.out.println("Received CONNECT message");
}
private void handlePublishMessage(ChannelHandlerContext ctx, MqttPublishMessage msg) {
// Process the PUBLISH message
System.out.println("Received PUBLISH message");
String topic = msg.variableHeader().topicName();
byte[] payload = new byte[msg.payload().readableBytes()];
msg.payload().getBytes(0, payload);
if(channelMap.containsKey(topic)){
List<Channel> channels = channelMap.get(topic);
for(Channel channel : channels){
channel.write(msg);
channel.flush();
}
}
}
private void handleSubscribeMessage(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
System.out.println("Received SUBSCRIBE message");
// Process the SUBSCRIBE message
// Retrieve list of topic subscriptions from the SUBSCRIBE message
List<MqttTopicSubscription> subscriptions = msg.payload().topicSubscriptions();
for (MqttTopicSubscription subscription:
subscriptions) {
String topic = subscription.topicName();
if(channelMap.containsKey(topic)){
channelMap.get(topic).add(ctx.channel());
}
else{
List<Channel> channels = new ArrayList<>();
channels.add(ctx.channel());
channelMap.put(topic, channels);
}
}
// Prepare the SUBACK message with appropriate QoS levels
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBACK,
false,
MqttQoS.AT_LEAST_ONCE,
false,
0
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(msg.variableHeader().messageId());
MqttSubAckPayload payload = generateSubAckPayload(subscriptions);
// Create the SUBACK message
MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload);
// Send the SUBACK message back to the client
ctx.writeAndFlush(subAckMessage);
}
private MqttSubAckPayload generateSubAckPayload(List<MqttTopicSubscription> subscriptions) {
List<Integer> collect = subscriptions.stream().map(s -> s.qualityOfService().value()).collect(Collectors.toList());
MqttSubAckPayload mqttSubAckPayload = new MqttSubAckPayload(collect);
return mqttSubAckPayload;
}
private void handleUnsubscribeMessage(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) {
MqttUnsubscribePayload payload = msg.payload();
MqttFixedHeader mqttFixedHeader = null;
MqttMessageIdVariableHeader variableHeader = null ;
for(String topic : payload.topics()){
if(channelMap.containsKey(topic)){
channelMap.get(topic).remove(ctx.channel());
ctx.writeAndFlush(new MqttUnsubAckMessage(mqttFixedHeader, variableHeader));
}
}
// Process the UNSUBSCRIBE message
System.out.println("Received UNSUBSCRIBE message");
}
private void handleDisconnectMessage(ChannelHandlerContext ctx) {
// Process the DISCONNECT message
System.out.println("Received DISCONNECT message");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Handle exceptions
cause.printStackTrace();
ctx.close();
}
}
Now subscribe is working fine and I am recieving message at client side but when I try to publish a message I get java.nio.channels.CloseChannelException.
I am storing the channel in a map during subscribe method and then writing to same channel while publish message