CODE
Oracle Code Library
JOBS
Find Or Post Oracle Jobs
FORUM
Oracle Discussion & Chat
PSOUG Home Code Snippets Oracle Reference Oracle Functions PSOUG Forum Oracle Blogs Bookmark and Share
 
 Search the Reference Library pages:  

Free
Oracle Magazine
Subscriptions
and Oracle White Papers

Oracle Change Data Capture Asynchronous Hotlog Demo
Version 11.1
 
Setup As SYS - Prepare Database and Instance
conn / as sysdba

-- *NIX only
define _editor=vi
-- or the editor of your choice

-- Validate database parameters.
-- SQL*Plus commands (ARCHIVE LOG, SHOW) only accept a "--" comment on a line
-- of its own, so the expected value is stated on the line above each command.

-- Archive Mode
archive log list
-- 11.0 or above
show parameter compatible
-- should be TRUE but not required
show parameter global_names
-- 0 or at least 50000000
show parameter java_pool_size
-- not less than the default 4 (if using dblinks)
show parameter open_links
-- set to <current_value>+(5*number of planned change sets)
show parameter parallel_max_servers
-- set to <current_value>+(7*number of planned change sets)
show parameter processes
-- set to <current value>+(2*number of planned change sets)
show parameter sessions
-- 0 or at least 200MB
show parameter shared_pool_size
-- 0 or <current_size>+(20*number of planned change sets)
show parameter streams_pool_size
-- minimum 3600 (1 hr.)
show parameter undo_retention


-- Examples of altering initialization parameters
alter system set compatible='11.1.0.0.0' scope=SPFILE;
alter system set global_names=TRUE scope=BOTH;
-- very slow if making smaller
-- (comment kept on its own line: SQL*Plus does not accept text after a statement terminator)
alter system set streams_pool_size=200M scope=BOTH;
alter system set undo_retention=3600 scope=BOTH;

-- restart into MOUNT so that archivelog mode and logging options can be changed
shutdown immediate;

startup mount;

alter database archivelog;

-- important
alter database force logging;

-- one option among several
alter database add supplemental log data;

alter database open;

-- validate archivelogging: compare the log sequence before and after a switch
archive log list

alter system switch logfile;

archive log list

-- Retest initialization parameter changes to verify modification to planned values

-- validate force and supplemental logging
-- (column formats keep the six flags on a single output line)
col log_min format a7
col log_pk format a6
col log_ui format a6
col log_fk format a6
col log_all format a7
col force_log format a9

SELECT supplemental_log_data_min LOG_MIN, supplemental_log_data_pk LOG_PK, supplemental_log_data_ui LOG_UI, supplemental_log_data_fk LOG_FK,
supplemental_log_data_all LOG_ALL, force_logging FORCE_LOG
FROM v$database;

-- force logging setting per tablespace
SELECT tablespace_name, force_logging
FROM dba_tablespaces;

-- examine CDC related data dictionary objects
SELECT table_name
FROM dba_tables
WHERE owner = 'SYS'
AND table_name LIKE 'CDC%$';

desc cdc_system$

SELECT * FROM cdc_system$;
 
Setup As SYS - Create Streams Administrators
conn / as sysdba

-- list the Streams administrators that exist before CDCADMIN is created
SELECT *
FROM dba_streams_administrator;

-- dedicated tablespace for the CDCADMIN user's objects
-- (Windows path shown; change the datafile location to suit your platform)
CREATE TABLESPACE cdc_tbsp
datafile 'c:\temp\cdctbsp01.dbf' SIZE 50M
AUTOEXTEND OFF
BLOCKSIZE 8192
EXTENT MANAGEMENT LOCAL UNIFORM SIZE 64K;

-- demo account; password equals the user name, so do not reuse outside a sandbox
CREATE USER cdcadmin
IDENTIFIED BY cdcadmin
DEFAULT TABLESPACE cdc_tbsp
TEMPORARY TABLESPACE temp
QUOTA UNLIMITED ON cdc_tbsp;

-- system privileges needed by CDCADMIN
GRANT create session,
      create table,
      create sequence,
      create procedure,
      create any job
