CODE
Oracle Code Library
JOBS
Find Or Post Oracle Jobs
FORUM
Oracle Discussion & Chat
PSOUG Home Code Snippets Oracle Reference Oracle Functions PSOUG Forum Oracle Blogs Bookmark and Share
 
 Search the Reference Library pages:  

Free
Oracle Magazine
Subscriptions
and Oracle White Papers

Oracle Change Data Capture Synchronous Mode Demo
Version 11.1
 
Setup As SYS - Prepare Database and Instance
conn / as sysdba

-- *NIX only
define _editor=vi

-- validate database parameters
-- NOTE: SQL*Plus does not accept a trailing "--" comment on the same line as
-- a SQL*Plus command, so each expectation is written on the line above the
-- command it applies to.

-- Archive Mode
archive log list
-- must be 11.0 or above
show parameter compatible
-- must be TRUE
show parameter global_names
-- min 2
show parameter job_queue_processes
-- not less than the default 4
show parameter open_links
-- must be 0 or at least 200MB
show parameter shared_pool_size
-- min. 480MB (10MB/capture 1MB/apply)
show parameter streams_pool_size
-- min. 3600 (1 hr.) (900)
show parameter undo_retention

-- Examples of altering initialization parameters
-- (compatible and open_links are written with scope=SPFILE, so they only take
-- effect once the instance has been restarted)
alter system set compatible='11.1.0.0.0' scope=SPFILE;
alter system set global_names=TRUE scope=BOTH;
alter system set job_queue_processes=6 scope=BOTH;
alter system set open_links=4 scope=SPFILE;
-- very slow if making smaller
alter system set streams_pool_size=200M scope=BOTH;
alter system set undo_retention=3600 scope=BOTH;

/*
JOB_QUEUE_PROCESSES (current value) + 2
PARALLEL_MAX_SERVERS (current value) + (5 * (the number of change sets planned))
PROCESSES (current value) + (7 * (the number of change sets planned))
SESSIONS (current value) + (2 * (the number of change sets planned))
*/

-- Retest parameter after modification

-- Bounce the instance into MOUNT state so the database-level logging
-- settings below can be changed before it is opened.
SHUTDOWN IMMEDIATE;

STARTUP MOUNT;

ALTER DATABASE ARCHIVELOG;

-- important
ALTER DATABASE FORCE LOGGING;

-- one option among several
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

ALTER DATABASE OPEN;

-- validate archivelogging: list, force a log switch, then list again
ARCHIVE LOG LIST;

ALTER SYSTEM SWITCH LOGFILE;

ARCHIVE LOG LIST;

-- validate force and supplemental logging
-- (one COLUMN directive per alias used in the query below)
col log_min format a7
col log_pk format a6
col log_ui format a6
col log_fk format a6
col log_all format a7
col force_log format a9

SELECT
  supplemental_log_data_min AS log_min,
  supplemental_log_data_pk  AS log_pk,
  supplemental_log_data_ui  AS log_ui,
  supplemental_log_data_fk  AS log_fk,
  supplemental_log_data_all AS log_all,
  force_logging             AS force_log
FROM v$database;

SELECT tablespace_name, force_logging
FROM dba_tablespaces;

-- examine CDC related data dictionary objects
SELECT table_name
FROM dba_tables
WHERE owner = 'SYS'
AND table_name LIKE 'CDC%$';

desc cdc_system$

SELECT * FROM cdc_system$;
 
Setup As SYS - Create Streams Administrators
conn / as sysdba

-- administrators that exist before the demo user is created
SELECT *
FROM dba_streams_administrator;

-- The quota is granted on USERS, the user's default tablespace, so the
-- statement does not depend on a site-specific tablespace existing.
CREATE USER cdcadmin
IDENTIFIED BY cdcadmin
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp
QUOTA 20M ON users;

-- system privs
GRANT create session TO cdcadmin;
GRANT create table TO cdcadmin;
GRANT create sequence TO cdcadmin;
GRANT create procedure TO cdcadmin;
GRANT create any job TO cdcadmin;

-- role privs
GRANT execute_catalog_role TO cdcadmin;
GRANT select_catalog_role TO cdcadmin;

-- object privileges
GRANT execute ON dbms_cdc_publish TO cdcadmin;
GRANT execute ON dbms_cdc_subscribe TO cdcadmin;

-- streams specific priv
execute dbms_streams_auth.grant_admin_privilege('CDCADMIN');

-- verify the account and what it was granted
SELECT account_status, created
FROM dba_users
WHERE username = 'CDCADMIN';

SELECT *
FROM dba_sys_privs
WHERE grantee = 'CDCADMIN';

