CODE
Oracle Code Library
JOBS
Find Or Post Oracle Jobs
FORUM
Oracle Discussion & Chat
PSOUG Home Code Snippets Oracle Reference Oracle Functions PSOUG Forum Oracle Blogs Bookmark and Share
 
 Search the Reference Library pages:  

Free
Oracle Magazine
Subscriptions
and Oracle White Papers

Oracle AQ Demo 1
Version 11.1
 
Setup As SYS
conn / as sysdba

-- check the instance parameters this demo looks at before changing anything
show parameter aq_tm_processes

show parameter job_queue_processes

ALTER SYSTEM SET job_queue_processes = 10 SCOPE = BOTH;

-- subscribers that are already registered on any queue
SELECT
  owner,
  queue_name,
  queue_table,
  consumer_name
FROM dba_queue_subscribers;

-- contents of the QUEUE_PRIVILEGES view
SELECT *
FROM queue_privileges;

-- create the AQ administrator; every queue object in this demo is built by this account
CREATE USER aqadmin
  IDENTIFIED BY aqadmin
  DEFAULT TABLESPACE uwdata
  TEMPORARY TABLESPACE temp
  QUOTA 0 ON system
  QUOTA 20M ON uwdata;

-- system privileges and the AQ administration role
GRANT create session        TO aqadmin;
GRANT create procedure      TO aqadmin;
GRANT create table          TO aqadmin;
GRANT create type           TO aqadmin;
GRANT create public synonym TO aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;

-- package privileges
GRANT execute ON dbms_aq     TO aqadmin;
GRANT execute ON dbms_aqadm  TO aqadmin;
GRANT execute ON dbms_lock   TO aqadmin; -- required for demo but not for AQ
GRANT execute ON dbms_crypto TO aqadmin; -- required for demo but not for AQ

-- confirm the account exists
SELECT
  username,
  account_status,
  created
FROM dba_users
ORDER BY username;

-- system privileges granted directly to the administrator
SELECT *
FROM dba_sys_privs
WHERE grantee = 'AQADMIN';

set linesize 131
col privilege format a15
col owner format a15

-- object privileges that come with the AQ administrator role
SELECT
  role,
  owner,
  table_name,
  privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY privilege, owner, table_name;

-- create AQ users
-- pharm_a1 dequeues orders; icu_a1 and nurse_a1 enqueue them.
-- Each account later creates a private synonym for the AQADMIN procedure it
-- calls, so all three need CREATE SYNONYM.
CREATE USER pharm_a1
IDENTIFIED BY pharm_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;

GRANT create session TO pharm_a1;
GRANT create synonym TO pharm_a1;
GRANT execute ON dbms_aq TO pharm_a1;
GRANT execute ON dbms_lock TO pharm_a1;   -- required for demo but not for AQ

CREATE USER icu_a1
IDENTIFIED BY icu_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;

GRANT create session TO icu_a1;
GRANT create synonym TO icu_a1;
GRANT execute ON dbms_aq TO icu_a1;
GRANT execute on dbms_crypto TO icu_a1;  -- required for demo but not for AQ
GRANT execute ON dbms_lock TO icu_a1;    -- required for demo but not for AQ

CREATE USER nurse_a1
IDENTIFIED BY nurse_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;

GRANT create session TO nurse_a1;
-- nurse_a1 runs CREATE SYNONYM demo_enqueue later in this script;
-- without this grant that statement fails with ORA-01031
GRANT create synonym TO nurse_a1;
GRANT execute ON dbms_aq TO nurse_a1;
GRANT execute on dbms_crypto TO nurse_a1; -- required for demo but not for AQ
GRANT execute ON dbms_lock TO nurse_a1;   -- required for demo but not for AQ
 
Setup As AQADMIN
conn aqadmin/aqadmin

-- object counts before anything is created
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

-- payload type carried by every queue message
CREATE OR REPLACE TYPE message_t AS OBJECT (
  id     NUMBER,
  rx     VARCHAR2(30),
  source VARCHAR2(30)
);
/

GRANT execute ON message_t TO pharm_a1;

-- object counts after creating the type
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

desc message_t

GRANT execute ON message_t TO public;

-- show the stored source of the type for the message content demo
SELECT text
FROM user_source
WHERE name = 'MESSAGE_T'
ORDER BY line;