TO cdcadmin;

-- catalog roles
GRANT execute_catalog_role,
      select_catalog_role
TO cdcadmin;

-- package execute privileges
GRANT EXECUTE ON dbms_cdc_publish TO cdcadmin;
GRANT EXECUTE ON dbms_lock TO cdcadmin;

-- register CDCADMIN as a Streams administrator
BEGIN
  dbms_streams_auth.grant_admin_privilege('CDCADMIN');
END;
/

-- confirm the account exists and is usable
SELECT account_status, created
FROM dba_users
WHERE username = 'CDCADMIN';

-- confirm the system privileges
SELECT *
FROM dba_sys_privs
WHERE grantee = 'CDCADMIN';

-- users recorded as privileged Streams users
SELECT username
FROM dba_users u
INNER JOIN streams$_privileged_user s
  ON u.user_id = s.user#;

col local_privileges format a20
col access_from_remote format a20

-- CDCADMIN should now be listed
SELECT *
FROM dba_streams_administrator;
 
Prepare Schema Tables for CDC Replication
conn / as sysdba

-- open the HR sample account and set its password to hr
ALTER USER hr IDENTIFIED BY hr ACCOUNT UNLOCK;

conn hr/hr

-- HR tables that Streams reports as unsupported
SELECT table_name, reason
FROM all_streams_unsupported
WHERE owner = 'HR'
ORDER BY table_name;

desc employees

SELECT *
FROM employees;

-- source table for the demo: a copy of EMPLOYEES
CREATE TABLE cdc_demo2 AS
SELECT * FROM employees;

ALTER TABLE cdc_demo2
ADD CONSTRAINT pk_cdc_demo2 PRIMARY KEY (employee_id)
USING INDEX PCTFREE 0;

-- table-level supplemental logging (an alternative to the database-level setting);
-- the log group listing is shown before and after for comparison
SELECT * FROM user_log_groups;

ALTER TABLE cdc_demo2
ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

SELECT * FROM user_log_groups;

-- target table recording salary changes that originate in cdc_demo2
CREATE TABLE salary_history (
  employee_id NUMBER(6),
  job_id      VARCHAR2(10),
  dept_id     NUMBER(4),
  old_salary  NUMBER(8,2),
  new_salary  NUMBER(8,2),
  pct_change  NUMBER(4,2),
  action_date DATE
);

-- HR tables, oldest first
SELECT object_name, created
FROM user_objects
WHERE object_type = 'TABLE'
ORDER BY created;
 
Instantiate Source Table
conn cdcadmin/cdcadmin

desc dba_capture_prepared_tables

-- prepared tables before instantiation
SELECT table_name, scn, supplemental_log_data_pk PK, supplemental_log_data_ui UI,
supplemental_log_data_fk FK, supplemental_log_data_all "ALL"
FROM dba_capture_prepared_tables;

-- Syntax reference (not executable):
-- dbms_capture_adm.prepare_table_instantiation(
-- table_name           IN VARCHAR2,
-- supplemental_logging IN VARCHAR2 DEFAULT 'keys');

-- only the table name is passed, so supplemental_logging takes its default ('keys')
exec dbms_capture_adm.prepare_table_instantiation('HR.CDC_DEMO2');

-- HR.CDC_DEMO2 should now be listed
SELECT table_name, scn, supplemental_log_data_pk PK, supplemental_log_data_ui UI,
supplemental_log_data_fk FK, supplemental_log_data_all "ALL"
FROM dba_capture_prepared_tables;
 
Create Asynchronous HotLog Change Set
conn cdcadmin/cdcadmin

desc all_change_sets

-- change sets visible before the demo set is created
SELECT set_name, change_source_name, created, ignore_ddl, begin_date, end_date
FROM all_change_sets;

-- Syntax reference (not executable):
-- dbms_cdc_publish.create_change_set(
-- change_set_name    IN VARCHAR2,
-- description        IN VARCHAR2 DEFAULT NULL,
-- change_source_name IN VARCHAR2,
-- stop_on_ddl        IN CHAR DEFAULT 'N',
-- begin_date         IN DATE DEFAULT NULL,
-- end_date           IN DATE DEFAULT NULL);

