Search the Reference Library pages:  

Oracle Streams Demo 2
Change Data Capture
Version 10.2
 
SALARY_HISTORY and CDC_DEMO_EMPLOYEES
Setup As SYS - Prepare Database and Instance
CDC_CHANGE_SETS$
CHANGE_SETS
DBA_QUEUE_PUBLISHERS
WRH$_STREAMS_POOL_ADVICE

-- Show process_type and name for every row in streams$_process_params
SELECT
  process_type,
  name
FROM streams$_process_params;
-- Connect as SYS, review and adjust initialization parameters, then restart the
-- instance to switch it to ARCHIVELOG mode with force logging and supplemental logging.
conn / as sysdba

-- *NIX only
define _editor=vi

-- validate Oracle parameters
-- (the trailing notes are this demo's suggested values for each parameter)
archive log list
show parameter aq_tm_processes      -- min 3
show parameter compatible           -- must be 10.1.0 or above
show parameter global_names         -- must be TRUE
show parameter job_queue_processes  -- min 2 recommended 4-6
show parameter open_links           -- not less than the default 4
show parameter shared_pool_size     -- must be 0 or at least 200MB
show parameter streams_pool_size    -- min. 480MB (10MB/capture 1MB/apply)
show parameter undo_retention       -- min. 3600 (1 hr.) (900)

-- Examples of altering initialization parameters
-- NOTE(review): the check above lists a 480MB minimum for streams_pool_size while the
-- example below sets 200M -- confirm which value is intended.
alter system set aq_tm_processes=3 scope=BOTH;
alter system set compatible='10.2.0.1.0' scope=SPFILE;
alter system set global_names=TRUE scope=BOTH;
alter system set job_queue_processes=6 scope=BOTH;
alter system set open_links=4 scope=SPFILE;
alter system set streams_pool_size=200M scope=BOTH; -- very slow if making smaller
alter system set undo_retention=3600 scope=BOTH;

-- NOTE(review): the formulas below look like suggested new values for these parameters,
-- sized by the number of planned change sets -- confirm against the Oracle CDC documentation.
/*
JOB_QUEUE_PROCESSES (current value) + 2
PARALLEL_MAX_SERVERS (current value) + (5 * (the number of change sets planned))
PROCESSES (current value) + (7 * (the number of change sets planned))
SESSIONS (current value) + (2 * (the number of change sets planned))
*/

-- Retest parameter after modification

-- Restart into MOUNT state so the database can be altered to ARCHIVELOG mode before opening
shutdown immediate;

startup mount;

alter database archivelog;

alter database force logging;

alter database add supplemental log data;

alter database open;

-- Check the archiving status, force a log switch, then check the status again
archive log list

alter system switch logfile;

archive log list

-- List the SYS-owned tables whose names start with CDC and end with $
SELECT table_name
FROM dba_tables
WHERE owner = 'SYS'
AND table_name LIKE 'CDC%$';

desc cdc_system$

SELECT * FROM cdc_system$;
 
Setup As SYS - Create Streams Administrators
conn / as sysdba

-- Administrator account used by the rest of this demo: no quota on SYSTEM/SYSAUX, 20M on USERS
CREATE USER cdcadmin IDENTIFIED BY cdcadmin
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp
  QUOTA 0   ON system
  QUOTA 0   ON sysaux
  QUOTA 20M ON users;

-- system privileges
GRANT create session, create table, create sequence, create procedure TO cdcadmin;

-- roles
GRANT select_catalog_role, execute_catalog_role, dba TO cdcadmin;

-- privileges for CDC packages
GRANT execute ON dbms_cdc_publish TO cdcadmin;
GRANT execute ON dbms_cdc_subscribe TO cdcadmin;
-- required by this demo: not by CDC
GRANT execute ON dbms_lock TO cdcadmin;

-- Streams administrator privilege for the new account
BEGIN
  dbms_streams_auth.grant_admin_privilege(grantee => 'CDCADMIN');
END;
/

-- Verify the account exists
SELECT
  username,
  account_status,
  created
FROM dba_users
ORDER BY username;

-- Verify the system privileges granted directly to CDCADMIN
SELECT *
FROM dba_sys_privs
WHERE grantee = 'CDCADMIN';

-- Users that have a row in streams$_privileged_user
SELECT u.username
FROM dba_users u
INNER JOIN streams$_privileged_user s
  ON u.user_id = s.user#;
 
Prepare Schema Tables for Streams Replication
-- Unlock the HR account and set its password to hr so the demo can connect as hr/hr
alter user hr account unlock;

alter user hr identified by hr;

connect hr/hr

desc employees

-- create table with data for Streams CDC demo
CREATE TABLE cdc_demo_employees AS
SELECT * FROM employees;

-- Add supplemental logging of all columns for the demo source table
ALTER TABLE cdc_demo_employees
ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

conn / as sysdba

-- NOTE(review): presumably shown so the supplemental-logging columns of v$database can be
-- pointed out -- confirm; nothing is queried from the view here
desc v$database

conn hr/hr

-- Target table populated by update_salary_history: one row per captured insert, delete,
-- or salary-changing update on the demo source table.
CREATE TABLE salary_history (
employee_id        NUMBER(6) NOT NULL,
job_id             VARCHAR2(10) NOT NULL,
department_id      NUMBER(4),
old_salary         NUMBER(8,2),
new_salary         NUMBER(8,2),
-- percentage, i.e. (new - old) / old * 100; NUMBER(4,2) tops out at 99.99 and would reject
-- any change of 100% or more with ORA-01438, so allow up to 99999.99
percent_change     NUMBER(7,2),
salary_action_date DATE);

-- Confirm which tables the current schema now owns
SELECT table_name
FROM user_tables
ORDER BY table_name;
 
Instantiate Source Table
conn / as sysdba
-- Signature of dbms_capture_adm.prepare_table_instantiation, listed for reference only
-- (the next three lines are a parameter listing, not an executable statement)
dbms_capture_adm.prepare_table_instantiation(
table_name           IN VARCHAR2,
supplemental_logging IN VARCHAR2 DEFAULT 'keys');

-- Prepare the demo source table for instantiation; supplemental_logging is not passed,
-- so it takes the default shown in the listing above
exec dbms_capture_adm.prepare_table_instantiation(table_name => 'HR.CDC_DEMO_EMPLOYEES');

-- Count the rows in dba_capture_prepared_tables
SELECT COUNT(*)
FROM dba_capture_prepared_tables;
 
Create Asynchronous HotLog Change Set
conn cdcadmin/cdcadmin

col object_name format a30

-- Objects owned by cdcadmin before the change set exists (compare with the same query below)
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;
-- Signature of dbms_cdc_publish.create_change_set, listed for reference only
-- (parameter listing, not an executable statement)
dbms_cdc_publish.create_change_set(
change_set_name    IN VARCHAR2,
description        IN VARCHAR2 DEFAULT NULL,
change_source_name IN VARCHAR2,
stop_on_ddl        IN CHAR DEFAULT 'N',
begin_date         IN DATE DEFAULT NULL,
end_date           IN DATE DEFAULT NULL);

-- this will take a while
-- Positional call: change set CDC_DEMO_SET on source HOTLOG_SOURCE, stop_on_ddl = 'Y',
-- no begin or end date
exec dbms_cdc_publish.create_change_set('CDC_DEMO_SET', 'change set for CDC demo', 'HOTLOG_SOURCE', 'Y', NULL, NULL);

-- here is why: list the objects and tables cdcadmin owns after the call
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;

SELECT table_name, tablespace_name, iot_type
FROM user_tables;

conn / as sysdba

desc cdc_change_sets$

set linesize 121
col set_name format a20
col capture_name format a20
col queue_name format a20
col queue_table_name format a20

-- Show the capture process and queue recorded for each change set
SELECT set_name, capture_name, queue_name, queue_table_name
FROM cdc_change_sets$;

 
Create Change Table
conn cdcadmin/cdcadmin
dbms_cdc_publish.create_change_table(
owner             IN VARCHAR2,
change_table_name IN VARCHAR2,
change_set_name   IN VARCHAR2,
source_schema     IN VARCHAR2,
source_table      IN VARCHAR2,
column_type_list  IN VARCHAR2,
capture_values    IN VARCHAR2, -- BOTH, NEW, OLD
rs_id             IN CHAR,
row_id            IN CHAR,
user_id           IN CHAR,
timestamp         IN CHAR,
object_id         IN CHAR,
source_colmap     IN CHAR,
target_colmap     IN CHAR,
options_string    IN VARCHAR2);

-- Create change table CDCADMIN.CDC_DEMO_EMP_CT in change set CDC_DEMO_SET for source table
-- HR.CDC_DEMO_EMPLOYEES. Named notation, same argument values and order as the signature above.
BEGIN
  dbms_cdc_publish.create_change_table(
    owner             => 'CDCADMIN',
    change_table_name => 'CDC_DEMO_EMP_CT',
    change_set_name   => 'CDC_DEMO_SET',
    source_schema     => 'HR',
    source_table      => 'CDC_DEMO_EMPLOYEES',
    column_type_list  => 'EMPLOYEE_ID NUMBER(6), FIRST_NAME VARCHAR2(20), LAST_NAME VARCHAR2(25), EMAIL VARCHAR2(25), PHONE_NUMBER VARCHAR2(20), HIRE_DATE DATE, JOB_ID VARCHAR2(10), SALARY NUMBER, COMMISSION_PCT NUMBER, MANAGER_ID NUMBER, DEPARTMENT_ID NUMBER',
    capture_values    => 'both',
    rs_id             => 'Y',
    row_id            => 'N',
    user_id           => 'N',
    timestamp         => 'N',
    object_id         => 'N',
    source_colmap     => 'N',
    target_colmap     => 'Y',
    options_string    => NULL);
END;
/

-- Let the subscriber schema read the change table
GRANT SELECT ON cdc_demo_emp_ct TO hr;

conn / as sysdba

desc cdc_change_tables$

-- Show which source table each change table row refers to
SELECT
  change_set_name,
  source_schema_name,
  source_table_name
FROM cdc_change_tables$;

 
Enable Capture
conn cdcadmin/cdcadmin

-- Review positional and explicit notation with students
-- Signature of dbms_cdc_publish.alter_change_set, listed for reference only
-- (parameter listing, not an executable statement)
dbms_cdc_publish.alter_change_set(
change_set_name     IN VARCHAR2,
description         IN VARCHAR2 DEFAULT NULL,
remove_description  IN CHAR DEFAULT 'N',
enable_capture      IN CHAR DEFAULT NULL,
recover_after_error IN CHAR DEFAULT NULL,
remove_ddl          IN CHAR DEFAULT NULL,
stop_on_ddl         IN CHAR DEFAULT NULL);

-- Named notation: only change_set_name and enable_capture are supplied; every other
-- parameter keeps the default shown in the listing above
BEGIN
  dbms_cdc_publish.alter_change_set(change_set_name=>'CDC_DEMO_SET', enable_capture=> 'Y');
END;
/

 
Create Subscription
conn hr/hr
dbms_cdc_subscribe.create_subscription(
change_set_name   IN VARCHAR2,
description       IN VARCHAR2,
subscription_name IN VARCHAR2);

-- Create subscription CDC_DEMO_EMP_SUB against change set CDC_DEMO_SET.
-- Named notation, same values and order as the signature above.
BEGIN
  dbms_cdc_subscribe.create_subscription(
    change_set_name   => 'CDC_DEMO_SET',
    description       => 'Subx to cdc_demo_employees',
    subscription_name => 'CDC_DEMO_EMP_SUB');
END;
/

conn / as sysdba

set linesize 121
col description format a30
col subscription_name format a20
col username format a10

-- Inspect the rows recorded in cdc_subscribers$
SELECT
  subscription_name,
  handle,
  set_name,
  username,
  status,
  description
FROM cdc_subscribers$;

 
Create Subscriber View and Activate Subscription
conn hr/hr
-- Signature of dbms_cdc_subscribe.subscribe, listed for reference only
-- (parameter listing, not an executable statement)
dbms_cdc_subscribe.subscribe(
subscription_name IN VARCHAR2,
source_schema     IN VARCHAR2,
source_table      IN VARCHAR2,
column_list       IN VARCHAR2,
subscriber_view   IN VARCHAR2);

-- Positional call: subscribe CDC_DEMO_EMP_SUB to HR.CDC_DEMO_EMPLOYEES and name the
-- subscriber view CDC_DEMO_EMP_SUB_VIEW. The column_list literal spans two source lines,
-- so the line break and leading spaces are part of the string passed to the procedure.
BEGIN
  dbms_cdc_subscribe.subscribe('CDC_DEMO_EMP_SUB', 'HR', 'CDC_DEMO_EMPLOYEES',
  'EMPLOYEE_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE_NUMBER, HIRE_DATE, JOB_ID, SALARY,
   COMMISSION_PCT, MANAGER_ID, DEPARTMENT_ID'
,
  'CDC_DEMO_EMP_SUB_VIEW');
END;
/

-- Signature of dbms_cdc_subscribe.activate_subscription, listed for reference only
dbms_cdc_subscribe.activate_subscription(
subscription_name IN VARCHAR2);

-- Activate the subscription created above
exec dbms_cdc_subscribe.activate_subscription('CDC_DEMO_EMP_SUB');

 
Create Procedure To Populate Salary History Table
conn hr/hr

/* Create a stored procedure to populate the new HR.SALARY_HISTORY table. The procedure extends the subscription window of the CDC_DEMO_EMP_SUB subscription to get the most recent set of source table changes. It uses the subscriber's CDC_DEMO_EMP_SUB_VIEW view to scan the changes and insert them into the SALARY_HISTORY table. It then purges the subscription window to indicate that it is finished with the set of changes. */

CREATE OR REPLACE PROCEDURE update_salary_history IS
  -- Copies the next window of captured changes from the subscriber view
  -- CDC_DEMO_EMP_SUB_VIEW into SALARY_HISTORY.
  --
  -- Steps:
  --   1. extend the window of subscription CDC_DEMO_EMP_SUB
  --   2. insert one SALARY_HISTORY row per captured insert ('I'), delete ('D') and
  --      salary-changing update ('U'), in cscn$, rsid$ order
  --   3. purge the subscription window
  --
  -- Takes no parameters. The procedure does not commit.

  -- One row per change, normalized to (opt, old_salary, new_salary).
  -- operation$ is compared against two-character codes: 'I ', 'D ', 'UO' (update, old
  -- values) and 'UN' (update, new values).
  CURSOR cur IS
  SELECT opt, cscn$, rsid$, employee_id, job_id, department_id,
         old_salary, new_salary, commit_timestamp$
  FROM (
    -- inserts: no previous salary
    SELECT 'I' opt, cscn$, rsid$, employee_id, job_id, department_id,
           0 old_salary, salary new_salary, commit_timestamp$
    FROM cdc_demo_emp_sub_view
    WHERE operation$ = 'I '
    UNION ALL
    -- deletes: no new salary
    SELECT 'D' opt, cscn$, rsid$, employee_id, job_id, department_id,
           salary old_salary, 0 new_salary, commit_timestamp$
    FROM cdc_demo_emp_sub_view
    WHERE operation$ = 'D '
    UNION ALL
    -- updates: pair the old-value row with the new-value row of the same change and
    -- keep only those where the salary actually changed
    SELECT 'U' opt, v1.cscn$, v1.rsid$, v1.employee_id, v1.job_id, v1.department_id,
           v1.salary old_salary, v2.salary new_salary, v1.commit_timestamp$
    FROM cdc_demo_emp_sub_view v1
    INNER JOIN cdc_demo_emp_sub_view v2
      ON  v1.cscn$ = v2.cscn$
      AND v1.rsid$ = v2.rsid$
    WHERE v1.operation$ = 'UO'
      AND v2.operation$ = 'UN'
      AND ABS(v1.salary - v2.salary) > 0)
  ORDER BY cscn$, rsid$;

  percent NUMBER;  -- salary change as a percentage of the old salary
BEGIN
  -- Get the next set of changes to the HR.CDC_DEMO_EMPLOYEES source table
  dbms_cdc_subscribe.extend_window(subscription_name => 'CDC_DEMO_EMP_SUB');

  -- Process each change
  FOR chg IN cur
  LOOP
    IF chg.opt = 'I' THEN
      INSERT INTO salary_history
        (employee_id, job_id, department_id, old_salary,
         new_salary, percent_change, salary_action_date)
      VALUES
        (chg.employee_id, chg.job_id, chg.department_id, 0,
         chg.new_salary, NULL, chg.commit_timestamp$);

    ELSIF chg.opt = 'D' THEN
      INSERT INTO salary_history
        (employee_id, job_id, department_id, old_salary,
         new_salary, percent_change, salary_action_date)
      VALUES
        (chg.employee_id, chg.job_id, chg.department_id, chg.old_salary,
         0, NULL, chg.commit_timestamp$);

    ELSIF chg.opt = 'U' THEN
      -- A percentage of a zero old salary is undefined; store NULL instead of letting
      -- the division raise ZERO_DIVIDE and abort the whole batch.
      IF chg.old_salary = 0 THEN
        percent := NULL;
      ELSE
        percent := (chg.new_salary - chg.old_salary) / chg.old_salary * 100;
      END IF;

      INSERT INTO salary_history
        (employee_id, job_id, department_id, old_salary,
         new_salary, percent_change, salary_action_date)
      VALUES
        (chg.employee_id, chg.job_id, chg.department_id, chg.old_salary,
         chg.new_salary, percent, chg.commit_timestamp$);
    END IF;
  END LOOP;

  -- Indicate subscriber is finished with this set of changes
  dbms_cdc_subscribe.purge_window(subscription_name => 'CDC_DEMO_EMP_SUB');
END update_salary_history;
/
 
Create Function To Wait For Changes
/* Create function CDCADMIN.WAIT_FOR_CHANGES to enable this demo to run predictably. The
asynchronous nature of CDC HotLog mode means that there is a delay for source table changes to appear in the CDC change table and the subscriber view. By default this function waits up to 5 minutes for the change table and 2 additional minutes for the subscriber view. This can be adjusted if it is insufficient. The caller must specify the name of the change table and the number of rows expected to be in the change table. The caller may also optionally specify a different number of seconds to wait for changes to appear in the change table. */


conn cdcadmin/cdcadmin

CREATE OR REPLACE FUNCTION wait_for_changes (
change_table_name VARCHAR2,      -- name of change table
rowcount          NUMBER,        -- number of rows to wait for
maxwait_seconds   NUMBER := 300) -- maximum time to wait, in seconds
RETURN VARCHAR2 AUTHID CURRENT_USER AS
  -- Polls a change table until it holds at least ROWCOUNT rows or MAXWAIT_SECONDS of
  -- sleeping has elapsed, then sleeps a further 120 seconds and returns a message.
  --
  -- Parameters:
  --   change_table_name  (optionally schema-qualified) table to count rows in
  --   rowcount           number of rows to wait for
  --   maxwait_seconds    upper bound on time spent sleeping between polls (default 300)
  -- Returns:
  --   an informational message saying whether the row count was reached or the wait
  --   timed out
  -- Raises:
  --   ORA-44004 (from DBMS_ASSERT) when change_table_name is not a valid qualified SQL
  --   name; any error raised by the COUNT(*) query itself

  num_of_rows  NUMBER := 0;           -- number of rows in change table
  slept        NUMBER := 0;           -- total time slept
  sleep_time   CONSTANT NUMBER := 3;  -- number of seconds to sleep between polls
  return_msg   VARCHAR2(500);         -- informational message
  query_txt    VARCHAR2(300);         -- dynamic COUNT(*) statement

BEGIN
  -- The table name is spliced into dynamic SQL that runs with the caller's privileges,
  -- so reject anything that is not a plain (qualified) SQL name.
  query_txt := 'SELECT COUNT(*) FROM ' ||
               dbms_assert.qualified_sql_name(change_table_name);

  LOOP
    EXECUTE IMMEDIATE query_txt INTO num_of_rows;

    -- Got expected number of rows
    IF num_of_rows >= rowcount THEN
      return_msg := 'Change table ' || change_table_name || ' contains at least ' ||
      rowcount || ' rows.';
      EXIT;
    -- Reached maximum number of seconds to wait (>= so the limit is not overshot by
    -- one extra sleep interval)
    ELSIF slept >= maxwait_seconds THEN
      return_msg := ' !!! - Timed out while waiting for ' || change_table_name ||
      ' to reach ' || rowcount || ' rows !!! ';
      EXIT;
    END IF;

    dbms_lock.sleep(sleep_time);
    slept := slept + sleep_time;
  END LOOP;

  -- additional wait time for changes to become available to subscriber view
  -- (applies on both the success and the timeout path)
  dbms_lock.sleep(120);

  RETURN return_msg;
END wait_for_changes;
/
 
DML On Source Table
conn hr/hr

-- First transaction: salary raises for three clerk job categories
UPDATE cdc_demo_employees SET salary = salary +  500 WHERE job_id = 'SH_CLERK';
UPDATE cdc_demo_employees SET salary = salary + 1000 WHERE job_id = 'ST_CLERK';
UPDATE cdc_demo_employees SET salary = salary + 1500 WHERE job_id = 'PU_CLERK';
COMMIT;

-- Second transaction: four new employees.
-- Hire dates use ANSI DATE literals so they do not depend on the session's
-- NLS_DATE_FORMAT / NLS_DATE_LANGUAGE, as TO_DATE without a format mask does.
INSERT INTO cdc_demo_employees
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id,
 salary, commission_pct, manager_id, department_id)