-- scratch table showing the type used as a column, dropped again below
CREATE TABLE test (
  reg_col VARCHAR2(11),
  see_msg message_t
);

-- describe with the current settings ...
desc test

-- ... and again with nested attributes expanded
set describe depth all linenum on indent on

desc test

INSERT INTO test (reg_col, see_msg)
VALUES ('Test Values', message_t(1, 'Thorazine', USER));

SELECT * FROM test;

DROP TABLE test PURGE;
 
Build As AQADMIN
-- table to hold dequeued messages
-- id, rx and source mirror the attributes of MESSAGE_T; processed_by and
-- dt_processed are filled in by demo_dequeue.
-- source is VARCHAR2(30) to match message_t.source: a NUMBER column would
-- reject any non-numeric source value with ORA-01722 at dequeue time.
CREATE TABLE rx_processed_data (
id           NUMBER,
rx           VARCHAR2(30),
source       VARCHAR2(30),
processed_by VARCHAR2(30),
dt_processed TIMESTAMP);

SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

-- create a queue table
/* -- CREATE QUEUE TABLE syntax
dbms_aqadm.create_queue_table(
queue_table        IN VARCHAR2,      -- table's name
queue_payload_type IN VARCHAR2,      -- user defined data type's name
storage_clause     IN VARCHAR2       DEFAULT NULL, -- define pctfree
sort_list          IN VARCHAR2       DEFAULT NULL, -- priority and/or enq_time
multiple_consumers IN BOOLEAN        DEFAULT FALSE,
message_grouping   IN BINARY_INTEGER DEFAULT NONE,
comment            IN VARCHAR2       DEFAULT NULL, -- definer's comments
auto_commit        IN BOOLEAN        DEFAULT TRUE, -- user commit not required
primary_instance   IN BINARY_INTEGER DEFAULT 0,    -- manage queue in primary
secondary_instance IN BINARY_INTEGER DEFAULT 0,    -- RAC failover if possible
compatible         IN VARCHAR2       DEFAULT NULL, -- lowest compatible version
non_repudiation    IN BINARY_INTEGER DEFAULT 0,
secure             IN BOOLEAN        DEFAULT FALSE);
*/

-- Create the multi-consumer queue table that stores MESSAGE_T payloads.
-- Written as an anonymous block: SQL*Plus EXECUTE is a single-line command,
-- so a call spread over several lines without continuation hyphens fails.
BEGIN
  dbms_aqadm.create_queue_table(
    queue_table        => 'rx_queue_table',
    queue_payload_type => 'message_t',
    storage_clause     => 'PCTFREE 0 PCTUSED 99',
    sort_list          => 'ENQ_TIME',
    multiple_consumers => TRUE,
    comment            => 'Pharmacy queue table',
    compatible         => '10.0',
    secure             => FALSE);
END;
/

-- object counts after creating the queue table
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;

col object_name format a30

-- every object in the schema, with the indexed table name where one matches
SELECT
  o.object_name,
  o.object_type,
  i.table_name
FROM user_objects o
LEFT JOIN user_indexes i
  ON i.table_name = o.object_name
ORDER BY o.object_type, o.object_name;

set linesize 121
col evaluation_function format a30
col evaluation_context_comment format a30
col table_name format a30
col user_comment format a30

-- examine evaluation context
SELECT * FROM user_evaluation_contexts;

-- examine evaluation context table
SELECT * FROM user_evaluation_context_tables;

-- examine queues
SELECT
  name,
  queue_table,
  user_comment
FROM user_queues;

-- examine sequence
SELECT
  sequence_name,
  min_value,
  max_value,
  increment_by,
  cycle_flag,
  order_flag,
  cache_size
FROM user_sequences;

-- examine tables: first at depth 1, then with nested types expanded
set describe depth 1

desc RX_QUEUE_TABLE

set describe depth all

desc RX_QUEUE_TABLE

desc AQ$_RX_QUEUE_TABLE_G
desc AQ$_RX_QUEUE_TABLE_H
desc AQ$_RX_QUEUE_TABLE_I
desc AQ$_RX_QUEUE_TABLE_S
desc AQ$_RX_QUEUE_TABLE_T
desc RX_PROCESSED_DATA

SELECT
  table_name,
  iot_name
FROM user_tables;

-- examine views
desc AQ$RX_QUEUE_TABLE
desc AQ$RX_QUEUE_TABLE_S

-- create the rx_queue using the rx_queue_table

