Databricks Delta Change Data Feed
The Delta file format gives data lakes a modern way to manage changing records, because regular Parquet files are immutable and offer no graceful method of performing CRUD operations. Despite these advantages, capturing changes from Delta files still carries significant overhead: you have to scan and read entire files even if only a few records within them have changed. Change Data Feed within Databricks supplements this change data capture (CDC) process by storing metadata about changed records so that they can be managed more efficiently. How can we get started with Delta Change Data Feed in Databricks?
Change Data Feed enables you to read only a Delta table's change feed, rather than the entire table, to capture and manage changes. You must enable it explicitly through the table's change data feed property, and it requires Databricks Runtime 8.2 or above. You can set this configuration at the table level for individual tables or at the cluster level as a default for tables created on that cluster. Note that capturing and storing the additional CDC metadata adds some overhead. Change Data Feed supports both batch and streaming data. In this article, you will learn how to implement a batch Change Data Feed process through an end-to-end exercise.
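As a rough sketch of the two configuration levels described above, the statements below enable Change Data Feed on an existing table and then set it as the default for new tables in the current session. The table name cdc.ExistingOrders is hypothetical and used only for illustration; the property and configuration names are the ones given in the Databricks documentation.

```sql
-- Table level: turn on Change Data Feed for an existing Delta table.
-- cdc.ExistingOrders is a hypothetical table name used for illustration.
ALTER TABLE cdc.ExistingOrders
SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

-- Session or cluster level: make Change Data Feed the default for new Delta tables.
-- The same key can be placed in the cluster's Spark configuration instead.
SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;
```

Keep in mind that only changes made after the feature is enabled are recorded, so earlier commits on an existing table will not appear in its change feed.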
Design and Implement Change Data Feed
The process of implementing Change Data Feed begins by creating a Databricks cluster on runtime 8.2 or above and then creating the required database and tables with change data feed enabled. For this scenario, you will learn how to create a Silver table that has change feed enabled, which will then propagate the changed records to a Gold table.
Create Database and Tables
Once you have created a cluster and a Databricks SQL notebook, run the following script to create the database. The script also drops the OrdersSilver and OrdersGold tables if they already exist in the database.
Here is the script that you will need to run, as shown in the Figure above.
CREATE DATABASE IF NOT EXISTS cdc;
DROP TABLE IF EXISTS cdc.OrdersSilver;
DROP TABLE IF EXISTS cdc.OrdersGold;
Next, go ahead and create your OrdersSilver table by running the following script. In the subsequent sections, you will begin inserting, updating, and deleting data in this table. Notice that the format is DELTA. Additionally, the table will be created in your Azure Data Lake Storage Gen2 account, which you will need to ensure is properly mounted. Finally, notice that the table properties specify that this table must be enabled for change data feed.
Here is the code that you will need to run to create the OrdersSilver table, as shown in the Figure above.
CREATE TABLE cdc.OrdersSilver (
  OrderID int,
  UnitPrice int,
  Quantity int,
  Customer string
)
USING DELTA
LOCATION '/mnt/raw/OrdersSilver'
TBLPROPERTIES (delta.enableChangeDataFeed = true);
Once the Delta table is created, notice that it will exist within your ADLS Gen2 account and will automatically have a _delta_log folder associated with it.
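If you want to confirm that the property took effect before moving on, a quick check along these lines should work. This is a minimal sketch that only reads metadata from the table created above.

```sql
-- Show the single property; the value should be true if the feed is enabled.
SHOW TBLPROPERTIES cdc.OrdersSilver ('delta.enableChangeDataFeed');

-- Show the table's format, storage location, and full property map.
DESCRIBE DETAIL cdc.OrdersSilver;
```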
Similarly, go ahead and create an OrdersGold table by running the following script. This gold table is also a delta table, but will not need change feed enabled since it is already enabled for the OrdersSilver table and the changes from that table will be propagated into this gold table. Notice also that this OrdersGold table will contain an OrderTotal column which is simply the UnitPrice * Quantity from the OrdersSilver table. The structure of this OrdersGold table is slightly different from the OrdersSilver table and is intended to be an aggregated, production ready table.
Here is the script that you will need to run to create the OrdersGold table.
CREATE TABLE cdc.OrdersGold (
  OrderID int,
  OrderTotal int,
  Customer string
)
USING DELTA
LOCATION '/mnt/raw/OrdersGold';
As expected, once the OrdersGold Delta table is created, it will appear within your ADLS Gen2 account along with the associated _delta_log folder.
Insert Data into Tables
Now that you have created your silver and gold order tables, go ahead and run the following script to insert records into the OrdersSilver table. Notice that there are 5 records that will be inserted into the table.
Here is the code that you will need to run to insert data into the OrdersSilver table.
INSERT INTO cdc.OrdersSilver
SELECT 1 AS OrderID, 96 AS UnitPrice, 5 AS Quantity, 'A' AS Customer
UNION
SELECT 2 AS OrderID, 450 AS UnitPrice, 10 AS Quantity, 'B' AS Customer
UNION
SELECT 3 AS OrderID, 134 AS UnitPrice, 7 AS Quantity, 'C' AS Customer
UNION
SELECT 4 AS OrderID, 847 AS UnitPrice, 8 AS Quantity, 'D' AS Customer
UNION
SELECT 5 AS OrderID, 189 AS UnitPrice, 15 AS Quantity, 'E' AS Customer;

SELECT * FROM cdc.OrdersSilver;
Similarly, run the following script to insert data into the OrdersGold table and verify that the results are as expected.
Here is the script that you will need to run to insert data into the OrdersGold table.
INSERT INTO cdc.OrdersGold
SELECT OrderID, UnitPrice * Quantity AS OrderTotal, Customer
FROM cdc.OrdersSilver;

SELECT * FROM cdc.OrdersGold;
Change Data Capture
Since you enabled change data feed on the OrdersSilver table in the prior steps, run the following script to create a temporary view that shows the latest CDC change for each order in the OrdersSilver table. The view ranks each order's changes by commit version, keeps only the most recent one, and excludes update_preimage rows, which hold the values a record had before an update. Notice that table_changes also lets you specify a range of versions, here 2 through 5. You could use this view at any point in time to retrieve a list of CDC changes for the OrdersSilver table.
Here is the code that you will need to run to create the latest_version view.
CREATE OR REPLACE TEMPORARY VIEW latest_version AS
SELECT *
FROM (
  SELECT *,
         rank() OVER (PARTITION BY OrderID ORDER BY _commit_version DESC) AS rank
  FROM table_changes('cdc.OrdersSilver', 2, 5)
  WHERE _change_type != 'update_preimage'
)
WHERE rank = 1;
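The version numbers passed to table_changes depend on how many commits your own table has, so it can help to look them up rather than assume them. The sketch below lists the table's commit history and then shows two other ways of bounding the feed. The start version of 0 assumes change data feed was enabled when the table was created, and the two timestamps are placeholders that you would replace with values from your own history output.

```sql
-- List the table's commits; the version and timestamp columns give valid bounds.
DESCRIBE HISTORY cdc.OrdersSilver;

-- Read every change from a starting version through the latest commit.
SELECT * FROM table_changes('cdc.OrdersSilver', 0);

-- Read changes between two timestamps (placeholder values, adjust to your table).
SELECT *
FROM table_changes('cdc.OrdersSilver', '2021-09-01 00:00:00', '2021-09-10 00:00:00');
```

As far as I am aware, a version or timestamp beyond the table's latest commit raises an error by default, so checking the history first avoids guessing at the range.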
Now it's time to run some CRUD operations on your OrdersSilver table to demonstrate how changes are handled for inserts, updates, and deletes. After running the update, delete, and insert statements on the OrdersSilver table, run the select statement included in the script to verify that each change type has been recorded accurately.
Here is the script that you will need to run to update, insert, delete and finally view the committed changes made to the OrdersSilver table.
UPDATE cdc.OrdersSilver SET Quantity = 20 WHERE OrderID = 1;

DELETE FROM cdc.OrdersSilver WHERE Customer = 'D';

INSERT INTO cdc.OrdersSilver
SELECT 6 AS OrderID, 100 AS UnitPrice, 10 AS Quantity, 'F' AS Customer;

SELECT *
FROM table_changes('cdc.OrdersSilver', 2, 5)
ORDER BY _commit_timestamp;
Once you have verified that the OrdersSilver table has accurately captured the desired changes, it's time to run the following script to merge the changes from OrdersSilver into the OrdersGold table using the latest_version view. After you run the script, notice that there are 3 affected rows: one each for the deleted, updated, and inserted records.
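For a more compact view of what the feed recorded, you could summarize the changes by commit and change type. This is a small sketch over the same version range used above. The feed labels each row as insert, update_preimage, update_postimage, or delete, so an update contributes two rows: one with the old values and one with the new.

```sql
-- Count change rows per commit and change type for the same version range.
SELECT _commit_version,
       _change_type,
       COUNT(*) AS row_count
FROM table_changes('cdc.OrdersSilver', 2, 5)
GROUP BY _commit_version, _change_type
ORDER BY _commit_version, _change_type;
```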
Here is the code that you will need to run to perform the insert, update and delete on the OrdersGold table.
MERGE INTO cdc.OrdersGold og
USING latest_version os
ON og.OrderID = os.OrderID
WHEN MATCHED AND os._change_type = 'update_postimage' THEN
  UPDATE SET OrderTotal = os.UnitPrice * os.Quantity
WHEN MATCHED AND os._change_type = 'delete' THEN
  DELETE
WHEN NOT MATCHED THEN
  INSERT (OrderID, OrderTotal, Customer)
  VALUES (os.OrderID, os.UnitPrice * os.Quantity, os.Customer);
Once you have successfully run the script, notice that the changes are accurately reflected in the OrdersGold table. This process has efficiently handled inserts, updates and deletes as a result of the delta change feed which was enabled on the OrdersSilver table.
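One way to check the result is to compare the two tables directly. The query below is a sketch of such a check: it joins OrdersGold to OrdersSilver and returns only the orders that are missing from either side or whose OrderTotal differs from UnitPrice * Quantity. If the merge applied every change, the query should return no rows. For example, order 1 should now carry a total of 96 * 20 = 1920.

```sql
-- Return only orders where the Gold table disagrees with the Silver table.
SELECT COALESCE(og.OrderID, os.OrderID) AS OrderID,
       og.OrderTotal                    AS GoldTotal,
       os.UnitPrice * os.Quantity       AS ExpectedTotal
FROM cdc.OrdersGold og
FULL OUTER JOIN cdc.OrdersSilver os
  ON og.OrderID = os.OrderID
WHERE og.OrderID IS NULL                          -- order missing from Gold
   OR os.OrderID IS NULL                          -- order deleted from Silver
   OR og.OrderTotal <> os.UnitPrice * os.Quantity -- totals out of sync
ORDER BY OrderID;
```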
It is also important to point out that once changes are made to the OrdersSilver table with change data feed enabled, a new folder will appear in the table's ADLS Gen2 Delta location and begin capturing change feed metadata.
For example, the CDC-related files will begin populating in the _change_data folder. Files will be created as a result of CRUD operations that are performed against the OrdersSilver table. This additional process may create overhead and costs related to persisting CDC data in another folder. It will be important to assess whether the benefits outweigh the costs and overhead prior to progressing down this path.
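On the storage side, my understanding from the Databricks documentation is that files in the _change_data folder follow the table's retention policy and are removed by VACUUM along with other stale data files. The sketch below previews what a vacuum would remove without deleting anything. Treat it as a starting point for estimating the overhead rather than a tuning recommendation.

```sql
-- Preview the files that VACUUM would delete under the current retention period.
-- DRY RUN only lists files; nothing is removed.
VACUUM cdc.OrdersSilver DRY RUN;
```

Because vacuuming also removes change data, a feed that must be readable far back in time would need a correspondingly longer retention period.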
Additionally, the _delta_log folder will also capture details related to cdc, as shown in the figure below.
Once you finish this exercise, remember to delete the tables that you have created by running the following script.
Here is the code that you will need to run to drop the OrdersSilver and OrdersGold once you have completed the exercise.
DROP TABLE cdc.OrdersSilver;
DROP TABLE cdc.OrdersGold;
In this article, I have demonstrated how to get started with Databricks Delta Change Data Feed through an end-to-end exercise. You learned how to create a Silver Delta table with Change Data Feed enabled. Remember that you could also enable change data feed at the cluster level as an alternative. You also learned how to track changes made to your Silver table and then propagate those changes into a Gold table. The table_changes function exposed by the change data feed can be used to explore details related to the inserts, updates, and deletes. Additionally, you could create views and explore versions, timestamps, and granular details related to the table changes. This is an efficient process since it does not require you to read all the data within your lake to identify changed records. It is also compatible with streaming data, which opens the opportunity to design and implement highly scalable lambda architectural patterns using Delta Change Data Feed.
- Read more about Change Data Feed: Change data feed - Azure Databricks - Workspace | Microsoft Docs
- Read more about Simplifying Change Data Capture with Databricks Delta - The Databricks Blog
Article Last Updated: 2021-09-10