Loading Azure SQL Data Warehouse Dynamically using Azure Data Factory


By: Ron L'Esteve   |   Updated: 2020-04-16

Problem

In my last article, Load Data Lake files into Azure Synapse DW Using Azure Data Factory, I discussed how to load ADLS Gen2 files into Azure SQL DW, with the COPY INTO command as one option. I have since designed and developed a dynamic process to 'Auto Create' and load my 'etl' schema tables into SQL DW from snappy-compressed parquet files. I now want to explore options for creating and loading tables in a 'curated' schema, where I can define my schemas and distribution types dynamically at run-time. How can this be achieved using Azure Data Factory?

Solution

In my previous article, Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2, I introduced the concept of a pipeline parameter table to track and control all SQL Server tables, servers, schemas and more. Essentially, this pipeline parameter table is set up to drive the Azure Data Factory orchestration process. To define my distribution types and curated schemas dynamically, I will introduce a few new columns to this pipeline parameter table: [distribution_type], [dst_schema], and [dst_name]. I will then use these new columns within my Data Factory pipeline to dynamically create and load my curated tables from my 'etl' schema.
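As a rough sketch of that change, the three columns could be added with a statement along these lines. The varchar(255) types and the NULL-ability are assumptions, since the table definition is not shown in this article; they are chosen only to line up with the varchar(255) stored procedure parameters used later on.

```sql
-- Assumed data types: the article does not show the column definitions.
ALTER TABLE [dbo].[pipeline_parameter]
ADD [distribution_type] varchar(255) NULL,  -- e.g. ROUND_ROBIN, REPLICATE or HASH([column])
    [dst_schema]        varchar(255) NULL,  -- target schema, e.g. curated
    [dst_name]          varchar(255) NULL;  -- target table name
```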

Pre-requisites

As a pre-requisite, I would recommend getting familiar with some of my previous articles related to this topic, which eventually lead to this process:

  1. Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2
  2. Logging Azure Data Factory Pipeline Audit Data
  3. Using COPY INTO Azure Synapse Analytics from Azure Data Lake Store gen2
  4. Load Data Lake files into Azure Synapse DW Using Azure Data Factory

Dynamically Create and Load New Tables Using ADF Pre-Copy Script

As always, the process will begin with a look-up activity to the pipeline parameter table using a query where I can specify my flags and filters appropriately.

Figure: GetTables Lookup activity to get the list of tables

For example, in this source lookup query, the following filters are applied. The pipeline_status = 'success' filter allows me to track whether the files successfully made it to the lake; this status is set via a SQL stored procedure. Also, it is important to note that I have quite a few columns in this pipeline parameter table that help me track steps throughout the end-to-end process. For the purpose of this demo, I am interested in the columns [dst_schema], [dst_name] and [distribution_type].

SELECT [ID]
      ,[server_name]
      ,[src_type]
      ,[src_schema]
      ,[src_db]
      ,[src_name]
      ,[dst_type]
      ,[dst_name]
      ,[include_pipeline_flag]
      ,[partition_field]
      ,[process_type]
      ,[priority_lane]
      ,[pipeline_date]
      ,[pipeline_status]
      ,[load_synapse]
      ,[load_frequency]
      ,[dst_folder]
      ,[file_type]
      ,[lake_dst_folder]
      ,[spark_flag]
      ,[data_sources_id]
      ,[dst_schema]
      ,[distribution_type]
      ,[load_sqldw_etl_pipeline_date]
      ,[load_sqldw_etl_pipeline_status]
      ,[load_sqldw_curated_pipeline_date]
      ,[load_sqldw_curated_pipeline_status]
      ,[load_delta_pipeline_date]
      ,[load_delta_pipeline_status]
  FROM [dbo].[pipeline_parameter]
WHERE load_synapse = 1
AND pipeline_status = 'success'
AND include_pipeline_flag = 1
AND process_type = 'full'
AND load_frequency = 'daily'

As an example, the dst_schema and distribution_type in the pipeline parameter table may look like this:

Figure: Sample of columns used in the pipeline parameter table
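Since the sample screenshot is not reproduced here, the following sketch shows how such rows might be populated. The table names (Customer, SalesOrder), the hash column (OrderID) and the 'curated' schema value are hypothetical and used for illustration only; ROUND_ROBIN, REPLICATE and HASH(column) are the distribution options SQL DW accepts.

```sql
-- Hypothetical rows for illustration; these names are not from the article.
UPDATE [dbo].[pipeline_parameter]
SET [dst_schema]        = 'curated',
    [dst_name]          = 'Customer',
    [distribution_type] = 'ROUND_ROBIN'
WHERE [src_name] = 'Customer';

UPDATE [dbo].[pipeline_parameter]
SET [dst_schema]        = 'curated',
    [dst_name]          = 'SalesOrder',
    [distribution_type] = 'HASH([OrderID])'  -- value is spliced as-is into the WITH clause
WHERE [src_name] = 'SalesOrder';
```

Because the distribution_type value is inserted verbatim into the CREATE TABLE statement, it has to be stored exactly as it should appear after DISTRIBUTION =.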

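As we move on to the ForEach activity, I will ensure that the Items field under 'Settings' is filled in correctly so that it receives the output of the lookup activity (for example, an expression such as @activity('GetTables').output.value, assuming the Lookup activity is named GetTables).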

Figure: ForEach activity used in the pipeline

Upon drilling into the ForEach activity, there are three activities: Copy-Table, SUCCESS-Stored Procedure and Create_ASQLDB_Log. My previous articles discussed logging, so this article will focus on the details of the Copy-Table activity.

Figure: Copy-Table source dataset and properties

My source dataset is the SQLDW ETL schema.

Figure: Source dataset connection configuration

My sink dataset is defined as my 'curated' schema, where I can parametrize the destination schema and name. Note that the source dataset contains parameters that will be needed from the source schema; however, the sink dataset does not contain any parameters.

Figure: Destination dataset connection configuration

After creating my datasets, I can take a closer look at the pre-copy script. Note that I am using Bulk insert as the copy method since the data currently exists in the 'etl' schema on SQL DW and must be loaded to a curated schema.

I'll also set the table option to 'none' since I'll be creating tables using the following pre-copy script, which is basically a dynamic Create Table As Select (CTAS) statement that references my destination schema and name along with the distribution type. I am using SELECT TOP (0) because I only want to create the empty tables in this step and then load them using the ADF Copy Activity.

CREATE TABLE @{item().dst_schema}.@{item().dst_name}
WITH
    (
     CLUSTERED COLUMNSTORE INDEX,
     DISTRIBUTION = @{item().distribution_type}
    )
AS SELECT TOP (0) * FROM etl.@{item().dst_name}
OPTION (LABEL = 'CTAS : @{item().dst_name}');

Figure: Pre-copy script SQL syntax and configuration
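To make the string interpolation concrete, this is roughly what ADF would send to SQL DW for a single ForEach item. The values are hypothetical: dst_schema = curated, dst_name = Customer and distribution_type = ROUND_ROBIN.

```sql
-- Rendered pre-copy script for one hypothetical item
-- (dst_schema = curated, dst_name = Customer, distribution_type = ROUND_ROBIN).
CREATE TABLE curated.Customer
WITH
    (
     CLUSTERED COLUMNSTORE INDEX,
     DISTRIBUTION = ROUND_ROBIN
    )
AS SELECT TOP (0) * FROM etl.Customer
OPTION (LABEL = 'CTAS : Customer');
```

Note that a CTAS statement fails if the target table already exists, so this pre-copy script suits the initial creation of the curated tables rather than repeated runs.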

After running the pipeline, all the curated tables will be created in SQL DW with the appropriate destination schema, name, and distribution type.
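One way to confirm the result is to query the catalog views on SQL DW. The sketch below assumes the destination schema is named 'curated'; sys.pdw_table_distribution_properties is the catalog view that exposes each table's distribution policy.

```sql
-- List each curated table with its distribution policy
-- (the 'curated' schema name is an assumption based on this article).
SELECT s.name  AS schema_name,
       t.name  AS table_name,
       dp.distribution_policy_desc
FROM sys.tables AS t
JOIN sys.schemas AS s
    ON t.schema_id = s.schema_id
JOIN sys.pdw_table_distribution_properties AS dp
    ON t.object_id = dp.object_id
WHERE s.name = 'curated'
ORDER BY t.name;
```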

Dynamically Truncate & Load Existing Tables Using ADF Pre-Copy Script

In a scenario where I need to dynamically truncate and load existing tables rather than recreate the tables, I would approach it by simply truncating the destination table. This would be the only notable change from the previous pipeline.

Figure: Pre-copy script to truncate the table
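The screenshot of the script is not reproduced here, but a truncate pre-copy script would likely look something like the following sketch. It reuses the same item() properties as the CTAS script and assumes the curated table already exists.

```sql
-- Sketch of a truncate pre-copy script; assumes the curated table already exists.
TRUNCATE TABLE @{item().dst_schema}.@{item().dst_name}
```

The Copy Activity then reloads the emptied table, so the table definition and distribution type stay as they were originally created.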

Dynamically Drop, Create and Load Tables Using SQL DW Stored Procedure

Lastly, I am interested in exploring an option that will allow me to use stored procedures on the SQL DW to drop and create my curated tables.

The pipeline design will be very similar to my previous pipelines by starting with a lookup and then flowing into a ForEach activity.

Figure: Pipeline flow for Lookup and ForEach using a CTAS stored procedure

Within the ForEach, I have 3 stored procedure activities: 1) CTAS from pipeline parameter, 2) SUCCESS, and 3) FAIL. The success and fail stored procedures simply log the status of the pipeline run in the pipeline parameter table. For the purpose of this article, I will focus on the CTAS stored procedure.

Figure: Activities within the ForEach loop
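The SUCCESS and FAIL procedures are not shown in this article. Purely as an illustration of the idea, a status-logging procedure might look like the sketch below. The procedure name and the @status parameter are hypothetical, and it assumes the procedure is created in the same database as [dbo].[pipeline_parameter]; only the two load_sqldw_curated_* columns come from the lookup query earlier in the article.

```sql
-- Hypothetical sketch: not the author's actual SUCCESS/FAIL procedure.
CREATE PROC [dbo].[log_curated_load_status_sketch]
    @schema varchar(255),
    @name   varchar(255),
    @status varchar(50)      -- e.g. 'success' or 'fail'
AS
BEGIN
    -- Record the outcome and time of the curated load for this table
    UPDATE [dbo].[pipeline_parameter]
    SET [load_sqldw_curated_pipeline_status] = @status,
        [load_sqldw_curated_pipeline_date]   = GETDATE()
    WHERE [dst_schema] = @schema
      AND [dst_name]   = @name;
END
```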

The selected stored procedure has been created within the SQL DW. Additionally, the destination name, schema and distribution type have been defined as stored procedure parameters and are passed to the stored procedure from the ForEach item.

Figure: CTAS stored procedure configuration and parameters
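For a single iteration, the Stored Procedure activity is effectively issuing a call like the one below. The parameter values are hypothetical; in the pipeline they would come from item().dst_schema, item().dst_name and item().distribution_type.

```sql
-- Equivalent manual call with hypothetical values, useful for testing in SSMS.
EXEC [etl].[ctas_from_pipeline_parameter]
     @schema            = 'curated',
     @name              = 'Customer',
     @distribution_type = 'ROUND_ROBIN';
```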

When I head over to SSMS and script the stored procedure, I can see that the script is doing a few things:

  1. Declaring and setting the distribution_types along with etl, curated and schema/table names dynamically
  2. Dropping the curated_stage table if it exists
  3. Setting the SQL Syntax to Create the stage table as select * from etl.table with the distribution type dynamically set.
  4. Dropping the actual/original curated table
  5. Renaming the curated_stage to the actual/original curated table.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
 
CREATE PROC [etl].[ctas_from_pipeline_parameter] @schema [varchar](255),@name [varchar](255),@distribution_type [varchar](255) AS
BEGIN
 
-- Build the schema-qualified names used in the dynamic SQL
declare @table varchar(255)
declare @table_stage varchar(255)
declare @table_etl varchar(255)
declare @sql varchar(max)
 
set @table = @schema + '.' + @name
set @table_stage = @table + '_stage'
set @table_etl = 'etl.' + @name
 
-- Drop any leftover stage table, CTAS the stage table from etl,
-- drop the current curated table, then rename stage into its place
set @sql = 
'if object_id ('''+ @table_stage + ''',''U'') is not null drop table ' + @table_stage +';
 
CREATE TABLE ' + @table_stage + '
WITH
(
DISTRIBUTION = ' + @distribution_type + '
,CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT  *
FROM    ' + @table_etl + ';
 
if object_id ('''+ @table + ''',''U'') is not null drop table ' + @table +';
 
RENAME OBJECT ' + @table_stage + ' TO ' + @name +';'
 
exec(@sql)
 
END
GO
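To see what the procedure actually executes, it helps to expand the dynamic SQL by hand. With the hypothetical parameter values @schema = 'curated', @name = 'Customer' and @distribution_type = 'ROUND_ROBIN', the @sql string evaluates to the following batch:

```sql
-- Expanded @sql for hypothetical inputs: curated / Customer / ROUND_ROBIN
if object_id ('curated.Customer_stage','U') is not null drop table curated.Customer_stage;

CREATE TABLE curated.Customer_stage
WITH
(
DISTRIBUTION = ROUND_ROBIN
,CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT  *
FROM    etl.Customer;

if object_id ('curated.Customer','U') is not null drop table curated.Customer;

RENAME OBJECT curated.Customer_stage TO Customer;
```

The new name in RENAME OBJECT is given without a schema, which is why the procedure passes @name rather than @table as the target. Because the parameters are concatenated directly into the statement, the values in the pipeline parameter table should be treated as trusted input.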

In a scenario where I am interested in renaming the original curated table, rather than dropping the original curated table, I would use this script:

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
 
CREATE PROC [etl].[ctas_from_pipeline_parameter] @schema [varchar](255),@name [varchar](255),@distribution_type [varchar](255) AS
BEGIN
 
-- Build the schema-qualified names used in the dynamic SQL
declare @table varchar(255)
declare @table_stage varchar(255)
declare @table_drop varchar(255)
declare @table_etl varchar(255)
declare @schematable_drop varchar(255)
declare @sql varchar(max)
 
 
set @table = @schema + '.' + @name
set @table_stage = @table + '_stage'
set @table_drop = @name + '_drop'
set @table_etl = 'etl.' + @name
set @schematable_drop = @table + '_drop'
 
-- Drop any leftover stage table, CTAS the stage table from etl,
-- rename the current curated table to _drop, rename stage into its place,
-- then drop the _drop table
set @sql = 
'if object_id ('''+ @table_stage + ''',''U'') is not null drop table ' + @table_stage +';
 
CREATE TABLE ' + @table_stage + '
WITH
(
DISTRIBUTION = ' + @distribution_type + '
,CLUSTERED COLUMNSTORE INDEX
)
AS
SELECT  *
FROM    ' + @table_etl + ';
 
if object_id ('''+ @table + ''',''U'') is not null rename object ' + @table + ' TO ' + @table_drop +';
 
RENAME OBJECT ' + @table_stage + ' TO ' + @name +';
 
if object_id ('''+ @schematable_drop + ''',''U'') is not null drop table ' + @schematable_drop +';'
 
exec(@sql)
 
END
GO
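The part that differs from the first procedure is the table swap at the end. Expanded for the hypothetical inputs @schema = 'curated' and @name = 'Customer', that portion of the generated SQL reads:

```sql
-- Swap portion of the expanded @sql for hypothetical inputs: curated / Customer
if object_id ('curated.Customer','U') is not null rename object curated.Customer TO Customer_drop;

RENAME OBJECT curated.Customer_stage TO Customer;

if object_id ('curated.Customer_drop','U') is not null drop table curated.Customer_drop;
```

With this ordering the original table is only dropped after the newly built table has taken its name.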

Lastly, it is important to note that within SQL DW, if I attempt to drop or rename a table that a Materialized View depends on, the drop and rename script will fail.
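Before relying on either procedure, it may be worth checking whether any materialized views exist in the database at all. The query below is a starting point only: it lists materialized views but does not resolve which tables each one depends on, so the view definitions would still need to be reviewed.

```sql
-- List materialized views in the SQL DW database
-- (materialized views appear in sys.views and carry an index with index_id < 2).
SELECT V.name AS materialized_view,
       V.object_id
FROM sys.views AS V
JOIN sys.indexes AS I
    ON V.object_id = I.object_id
   AND I.index_id < 2;
```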

In this article, I outlined steps to Dynamically Create & Load New Tables Using ADF Pre-Copy Script, Dynamically Truncate & Load Existing Tables Using ADF Pre-Copy Script, and finally Dynamically Drop, Create, & Load Tables Using SQL DW Stored Procedure.

Next Steps
  • For more detail on designing distributed tables in SQL DW, see Guidance for designing distributed tables in SQL Analytics
  • Similar to dynamically defining distribution types, explore options for dynamically defining indexes, partitions and more in the pipeline parameter table and dynamic SQL within Azure Data Factory.
  • See my article Design and Manage Azure SQL Data Warehouse for more detail on working with SQL DW.
  • Explore and research options for dropping and/or renaming tables that have dependencies linked to Materialized Views.





About the author
Ron L'Esteve is a seasoned Data Architect who holds an MBA and MSF. Ron has over 15 years of consulting experience with Microsoft Business Intelligence, data engineering, emerging cloud and big data technologies.