SELECT u.username
FROM dba_users u
INNER JOIN streams$_privileged_user s
  ON s.user# = u.user_id;

col local_privileges format a20
col access_from_remote format a20

SELECT *
FROM dba_streams_administrator;
 
Prepare Schema Tables for CDC Replication
conn / as sysdba

alter user hr account unlock identified by hr;

connect hr/hr

SELECT table_name, reason
FROM all_streams_unsupported
WHERE owner = 'HR'
ORDER BY table_name;

desc employees

SELECT *
FROM employees;

-- create CDC demo table
-- The table is named cdc_demo because the change table, subscription, DML and
-- cleanup steps of this demo all refer to HR.CDC_DEMO.
CREATE TABLE cdc_demo AS
SELECT * FROM employees;

ALTER TABLE cdc_demo
ADD CONSTRAINT pk_cdc_demo
PRIMARY KEY (employee_id)
USING INDEX;

-- a second way to implement supplemental logging
SELECT * FROM user_log_groups;

ALTER TABLE cdc_demo
ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

SELECT * FROM user_log_groups;

-- table to track salary history changes originating in cdc_demo
CREATE TABLE salary_history (
employee_id NUMBER(6),
first_name  VARCHAR2(20),
last_name   VARCHAR2(25),
old_salary  NUMBER(8,2),
new_salary  NUMBER(8,2),
pct_change  NUMBER(4,2),
action_date DATE);

SELECT object_name, created
FROM user_objects
WHERE object_type = 'TABLE'
ORDER BY created, object_name;
 
Instantiate Source Table
conn cdcadmin/cdcadmin

desc dba_capture_prepared_tables

SELECT table_name, scn, supplemental_log_data_pk, supplemental_log_data_ui,
supplemental_log_data_fk, supplemental_log_data_all
FROM dba_capture_prepared_tables;

-- Syntax reference (not executable):
-- dbms_capture_adm.prepare_table_instantiation(
-- table_name           IN VARCHAR2,
-- supplemental_logging IN VARCHAR2 DEFAULT 'keys')

exec dbms_capture_adm.prepare_table_instantiation('HR.CDC_DEMO');

SELECT table_name, scn, supplemental_log_data_pk PK, supplemental_log_data_ui UI,
supplemental_log_data_fk FK, supplemental_log_data_all "ALL"
FROM dba_capture_prepared_tables;
 
Create Synchronous Change Set
conn cdcadmin/cdcadmin

desc all_change_sets

-- change sets visible before the demo set is created
SELECT set_name, change_source_name, created, ignore_ddl, begin_date, end_date
FROM all_change_sets;

-- Syntax reference (not executable):
-- dbms_cdc_publish.create_change_set(
-- change_set_name    IN VARCHAR2,
-- description        IN VARCHAR2 DEFAULT NULL,
-- change_source_name IN VARCHAR2,
-- stop_on_ddl        IN CHAR DEFAULT 'N',
-- begin_date         IN DATE DEFAULT NULL,
-- end_date           IN DATE DEFAULT NULL
-- )

-- this may take a minute or two
exec dbms_cdc_publish.create_change_set('CDC_DEMO_SET', 'Synch Capture Demo', 'SYNC_SOURCE');

SELECT set_name, change_source_name, created, ignore_ddl, begin_date, end_date
FROM all_change_sets;

conn / as sysdba

desc cdc_change_sets$

set linesize 121
col set_name format a20
col capture_name format a20
col queue_name format a20
col queue_table_name format a20

SELECT set_name, capture_name, queue_name, queue_table_name
FROM cdc_change_sets$;

SELECT set_name, change_source_name, capture_enabled, stop_on_ddl, publisher
FROM change_sets;

desc streams$_process_params;

SELECT process_type, name
FROM streams$_process_params;

 
Create Change Table
conn cdcadmin/cdcadmin

-- Syntax reference (not executable):
-- dbms_cdc_publish.create_change_table(
-- owner             IN VARCHAR2,
-- change_table_name IN VARCHAR2,
-- change_set_name   IN VARCHAR2,
-- source_schema     IN VARCHAR2,
-- source_table      IN VARCHAR2,
-- column_type_list  IN VARCHAR2,
-- capture_values    IN VARCHAR2, -- BOTH, NEW, OLD
-- rs_id             IN CHAR,
-- row_id            IN CHAR,
-- user_id           IN CHAR,
-- timestamp         IN CHAR,
-- object_id         IN CHAR,
-- source_colmap     IN CHAR,
-- target_colmap     IN CHAR,
-- options_string    IN VARCHAR2,
-- ddl_markers       IN CHAR DEFAULT 'Y')