VALUES
(207, 'Mary', 'Lee', 'MLEE', '310.234.4590', DATE '2003-01-10', 'SH_CLERK',
4000, NULL, 121, 50);

INSERT INTO cdc_demo_employees
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id,
 salary, commission_pct, manager_id, department_id)
VALUES
(208, 'Karen', 'Prince', 'KPRINCE', '345.444.6756', DATE '2003-11-10', 'SH_CLERK',
3000, NULL, 111, 50);

INSERT INTO cdc_demo_employees
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id,
 salary, commission_pct, manager_id, department_id)
VALUES
(209, 'Frank', 'Gate', 'FGATE', '451.445.5678', DATE '2003-11-13', 'IT_PROG',
8000, NULL, 101, 50);

INSERT INTO cdc_demo_employees
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id,
 salary, commission_pct, manager_id, department_id)
VALUES
(210, 'Paul', 'Jeep', 'PJEEP', '607.345.1112', DATE '2003-05-28', 'IT_PROG',
8000, NULL, 101, 50);

COMMIT;
 
Validate Capture
-- Expecting 94 rows to appear in the change table CDCADMIN.CDC_DEMO_EMP_CT. This first
-- capture may take a few minutes. Later captures should be substantially faster.


conn cdcadmin/cdcadmin

-- Wait for 94 rows in the change table, polling for at most 180 seconds
SELECT
  wait_for_changes('CDCADMIN.CDC_DEMO_EMP_CT', 94, 180) AS message
