ABC VARCHAR2(10),BCD NUMBER(20,2), CDE DATE . . . |
ABC, BCD, CDE, . . . |
-- Run from the SYSTEM account: grant the catalog roles to boris_publisher.
CONNECT system/manager@whatever

GRANT EXECUTE_CATALOG_ROLE TO boris_publisher;
GRANT SELECT_CATALOG_ROLE TO boris_publisher;
-- Run from the SCOTT account: allow boris_publisher to read EMP and DEPT.
CONNECT scott/tiger@whatever

GRANT SELECT ON emp TO boris_publisher;
GRANT SELECT ON dept TO boris_publisher;
Create Change Tables |
---|
CONNECT boris_publisher/boris_publisher@whatever
SET SERVEROUTPUT ON SIZE 1000000
SET LINESIZE 120

/*
 * Login as a publisher to run this.
 *
 * For every table named in the driving query, creates a change table called
 * CDC_<table> in change set SYNC_SET, owned by the publisher. The first error
 * is reported through DBMS_OUTPUT and ends the run (the handler sits outside
 * the loop).
 */
DECLARE
    c_publisher      CONSTANT VARCHAR2(20) := 'boris_publisher';
    c_source_schema  CONSTANT VARCHAR2(20) := 'scott';
    l_source_table   VARCHAR2(100);
    l_change_table   VARCHAR2(100);
    l_column_types   VARCHAR2(32767);
BEGIN
    FOR src IN (
        SELECT 'emp' table_name FROM dual
        UNION
        SELECT 'dept' table_name FROM dual
    )
    LOOP
        l_source_table := src.table_name;
        l_change_table := 'CDC_' || l_source_table;

        -- Passed below as COLUMN_TYPE_LIST; etl_util is defined outside this script.
        l_column_types := etl_util.get_cols_definition(
                              c_source_schema, l_source_table, '', '');

        DBMS_LOGMNR_CDC_PUBLISH.CREATE_CHANGE_TABLE(
            OWNER             => c_publisher,
            CHANGE_TABLE_NAME => l_change_table,
            CHANGE_SET_NAME   => 'SYNC_SET',
            SOURCE_SCHEMA     => c_source_schema,
            SOURCE_TABLE      => l_source_table,
            COLUMN_TYPE_LIST  => l_column_types,
            CAPTURE_VALUES    => 'both',
            RS_ID             => 'y',
            ROW_ID            => 'n',
            USER_ID           => 'y',
            TIMESTAMP         => 'y',
            -- Leave OBJECT_ID as 'n', or the call fails with a
            -- "table has no columns" error.
            OBJECT_ID         => 'n',
            SOURCE_COLMAP     => 'y',
            TARGET_COLMAP     => 'y',
            OPTIONS_STRING    => NULL
        );

        DBMS_OUTPUT.PUT_LINE('Change table ' || l_change_table
                             || ' was created successfully');
    END LOOP;
EXCEPTION
    WHEN OTHERS THEN
        DBMS_OUTPUT.PUT_LINE(
            'Error start ********************************************');
        DBMS_OUTPUT.PUT_LINE('Error during change table '
                             || l_change_table || ' creation:');
        DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM);
        DBMS_OUTPUT.PUT_LINE(
            'Error end ********************************************');
