Setup As SYS - Prepare Database and Instance |
conn / as sysdba
-- *NIX only
define _editor=vi
-- validate database parameters
archive log list
-- Archive Mode
show parameter compatible -- must be 11.0 or above
show parameter global_names -- must
be TRUE
show parameter job_queue_processes -- min 2
show parameter
open_links -- not less than the default
4
show parameter shared_pool_size -- must be 0 or at least
200MB
show parameter streams_pool_size -- min.
480MB (10MB/capture 1MB/apply)
show parameter undo_retention -- min. 3600 (1 hr.)
(900)
-- Examples of altering initialization parameters
alter system set compatible='11.1.0.0.0' scope=SPFILE;
alter system set global_names=TRUE scope=BOTH;
alter system set job_queue_processes=6 scope=BOTH;
alter system set open_links=4 scope=SPFILE;
alter system set streams_pool_size=200M scope=BOTH; -- very slow if making smaller
alter system set undo_retention=3600 scope=BOTH;
/*
JOB_QUEUE_PROCESSES (current value) + 2
PARALLEL_MAX_SERVERS (current value) + (5 * (the number of change sets planned))
PROCESSES (current value) + (7 * (the number of change sets planned))
SESSIONS (current value) + (2 * (the number of change sets planned))
*/
-- Retest parameter after modification
shutdown immediate;
startup mount;
alter database archivelog;
-- important
alter database force logging;
-- one option among several
alter database add supplemental log data;
alter database open;
-- validate archivelogging
archive log list;
alter system switch logfile;
archive log list;
-- validate force and supplemental logging
col log_min format a7
col log_pk format a6
col log_pk format a6
col log_ui format a6
col log_fk format a6
col log_all format a7
col force_log format a9
SELECT supplemental_log_data_min LOG_MIN, supplemental_log_data_pk LOG_PK,
supplemental_log_data_ui LOG_UI, supplemental_log_data_fk LOG_FK,
supplemental_log_data_all LOG_ALL, force_logging FORCE_LOG
FROM v$database;
SELECT tablespace_name, force_logging
FROM dba_tablespaces;
-- examine CDC related data dictionary objects
SELECT table_name
FROM dba_tables
WHERE owner = 'SYS'
AND table_name LIKE 'CDC%$';
desc cdc_system$
SELECT * FROM cdc_system$; |
|
Setup As SYS - Create Streams Administrators |
conn / as sysdba
SELECT *
FROM dba_streams_administrator;
CREATE USER cdcadmin
IDENTIFIED BY cdcadmin
DEFAULT TABLESPACE users
TEMPORARY TABLESPACE temp
QUOTA 20M ON uwdata;
-- system privs
GRANT create session TO cdcadmin;
GRANT create table TO cdcadmin;
GRANT create sequence TO cdcadmin;
GRANT create procedure TO cdcadmin;
GRANT create any job TO cdcadmin;
-- role privs
GRANT execute_catalog_role TO cdcadmin;
GRANT select_catalog_role TO cdcadmin;
-- object privileges
GRANT execute ON dbms_cdc_publish TO cdcadmin;
GRANT execute ON dbms_cdc_subscribe TO
cdcadmin;
-- streams specific priv
execute dbms_streams_auth.grant_admin_privilege('CDCADMIN');
SELECT account_status, created
FROM dba_users
WHERE username = 'CDCADMIN';
SELECT *
FROM dba_sys_privs
WHERE grantee = 'CDCADMIN';
SELECT username
FROM dba_users u, streams$_privileged_user s
WHERE u.user_id = s.user#;
col local_privileges format a20
col access_from_remote format a20
SELECT *
FROM dba_streams_administrator; |
|
Prepare Schema Tables for
CDC Replication |
conn / as sysdba
alter user hr account unlock identified by hr;
connect hr/hr
SELECT table_name, reason
FROM all_streams_unsupported
WHERE owner = 'HR'
ORDER BY 1;
desc employees
SELECT *
FROM employees;
-- create CDC demo table
CREATE TABLE cdc_demo1 AS
SELECT * FROM employees;
ALTER TABLE cdc_demo1
ADD CONSTRAINT pk_cdc_demo1
PRIMARY KEY (employee_id)
USING INDEX;
-- a second way to implement supplemental logging
SELECT * FROM user_log_groups;
ALTER TABLE cdc_demo1
ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
SELECT * FROM user_log_groups;
-- table to track salary history changes originating
in cdc_demo
CREATE TABLE salary_history (
employee_id NUMBER(6),
first_name VARCHAR2(20),
last_name VARCHAR2(25),
old_salary NUMBER(8,2),
new_salary NUMBER(8,2),
pct_change NUMBER(4,2),
action_date DATE);
SELECT object_name, created
FROM user_objects
WHERE object_type = 'TABLE'
ORDER BY 2,1; |
|
Instantiate Source Table |
conn cdcadmin/cdcadmin
desc dba_capture_prepared_tables
SELECT table_name, scn, supplemental_log_data_pk, supplemental_log_data_ui,
supplemental_log_data_fk, supplemental_log_data_all
FROM dba_capture_prepared_tables;
dbms_capture_adm.prepare_table_instantiation(
table_name IN VARCHAR2,
supplemental_logging IN VARCHAR2 DEFAULT 'keys'); |
exec
dbms_capture_adm.prepare_table_instantiation('HR.CDC_DEMO1');
SELECT table_name, scn, supplemental_log_data_pk PK, supplemental_log_data_ui
UI,
supplemental_log_data_fk FK, supplemental_log_data_all "ALL"
FROM dba_capture_prepared_tables; |
|
Create
Synchronous Change Set |
conn cdcadmin/cdcadmin
desc all_change_sets
SELECT set_name, change_source_name, created, ignore_ddl, begin_date,
end_date
FROM all_change_sets;
dbms_cdc_publish.create_change_set(
change_set_name IN VARCHAR2,
description IN VARCHAR2 DEFAULT NULL,
change_source_name IN VARCHAR2,
stop_on_ddl IN CHAR DEFAULT 'N',
begin_date IN DATE DEFAULT NULL,
end_date IN DATE DEFAULT
NULL); |
-- this may take a minute or
two
exec dbms_cdc_publish.create_change_set('CDC_DEMO_SET',
'Synch Capture Demo', 'SYNC_SOURCE');
SELECT set_name, change_source_name, created, ignore_ddl, begin_date,
end_date
FROM all_change_sets;
conn / as sysdba
desc cdc_change_sets$
set linesize 121
col set_name format a20
col capture_name format a20
col queue_name format a20
col queue_table_name format a20
SELECT set_name, capture_name, queue_name, queue_table_name
FROM cdc_change_sets$;
SELECT set_name, change_source_name, capture_enabled, stop_on_ddl, publisher
FROM change_sets;
desc streams$_process_params;
SELECT process_type, name
FROM streams$_process_params;
|
|
Create Change Table |
conn cdcadmin/cdcadmin
dbms_cdc_publish.create_change_table(
owner IN VARCHAR2,
change_table_name IN VARCHAR2,
change_set_name IN VARCHAR2,
source_schema IN VARCHAR2,
source_table IN VARCHAR2,
column_type_list IN VARCHAR2,
capture_values IN VARCHAR2, -- BOTH, NEW, OLD
rs_id IN CHAR,
row_id IN CHAR,
user_id IN CHAR,
timestamp IN CHAR,
object_id IN CHAR,
source_colmap IN CHAR,
target_colmap IN CHAR,
options_string IN VARCHAR2,
ddl_markers IN CHAR DEFAULT 'Y'); |
exec dbms_cdc_publish.create_change_table('CDCADMIN', 'CDC_DEMO_CT',
'CDC_DEMO_SET', 'HR', 'CDC_DEMO', 'EMPLOYEE_ID
NUMBER(6), FIRST_NAME VARCHAR2(20),
LAST_NAME VARCHAR2(25), SALARY NUMBER', 'BOTH', 'Y', 'Y',
'Y', 'N', 'N', 'Y', 'Y', ' TABLESPACE USERS pctfree 0 pctused 99', 'N');
-- note the importance for synchronous of ddl_markers
(new in 11g)
SELECT object_name, object_type
FROM user_objects
ORDER BY 2,1;
col high_value format a15
SELECT table_name, composite, partition_name, high_value
FROM user_tab_partitions;
GRANT select ON cdc_demo_ct TO hr;
conn / as sysdba
desc cdc_change_tables$
SELECT change_set_name, source_schema_name, source_table_name
FROM cdc_change_tables$;
|
|
Create Subscription |
conn hr/hr
desc user_subscriptions
SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;
dbms_cdc_subscribe.create_subscription(
change_set_name IN VARCHAR2,
description IN VARCHAR2,
subscription_name IN VARCHAR2); |
exec dbms_cdc_subscribe.create_subscription('CDC_DEMO_SET',
'Sync Capture Demo Set', 'CDC_DEMO_SUB');
SELECT set_name, username, subscription_name, created, status
FROM user_subscriptions;
conn / as sysdba
set linesize 121
col description format a30
col subscription_name format a20
col username format a10
SELECT subscription_name, handle, set_name, username, earliest_scn, description
FROM cdc_subscribers$;
|
|
Subscribe
to and Activate Subscription |
conn hr/hr
desc user_subscribed_columns
SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;
dbms_cdc_subscribe.subscribe(
subscription_name IN VARCHAR2,
source_schema IN
VARCHAR2,
source_table IN
VARCHAR2,
column_list IN VARCHAR2,
subscriber_view IN VARCHAR2); |
BEGIN
dbms_cdc_subscribe.subscribe('CDC_DEMO_SUB', 'HR',
'CDC_DEMO',
'EMPLOYEE_ID, FIRST_NAME, LAST_NAME,
SALARY', 'CDC_DEMO_SUB_VIEW');
END;
/
SELECT handle, source_table_name, column_name, subscription_name
FROM user_subscribed_columns;
dbms_cdc_subscribe.activate_subscription(
subscription_name IN VARCHAR2); |
exec dbms_cdc_subscribe.activate_subscription('CDC_DEMO_SUB');
SELECT set_name, subscription_name, status
FROM user_subscriptions;
|
|
Create Procedure To Populate Salary History Table |
conn hr/hr
/* Create a stored procedure to populate the new HR.SALARY_HISTORY table. The procedure
extends the subscription window of the CDC_DEMP_SUB subscription to get the most recent set of source table
changes. It uses the subscriber's DEMO_SUB_VIEW view to scan the changes and insert them into the SALARY_HISTORY
table. It then purges the subscription window to indicate that it is finished with
that set of changes. */
CREATE OR REPLACE PROCEDURE update_salary_history IS
CURSOR cur IS
SELECT *
FROM (
SELECT 'I' opt, cscn$, rsid$, employee_id, first_name,
last_name, 0 old_salary,
salary new_salary, commit_timestamp$
FROM cdc_demo_sub_view
WHERE operation$ = 'I '
UNION ALL
SELECT 'D' opt, cscn$, rsid$, employee_id, first_name,
last_name, salary old_salary,
0 new_salary, commit_timestamp$
FROM cdc_demo_sub_view
WHERE operation$ = 'D '
UNION ALL
SELECT 'U' opt , v1.cscn$, v1.rsid$, v1.employee_id, v1.first_name, v1.last_name,
v1.salary old_salary, v2.salary new_salaryi, v1.commit_timestamp$
FROM cdc_demo_sub_view v1, cdc_demo_sub_view v2
WHERE v1.operation$ = 'UU' and v2.operation$ = 'UN'
AND v1.cscn$ = v2.cscn$
AND v1.rsid$ = v2.rsid$
AND ABS(v1.salary - v2.salary) > 0)
ORDER BY cscn$, rsid$;
percent NUMBER;
BEGIN
--Step 1 Get the change (extend the window)
dbms_cdc_subscribe.extend_window('CDC_DEMO_SUB');
FOR rec IN cur LOOP
IF rec.opt = 'I' THEN
INSERT INTO salary_history
(employee_id, first_name, last_name,
old_salary, new_salary, pct_change, action_date)
VALUES
(rec.employee_id, rec.first_name,
rec.last_name, 0, rec.new_salary, NULL,
rec.commit_timestamp$);
END IF;
IF rec.opt = 'D' THEN
INSERT INTO salary_history
(employee_id, first_name, last_name,
old_salary, new_salary, pct_change, action_date)
VALUES
(rec.employee_id, rec.first_name,
rec.last_name, rec.old_salary, 0, NULL,
rec.commit_timestamp$);
END IF;
IF rec.opt = 'U' THEN
percent := (rec.new_salary - rec.old_salary) / rec.old_salary * 100;
INSERT INTO salary_history
(employee_id, first_name, last_name,
old_salary, new_salary, pct_change, action_date)
VALUES
(rec.employee_id, rec.first_name,
rec.last_name, rec.old_salary,
rec.new_salary, percent,
rec.commit_timestamp$);
END IF;
END LOOP;
COMMIT;
--Step 2 Purge the window of consumed data
dbms_cdc_subscribe.purge_window('CDC_DEMO_SUB');
END update_salary_history;
/
|
|
DML On Source Table |
conn hr/hr
SELECT employee_id, first_name, last_name, salary
FROM cdc_demo
ORDER BY 1 DESC;
SELECT employee_id, first_name, last_name, salary
FROM cdc_demo_sub_view;
SELECT *
FROM salary_history;
UPDATE cdc_demo
SET salary = salary+1
WHERE employee_id = 100;
COMMIT;
SELECT employee_id,first_name,last_name,salary
FROM cdc_demo_sub_view;
exec update_salary_history;
SELECT employee_id,first_name,last_name,salary
FROM cdc_demo_sub_view;
SELECT *
FROM salary_history;
conn cdcadmin/cdcadmin
col username$ format a10
SELECT operation$, cscn$, commit_timestamp$, username$, employee_id,
salary
FROM cdc_demo_ct;
|
|
Capture Cleanup |
conn hr/hr
exec dbms_cdc_subscribe.drop_subscription('CDC_DEMO_SUB');
drop table salary_history purge;
drop procedure update_salary_history;
conn / as sysdba
-- reverse prepare table instantiation
exec dbms_capture_adm.abort_table_instantiation('HR.CDC_DEMO');
-- drop the change table
exec dbms_cdc_publish.drop_change_table('CDCADMIN',
'CDC_DEMO_CT', 'Y');
-- drop the change set
exec dbms_cdc_publish.drop_change_set('CDC_DEMO_SET');
drop user cdcadmin cascade;
|
|