Getting Started with Delta Lake Using Azure Data Factory



Problem

While working with Azure Data Lake Gen2 and Apache Spark, I began to learn about the limitations of Apache Spark along with the many challenges of implementing a data lake. I also learned that an ACID-compliant feature set is crucial within a lake and that Delta Lake offers solutions to many of these existing issues. What is a Delta Lake and why do we need an ACID-compliant lake? What are the benefits of Delta Lake, and what is a good way of getting started with it?

Solution

Delta Lake is an open source storage layer that guarantees data atomicity, consistency, isolation, and durability in the lake; in short, a Delta Lake is ACID compliant. In addition to providing ACID transactions, scalable metadata handling, and more, Delta Lake runs on top of an existing data lake and is compatible with Apache Spark APIs. There are a few methods of getting started with Delta Lake. Databricks offers notebooks along with compatible Apache Spark APIs to create and manage Delta Lakes. Alternatively, Azure Data Factory's Mapping Data Flows, which run on scaled-out Apache Spark clusters, can be used to perform ACID-compliant CRUD operations through GUI-designed ETL pipelines. This article demonstrates how to get started with Delta Lake using Azure Data Factory's new Delta Lake connector, with examples of how to create, insert, update, and delete in a Delta Lake.
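Before moving to the ADF demo, it may help to see what an equivalent Delta Lake write looks like in code. The following is a minimal PySpark sketch, not part of the ADF pipeline itself, assuming a Spark session configured with the open source delta-spark package; the storage account, container, and file names are hypothetical placeholders.

# Minimal sketch: write a parquet source out as a Delta table using the Spark APIs.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-getting-started")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

raw_path = "abfss://data@mydatalake.dfs.core.windows.net/raw/userdata1.parquet"
delta_path = "abfss://data@mydatalake.dfs.core.windows.net/staging/delta/userdata"

df = spark.read.parquet(raw_path)                         # read the raw parquet file
df.write.format("delta").mode("append").save(delta_path)  # ACID-compliant Delta write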

Why an ACID Delta Lake

There are many advantages to introducing Delta Lake into a modern cloud data architecture. Traditionally, data lakes and Apache Spark are not ACID compliant. Delta Lake introduces ACID compliance to solve the following issues.

Atomicity: write either all of the data or nothing. Apache Spark save modes do not use any locking and are not atomic, so a failed job may leave behind an incomplete file and corrupt the data; a failing job may also remove the old file and corrupt the new one. While this seems concerning, Spark's built-in DataFrame writer APIs, although not atomic, do behave atomically for append operations; however, this comes with a performance overhead when used with cloud storage.

Consistency: data is always in a valid state. If the Spark API writer deletes an old file and creates a new one, and the operation is not transactional, there will always be a period of time between the deletion of the old file and the creation of the new one when the file does not exist. If the overwrite operation fails in that window, the old file's data is lost and the new file may never be created. This is a typical Spark overwrite issue related to consistency.

Isolation: multiple transactions occur independently without interference. This means that when writing to a dataset, other concurrent reads or writes on the same dataset should not be impacted by the write operation. Typical transactional databases offer multiple isolation levels; while Spark has task-level and job-level commits, its lack of atomicity means it does not offer isolation levels.

Durability: committed data is never lost. When Spark fails to complete a commit correctly, it undermines the durability guarantees that cloud storage otherwise provides and can corrupt or lose the data, which violates durability.
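To make the atomicity and consistency points concrete, the sketch below contrasts a plain parquet overwrite with a Delta overwrite. It reuses the hypothetical paths and Spark session from the earlier snippet and is only an illustration, not part of the ADF demo.

# Plain parquet overwrite: files are deleted and rewritten without a transaction, so a
# failure mid-way can leave the dataset missing or partially written.
plain_path = "abfss://data@mydatalake.dfs.core.windows.net/staging/parquet/userdata"
df = spark.read.parquet("abfss://data@mydatalake.dfs.core.windows.net/raw/userdata1.parquet")
df.write.mode("overwrite").parquet(plain_path)

# Delta overwrite: the new files are written first and only become visible when the
# commit is appended to the _delta_log, so readers always see a complete version.
df.write.format("delta").mode("overwrite").save(
    "abfss://data@mydatalake.dfs.core.windows.net/staging/delta/userdata"
)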

Now that we understand the current data lake and Spark challenges, along with the benefits of an ACID-compliant Delta Lake, let's get started with the demo.

Pre-Requisites

For this demo, be sure to create the following prerequisites.

1) Create a Data Factory V2: Data Factory will be used to perform the ELT orchestrations. Additionally, ADF's Mapping Data Flows Delta Lake connector will be used to create and manage the Delta Lake. For more detail on creating a Data Factory V2, see Quickstart: Create a data factory by using the Azure Data Factory UI.

2) Create a Data Lake Storage Gen2: ADLSgen2 will be the Data Lake storage on top of which the Delta Lake will be created. For more detail on creating ADLSgen2, see: Creating Your First ADLS Gen2 Data Lake.

3) Create Data Lake Storage Gen2 Container and Zones: Once your Data Lake Gen2 is created, also create the appropriate containers and zones. For more information on designing ADLS Gen2 zones, see: Building your Data Lake on Azure Data Lake Storage gen2. This demo will use the Raw Zone to store a sample source parquet file. Additionally, the Staging Zone will be used for Delta updates, inserts, deletes, and additional transformations. Though the Curated Zone will not be used in this demo, it is worth mentioning that this zone may contain the final ETL output, advanced analytics, or data science models that are further transformed and curated from the Staging Zone.

[Image: ADLS Gen2 zones/folders]

4) Upload Data to Raw Zone: Finally, you'll need some data for this demo. By searching for 'sample parquet files', you'll find a number of online GitHub repos and downloadable sample data. The following GitHub repo data was used for this demo.

[Image: sample UserData parquet file]

5) Create a Data Factory Parquet Dataset pointing to the Raw Zone: The final prerequisite is to create a parquet-format dataset in the newly created ADF V2 instance pointing to the sample parquet file stored in the Raw Zone.

[Image: UserData connection settings in ADF]

Create and Insert into Delta Lake

Now that all pre-requisites are in place, we are ready to create the initial delta tables and insert data from our Raw Zone into the delta tables.

Let's begin by creating a new Data Factory pipeline and adding a new Mapping Data Flow to it. Also remember to give the pipeline and data flow sensible names, much like the sample below.

[Image: Mapping Data Flows canvas for the insert]

Within the Data Flow, add a source and sink with the following configurations. Schema Drift may be enabled as needed for the specific use case.

For more detail on Schema Drift, see Schema drift in mapping data flow.

Sampling offers a method to limit the number of rows from the source, mainly used for testing and debugging purposes.

[Image: mapping data flow source settings for the insert]

Since Delta Lake leverages Spark's distributed processing power, it is capable of partitioning the data appropriately on its own; however, to demonstrate manually setting partitioning, I've configured 20 Hash partitions on the id column. For more detail, read about the various partition type options in mapping data flows.

[Image: Optimize settings for data flow partitioning]
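For reference, the Hash partitioning configured above is roughly equivalent to the following PySpark call. This is only a sketch; df stands for the source DataFrame from the earlier snippets, and the lower-case id column name matches the sample userdata file.

# Redistribute the rows into 20 partitions by hashing the id column.
partitioned_df = df.repartition(20, "id")
print(partitioned_df.rdd.getNumPartitions())  # prints 20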

After adding the destination activity, ensure that the sink type is set to Delta. For more information on Delta in ADF, see Delta format in Azure Data Factory. Note that Delta is available as both a source and a sink in Mapping Data Flows. You will also be required to select the Linked Service once the Delta sink type is selected.

[Image: Mapping Data Flows sink for the insert]

Under the Settings tab, ensure that the Staging folder is selected and select Insert for the Update Method. Also, select Truncate table if there is a need to truncate the Delta Table before loading it.

For more information on the Vacuum command, see: Vacuum a Delta table (Delta Lake on Databricks). Essentially, Vacuum will remove files that are no longer referenced by the Delta table and are older than the retention threshold in hours. The default is 30 days if the value is left at 0 or empty.

[Image: settings for the insert sink]
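For comparison, the same cleanup can be run outside of ADF with the delta-spark Python API. The sketch below is only an illustration and reuses the hypothetical staging path and Spark session from the earlier snippets.

from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(
    spark, "abfss://data@mydatalake.dfs.core.windows.net/staging/delta/userdata"
)
# Remove files no longer referenced by the table that are older than 720 hours (30 days).
delta_table.vacuum(720)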

Finally, within the Optimize tab, simply use the current partitioning, since the source partitioning will flow downstream to the sink.

[Image: partitioning options for optimizing the sink]

As expected, once the pipeline is triggered and finishes running, we can see that 13 columns were created across 20 different partitions.

[Image: ADF mapping data flow pipeline run details]

Looking at the ADLS Gen2 staging folder, we see that a _delta_log folder along with 20 snappy-compressed parquet files has been created.

[Image: Delta Lake partitioned files]

Open the _delta_log folder to view the two transaction log files. For more information on understanding the Delta logs, read: Diving Into Delta Lake: Unpacking The Transaction Log.

[Image: Delta log files]
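Each commit file is plain JSON (one action per line), so it can also be inspected directly. A hedged sketch, assuming the hypothetical staging path from the earlier snippets and the first commit file in the log:

# Read the first commit file from the transaction log and show its commit information.
log_path = ("abfss://data@mydatalake.dfs.core.windows.net/"
            "staging/delta/userdata/_delta_log/00000000000000000000.json")
spark.read.json(log_path).select("commitInfo").show(truncate=False)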

Checking the new data in the Staging Delta Lake, we can see that the records have been inserted.

[Image: insert data results read from the parquet files]
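Outside of ADF's Data Preview shown above, the same check can be done by reading the Delta table back with Spark; a sketch assuming the hypothetical staging path used earlier.

# Load the staging Delta table and spot-check the inserted rows.
staging = spark.read.format("delta").load(
    "abfss://data@mydatalake.dfs.core.windows.net/staging/delta/userdata"
)
print(staging.count())                                     # total rows inserted
staging.select("id", "first_name", "last_name", "gender").show(5)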

Update Delta Lake

So far, we have covered Inserts into the Delta Lake. Next, let's take a look at how Data Factory can handle Updates to our delta tables.

Similar to inserts, create a new ADF pipeline with a mapping data flow for Updates.

[Image: mapping data flow settings for the Delta Lake update]

For this update demo, let's update the user's first and last names and convert them to lower case. To do this, we add Derived Column and Alter Row transformations to the Update mapping data flow canvas.

[Image: source settings for the update mapping data flow]

The source data is still our Staging Delta Lake that was configured for the inserts. The source options also surface Delta Time Travel settings; for more detail on Time Travel, see: Introducing Delta Time Travel for Large Scale Data Lakes.

[Image: source options for the update]

The derived columns convert the first and last names to lower case using the following expressions. Mapping Data Flows is capable of handling far more complex transformations at this stage.

[Image: derived column settings for the update]
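For reference, the same derived-column logic can be expressed in PySpark as shown below. This is only a sketch of the equivalent transformation, with staging referring to the DataFrame read back in the earlier verification snippet.

from pyspark.sql.functions import col, lower

# Lower-case the first and last names before they flow on to the alter row and sink steps.
updated = (
    staging
    .withColumn("first_name", lower(col("first_name")))
    .withColumn("last_name", lower(col("last_name")))
)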

For the alter row settings, we specify an Update if condition of true() so that every row flowing through the stream is marked for update.

[Image: alter row settings for the update]

Verify the sink configurations.

[Image: sink settings for the update data flow]

Ensure that the sink is still pointing to the Staging Delta Lake data, and select Allow update as the update method. To show that multiple key columns can be selected at once, three key columns are selected here.

[Image: update settings for the data flow sink]
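Behind the scenes, the sink performs a merge keyed on the selected columns, as the commit log later confirms. The following is a hedged delta-spark sketch of a comparable update, reusing the hypothetical staging path and the updated DataFrame from the previous snippet.

from delta.tables import DeltaTable

target = DeltaTable.forPath(
    spark, "abfss://data@mydatalake.dfs.core.windows.net/staging/delta/userdata"
)
(
    target.alias("target")
    .merge(
        updated.alias("source"),
        "source.id = target.id AND "
        "source.registration_dttm = target.registration_dttm AND "
        "source.ip_address = target.ip_address",
    )
    .whenMatchedUpdate(set={
        "first_name": "source.first_name",  # already lower-cased upstream
        "last_name": "source.last_name",
    })
    .execute()
)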

After the pipeline is saved and triggered, we can see that the first and last names have been updated to lower-case values.

[Image: data showing the updates as expected]

Delete from Delta Lake

To recap, we have covered inserts and updates so far. Next, let's look at an example of how Mapping Data Flows handles deletes in Delta Lake.

Similar to the inserts and updates, create a new Data Factory pipeline and mapping data flow.

[Image: mapping data flow for deleting from the Delta Lake]

Configure the Delta source settings.

[Image: source settings for the delete]

Since we are still working with the same Staging Delta Lake, these source settings will be configured similarly to the inserts and updates.

[Image: source options for the delete]

For this example, let's delete all records where gender = 'Male'. To do this, we need to configure the alter row condition to Delete if gender == 'Male'.

[Image: alter row settings for the delete]
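A comparable delete with the delta-spark API is a single call (note that ADF itself implements the delete as a merge, as the commit log later shows). A hedged sketch, again using the hypothetical staging path:

from delta.tables import DeltaTable

target = DeltaTable.forPath(
    spark, "abfss://data@mydatalake.dfs.core.windows.net/staging/delta/userdata"
)
target.delete("gender = 'Male'")  # removes every row where gender equals 'Male'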

Finally, configure the sink delta settings.

[Image: sink settings for the delete]

Select the Staging Delta Lake for the sink, select 'Allow Delete' and select the desired key columns.

[Image: sink settings for the delete, continued]

After publishing and triggering this pipeline, notice how all records where gender = Male have been deleted.

[Image: delete results as expected]

Explore Delta Logs

Lastly, let's take a look at the Delta logs to briefly understand how they have been created and populated. The commit information for the insert, update, and delete operations is stored in separate JSON commit files. Additionally, CRC files are created; CRC is a popular technique for checking data integrity because it has excellent error-detection ability, uses few resources, and is easy to apply.

[Image: the complete Delta logs after insert, update, and delete]

Insert

As we can see from opening the Insert JSON commit file, it contains commit info for the insert operations.

{"commitInfo":{"timestamp":1594782281467,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"WriteSerializable","isBlindAppend":true}}
[Image: Delta log insert commit]

Update

Similarly, when we open the Update JSON commit file, it contains commit info for the update operations.

{"commitInfo":{"timestamp":1594782711552,"operation":"MERGE","operationParameters":{"predicate":"(((source.`id` = target.`id`) AND (source.`registration_dttm` = target.`registration_dttm`)) AND (source.`ip_address` = target.`ip_address`))","updatePredicate":"((NOT ((source.`ra2b434a305b34f2f96cd5b4b4149455e` & 2) = 0)) OR (NOT ((source.`ra2b434a305b34f2f96cd5b4b4149455e` & 8) = 0)))","deletePredicate":"(NOT ((source.`ra2b434a305b34f2f96cd5b4b4149455e` & 4) = 0))"},"readVersion":0,"isolationLevel":"WriteSerializable","isBlindAppend":false}}
[Image: Delta log update commit]

Delete

Lastly, when we open the Delete JSON commit file, it contains commit info for the Delete operations.

"commitInfo":{"timestamp":1594783812366,"operation":"MERGE","operationParameters":{"predicate":"(((source.`id` = target.`id`) AND (source.`registration_dttm` = target.`registration_dttm`)) AND (source.`ip_address` = target.`ip_address`))","updatePredicate":"((NOT ((source.`ra079d97a688347b581710234d2cc4b63` & 2) = 0)) OR (NOT ((source.`ra079d97a688347b581710234d2cc4b63` & 8) = 0)))","deletePredicate":"(NOT ((source.`ra079d97a688347b581710234d2cc4b63` & 4) = 0))"},"readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":false}}
[Image: Delta log delete commit]
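The same commit history can also be read back as a DataFrame through the Delta history API, which is a convenient way to see the insert, update, and delete versions side by side. A hedged sketch using the hypothetical staging path from the earlier snippets:

from delta.tables import DeltaTable

target = DeltaTable.forPath(
    spark, "abfss://data@mydatalake.dfs.core.windows.net/staging/delta/userdata"
)
# One row per commit: version, timestamp, operation (WRITE/MERGE), and its parameters.
target.history().select(
    "version", "timestamp", "operation", "operationParameters"
).show(truncate=False)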
