Effortless Data Processing with Delta Live Tables

By: Maria Zakourdaev   |   Updated: 2022-11-18   |   Related: > Azure Databricks


Problem

The main challenge for Data Engineers is building data processing pipelines that handle a considerable amount of ever-growing data. Modern architectures embrace polyglot persistence, where data is scattered among different data processing systems. There is a constant need to migrate data between sources, fix data types, remove duplicates or empty rows, and merge and aggregate datasets.

The more data transformation logic is incorporated into the flow, the more complex pipeline maintenance becomes. Instead of focusing on the data, we spend a tremendous amount of time on tooling: retry logic, change tracking, and incrementalizing the flow.

All data starts dirty, regardless of the source. Data cleanliness is a matter of requirements, and requirements tend to change all the time. Requirement changes trigger the need to backfill and rerun the data flows. In addition, Data Engineers are required to track data movements and make sure tables are updated in the correct order. They would rather focus on the data and the queries and avoid the operational complexity of the pipelines.

Solution

Most ETL/ELT frameworks perform procedural transformations, working row by row when applying data cleanup or enrichment logic. Procedural data processing means we explicitly state how to get the data and process it on the lowest technical level, breaking it down into the smallest steps.

Declarative data processing works differently. We neither define the order in which the transformation logic executes nor spell out every execution step. When we use a declarative approach to process data, execution time and pipeline complexity are significantly reduced. Modern data processing engines have intelligent optimization algorithms that get smarter with every new version.
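As a minimal illustration (the taxi_trips table and its columns are hypothetical), a declarative transformation only states the desired result and leaves the execution strategy to the engine:

-- Declarative: describe WHAT you want; the engine decides join order,
-- parallelism, and how to process the data incrementally.
SELECT pickup_borough,
       count(*) AS trips
FROM taxi_trips            -- hypothetical source table
WHERE passenger_count > 0
GROUP BY pickup_borough;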

Delta Live Tables

This tip will introduce you to an innovative Databricks framework called Delta Live Tables. It is a dynamic data transformation tool, similar in spirit to materialized views.

Delta Live Tables are simplified pipelines that use declarative development in a "data-as-code" style. Databricks takes care of finding the best execution plan and managing the cluster resources; we only need to define the data transformations. Pipelines are "stateful": they read each data row only once, and they can track data quality and alert when the percentage of bad records grows.

Delta Live Tables are built around the concept of table chaining and reuse. When a source table's definition or data changes, the changes automatically propagate to the chained tables.

The Delta Live pipeline visualizes the linked data sources and target datasets. The framework automatically creates and maintains the tables in Delta format and ensures the data is updated according to the pipeline's transformation definitions. We can also define "Expectations": constraints that validate data correctness.

Delta Live Tables fit well into the medallion architecture, which segments data lake data into three categories: raw (dirty) data, cleaned data, and aggregated groups of data tables (below).

Medallion architecture explained

Creating Delta Live Tables

Note: Delta Live Tables are not available in the Standard pricing tier, so make sure your Databricks account is on the Premium pricing tier.

Choosing Premium configuration

Delta Live Table notebooks are special: their code is executed by a pipeline rather than run interactively, so the best practice is to leave them unattached to a cluster and use a secondary notebook for development and debug commands.

In the development notebook, I will create the database where I want to put Delta Live Tables:

CREATE DATABASE delta_live_lake;

I will also prepare a mount point to help read the data from Azure Data Lake:

# Mount the Azure Storage container under /mnt/data/ so the pipeline can read the source files
dbutils.fs.mount(
 source = "wasbs://container_name@storage_account_name.blob.core.windows.net",
 mount_point = "/mnt/data/",
 extra_configs = {"fs.azure.account.key.storage_account_name.blob.core.windows.net": "account_key"})

The Delta Live Table notebook will have one SQL command that creates the Delta Live table. Since Delta tables are deeply integrated with the Spark Structured Streaming engine, you can load a Delta table as a stream. The Spark engine takes care of running it incrementally and continuously as data arrives, which can reduce the cost of processing new data and lower data latency. However, such a table can only process append operations; merges and deletes are not supported.

CREATE STREAMING LIVE TABLE yellow_trips
COMMENT "The raw NY yellow taxi trips table"
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT * FROM cloud_files("/mnt/data/yellow-taxi-csv/", "csv", map("cloudFiles.inferColumnTypes", "true"));

SQL command to create a Delta Live table

We can now create a pipeline to build and maintain the above Delta Live Table. As seen below, choose the Workflows tab, then go to Delta Live Tables, a separate type of Databricks job.

How to find Delta Live Pipelines

To create a new pipeline, you need to choose the proper configuration settings. Most settings can be changed at any time during pipeline development, except for the "Storage location" setting, which can be set only during pipeline creation. Below are explanations of each configuration setting:

Product Edition

  • Core – The most basic product edition, best suited for workloads that do not require advanced features, like data quality constraints, called expectations (to be discussed later).
  • Pro – Supports all Core features and adds support for updating tables when the base data changes (Change Data Capture).
  • Advanced – Supports the Core and Pro features as well as data quality constraints (expectations).

Notebook Libraries

Surprisingly, this is where you put the name of the notebook that contains the table creation code. If your code requires additional Python libraries, you can add them after the pipeline is created using the "Settings" button.

Storage Location

Cloud storage location is where the tables and metadata will be stored in subfolders. If not specified, the system will use "dbfs:/pipelines/". Note: This setting cannot be changed after the pipeline is created.

Target

The table creation code in the notebook cannot specify the database where the tables should be located; this setting chooses the target database where the pipeline publishes its tables.
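For example, assuming the Target is set to the delta_live_lake database created earlier, the tables published by the pipeline can be listed from the development notebook after a pipeline run:

SHOW TABLES IN delta_live_lake;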

Pipeline Execution Mode

  • Triggered – More cost-effective. Tables are updated on a schedule or manually, without keeping cluster resources online between runs.
  • Continuous – Tables are updated as soon as new data arrives, which requires the cluster to run all the time.

Cluster Mode

  • Legacy Autoscaling – Autoscaling dynamically allocates workers based on the workload and can reduce costs by deallocating cluster resources when they are not needed. This is a backward-compatibility mode; in most cases, you should use "Enhanced Autoscaling".
  • Enhanced Autoscaling – Also allocates workers dynamically and adds further improvements in cluster utilization and cost.
  • Fixed size – Cluster resources are static.

Photon Acceleration

This new query engine speeds up the processing of large datasets (100GB+) using robust scan performance and advanced join strategies on tables with many columns and/or many small files. Usually, this will not improve the performance of small datasets.

Channel

This setting lets you test a preview version of the Delta Live Tables runtime before it becomes the current one.

Execution Modes

  • Development – While building and re-running the pipeline, the same cluster is reused to avoid long waits for resources, and automatic pipeline retries are disabled.
  • Production – The pipeline runs on the chosen schedule and will try to repair the execution by restarting the cluster when specific errors are received.

Select tables for refresh

After successful pipeline execution, we can click on a table visualization to get information about the created live table schema and how many rows were loaded and failed. Since no data quality constraints were defined for this demonstration, all table data was successfully loaded, as seen below.

Click on a Delta table to see more information on its location, refresh duration, data quality, and schema

I will add another table to the data pipeline, a taxi zone lookup table, and rerun the flow.

CREATE STREAMING LIVE TABLE taxi_zones_lookup
COMMENT "The NY yellow taxi zone lookup"
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT * FROM cloud_files("/mnt/data/ny_taxi_lookups/", "csv", map("cloudFiles.inferColumnTypes", "true"));

Now I can join both tables in a development notebook to analyze the data:

Join both tables in development notebook
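For reference, a minimal version of that join query, assuming the pipeline Target is set to the delta_live_lake database, could look like the sketch below (the column names come from the NY taxi dataset used later in this tip):

SELECT l.Borough           AS pickup_borough,
       count(*)            AS trips,
       avg(t.total_amount) AS total_amount_avg
FROM delta_live_lake.yellow_trips t
  JOIN delta_live_lake.taxi_zones_lookup l
    ON t.PULocationID = l.LocationID
GROUP BY l.Borough;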

Let's define two data quality constraints on the yellow_trips table (a sketch of the expectation syntax follows the list):

  1. Always have a pickup location. If the pickup location is empty, drop the row.
  2. Remove the trips where the passenger count = 0.
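One possible way to express these two rules in Delta Live Tables SQL is sketched below: expectations are added as CONSTRAINT ... EXPECT ... ON VIOLATION clauses on the table definition, and DROP ROW discards the offending rows. The constraint names here are illustrative; the full notebook code appears in the screenshot further down.

CREATE STREAMING LIVE TABLE yellow_trips (
  CONSTRAINT valid_pickup_location EXPECT (PULocationID IS NOT NULL) ON VIOLATION DROP ROW,
  CONSTRAINT valid_passenger_count EXPECT (passenger_count > 0) ON VIOLATION DROP ROW
)
COMMENT "The raw NY yellow taxi trips table"
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT * FROM cloud_files("/mnt/data/yellow-taxi-csv/", "csv", map("cloudFiles.inferColumnTypes", "true"));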

In addition, let's create a live table with the above aggregation that reflects, per pickup date and taxi zone, the average number of passengers and the average and maximum trip costs:

CREATE LIVE TABLE yellow_trips_metrics
COMMENT "Aggregation by pickup location"
TBLPROPERTIES ("quality" = "silver")
AS
SELECT date(t.tpep_pickup_datetime) as pickup_date,
       l1.Borough as pickup_location, 
       l2.Borough as dropoff_location, 
       avg(t.passenger_count) as passenger_count_avg,
       avg(t.total_amount) as total_amount_avg,
       max(t.total_amount) as total_amount_max
FROM LIVE.yellow_trips t
  join LIVE.taxi_zones_lookup l1
     on t.PULocationID = l1.LocationID
  join LIVE.taxi_zones_lookup l2
     on t.DOLocationID = l2.LocationID   
GROUP BY  date(t.tpep_pickup_datetime) ,
       l1.Borough , 
       l2.Borough

Here is my final notebook code:

Full notebook code for 3 delta tables

The image below shows the table chaining visualization and data quality results after a Delta pipeline execution:

Pipeline execution results

Best Practices

  • Store the expectations outside the notebook in a format that can be easily edited, such as a JSON file in the Data Lake, to ensure the code is portable and easy to maintain.
  • Save invalid data instead of dropping the rows, so it can be analyzed later (see the sketch after this list).
  • Define cluster policies to limit cluster resources.
  • The PIVOT clause is not supported as a Delta Live Tables transformation.
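For the second point, one possible pattern (a sketch only; the table name and conditions below are illustrative) is to route the invalid rows into a separate quarantine table by inverting the quality rules, instead of dropping them:

CREATE STREAMING LIVE TABLE yellow_trips_quarantine
COMMENT "Trips that violate the quality rules, kept for later analysis"
TBLPROPERTIES ("quality" = "bronze")
AS
SELECT * FROM cloud_files("/mnt/data/yellow-taxi-csv/", "csv", map("cloudFiles.inferColumnTypes", "true"))
WHERE PULocationID IS NULL OR passenger_count = 0;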

About the author
Maria Zakourdaev has been working with SQL Server for more than 20 years. She also manages other database technologies such as MySQL, PostgreSQL, Redis, Redshift, Couchbase, and Elasticsearch.

This author pledges the content of this article is based on professional experience and not AI generated.



