I have doubts about rabbitmq.
I would like to know if when sending a "pub" message, I need to close the connection to the channel and the amqpblib connection, the same with the consumer? Or the correct thing is to leave the connection opened?
I have this to pub and sub:
public produce = async <T>(queue: string, message: T): Promise<boolean> => {
try {
if (!this.connection) await this.start();
await this.initChannel(queue);
const sendResult = this.channel.sendToQueue(queue, Buffer.from(message), {
persistent: true,
});
if (!sendResult) {
await new Promise(resolve => this.channel.once('drain', () => resolve));
}
return sendResult;
} catch (error) {
Logger.info(error.message);
return false;
} finally {
this.close();
}
};
Subscribe
public subscribe = async (
queue: string,
onMessage: (msg: IMessage) => boolean,
): Promise<void> => {
if (!this.connection) await this.start();
const channel = await this.initChannel(queue);
channel.consume(queue, message => {
if (!message) return false;
const body = <IMessage>JSON.parse(message.content.toString());
if (body && onMessage(body)) onMessage(body);
channel.ack(message);
});
};
And this to init connection and listenner events:
private start = async () => {
try {
this.connection = await connect(this.rabbitUrl);
Logger.info('connect to RabbitMQ success');
await this.listeners();
} catch (err) {
Logger.info(err.message);
sleep(this.start, 10000);
}
};
private listeners = async (): Promise<Connection> => {
return (
this.connection.on('error', (err: Error) => {
Logger.info(err.message);
sleep(this.start, 10000);
}) &&
this.connection.on('close', () => {
Logger.info('connection to RabbitQM closed!');
sleep(this.start, 10000);
})
);
};
Creating connection is expensive, channel is lightweight but thread-unsafe in most client.
For Node.js application(single-thread model), it is ideally only have two connections for
publish
andsubscribe
, one channel per connection.So please keep the connection and channel open.