END;
/
CURSOR c_tables IS SELECT 'emp' table_name FROM dual UNION SELECT 'dept' table_name FROM dual |
work_sql := etl_util.get_cols_definition(v_source_schema,v_source_table, '',''); |
comm number(7,2), deptno number(2), empno number(4), ename varchar2(10) , hiredate date, job varchar2(9), mgr number(4), sal number(7,2) |
work_sql := etl_util.get_cols_definition(v_source_schema,v_source_table, 'hiredate,job',''); |
describe cdc_emp | describe cdc_dept |
---|---|
Name Type ----------------- ------------ OPERATION$ CHAR(2) CSCN$ NUMBER COMMIT_TIMESTAMP$ DATE RSID$ NUMBER USERNAME$ VARCHAR2(30) TIMESTAMP$ DATE SOURCE_COLMAP$ RAW(128) TARGET_COLMAP$ RAW(128) COMM NUMBER(7,2) DEPTNO NUMBER(2) EMPNO NUMBER(4) ENAME VARCHAR2(10) HIREDATE DATE JOB VARCHAR2(9) MGR NUMBER(4) SAL NUMBER(7,2) |
Name Type ----------------- ------------ OPERATION$ CHAR(2) CSCN$ NUMBER COMMIT_TIMESTAMP$ DATE RSID$ NUMBER USERNAME$ VARCHAR2(30) TIMESTAMP$ DATE SOURCE_COLMAP$ RAW(128) TARGET_COLMAP$ RAW(128) DEPTNO NUMBER(2) DNAME VARCHAR2(14) LOC VARCHAR2(13) |
-- From the SCOTT account: allow boris_subscriber to read the source tables.
CONNECT scott/tiger@whatever

GRANT SELECT ON emp TO boris_subscriber;
GRANT SELECT ON dept TO boris_subscriber;

-- From the publisher account: allow boris_subscriber to read the change tables.
CONNECT boris_publisher/boris_publisher

GRANT SELECT ON cdc_emp TO boris_subscriber;
GRANT SELECT ON cdc_dept TO boris_subscriber;
Subscribe all the tables, activate subscription, extend CDC window |
---|
CONNECT boris_subscriber/boris_subscriber@whatever
SET SERVEROUTPUT ON SIZE 1000000
SET LINESIZE 120

/*
 * Login as a subscriber to run this.
 *
 * 1. Looks up the subscription handle by its description; any lookup error
 *    is reported as "does not exist" and the handle is treated as 0.
 * 2. With a handle of 0, requests a new handle in change set SYNC_SET.
 * 3. Subscribes to each listed source table.
 * 4. Activates the subscription, then extends its window.
 * A failure in steps 2-4 is printed via DBMS_OUTPUT and ends the script.
 */
DECLARE
    c_description    CONSTANT VARCHAR2(40) := 'scott -> Datawarehouse';
    c_source_schema  CONSTANT VARCHAR2(20) := 'SCOTT';
    l_handle         NUMBER := 0;
    l_table          VARCHAR2(100);
    l_columns        VARCHAR2(32767);

    CURSOR source_tables IS
        SELECT 'emp' table_name FROM dual
        UNION
        SELECT 'dept' table_name FROM dual;
