Load Data Lake files into Azure Synapse Analytics Using Azure Data Factory


By: Ron L'Esteve   |   Updated: 2020-03-09

Problem

In my previous article, Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2, I demonstrated how to create a dynamic, parameterized, metadata-driven process to fully load data from an on-premises SQL Server to Azure Data Lake Storage Gen2. Next, I want to fully load the snappy-compressed parquet data files from ADLS Gen2 into Azure Synapse DW. One of my previous articles discusses the COPY INTO statement syntax and how it can be used to load data into Synapse DW.

What other options are available for loading data into Azure Synapse DW from Azure Data Lake Storage Gen2 using Azure Data Factory?

Solution

When Azure Synapse Analytics is the sink, Azure Data Factory's Copy activity offers three copy methods. In this article, I will explore all three: PolyBase, Copy command (preview), and Bulk insert, using the dynamic, parameterized pipeline process that I outlined in my previous article.

Pre-requisites

Read and implement the steps outlined in my three previous articles:

  1. Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2
  2. Logging Azure Data Factory Pipeline Audit Data
  3. COPY INTO Azure Synapse Analytics from Azure Data Lake Store gen2

Create the Datasets

As a starting point, I will need to create a source dataset for my ADLS2 Snappy Parquet files and a sink dataset for Azure Synapse DW.

DS_ADLS2_PARQUET_SNAPPY_AZVM_SYNAPSE

I'll start by creating my source ADLS2 dataset with parameterized paths. Note that pipeline_date appears in the source path. This value is driven by the pipeline_date field in the pipeline_parameter table that I created in my previous article, Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2, and then populated in my next article, Logging Azure Data Factory Pipeline Audit Data. The pipeline_date column contains the max folder date, which is valuable in this process because there may be multiple folders and we want to load the most recently modified one.

[Image: DS_ADLS2_PARQUET_SNAPPY_AZVM_SYNAPSE connection configuration]
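The role of pipeline_date can be illustrated with a small sketch. It assumes, hypothetically, that each load lands in a folder named with an ISO date (YYYY-MM-DD); the article does not state the folder naming convention, and `latest_folder` is an illustrative name rather than part of the pipeline.

```python
from datetime import date


def latest_folder(folder_names):
    """Return the folder whose ISO-date name is the most recent."""
    # Assumption: folder names are ISO dates such as "2020-03-09".
    return max(folder_names, key=date.fromisoformat)


# Hypothetical folder listing for one table in the lake.
folders = ["2020-03-07", "2020-03-09", "2020-03-08"]
print(latest_folder(folders))
```

In the pipeline itself this selection is not computed at copy time; the value is read from the pipeline_date column of the pipeline_parameter table.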

I'll also add the parameters that I'll need as follows:

[Image: DS_ADLS2_PARQUET_SNAPPY_AZVM_SYNAPSE parameter configuration]

The linked service details are below. Azure Key Vault is being used to store the credential secrets. This becomes relevant in later sections, when the pipelines run into authentication errors.

[Image: DS_ADLS2_PARQUET_SNAPPY_AZVM_SYNAPSE linked service configuration]

DS_ADLS2_PARQUET_SNAPPY_AZVM_MI_SYNAPSE

The difference between this dataset and the last one is that its linked service connection does not use Azure Key Vault. I'll use it to switch between the Key Vault and non-Key Vault connections when errors appear later.

[Image: DS_ADLS2_PARQUET_SNAPPY_AZVM_MI_SYNAPSE connection configuration]

Similar to the previous dataset, add the parameters here:

[Image: DS_ADLS2_PARQUET_SNAPPY_AZVM_MI_SYNAPSE parameter configuration]

The linked service details are below. Azure Key Vault is not being used here. Again, this becomes relevant in later sections, when the pipelines run into authentication errors.

[Image: DS_ADLS2_PARQUET_SNAPPY_AZVM_MI_SYNAPSE linked service configuration]

DS_ASQLDW

The sink connection will be to my Azure Synapse DW. I am using parameters to specify the schema and table name. This is useful because the ForEach loop can then create multiple tables using the same sink dataset.

[Image: DS_ASQLDW connection configuration]
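A parameterized sink dataset of this kind might look roughly like the JSON below, written here as a Python dictionary so it can be printed and inspected. This is a sketch under assumptions: the linked service name `LS_ASQLDW` and the parameter names `schema` and `table` are hypothetical and are not taken from the article.

```python
import json

# Hypothetical names: the linked service "LS_ASQLDW" and the dataset
# parameters "schema" and "table" are assumptions for illustration.
ds_asqldw = {
    "name": "DS_ASQLDW",
    "properties": {
        "linkedServiceName": {
            "referenceName": "LS_ASQLDW",
            "type": "LinkedServiceReference",
        },
        "parameters": {
            "schema": {"type": "string"},
            "table": {"type": "string"},
        },
        "type": "AzureSqlDWTable",
        "typeProperties": {
            # Resolved per ForEach iteration from the values passed to the dataset.
            "schema": {"value": "@dataset().schema", "type": "Expression"},
            "table": {"value": "@dataset().table", "type": "Expression"},
        },
    },
}

print(json.dumps(ds_asqldw, indent=2))
```

Because the schema and table are expressions rather than fixed names, one dataset definition can serve every table returned by the Lookup.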

Create the Pipeline

Now that my datasets have been created, I'll create a new pipeline and then add a Lookup connected to a ForEach loop.

[Image: p_lake_to_Synapse pipeline]

Based on my previous article, where I set up the pipeline parameter table, my Lookup will get the list of tables that need to be loaded to Azure Synapse. Note that this is currently specified by WHERE load_synapse = 1.

[Image: Lookup activity that gets the tables to load]

SELECT [server_name]
      ,[src_type]
      ,[src_schema]
      ,[src_db]
      ,[src_name]
      ,[dst_type]
      ,[dst_name]
      ,[include_pipeline_flag]
      ,[partition_field]
      ,[process_type]
      ,[priority_lane]
      ,[pipeline_date]
      ,[pipeline_status]
      ,[dst_folder]
      ,[file_type]
  FROM [dbo].[pipeline_parameter]
WHERE load_synapse = 1
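For the ForEach loop to receive every row, the Lookup has to return all matching rows rather than only the first one. A minimal sketch of such a Lookup activity follows, written as a Python dictionary. The activity name, dataset name, and source type are assumptions (the article does not show the activity JSON), and the column list is abridged for readability.

```python
import json

# Abridged column list; the article's query selects more columns.
LOOKUP_QUERY = (
    "SELECT [src_schema], [dst_name], [pipeline_date] "
    "FROM [dbo].[pipeline_parameter] WHERE load_synapse = 1"
)

lookup_activity = {
    "name": "L_Get_Tables",  # hypothetical activity name
    "type": "Lookup",
    "typeProperties": {
        "source": {
            # Assumption: the parameter table lives in Azure SQL Database.
            "type": "AzureSqlSource",
            "sqlReaderQuery": LOOKUP_QUERY,
        },
        "dataset": {
            "referenceName": "DS_PIPELINE_PARAMETER",  # hypothetical dataset name
            "type": "DatasetReference",
        },
        # Return every matching row instead of only the first one.
        "firstRowOnly": False,
    },
}

print(json.dumps(lookup_activity, indent=2))
```

With `firstRowOnly` set to false, the activity output exposes the rows as an array under `output.value`, which is what a ForEach loop iterates over.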

Within the settings of the ForEach loop, I'll add the output value of the Lookup. Remember to leave the 'Sequential' box unchecked so that multiple tables are processed in parallel. If 'Batch count' is left blank, the default is 20; the maximum is 50.

[Image: ForEach settings to copy each table]
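The ForEach settings described above correspond roughly to the following activity definition, sketched as a Python dictionary. The activity names `FE_Copy_Tables` and `L_Get_Tables` are hypothetical, and the inner Copy activity is omitted.

```python
import json

for_each = {
    "name": "FE_Copy_Tables",  # hypothetical activity name
    "type": "ForEach",
    "dependsOn": [
        # "L_Get_Tables" is a hypothetical name for the Lookup activity.
        {"activity": "L_Get_Tables", "dependencyConditions": ["Succeeded"]}
    ],
    "typeProperties": {
        # 'Sequential' unchecked: iterations run in parallel.
        "isSequential": False,
        # Upper limit on parallel iterations; ADF caps this value at 50.
        "batchCount": 20,
        # The array of rows returned by the Lookup.
        "items": {
            "value": "@activity('L_Get_Tables').output.value",
            "type": "Expression",
        },
        "activities": [],  # the Copy activity would be listed here
    },
}

print(json.dumps(for_each, indent=2))
```

Inside the loop, each row is available as `item()`, which is how expressions such as `@{item().dst_name}` pick up the current table.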

I'll also add one copy activity to the ForEach activity. Click the pencil icon to view the Copy activity.

[Image: ForEach activities to copy each table]

The source is set to DS_ADLS2_PARQUET_SNAPPY_AZVM_SYNAPSE, which uses Azure Key Vault in the linked service connection.

I have added the dynamic parameters that I'll need. Note that the parameters were defined in the dataset.

[Image: Copy data source within the ForEach loop]

Finally, I will choose my DS_ASQLDW dataset as my sink and select 'Bulk insert' with the 'Auto create table' option enabled.

[Image: Copy data sink within the ForEach loop]

Because the pipeline is driven by the pipeline_parameter table, I can add any number of tables (records) to that table with the load_synapse flag set to 1, and the pipeline will load all of them to Azure Synapse in parallel using the copy method that I select.

Choose the Copy Method

There are three options for the sink copy method: PolyBase, Copy command (preview), and Bulk insert. I will demonstrate each in this section.
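In the Copy activity's JSON, the chosen copy method shows up as flags on the Synapse sink. The sketch below builds a sink fragment for each method; the function name `build_sink` and the method labels passed to it are illustrative, and only a minimal subset of sink properties is shown.

```python
def build_sink(copy_method):
    """Return a minimal Copy activity sink fragment for the chosen copy method."""
    sink = {"type": "SqlDWSink"}
    if copy_method == "PolyBase":
        sink["allowPolyBase"] = True
    elif copy_method == "CopyCommand":
        sink["allowCopyCommand"] = True
    elif copy_method == "BulkInsert":
        # Bulk insert is used when neither of the two flags above is enabled.
        # 'Auto create table' is included here to match the article's setup.
        sink["tableOption"] = "autoCreate"
    else:
        raise ValueError(f"Unknown copy method: {copy_method}")
    return sink


for method in ("BulkInsert", "PolyBase", "CopyCommand"):
    print(method, build_sink(method))
```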

Bulk Insert

See BULK INSERT (Transact-SQL) for more detail on the BULK INSERT syntax.

Within the Sink of the Copy activity, set the copy method to BULK INSERT.

'Auto create table' automatically creates the table, if it does not exist, using the schema from the source file. This option isn't supported when the sink specifies a stored procedure or when the Copy activity uses staging settings. In my scenario, the source is a snappy-compressed parquet file that does not contain incompatible data types such as VARCHAR(MAX), so there should be no issues with the 'Auto create table' option.

Note that the pre-copy script runs before the table is created. When using 'Auto create table' for a table that does not yet exist, run the pipeline without the pre-copy script first to prevent errors, then add the pre-copy script back once the tables have been created for ongoing full loads.

If the default 'Auto create table' option does not meet a table's distribution needs, 'Add dynamic content' can be used to apply a distribution method specified per table in the pipeline parameter table.

TRUNCATE TABLE @{item().src_schema}.@{item().dst_name}

[Image: Bulk insert as the sink copy method]
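The pre-copy script interpolates the schema and table name from the current ForEach item. The sketch below mimics that interpolation in Python to show the statement that results for one row. The sample row is hypothetical. The optional guarded variant is an assumption of mine, not something the article uses: it skips the truncation when the table does not exist yet, which may avoid the first-run error described above, but it has not been tested against the article's environment.

```python
def render_pre_copy_script(item, guard=False):
    """Render the pre-copy script for one row of the pipeline parameter table."""
    target = f"{item['src_schema']}.{item['dst_name']}"
    statement = f"TRUNCATE TABLE {target}"
    if guard:
        # Assumption: only truncate when the user table already exists.
        return f"IF OBJECT_ID('{target}', 'U') IS NOT NULL {statement}"
    return statement


# Hypothetical row from the pipeline parameter table.
item = {"src_schema": "dbo", "dst_name": "Customer"}
print(render_pre_copy_script(item))
print(render_pre_copy_script(item, guard=True))
```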

The pipeline run succeeded using the BULK INSERT copy method.

[Image: Bulk insert pipeline status]

Below are the details of the Bulk Insert Copy pipeline status.

[Image: Details of the Bulk insert copy pipeline status]

Querying the Synapse table confirms that it contains the same number of rows as the source.

[Image: Row count of the newly created and loaded table]

The Bulk insert method also works with an on-premises SQL Server as the source and Azure Synapse as the sink.

PolyBase

For more detail on PolyBase, read What is PolyBase?

Select PolyBase to test this copy method.

[Image: PolyBase as the sink copy method]

See Copy and transform data in Azure Synapse Analytics (formerly Azure SQL Data Warehouse) by using Azure Data Factory for more detail on the additional PolyBase options.

As a pre-requisite for Managed Identity Credentials, see the 'Managed identities for Azure resource authentication' section of the above article to provision Azure AD and grant the data factory full access to the database.

To verify the access, run the following queries on Synapse DW:

select * from sys.database_scoped_credentials
select * from sys.database_role_members
select * from sys.database_principals

Also, when external tables, data sources, and file formats need to be created, the following queries can help with verifying that the required objects have been created:

select * from sys.external_tables
select * from sys.external_data_sources
select * from sys.external_file_formats

After configuring my pipeline and running it, the pipeline failed with the following error:

"ErrorCode=FailedDbOperation,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Error happened when loading data into SQL Data Warehouse.,Source=Microsoft.DataTransfer.ClientLibrary,''Type=System.Data.SqlClient.SqlException,Message=External file access failed due to internal error: 'Error occurred while accessing HDFS: Java exception raised on call to HdfsBridge_IsDirExist. Java exception message:\r\nHdfsBridge::isDirExist - Unexpected error encountered checking whether directory exists or not: AbfsRestOperationException: Operation failed: \"This request is not authorized to perform this operation.\", 403, HEAD, https://lake.dfs.core.windows.net/sds//?upn=false&action=getAccessControl&timeout=90',Source=.Net SqlClient Data Provider,SqlErrorNumber=105019,Class=16,ErrorCode=-2146232060,State=1,Errors=[{Class=16,Number=105019,State=1,Message=External file access failed due to internal error: 'Error occurred while accessing HDFS: Java exception raised on call to HdfsBridge_IsDirExist. Java exception message:\r\nHdfsBridge::isDirExist - Unexpected error encountered checking whether directory exists or not: AbfsRestOperationException: Operation failed: \"This request is not authorized to perform this operation.\", 403, HEAD, https://lake.dfs.core.windows.net/sds//?upn=false&action=getAccessControl&timeout=90',},],'",

After researching the error, I found the cause: the original Azure Data Lake Storage linked service from source dataset DS_ADLS2_PARQUET_SNAPPY_AZVM_SYNAPSE uses Azure Key Vault to store its authentication credentials, which is not a supported authentication method at this time for PolyBase and the Copy command.

After changing the source dataset to DS_ADLS2_PARQUET_SNAPPY_AZVM_MI_SYNAPSE, which no longer uses Azure Key Vault, the pipeline succeeded using the PolyBase copy method.

[Image: Details of the PolyBase load from ADLS to Synapse]
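To make the difference between the two source connections concrete, the sketch below shows what the two ADLS Gen2 linked service definitions might look like, written as Python dictionaries. Everything here is an assumption for illustration: the article does not show the JSON, so the linked service names, the secret name, the idea that Key Vault holds the storage account key, and the use of the data factory's managed identity in the second variant are all hypothetical. The account URL is the one that appears in the error message.

```python
# Storage account URL as it appears in the PolyBase error message.
ACCOUNT_URL = "https://lake.dfs.core.windows.net"

# Variant 1 (assumed): account key retrieved from Azure Key Vault.
ls_key_vault = {
    "name": "LS_ADLS2_KV",  # hypothetical name
    "properties": {
        "type": "AzureBlobFS",
        "typeProperties": {
            "url": ACCOUNT_URL,
            "accountKey": {
                "type": "AzureKeyVaultSecret",
                "store": {
                    "referenceName": "LS_KEY_VAULT",  # hypothetical name
                    "type": "LinkedServiceReference",
                },
                "secretName": "adls2-account-key",  # hypothetical secret
            },
        },
    },
}

# Variant 2 (assumed): no stored credential, so the data factory's
# managed identity is used to reach the storage account.
ls_managed_identity = {
    "name": "LS_ADLS2_MI",  # hypothetical name
    "properties": {
        "type": "AzureBlobFS",
        "typeProperties": {"url": ACCOUNT_URL},
    },
}


def uses_key_vault(linked_service):
    """Report whether any connection property is a Key Vault secret reference."""
    props = linked_service["properties"]["typeProperties"]
    return any(
        isinstance(value, dict) and value.get("type") == "AzureKeyVaultSecret"
        for value in props.values()
    )


print(uses_key_vault(ls_key_vault), uses_key_vault(ls_managed_identity))
```

A check like `uses_key_vault` could be run over exported linked service definitions to spot sources that are likely to fail with the PolyBase or Copy command methods.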

Copy command (preview)

For more detail on the copy command, read COPY (Transact-SQL) (preview).

The Copy command functions similarly to PolyBase, so the permissions needed for PolyBase are more than sufficient for the Copy command as well. For more information on COPY INTO, see my article on COPY INTO Azure Synapse Analytics from Azure Data Lake Store gen2. The article covers details on permissions, use cases, and the SQL syntax for COPY INTO.

[Image: Copy command as the sink copy method]

As with the PolyBase copy method, using the Azure Key Vault linked service produced an error, although with a slightly different message:

ErrorCode=UserErrorSqlDWCopyCommandError,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=SQL DW Copy Command operation failed with error 'Not able to validate external location because The remote server returned an error: (403) Forbidden.',Source=Microsoft.DataTransfer.ClientLibrary,''Type=System.Data.SqlClient.SqlException,Message=Not able to validate external location because The remote server returned an error: (403) Forbidden.,Source=.Net SqlClient Data Provider,SqlErrorNumber=105215,Class=16,ErrorCode=-2146232060,State=1,Errors=[{Class=16,Number=105215,State=1,Message=Not able to validate external location because The remote server returned an error: (403) Forbidden.,},],'", "failureType": "UserError", "target": "Copy data1", "details": []
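Both error messages are long, but the fields that matter for triage are the leading ErrorCode, the SqlErrorNumber, and the HTTP 403 status that points to an authorization problem. The sketch below pulls those out with regular expressions. The function name is illustrative, and the sample strings are abridged excerpts of the two messages above rather than the full text.

```python
import re


def summarize_adf_error(message):
    """Extract the leading error code, SQL error number, and 403 indicator."""
    error_code = re.search(r"ErrorCode=(\w+)", message)
    sql_number = re.search(r"SqlErrorNumber=(\d+)", message)
    return {
        "error_code": error_code.group(1) if error_code else None,
        "sql_error_number": int(sql_number.group(1)) if sql_number else None,
        # A 403 status suggests the storage request was not authorized.
        "forbidden": re.search(r"\b403\b", message) is not None,
    }


# Abridged excerpts of the PolyBase and Copy command errors.
polybase_error = (
    "ErrorCode=FailedDbOperation,'Type=Microsoft.DataTransfer.Common.Shared."
    "HybridDeliveryException,Message=Error happened when loading data into "
    "SQL Data Warehouse.,SqlErrorNumber=105019,Operation failed: 403, HEAD"
)
copy_error = (
    "ErrorCode=UserErrorSqlDWCopyCommandError,Message=Not able to validate "
    "external location because The remote server returned an error: (403) "
    "Forbidden.,SqlErrorNumber=105215"
)

print(summarize_adf_error(polybase_error))
print(summarize_adf_error(copy_error))
```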

After changing to the linked service that does not use Azure Key Vault, the pipeline succeeded.

[Image: Details of the Copy command pipeline]

In this article, I created source Azure Data Lake Storage Gen2 datasets and a sink Azure Synapse Analytics dataset along with an Azure Data Factory pipeline driven by a parameter table to load snappy compressed parquet files into Azure Synapse using 3 copy methods: BULK INSERT, PolyBase, and Copy Command (preview).

About the author
Ron L'Esteve is a seasoned Data Architect who holds an MBA and MSF. Ron has over 15 years of consulting experience with Microsoft Business Intelligence, data engineering, emerging cloud and big data technologies.
