-- validate Oracle parameters
show parameter aq_tm_processes
show parameter job_queue_processes
ALTER SYSTEM SET job_queue_processes = 10 scope=BOTH;
SELECT owner, queue_name, queue_table, consumer_name
FROM dba_queue_subscribers;
SELECT *
FROM queue_privileges;
-- create AQ administrator
CREATE USER aqadmin
IDENTIFIED BY aqadmin
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 20M ON uwdata;
GRANT create session TO aqadmin;
GRANT create procedure TO aqadmin;
GRANT create table TO aqadmin;
GRANT create type TO aqadmin;
GRANT create public synonym TO aqadmin;
GRANT aq_administrator_role TO aqadmin IDENTIFIED BY aqadmin;
GRANT execute ON dbms_aq TO aqadmin;
GRANT execute ON dbms_aqadm TO aqadmin;
GRANT execute ON dbms_lock TO aqadmin; -- required for demo but not for AQ
GRANT execute on dbms_crypto TO aqadmin; -- required for demo but not for AQ
SELECT username, account_status, created
FROM dba_users
ORDER BY 1;
SELECT *
FROM dba_sys_privs
WHERE grantee = 'AQADMIN';
set linesize 131
col privilege format a15
col owner format a15
SELECT role, owner, table_name, privilege
FROM role_tab_privs
WHERE role = 'AQ_ADMINISTRATOR_ROLE'
ORDER BY 4, 2, 3;
-- create AQ users
CREATE USER pharm_a1
IDENTIFIED BY pharm_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;
GRANT create session TO pharm_a1;
GRANT create synonym TO pharm_a1;
GRANT execute ON dbms_aq TO pharm_a1;
GRANT execute ON dbms_lock TO pharm_a1; -- required for demo but not for AQ
CREATE USER icu_a1
IDENTIFIED BY icu_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;
GRANT create session TO icu_a1;
GRANT create synonym TO icu_a1;
GRANT execute ON dbms_aq TO icu_a1;
GRANT execute on dbms_crypto TO icu_a1; -- required for demo but not for AQ
GRANT execute ON dbms_lock TO icu_a1; -- required for demo but not for AQ
CREATE USER nurse_a1
IDENTIFIED BY nurse_a1
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 0 ON SYSAUX
QUOTA 5M ON uwdata;
GRANT create session TO nurse_a1;
GRANT execute ON dbms_aq TO nurse_a1;
GRANT execute on dbms_crypto TO nurse_a1; -- required for demo but not for AQ
GRANT execute ON dbms_lock TO nurse_a1; -- required for demo but not for AQ
Setup As AQADMIN
conn aqadmin/aqadmin
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
-- create message user-defined data type
CREATE OR REPLACE TYPE message_t AS OBJECT (
id NUMBER,
rx VARCHAR2(30),
source VARCHAR2(30));
/
GRANT execute ON message_t TO pharm_a1;
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
desc message_t
GRANT execute ON message_t TO public;
-- examine message for message content demo
SELECT text
FROM user_source
WHERE name = 'MESSAGE_T'
ORDER BY line;
CREATE TABLE test (
reg_col VARCHAR2(11),
see_msg message_t);
desc test
set describe depth all linenum on indent on
desc test
INSERT INTO test
(reg_col, see_msg)
VALUES
('Test Values', message_t(1, 'Thorazine', USER));
SELECT * FROM test;
DROP TABLE test PURGE;
Build As AQADMIN
-- table to hold dequeued messages
CREATE TABLE rx_processed_data (
id NUMBER,
rx VARCHAR2(30),
source NUMBER,
processed_by VARCHAR2(30),
dt_processed TIMESTAMP);
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
-- create a queue table
/* -- CREATE QUEUE TABLE syntax
dbms_aqadm.create_queue_table(
queue_table IN VARCHAR2,
-- table's name
queue_payload_type IN VARCHAR2, -- user defined data
type's name
storage_clause IN VARCHAR2
DEFAULT NULL, -- define pctfree
sort_list IN VARCHAR2
DEFAULT NULL, -- priority and/or enq_time
multiple_consumers IN BOOLEAN DEFAULT FALSE,
message_grouping IN BINARY_INTEGER DEFAULT NONE,
comment IN VARCHAR2
DEFAULT NULL, -- definer's comments
auto_commit IN BOOLEAN
DEFAULT TRUE, -- user commit not required
primary_instance IN BINARY_INTEGER DEFAULT 0, -- manage
queue in primary
secondary_instance IN BINARY_INTEGER DEFAULT 0, -- RAC
failover if possible
compatible IN VARCHAR2
DEFAULT NULL, -- lowest compatible version
non_repudiation IN BINARY_INTEGER DEFAULT 0,
secure IN BOOLEAN
DEFAULT FALSE);
*/
desc AQ
desc AQ
desc AQ
desc AQ
desc AQ
desc RX_PROCESSED_DATA
SELECT table_name, iot_name
FROM user_tables;
-- examine views
desc AQ
desc AQ
-- create the rx_queue using the rx_queue_table
/* -- CREATE QUEUE syntax
dbms_aqadm.create_queue (
queue_name IN VARCHAR2,
-- queue's name
queue_table IN VARCHAR2,
-- previously defined queue table
queue_type IN
BINARY_INTEGER DEFAULT NORMAL_QUEUE, -- Normal or Exception
max_retries IN NUMBER
DEFAULT NULL, -- default is 2**31-1
retry_delay IN NUMBER
DEFAULT 0, -- in seconds
retention_time IN NUMBER
DEFAULT 0,
dependency_tracking IN BOOLEAN
DEFAULT FALSE, -- must be FALSE: the default
comment
IN VARCHAR2 DEFAULT NULL, --
definer's comment
auto_commit IN BOOLEAN
DEFAULT TRUE);
*/
SELECT name, enqueue_enabled, dequeue_enabled
FROM user_queues;
-- create queue subscribers
/* -- GRANT QUEUE PRIVILEGE syntax
dbms_aqadm.grant_queue_privilege (
privilege IN VARCHAR2,
queue_name IN VARCHAR2,
grantee IN VARCHAR2,
grant_option IN BOOLEAN DEFAULT FALSE);
*/
/* -- AQ TYPE definition
TYPE aq AS OBJECT
(
name VARCHAR2(30), --
name of message producer or consumer
address VARCHAR2(1024), -- Protocol-specific address of the recipient.
-- Must be in the form [schema.]queue[@dblink].
protocol NUMBER DEFAULT 0); -- must be 0, other values for internal use only
*/
/* -- ADD SUBSCRIBER syntax
dbms_aqadm.add_subscriber(
queue_name IN VARCHAR2,
-- name of queue
subscriber IN sys.aq,
-- name, address and, protocol
rule IN VARCHAR2 DEFAULT NULL,
-- conditional / similar to WHERE clause
transformation IN VARCHAR2 DEFAULT NULL -- message transformation rule
queue_to_queue IN BOOLEAN DEFAULT FALSE, -- TRUE indicates queue-to-queue messaging
delivery_mode IN PLS_INTEGER DEFAULT dbms_aqadm.persistent);
-- BUFFERED,
-- PERSISTENT_OR_BUFFERED, or PERSISTENT:
-- Not modifiable by ALTER_SUBSCRIBER
*/
/* -- CREATE SCHEDULE PROPAGATION syntax
dbms_aqadm.schedule_propagation (
queue_name IN VARCHAR2,
-- name of queue
destination IN VARCHAR2 DEFAULT NULL,
-- destination database link
start_time IN DATE DEFAULT
SYSDATE,
-- initial propagation start time
duration IN NUMBER DEFAULT NULL,
-- propagation window in seconds
next_time IN VARCHAR2 DEFAULT NULL,
-- date-time of next window
latency IN NUMBER
DEFAULT 60, -- maximum wait in seconds
destination_queue IN VARCHAR2 DEFAULT NULL); -- target queue name
and db link
*/
/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority BINARY_INTEGER DEFAULT 1,
delay BINARY_INTEGER DEFAULT
NO_DELAY,
expiration BINARY_INTEGER DEFAULT NEVER,
correlation VARCHAR2(128) DEFAULT NULL,
attempts BINARY_INTEGER,
recipient_list aq,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time DATE,
state
BINARY_INTEGER,
sender_id aq DEFAULT NULL,
original_msgid RAW(16) DEFAULT NULL);
*/
/* -- AQ definition
TYPE SYS.AQ IS TABLE OF sys.aq
INDEX BY BINARY_INTEGER;
*/
/* -- DEQUEUE syntax
dbms_aq.dequeue(
queue_name IN VARCHAR2,
dequeue_options IN dequeue_options_t,
message_properties OUT message_properties_t,
payload OUT <user_defined_data_type_name>
msgid OUT RAW);
*/
SELECT rx_quantity
INTO q_on_hand
FROM rx_inventory
WHERE rx_name = payload_t.rx;
IF q_on_hand > 0 THEN
UPDATE rx_inventory
SET rx_quantity = rx_quantity - 1
WHERE rx_name = payload_t.rx;
ELSE
NULL;
-- send back a failure message.
END IF;
INSERT INTO rx_processed_data
(id, rx, source, processed_by, dt_processed)
VALUES
(payload_t.id, payload_t.rx, payload_t.source, APPNAME, SYSTIMESTAMP);
COMMIT;
EXCEPTION
WHEN no_messages THEN
dbms_output.put_line('All queue messages processed');
COMMIT;
END demo_dequeue;
/
CREATE OR REPLACE PUBLIC SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;
GRANT execute ON demo_dequeue TO pharm_a1;
conn pharm_a1/pharm_a1
CREATE SYNONYM demo_dequeue FOR aqadmin.demo_dequeue;
conn aqadmin/aqadmin
-- create procedure to enqueue messages
/* -- ENQUEUE_OPTIONS_T definition
TYPE ENQUEUE_OPTIONS_T IS RECORD (
visibility BINARY_INTEGER DEFAULT
ON_COMMIT,
relative_msgid RAW(16) DEFAULT NULL,
sequence_deviation BINARY_INTEGER DEFAULT NULL,
transformation VARCHAR2(60) DEFAULT NULL);
*/
/* -- MESSAGE_PROPERTIES_T definition
TYPE MESSAGE_PROPERTIES_T IS RECORD (
priority BINARY_INTEGER DEFAULT 1,
delay BINARY_INTEGER DEFAULT
NO_DELAY,
expiration BINARY_INTEGER DEFAULT NEVER,
correlation VARCHAR2(128) DEFAULT NULL,
attempts BINARY_INTEGER,
recipient_list aq,
exception_queue VARCHAR2(51) DEFAULT NULL,
enqueue_time DATE,
state BINARY_INTEGER,
sender_id aq DEFAULT NULL,
original_msgid RAW(16) DEFAULT NULL);
*/
/* -- ENQUEUE syntax
dbms_aq.enqueue(
queue_name
IN VARCHAR2,
enqueue_options IN enqueue_options_t,
message_properties IN message_properties_t,
payload IN <user_defined_data_type>,
msgid OUT RAW);
*/
CREATE OR REPLACE PROCEDURE demo_enqueue(usermsg MESSAGE_T, urgency NUMBER)
AUTHID DEFINER IS
IF urgency < 5 THEN
eopt.sequence_deviation := dbms_aq.top;
ELSE
eopt.sequence_deviation := NULL;
END IF;
mprop.priority := urgency;
mprop.sender_id := aprop;
IF urgency < 10 THEN
mprop.delay := dbms_aq.no_delay;
mprop.expiration := 300; -- push to exception
queue in 5 min.
ELSE
mprop.delay :=
1; -- one second delay
before sending
mprop.expiration := 1800; -- push to exception queue
in 30 min.
END IF;
dbms_aq.enqueue('aqadmin.rx_queue', eopt, mprop,
usermsg, enq_msgid);
COMMIT;
END demo_enqueue;
/
CREATE OR REPLACE PUBLIC SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;
GRANT execute ON demo_enqueue TO icu_a1;
GRANT execute ON demo_enqueue TO nurse_a1;
conn icu_a1/icu_a1
CREATE SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;
conn nurse_a1/nurse_a1
CREATE SYNONYM demo_enqueue FOR aqadmin.demo_enqueue;
-- order_rx MUST be assign priorities based on who ordered
what
Enqueues Message Procedure
conn aqadmin/aqadmin
CREATE OR REPLACE PROCEDURE order_rx IS
TYPE ixbb IS TABLE OF VARCHAR2(30)
INDEX BY BINARY_INTEGER;
TYPE ixbi IS TABLE OF NUMBER(2)
INDEX BY BINARY_INTEGER;
rxarray ixbb; -- the name of the rx
rxurgnt ixbi; -- the priority value for an rx
usermsg aqadmin.message_t;
x PLS_INTEGER;
priority PLS_INTEGER;
endcnt PLS_INTEGER;
BEGIN
rxarray(0):='ccc';
rxarray(1):='Aspirin';
rxarray(2):='Prozac';
rxarray(3):='Lipitor';
rxarray(4):='Tetracycline';
rxarray(5):='Thorazine';
rxarray(6):='Oxycontin';
rxarray(7):='Compazine';
rxarray(8):='aaa';
rxarray(9):='bbb';
IF priority IN (2,5,9) THEN
demo_enqueue(usermsg, 5);
ELSE
demo_enqueue(usermsg, 11);
END IF;
IF USER = 'ICU_A1' THEN
dbms_lock.sleep(3);
ELSE
dbms_lock.sleep(1);
END IF;
END LOOP;
END order_rx;
/
CREATE PUBLIC SYNONYM order_rx FOR aqadmin.order_rx;
GRANT execute ON order_rx TO icu_a1;
GRANT execute ON order_rx TO nurse_a1;
Dequeues Message Procedure
conn aqadmin/aqadmin
CREATE OR REPLACE PROCEDURE get_rx_order AUTHID DEFINER IS
qlist dbms_aq.aq;
agent_w_msg sys.aq;
listen_timeout EXCEPTION;
pragma exception_init(listen_timeout, -25254);
BEGIN
qlist(0) := sys.aq(USER,
'AQADMIN.RX_QUEUE',
NULL);
/* if retrieving message for multiple users simultaneously
example
qlist(0) := sys.aq('GenPharm', 'AQADMIN.RX_QUEUE',
NULL);
qlist(1) := sys.aq('ICUPharm', 'AQADMIN.RX_QUEUE',
NULL);
*/
LOOP
BEGIN
dbms_aq.listen(agent_list
=> qlist, wait => 15, agent => agent_w_msg);
IF agent_w_msg.name = USER THEN
demo_dequeue(USER);
ELSIF agent_w_msg.name = 'ICUPHARM' THEN
demo_dequeue('ICUPharm');
END IF;
EXCEPTION
WHEN listen_timeout THEN
EXIT;
END;
END LOOP;
END get_rx_order;
/
CREATE PUBLIC SYNONYM get_rx_order FOR aqadmin.get_rx_order;
GRANT execute ON get_rx_order TO pharm_a1;
To Run Demo
The following requires that you
simultaneously open four SQL*Plus sessions. In the first session log on as the ICU
and type step 2 but do not execute it. In the second session do the same thing as
the Nurses Station, and in the third as the Pharmacy. Again not executing the stored
procedure. Then log on as the aqadmin, set up the SQL*Plus environment and execute
one of the two queries then enter a slash (to repeat the query) but do not press the
<Enter> key.
Step
ICU
order meds
Nurses
Stn orders meds
Fill orders
1
conn icu_a1/icu_a1
conn
nurse_a1/nurse_a1
conn
pharm_a1/pharm_a1
2
exec order_rx
exec order_rx
exec get_rx_order
conn aqadmin/aqadmin
SELECT COUNT(*) FROM rx_queue_table;
set linesize 131
col rx format a15
col processed_by format a12
SELECT * FROM rx_processed_data ORDER BY id;
SELECT * FROM rx_processed_data ORDER BY dt_processed;
Then, with everything set up ... go to the first window and hit the <Enter> key, then
the second, then the third and finally the same in the AQADMIN session. Continue to monitor the AQADMIN session while
the ICU and Nurses order medications and the pharmacy fills them.
ICU Pharmacy - Dequeues
Messages and passes low priority message to the General Pharmacy
-- IF agent_w_msg.name = 'PROG1' THEN
demo_dequeue('ICUPHARM');
-- ELSIF agent_w_msg.name = 'PROG2' THEN
-- demo_dequeue('prog2');
-- END IF;
EXCEPTION
WHEN listen_timeout THEN
NULL;
END;
END LOOP;
END;
/
Cleanup As AQADMIN
conn aqadmin/aqadmin
-- add purge queue_table here
exec dbms_aqadm.purge_queue_tables('RX_QUEUE_TABLE',
queue_table IN VARCHAR2,
purge_condition IN VARCHAR2,
purge_options IN aq);
/* STOP_QUEUE syntax
dbms_aqadm.stop_queue (
queue_name IN VARCHAR2,
enqueue IN BOOLEAN DEFAULT TRUE,
dequeue IN BOOLEAN DEFAULT TRUE,
wait IN BOOLEAN DEFAULT TRUE);
*/
/* DROP_QUEUE sytnax
dbms_aqadm.drop_queue (
queue_name IN VARCHAR2,
auto_commit IN BOOLEAN DEFAULT TRUE);
*/
/* DROP_QUEUE_TABLE syntax
dbms_aqadm.drop_queue_table (
queue_table IN VARCHAR2,
force IN BOOLEAN DEFAULT FALSE,
auto_commit IN BOOLEAN DEFAULT TRUE);
*/
BEGIN
-- Stop Queue rx_queue
dbms_aqadm.stop_queue(queue_name => 'rx_queue',
wait => TRUE);
-- Drop Queue rx_queue
dbms_aqadm.drop_queue(queue_name => 'rx_queue');
-- Drop rx Queue Table
dbms_aqadm.drop_queue_table(queue_table =>
'rx_queue_table', force => TRUE);
END;
/
-- Verify queues is dropped
SELECT queue_name
FROM user_queues;
-- Verify queue table is dropped
SELECT table_name
FROM user_tables;
drop user pharm_a1 cascade;
drop user icu_a1 cascade;
drop user nurse_a1 cascade;
drop user aqadmin cascade;