-- this may take awhile don't be impatient
-- arguments, in order: change_set_name, description, change_source_name,
-- stop_on_ddl ('Y'), begin_date, end_date
exec dbms_cdc_publish.create_change_set('CDC_DEMO_SET', 'CDC Demo 2 Change Set', 'HOTLOG_SOURCE', 'Y', NULL, NULL);

-- objects now owned by CDCADMIN
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;

-- CDC_DEMO_SET should now be listed
SELECT set_name, change_source_name, created, ignore_ddl, begin_date, end_date
FROM all_change_sets;

conn / as sysdba

-- SYS-owned objects created within the last day (names starting with W excluded)
SELECT object_name, object_type
FROM dba_objects
WHERE owner = 'SYS'
  AND object_name NOT LIKE 'W%'
  AND created > SYSDATE - 1;

desc cdc_change_sets$

set linesize 121
col set_name format a20
col capture_name format a20
col queue_name format a20
col queue_table_name format a20

-- capture process and queue recorded for each change set
SELECT
  set_name,
  capture_name,
  queue_name,
  queue_table_name
FROM cdc_change_sets$;

-- capture state, DDL handling and publisher of each change set
SELECT
  set_name,
  change_source_name,
  capture_enabled,
  stop_on_ddl,
  publisher
FROM change_sets;

 
Create Change Table
conn cdcadmin/cdcadmin

-- CDCADMIN objects before the change table is created
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;

-- Syntax reference (not executable):
-- dbms_cdc_publish.create_change_table(
-- owner             IN VARCHAR2,
-- change_table_name IN VARCHAR2,
-- change_set_name   IN VARCHAR2,
-- source_schema     IN VARCHAR2,
-- source_table      IN VARCHAR2,
-- column_type_list  IN VARCHAR2,
-- capture_values    IN VARCHAR2, -- BOTH, NEW, OLD
-- rs_id             IN CHAR,
-- row_id            IN CHAR,
-- user_id           IN CHAR,
-- timestamp         IN CHAR,
-- object_id         IN CHAR,
-- source_colmap     IN CHAR,
-- target_colmap     IN CHAR,
-- options_string    IN VARCHAR2,
-- ddl_markers       IN CHAR DEFAULT 'Y');

-- Named notation makes the run of Y/N flags readable; ddl_markers is left at its default.
-- capture_values => 'BOTH' records old and new values for updates, and
-- rs_id => 'Y' adds the RSID$ column that update_salary_history joins on.
BEGIN
  dbms_cdc_publish.create_change_table(
    owner             => 'CDCADMIN',
    change_table_name => 'CDC_DEMO_CT',
    change_set_name   => 'CDC_DEMO_SET',
    source_schema     => 'HR',
    source_table      => 'CDC_DEMO2',
    column_type_list  => 'EMPLOYEE_ID NUMBER(6), FIRST_NAME VARCHAR2(20), LAST_NAME VARCHAR2(25), ' ||
                         'EMAIL VARCHAR2(25), PHONE_NUMBER VARCHAR2(20), HIRE_DATE DATE, ' ||
                         'JOB_ID VARCHAR2(10), SALARY NUMBER, COMMISSION_PCT NUMBER, ' ||
                         'MANAGER_ID NUMBER, DEPARTMENT_ID NUMBER',
    capture_values    => 'BOTH',
    rs_id             => 'Y',
    row_id            => 'N',
    user_id           => 'N',
    timestamp         => 'N',
    object_id         => 'N',
    source_colmap     => 'N',
    target_colmap     => 'Y',
    options_string    => NULL);
END;
/


-- CDCADMIN objects after the change table was created
SELECT object_name, object_type
FROM user_objects
ORDER BY object_type, object_name;

col high_value format a15

-- partitions in the CDCADMIN schema
SELECT
  table_name,
  composite,
  partition_name,
  high_value
FROM user_tab_partitions;

-- HR reads the change table through its subscription
GRANT SELECT ON cdc_demo_ct TO hr;

conn / as sysdba

desc cdc_change_tables$