/* -- CREATE QUEUE syntax
dbms_aqadm.create_queue (
queue_name          IN VARCHAR2,      -- queue's name
queue_table         IN VARCHAR2,      -- previously defined queue table
queue_type          IN BINARY_INTEGER DEFAULT NORMAL_QUEUE, -- Normal or Exception
max_retries         IN NUMBER         DEFAULT NULL,  -- default is 2**31-1
retry_delay         IN NUMBER         DEFAULT 0,     -- in seconds
retention_time      IN NUMBER         DEFAULT 0,
dependency_tracking IN BOOLEAN        DEFAULT FALSE, -- must be FALSE: the default
comment             IN VARCHAR2       DEFAULT NULL,  -- definer's comment
auto_commit         IN BOOLEAN        DEFAULT TRUE);
*/

-- Create the normal queue RX_QUEUE on RX_QUEUE_TABLE with up to 3 retries
-- and no retry delay.
-- Written as an anonymous block: SQL*Plus EXECUTE is a single-line command,
-- so a call spread over several lines without continuation hyphens fails.
BEGIN
  dbms_aqadm.create_queue(
    queue_name  => 'rx_queue',
    queue_table => 'rx_queue_table',
    queue_type  => dbms_aqadm.NORMAL_QUEUE,
    max_retries => 3,
    retry_delay => 0,
    comment     => 'Prescription Queue');
END;
/

-- object counts after creating the queue
-- note QUEUE and RULE SET creation plus new views
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type
ORDER BY object_type;

SELECT
  name,
  queue_table,
  user_comment
FROM user_queues;

SELECT
  rule_set_name,
  rule_set_eval_context_owner,
  rule_set_eval_context_name
FROM user_rule_sets;

-- show full view text without page breaks
set long 10000
set pagesize 0

SELECT
  view_name,
  text
FROM user_views;

SELECT
  name,
  enqueue_enabled,
  dequeue_enabled
FROM user_queues;

set pagesize 20


-- Start rx queue rx_queue

/* -- START_QUEUE syntax
dbms_aqadm.start_queue (
queue_name IN VARCHAR2,
enqueue    IN BOOLEAN DEFAULT TRUE,
dequeue    IN BOOLEAN DEFAULT TRUE);
*/

-- enqueue/dequeue flags before starting the queue
SELECT
  name,
  enqueue_enabled,
  dequeue_enabled
FROM user_queues;

-- start the queue using the defaults for the enqueue and dequeue arguments
BEGIN
  dbms_aqadm.start_queue(queue_name => 'RX_QUEUE');
END;
/

-- enqueue/dequeue flags after starting the queue
SELECT
  name,
  enqueue_enabled,
  dequeue_enabled
FROM user_queues;


-- grant queue privileges to the demo users

/* -- GRANT QUEUE PRIVILEGE syntax
dbms_aqadm.grant_queue_privilege (
privilege    IN VARCHAR2,
queue_name   IN VARCHAR2,
grantee      IN VARCHAR2,
grant_option IN BOOLEAN DEFAULT FALSE);
*/

-- pharm_a1 may dequeue and enqueue; icu_a1 and nurse_a1 may only enqueue.
-- None of the grantees can pass the privilege on.
BEGIN
  dbms_aqadm.grant_queue_privilege(
    privilege    => 'DEQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'pharm_a1',
    grant_option => FALSE);

  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ENQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'pharm_a1',
    grant_option => FALSE);

  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ENQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'icu_a1',
    grant_option => FALSE);

  dbms_aqadm.grant_queue_privilege(
    privilege    => 'ENQUEUE',
    queue_name   => 'RX_QUEUE',
    grantee      => 'nurse_a1',
    grant_option => FALSE);
END;
/

-- where is this privilege information stored?

-- create queue subscribers

/* -- AQ$_AGENT TYPE definition
TYPE aq$_agent AS OBJECT (
name     VARCHAR2(30),      -- name of message producer or consumer
address  VARCHAR2(1024),    -- Protocol-specific address of the recipient.
                            -- Must be in the form [schema.]queue[@dblink].
protocol NUMBER DEFAULT 0); -- must be 0, other values for internal use only
*/

