I'm writing a master program for publishing message into a message queue (RabbitMQ). The program is written in Perl 5 and is using AnyEvent::RabbitMQ for the communication to RabbitMQ.
The following minimal example (for the issue I ran into) will fail on a second command send via the same channel with the error "Channel closed".
use strictures 2;
use AnyEvent::RabbitMQ;
main();
############################################################################
sub main {
_log( debug => 'main' );
my $condvar = AnyEvent->condvar;
my $ar = AnyEvent::RabbitMQ->new;
$ar->load_xml_spec;
_log( debug => 'Connecting to RabbitMQ...' );
$ar->connect(
host => 'localhost',
port => 5672,
user => 'guest',
pass => 'guest',
vhost => '/',
timeout => 1,
tls => 0,
on_success => sub { _on_connect_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
$condvar->recv;
$ar->close;
return;
}
############################################################################
sub _on_connect_success {
my ( $condvar, $ar, $new_ar ) = @_;
_log( debug => 'Connected to RabbitMQ.' );
_open_channel( $condvar, $new_ar );
return;
}
############################################################################
sub _open_channel {
my ( $condvar, $ar ) = @_;
_log( debug => 'Opening RabbitMQ channel...' );
$ar->open_channel(
on_success => sub { _on_open_channel_success( $condvar, $ar, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_open_channel_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Opened RabbitMQ channel.' );
_declare_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _declare_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declaring RabbitMQ queue...' );
$channel->declare_queue(
queue => 'test',
auto_delete => 1,
passive => 0,
durable => 0,
exclusive => 0,
no_ack => 1,
ticket => 0,
on_success =>
sub { _on_declare_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_declare_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Declared RabbitMQ queue.' );
_bind_queue( $condvar, $ar, $channel );
return;
}
############################################################################
sub _bind_queue {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binding RabbitMQ queue...' );
$channel->bind_queue(
queue => 'test',
exchange => '',
routing_key => '',
on_success => sub { _on_bind_queue_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
);
return;
}
############################################################################
sub _on_bind_queue_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => 'Binded RabbitMQ queue.' );
_log( info => 'Master ready to publish messages.' );
_publish_message( $condvar, $ar, $channel, 'Hello, world!' );
return;
}
############################################################################
sub _publish_message {
my ( $condvar, $ar, $channel, $message ) = @_;
_log( debug => "Publishing RabbitMQ message ($message)..." );
$channel->publish(
queue => 'test',
exchange => '',
routing_key => '',
body => $message,
header => {},
mandatory => 0,
immediate => 0,
on_success =>
sub { _on_publish_message_success( $condvar, $ar, $channel, @_ ) },
on_failure => sub { _error( $condvar, $ar, 'failure', @_ ) },
on_read_failure => sub { _error( $condvar, $ar, 'read_failure', @_ ) },
on_return => sub { _error( $condvar, $ar, 'return', @_ ) },
on_close => sub { _error( $condvar, $ar, 'close', @_ ) },
on_ack => sub { _error( $condvar, $ar, 'ack', @_ ) },
on_nack => sub { _error( $condvar, $ar, 'nack', @_ ) },
);
return;
}
############################################################################
sub _on_publish_message_success {
my ( $condvar, $ar, $channel ) = @_;
_log( debug => "Published RabbitMQ message." );
sleep 1;
_publish_message( $condvar, $ar, $channel, 'Hello, world! Again ' . time );
return;
}
############################################################################
sub _error {
my ( $condvar, $ar, $type, @error ) = @_;
_log( error => sprintf '%s - %s', $type, join ', ', @error );
$condvar->send( $condvar, $ar, $type, @error );
return;
}
############################################################################
sub _log {
my ( $level, $message ) = @_;
my @time = gmtime time;
$time[5] += 1900;
$time[4] += 1;
my $time = sprintf '%04d-%02d-%02dT%02d:%02d:%02d+00:00', @time[ 5, 4, 3, 2, 1, 0 ];
my @caller0 = caller(0);
my @caller1 = caller(1);
my $subroutine = $caller1[3];
$subroutine =~ s/^$caller0[0]:://;
print STDERR "$time [$level] $message at $caller0[1] line $caller0[2] ($subroutine; from $caller1[1] line $caller1[2])\n";
return;
}
This program should:
- connect to RabbitMQ
- opens a RabbitMQ channel
- declares a simpe queue (named "test")
- bind to that queue (named "test")
- publish a message ("Hello, world!")
- after successfull publishing the message wait a second and publish another message
This program (master program) should not consume messages. There are other programs out there to do this job.
The minimal example (see above) will produce the following output:
2015-08-12T13:02:07+00:00 [debug] main at minimal.pl line 9 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connecting to RabbitMQ... at minimal.pl line 13 (main; from minimal.pl line 5)
2015-08-12T13:02:07+00:00 [debug] Connected to RabbitMQ. at minimal.pl line 36 (_on_connect_success; from minimal.pl line 22)
2015-08-12T13:02:07+00:00 [debug] Opening RabbitMQ channel... at minimal.pl line 44 (_open_channel; from minimal.pl line 37)
2015-08-12T13:02:07+00:00 [debug] Opened RabbitMQ channel. at minimal.pl line 58 (_on_open_channel_success; from minimal.pl line 46)
2015-08-12T13:02:07+00:00 [debug] Declaring RabbitMQ queue... at minimal.pl line 66 (_declare_queue; from minimal.pl line 59)
2015-08-12T13:02:07+00:00 [debug] Declared RabbitMQ queue. at minimal.pl line 88 (_on_declare_queue_success; from minimal.pl line 76)
2015-08-12T13:02:07+00:00 [debug] Binding RabbitMQ queue... at minimal.pl line 96 (_bind_queue; from minimal.pl line 89)
2015-08-12T13:02:07+00:00 [error] failure - Channel closed at minimal.pl line 155 (_error; from minimal.pl line 102)
2015-08-12T13:02:07+00:00 [error] close - Net::AMQP::Frame::Method=HASH(0x38fe1c8) at minimal.pl line 155 (_error; from minimal.pl line 50)
Why does AnyEvent::RabbitMQ
or RabbitMQ itself closes the channel (not the connection or did I miss something)?
If you take a look at the RabbitMQ server logs you will see something like this:
Apparently it doesn't let you bind a queue on the default exchange. So you need to declare and bind your own exchange first.
Once you've set those subs up, tell your program to use this custom exchange.
The
$channel->confirm
is necessary to make RabbitMQ answer with a confirmation when you send a message to the queue. If you do not do that, the success handler will never get called because there are no success responses coming back.Then in your
_bind_queue
you need to add the exchange to thebind_queue()
call.The same needs to be done in the
_publish_message
with thepublish()
call. There you should also replace theon_ack
handler with something that actually deals with the acknowledgement. I think you intended to do that but had a copy/paste error1.One more thing is that the
sleep
call in_on_publish_message_success
is not a good idea when you are working with AnyEvent as that will stop the whole program. Use anAE::timer
instead.Here is the full code with all the changes.
1) In some places you need to buy your colleagues a beer for those :)