-- source table registered for each change table
SELECT
  change_set_name,
  source_schema_name,
  source_table_name
FROM cdc_change_tables$;

 
Enable Capture
conn / as sysdba

-- capture state before enabling
SELECT set_name, change_source_name, capture_enabled
FROM cdc_change_sets$;

conn cdcadmin/cdcadmin

-- Syntax reference (not executable):
-- dbms_cdc_publish.alter_change_set(
-- change_set_name     IN VARCHAR2,
-- description         IN VARCHAR2 DEFAULT NULL,
-- remove_description  IN CHAR DEFAULT 'N',
-- enable_capture      IN CHAR DEFAULT NULL,
-- recover_after_error IN CHAR DEFAULT NULL,
-- remove_ddl          IN CHAR DEFAULT NULL,
-- stop_on_ddl         IN CHAR DEFAULT NULL);

-- only enable_capture is changed; every other attribute keeps its default (no change)
exec dbms_cdc_publish.alter_change_set(change_set_name=>'CDC_DEMO_SET', enable_capture=> 'Y');

conn / as sysdba

-- capture state after enabling
SELECT set_name, change_source_name, capture_enabled
FROM cdc_change_sets$;

-- SYS-owned objects created within the last day (names starting with W excluded)
SELECT object_name, object_type
FROM dba_objects
WHERE owner = 'SYS'
AND object_name not like 'W%'
AND created > SYSDATE-1;

 
Create Subscription
conn hr/hr

desc user_subscriptions

-- HR subscriptions before the demo subscription is created
SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.create_subscription(
-- change_set_name   IN VARCHAR2,
-- description       IN VARCHAR2,
-- subscription_name IN VARCHAR2);

-- arguments, in order: change_set_name, description, subscription_name
exec dbms_cdc_subscribe.create_subscription('CDC_DEMO_SET', 'cdc_demo subx', 'CDC_DEMO_SUB');

-- CDC_DEMO_SUB should now be listed
SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;

conn / as sysdba

set linesize 121
col description format a30
col subscription_name format a20
col username format a10

-- the same subscription as recorded in the SYS dictionary table
SELECT subscription_name, handle, set_name, username, earliest_scn, description
FROM cdc_subscribers$;

 
Subscribe to and Activate Subscription
conn hr/hr

desc user_subscribed_columns

-- subscribed columns before subscribing
SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.subscribe(
-- subscription_name IN VARCHAR2,
-- source_schema     IN VARCHAR2,
-- source_table      IN VARCHAR2,
-- column_list       IN VARCHAR2,
-- subscriber_view   IN VARCHAR2);

-- subscribe to the source columns; changes are read through CDC_DEMO_SUB_VIEW
BEGIN
  dbms_cdc_subscribe.subscribe('CDC_DEMO_SUB', 'HR', 'CDC_DEMO2',
  'EMPLOYEE_ID, FIRST_NAME, LAST_NAME, EMAIL, PHONE_NUMBER, HIRE_DATE, JOB_ID, SALARY,
   COMMISSION_PCT, MANAGER_ID, DEPARTMENT_ID'
,
  'CDC_DEMO_SUB_VIEW');
END;
/

-- subscribed columns after subscribing
SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.activate_subscription(
-- subscription_name IN VARCHAR2);

exec dbms_cdc_subscribe.activate_subscription('CDC_DEMO_SUB');

-- check the subscription status after activation
SELECT set_name, subscription_name, status
FROM user_subscriptions;

 
Create Procedure To Populate Salary History Table
conn hr/hr

/* Create a stored procedure to populate the new HR.SALARY_HISTORY table. The procedure extends the subscription window of the CDC_DEMO_SUB subscription to get the most recent set of source table changes. It uses the subscriber's CDC_DEMO_SUB_VIEW view to scan the changes and insert them into the SALARY_HISTORY table. It then purges the subscription window to indicate that it is finished with that set of changes. */