FROM dual;
 
Another Test
conn hr/hr

-- Load the changes captured so far into salary_history and display them
BEGIN
  update_salary_history;
END;
/

SELECT
  employee_id,
  job_id,
  department_id,
  old_salary,
  new_salary,
  percent_change
FROM salary_history
ORDER BY employee_id, old_salary, new_salary;

-- Second round of DML: delete four employees by name, then adjust three job categories
DELETE FROM cdc_demo_employees WHERE first_name = 'Mary' AND last_name = 'Lee';
DELETE FROM cdc_demo_employees WHERE first_name = 'Karen' AND last_name = 'Prince';
DELETE FROM cdc_demo_employees WHERE first_name = 'Frank' AND last_name = 'Gate';
DELETE FROM cdc_demo_employees WHERE first_name = 'Paul' AND last_name = 'Jeep';
COMMIT;

UPDATE cdc_demo_employees SET salary = salary + 5000 WHERE job_id = 'AD_VP';
UPDATE cdc_demo_employees SET salary = salary - 1000 WHERE job_id = 'ST_MAN';
UPDATE cdc_demo_employees SET salary = salary - 500 WHERE job_id = 'FI_ACCOUNT';
COMMIT;

-- Expecting 122 rows to appear in the change table CDCADMIN.CDC_DEMO_EMP_CT.
-- (94 rows from the first set of DMLs and 28 from the second set)

