Change Capture Alternatives with Azure Data Factory


By: Semjon Terehhov   |   Last Updated: 2019-07-18   |   Comments (2)

Problem

From time to time, you have to deal with source systems where records are not timestamped, i.e. there is no attribute that can be used to identify whether a record has been modified. A lack of tracking information from the source system significantly complicates the ETL design. This article will help you decide between three different change capture alternatives and guide you through the pipeline implementation using Azure Data Factory V2 with data flows. The three alternatives are:

  1. Data Flows by ADF
  2. SQL Server Stored Procedures by ADF
  3. Azure-SSIS by ADF

Solution

ADF (Azure Data Factory) allows for different methodologies that solve the change capture problem, such as the Azure-SSIS Integration Runtime (IR), Data Flows powered by the Databricks IR, or SQL Server Stored Procedures. We will need a system to work and test with: Azure SQL Databases, for which the Basic tier is more than enough for our purposes (use this Tip to create an Azure SQL Database), and an instance of Azure Data Factory V2. You will also require resources like the SSIS and Databricks IRs. These are moderately expensive and depend on which solution you prefer, so we will create them later.

The Resource Group should look as follows:

[Screenshot: resource group]

An additional database instance of AdventureWorksLT will be used in this tip to generate the source sales data, whereas the dwh (data warehouse) database is the destination database. The definition of the source view in AdventureWorksLT is provided below:

CREATE VIEW [dbo].[view_source_data] 
AS 
SELECT 
Orders.SalesOrderID, 
DATEADD(hour,Orders.SalesOrderID,OrderHeader.OrderDate) AS OrderDateTime, 
Products.Name AS ProductName, 
Products.ProductNumber AS ProductCode, 
Products.Color, 
Products.Size, 
Orders.OrderQty, 
Orders.UnitPrice, 
Orders.UnitPriceDiscount AS Discount, 
Orders.LineTotal 
FROM SalesLT.Product AS Products 
INNER JOIN SalesLT.SalesOrderDetail AS Orders 
ON Products.ProductID = Orders.ProductID 
INNER JOIN SalesLT.SalesOrderHeader AS OrderHeader 
ON Orders.SalesOrderID = OrderHeader.SalesOrderID 

Destination table definition in dwh database:

CREATE SCHEMA stg 
GO 

CREATE SCHEMA Facts 
GO 

CREATE TABLE [Facts].[SalesData] 
( 
[HashId] [char](128) NOT NULL, -- for SHA512 algorithm 
[SalesOrderID] [int] NOT NULL, 
[OrderDateTime] [datetime2](0) NULL, 
[ProductName] [nvarchar](50) NOT NULL, 
[ProductCode] [nvarchar](25) NOT NULL, 
[Color] [nvarchar](15) NULL, 
[Size] [nvarchar](5) NULL, 
[OrderQty] [smallint] NOT NULL, 
[UnitPrice] [money] NOT NULL, 
[Discount] [money] NOT NULL, 
[LineTotal] [numeric](38, 6) NOT NULL, 
[RecordTimeStamp] [timestamp] NOT NULL -- keeping track of changes 
) 
GO 

CREATE TABLE [stg].[SalesData] -- Staging area to Copy data to without transformation 
( 
[SalesOrderID] [int] NOT NULL, 
[OrderDateTime] [datetime2](0) NULL, 
[ProductName] [nvarchar](50) NOT NULL, 
[ProductCode] [nvarchar](25) NOT NULL, 
[Color] [nvarchar](15) NULL, 
[Size] [nvarchar](5) NULL, 
[OrderQty] [smallint] NOT NULL, 
[UnitPrice] [money] NOT NULL, 
[Discount] [money] NOT NULL, 
[LineTotal] [numeric](38, 6) NOT NULL 
) -- This Table should be truncated after each upload 

The purpose of the ETL will be to keep track of changes between two database tables by uniquely identifying every record using the following attributes: SalesOrderID, OrderDateTime, ProductName, ProductCode, Color and Size.

In real-world terms, this applies to scenarios where some order details, such as quantity, unit price, discount and total, are updated after the initial order line is written into the ERP database. In some cases, this happens because of currency exchange rate differences between the sales date and the conversion date. This makes it a challenge for ETL developers to keep track of such changes.

As there are so many identifying columns, using a join transformation to locate records is somewhat impractical and IO intensive for a SQL database. A more effective way is a hash value identity column (in the SalesData table it is HashId) calculated with the SHA512 algorithm. Here are the alternatives.
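
To make the idea concrete, here is a minimal Python sketch (not part of the original pipelines) of deriving a hash key from the six identifying attributes. The function name `row_hash`, the sample values and the choice of UTF-8 encoding are assumptions made purely for illustration.

```python
import hashlib

# Columns that identify an order line; measures such as OrderQty are deliberately excluded.
KEY_COLUMNS = ["SalesOrderID", "OrderDateTime", "ProductName", "ProductCode", "Color", "Size"]

def row_hash(row: dict) -> str:
    """Return a SHA-512 hex digest built from the identifying columns of a row."""
    # NULLs (None) become empty strings, similar to ISNULL(col, '') in T-SQL.
    parts = ["" if row.get(col) is None else str(row[col]) for col in KEY_COLUMNS]
    return hashlib.sha512("".join(parts).encode("utf-8")).hexdigest()

# Hypothetical order line, invented for this example.
original = {
    "SalesOrderID": 71774, "OrderDateTime": "2008-06-01 00:00:00",
    "ProductName": "Sample Product", "ProductCode": "SP-001",
    "Color": None, "Size": "M", "OrderQty": 1, "UnitPrice": 10.0,
}
# The same order line after a price correction in the source system.
updated = dict(original, UnitPrice=12.5)

print(row_hash(original) == row_hash(updated))  # True: the key is unchanged
```

Because the measures are left out of the hash, a later change to quantity or price still maps to the same HashId, which is what lets the destination row be located and updated.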

Solution with ADF Data Flows

This is an all-Azure alternative where Data Flows are powered by the Databricks IR in the background. Open the adf-010 resource and choose "Author & Monitor". If you need more information on how to create and run Data Flows in ADF, this tip will help.

Create a Source for dbo.view_source_data and a Sink (Destination) for stg.SalesData.

[Screenshot: adventure works source]
[Screenshot: destination dwh]

Add a new dataflow1 and add Source_SalesData as the source:

[Screenshot: source settings]

There are several options that one needs to consider depending on the behavior of the source system.

  1. "Allow schema drift": if enabled, metadata changes are propagated through the dataflow pipeline
  2. "Validate schema": if enabled, the data flow fails when the underlying source metadata differs from the dataflow mapping
  3. "Sampling": relevant for large datasets, where reading only part of the data is the only option feasible in the time available

For the AdventureWorksLT dataset, none of these options are required, but you may want to adjust your choice depending on the system you are working with. The following adds a "Derived Column" transformation to calculate the HashId:

[Screenshot: source view]
[Screenshot: documentation]

Add column HashId and open Visual Expression Builder:

[Screenshot: visual expression builder]

The SHA-512 function definition is provided below:

sha2(512,SalesOrderID,OrderDateTime,ProductName,ProductCode,Color,Size)

The result of this function is a 128-character hexadecimal string, which matches the char(128) data type of the HashId column.
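
The figure of 128 characters follows from the digest size, as this short Python check illustrates (Python is used here only for illustration; the input bytes are arbitrary):

```python
import hashlib

digest = hashlib.sha512(b"any input").hexdigest()

# SHA-512 yields 512 bits = 64 bytes, and each byte is written as two hex characters.
print(hashlib.sha512().digest_size)  # 64
print(len(digest))                   # 128
```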

Add the sink (Destination) following the derived column transformation:

[Screenshot: sink sales data]
[Screenshot: allow insert]

To keep the source and destination in sync, the sink should update records whose HashId matches an existing row and insert records whose HashId has no match on the destination. This requires setting up the update methods on the sink: first define the HashId column as the key column, then select "Allow insert" and "Allow update".
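
The update-or-insert behavior described above can be sketched in a few lines of Python. This is only a conceptual model of keyed upsert semantics, not how the ADF sink is implemented; the `upsert` function and the sample rows are hypothetical.

```python
def upsert(destination: dict, incoming: list) -> tuple:
    """Apply rows to destination keyed by HashId; return (inserted, updated) counts."""
    inserted = updated = 0
    for row in incoming:
        key = row["HashId"]
        if key in destination:
            destination[key].update(row)   # "Allow update": overwrite the matching row
            updated += 1
        else:
            destination[key] = dict(row)   # "Allow insert": add a new row
            inserted += 1
    return inserted, updated

# Hypothetical destination with one existing row (short keys used for readability).
dwh = {"abc": {"HashId": "abc", "OrderQty": 1}}
result = upsert(dwh, [{"HashId": "abc", "OrderQty": 3}, {"HashId": "def", "OrderQty": 2}])
print(result)  # (1, 1)
```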

Solution with SQL Server Stored Procedures

Azure Data Factory has an activity to run stored procedures in the Azure SQL Database engine or Microsoft SQL Server. Stored procedures can access data only within the scope of the SQL Server instance. (It is possible to extend the scope of a stored procedure by adding a "linked server" to your instance, but from an architectural point of view this is messy, and I recommend using the Copy Data activity for 100% Azure or hybrid infrastructures.) To copy data from one Azure SQL database to another, we need a Copy Data activity followed by a stored procedure that calculates the HashId. The pipeline looks as follows:

[Screenshot: copy data pipeline]

The T-SQL code for the stored procedure that calculates the HashId with the help of HASHBYTES() T-SQL function is given below:

CREATE PROCEDURE [stg].[usp_adf_cdc]
 
AS
BEGIN
    SET NOCOUNT ON
   SET IMPLICIT_TRANSACTIONS OFF
 
   BEGIN TRANSACTION -- both changes to Facts.SalesData and truncation of staging data executed as one atomic transaction
 
   --Microsoft: Beginning with SQL Server 2016 (13.x), all algorithms other than SHA2_256, and SHA2_512 are deprecated.
   BEGIN TRY
 
      MERGE Facts.SalesData AS DEST  
      USING stg.SalesData AS SRC
    ON (
      DEST.HashId =
      HASHBYTES --HASH algorithm
      (
      'SHA2_512',
      CAST (SRC.SalesOrderID AS nvarchar) +
      CAST (SRC.OrderDateTime AS nvarchar) +
      SRC.ProductName +
      SRC.ProductCode +
      ISNULL(SRC.Color,'') +
      ISNULL(SRC.Size,'')
      )
      ) 
    WHEN MATCHED THEN   
        UPDATE SET DEST.OrderQty = SRC.OrderQty,
      DEST.UnitPrice = SRC.UnitPrice,
      DEST.Discount = SRC.Discount,
      DEST.Linetotal = SRC.Linetotal
   WHEN NOT MATCHED THEN  
      INSERT (HashId, SalesOrderID, OrderDateTime, ProductName, ProductCode, Color, Size, OrderQty, UnitPrice, Discount, Linetotal)  
      VALUES (
      HASHBYTES --HASH algorithm
      (
      'SHA2_512',
      CAST (SRC.SalesOrderID AS nvarchar) +
      CAST (SRC.OrderDateTime AS nvarchar) +
      SRC.ProductName +
      SRC.ProductCode +
      ISNULL(SRC.Color,'') +
      ISNULL(SRC.Size,'')
      ), SRC.SalesOrderID, SRC.OrderDateTime, SRC.ProductName,
      SRC.ProductCode, SRC.Color, SRC.Size, SRC.OrderQty, SRC.UnitPrice, SRC.Discount, SRC.Linetotal);
 
      TRUNCATE TABLE stg.SalesData
 
      COMMIT TRANSACTION
   END TRY
   BEGIN CATCH
      IF @@TRANCOUNT > 0
         ROLLBACK TRANSACTION; -- all changes are rolled back first
      THROW; -- the original error is then re-raised to the caller
   END CATCH
 
END
GO
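
One caveat with the concatenation used inside HASHBYTES above, raised in a reader comment on this tip, is that joining variable-length strings without a separator can make two different rows hash to the same value. The Python sketch below illustrates the effect and one possible workaround. The function names and the "||" separator are assumptions, and UTF-8 is used only for the demonstration (the collision does not depend on the encoding).

```python
import hashlib

def hash_plain(*values: str) -> str:
    """Hash the values joined with no separator, as in the concatenation above."""
    return hashlib.sha512("".join(values).encode("utf-8")).hexdigest()

def hash_delimited(*values: str, sep: str = "||") -> str:
    """Hash the values joined with a separator so column boundaries are preserved."""
    return hashlib.sha512(sep.join(values).encode("utf-8")).hexdigest()

# Two different (ProductName, ProductCode) pairs.
a = ("bread", "dd12")
b = ("breadd", "d12")

print(hash_plain(*a) == hash_plain(*b))          # True: both concatenate to "breaddd12"
print(hash_delimited(*a) == hash_delimited(*b))  # False: "bread||dd12" vs "breadd||d12"
```

The separator only helps if it cannot appear in the data itself, so it is worth choosing one that is very unlikely in valid input.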
			

The setup of the copy activity is given below:

[Screenshot: source sales data]
[Screenshot: stored procedure name]
[Screenshot: data integration unit]

The configuration of the Source and Destination is self-explanatory, but the Settings tab needs an explanation. The "data integration unit" setting controls performance: 2 gives the lowest performance and 256 the highest. Tune it according to your database tier. If you are copying a lot of data, I recommend increasing not only the data integration unit but also the "degree of copy parallelism". The "fault tolerance" setting affects the execution of the next activity. This pipeline executes the stored procedure only if all rows in the copy activity succeed. It does not have to be this way: you could change the precedence constraint from success to completion.

Deploy and Debug to verify the pipeline code:

[Screenshot: copy data debug run]

Solution with Azure-SSIS

Data Factory can orchestrate the execution of SSIS packages from the SSISDB repository once an Azure-SSIS Integration Runtime has been set up on the ADF overview page:

[Screenshot: configure ssis integration]

For more help on configuring the Azure-SSIS IR environment, consult this tip. The Azure-SSIS IR is costly: it consumes compute resources and also requires a SQL Server license. To minimize expenses, consider the resource level you need. In my experience, for most small to medium-sized projects, one VM node of size Standard_D4_v3 (4 vCores and 16 GB memory) is enough.

To compute the HashId with an Azure-SSIS pipeline, set up a project with the following data flow tasks:

[Screenshot: source azure data flow]

The "HashId Script" component is a C# .NET SSIS script component configured for "Transformation". For more help on getting started with the SSIS Script task, check out this tip.

The .NET C# code for the script component that calculates the HashId using the SHA2_512 algorithm is given below:

#region Namespaces
using System;
using System.Text;
using System.Security.Cryptography;
#endregion

// ScriptMain is the class that SSIS generates for a Script Component.
public class ScriptMain : UserComponent
{
    public override void Input0_ProcessInputRow(Input0Buffer Row)
    {
        // Replace NULLs with empty strings, as ISNULL() does in the T-SQL version.
        String ProductColor = Row.Color_IsNull ? "" : Row.Color.ToString();
        String ProductSize = Row.Size_IsNull ? "" : Row.Size.ToString();

        // Concatenate the identifying columns in the same order as the T-SQL version.
        String RowString = Row.SalesOrderID.ToString() +
                            Row.OrderDateTime.ToString() +
                            Row.ProductName.ToString() +
                            Row.ProductCode.ToString() +
                            ProductColor +
                            ProductSize;

        // Dispose the hash algorithm once the row has been hashed.
        using (HashAlgorithm sha_512 = new SHA512CryptoServiceProvider())
        {
            byte[] binary_hash = sha_512.ComputeHash(Encoding.UTF8.GetBytes(RowString));
            Row.HashId = ByteArrayToHexString(binary_hash);
        }
    }

    // Converts a byte array to a lowercase hexadecimal string (two characters per byte).
    public static string ByteArrayToHexString(byte[] MyByteArray)
    {
        StringBuilder HexadecimalString = new StringBuilder(MyByteArray.Length * 2);
        foreach (byte Byte in MyByteArray)
            HexadecimalString.AppendFormat("{0:x2}", Byte);
        return HexadecimalString.ToString();
    }
}

This script plays the same role as the hashing step of the T-SQL stored procedure in the previous section. It calculates a SHA2_512 hash value from the same columns and handles NULL values for the color and size attributes.
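
One detail worth checking before comparing hashes produced by different engines is the byte encoding of the input. The sketch below, written in Python purely for illustration, hashes the same text as UTF-8 (the encoding the C# script selects with Encoding.UTF8) and as UTF-16LE (the encoding SQL Server uses for nvarchar values passed to HASHBYTES). The sample string is an assumption.

```python
import hashlib

# Hypothetical concatenated key, invented for this example.
text = "71774Sample ProductSP-001"

utf8_hash = hashlib.sha512(text.encode("utf-8")).hexdigest()
utf16_hash = hashlib.sha512(text.encode("utf-16-le")).hexdigest()

print(utf8_hash == utf16_hash)  # False: different bytes give different digests
```

Hashes from the two approaches are therefore not guaranteed to be interchangeable, so it is safest to stick to one approach for a given destination table.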

Debug the SSIS package.

[Screenshot: SSIS package debug run]

The main advantage of the Azure-SSIS architecture is the ability to debug live and analyze data while the pipeline runs. You can examine the HashId values live by placing a Data Viewer on the output of the script component. Below is what this looks like:

[Screenshot: rows displayed in the Data Viewer]

Conclusion

All three Azure pipeline architectures have pros and cons when it comes to change capture using hashing algorithms. When it comes to usability and scalability, the Data Flow architecture clearly stands out as the better option. It offers the cleanest approach (from a coding point of view) to hashing the attribute values. The Stored Procedure and Azure-SSIS approaches, on the other hand, give more control over the data flow and development process.

About the author
Semjon Terehhov is a BI Architect and Manager at BDO Business Analytics in Norway with a master's degree from the University of Oxford.


Comments

Friday, August 02, 2019 - 3:32:24 AM - Semjon Terehhov

Hi Nigel & thank you for comment,

1. Yes concatenation of variable length strings without delimiter can yield false negatives as you have described. I would normally advise to enforce stricter datatypes for things like product code to avoid the issue. The solution you have suggested is a good work around.
2. Completely with you on this one. Good optimisation suggestion.
3. Nice one. I will add it to my coding guideline practice.

The three options I have described here are not the only options, to your list I can add U-SQL and I strongly believe many more will be available in future as Microsoft has been expanding ADF rapidly in the recent years.

Thank you again for your comment,
Semjon Terehhov


Wednesday, July 31, 2019 - 1:25:08 PM - Nigel Meakins

Hi Semjon.

Nice article. There are however a couple of things to be aware of with the above that will reduce the opportunity for issues:

  1. Use a delimiter when concatenating values for hashing, so as to avoid false negatives on your changes. In the above the following would yield the same: product name, code = "bread", "dd12", and "breadd", "d12". I like the double pipe "||" for this as it is very unlikely to ever rear it's head in valid data input.
  2. Use string builder rather than string when concatenating in C#, (or Java for that matter) to avoid excessive memory use.
  3. Consider concatenating in name order to be consistent across implementations.

Thanks for highlighting the implementation options we have in Azure. I guess you could also call out scala jar, python script in ADF as additional options for those familiar with these.

Regards

Nigel

