ActiveMQ Artemis replication colocated not working


I have an ActiveMQ Artemis cluster and have configured it for colocated replication over the network.

Here are the two configs:

Broker1

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:xi="http://www.w3.org/2001/XInclude"
           xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="urn:activemq:core ">

  <name>10.1.1.130</name>


  <persistence-enabled>true</persistence-enabled>

  <!-- this could be ASYNCIO, MAPPED, NIO
       ASYNCIO: Linux Libaio
       MAPPED: mmap files
       NIO: Plain Java Files
   -->
  <journal-type>ASYNCIO</journal-type>

  <paging-directory>data/paging</paging-directory>

  <bindings-directory>data/bindings</bindings-directory>

  <journal-directory>data/journal</journal-directory>

  <large-messages-directory>data/large-messages</large-messages-directory>

  <journal-datasync>true</journal-datasync>

  <journal-min-files>2</journal-min-files>

  <journal-pool-files>10</journal-pool-files>

  <journal-device-block-size>4096</journal-device-block-size>

  <journal-file-size>10M</journal-file-size>

  <journal-buffer-timeout>23940000</journal-buffer-timeout>

  <journal-max-io>1</journal-max-io>

  <!-- how often we are looking for how many bytes are being used on the disk in ms -->
  <disk-scan-period>5000</disk-scan-period>

  <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
       that won't support flow control. -->
  <max-disk-usage>90</max-disk-usage>

  <!-- should the broker detect dead locks and other issues -->
  <critical-analyzer>true</critical-analyzer>

  <critical-analyzer-timeout>120000</critical-analyzer-timeout>

  <critical-analyzer-check-period>60000</critical-analyzer-check-period>

  <critical-analyzer-policy>HALT</critical-analyzer-policy>


 <!-- Clustering configuration -->
  <connectors>
     <connector name="netty-connector">tcp://localhost:61616</connector>
     <!-- connector to the server2 -->
     <connector name="server2-connector">tcp://10.1.1.131:61616</connector>
  </connectors>
  <ha-policy>
     <replication>
        <colocated>
           <backup-request-retries>-1</backup-request-retries>
           <backup-request-retry-interval>2000</backup-request-retry-interval>
           <excludes>
              <connector-ref>server2-connector</connector-ref>
              <connector-ref>netty-connector</connector-ref>
           </excludes>
           <max-backups>1</max-backups>
           <request-backup>true</request-backup>
           <master> 
           </master>
           <slave>
           </slave>
        </colocated>
     </replication>
  </ha-policy>
 <cluster-user>ACTIVEMQ.CLUSTER.ADMIN.USER</cluster-user>
 <cluster-password>123456</cluster-password>
 <cluster-connections>
     <cluster-connection name="my-cluster">
        <connector-ref>netty-connector</connector-ref>
        <retry-interval>500</retry-interval>
        <use-duplicate-detection>true</use-duplicate-detection>
        <message-load-balancing>STRICT</message-load-balancing>
        <max-hops>1</max-hops>
        <static-connectors>
           <connector-ref>server2-connector</connector-ref>
        </static-connectors>
     </cluster-connection>
  </cluster-connections>

  <acceptors>
     <!-- Acceptor for every supported protocol -->
     <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
     <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- STOMP Acceptor. -->
     <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

     <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
     <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

     <!-- MQTT Acceptor -->
     <acceptor name="mqtt">tcp://0.0.0.0:4883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

  </acceptors>


  <security-settings>
     <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq"/>
        <permission type="send" roles="amq"/>
        <!-- we need this otherwise ./artemis data imp wouldn't work -->
        <permission type="manage" roles="amq"/>
     </security-setting>
  </security-settings>
</core>
</configuration>

Broker2:

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:xi="http://www.w3.org/2001/XInclude"
           xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

<core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="urn:activemq:core ">


  <name>10.1.1.131</name>


  <persistence-enabled>true</persistence-enabled>

  <!-- this could be ASYNCIO, MAPPED, NIO
       ASYNCIO: Linux Libaio
       MAPPED: mmap files
       NIO: Plain Java Files
   -->
  <journal-type>ASYNCIO</journal-type>

  <paging-directory>data/paging</paging-directory>

  <bindings-directory>data/bindings</bindings-directory>

  <journal-directory>data/journal</journal-directory>

  <large-messages-directory>data/large-messages</large-messages-directory>

  <journal-datasync>true</journal-datasync>

  <journal-min-files>2</journal-min-files>

  <journal-pool-files>10</journal-pool-files>

  <journal-device-block-size>4096</journal-device-block-size>

  <journal-file-size>10M</journal-file-size>

  <journal-buffer-timeout>23940000</journal-buffer-timeout>

  <journal-max-io>1</journal-max-io>

  <!-- how often we are looking for how many bytes are being used on the disk in ms -->
  <disk-scan-period>5000</disk-scan-period>

  <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
       that won't support flow control. -->
  <max-disk-usage>90</max-disk-usage>

  <!-- should the broker detect dead locks and other issues -->
  <critical-analyzer>true</critical-analyzer>

  <critical-analyzer-timeout>120000</critical-analyzer-timeout>

  <critical-analyzer-check-period>60000</critical-analyzer-check-period>

  <critical-analyzer-policy>HALT</critical-analyzer-policy>


 <!-- Clustering configuration -->
  <connectors>
     <connector name="netty-connector">tcp://localhost:61616</connector>
     <!-- connector to the server1 -->
     <connector name="server1-connector">tcp://10.1.1.130:61616</connector>
  </connectors>
  <ha-policy>
     <replication>
        <colocated>
           <backup-request-retries>-1</backup-request-retries>
           <backup-request-retry-interval>2000</backup-request-retry-interval>
           <excludes>
              <connector-ref>server1-connector</connector-ref>
              <connector-ref>netty-connector</connector-ref>
           </excludes>
           <max-backups>1</max-backups>
           <request-backup>true</request-backup>
           <master> 
           </master>
           <slave>
           </slave>
        </colocated>
     </replication>
  </ha-policy>
 <cluster-user>ACTIVEMQ.CLUSTER.ADMIN.USER</cluster-user>
 <cluster-password>123456</cluster-password>
 <cluster-connections>
     <cluster-connection name="my-cluster">
        <connector-ref>netty-connector</connector-ref>
        <retry-interval>500</retry-interval>
        <use-duplicate-detection>true</use-duplicate-detection>
        <message-load-balancing>STRICT</message-load-balancing>
        <max-hops>1</max-hops>
        <static-connectors>
           <connector-ref>server1-connector</connector-ref>
        </static-connectors>
     </cluster-connection>
  </cluster-connections>

  <acceptors>
     <!-- Acceptor for every supported protocol -->
     <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- AMQP Acceptor.  Listens on default AMQP port for AMQP traffic.-->
     <acceptor name="amqp">tcp://0.0.0.0:5672?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpLowCredits=300</acceptor>

     <!-- STOMP Acceptor. -->
     <acceptor name="stomp">tcp://0.0.0.0:61613?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>

     <!-- HornetQ Compatibility Acceptor.  Enables HornetQ Core and STOMP for legacy HornetQ clients. -->
     <acceptor name="hornetq">tcp://0.0.0.0:5445?anycastPrefix=jms.queue.;multicastPrefix=jms.topic.;protocols=HORNETQ,STOMP;useEpoll=true</acceptor>

     <!-- MQTT Acceptor -->
     <acceptor name="mqtt">tcp://0.0.0.0:4883?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=MQTT;useEpoll=true</acceptor>

  </acceptors>


  <security-settings>
     <security-setting match="#">
        <permission type="createNonDurableQueue" roles="amq"/>
        <permission type="deleteNonDurableQueue" roles="amq"/>
        <permission type="createDurableQueue" roles="amq"/>
        <permission type="deleteDurableQueue" roles="amq"/>
        <permission type="createAddress" roles="amq"/>
        <permission type="deleteAddress" roles="amq"/>
        <permission type="consume" roles="amq"/>
        <permission type="browse" roles="amq"/>
        <permission type="send" roles="amq"/>
        <!-- we need this otherwise ./artemis data imp wouldn't work -->
        <permission type="manage" roles="amq"/>
     </security-setting>
  </security-settings>
</core>
</configuration>

Here is the Spring Boot Java code that sends the message:

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ArtemisProducerConfig extends BaseObject {

    @Value("${artemis.broker-url}")
    private String brokerUrl;

    @Bean
    public ActiveMQConnectionFactory senderActiveMQConnectionFactory() {
        return new ActiveMQConnectionFactory(brokerUrl);
    }

    @Bean
    public CachingConnectionFactory cachingConnectionFactory() {
        return new CachingConnectionFactory(senderActiveMQConnectionFactory());
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate(cachingConnectionFactory());
        template.setExplicitQosEnabled(true);
        template.setDeliveryPersistent(true);
        return template;
    }
}
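For client-side failover, the `artemis.broker-url` property injected above would typically list both cluster members. A sketch of the assumed property (hosts taken from the broker configs above; `ha` and `reconnectAttempts` are Artemis core client URL options):

```properties
# Hypothetical application.properties entry: both brokers listed so the client
# can fail over to the backup; ha=true enables HA topology updates and
# reconnectAttempts=-1 retries forever.
artemis.broker-url=(tcp://10.1.1.130:61616,tcp://10.1.1.131:61616)?ha=true&reconnectAttempts=-1
```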

jmsTemplate.convertAndSend("test.address::test.queue", inputData.getData(), new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws JMSException {
        message.setJMSCorrelationID(inputData.getCorrID());
        return message;
    }
});

The message is sent to Broker1 successfully; I can see it in the web console at http://10.1.1.130:8161. But there is no message on Broker2. My understanding is that messages must be backed up to Broker2 to achieve HA.

Can someone show me an example of configuring Artemis for colocated replication over the network? Thank you!

Answered by Justin Bertram:

ActiveMQ Artemis uses an active/passive scheme to achieve high availability. In the replicated use-case, an active "live" broker replicates its messages to a passive "slave" broker. In a replicated, colocated configuration each JVM actually runs two brokers (i.e. the two brokers are colocated in the same JVM): one broker is live, and the other serves as a backup for another broker in the cluster. You won't "see" the messages on the slave until the live broker fails, at which point the slave will activate and become the master.

To confirm that replication is happening as expected, check the log files on the master and the slave. The slave will first log something like this:

INFO  [org.apache.activemq.artemis.core.server] AMQ221109: Apache ActiveMQ Artemis Backup Server version X.X.X [null] started, waiting live to fail before it gets active

Then the live will have something like this:

INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/journal/activemq-data-2.amq (size=10,485,760) to replica.
INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/bindings/activemq-bindings-3.bindings (size=1,048,576) to replica.
INFO  [org.apache.activemq.artemis.core.server] AMQ221025: Replication: sending NIOSequentialFile /path/to/data/bindings/activemq-bindings-2.bindings (size=1,048,576) to replica.
INFO  [org.apache.activemq.artemis.core.server] AMQ221024: Backup server ActiveMQServerImpl::serverUUID=8d7477d0-1518-11ea-abd1-a0afbd82eaba is synchronized with live-server.

Then finally the slave will have:

INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced
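To check for these entries without scanning the whole file, you can grep for the message IDs quoted above. A small sketch (the `log/artemis.log` path is an assumption; adjust it to your broker instance — the filter is demonstrated against sample lines so it is self-contained):

```shell
# Replication-related message IDs from the log excerpts above
ids='AMQ221024|AMQ221025|AMQ221031|AMQ221109'

# On a real broker instance you would run something like:
#   grep -E "$ids" log/artemis.log
# Demonstrated here against sample lines:
printf '%s\n' \
  'INFO  [org.apache.activemq.artemis.core.server] AMQ221031: backup announced' \
  'INFO  [org.apache.activemq.artemis.core.server] AMQ221001: unrelated line' \
  | grep -E "$ids"
# → prints only the AMQ221031 line
```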