/* -- ADD SUBSCRIBER syntax
dbms_aqadm.add_subscriber(
queue_name     IN VARCHAR2,              -- name of queue
subscriber     IN sys.aq$_agent,         -- name, address and, protocol
rule           IN VARCHAR2 DEFAULT NULL, -- conditional / similar to WHERE clause
transformation IN VARCHAR2 DEFAULT NULL, -- message transformation rule
queue_to_queue IN BOOLEAN DEFAULT FALSE, -- TRUE indicates queue-to-queue messaging
delivery_mode  IN PLS_INTEGER DEFAULT dbms_aqadm.persistent); -- BUFFERED,
                                         -- PERSISTENT_OR_BUFFERED, or PERSISTENT:
                                         -- Not modifiable by ALTER_SUBSCRIBER
*/

-- Register the three demo users as subscribers of RX_QUEUE.
-- pharm_a1 and icu_a1 subscribe without a rule; nurse_a1 subscribes with
-- the rule 'priority > 10'.
DECLARE
  c_queue_addr CONSTANT VARCHAR2(1024) := 'AQADMIN.RX_QUEUE';
BEGIN
  dbms_aqadm.add_subscriber(
    queue_name => 'rx_queue',
    subscriber => sys.aq$_agent('pharm_a1', c_queue_addr, 0));

  dbms_aqadm.add_subscriber(
    queue_name => 'rx_queue',
    subscriber => sys.aq$_agent('icu_a1', c_queue_addr, 0));

  dbms_aqadm.add_subscriber(
    queue_name => 'rx_queue',
    subscriber => sys.aq$_agent('nurse_a1', c_queue_addr, 0),
    rule       => 'priority > 10');
END;
/

-- create propagation schedule

/* -- CREATE SCHEDULE PROPAGATION syntax
dbms_aqadm.schedule_propagation (
queue_name        IN VARCHAR2,                 -- name of queue
destination       IN VARCHAR2 DEFAULT NULL,    -- destination database link
start_time        IN DATE     DEFAULT SYSDATE, -- initial propagation start time
duration          IN NUMBER   DEFAULT NULL,    -- propagation window in seconds
next_time         IN VARCHAR2 DEFAULT NULL,    -- date-time of next window
latency           IN NUMBER   DEFAULT 60,      -- maximum wait in seconds
destination_queue IN VARCHAR2 DEFAULT NULL);   -- target queue name and db link
*/

-- schedule propagation for RX_QUEUE with zero latency; all other arguments use their defaults
BEGIN
  dbms_aqadm.schedule_propagation(
    queue_name => 'rx_queue',
    latency    => 0);
END;
/

/* -- DEQUEUE_OPTIONS_T definition
TYPE DEQUEUE_OPTIONS_T IS RECORD (
consumer_name  VARCHAR2(30) DEFAULT NULL,
dequeue_mode   BINARY_INTEGER DEFAULT REMOVE,
navigation     BINARY_INTEGER DEFAULT NEXT_MESSAGE,
visibility     BINARY_INTEGER DEFAULT ON_COMMIT,
wait           BINARY_INTEGER DEFAULT FOREVER,
msgid          RAW(16) DEFAULT NULL,
correlation    VARCHAR2(128) DEFAULT NULL,
deq_condition  VARCHAR2(4000) DEFAULT NULL,
signature      aq$_sig_prop DEFAULT NULL,
transformation VARCHAR2(60) DEFAULT NULL,
delivery_mode  PLS_INTEGER DEFAULT PERSISTENT);
*/

/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority        BINARY_INTEGER DEFAULT 1,
delay           BINARY_INTEGER DEFAULT NO_DELAY,
expiration      BINARY_INTEGER DEFAULT NEVER,
correlation     VARCHAR2(128) DEFAULT NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time    DATE,
state           BINARY_INTEGER,
sender_id       aq$_agent DEFAULT NULL,
original_msgid  RAW(16) DEFAULT NULL);
*/

/* -- AQ$_RECIPIENT_LIST_T definition
TYPE SYS.AQ$_RECIPIENT_LIST_T IS TABLE OF sys.aq$_agent
INDEX BY BINARY_INTEGER;
*/

/* -- DEQUEUE syntax
dbms_aq.dequeue(
queue_name         IN  VARCHAR2,
dequeue_options    IN  dequeue_options_t,
message_properties OUT message_properties_t,
payload            OUT <user_defined_data_type_name>,
msgid              OUT RAW);
*/
-- stock on hand per pharmacy and drug; demo_dequeue decrements rx_quantity
CREATE TABLE rx_inventory (
  pharm_id    VARCHAR2(3),
  rx_name     VARCHAR2(30),
  rx_quantity NUMBER(5)
);

