Azure Data Factory Multiple File Load Example - Part 2


By:   |   Updated: 2020-02-03   |   Comments (12)   |   Related: More > Azure Data Factory


Problem

In this two-part tip, we create a metadata-driven pipeline that copies multiple flat files from Azure Blob Storage to an Azure SQL Database. The flat files can have different delimiters, they are stored in different folders, and there are multiple destination tables as well.

Solution

In part 1 of this tip, we created the metadata table in SQL Server and we also created parameterized datasets in Azure Data Factory. In this part, we will combine both to create a metadata-driven pipeline using the ForEach activity.

If you want to follow along, make sure you have read part 1 for the first step.

Step 2 – The Pipeline

With the datasets ready, we can now start on the pipeline. The first action is retrieving the metadata. In a new pipeline, drag the Lookup activity to the canvas.

pipeline lookup

With the following query, we can retrieve the metadata from SQL Server:

SELECT
     b.[ObjectName]
    ,FolderName = b.[ObjectValue]
    ,SQLTable   = s.[ObjectValue]
    ,Delimiter  = d.[ObjectValue]
FROM [dbo].[Metadata_ADF] b
JOIN [dbo].[Metadata_ADF] s ON b.[ObjectName] = s.[ObjectName]
JOIN [dbo].[Metadata_ADF] d ON b.[ObjectName] = d.[ObjectName]
WHERE   b.[SourceType] = 'BlobContainer'
    AND s.[SourceType] = 'SQLTable'
    AND d.[SourceType] = 'Delimiter';

The advantage of parameterized datasets is that you can use them dynamically, but you can also use them as a placeholder for any query against the same database. This means we can reuse the dataset we created before for this metadata query (as long as the metadata table and the destination tables are in the same database, of course).

specify lookup metadata query

All we have to do is change the Use query property to Query and fill in the query shown above. Since the query returns more than one row, don’t forget to deselect the First row only checkbox. We did define a parameter on the table name though, so we need to specify some value; otherwise an error is returned. This can be solved by specifying a dummy value, such as _notSet in this example.
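Behind the designer, the Lookup activity corresponds to JSON roughly like the sketch below. The activity and dataset names follow this tip, but treat the details as assumptions: the source type assumes an Azure SQL dataset, the TableName parameter name is the one assumed from part 1, and the query is abbreviated.

```json
{
  "name": "Retrieve Metadata",
  "type": "Lookup",
  "typeProperties": {
    "source": {
      "type": "AzureSqlSource",
      "sqlReaderQuery": "SELECT b.[ObjectName], FolderName = b.[ObjectValue], ... FROM [dbo].[Metadata_ADF] b ..."
    },
    "dataset": {
      "referenceName": "sql_movies_dynamic",
      "type": "DatasetReference",
      "parameters": { "TableName": "_notSet" }
    },
    "firstRowOnly": false
  }
}
```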

We can preview the data:

metadata preview

If the dummy table name had not been specified, the following error would have been returned by the preview:

preview error

Next, we add a ForEach iterator to the pipeline and connect it with the Lookup activity.

add foreach

On the settings pane of the ForEach activity, click on Add dynamic content.

settings foreach dynamic content

Here we’re going to select the output of the Lookup activity.

add output to expression

At the end of the expression, add .value so the full expression becomes:

@activity('Retrieve Metadata').output.value
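The reason for appending .value is the shape of the Lookup output: with First row only deselected, the activity returns a JSON object whose value property holds the array of rows, and the ForEach needs that array. A sketch with purely illustrative folder and table values:

```json
{
  "count": 2,
  "value": [
    {
      "ObjectName": "TopMovies",
      "FolderName": "topmovies/semicolon",
      "SQLTable": "TopMovies",
      "Delimiter": ";"
    },
    {
      "ObjectName": "OtherMovies",
      "FolderName": "topmovies/comma",
      "SQLTable": "OtherMovies",
      "Delimiter": ","
    }
  ]
}
```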

The settings tab should now look like this:

settings tab finished
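In JSON, the finished ForEach settings correspond to something like the following sketch; the activity names are illustrative, and the activities array is left empty here because the Copy Data activity is added in the next step.

```json
{
  "name": "Loop Over Metadata",
  "type": "ForEach",
  "dependsOn": [
    { "activity": "Retrieve Metadata", "dependencyConditions": [ "Succeeded" ] }
  ],
  "typeProperties": {
    "items": {
      "value": "@activity('Retrieve Metadata').output.value",
      "type": "Expression"
    },
    "activities": []
  }
}
```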

Go to the Activities tab and click on Add activity.

add activity to foreach

This will take you to a new pipeline canvas, where we add the Copy Data activity.

You can go back to the original pipeline by selecting the DynamicPipeline link at the top of the canvas. Go to the Source tab of the Copy Data activity and select the csv_movie_dynamic dataset. You have to specify the parameter values for the FolderName and the DelimiterSymbol parameters. This can be done using the following expression:

@{item().ObjectValue}

Here ObjectValue is a metadata column from the Lookup activity. You need to replace it with the actual column name you need. The tab for the source then becomes:

source configuration

A wildcard for the file name was also specified, to make sure only csv files are processed. For the sink, we need to specify the sql_movies_dynamic dataset we created earlier.
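In JSON, the source side of the Copy Data activity then looks roughly like this sketch: the dataset parameters are bound to the metadata columns, and the wildcard sits in the store settings. The parameter names (FolderName, DelimiterSymbol) are the ones defined in part 1; treat the exact property layout as an assumption about a delimited-text source on blob storage.

```json
{
  "inputs": [
    {
      "referenceName": "csv_movie_dynamic",
      "type": "DatasetReference",
      "parameters": {
        "FolderName": "@{item().FolderName}",
        "DelimiterSymbol": "@{item().Delimiter}"
      }
    }
  ],
  "typeProperties": {
    "source": {
      "type": "DelimitedTextSource",
      "storeSettings": {
        "type": "AzureBlobStorageReadSettings",
        "wildcardFileName": "*.csv"
      }
    }
  }
}
```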

sink configuration

Here, we need to specify the parameter value for the table name, which is done with the following expression:

@{item().SQLTable}

It’s also possible to specify a SQL statement that will be executed at the start of the Copy Data activity, before any data is loaded. In the Pre-copy script field, we can truncate the destination table with the following expression:

TRUNCATE TABLE dbo.@{item().SQLTable}
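The sink side of the Copy Data activity then corresponds to JSON roughly like the sketch below. The TableName parameter name is the one assumed from part 1, and the dbo schema in the pre-copy script is an assumption about the destination tables.

```json
{
  "outputs": [
    {
      "referenceName": "sql_movies_dynamic",
      "type": "DatasetReference",
      "parameters": { "TableName": "@{item().SQLTable}" }
    }
  ],
  "typeProperties": {
    "sink": {
      "type": "AzureSqlSink",
      "preCopyScript": "TRUNCATE TABLE dbo.@{item().SQLTable}"
    }
  }
}
```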

Make sure no mapping is specified at all in the Mapping tab. Again, this set-up will only work if the columns in the flat file are the same as in the destination table.

no mapping

Step 3 – Debugging The Pipeline

Go back to the original pipeline (containing the Lookup and the ForEach) and hit the debug button at the top. If everything ran successfully, you’ll get an output like this:

success debug

By clicking on the output arrow for the Retrieve Metadata activity, we can view the metadata retrieved from the SQL Server database:

output lookup

For the ForEach iterator, we can verify two items were passed in the input:

input data

For the input of one of the CopyData activities, we can verify the truncate table statement for the sink was constructed correctly:

input copydata

For the output of the CopyData activity (for the files with a semicolon), we can see two files were indeed processed in parallel:

copydata output semicolon

While for the other CopyData activity, only one file (with a comma) was processed:

copydata output comma

When you click on the glasses next to one of the CopyData activities in the debug output, you can see more performance related information:

copydata performance

As you can see, two files were processed with a total of 200 rows for that one specific CopyData activity.

It’s important to note there are actually two levels of parallelism in this set-up:

  1. The ForEach loop will process each blob folder separately in parallel (semicolon files vs comma files).
  2. In an iteration of the ForEach loop, the CopyData activity itself will process all blob files found in one folder also in parallel (2 files found with semicolon data).

Conclusion

In this tip we saw how to build a metadata-driven pipeline in Azure Data Factory. Metadata stored in a key-value table in SQL Server populates the parameters of the different datasets. Because of those parameters, a single dataset can dynamically pick up multiple types of files or write to different tables in a database. The ForEach iterator loops over the metadata and executes a Copy Data activity for each item.

Next Steps







About the author
Koen Verbeeck is a BI professional, specializing in the Microsoft BI stack with a particular love for SSIS.






Comments For This Article




Monday, July 27, 2020 - 2:34:26 PM - Koen Verbeeck Back To Top (86200)

Hi Kunal,

the metadata needs to come from somewhere, so that's why I used a SQL table in this tip.
I've also noticed issues with the schema. The best option is to not use any column metadata in the linked service and in the copy data activity. This requires that the source file and the destination have the exact same columns.

Koen


Friday, July 24, 2020 - 1:26:50 PM - Kunal Panchal Back To Top (86190)

Is there any way to do this dynamically without using the extra SQL table?

I tried using the Get Metadata activity and it's copying the first file into the first table correctly, but the second file throws an error while being copied, as the schema of the first file is still present.

In this case two tables were created successfully; just the schema is the issue.

Any suggestion would be appreciated.


Tuesday, April 21, 2020 - 6:24:58 AM - Mehboob Back To Top (85430)

Thanks Koen. I know now where I was committing mistake. You just pointed it to perfection.

Regards,

Mehboob


Monday, April 20, 2020 - 1:37:02 PM - Koen Verbeeck Back To Top (85424)

Hi Mehboob,

I just ran my pipeline and it succeeded without any issues.
Row 46 of the file is "Lock, Stock and 2 smoking barrels", so there's a comma in the text, besides the semicolon which is the actual delimiter.
Make sure you set a parameter on the column delimiter in the dataset configuration (described in part 1).

Regards,
Koen


Monday, April 20, 2020 - 11:02:43 AM - Mehboob Back To Top (85420)

Hi,

Followed exactly the way you have mentioned above, but there seems to have an additional delimiter in the source Flat file TopMovies_Part2.csv which is why Copy activity and in-turn the Loop Over metadata failed.

Error message:

{
    "errorCode": "2200",
    "message": "ErrorCode=DelimitedTextMoreColumnsThanDefined,'Type=Microsoft.DataTransfer.Common.Shared.HybridDeliveryException,Message=Error found when processing 'Csv/Tsv Format Text' source 'TopMovies_Part2.csv' with row number 46: found more columns than expected column count: 1.,Source=Microsoft.DataTransfer.Common,'",
    "failureType": "UserError",
    "target": "Copy Blob to SQL",

Thursday, February 13, 2020 - 4:37:25 AM - Koen Verbeeck Back To Top (84417)

Hi Vinnie,

it should work as well with Azure Synapse, as long as the copy activity supports it. Another option might be to use external tables (Polybase) to get data into Synapse. To load data from on prem, you can use a self-hosted integration runtime, as explained here:

https://www.mssqltips.com/sqlservertip/5812/connect-to-onpremises-data-in-azure-data-factory-with-the-selfhosted-integration-runtime--part-1/

In theory, this should support the same pattern as described in this tip, but I haven't tried it out myself.
SSIS can also be used; it depends a bit on what the skills in the team are and whether you want to pay for an Azure-SSIS IR.

Koen


Tuesday, February 11, 2020 - 7:22:08 PM - Vinnie Back To Top (84365)

Koen, thank you for this article - very insightful. A couple of "newbie" questions - will this method work just as well with Azure Synapse (SQL Data Warehouse)? Also, I'm working on a POC - in this design workflow, what would be the automated mechanism to migrate files from on prem to blob or data lake storage? I see a lot of examples of migrating from ADLS to an Azure database or data warehouse, but I'm not sure what best practices are around getting files from on prem data sources to ADLS. Is this achievable via ADF or SSIS?


Thursday, February 06, 2020 - 6:23:36 AM - Anurag Back To Top (84206)

Yes, we have the same file structure. In the future we may have some new Tag/Column/KeyValue, but it will be the same for all the files. How do we handle it dynamically? Our ADF will fail due to the new KeyValue in it.

First we have to get all the unique key values from the files and compare them with the SQL table. If there is any extra column, we have to add it to the table. Once that is done, the mapping should happen automatically and load the data into the SQL table.

Below are the steps we need to follow:

1. Check the unique KeyValue from all the files

2. Compare with SQL table and if we have any extra column, we need to add it in SQL Table

3. Dynamic mapping between source and Sink.

Regards, Anurag


Wednesday, February 05, 2020 - 2:37:54 AM - Koen Verbeeck Back To Top (84159)

Hi Anurag,

do the files have the same schema?

Regards,
Koen


Tuesday, February 04, 2020 - 11:24:30 AM - Anurag Back To Top (84148)

Thanks for your response,

Yes, I will have to load multiple files in single table. 


Tuesday, February 04, 2020 - 3:52:30 AM - Koen Verbeeck Back To Top (84139)

Hi Anurag,

I'm not 100% following. Do you have to load multiple JSON files into the same table, or into different tables? Do the JSON files all have the same schema, or are they different?

Koen


Monday, February 03, 2020 - 2:49:01 AM - Anurag Back To Top (84115)

Hi,

I have to get all JSON files' data into a table, from Azure Data Factory to a SQL Server data warehouse. I am able to load the data into a table with static values (by giving column names in the dataset), but I am unable to do it dynamically in Azure Data Factory. Can anyone help with a solution to do this dynamically?








Recommended Reading

Azure Data Factory Pipeline Email Notification Part 1

Azure Data Factory Lookup Activity Example

Azure Data Factory Get Metadata Example

Azure Data Factory vs SSIS vs Azure Databricks

Getting Started with Delta Lake Using Azure Data Factory