BEGIN
    -- Step 1: find an existing subscription.
    BEGIN
        SELECT handle
          INTO l_handle
          FROM all_subscriptions
         WHERE description = c_description;
    EXCEPTION
        WHEN OTHERS THEN
            l_handle := 0;
            DBMS_OUTPUT.PUT_LINE('Subscription ' || c_description
                                 || ' does not exist in the database');
    END;

    -- Step 2: create a handle when none was found.
    BEGIN
        IF l_handle = 0 THEN
            DBMS_CDC_SUBSCRIBE.GET_SUBSCRIPTION_HANDLE(
                CHANGE_SET          => 'SYNC_SET',
                DESCRIPTION         => c_description,
                SUBSCRIPTION_HANDLE => l_handle
            );
            DBMS_OUTPUT.PUT_LINE('Obtained handle ' || TO_CHAR(l_handle)
                                 || ' for subscription ' || c_description);
        END IF;
    EXCEPTION
        WHEN OTHERS THEN
            DBMS_OUTPUT.PUT_LINE(
                'Error start ***********************************');
            DBMS_OUTPUT.PUT_LINE('Error getting a handle '
                                 || ' for a CDC subscription '
                                 || c_description || ':');
            DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM
                                 || ' Handle = ' || TO_CHAR(l_handle));
            DBMS_OUTPUT.PUT_LINE(
                'Error end ***********************************');
            RETURN;  -- nothing further can be done without a handle
    END;

    -- Step 3: subscribe to every listed table.
    FOR src IN source_tables LOOP
        l_table := UPPER(TRIM(src.table_name));

        BEGIN
            l_columns := etl_util.get_col_names(
                             c_source_schema, l_table, '', '');

            DBMS_LOGMNR_CDC_SUBSCRIBE.SUBSCRIBE(
                l_handle, c_source_schema, l_table, l_columns);

            DBMS_OUTPUT.PUT_LINE('Added table ' || c_source_schema || '.'
                                 || l_table
                                 || ' to the CDC subscription '''
                                 || c_description
                                 || ''' successfully. Handle = '
                                 || TO_CHAR(l_handle));
        EXCEPTION
            WHEN OTHERS THEN
                DBMS_OUTPUT.PUT_LINE(
                    'Error start ***********************************');
                DBMS_OUTPUT.PUT_LINE('Error adding table '
                                     || c_source_schema || '.' || l_table
                                     || ' to the CDC subscription '''
                                     || c_description || ''':');
                DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM
                                     || ' Handle = ' || TO_CHAR(l_handle));
                DBMS_OUTPUT.PUT_LINE(
                    'Error end ***********************************');
                RETURN;  -- stop at the first table that fails
        END;
    END LOOP;

    -- Step 4a: activate the subscription.
    BEGIN
        DBMS_CDC_SUBSCRIBE.ACTIVATE_SUBSCRIPTION(l_handle);
        DBMS_OUTPUT.PUT_LINE('CDC subscription ''' || c_description
                             || ''' was successfully activated. Handle = '
                             || TO_CHAR(l_handle));
    EXCEPTION
        WHEN OTHERS THEN
            DBMS_OUTPUT.PUT_LINE(
                'Error start ***********************************');
            DBMS_OUTPUT.PUT_LINE('Error activating the CDC subscription '''
                                 || c_description || ''':');
            DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM
                                 || ' Handle = ' || TO_CHAR(l_handle));
            DBMS_OUTPUT.PUT_LINE(
                'Error end ***********************************');
            RETURN;
    END;

    -- Step 4b: extend the window immediately.
    BEGIN
        DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE => l_handle);
        DBMS_OUTPUT.PUT_LINE('Window to the CDC subscription '''
                             || c_description
                             || ''' was successfully extended');
    EXCEPTION
        WHEN OTHERS THEN
            DBMS_OUTPUT.PUT_LINE(
                'Error start ***********************************');
            DBMS_OUTPUT.PUT_LINE(
                'Error during window extention to CDC subscription '''
                || c_description || ''':');
            DBMS_OUTPUT.PUT_LINE('Handle # ' || TO_CHAR(l_handle));
            DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM);
            DBMS_OUTPUT.PUT_LINE(
                'Error end ***********************************');
    END;
END;
/
Added table SCOTT.DEPT to the CDC subscription 'scott -> Datawarehouse' successfully. Handle = 87 Added table SCOTT.EMP to the CDC subscription 'scott -> Datawarehouse' successfully. Handle = 87 CDC subscription 'scott -> Datawarehouse' was successfully activated. Handle = 87 Window to the CDC subscription 'scott -> Datawarehouse' was successfully extended |
Extend subscription window and create CDC views |
---|
CONNECT boris_subscriber/boris_subscriber@whatever
SET SERVEROUTPUT ON SIZE 1000000
SET LINESIZE 200

/*
 * Login as a subscriber to run this.
 *
 * Extends the window of the subscription identified by its description,
 * then, for each listed source table:
 *   - asks CDC to prepare a subscriber view (the view name is returned by
 *     PREPARE_SUBSCRIBER_VIEW in our_view_name),
 *   - drops any previous private synonym CDC_<table>_vw,
 *   - creates a new private synonym CDC_<table>_vw for the new view.
 * All failures are reported through DBMS_OUTPUT; the script keeps going with
 * the next table.
 */
DECLARE
    vSubhandle                  NUMBER;
    v_subscription_description  VARCHAR2(40) := 'scott -> Datawarehouse';
    v_source_schema             VARCHAR2(20) := 'SCOTT';
    v_source_table              VARCHAR2(100);
    v_cdc_table                 VARCHAR2(100);
    v_cdc_view_name             VARCHAR2(40);
    our_view_name               VARCHAR2(200);
    vSQL                        VARCHAR2(1000);

    CURSOR c_tables IS
        SELECT 'emp' table_name FROM dual
        UNION
        SELECT 'dept' table_name FROM dual;
BEGIN
    -- Extend the window
    BEGIN
        -- Get the handle
        SELECT handle
          INTO vSubhandle
          FROM all_subscriptions
         WHERE description = v_subscription_description;

        -- Extend the window for the subscription
        DBMS_CDC_SUBSCRIBE.EXTEND_WINDOW(SUBSCRIPTION_HANDLE => vSubhandle);
        DBMS_OUTPUT.PUT_LINE('Window to the CDC subscription '''
                             || v_subscription_description
                             || ''' was successfully extended');
    EXCEPTION
        WHEN OTHERS THEN
            DBMS_OUTPUT.PUT_LINE(
                'Error start *************************************');
            DBMS_OUTPUT.PUT_LINE(
                'Error during window extention to CDC subscription '''
                || v_subscription_description || ''':');
            DBMS_OUTPUT.PUT_LINE('Handle # ' || TO_CHAR(vSubhandle));
            DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM);
            DBMS_OUTPUT.PUT_LINE(
                'Error end *************************************');
    END;

    -- Create subscriber views/synonyms
    FOR tables IN c_tables LOOP
        v_source_table := UPPER(TRIM(tables.table_name));

        -- Forget the view name from the previous table. Without this, a
        -- failed PREPARE_SUBSCRIBER_VIEW would leave the old name in place
        -- and the synonym below would be pointed at another table's view.
        our_view_name := NULL;

        -- Create a view
        BEGIN
            v_cdc_table     := 'CDC_' || v_source_table;
            v_cdc_view_name := v_cdc_table || '_vw';

            DBMS_CDC_SUBSCRIBE.PREPARE_SUBSCRIBER_VIEW(
                SUBSCRIPTION_HANDLE => vSubhandle,
                SOURCE_SCHEMA       => v_source_schema,
                SOURCE_TABLE        => v_source_table,
                VIEW_NAME           => our_view_name
            );
            DBMS_OUTPUT.PUT_LINE('Subscriber view ''' || our_view_name
                                 || ''' was successfully created for table '
                                 || v_source_schema || '.' || v_source_table);
        EXCEPTION
            WHEN OTHERS THEN
                our_view_name := NULL;
                DBMS_OUTPUT.PUT_LINE(
                    'Error start **********************************');
                DBMS_OUTPUT.PUT_LINE(
                    'Error during creation of subscriber view'
                    || ' for source table ' || v_source_table || ': ');
                DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM
                                     || ' Handle #' || TO_CHAR(vSubhandle));
                DBMS_OUTPUT.PUT_LINE(
                    'Error end **********************************');
        END;

        -- Drop the previous synonym. Best effort: it may not exist yet, so
        -- any error here is deliberately ignored.
        BEGIN
            vSQL := 'DROP SYNONYM ' || v_cdc_view_name;
            EXECUTE IMMEDIATE vSQL;
        EXCEPTION
            WHEN OTHERS THEN
                NULL;
        END;

        -- Create a private synonym to point to this view, but only when a
        -- view was actually prepared for this table.
        IF our_view_name IS NOT NULL THEN
            BEGIN
                vSQL := 'CREATE SYNONYM ' || v_cdc_view_name
                        || ' FOR ' || our_view_name;
                EXECUTE IMMEDIATE vSQL;
                DBMS_OUTPUT.PUT_LINE('Private synonym ''' || v_cdc_view_name
                                     || ''' for view ''' || our_view_name
                                     || ''' was successfully created.');
            EXCEPTION
                WHEN OTHERS THEN
                    DBMS_OUTPUT.PUT_LINE(
                        'Error start ******************************');
                    DBMS_OUTPUT.PUT_LINE(
                        'Error during creation of the synonym '''
                        || v_cdc_view_name || ''' for view '''
                        || our_view_name || ''': ');
                    DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM);
                    DBMS_OUTPUT.PUT_LINE(
                        'Error end ******************************');
            END;
        END IF;
    END LOOP;
END;
/
Window to the CDC subscription 'scott -> Datawarehouse' was successfully extended Subscriber view 'CDC#CV$8757846' was successfully created for table SCOTT.DEPT Private synonym 'CDC_DEPT_vw' for view 'CDC#CV$8757846' was successfully created. Subscriber view 'CDC#CV$8757848' was successfully created for table SCOTT.EMP Private synonym 'CDC_EMP_vw' for view 'CDC#CV$8757848' was successfully created. |
-- Subscriber view text: CDC_DEPT rows restricted to a CSCN$ range.
CREATE OR REPLACE VIEW CDC#CV$8757846 (
    OPERATION$,
    CSCN$,
    COMMIT_TIMESTAMP$,
    TIMESTAMP$,
    USERNAME$,
    TARGET_COLMAP$,
    SOURCE_COLMAP$,
    RSID$,
    DEPTNO,
    DNAME,
    LOC
) AS
SELECT
    OPERATION$,
    CSCN$,
    COMMIT_TIMESTAMP$,
    TIMESTAMP$,
    USERNAME$,
    TARGET_COLMAP$,
    SOURCE_COLMAP$,
    RSID$,
    "DEPTNO",
    "DNAME",
    "LOC"
FROM "BORIS_PUBLISHER"."CDC_DEPT"
WHERE CSCN$ >= 40802127
  AND CSCN$ <= 41013754
WITH READ ONLY
-- Initial-load view with the same control columns as the CDC_EMP change
-- table: every current EMP row is presented as an insert ('I') performed by
-- the pseudo-user 'initial_load'.
-- NOTE(review): FEFFFFFF is presumably the "all columns" column map; confirm
-- against the column-map layout before relying on it.
CREATE VIEW CDC_EMP_VW AS
SELECT
    'I'                  AS operation$,
    1                    AS cscn$,
    SYSDATE              AS commit_timestamp$,
    1                    AS rsid$,
    'initial_load'       AS username$,
    SYSDATE              AS timestamp$,
    HEXTORAW('FEFFFFFF') AS source_colmap$,
    HEXTORAW('FEFFFFFF') AS target_colmap$,
    t.*  -- every EMP column, in table order, follows the control columns
FROM emp t;
Column id | Binary Value | Hex Value |
---|---|---|
1 | 10 | 2 |
2 | 100 | 4 |
3 | 1000 | 8 |
4 | 10000 | 10 |
5 | 100000 | 20 |
6 | 1000000 | 40 |
7 | 10000000 | 80 |
8 | 00000000 00000001 | 00 01 |
9 | 00000000 00000010 | 00 02 |
. . . | . . . | . . . |
16 | 00000001 00000000 00000000 | 01 00 00 |
Drop CDC Views and Purge CDC Window |
---|
CONNECT boris_subscriber/boris_subscriber@whatever
SET SERVEROUTPUT ON SIZE 1000000
SET LINESIZE 200

/*
 * Login as a subscriber to run this.
 *
 * Looks up the subscription handle by its description (any lookup error is
 * reported as "does not exist" and the handle is treated as 0), then for each
 * listed source table drops the private synonym CDC_<table>_vw and the
 * subscriber view, and finally purges the subscription window.
 * All failures are reported through DBMS_OUTPUT; processing continues.
 */
DECLARE
    vSubhandle                  NUMBER;
    v_subscription_description  VARCHAR2(40) := 'scott -> Datawarehouse';
    v_source_schema             VARCHAR2(20) := 'SCOTT';
    v_source_table              VARCHAR2(100);
    v_cdc_table                 VARCHAR2(100);
    v_cdc_view_name             VARCHAR2(40);
    vSQL                        VARCHAR2(1000);

    CURSOR c_tables IS
        SELECT 'emp' table_name FROM dual
        UNION
        SELECT 'dept' table_name FROM dual;
BEGIN
    -- Find a subscription
    BEGIN
        vSubhandle := 0;
        -- Get the handle
        SELECT handle
          INTO vSubhandle
          FROM all_subscriptions
         WHERE description = v_subscription_description;
    EXCEPTION
        WHEN OTHERS THEN
            vSubhandle := 0;
            DBMS_OUTPUT.PUT_LINE('Subscription '
                                 || v_subscription_description
                                 || ' does not exist in the database');
    END;

    FOR tables IN c_tables LOOP
        v_source_table := UPPER(TRIM(tables.table_name));

        BEGIN
            v_cdc_table     := 'CDC_' || v_source_table;
            v_cdc_view_name := v_cdc_table || '_vw';

            -- Drop the synonym. Best effort: it may not exist, so any error
            -- here is deliberately ignored.
            BEGIN
                vSQL := 'DROP SYNONYM ' || v_cdc_view_name;
                EXECUTE IMMEDIATE vSQL;
            EXCEPTION
                WHEN OTHERS THEN
                    NULL;
            END;

            -- Drop the subscriber view
            DBMS_CDC_SUBSCRIBE.DROP_SUBSCRIBER_VIEW(
                SUBSCRIPTION_HANDLE => vSubhandle,
                SOURCE_SCHEMA       => v_source_schema,
                SOURCE_TABLE        => v_source_table
            );
            DBMS_OUTPUT.PUT_LINE('Subscriber View for CDC table '''
                                 || v_cdc_table
                                 || ''' was successfully dropped. Handle # '
                                 || TO_CHAR(vSubhandle));
        EXCEPTION
            WHEN OTHERS THEN
                DBMS_OUTPUT.PUT_LINE(
                    'Error start ********************************');
                DBMS_OUTPUT.PUT_LINE(
                    'Error during dropping the subscriber view for '''
                    || v_cdc_table || ''' '
                    || ' to the CDC subscription '''
                    || v_subscription_description || ''':');
                DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM
                                     || ' Handle = ' || TO_CHAR(vSubhandle));
                DBMS_OUTPUT.PUT_LINE(
                    'Error end ********************************');
        END;
    END LOOP;

    -- Purge window
    BEGIN
        DBMS_CDC_SUBSCRIBE.PURGE_WINDOW(SUBSCRIPTION_HANDLE => vSubhandle);
        DBMS_OUTPUT.PUT_LINE('Subscriber Window for subscription '''
                             || v_subscription_description
                             || ''' was successfully purged ');
    EXCEPTION
        WHEN OTHERS THEN
            -- This handler guards PURGE_WINDOW, so the message names the
            -- purge rather than the view drop done earlier.
            DBMS_OUTPUT.PUT_LINE('Error during subscriber window purge: ');
            DBMS_OUTPUT.PUT_LINE(TO_CHAR(SQLCODE) || ' ' || SQLERRM
                                 || ' Handle # ' || TO_CHAR(vSubhandle));
    END;
END;
/
Subscriber View for CDC table 'CDC_DEPT' was successfully dropped. Handle # 86 Subscriber View for CDC table 'CDC_EMP' was successfully dropped. Handle # 86 Subscriber Window for subscription 'scott -> Datawarehouse' was successfully purged |