-- seed ten units of every drug for the 'GEN' pharmacy in one statement
INSERT ALL
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'Aspirin', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'Compazine', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'Prozac', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'Lipitor', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'Oxycontin', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'aaa', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'bbb', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'ccc', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'Tetracycline', 10)
  INTO rx_inventory (pharm_id, rx_name, rx_quantity) VALUES ('GEN', 'Thorazine', 10)
SELECT * FROM dual;

COMMIT;

SELECT *
FROM rx_inventory;

GRANT SELECT, UPDATE ON rx_inventory TO pharm_a1;
 

-- create procedure to dequeue messages

CREATE OR REPLACE PROCEDURE demo_dequeue(appname VARCHAR2) AUTHID DEFINER IS
 -- Dequeues one message from AQADMIN.RX_QUEUE for consumer APPNAME,
 -- takes one unit of the ordered drug out of RX_INVENTORY when stock is
 -- available, and logs the message in RX_PROCESSED_DATA.
 --
 -- Parameters:
 --   appname  consumer name used for the dequeue; also stored in
 --            RX_PROCESSED_DATA.PROCESSED_BY
 --
 -- Runs as an autonomous transaction and commits before returning.
 -- ORA-25228 from the dequeue is reported through DBMS_OUTPUT and is not
 -- re-raised; any other error propagates to the caller.
 deq_msgid RAW(16);
 dopt      dbms_aq.dequeue_options_t;
 mprop     dbms_aq.message_properties_t;
 payload_t message_t;

 no_messages EXCEPTION;
 pragma exception_init(no_messages, -25228);

 pragma autonomous_transaction;
BEGIN
  dopt.consumer_name := appname;
  dopt.dequeue_mode := dbms_aq.remove;
  dopt.navigation := dbms_aq.first_message;
  dopt.visibility := dbms_aq.immediate;
  dopt.wait := 10;

  -- add functionality with mprop.attempts and mprop.enqueue_time

  dbms_aq.dequeue('aqadmin.rx_queue', dopt, mprop, payload_t, deq_msgid);

  -- One guarded UPDATE replaces the former SELECT ... INTO followed by UPDATE.
  -- The message has already been removed with immediate visibility, so a
  -- NO_DATA_FOUND (unknown drug) or TOO_MANY_ROWS (drug stocked by several
  -- pharmacies) raised here would lose the order without logging it.
  -- ROWNUM = 1 limits the change to a single stock row.
  UPDATE rx_inventory
  SET rx_quantity = rx_quantity - 1
  WHERE rx_name = payload_t.rx
  AND rx_quantity > 0
  AND ROWNUM = 1;

  IF SQL%ROWCOUNT = 0 THEN
    NULL;
    -- drug unknown or out of stock: send back a failure message.
  END IF;

  INSERT INTO rx_processed_data
  (id, rx, source, processed_by, dt_processed)
  VALUES
  (payload_t.id, payload_t.rx, payload_t.source, APPNAME, SYSTIMESTAMP);
  COMMIT;
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('All queue messages processed');
    COMMIT;
END demo_dequeue;
/

CREATE OR REPLACE PUBLIC SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;

GRANT execute ON demo_dequeue TO pharm_a1;

conn pharm_a1/pharm_a1

CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;

conn aqadmin/aqadmin

-- create procedure to enqueue messages

/* -- ENQUEUE_OPTIONS_T definition
TYPE ENQUEUE_OPTIONS_T IS RECORD (
visibility         BINARY_INTEGER DEFAULT ON_COMMIT,
relative_msgid     RAW(16) DEFAULT NULL,
sequence_deviation BINARY_INTEGER DEFAULT NULL,
transformation     VARCHAR2(60) DEFAULT NULL);
*/

/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority        BINARY_INTEGER DEFAULT 1,
delay           BINARY_INTEGER DEFAULT NO_DELAY,
expiration      BINARY_INTEGER DEFAULT NEVER,
correlation     VARCHAR2(128) DEFAULT NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time    DATE,
state           BINARY_INTEGER,
sender_id       aq$_agent DEFAULT NULL,
original_msgid  RAW(16) DEFAULT NULL);
*/