conn cdcadmin/cdcadmin

SELECT
  wait_for_changes('CDCADMIN.CDC_DEMO_EMP_CT', 122, 180) AS message
FROM dual;

conn hr/hr

-- Load the second batch and display the accumulated history
BEGIN
  update_salary_history;
END;
/

SELECT
  employee_id,
  job_id,
  department_id,
  old_salary,
  new_salary,
  percent_change
FROM salary_history
ORDER BY employee_id, old_salary, new_salary;
 
Capture Cleanup
-- Tear the demo down in dependency order: subscription, instantiation, change table,
-- change set, helper function, demo tables and procedure, and finally the cdcadmin user.
conn hr/hr

-- drop the subscription owned by hr
exec dbms_cdc_subscribe.drop_subscription('CDC_DEMO_EMP_SUB');

conn / as sysdba

-- reverse prepare table instantiation
exec dbms_capture_adm.abort_table_instantiation('HR.CDC_DEMO_EMPLOYEES');

-- drop the change table
-- NOTE(review): the third argument 'Y' is presumably a force flag -- confirm against the
-- DBMS_CDC_PUBLISH documentation
exec dbms_cdc_publish.drop_change_table('CDCADMIN', 'CDC_DEMO_EMP_CT', 'Y');

-- drop the change set
exec dbms_cdc_publish.drop_change_set('CDC_DEMO_SET');

conn cdcadmin/cdcadmin

drop function wait_for_changes;

conn hr/hr

-- PURGE is specified on both table drops
drop table salary_history purge;
drop table cdc_demo_employees purge;
drop procedure update_salary_history;

conn / as sysdba

-- remove the demo administrator together with any objects it still owns
drop user cdcadmin cascade;
 
Related Topics
Database Link
DBMS_CAPTURE_ADM
DBMS_CDC_PUBLISH
DBMS_CDC_SUBSCRIBE
DBMS_STREAMS_AUTH
DBMS_OUTPUT
Export
Import
Streams Demo 1
User
 
   Home |    Search |    Code Library |    Sponsors |    Privacy |    Terms of Use |    Contact Us    © 2003 - 2024 psoug.org
-----