We have programmatically created subscriber to IBM MQ AMQP TOPIC with createDurableSubscriber
by providing clientId
and subscriber name.
We start the program so it subscribes to TOPIC and stop the program. Then send the msgs to topic and again start the receiver program again but we cannot receive the msgs sent and loose the messages which should not happen in case of durable subscription..
We can see amqp topic and its durable subscription when subscriber is connected using mqsc commands DISPLAY TOPIC
, DISPLAY TPSTATUS
, DISPLAY TPSTATUS SUB
, DISPLAY SUB SUBID
but not when subscriber program is stopped. We have defined attribute DEFPSIST(YES)
and client(producer to topic) is sending persistent messages.
Where are the messages gone as we cannot see messages in durable queues of subscriber? Does it depends on expiry attribute?
Output of DISPLAY SUB SUBID
for our subscriber when it is connected.
AMQ8096: WebSphere MQ subscription inquired.
SUBID("hex sub id")
SUB(:private:CLINET01:TOPIC01) TOPICSTR(TOPIC01)
TOPICOBJ(SYSTEM.BASE.TOPIC) DISTYPE(RESOLVED)
DEST(SYSTEM.MANAGED.DURABLE.5F6B5C2524FB9AED)
DESTQMGR(qm.name) PUBAPPID( )
SELECTOR( ) SELTYPE(NONE)
USERDATA(010)
PUBACCT(***************************************************)
DESTCORL(***************************************************)
DESTCLAS(MANAGED) DURABLE(YES)
EXPIRY(0) PSPROP(MSGPROP)
PUBPRTY(ASPUB) REQONLY(NO)
SUBSCOPE(ALL) SUBLEVEL(1)
SUBTYPE(API) VARUSER(FIXED)
WSCHEMA(TOPIC) SUBUSER(mqm)
CRDATE(2020-09-28) CRTIME(04:14:09)
ALTDATE(2020-09-28) ALTTIME(04:14:09)
Subscriber id has private(not sure why) and client id but not subscriber name which is sub4
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Topic;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.lang.String;
import javax.jms.Destination;
import javax.naming.Context;
import org.apache.qpid.jms.JmsConnectionFactory;
import javax.jms.DeliveryMode;
import javax.naming.InitialContext;
import javax.jms.Message;
public class AMQPQueueExample1 implements Runnable {
private static final int DELIVERY_MODE = DeliveryMode.PERSISTENT;
public void run(){
try{
Connection connection = null;
Context context = new InitialContext();
ConnectionFactory connectionFactory = (ConnectionFactory) context.lookup("myFactoryLookup");
connection = connectionFactory.createConnection();
connection.setClientID("123");//("WHATS_MY_PURPOSE3"); // Why do we need clientID while publishing the TOPIC from consumer / publisher
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic priceTopic = (Topic) context.lookup("myTopicLookup1");
MessageConsumer subscriber1 = session.createDurableSubscriber(priceTopic,"sub420"); //"sub3");
System.out.println("TOPIC "+priceTopic);
connection.start();
while(true){
TextMessage message1 = (TextMessage) subscriber1.receive(1000);
if(message1!=null)
System.out.println("Subscriber 1 received : " + message1.getText());
}
}catch(Exception e){
e.printStackTrace();
}
}
public static void main(String[] args) {
AMQPQueueExample1 amp=new AMQPQueueExample1();
Thread thread = new Thread(amp);
thread.start();
}
}
Values are taken from jndi.properties file for context factory and provider url.
An article from Matthew Whitehead "MQ Light messaging from Microsoft®.NET™ (Part 4)" states the following:
I searched and couldn't find a reference on how to set Source Timeout or Source Expiry Policy in Apache QPID, but the linked blog references setting expiry via a administratively defined subscription. Based on the info in your question I think you can just define something like this ahead of time. I have not specified
EXPIRY
because this will pick upEXPIRY(UNLIMITED)
fromSYSTEM.DEFAULT.SUB
:When you then connect your AMQP subscriber it will resume this existing subscription with the expiry set to
UNLIMITED
.