/* -- ENQUEUE syntax
dbms_aq.enqueue(
queue_name         IN  VARCHAR2,
enqueue_options    IN  enqueue_options_t,
message_properties IN  message_properties_t,
payload            IN  <user_defined_data_type>,
msgid              OUT RAW);
*/
 
CREATE OR REPLACE PROCEDURE demo_enqueue(usermsg MESSAGE_T, urgency NUMBER)
AUTHID DEFINER IS
  -- Enqueues USERMSG on AQADMIN.RX_QUEUE with priority URGENCY and the
  -- calling user as sender, then commits.
  --
  -- Parameters:
  --   usermsg  payload to enqueue
  --   urgency  message priority; also selects sequence deviation, delay
  --            and expiration as set below
  --
  -- Runs as an autonomous transaction.
  PRAGMA autonomous_transaction;

  l_msgid     RAW(16);
  l_enq_opts  dbms_aq.enqueue_options_t;
  l_msg_props dbms_aq.message_properties_t;
BEGIN
  -- urgency below 5 requests dbms_aq.top; otherwise no sequence deviation
  l_enq_opts.sequence_deviation :=
    CASE WHEN urgency < 5 THEN dbms_aq.top ELSE NULL END;

  l_msg_props.priority  := urgency;
  l_msg_props.sender_id := sys.aq$_agent(USER, NULL, 0);

  -- urgency below 10: no delay, expires after 300 s (5 min);
  -- otherwise: one second delay, expires after 1800 s (30 min)
  l_msg_props.delay :=
    CASE WHEN urgency < 10 THEN dbms_aq.no_delay ELSE 1 END;
  l_msg_props.expiration :=
    CASE WHEN urgency < 10 THEN 300 ELSE 1800 END;

  dbms_aq.enqueue(
    'aqadmin.rx_queue', l_enq_opts, l_msg_props, usermsg, l_msgid);
  COMMIT;
END demo_enqueue;
/

-- publish the procedure and let the two ordering users run it
CREATE OR REPLACE PUBLIC SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;

GRANT execute ON demo_enqueue TO icu_a1;
GRANT execute ON demo_enqueue TO nurse_a1;

-- private synonyms in each ordering schema
conn icu_a1/icu_a1

CREATE SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;

conn nurse_a1/nurse_a1

CREATE SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;

-- TODO: order_rx must assign priorities based on who ordered what
Enqueues Message Procedure
conn aqadmin/aqadmin

CREATE OR REPLACE PROCEDURE order_rx IS
 -- Enqueues a series of randomly chosen prescription orders through
 -- demo_enqueue, pausing between orders.
 --
 -- When the session user is ICU_A1: 100 orders, 3 s apart.
 -- For any other user:              30 orders, 1 s apart.
 --
 -- Each order picks a random drug index 0..9. Indexes 6, 4 and 7 map to
 -- source codes 2, 5 and 9 and are enqueued with urgency 5; every other
 -- index gets source code 99 and urgency 11.

 TYPE ixbb IS TABLE OF VARCHAR2(30)
 INDEX BY BINARY_INTEGER;

 TYPE ixbi IS TABLE OF NUMBER(2)
 INDEX BY BINARY_INTEGER;

 rxarray    ixbb; -- the name of the rx
 rxurgnt    ixbi; -- the priority value for an rx (populated, not read below)
 usermsg    aqadmin.message_t;
 x          PLS_INTEGER;
 priority   PLS_INTEGER;
 endcnt     PLS_INTEGER;
 pause_secs PLS_INTEGER;
BEGIN
  rxarray(0):='ccc';
  rxarray(1):='Aspirin';
  rxarray(2):='Prozac';
  rxarray(3):='Lipitor';
  rxarray(4):='Tetracycline';
  rxarray(5):='Thorazine';
  rxarray(6):='Oxycontin';
  rxarray(7):='Compazine';
  rxarray(8):='aaa';
  rxarray(9):='bbb';

  rxurgnt(0) := 1;
  rxurgnt(1) := 3;
  rxurgnt(2) := 5;
  rxurgnt(3) := 11;
  rxurgnt(4) := 19;
  rxurgnt(5) := 20;
  rxurgnt(6) := 32;
  rxurgnt(7) := 89;
  rxurgnt(8) := 42;
  rxurgnt(9) := 66;

  -- the caller decides both the order count and the pause; evaluate once
  IF USER = 'ICU_A1' THEN
    endcnt := 100;
    pause_secs := 3;
  ELSE
    endcnt := 30;
    pause_secs := 1;
  END IF;

  FOR i IN 1..endcnt
  LOOP
    -- Random index 0..9. The former SUBSTR(randominteger, 3, 1) returned
    -- NULL when the random value had fewer than three characters, which
    -- made the rxarray lookup fail. MOD before ABS avoids taking ABS of
    -- the most negative integer.
    x := ABS(MOD(dbms_crypto.randominteger, 10));

    priority := CASE x
                  WHEN 6 THEN 2
                  WHEN 4 THEN 5
                  WHEN 7 THEN 9
                  ELSE 99
                END;

    usermsg := aqadmin.message_t(i, rxarray(x), priority);

    IF priority IN (2,5,9) THEN
      demo_enqueue(usermsg, 5);
    ELSE
      demo_enqueue(usermsg, 11);
    END IF;

    dbms_lock.sleep(pause_secs);
  END LOOP;