CREATE OR REPLACE PROCEDURE update_salary_history IS
 /*
  Copies pending salary changes from the CDC_DEMO_SUB subscription into
  SALARY_HISTORY.

  Steps: extend the subscription window, read inserts, deletes and salary
  updates from CDC_DEMO_SUB_VIEW in (cscn$, rsid$) order, write one
  SALARY_HISTORY row per change, then purge the window.

  Takes no parameters. The procedure does not issue a COMMIT itself.
 */
 CURSOR cur IS
 SELECT opt, cscn$, rsid$, employee_id, job_id, department_id,
        old_salary, new_salary, commit_timestamp$
 FROM (
  -- inserts: no previous salary, reported as 0
  SELECT 'I' opt, cscn$, rsid$, employee_id, job_id, department_id, 0 old_salary,
  salary new_salary, commit_timestamp$
  FROM cdc_demo_sub_view
  WHERE operation$ = 'I '
  UNION ALL
  -- deletes: no new salary, reported as 0
  SELECT 'D' opt, cscn$, rsid$, employee_id, job_id, department_id, salary old_salary,
  0 new_salary, commit_timestamp$
  FROM cdc_demo_sub_view
  WHERE operation$ = 'D '
  UNION ALL
  -- updates: pair the old-value (UO) row with the new-value (UN) row of the
  -- same change and keep only those where the salary actually differs
  SELECT 'U' opt, v1.cscn$, v1.rsid$, v1.employee_id, v1.job_id, v1.department_id,
  v1.salary old_salary, v2.salary new_salary, v1.commit_timestamp$
  FROM cdc_demo_sub_view v1
  INNER JOIN cdc_demo_sub_view v2
    ON v1.cscn$ = v2.cscn$
   AND v1.rsid$ = v2.rsid$
  WHERE v1.operation$ = 'UO' AND v2.operation$ = 'UN'
  AND ABS(v1.salary - v2.salary) > 0)
 ORDER BY cscn$, rsid$;

 percent NUMBER;  -- salary change as a percentage of the old salary
BEGIN
  -- get the next set of changes to the HR.CDC_DEMO2 source table
  dbms_cdc_subscribe.extend_window('CDC_DEMO_SUB');

  -- Process each change
  FOR rec IN cur
  LOOP
    IF rec.opt = 'I' THEN
      INSERT INTO salary_history
      (employee_id, job_id, dept_id, old_salary, new_salary, pct_change, action_date)
      VALUES
      (rec.employee_id, rec.job_id, rec.department_id, 0,
       rec.new_salary, NULL, rec.commit_timestamp$);
    ELSIF rec.opt = 'D' THEN
      INSERT INTO salary_history
      (employee_id, job_id, dept_id, old_salary, new_salary, pct_change, action_date)
      VALUES
      (rec.employee_id, rec.job_id, rec.department_id, rec.old_salary, 0,
       NULL, rec.commit_timestamp$);
    ELSIF rec.opt = 'U' THEN
      -- a raise from a zero salary has no defined percentage; store NULL
      -- instead of failing with ORA-01476 (divisor is equal to zero)
      IF rec.old_salary = 0 THEN
        percent := NULL;
      ELSE
        percent := (rec.new_salary - rec.old_salary) / rec.old_salary * 100;
      END IF;

      -- NOTE(review): pct_change is NUMBER(4,2) in this demo, so a change of
      -- 100% or more would still be rejected by the INSERT below
      INSERT INTO salary_history
      (employee_id, job_id, dept_id, old_salary, new_salary, pct_change, action_date)
      VALUES
      (rec.employee_id, rec.job_id, rec.department_id, rec.old_salary,
       rec.new_salary, percent, rec.commit_timestamp$);
    END IF;
  END LOOP;

  -- indicate subscriber is finished with this set of changes
  dbms_cdc_subscribe.purge_window('CDC_DEMO_SUB');
END update_salary_history;
/
 
Create Function To Wait For Changes
/* Create function CDCADMIN.WAIT_FOR_CHANGES to enable this demo to run predictably. The asynchronous nature of CDC HotLog mode means that there is a delay for source table changes to appear in the CDC change table and the subscriber view. By default this function waits up to 5 minutes (300 seconds) for the change table and 1 additional minute for the subscriber view. This can be adjusted if it is insufficient. The caller must specify the number of rows expected to be in the change table; the change table itself (CDC_DEMO_CT) is hard-coded in the function. The caller may also optionally specify a different number of seconds to wait for changes to appear in the change table. */

