|
|
|
Search the Reference Library pages: |
|
|
|
Oracle AQ Demo 2
|
Version 11.1 |
Setup As SYS |
CONN / AS SYSDBA
CREATE USER aquser
IDENTIFIED BY aquser
DEFAULT TABLESPACE uwdata
TEMPORARY TABLESPACE temp
QUOTA 0 ON SYSTEM
QUOTA 10M ON uwdata;
GRANT create session TO aquser;
GRANT create procedure TO aquser;
GRANT create session TO aquser;
GRANT create table TO aquser;
GRANT create type TO aquser;
GRANT aq_administrator_role TO aquser;
GRANT EXECUTE ON dbms_aq TO aquser;
GRANT EXECUTE ON dbms_aqadm TO aquser; |
|
Setup As AQUSER |
CONN aquser/aquser
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
CREATE SEQUENCE message_seq INCREMENT BY 1 START WITH 1000;
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
CREATE TYPE message_type AS OBJECT (
message_id NUMBER(15),
subject VARCHAR2(100),
text VARCHAR2(100),
dollar_value NUMBER(4,2));
/
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type;
BEGIN
/*
** +----------------------------------------------------------------------------+
** | CREATE QUEUE TABLE |
** | -------------------------------------------------------------------------- |
** | -> (QUEUE_TABLE) - Name of the queue table to create. |
** | -> (QUEUE_PAYLOAD_TYPE) - Name of either the object type or RAW. |
** | |
** | NOTE: The "create_queue_table" procedure will also create the following |
** | items: |
** | |
** | AQ$QUEUE_TABLE_NAME - A read-only view for information on the queue |
** | table. |
** | AQ$QUEUE_TABLE_NAME_E - A default exception queue. |
** | AQ$QUEUE_TABLE_NAME_T - An index for time manager operations. |
** | AQ$QUEUE_TABLE_NAME_I - An index or index-organized table to handle |
** | dequeueing on queues with multiple consumers. |
** | |
** | ADDITIONAL NOTES: |
** | - Payload type can be either RAW or as a custom object type. |
** | - Maximum payload size is 32 KB. |
** | - When a user-defined object type is being used as a payload, the |
** | maximum number of attributes allowed for the object type is 900. |
** | - Messages must be in READY state to be dequeued unless a MSGID value is |
** | specified. Messages can be sorted for dequeue based on msgid or |
** | correlation values. |
** | |
** +----------------------------------------------------------------------------+
*/
dbms_aqadm.create_queue_table (
queue_table => 'aquser.msg_qt',
queue_payload_type => 'aquser.message_type');
/*
** +----------------------------------------------------------------------------+
** | CREATE QUEUE |
** | -------------------------------------------------------------------------- |
** | -> (QUEUE_NAME) - Name of the queue to create and place in the |
** | queue table (below). |
** | -> (QUEUE_TABLE) - Name of the queue table to store queue the (above) |
** | named queue in. |
** | -> (QUEUE_TYPE) - Type of queue to create. The types of queues to |
** | are NORMAL queues and EXCEPTION queues. |
** | -> (MAX_RETRIES) - Used to limit the number of times a dequeue with the |
** | REMOVE mode can be attempted on the message. The |
** | default is 0 which allows no retries. When the |
** | maximum number of retries is reached, the message is |
** | moved to the exception queue. The value is |
** | incremented when the application issues a rollback |
** | after executing the dequeue. |
** | -> (RETRY_DELAY) - Specifies the delay time, in seconds, before the |
** | message is scheduled for processing again after an |
** | application rollback. The default value is 0, which |
** | allows a message to be retried as soon as possible. |
** | If MAX_RETRIES is set to 0, the RETRY_DELAY argument |
** | will have no effect. |
** | -> (RETENTION_TIME) - |
** | -> (DEPENDENCY_TRACKING) - |
** | -> (COMMENT) - Assign a comment to the queue. |
** | -> (AUTO_COMMIT) - If you set the AUTO_COMMIT argument to 'TRUE', the |
** | current transaction, if any, will be committed |
** | before the operation is carried out. This is the |
** | default action. If you set the AUTO_COMMIT argument |
** | to 'FALSE', the operation will be part of the |
** | current transaction and will become persistent only |
** | when the user issues a COMMIT. |
** +----------------------------------------------------------------------------+
*/
dbms_aqadm.create_queue(
queue_name => 'msg_queue',
queue_table => 'aquser.msg_qt',
queue_type => DBMS_AQADM.NORMAL_QUEUE,
max_retries => 0,
retry_delay => 0,
retention_time => 1209600,
dependency_tracking => FALSE,
comment => 'Test Object Type Queue',
auto_commit => FALSE);
-- start queue
dbms_aqadm.start_queue('msg_queue');
END;
/
SELECT object_type, COUNT(*)
FROM user_objects
GROUP BY object_type; |
|
Dequeue User |
CONN aquser/aquser
set serveroutput on
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message aquser.message_type;
BEGIN
/*
** +----------------------------------------------------------------------------------+
** | DEQUEUE OPTIONS |
** | -------------------------------------------------------------------------------- |
** | -> (CONSUMER_NAME) - Name of the application, process or user receiving the |
** | message. Should be NULL for queues not set up to handle |
** | more than one consumer. |
** | -> (DEQUEUE_MODE) - Specifies locks, if any, to be acquired on the message by |
** | the dequeue() process. Can be: |
** | |
** | BROWSE : For "read-only" access similar to that used in |
** | "select" statements. |
** | LOCKED : For the ability to write to the message during |
** | the transaction, similar to a share lock |
** | acquired in a "select for update" statement. |
** | REMOVE : For the ability to read the message, |
** | updating it or deleting it. The message is |
** | retained according to properties set in queue |
** | table creation. |
** | |
** | -> (NAVIGATION) - Determines the position of the message to be retrieved, |
** | the first step in retrieving messages. The second step is |
** | applying search criteria. The navigation variable can have |
** | one of three values: |
** | |
** | NEXT_MESSAGE : Is used for retrieving the next |
** | message available that matches search |
** | criteria. |
** | NEXT_TRANSACTION : Is used to skip remaining messages in |
** | the current transaction group and |
** | retrieve the first message of the |
** | next transaction group. |
** | FIRST_MESSAGE : Used to retrieve the first message |
** | that fits the search criteria, |
** | resetting the position to the |
** | beginning of queue. |
** | -> (VISIBILITY) - Defines visibility of the message within the transaction |
** | of the application dequeueing it. Values are: |
** | |
** | ON_COMMIT : If the message dequeued is part of |
** | the current transaction. |
** | IMMEDIATE : If the message is its own transaction.|
** | |
** | -> (WAIT) - Specifies how long to wait if an attempt is made to |
** | enqueue() a message and there is no message to retrieve. |
** | Values are: |
** | |
** | FOREVER : Wait forever. |
** | NO_WAIT : Do not wait for any message. |
** | num : Number that represents the number of |
** | seconds it will wait. |
** | |
** | -> (MSGID) - The message identifier for the message to be dequeued. If |
** | specified, the message will be dequeued even if expired. |
** | -> (CORRELATION) - The name of the message to be dequeued. |
** | |
** +----------------------------------------------------------------------------------+
*/
dequeue_options.CONSUMER_NAME := NULL;
dequeue_options.DEQUEUE_MODE := DBMS_AQ.REMOVE;
dequeue_options.NAVIGATION := DBMS_AQ.NEXT_MESSAGE;
dequeue_options.VISIBILITY := DBMS_AQ.IMMEDIATE;
dequeue_options.WAIT := DBMS_AQ.FOREVER;
dequeue_options.MSGID := null;
dequeue_options.CORRELATION := 'Jeff, Melody and Alex Message';
dbms_output.put_line('+-----------------+');
dbms_output.put_line('| DEQUEUE OPTIONS |');
dbms_output.put_line('+-----------------+');
dbms_output.put_line(' -> CONSUMER_NAME := ' || NVL(dequeue_options.CONSUMER_NAME,
-999));
dbms_output.put_line(' -> DEQUEUE_MODE := ' || NVL(dequeue_options.DEQUEUE_MODE,
-999));
dbms_output.put_line(' -> NAVIGATION := ' || NVL(dequeue_options.NAVIGATION, -999));
dbms_output.put_line(' -> VISIBILITY := ' || NVL(dequeue_options.VISIBILITY, -999));
dbms_output.put_line(' -> WAIT := ' || NVL(dequeue_options.WAIT, -999));
dbms_output.put_line(' -> MSGID := ' || NVL(dequeue_options.MSGID, '<null>'));
dbms_output.put_line(' -> CORRELATION := ' || NVL(dequeue_options.CORRELATION,
'<null>'));
/*
** +------------------------------------+
** | PRINT MESSAGE PROPERTIES |
** +------------------------------------+
*/
dbms_output.put_line('+--------------------+');
dbms_output.put_line('| MESSAGE PROPERTIES |');
dbms_output.put_line('+--------------------+');
dbms_output.put_line(' -> PRIORITY := ' || NVL(message_properties.PRIORITY, -999));
dbms_output.put_line(' -> DELAY := ' || NVL(message_properties.DELAY, -999));
dbms_output.put_line(' -> EXPIRATION := ' || NVL(message_properties.EXPIRATION, -999));
dbms_output.put_line(' -> CORRELATION := ' || NVL(message_properties.CORRELATION,
-999));
dbms_output.put_line(' -> ATTEMPTS := ' || NVL(message_properties.ATTEMPTS, -999));
dbms_output.put_line(' -> EXCEPTION_QUEUE := ' ||
NVL(message_properties.EXCEPTION_QUEUE, -999));
dbms_output.put_line(' -> ENQUEUE_TIME := ' || message_properties.ENQUEUE_TIME);
dbms_output.put_line(' -> STATE := ' || NVL(message_properties.STATE, -999));
/*
** +------------------------------------+
** | DEQUEUE THE MESSAGE |
** +------------------------------------+
*/
dbms_aq.dequeue(queue_name => 'msg_queue', dequeue_options => dequeue_options,
message_properties => message_properties, payload => message, msgid =>
message_handle);
/*
** +------------------------------------+
** | PRINT THE DEQUEUED MESSAGE PAYLOAD |
** +------------------------------------+
*/
dbms_output.put_line('+-----------------+');
dbms_output.put_line('| MESSAGE PAYLOAD |');
dbms_output.put_line('+-----------------+');
dbms_output.put_line('- Message ID := ' || message.message_id);
dbms_output.put_line('- Subject := ' || message.subject);
dbms_output.put_line('- Message := ' || message.text);
dbms_output.put_line('- Dollar Value := ' || message.dollar_value);
COMMIT;
END;
/ |
|
Enqueue User |
CONN aquser/aquser
set serveroutput on
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
-- subscriber dbms_aq.aq$_recipient_list_t;
message_handle RAW(16);
message aquser.message_type;
message_id NUMBER;
BEGIN
/*
** +----------------------------------------------------------------------------+
** | GET NEXT MESSAGE ID |
** +----------------------------------------------------------------------------+
*/
SELECT message_seq.nextval
INTO message_id
FROM dual;
/*
** +----------------------------------------------------------------------------+
** | ASSIGN MESSAGE |
** +----------------------------------------------------------------------------+
*/
message := message_type(message_id, 'EXAMPLE MESSAGE', 'This is a sample message.', 10.2);
/*
** +----------------------------------------------------------------------------------+
** | ENQUEUE OPTIONS |
** | -------------------------------------------------------------------------------- |
** | -> (VISIBILITY) - Defines transactional behaviour of the queued |
** | request. Can be set to: |
** | |
** | ON_COMMIT : The enqueued message is part of the |
** | current transaction and that the |
** | operation will be complete when the |
** | transaction commits. The default |
** | value is "ON_COMMIT". |
** | IMMEDIATE : the enqueued message is its own |
** | transaction, not part of the current |
** | transaction. |
** | |
** | -> (RELATIVE_MSGID) - Only relevant when "BEFORE" is used in |
** | "sequence_deviation" (below). This variable |
** | defines the message identifier referenced in |
** | "sequence_deviation". |
** | |
** | -> (SEQUENCE_DEVIATION) - Identifies whether the message enqueued should be |
** | dequeued before other messages in the queue. Values |
** | permitted: |
** | |
** | BEFORE : This message should be dequeued |
** | before the message defined by |
** | "RELATIVE_MSGID" (above). |
** | TOP : This message is dequeued before any |
** | other messages. |
** | null : Says this message is dequeued in |
** | regular order. NULL is the default |
** | value. |
** | |
** +----------------------------------------------------------------------------------+
*/
enqueue_options.VISIBILITY := DBMS_AQ.ON_COMMIT;
-- enqueue_options.RELATIVE_MSGID := '02AB9AD2F4859C5';
enqueue_options.SEQUENCE_DEVIATION := null;
dbms_output.put_line('+-----------------+');
dbms_output.put_line('| ENQUEUE OPTIONS |');
dbms_output.put_line('+-----------------+');
dbms_output.put_line(' -> VISIBILITY := ' || NVL(enqueue_options.VISIBILITY, -999));
--dbms_output.put_line(' -> RELATIVE_MSGID := '
||NVL(enqueue_options.RELATIVE_MSGID,'<null>'));
dbms_output.put_line(' -> SEQUENCE_DEVIATION:='
||NVL(enqueue_options.SEQUENCE_DEVIATION, -999));
/*
** +----------------------------------------------------------------------------------+
** | MESSAGE PROPERTIES |
** | -------------------------------------------------------------------------------- |
** | -> (PRIORITY) - Specifies the prioriy of the message numerically. Both |
** | negatives and positives are allowed; the lower the |
** | number, the higher the priority. |
** | |
** | -> (DELAY) - Identifies a delay, in seconds, during which time the |
** | message may not be dequeued. Alternately, "NO_DELAY" |
** | may be specified for this variable. It relies on the |
** | setting of the time manager. |
** | |
** | -> (EXPIRATION) - Defines how long the message is available for |
** | dequeueing, in seconds, after which time the message |
** | expires. Alternately, "NEVER" may be specified for |
** | this variable. |
** | |
** | -> (CORRELATION) - Identifies the message with a name. |
** | |
** | -> (ATTEMPTS) - Number of times other consumers attempted to |
** | dequeue() the message. This is not set at time of |
** | enqueue(). |
** | |
** | -> (RECIPIENT_LIST) - Can be used only for queues allowing multiple consumers. |
** | Default recipients are the subscribers to the queue. |
** | Values for this variable cannot be returned in a |
** | dequeue(). The recipient list can be defined with |
** | variable, of type SYS.AQ$_AGENT, which takes three |
** | variables: name, address and protocol, of datatypes |
** | VARCHAR2, VARCHAR2 and NUMBER, respectively. |
** | |
** | -> (EXCEPTION_QUEUE) - Messages moved to the exception queue after value for |
** | "expire" has passed, or if "attempts" exceeded the |
** | maximum number of attempts allowed for the queue. |
** | |
** | -> (ENQUEUE_TIME) - Set internally by the system as the time "enqueue()"
|
** | deposited the message. |
** | |
** | -> (STATE) - The current state of the message. This has four |
** | possible values: |
** | |
** | WAITING - If the message is still in delay. |
** | READY - If the message can be obtained via |
** | dequeue(). |
** | PROCESSED - If the message is processed and retained. |
** | EXPIRED - If the message moved to the location |
** | defined by exception queue. |
** | |
** +----------------------------------------------------------------------------------+
*/
message_properties.PRIORITY := -5;
message_properties.DELAY := DBMS_AQ.NO_DELAY;
message_properties.EXPIRATION := DBMS_AQ.NEVER;
message_properties.CORRELATION := 'Jeff, Melody and Alex Message';
-- message_properties.ATTEMPTS := (Not set at time of enqueue);
-- subscriber(1) := SYS.AQ$_AGENT('JEFF', null, null);
-- subscriber(2) := SYS.AQ$_AGENT('MELODY', null, null);
-- subscriber(3) := SYS.AQ$_AGENT('ALEX', null, null);
-- message_properties.RECIPIENT_LIST := subscriber;
-- message_properties.EXCEPTION_QUEUE := 'AQ$MSG_QT_E';
-- message_properties.ENQUEUE_TIME := (Not set by user);
-- message_properties.STATE := (Not set by user);
dbms_output.put_line('+--------------------+');
dbms_output.put_line('| MESSAGE PROPERTIES |');
dbms_output.put_line('+--------------------+');
dbms_output.put_line(' -> PRIORITY := ' || NVL(message_properties.PRIORITY, -999));
dbms_output.put_line(' -> DELAY := ' || NVL(message_properties.DELAY, -999));
dbms_output.put_line(' -> EXPIRATION := ' || NVL(message_properties.EXPIRATION, -999));
dbms_output.put_line(' -> CORRELATION := ' || NVL(message_properties.CORRELATION,
-999));
dbms_output.put_line(' -> ATTEMPTS := ' || '(Not set at time of enqueue)' );
dbms_output.put_line(' -> EXCEPTION_QUEUE := ' ||
NVL(message_properties.EXCEPTION_QUEUE, -999));
dbms_output.put_line(' -> ENQUEUE_TIME := ' || '(Not set by user)' );
dbms_output.put_line(' -> STATE := ' || '(Not set by user)' );
/*
** +------------------------------------+
** | ENQUEUE THE MESSAGE |
** +------------------------------------+
*/
dbms_aq.enqueue(queue_name => 'msg_queue', enqueue_options =>
enqueue_options,
message_properties => message_properties, payload => message, msgid =>
message_handle);
COMMIT;
END;
/ |
|
-----
|