END order_rx;
/

-- OR REPLACE keeps the script re-runnable, like the other public synonyms
CREATE OR REPLACE PUBLIC SYNONYM order_rx FOR aqadmin.order_rx;

GRANT execute ON order_rx TO icu_a1;
GRANT execute ON order_rx TO nurse_a1;
 
Dequeues Message Procedure
conn aqadmin/aqadmin

CREATE OR REPLACE PROCEDURE get_rx_order AUTHID DEFINER IS
 -- Listens on AQADMIN.RX_QUEUE as the session user and calls demo_dequeue
 -- each time a message is reported for that user. Returns once
 -- dbms_aq.listen raises ORA-25254 after a 15 second wait.
 qlist dbms_aq.aq$_agent_list_t;
 agent_w_msg sys.aq$_agent;
 listen_timeout EXCEPTION;
 pragma exception_init(listen_timeout, -25254);
BEGIN
  qlist(0) := sys.aq$_agent(USER, 'AQADMIN.RX_QUEUE', NULL);
  /* if retrieving message for multiple users simultaneously example
  qlist(0) := sys.aq$_agent('GenPharm', 'AQADMIN.RX_QUEUE', NULL);
  qlist(1) := sys.aq$_agent('ICUPharm', 'AQADMIN.RX_QUEUE', NULL);
  and add one branch per agent name below
  */

  LOOP
    BEGIN
      dbms_aq.listen(agent_list => qlist, wait => 15, agent => agent_w_msg);
      -- USER is the only agent in qlist, so the former ELSIF branch for
      -- 'ICUPHARM' could never run and has been removed
      IF agent_w_msg.name = USER THEN
        demo_dequeue(USER);
      END IF;
    EXCEPTION
      WHEN listen_timeout THEN
        EXIT;
    END;
  END LOOP;
END get_rx_order;
/

-- OR REPLACE keeps the script re-runnable, like the other public synonyms
CREATE OR REPLACE PUBLIC SYNONYM get_rx_order FOR aqadmin.get_rx_order;

GRANT execute ON get_rx_order TO pharm_a1;
 
To Run Demo
The following requires that you simultaneously open four SQL*Plus sessions. In the first session log on as the ICU and type step 2 but do not execute it. In the second session do the same thing as the Nurses Station, and in the third as the Pharmacy. Again not executing the stored procedure. Then log on as the aqadmin, set up the SQL*Plus environment and execute one of the two queries then enter a slash (to repeat the query) but do not press the <Enter> key.
Step | ICU orders meds    | Nurses Stn orders meds | Pharmacy fills orders
1    | conn icu_a1/icu_a1 | conn nurse_a1/nurse_a1 | conn pharm_a1/pharm_a1
2    | exec order_rx      | exec order_rx          | exec get_rx_order

conn aqadmin/aqadmin

-- rows currently held in the queue table
SELECT COUNT(*)
FROM rx_queue_table;

set linesize 131
col rx format a15
col processed_by format a12

-- processed orders by message id
SELECT
  id,
  rx,
  source,
  processed_by,
  dt_processed
FROM rx_processed_data
ORDER BY id;

-- processed orders in the order they were handled
SELECT
  id,
  rx,
  source,
  processed_by,
  dt_processed
FROM rx_processed_data
ORDER BY dt_processed;


Then, with everything set up ... go to the first window and hit the <Enter> key, then the second, then the third and finally the same in the AQADMIN session. Continue to monitor the AQADMIN session while the ICU and Nurses order medications and the pharmacy fills them.
 