conn cdcadmin/cdcadmin

CREATE OR REPLACE FUNCTION wait_for_changes (
rowcount        NUMBER,        -- number of rows to wait for
maxwait_seconds NUMBER := 300) -- maximum time to wait, in seconds
RETURN VARCHAR2 AUTHID CURRENT_USER AS
 /*
  Polls CDC_DEMO_CT until it holds at least rowcount rows or until
  maxwait_seconds of polling has elapsed, then sleeps a further 60 seconds
  before returning.

  Returns an informational message saying whether the row count was reached
  or the wait timed out.
 */
 numrows        NUMBER := 0;       -- number of rows in change table
 slept          NUMBER := 0;       -- total time slept
 sleep_time     NUMBER := 3;       -- number of seconds to sleep
 return_msg     VARCHAR2(100);     -- informational message
BEGIN
  LOOP
    SELECT COUNT(*)
    INTO numrows
    FROM CDC_DEMO_CT;

    -- Got expected number of rows
    IF numrows >= rowcount THEN
      return_msg := 'Change table contains at least ' || TO_CHAR(rowcount) || ' rows';
      EXIT;
    -- Reached maximum number of seconds to wait; ">=" stops once the limit
    -- is reached rather than sleeping one more interval past it
    ELSIF slept >= maxwait_seconds THEN
      return_msg := ' - Timed out while waiting for the change table to reach ' ||
      TO_CHAR(rowcount) || ' rows';
      EXIT;
    END IF;

    dbms_lock.sleep(sleep_time);
    slept := slept+sleep_time;
  END LOOP;
  -- additional wait time for changes to become available to subscriber view
  dbms_lock.sleep(60);

  RETURN return_msg;
END wait_for_changes;
/
 
Preparation for DML in the Operating System (keep both windows open and visible)
-- In a separate terminal window
cd $ORACLE_BASE/diag/rdbms/orabase/orabase/trace

adrci

adrci> show homes

adrci> set homepath diag/rdbms/orcl/orcl

adrci> show alert -tail -f
-- tailing the alert log allows us to watch log miner at work
 
Preparation for DML in SQL*Plus (keep both windows open and visible)
-- open a SQL*Plus session as SYS
conn / as sysdba

desc gv$streams_capture

set linesize 121
col state format a20

-- capture process state and message count
SELECT
  capture_name,
  logminer_id,
  state,
  total_messages_captured
FROM gv$streams_capture;

-- open a SQL*Plus session as SYS
desc gv$streams_apply_reader

set linesize 121
col state format a20

-- apply reader state and dequeue count
SELECT
  apply_name,
  state,
  total_messages_dequeued
FROM gv$streams_apply_reader;
 
DML On Source Table 
conn hr/hr

-- first transaction: salary raises for three clerk job classes
UPDATE cdc_demo2 SET salary = salary +  500 WHERE job_id = 'SH_CLERK';
UPDATE cdc_demo2 SET salary = salary + 1000 WHERE job_id = 'ST_CLERK';
UPDATE cdc_demo2 SET salary = salary + 1500 WHERE job_id = 'PU_CLERK';
COMMIT;

-- second transaction: four new employees.
-- Hire dates use ANSI DATE literals so the inserts do not depend on the
-- session's NLS_DATE_FORMAT / NLS_DATE_LANGUAGE settings.
INSERT INTO cdc_demo2
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id)
VALUES
(207, 'Mary', 'Lee', 'MLEE', '310.234.4590', DATE '2003-01-10', 'SH_CLERK',
4000, NULL, 121, 50);

INSERT INTO cdc_demo2
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id)
VALUES
(208, 'Karen', 'Prince', 'KPRINCE', '345.444.6756', DATE '2003-11-10', 'SH_CLERK', 3000, NULL, 111, 50);

INSERT INTO cdc_demo2
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id)
VALUES
(209, 'Frank', 'Gate', 'FGATE', '451.445.5678', DATE '2003-11-13', 'IT_PROG',
8000, NULL, 101, 50);

