I am currently encountering issues in implementing RabbitMQ messaging between my two applications (web/worker). My RabbitMQ service is hosted on CloudAMQP (Heroku addon). However, any @RabbitListener
I declare seems to attempt connecting to localhost
rather than the cloud service.
Upon adding the following component into my worker app:
@Service
public class TaskConsumer {
@RabbitListener(queues = "worker.rpc.requests", containerFactory = "rabbitListenerContainerFactory")
public String fetch(String p) {
return p;
}
}
I encounter the following error:
2021-07-05 14:38:23.006 INFO 18840 --- [ntContainer#0-3] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-07-05 14:38:32.145 WARN 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connect
2021-07-05 14:38:32.145 INFO 18840 --- [ntContainer#0-3] o.s.a.r.l.SimpleMessageListenerContainer : Restarting Consumer@32c8d668: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0
How may I bind the RabbitListener such that it will connect to the AMQP environment instead? Here is my configuration:
@Configuration
@EnableRabbit
public class RabbitConfig {
protected final String workerQueueName = "worker.rpc.requests";
protected final String routingKeyName = "rpc";
protected final String directExcName = "worker.exchange";
@Bean
public ConnectionFactory connectionFactory() {
final URI ampqUrl;
try {
ampqUrl = new URI(getEnvOrThrow("CLOUDAMQP_URL"));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
final CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setUsername(ampqUrl.getUserInfo().split(":")[0]);
factory.setPassword(ampqUrl.getUserInfo().split(":")[1]);
factory.setHost(ampqUrl.getHost());
factory.setPort(ampqUrl.getPort());
factory.setVirtualHost(ampqUrl.getPath().substring(1));
try {
factory.getRabbitConnectionFactory().setUri(ampqUrl);
} catch (URISyntaxException e) {
e.printStackTrace();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
} catch (KeyManagementException e) {
e.printStackTrace();
}
return factory;
}
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
SimpleMessageListenerContainer container = factory
.createListenerContainer();
factory.setConcurrentConsumers(50);
factory.setMaxConcurrentConsumers(100);
container.setStartConsumerMinInterval(3000);
container.setQueues(queue());
factory.setMaxConcurrentConsumers(5);
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.workerQueueName);
template.setDefaultReceiveQueue(this.workerQueueName);
return template;
}
@Bean
public Queue queue() {
return new Queue(this.workerQueueName);
}
@Bean
public DirectExchange direct() {
return new DirectExchange(this.directExcName);
}
@Bean
public Binding binding(DirectExchange direct,
Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(direct)
.with(this.routingKeyName);
}
/**
* Required for executing adminstration functions against an AMQP Broker
*/
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
private static String getEnvOrThrow(String name) {
final String env = getenv(name);
if (env == null) {
throw new IllegalStateException("Environment variable [" + name + "] is not set.");
}
return env;
}
}