ICU Pharmacy - Dequeues Messages and passes low priority message to the General Pharmacy
-- NOTE(review): no ICUPHARM user and no 'ICUPharm'/'GenPharm' subscribers are
-- created anywhere in this script; they are presumably set up elsewhere - confirm
conn icupharm/icupharm

set serveroutput on

-- Listen for messages addressed to either agent and dequeue as ICUPHARM.
-- The loop stops once more than 1200 dbms_utility.get_time units have
-- elapsed since it started; a listen timeout just starts the next pass.
DECLARE
  l_agents  dbms_aq.aq$_agent_list_t;
  l_ready   sys.aq$_agent;
  l_started NUMBER;

  listen_timeout EXCEPTION;
  PRAGMA exception_init(listen_timeout, -25254);
BEGIN
  l_agents(0) := sys.aq$_agent('ICUPharm', 'aqadmin.rx_queue', NULL);
  l_agents(1) := sys.aq$_agent('GenPharm', 'aqadmin.rx_queue', NULL);

  l_started := dbms_utility.get_time;

  LOOP
    EXIT WHEN dbms_utility.get_time - l_started > 1200;

    BEGIN
      dbms_aq.listen(agent_list => l_agents, wait => 30, agent => l_ready);

      -- the agent returned by listen is not inspected; every message is
      -- dequeued for the ICUPHARM consumer
      demo_dequeue('ICUPHARM');
    EXCEPTION
      WHEN listen_timeout THEN
        NULL;
    END;
  END LOOP;
END;
/
 
Cleanup As AQADMIN
conn aqadmin/aqadmin

/* PURGE_QUEUE_TABLE syntax
dbms_aqadm.purge_queue_table(
queue_table     IN VARCHAR2,
purge_condition IN VARCHAR2,
purge_options   IN aq$_purge_options_t);
*/

-- Remove any messages still held in the queue table.
-- A NULL purge condition selects every message.
DECLARE
  purge_opts dbms_aqadm.aq$_purge_options_t;
BEGIN
  purge_opts.block := FALSE;
  dbms_aqadm.purge_queue_table(
    queue_table     => 'rx_queue_table',
    purge_condition => NULL,
    purge_options   => purge_opts);
END;
/

/* STOP_QUEUE syntax
dbms_aqadm.stop_queue (
queue_name IN VARCHAR2,
enqueue    IN BOOLEAN DEFAULT TRUE,
dequeue    IN BOOLEAN DEFAULT TRUE,
wait       IN BOOLEAN DEFAULT TRUE);
*/

/* DROP_QUEUE syntax
dbms_aqadm.drop_queue (
queue_name  IN VARCHAR2,
auto_commit IN BOOLEAN DEFAULT TRUE);
*/

/* DROP_QUEUE_TABLE syntax
dbms_aqadm.drop_queue_table (
queue_table IN VARCHAR2,
force       IN BOOLEAN DEFAULT FALSE,
auto_commit IN BOOLEAN DEFAULT TRUE);
*/

BEGIN
  -- Stop Queue rx_queue
  dbms_aqadm.stop_queue(queue_name => 'rx_queue', wait => TRUE);
  -- Drop Queue rx_queue
  dbms_aqadm.drop_queue(queue_name => 'rx_queue');
  -- Drop rx Queue Table
  dbms_aqadm.drop_queue_table(queue_table => 'rx_queue_table', force => TRUE);
END;
/

-- Verify the queue is dropped (the column in USER_QUEUES is NAME)
SELECT name
FROM user_queues;

-- Verify queue table is dropped
SELECT table_name
FROM user_tables;

-- The demo users, AQADMIN included, must be dropped from a privileged
-- session: a user cannot be dropped while it is the connected account.
conn / as sysdba

-- public synonyms are not removed by DROP USER ... CASCADE
DROP PUBLIC SYNONYM demo_dequeue;
DROP PUBLIC SYNONYM demo_enqueue;
DROP PUBLIC SYNONYM order_rx;
DROP PUBLIC SYNONYM get_rx_order;

DROP USER pharm_a1 CASCADE;
DROP USER icu_a1 CASCADE;
DROP USER nurse_a1 CASCADE;
DROP USER aqadmin CASCADE;

Home      :      Code Library      :      Sponsors      :      Privacy      :      Terms of Use      :      Contact Us [75 users online]    © 2010 psoug.org