How to publish a message to a topic on an Oracle AQ message queue from PL/SQL

2.7k Views Asked by At

I know I can publish messages to a given topic using a JMS TopicPublisher from java code to an Oracle AQ queue ( there is a load of doc and samples ) but I really need to also publish messages to a given topic from PL/SQL code to the Qracle AQ queues. I can't find documentation nor examples on how to do that. Is it possible? Can somes one point me the proper documentation or sample location???

Any help is appreciated,

Jan van de klok

2

There are 2 best solutions below

4
On

All information with examples you can find in Streams Advanced Queuing User's Guide

below example was extracted from documentation with minor changes

    CREATE TYPE message_typ AS object(
       sender_id              NUMBER,
       subject                VARCHAR2(30),
       text                   VARCHAR2(1000));
    /


    BEGIN
       DBMS_AQADM.CREATE_QUEUE_TABLE(
          queue_table            => 'obj_qtab',
          queue_payload_type     => 'message_typ',
          multiple_consumers     => true
          );
    END;
    /

    BEGIN
       DBMS_AQADM.CREATE_QUEUE(
          queue_name         =>  'obj_queue',
          queue_table        =>  'obj_qtab');
    END;
    /

    BEGIN
       DBMS_AQADM.START_QUEUE (
          queue_name         => 'obj_queue');
    END;
    /

DECLARE
   subscriber         sys.aq$_agent;
BEGIN
   subscriber     :=  sys.aq$_agent('RED', NULL, NULL);
   DBMS_AQADM.ADD_SUBSCRIBER(
      queue_name  =>  'obj_queue',
      subscriber  =>  subscriber);

   subscriber     :=  sys.aq$_agent('GREEN', NULL, NULL);
   DBMS_AQADM.ADD_SUBSCRIBER(
      queue_name  =>  'obj_queue',
      subscriber  =>  subscriber); 

         subscriber     :=  sys.aq$_agent('YELLOW', NULL, NULL);
   DBMS_AQADM.ADD_SUBSCRIBER(
      queue_name  =>  'obj_queue',
      subscriber  =>  subscriber);

END;
/

Script does, create object, create queue table, create queue, starts queue and registered 3 subscribers.

DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;
   recipients          DBMS_AQ.aq$_recipient_list_t;
   message_handle      RAW(16);
   message             message_typ;

    recipients2          DBMS_AQ.aq$_recipient_list_t;
BEGIN
   message := message_typ(001, 'MESSAGE 1','For queue subscribers');
   DBMS_AQ.ENQUEUE(
      queue_name          => 'obj_queue',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => message,
      msgid               => message_handle);
-- Message for all subscriber

   message := message_typ(001, 'MESSAGE 2', 'For two recipients');
   recipients(1) := sys.aq$_agent('RED', NULL, NULL);
   recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
   message_properties.recipient_list := recipients;
   DBMS_AQ.ENQUEUE(
      queue_name          => 'obj_queue',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => message,
      msgid               => message_handle);
-- Message for RED,BLUE  subscriber   


   recipients2(1)   := sys.aq$_agent('RED', NULL, NULL);   
   message := message_typ(001, 'MESSAGE 3', 'For RED Subscriber');      
   message_properties.recipient_list := recipients2;
   DBMS_AQ.ENQUEUE(
      queue_name          => 'obj_queue',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => message,
      msgid               => message_handle);
-- Message for RED  subscriber   
   COMMIT;
END;
/

To check result you have to query queue table view (AQ$queue_table_name).
In this case it is.

select o.user_data,CONSUMER_NAME  from AQ$obj_qtab o order by MSG_ID;
0
On
If your message type in Queue table is SYS.AQ$_JMS_TEXT_MESSAGE. The message text in this example is in JSon format and is mapped to a Java pojo class in some application. This example is for queue and not for topic.
DECLARE
   enqueue_options     DBMS_AQ.enqueue_options_t;
   message_properties  DBMS_AQ.message_properties_t;   
   message_handle      RAW(16);
   message             SYS.AQ$_JMS_TEXT_MESSAGE;
   timeStamp           VARCHAR2(85);
BEGIN
   message := SYS.AQ$_JMS_TEXT_MESSAGE.CONSTRUCT();
   timeStamp := to_char(systimestamp, 'YYYY-MM-DD"T"HH24:mm:ss.FF9"Z"');
   message.SET_TEXT('{"filedName1":"field_value","timeStamp_field":"'||timeStamp||'"}');
   DBMS_AQ.ENQUEUE(
      queue_name          => 'DB_Name.QueueName',
      enqueue_options     => enqueue_options,
      message_properties  => message_properties,
      payload             => message,
      msgid               => message_handle);
COMMIT;
END;