-- Named notation is used so each of the sixteen arguments can be matched to
-- its parameter at a glance; the values are the same as a positional call
-- in signature order.
BEGIN
  dbms_cdc_publish.create_change_table(
    owner             => 'CDCADMIN',
    change_table_name => 'CDC_DEMO_CT',
    change_set_name   => 'CDC_DEMO_SET',
    source_schema     => 'HR',
    source_table      => 'CDC_DEMO',
    column_type_list  => 'EMPLOYEE_ID NUMBER(6), FIRST_NAME VARCHAR2(20), LAST_NAME VARCHAR2(25), SALARY NUMBER',
    capture_values    => 'BOTH',
    rs_id             => 'Y',
    row_id            => 'Y',
    user_id           => 'Y',
    timestamp         => 'N',
    object_id         => 'N',
    source_colmap     => 'Y',
    target_colmap     => 'Y',
    options_string    => ' TABLESPACE USERS pctfree 0 pctused 99',
    ddl_markers       => 'N');
END;
/

-- note the importance for synchronous of ddl_markers (new in 11g)

SELECT object_name, object_type
FROM user_objects
ORDER BY object_type, object_name;

col high_value format a15

SELECT table_name, composite, partition_name, high_value
FROM user_tab_partitions;

-- the subscriber (HR) needs to read the change table
GRANT select ON cdc_demo_ct TO hr;

conn / as sysdba

desc cdc_change_tables$

SELECT change_set_name, source_schema_name, source_table_name
FROM cdc_change_tables$;

 
Create Subscription
conn hr/hr

desc user_subscriptions

-- subscriptions owned by HR before the demo subscription is created
SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.create_subscription(
-- change_set_name   IN VARCHAR2,
-- description       IN VARCHAR2,
-- subscription_name IN VARCHAR2)

exec dbms_cdc_subscribe.create_subscription('CDC_DEMO_SET', 'Sync Capture Demo Set', 'CDC_DEMO_SUB');

SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;

conn / as sysdba

set linesize 121
col description format a30
col subscription_name format a20
col username format a10

SELECT subscription_name, handle, set_name, username, earliest_scn, description
FROM cdc_subscribers$;

 
Subscribe to and Activate Subscription
conn hr/hr

desc user_subscribed_columns

-- subscribed columns before subscribing
SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.subscribe(
-- subscription_name IN VARCHAR2,
-- source_schema     IN VARCHAR2,
-- source_table      IN VARCHAR2,
-- column_list       IN VARCHAR2,
-- subscriber_view   IN VARCHAR2)

BEGIN
  dbms_cdc_subscribe.subscribe('CDC_DEMO_SUB', 'HR', 'CDC_DEMO',
  'EMPLOYEE_ID, FIRST_NAME, LAST_NAME, SALARY', 'CDC_DEMO_SUB_VIEW');
END;
/

SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;

-- Syntax reference (not executable):
-- dbms_cdc_subscribe.activate_subscription(
-- subscription_name IN VARCHAR2)

exec dbms_cdc_subscribe.activate_subscription('CDC_DEMO_SUB');

SELECT set_name, subscription_name, status
FROM user_subscriptions;

 
Create Procedure To Populate Salary History Table
conn hr/hr

/* Create a stored procedure to populate the new HR.SALARY_HISTORY table. The procedure extends the subscription window of the CDC_DEMO_SUB subscription to get the most recent set of source table changes. It uses the subscriber's CDC_DEMO_SUB_VIEW view to scan the changes and insert them into the SALARY_HISTORY table. It then purges the subscription window to indicate that it is finished with that set of changes. */

-- update_salary_history
--
-- Copies pending salary changes from the CDC subscriber view
-- cdc_demo_sub_view into salary_history.
--
-- Steps:
--   1. extend the CDC_DEMO_SUB subscription window
--   2. insert one salary_history row per captured insert, delete, or
--      salary-changing update, in (cscn$, rsid$) order
--   3. commit, then purge the window of the consumed changes
--
-- Takes no parameters. No exceptions are handled here, so any error
-- propagates to the caller.
CREATE OR REPLACE PROCEDURE update_salary_history IS
  CURSOR salary_changes IS
    SELECT opt, cscn$, rsid$, employee_id, first_name, last_name,
           old_salary, new_salary, commit_timestamp$
    FROM (
      -- inserts: there is no previous salary, so old_salary is reported as 0
      SELECT 'I' opt, cscn$, rsid$, employee_id, first_name, last_name,
             0 old_salary, salary new_salary, commit_timestamp$
      FROM cdc_demo_sub_view
      WHERE operation$ = 'I '
      UNION ALL
      -- deletes: there is no resulting salary, so new_salary is reported as 0
      SELECT 'D' opt, cscn$, rsid$, employee_id, first_name, last_name,
             salary old_salary, 0 new_salary, commit_timestamp$
      FROM cdc_demo_sub_view
      WHERE operation$ = 'D '
      UNION ALL
      -- updates: pair the 'UU' row (old values) with the 'UN' row (new values)
      -- of the same change and keep only pairs where the salary differs
      SELECT 'U' opt, v1.cscn$, v1.rsid$, v1.employee_id, v1.first_name, v1.last_name,
             v1.salary old_salary, v2.salary new_salary, v1.commit_timestamp$
      FROM cdc_demo_sub_view v1
      INNER JOIN cdc_demo_sub_view v2
        ON v2.cscn$ = v1.cscn$
       AND v2.rsid$ = v1.rsid$
      WHERE v1.operation$ = 'UU'
        AND v2.operation$ = 'UN'
        AND v1.salary <> v2.salary)
    ORDER BY cscn$, rsid$;

  l_pct_change NUMBER;
