By: Ray Barley
Problem
I have a requirement to track all changes to specific tables. The changes need to be stored in an audit table. It looks like the Change Data Capture capability in SQL Server 2008 could be used to get the changes but I need a way of copying the changes to the audit table. How can I do that?
Solution
Change Data Capture (CDC) is a new capability in SQL Server 2008 Enterprise Edition and can be used to track all inserts, updates and deletes to a table. A SQL Server Integration Services (SSIS) package would be a good choice for populating an audit table with the CDC data. Let's discuss a scenario then walk through a solution.
Assume that we want to store all changes to our customer table in a customer_audit table. We'll write an SSIS package to query the CDC data and copy it to the customer_audit table. We would like the option of running the SSIS package on demand and/or on a schedule (e.g. a SQL Agent job). Each time we run the SSIS package we want to pick up whatever changed since the last time we ran the package.
Customer Tables
We will use the following customer table:
create table dbo.customer (
  customer_id int identity primary key not null
, name nvarchar(50) not null
, sales_rep nvarchar(50) not null
, region nvarchar(50) not null
, credit_limit int not null
)
We will use the following customer_audit table to store changes to the customer table:
create table dbo.customer_audit (
  customer_audit_id int identity primary key not null
, customer_id int not null
, name nvarchar(50) not null
, sales_rep nvarchar(50) not null
, region nvarchar(50) not null
, credit_limit int not null
, effective_date datetime not null
, __$start_lsn binary(10) not null
, __$seqval binary(10) not null
, __$operation int not null
, __$update_mask varbinary(128) not null
)
The customer_audit table has each column from the customer table as well as the following additional columns provided by CDC:
effective_date will be populated from the cdc.lsn_time_mapping table (described later). It is the date and time of the transaction that modified the source table.
__$start_lsn is the log sequence number from the transaction log.
__$seqval provides an ordering of the changes within a transaction.
__$operation has one of the following values: 1=delete, 2=insert, 3=update (before values) and 4=update (after values).
__$update_mask is a bit mask with one bit per captured column; the bit is set to 1 for each column that was changed.
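As a rough sketch of how __$update_mask can be used, the query below lists the audited updates that changed credit_limit. It assumes CDC has already been enabled with the capture instance name customer_all (the name used in the next section) and that the audit table has been populated. The choice of column is only for illustration. It relies on two CDC functions, sys.fn_cdc_get_column_ordinal and sys.fn_cdc_is_bit_set.

```sql
use mssqltips
go

-- Ordinal of credit_limit within the customer_all capture instance
declare @credit_limit_ordinal int
set @credit_limit_ordinal =
    sys.fn_cdc_get_column_ordinal(N'customer_all', N'credit_limit')

-- Audited updates (after values) whose mask has the credit_limit bit set
select customer_audit_id
     , customer_id
     , credit_limit
     , effective_date
from dbo.customer_audit
where __$operation = 4
  and sys.fn_cdc_is_bit_set(@credit_limit_ordinal, __$update_mask) = 1
order by __$start_lsn, __$seqval
```

Looking up the ordinal once in a variable, rather than calling the function per row, keeps the query easier to read.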
Enabling CDC
CDC needs to be enabled at the database level and for each table whose changes you want to track. CDC provides a stored procedure for each. Here is a sample script to enable CDC for our mssqltips database and the customer table:
use mssqltips
go
exec sys.sp_cdc_enable_db
exec sys.sp_cdc_enable_table
  @source_schema = N'dbo'
, @source_name = N'customer'
, @role_name = N'cdc_admin'
, @capture_instance = N'customer_all'
, @supports_net_changes = 1
, @index_name = NULL
, @captured_column_list = NULL
, @filegroup_name = NULL
The main points about the above script are:
The stored procedure sys.sp_cdc_enable_db enables CDC for the current database.
The stored procedure sys.sp_cdc_enable_table enables CDC for a table.
A database role is created for the role_name parameter; members of this role have access to the CDC data.
The capture_instance parameter is used to identify the CDC data; you can have up to two capture instances per table.
Set supports_net_changes to 1 to also generate a function that returns the net change to each row, i.e. the accumulated changes to a row collapsed into a single row.
You must specify a unique index if the table does not have a primary key.
You can specify a list of columns to track or NULL to track all columns.
You can specify a filegroup for the CDC files or NULL to use the default.
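Because supports_net_changes is set to 1 in the script above, CDC also generates a net changes function named after the capture instance. The following is a minimal sketch of calling it over the whole available LSN range; the variable names are illustrative, and a real extract would use the LSN range it tracks itself rather than the minimum and maximum.

```sql
use mssqltips
go

declare @from_lsn binary(10)
      , @to_lsn   binary(10)

-- Oldest LSN still available for the capture instance, and the newest LSN
set @from_lsn = sys.fn_cdc_get_min_lsn(N'customer_all')
set @to_lsn   = sys.fn_cdc_get_max_lsn()

-- One row per changed source row, reflecting its final state in the range
select *
from cdc.fn_cdc_get_net_changes_customer_all(@from_lsn, @to_lsn, N'all')
```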
For additional details check Books Online for the sys.sp_cdc_enable_db and sys.sp_cdc_enable_table stored procedures.
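To confirm that the enable script took effect, something like the following can be run. This is a sketch that assumes the database and table names used in this tip; it reads the is_cdc_enabled and is_tracked_by_cdc catalog columns and then calls sys.sp_cdc_help_change_data_capture to show the capture instance configuration.

```sql
use mssqltips
go

-- 1 = CDC is enabled for the database
select name, is_cdc_enabled
from sys.databases
where name = N'mssqltips'

-- 1 = the table is tracked by CDC
select s.name as schema_name
     , t.name as table_name
     , t.is_tracked_by_cdc
from sys.tables t
join sys.schemas s on s.schema_id = t.schema_id
where s.name = N'dbo'
  and t.name = N'customer'

-- Capture instance details (name, captured columns, role, and so on)
exec sys.sp_cdc_help_change_data_capture
  @source_schema = N'dbo'
, @source_name = N'customer'
```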
Make sure that SQL Agent is running. CDC creates two SQL Agent jobs: one scans the transaction log and copies changes to enabled tables into individual change tables in the cdc schema; the other clears out old rows from those change tables. You can view the default values used in these SQL Agent jobs by executing the sys.sp_cdc_help_jobs stored procedure. You can change the default values by executing the sys.sp_cdc_change_job stored procedure. By default the cleanup job removes change data after it is 3 days old.
If you enable change data capture on a table and SQL Agent is not running, you will see the following warning message:
SQLServerAgent is not currently running so it cannot be notified of this action.
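As an illustration of the job settings mentioned above, the sketch below lists the current CDC job configuration and then lengthens the cleanup retention. The 7-day value is an arbitrary example, not a recommendation; retention is expressed in minutes.

```sql
use mssqltips
go

-- Show the capture and cleanup job settings for this database
exec sys.sp_cdc_help_jobs
go

-- Keep change data for 7 days (7 * 24 * 60 = 10080 minutes)
-- instead of the default 3 days (4320 minutes)
exec sys.sp_cdc_change_job
  @job_type = N'cleanup'
, @retention = 10080
go
```

Books Online notes that changes made with sys.sp_cdc_change_job take effect once the job has been stopped and restarted with sys.sp_cdc_stop_job and sys.sp_cdc_start_job, so check that topic before relying on a new setting.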
Logging
To allow our SSIS package to pick up just the changes since the last time it was run, we'll populate a log table with the data we need. We'll use the following log table:
create table dbo.cdc_capture_log (
  cdc_capture_log_id int identity not null
, capture_instance nvarchar(50) not null
, start_time datetime not null
, min_lsn binary(10) not null
, max_lsn binary(10) not null
, end_time datetime null
, insert_count int not null default 0
, update_count int not null default 0
, delete_count int not null default 0
, status_code int not null default 0
)
The main points about the log table are:
capture_instance is the value specified when enabling CDC on the table.
start_time and end_time are recorded to allow us to track how long it takes to copy the CDC data to our audit table.
min_lsn and max_lsn represent the range of log sequence numbers (LSN) to copy. LSNs uniquely identify changes in the transaction log. CDC records the LSN with each change. We derive the min_lsn from the max_lsn of the last time our SSIS package was run. CDC provides the function sys.fn_cdc_get_max_lsn to retrieve the maximum LSN.
insert_count, update_count and delete_count record the counts each time we run the SSIS package.
status_code is set to 1 when the SSIS package completes successfully.
We'll create two stored procedures to maintain the log:
init_cdc_capture_log will create a new row.
end_cdc_capture_log will update the row.
The init_cdc_capture_log is called at the beginning of our SSIS package. It is shown below:
create procedure dbo.init_cdc_capture_log
  @capture_instance nvarchar(50)
as
begin
  set nocount on;
  declare
    @begin_lsn binary(10)
  , @end_lsn binary(10)
  , @prev_max_lsn binary(10)
  -- get the max LSN for the capture instance from
  -- the last extract
  select @prev_max_lsn = max(max_lsn)
  from dbo.cdc_capture_log
  where capture_instance = @capture_instance
  -- if no row found in cdc_capture_log get the min lsn
  -- for the capture instance
  if @prev_max_lsn is null
    set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
  else
    set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
  -- get the max lsn
  set @end_lsn = sys.fn_cdc_get_max_lsn()
  insert into dbo.cdc_capture_log
    (capture_instance,start_time,min_lsn,max_lsn)
  values
    (@capture_instance,getdate(),@begin_lsn,@end_lsn)
  select cast(scope_identity() as int) cdc_capture_log_id
end
The main points about the above stored procedure are:
We query the log to get the max_lsn from the last time we ran the SSIS package. If we find the row from our previous run we call the CDC function sys.fn_cdc_increment_lsn to increment the LSN; otherwise we call the CDC function sys.fn_cdc_get_min_lsn to get the LSN of the first change record for our table.
We call the CDC function sys.fn_cdc_get_max_lsn to get the highest LSN (i.e. the LSN of the latest transaction). We get all CDC data up to and including this LSN on the current run.
We insert a new row into the log and return the identity value; we need the identity value to update this row.
The end_cdc_capture_log stored procedure updates the row created by the init_cdc_capture_log stored procedure. It is shown below:
create procedure dbo.end_cdc_capture_log
  @cdc_capture_log_id int
, @insert_count int
, @update_count int
, @delete_count int
as
begin
  set nocount on;
  update dbo.cdc_capture_log set
    end_time = getdate()
  , insert_count = @insert_count
  , update_count = @update_count
  , delete_count = @delete_count
  , status_code = 1
  where cdc_capture_log_id = @cdc_capture_log_id
end
The main points about the above stored procedure are:
cdc_capture_log_id is the value returned by the init_cdc_capture_log stored procedure.
We update the row with the counts, end time, and set the status_code to 1.
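To see how the two log procedures fit together outside of SSIS, they can be exercised by hand along the lines below. This is only a sketch for a test database: the table variable and the zero counts are placeholders, and because the run records a max_lsn, the next real run of the package would start after that LSN.

```sql
use mssqltips
go

declare @new_row table (cdc_capture_log_id int)
declare @cdc_capture_log_id int

-- Start a run and capture the identity value the procedure returns
insert into @new_row (cdc_capture_log_id)
exec dbo.init_cdc_capture_log @capture_instance = N'customer_all'

select @cdc_capture_log_id = cdc_capture_log_id
from @new_row

-- (the SSIS data flow would copy the changes at this point)

-- Close the run; the counts are placeholders for this manual test
exec dbo.end_cdc_capture_log
  @cdc_capture_log_id = @cdc_capture_log_id
, @insert_count = 0
, @update_count = 0
, @delete_count = 0

-- Inspect the LSN range and how long the run took
select cdc_capture_log_id
     , min_lsn
     , max_lsn
     , datediff(ms, start_time, end_time) as elapsed_ms
     , status_code
from dbo.cdc_capture_log
where cdc_capture_log_id = @cdc_capture_log_id
```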
Creating the SSIS Package
We will create an SSIS package that has the following control flow:
The main points about the above control flow are:
Init Log is an Execute SQL task; it calls the init_cdc_capture_log stored procedure (described above) and saves the identity value of the created cdc_capture_log row in a package variable.
Process Changes is a Data Flow task that retrieves the latest changes from the CDC table and copies them to our audit table.
End Log is an Execute SQL task that calls the end_cdc_capture_log stored procedure (described above) to update the cdc_capture_log row.
The Process Changes Data Flow task is implemented as shown below:
The main points about the above data flow are:
Extract Customer Changes is an OLE DB Source that executes the stored procedure extract_customer_capture_log to retrieve the customer changes since the last run.
Count Inserts Updates and Deletes is a Script Component Transform task that just counts the number of inserts, updates and deletes in the change data.
Save Customer Changes to Custom Audit Table is an OLE DB Destination used to insert each change row into the customer_audit table.
The extract_customer_capture_log stored procedure is shown below:
create procedure dbo.extract_customer_capture_log
  @cdc_capture_log_id int
as
begin
  set nocount on;
  declare
    @begin_lsn binary(10)
  , @end_lsn binary(10)
  -- get the lsn range to process
  select
    @begin_lsn = min_lsn
  , @end_lsn = max_lsn
  from dbo.cdc_capture_log
  where cdc_capture_log_id = @cdc_capture_log_id
  -- extract and return the changes
  select m.tran_end_time modified_ts, x.*
  from cdc.fn_cdc_get_all_changes_customer_all(
    @begin_lsn, @end_lsn, 'all'
  ) x
  join cdc.lsn_time_mapping m
    on m.start_lsn = x.__$start_lsn ;
end
The main points about the above stored procedure are:
The cdc_capture_log_id parameter value is the value returned from the call to the init_cdc_capture_log stored procedure (described above in the Logging section).
Retrieve the LSN range from the cdc_capture_log table row. The LSN range represents all changes that have occurred since the last run of the SSIS package.
The cdc.fn_cdc_get_all_changes_customer_all function is generated when you enable CDC. The function name includes the capture instance. The function returns the changes that occurred in the LSN range.
The cdc.lsn_time_mapping table is populated by CDC with the mapping of transaction times to LSNs. The join retrieves the transaction time. This alleviates the need to manually track this in the source table.
Take a look at cdc.fn_cdc_get_all_changes_<capture_instance> in Books Online for additional information on retrieving the CDC change data.
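If you would rather not query the cdc.lsn_time_mapping table directly, the transaction time can also be obtained with the CDC function sys.fn_cdc_map_lsn_to_time. The query below is a sketch of that alternative, not the version used in the package; it reads the whole available LSN range purely so that it can run on its own.

```sql
use mssqltips
go

declare @begin_lsn binary(10)
      , @end_lsn   binary(10)

-- For illustration only: use the full range currently available
set @begin_lsn = sys.fn_cdc_get_min_lsn(N'customer_all')
set @end_lsn   = sys.fn_cdc_get_max_lsn()

-- Same shape as the extract, with the function in place of the join
select sys.fn_cdc_map_lsn_to_time(x.__$start_lsn) as modified_ts
     , x.*
from cdc.fn_cdc_get_all_changes_customer_all(@begin_lsn, @end_lsn, N'all') x
```

The trade-off is a scalar function call per change row versus a join, so it is worth comparing both on realistic volumes.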
Testing the SSIS Package
Before running the SSIS package, we need to execute a script that performs some inserts, updates and deletes. We'll use the following script:
use mssqltips
go
insert into dbo.customer
  (name,sales_rep,region,credit_limit)
values
  (N'BGM Systems', N'Cane', N'South', 2500)
update dbo.customer
set sales_rep = N'Smith'
where [name] = N'BGM Systems'
update dbo.customer
set credit_limit = 3000
where [name] = N'BGM Systems'
delete dbo.customer
where [name] = N'BGM Systems'
After running the above script, execute the SSIS package, then check the customer_audit table to see that there are four rows; one for each change made in the script. The partial contents of the customer_audit table are shown below:
The main points about the above table are:
effective_date is the date and time of the transaction, as retrieved from the cdc.lsn_time_mapping table.
Row 1 shows the insert; __$operation=2 for an insert.
Row 2 shows the update of the sales_rep; __$operation=4 for the update showing the values after the update.
Row 3 shows the update of the credit_limit.
Row 4 shows the delete; __$operation=1 for a delete.
The effective_date column provides the ability to query the customer_audit table and see the customer values at any point in time by filtering on the maximum effective_date that is less than or equal to some value.
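A point-in-time query of that kind might look like the sketch below. The @as_of variable is a placeholder for the moment of interest. The query takes the latest audit row per customer at or before that moment and leaves out customers whose latest row is a delete.

```sql
use mssqltips
go

declare @as_of datetime
set @as_of = getdate()   -- placeholder: replace with the moment of interest

select a.customer_id
     , a.name
     , a.sales_rep
     , a.region
     , a.credit_limit
     , a.effective_date
from dbo.customer_audit a
where a.__$operation in (2, 4)          -- row existed: insert or update (after values)
  and a.customer_audit_id = (
        -- latest change for this customer at or before @as_of
        select top 1 b.customer_audit_id
        from dbo.customer_audit b
        where b.customer_id = a.customer_id
          and b.effective_date <= @as_of
          and b.__$operation <> 3       -- ignore "before" images if they were captured
        order by b.effective_date desc
               , b.__$start_lsn desc
               , b.__$seqval desc
      )
```

Ordering by __$start_lsn and __$seqval after effective_date breaks ties when several changes share the same transaction time.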
Monday, February 22, 2010 - 11:03:06 AM - VARMA1412@GMAIL.COM
Hi Ray Barley,
I used your article How To Process Change Data Capture (CDC) in SQL Server Integration Services SSIS 2008 and followed the same procedure with a new database. The SSIS package runs fine, but in the end I don't see any values in the customer_audit table. I tried setting up roles, all the connections were fine, and the package reports success, but I don't see any updated values in the customer_audit table.
Please help me out, it's very urgent.
Thanks, Varma
Monday, February 22, 2010 - 1:12:32 PM - raybarley
There are a number of things that you have to double check: did you enable CDC on the database, did you enable CDC on the table, and is SQL Server Agent running? After you get through those, have you performed any inserts, updates or deletes in the table that has CDC enabled? Query the cdc.lsn_time_mapping table. There will be rows in there if any inserts, updates or deletes have happened on a table that has CDC enabled. Lastly you can query the actual changes that have been captured; take a look at this topic in Books Online: cdc.<capture_instance>_CT
Monday, September 20, 2010 - 5:54:48 PM - Chris Skorlinski
I enjoyed reading your CDC\SSIS posting on MSSQLTips. I thought it was very well written, easy to follow, and a very clear walk through CDC and SSIS. I've linked your presentation to my MSDN "ReplTalk" blog. http://blogs.msdn.com/b/repltalk/archive/2010/04/19/references-and-links-on-change-data-capture.aspx Thanks for a great CDC\SSIS article. Chris Skorlinski, Microsoft SQL Server Escalation Services
Wednesday, September 29, 2010 - 10:35:25 AM - Ram
Thanks for posting this article. The code you provided gave me an excellent head start. I was able to customize it. Works great! The one comment I have is that Microsoft advises not to use the CDC tables directly and instead to use the functions they provide to get to the data. You have used the lsn_time_mapping table directly. Is there a way to use the function instead? Thanks for this article again. Great Post! Ram
Wednesday, September 29, 2010 - 11:18:46 AM - Ray Barley
There are 2 functions available. sys.fn_cdc_map_lsn_to_time(lsn_value) returns the date and time value from the tran_end_time column in the cdc.lsn_time_mapping system table for the specified log sequence number (LSN). sys.fn_cdc_map_time_to_lsn('<relational_operator>', tracking_time) returns the LSN value from the start_lsn column in the cdc.lsn_time_mapping table for the specified time and relational operator. In my case I just wanted to process changes based on the LSN and not deal with dates and times at all. I don't like using date and time because I've actually had an instance where a last modified date and time was set on a row, the process that extracts data based on last modified date and time ran before the transaction was committed, and the uncommitted updates were missed. Sounds hard to believe but I did see it.
Thursday, December 09, 2010 - 12:37:11 PM - Ali Bousselot
This is a great article! I downloaded the sample code and it ran successfully. Then, I immediately ran the sample code again and it failed because no data had changed in the customer table. Is there a way to handle that issue? I'm trying to design a CDC package that will run every 1 minute and I want it to run successfully even if the data hasn't changed in the source tables (with CDC enabled). Thanks, Ali
Thursday, December 09, 2010 - 3:20:41 PM - Ray Barley
I think the problem is caused by this code in the dbo.init_cdc_capture_log stored proc:
if @prev_max_lsn is null
  set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
else
  set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
-- get the max lsn
set @end_lsn = sys.fn_cdc_get_max_lsn()
When there are no changes, set @begin_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn) returns an LSN that is greater than the @end_lsn and you get error 208 ("An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes.")
I think what I did was change the dbo.init_cdc_capture_log stored proc to return a value that indicates whether the @begin_lsn is greater than the @end_lsn. The SSIS package would not continue if the @begin_lsn is greater than the @end_lsn, since this means that there is nothing to do.
Thursday, December 09, 2010 - 3:31:37 PM - Ali Bousselot
Thanks for your help. The issue you described is exactly what happened when I ran the package a second time. Do you recommend that the dbo.init_cdc_capture_log not insert a value in the cdc_capture_log table if the @begin_lsn is greater than the @end_lsn? Thanks again.
Thursday, December 09, 2010 - 3:44:46 PM - Ali Bousselot
Or do you recommend performing WAITFORs in the dbo.init_cdc_capture_log stored proc, to delay the execution of the stored procedure, if necessary?
Thursday, December 09, 2010 - 5:11:36 PM - Ray Barley
I would only insert a row into cdc_capture_log if there are changes to process. As far as using WAITFOR, that is certainly an option, especially if you want to process changes as soon as possible.
Friday, December 10, 2010 - 10:57:42 AM - Ali Bousselot
Thanks for your feedback and help. I really appreciate it. We want to run an SSIS package (that copies the change data to a database on a separate server) every 1 minute. Is it a bad idea to run the package every 1 minute? Do you think it would be better to run it every 3 minutes? The database that we're copying the change data to will be used for reporting. We want reports to use this reporting database instead of connecting to the OLTP (source) database.
Wednesday, July 11, 2012 - 10:20:36 AM - Luis
Hi Ray, thank you for your great article. I used it as a guide, and I found one issue implementing it. Have you noticed that after the first synchronization, you will count one more transaction than is really happening? If so, can you mention it and how you solved it? If I find a workaround I'll let you know.
Regards.
Wednesday, July 11, 2012 - 3:38:41 PM - Ray Barley
I just did a couple of test runs and I can't reproduce the behavior "after the first synchronization, you will count one more transaction than is really happening".
Thursday, July 12, 2012 - 6:06:25 AM - Luis
Hi, thanks for your answer. If you make one update and then check the log table you'll find the right count, but after that one, the count will be increased by one, due to the count of the first row. I've also made one change to one of the procedures (init_cdc_capture_log), so that it can run periodically without errors. if @prev_max_lsn is null changed to: if @prev_max_lsn is null
Thursday, July 12, 2012 - 6:22:25 AM - Luis
I leave an example for testing: 3 updates, a synchronization, and afterwards one update. The result in the cdc_capture_log table is: 93 TiposArtigo_all 2012-07-12 11:18:41.730 0x000092090000004C0001 0x00009209000001250004 2012-07-12 11:18:42.237 0 3 0 1 and should be: 93 TiposArtigo_all 2012-07-12 11:18:41.730 0x000092090000004C0001 0x00009209000001250004 2012-07-12 11:18:42.237 0 3 0 1
I hope I helped. Regards
Thursday, July 12, 2012 - 9:57:23 AM - Ray Barley
I tested again with your change to the init_cdc_capture_log stored proc:
if @prev_max_lsn is null
  set @begin_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
This change causes you to pick up a changed row a second time, since the begin_lsn is the same as the end_lsn from the previous time you ran the package. An alternative way to fix the error 208 ("An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes.") is to do the following:
1) Add a precedence constraint after the Init Log Execute SQL task; set the evaluation option to Expression and Constraint, Value = Success, and Expression = @[User::cdc_capture_log_id] > -1
2) Change the stored proc to return -1 if the begin_lsn is greater than the end_lsn. Replace the code AFTER set @end_lsn = sys.fn_cdc_get_max_lsn() with:
if @begin_lsn <= @end_lsn
begin
insert into dbo.cdc_capture_log
(capture_instance,start_time,min_lsn,max_lsn)
values
(@capture_instance,getdate(),@begin_lsn,@end_lsn)
select cast(scope_identity() as int) cdc_capture_log_id
end
else
-- return -1 to indicate there is nothing to do
select CAST(-1 as int) cdc_capture_log_id
Thursday, July 12, 2012 - 11:14:08 AM - Luis
Perfect, now it's working like a charm. Thank you for all your support, and once again congratulations on your great article.
Regards, Luís Ivars
Tuesday, February 12, 2013 - 1:27:33 PM - JBP
In SSIS I have implemented change data capture. I have built an initial load and an incremental load for every table related to the data warehouse, and made two master packages, one for each. The initial load is a full load that runs once; the incremental load runs as business requirements dictate. I need some information on risk and error handling, or any other information on maintaining the packages. How do I send email notifications? Do you have a script?
Tuesday, February 12, 2013 - 3:55:57 PM - Raymond Barley
I send emails from an SSIS package by using database mail in an Execute SQL task. Here's a tip that discusses the setup of database mail: http://www.mssqltips.com/sqlservertip/1100/setting-up-database-mail-for-sql-2005/ Once you have it set up, use the sp_send_dbmail stored procedure in your Execute SQL task. Here's a link to sp_send_dbmail: http://msdn.microsoft.com/en-us/library/ms190307.aspx
Wednesday, May 08, 2013 - 4:55:16 AM - Shreyans Halani
Hi Ray Barley,
Thanks, Shreyans
Wednesday, May 08, 2013 - 10:39:05 AM - Raymond Barley
Change Data Capture scans the transaction log looking for changes to tables / columns that have been configured for CDC. Any changes found are copied to the CDC tables. When you query cdc.fn_cdc_get_all_changes_<capture_instance> you can only retrieve rows that have changed; you can't retrieve any row that has not changed.