By: Ron L'Esteve
Problem
In my previous articles, Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2 and Load Data Lake files into Azure Synapse Analytics Using Azure Data Factory, I demonstrated how to 1) fully load Azure Data Lake Storage Gen2 from a SQL database and then 2) fully load an Azure Synapse DW from the Data Lake Storage Gen2 parquet files using Azure Data Factory's Copy activity.
Additionally, this E-L-T process was metadata-driven, using pipeline parameters populated in a SQL table. While this is a useful E-L-T method, how can we advance the process to 1) incrementally load Azure Data Lake Storage from a SQL source and 2) update and insert (upsert) the incremental records into an Azure Synapse DW destination?
Solution
In this article, we will explore the built-in upsert feature of Azure Data Factory's Mapping Data Flows to update and insert data from Azure Data Lake Storage Gen2 parquet files into an Azure Synapse DW. It is important to note that Mapping Data Flows does not currently support on-premises data sources and sinks; therefore, this demonstration will use an Azure SQL Database as the source dataset.
Additionally, we will use a custom method of incrementally populating Data Lake Storage Gen2 with parquet files based on the created date within the Azure SQL Database. This article assumes that you are familiar with my previous articles, which demonstrate a metadata-driven E-L-T approach using Azure Data Factory.
Create a Parameter Table
Let's begin the process by creating a pipeline_parameter table in an Azure SQL Database. This table will contain the parameter values that will be configured to control the ADF pipeline.
In particular, we will be interested in the following columns for the incremental and upsert process:
- upsert_key_column: This is the key column that must be used by mapping data flows for the upsert process. It is typically an ID column.
- incremental_watermark_value: This must be populated with the source SQL table's value to drive the incremental process. This is typically either a primary key id or created/last updated date column. It can be updated by a stored procedure.
- incremental_watermark_column: This is simply the column name for the value populated in the incremental_watermark_value column. This is typically either a primary key id or a created/last updated date column. It can be updated by a stored procedure.
- process_type: This must be set to incremental for ADF to know which records in this table should be processed incrementally.
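To make the columns above concrete, here is a hedged sketch of how a single incremental record might be populated; the server, database, table, schema, and watermark values are hypothetical and should be replaced with your own:

```sql
-- Hypothetical example row driving the incremental upsert process.
-- All values below are illustrative only.
INSERT INTO dbo.pipeline_parameter
    (server_name, src_type, src_schema, src_db, src_name,
     dst_type, dst_name, dst_folder, file_type,
     process_type, load_synapse, dst_schema,
     upsert_key_column, incremental_watermark_column, incremental_watermark_value)
VALUES
    ('myserver.database.windows.net', 'sql', 'dbo', 'SalesDb', 'SalesOrders',
     'synapse', 'SalesOrders', 'salesdb', 'parquet',
     'incremental', '1', 'etl',
     'OrderId', 'CreatedDate', '2020-04-01');
```

After each successful run, a stored procedure could advance incremental_watermark_value to the highest created date that was loaded, so the next run only picks up newer records.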
```sql
SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
CREATE TABLE [dbo].[pipeline_parameter](
    [PARAMETER_ID] [int] IDENTITY(1,1) NOT NULL,
    [server_name] [nvarchar](500) NULL,
    [src_type] [nvarchar](500) NULL,
    [src_schema] [nvarchar](500) NULL,
    [src_db] [nvarchar](500) NULL,
    [src_name] [nvarchar](500) NULL,
    [dst_type] [nvarchar](500) NULL,
    [dst_name] [nvarchar](500) NULL,
    [include_pipeline_flag] [nvarchar](500) NULL,
    [partition_field] [nvarchar](500) NULL,
    [process_type] [nvarchar](500) NULL,
    [priority_lane] [nvarchar](500) NULL,
    [pipeline_date] [nvarchar](500) NULL,
    [pipeline_status] [nvarchar](500) NULL,
    [load_synapse] [nvarchar](500) NULL,
    [load_frequency] [nvarchar](500) NULL,
    [dst_folder] [nvarchar](500) NULL,
    [file_type] [nvarchar](500) NULL,
    [lake_dst_folder] [nvarchar](500) NULL,
    [spark_flag] [nvarchar](500) NULL,
    [dst_schema] [nvarchar](500) NULL,
    [distribution_type] [nvarchar](500) NULL,
    [load_sqldw_etl_pipeline_date] [datetime] NULL,
    [load_sqldw_etl_pipeline_status] [nvarchar](500) NULL,
    [load_sqldw_curated_pipeline_date] [datetime] NULL,
    [load_sqldw_curated_pipeline_status] [nvarchar](500) NULL,
    [load_delta_pipeline_date] [datetime] NULL,
    [load_delta_pipeline_status] [nvarchar](500) NULL,
    [upsert_key_column] [nvarchar](500) NULL,
    [incremental_watermark_column] [nvarchar](500) NULL,
    [incremental_watermark_value] [datetime] NULL,
    PRIMARY KEY CLUSTERED
    (
        [PARAMETER_ID] ASC
    ) WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO
```
Create a Source Query for the ADF Pipeline
Now that we have created our pipeline_parameter table, let's write a custom SQL query that will serve as the source of the ADF pipeline.
Notice the addition of a SQLCommand and WhereValue in the query below which could be used to dynamically create a SQL statement and where clause based on whether the process type is full or incremental. For the purposes of this demo, we will only apply a filter for the incrementals, but the query demonstrates the flexibility to incorporate full loads into the same source query.
```sql
select
    src_schema, src_db, src_name, dst_schema, dst_type, dst_name, dst_folder,
    process_type, file_type, load_synapse, distribution_type,
    upsert_key_column, incremental_watermark_column,
    case
        when process_type = 'FULL' then
            'select * from ' + src_schema + '.' + src_name + ' where 1 = '
        when process_type = 'Incremental' then
            'select * from ' + src_schema + '.' + src_name + ' where '
                + incremental_watermark_column + ' > '
    end as SQLCommand,
    case
        when process_type = 'FULL' then '1'
        when process_type = 'incremental' then
            cast(isnull(incremental_watermark_value, 'yyyy-MM-dd') as varchar(50))
    end as WhereValue,
    dst_folder + '/' + dst_name + '/' + file_type + '/'
        + format(getdate(), 'yyyy-MM-dd') as FolderName,
    dst_name + '.' + file_type as FileName
from dbo.pipeline_parameter
where load_synapse = 1
  and process_type = 'incremental'
```
Here is the result of the query after populating the pipeline_parameter with one incremental record that we want to run through the ADF pipeline.
![IncrementalQuerySQL Image of SQL Query results](/tipimages2/6729_azure-data-factory-mapping-data-flows.001.png)
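To make the mechanics concrete: for a hypothetical source table dbo.SalesOrders with incremental_watermark_column of CreatedDate and a watermark value of April 1, 2020, concatenating SQLCommand with the quoted WhereValue yields a runtime source query along these lines (table and column names are illustrative):

```sql
-- Assembled at runtime from SQLCommand + '''' + WhereValue + ''''
select * from dbo.SalesOrders where CreatedDate > '2020-04-01'
```

For a FULL load, the same concatenation would instead produce `where 1 = 1`, which is why the query returns '1' as the WhereValue for full loads.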
Add the ADF Datasets
Next, let's head over to ADF and create the following datasets.
Azure SQL Database
We'll need an Azure SQL Database source dataset.
![SourceASQLDB Image of Source Azure SQL DB](/tipimages2/6729_azure-data-factory-mapping-data-flows.002.png)
Azure Data Lake Storage Gen2 – Parquet
We will also need an Azure Data Lake Storage Gen2 dataset that we can use within mapping data flows to create the parquet files.
![ParquetADLS2_1 Connection config for ADLS2 filepath](/tipimages2/6729_azure-data-factory-mapping-data-flows.003.png)
Also add the following parameters to the parquet configuration section.
![ParquetADLS2_1 Connection parameters for ADLS2 filepath](/tipimages2/6729_azure-data-factory-mapping-data-flows.004.png)
Azure Synapse Analytics – DW
Finally, we will need an Azure Synapse DW destination dataset.
![SynapseConnection Connection for Synapse dataset](/tipimages2/6729_azure-data-factory-mapping-data-flows.005.png)
Create the ADF Pipeline
Now that we have created the required datasets, let's begin configuring the ADF pipeline activities.
Lookup – Get-Tables
Let's begin with a Lookup activity to get the tables needed for the ADF pipeline. This is where we will add the query that was created in the previous steps.
![p_sql_to_synapse_lookup sql to synapse incremental pipeline lookup settings and source query](/tipimages2/6729_azure-data-factory-mapping-data-flows.006.png)
ForEach – Copy-Each-Table
Next, configure the ForEach activity settings as follows.
![p_sql_to_synapse_foreach sql to synapse incremental pipeline foreach settings](/tipimages2/6729_azure-data-factory-mapping-data-flows.007.png)
Mapping Data Flow – SQL to Lake Incremental
Now we can get started with building the mapping data flows for the incremental loads from the source Azure SQL Database to the sink Data Lake Store Gen2 parquet folders and files.
The FolderName and FileName parameters were created in the ADLS Gen2 parquet dataset and are used by the mapping data flow.
![p_sql_to_synapse_foreach_incremental sql to synapse incremental pipeline foreach data flow settings](/tipimages2/6729_azure-data-factory-mapping-data-flows.008.png)
The parameters contain the SQLCommand, WhereValue, and FileName.
![p_sql_to_synapse_foreach_param sql to synapse incremental pipeline foreach data flow parameters](/tipimages2/6729_azure-data-factory-mapping-data-flows.009.png)
The source settings of the mapping data flow are configured as follows:
![df_SQL_to_Lake_src_settings Source settings for data flow](/tipimages2/6729_azure-data-factory-mapping-data-flows.010.png)
The source options of the data flow are configured as follows and use the string interpolation expression feature. For more on building expressions in mapping data flows, read: Build expressions in mapping data flow.
![df_SQL_to_Lake_src_options Source options for data flow](/tipimages2/6729_azure-data-factory-mapping-data-flows.011.png)
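As a sketch, assuming data flow parameters named SQLCommand and WhereValue (your parameter names may differ), the source query can be assembled with string interpolation along these lines, wrapping the where value in single quotes so it compares correctly as a literal:

```
"{$SQLCommand}'{$WhereValue}'"
```

For the hypothetical incremental record above, this would evaluate to a query such as `select * from dbo.SalesOrders where CreatedDate > '2020-04-01'`.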
Next, let's configure the destination sink of the mapping data flow as follows:
![df_SQL_to_Lake_dst_sink sink for dst data flow](/tipimages2/6729_azure-data-factory-mapping-data-flows.012.png)
The sink settings specify output to a single file, named by the FileName parameter.
![df_SQL_to_Lake_dst_settings settings for dst data flow](/tipimages2/6729_azure-data-factory-mapping-data-flows.013.png)
And we'll use single partitioning in the Optimize tab.
![df_SQL_to_Lake_dst_optimize optimize settings for dst data flow](/tipimages2/6729_azure-data-factory-mapping-data-flows.014.png)
By clicking on the blank space of the mapping data flow canvas, the mapping data flow parameters tab will appear. Add the following parameters.
![df_SQL_to_Lake_param data flow params for data flow](/tipimages2/6729_azure-data-factory-mapping-data-flows.015.png)
Mapping Data Flow – Incremental Upsert from Lake to Synapse
Now that we've created and configured the SQL to Lake Incremental mapping data flow pipeline, it's time to create and configure the incremental upsert from Lake to Synapse pipeline.
![df_upsert_to_synapse_settings data flow settings for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.016.png)
Be sure to configure the following data flow parameters.
![df_upsert_to_synapse_params data flow parameters for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.017.png)
This data flow will contain the following three transformations: a source, an AlterRow transformation, and a sink.
Begin by configuring the settings of the lake source as follows:
![df_upsert_to_synapse_src_settings SRC data flow settings for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.018.png)
Next, ensure that the source options tab contains the parameterized FolderName.
![df_upsert_to_synapse_src_options SRC data flow options for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.019.png)
Add an AlterRow transformation with an 'Upsert if' row condition set to true().
![df_upsert_to_synapse_alterrow_settings AlterRow data flow settings for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.020.png)
I used current partitioning within the Optimize tab, but there may be opportunities to tune the partitioning for big data workloads and improve performance.
![df_upsert_to_synapse_alterrow_optimize AlterRow data flow optimize settings for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.021.png)
Finally, configure the destination Synapse dataset.
![df_upsert_to_synapse_dst_sink DST data flow sink for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.022.png)
Within the Settings tab, choose 'Allow Upsert' for the update method and add the upsert_key_column that we created and populated in the pipeline_parameter table. I have chosen not to 'Enable Staging' for this demo, but it may be a good option for performance optimization purposes.
![df_upsert_to_synapse_dst_settings DST data flow settings for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.023.png)
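Conceptually, the sink's upsert behaves like a T-SQL MERGE keyed on the upsert key column. Mapping data flows performs this internally, but a hedged sketch of the equivalent logic, for a hypothetical dbo.SalesOrders table keyed on OrderId with hypothetical columns, looks like this:

```sql
-- Illustrative only: the data flow sink performs the upsert itself;
-- this MERGE shows the equivalent logic for hypothetical table/column names.
MERGE INTO dbo.SalesOrders AS tgt
USING stg.SalesOrders_Incremental AS src
    ON tgt.OrderId = src.OrderId          -- upsert_key_column
WHEN MATCHED THEN
    UPDATE SET tgt.FacilityId  = src.FacilityId,
               tgt.CreatedDate = src.CreatedDate
WHEN NOT MATCHED THEN
    INSERT (OrderId, FacilityId, CreatedDate)
    VALUES (src.OrderId, src.FacilityId, src.CreatedDate);
```

This is why choosing the right upsert_key_column matters: rows that match on the key are updated, and rows that do not are inserted.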
Finally, ensure that the mapping data flow parameters contain the FolderName and upsert_key_column. To get to these parameters, remember to click into the white space of the mapping data flow canvas.
![df_upsert_to_synapse_params data flow params for upsert](/tipimages2/6729_azure-data-factory-mapping-data-flows.024.png)
Run the ADF Pipeline, Test & Verify the Results
Now that we have created the ADF pipeline, let's run it to test and verify the results.
Verify Incremental SQL to Synapse Pipeline Results
After running this pipeline, we can see that the end-to-end pipeline succeeded and copied over one table since we only had one record in the pipeline_parameter table.
![pipeline_run_details execution details of the pipeline](/tipimages2/6729_azure-data-factory-mapping-data-flows.025.png)
Verify Incremental SQL to Lake ADF Pipeline Results
The pipeline copied 493 rows from the source Azure SQL Database table to a parquet file in ADLS2.
![pipeline_run_details_rowcnt1 execution details of the pipeline rowcnt1](/tipimages2/6729_azure-data-factory-mapping-data-flows.026.png)
Verify Upsert Incremental Lake to Synapse ADF Pipeline Results
The incremental upsert from ADLS copied over 493 rows to Synapse.
![pipeline_run_details_rowcnt2 execution details of the pipeline rowcnt2](/tipimages2/6729_azure-data-factory-mapping-data-flows.027.png)
Verify Source SQL Record Count
We have 493 rows because the source contains 493 rows with a created date greater than April 1, 2020. Since this was the incremental_watermark_value defined in the pipeline_parameter table, that is how many records the ADF pipeline is expected to incrementally load.
![pipeline_run_details_rowcnt3 execution details of the pipeline/SQL rowcnt3](/tipimages2/6729_azure-data-factory-mapping-data-flows.028.png)
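The source-side check can be reproduced with a simple count against the watermark; the table and column names below are hypothetical and should match your own source:

```sql
-- Count the rows the incremental pipeline should pick up
-- (493 in this demo, given the 2020-04-01 watermark value)
SELECT COUNT(*) AS IncrementalRowCount
FROM dbo.SalesOrders
WHERE CreatedDate > '2020-04-01';
```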
Verify Lake Folder & Parquet File Path
We can also verify that the ADLS folder structure, along with a parquet file, has been created.
![lake_file image of the lake file](/tipimages2/6729_azure-data-factory-mapping-data-flows.029.png)
Verify Destination Synapse Record Count
Finally, when we run a SQL count statement on the destination Synapse DW table, we can also confirm that it contains 493 records which confirms that the incremental pipeline worked as expected.
![pipeline_run_details_rowcnt4 execution details of the pipeline/SQL rowcnt4](/tipimages2/6729_azure-data-factory-mapping-data-flows.030.png)
Insert a Source SQL Record
Now that we have confirmed that the incremental SQL to Synapse pipeline worked as expected, let's also verify that the insert portion of the upsert works as expected by adding an additional record to the source SQL table with a created date greater than April 1, 2020.
After adding the record and running a count, we can see that the query now returns 494 records rather than 493.
![pipeline_run_details_rowcnt5 execution details of the pipeline/SQL rowcnt5](/tipimages2/6729_azure-data-factory-mapping-data-flows.031.png)
Verify Incremental SQL to Lake ADF Pipeline Results
Once again, when we run the pipeline, we can see the new pipeline log results return a count of 494 from SQL to Lake.
![pipeline_run_details_rowcnt6 execution details of the pipeline/SQL rowcnt6](/tipimages2/6729_azure-data-factory-mapping-data-flows.032.png)
Verify Upsert Incremental Lake to Synapse ADF Pipeline Results
Also, we can see the new pipeline log results return a count of 494 from Lake to Synapse.
![pipeline_run_details_rowcnt7 execution details of the pipeline/SQL rowcnt7](/tipimages2/6729_azure-data-factory-mapping-data-flows.033.png)
Verify Destination Synapse Record Count
Finally, the destination Synapse DW Table also contains 494 records, which confirms that the insert worked as expected.
![pipeline_run_details_rowcnt8 execution details of the pipeline/SQL rowcnt8](/tipimages2/6729_azure-data-factory-mapping-data-flows.034.png)
Update a Source SQL Record
Let's run one final test to ensure that the update portion of the mapping data flow's upsert works as expected. For this test, I updated the source table to set FacilityId to 100 where the created date is greater than April 1, 2020. This updated two rows in the source SQL table.
![update_table image of sql statement to update table](/tipimages2/6729_azure-data-factory-mapping-data-flows.035.png)
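The update statement was along these lines; the table and column names are hypothetical, and since only two rows were affected, the actual statement likely included an additional filter beyond the created date:

```sql
-- Hypothetical sketch of the source-side update used for the test
UPDATE dbo.SalesOrders
SET FacilityId = 100
WHERE CreatedDate > '2020-04-01';
```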
Verify Destination Synapse Record Count
As expected, the destination Synapse DW table currently has no records with FacilityId = 100.
![pipeline_run_details_rowcnt9 execution details of the pipeline/SQL rowcnt9](/tipimages2/6729_azure-data-factory-mapping-data-flows.036.png)
After running the pipeline again, we can verify that the destination Synapse DW table still contains 494 rows and that two records have been updated with a FacilityId of 100. This confirms that both the update and insert commands, along with incremental loads, work as expected from the source SQL table to ADLS and finally to Synapse DW.
![pipeline_run_details_rowcnt10 execution details of the pipeline/SQL rowcnt10](/tipimages2/6729_azure-data-factory-mapping-data-flows.037.png)
Next Steps
- For more information on creating parameters in mapping data flows, read Parameterizing mapping data flows.
- For more information on integrating mapping data flows with your data warehouse, read my article: Azure Data Factory Mapping Data Flow for Datawarehouse ETL.
- For more information on using mapping data flows for Big data lake aggregations, read my article: Azure Data Factory Mapping Data Flows for Big Data Lake Aggregations and Transformations.
- For more information on an Incremental ADF ETL process, read: Incrementally load data from Azure SQL Database to Azure Blob storage using the Azure portal.