BEGIN
  --Step 1 Get the change (extend the window)
  dbms_cdc_subscribe.extend_window('CDC_DEMO_SUB');

  FOR rec IN salary_changes LOOP
    -- A percentage is only recorded for updates. An old salary of 0 would
    -- raise ZERO_DIVIDE, so that case is stored as NULL rather than aborting
    -- the whole run.
    -- NOTE(review): salary_history.pct_change is NUMBER(4,2); a change of
    -- 100 percent or more cannot be stored there. Confirm whether the column
    -- should be widened.
    l_pct_change :=
      CASE
        WHEN rec.opt = 'U' AND rec.old_salary <> 0
          THEN (rec.new_salary - rec.old_salary) / rec.old_salary * 100
        ELSE NULL
      END;

    -- The cursor already supplies 0 for the missing side of an insert or a
    -- delete, so a single INSERT covers all three operation types.
    INSERT INTO salary_history
      (employee_id, first_name, last_name, old_salary, new_salary, pct_change, action_date)
    VALUES
      (rec.employee_id, rec.first_name, rec.last_name, rec.old_salary,
       rec.new_salary, l_pct_change, rec.commit_timestamp$);
  END LOOP;
  COMMIT;

  --Step 2 Purge the window of consumed data
  dbms_cdc_subscribe.purge_window('CDC_DEMO_SUB');
END update_salary_history;
/
 
DML On Source Table 
CONNECT hr/hr

-- state before the change: source table, subscriber view, history table
SELECT employee_id, first_name, last_name, salary
FROM cdc_demo
ORDER BY employee_id DESC;

SELECT employee_id, first_name, last_name, salary
FROM cdc_demo_sub_view;

SELECT *
FROM salary_history;

-- change one salary on the source table
UPDATE cdc_demo
SET salary = salary + 1
WHERE employee_id = 100;

COMMIT;

-- subscriber view before running the procedure
SELECT employee_id, first_name, last_name, salary
FROM cdc_demo_sub_view;

EXECUTE update_salary_history;

-- subscriber view and history table after running the procedure
SELECT employee_id, first_name, last_name, salary
FROM cdc_demo_sub_view;

SELECT *
FROM salary_history;

CONNECT cdcadmin/cdcadmin

COLUMN username$ FORMAT a10

-- the change table itself, as seen by the publisher
SELECT operation$, cscn$, commit_timestamp$, username$, employee_id, salary
FROM cdc_demo_ct;
 
Capture Cleanup
CONNECT hr/hr

-- subscriber side: remove the subscription and the objects built on it
EXECUTE dbms_cdc_subscribe.drop_subscription('CDC_DEMO_SUB');

DROP TABLE salary_history PURGE;

DROP PROCEDURE update_salary_history;

CONNECT / AS SYSDBA

-- reverse prepare table instantiation
EXECUTE dbms_capture_adm.abort_table_instantiation('HR.CDC_DEMO');

-- drop the change table
EXECUTE dbms_cdc_publish.drop_change_table('CDCADMIN', 'CDC_DEMO_CT', 'Y');

-- drop the change set
EXECUTE dbms_cdc_publish.drop_change_set('CDC_DEMO_SET');

-- finally remove the publisher account and everything it owns
DROP USER cdcadmin CASCADE;
 
Related Topics
CDC Demo - Asynchronous AutoLog
CDC Demo - Asynchronous HotLog
DBMS_CAPTURE_ADM
DBMS_CDC_PUBLISH
DBMS_CDC_SUBSCRIBE
DBMS_STREAMS_AUTH
DBMS_OUTPUT
Export
Import
Streams Demo 1
User
 
Home      :      Code Library      :      Sponsors      :      Privacy      :      Terms of Use      :      Contact Us [265 users online]    © 2010 psoug.org