INSERT INTO cdc_demo2
(employee_id, first_name, last_name, email, phone_number, hire_date, job_id, salary, commission_pct, manager_id, department_id)
VALUES
(210, 'Paul', 'Jeep', 'PJEEP', '607.345.1112', DATE '2003-05-28', 'IT_PROG',
8000, NULL, 101, 50);

COMMIT;

-- salary_history is only populated by update_salary_history, so nothing new appears here yet
SELECT *
FROM salary_history;
 
Validate Capture 
-- Expect the first capture to take several minutes; subsequent captures should be much quicker

conn cdcadmin/cdcadmin

-- wait for at least 10 change rows, polling for up to 120 seconds
SELECT wait_for_changes(10, 120) AS message
FROM dual;

-- Once wait_for_changes reports that the changes have arrived, apply them to salary_history
conn hr/hr

BEGIN
  update_salary_history;
END;
/

SELECT *
FROM salary_history;
 
Another Test 
conn hr/hr

-- history recorded so far
SELECT employee_id, job_id, dept_id, old_salary, new_salary, pct_change
FROM salary_history
ORDER BY employee_id, old_salary, new_salary;

-- remove the four employees added earlier
DELETE FROM cdc_demo2 WHERE first_name = 'Mary' AND last_name = 'Lee';
DELETE FROM cdc_demo2 WHERE first_name = 'Karen' AND last_name = 'Prince';
DELETE FROM cdc_demo2 WHERE first_name = 'Frank' AND last_name = 'Gate';
DELETE FROM cdc_demo2 WHERE first_name = 'Paul' AND last_name = 'Jeep';
COMMIT;

-- one raise and two cuts
UPDATE cdc_demo2 SET salary = salary + 5000 WHERE job_id = 'AD_VP';
UPDATE cdc_demo2 SET salary = salary - 1000 WHERE job_id = 'ST_MAN';
UPDATE cdc_demo2 SET salary = salary - 500 WHERE job_id = 'FI_ACCOUNT';
COMMIT;

conn cdcadmin/cdcadmin

-- wait for at least 15 change rows, polling for up to 120 seconds
SELECT wait_for_changes(15, 120) AS message
FROM dual;

conn hr/hr

-- row count before applying the new changes
SELECT COUNT(*)
FROM salary_history;

BEGIN
  update_salary_history;
END;
/

-- row count after applying the new changes
SELECT COUNT(*)
FROM salary_history;

SELECT employee_id, job_id, dept_id, old_salary, new_salary, pct_change
FROM salary_history
ORDER BY employee_id, old_salary, new_salary;
 
Capture Cleanup
conn hr/hr

-- subscriber side: remove the subscription, then the objects that consumed it
BEGIN
  dbms_cdc_subscribe.drop_subscription('CDC_DEMO_SUB');
END;
/

DROP TABLE salary_history PURGE;
DROP PROCEDURE update_salary_history;

conn / as sysdba

-- undo prepare_table_instantiation for the source table
BEGIN
  dbms_capture_adm.abort_table_instantiation('HR.CDC_DEMO2');
END;
/

-- publisher side: remove the change table first ...
BEGIN
  dbms_cdc_publish.drop_change_table('CDCADMIN', 'CDC_DEMO_CT', 'Y');
END;
/

-- ... then the change set that contained it
BEGIN
  dbms_cdc_publish.drop_change_set('CDC_DEMO_SET');
END;
/

-- finally remove the CDCADMIN account and everything it still owns
DROP USER cdcadmin CASCADE;
 
Related Topics
CDC Demo - Asynchronous AutoLog
CDC Demo - Synchronous Mode
DBMS_CAPTURE_ADM
DBMS_CDC_PUBLISH
DBMS_CDC_SUBSCRIBE
DBMS_STREAMS_AUTH
DBMS_OUTPUT
Export
Import
Streams Demo 1
User
 
Home      :      Code Library      :      Sponsors      :      Privacy      :      Terms of Use      :      Contact Us [62 users online]    © 2010 psoug.org