I need to push messages to external rabbitmq. My java configuration successfully declares queue to push, but every time I try to push, I have next exception:
web_1 | com.rabbitmq.client.AlreadyClosedException: connection is already closed due to clean connection shutdown; protocol method: #method<connection.close>(reply-code=200, reply-text=OK, class-id=0, method-id=0)
web_1 | at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:258)
web_1 | at com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:427)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:710)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:685)
web_1 | at com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:675)
web_1 | at com.rabbitmq.client.impl.recovery.AutorecoveringChannel.basicPublish(AutorecoveringChannel.java:207)
web_1 | at com.ruddi.logiweb.service.impl.MQServiceImpl.send(MQServiceImpl.java:33)
web_1 | at com.ruddi.logiweb.controller.DriverController.addDriver(DriverController.java:105)
web_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
web_1 | at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
web_1 | at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
web_1 | at java.base/java.lang.reflect.Method.invoke(Method.java:566)
web_1 | at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:197)
web_1 | at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:141)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:106)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:894)
web_1 | at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:808)
web_1 | at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87)
web_1 | at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:1060)
web_1 | at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:962)
web_1 | at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:1006)
web_1 | at org.springframework.web.servlet.FrameworkServlet.doPost(FrameworkServlet.java:909)
web_1 | at javax.servlet.http.HttpServlet.service(HttpServlet.java:652)
web_1 | at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:883)
web_1 | at javax.servlet.http.HttpServlet.service(HttpServlet.java:733)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:227)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:53)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:320)
web_1 | at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
web_1 | at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:118)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:137)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:158)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:63)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:200)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:116)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.header.HeaderWriterFilter.doHeadersAfter(HeaderWriterFilter.java:92)
web_1 | at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:77)
web_1 | at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:105)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:56)
web_1 | at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:119)
web_1 | at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:334)
web_1 | at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:215)
web_1 | at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:178)
web_1 | at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:358)
web_1 | at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:271)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:189)
web_1 | at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:162)
web_1 | at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:202)
web_1 | at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:97)
web_1 | at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:542)
web_1 | at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:143)
web_1 | at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:92)
web_1 | at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:687)
web_1 | at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:78)
web_1 | at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:357)
web_1 | at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:374)
web_1 | at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:65)
web_1 | at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:893)
web_1 | at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1707)
web_1 | at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
web_1 | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
web_1 | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
web_1 | at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
web_1 | at java.base/java.lang.Thread.run(Thread.java:829)
Bean:
@Bean("mqChannel")
public Channel channel() {
ConnectionFactory factory = new ConnectionFactory();
String uri = "amqp://sxtswmgm:[email protected]/sxtswmgm";
try {
factory.setUri(uri);
}
catch (Exception e) {
e.printStackTrace();
}
// factory.setHost("rabbitmq");
// factory.setPort(5672);
// factory.setUsername("guest");
// factory.setPassword("guest");
Channel channel = null;
try (Connection connection = factory.newConnection()){
channel = connection.createChannel();
channel.queueDeclare("logiweb", false, false, false, null);
}
catch(Exception e) {
log.info("EXCEPTION IN BEAN");
e.printStackTrace();
}
return channel;
}
Sending message service class:
@Service
@Slf4j
public class MQServiceImpl implements MQService {
Channel channel;
@Autowired
public MQServiceImpl(
@Qualifier("mqChannel") Channel channel) {
this.channel = channel;
}
/**
* sending message to mq
* @param message message to send
*/
@Override
public void send(String message) {
log.info("send(message) method was called");
try {
channel.basicPublish("", "logiweb", null, message.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}
By the way, when I start rabbitmq locally it works perfectly. This problem appears when I use containerized rabbitmq or external cloudamqp
So, one more time, when I run my app I can see new queue in RabbitMQ dashboard (it was declared), but every try to push the message ends with exception.
I'm struggling to understand how that code fits together, but this part strikes me as definitely wrong:
When the
try
block completes, theconnection
resource is going to be auto-closed. However the javadoc forConnection.close()
states:So, the
Channel
that you have created will be closed before thechannel()
method returns it.