Implementing ETL Logging on Lakehouse using Delta Lake's Time Travel capability

By: Fikrat Azizov | Updated: 2022-06-10 | Comments (1) | Related: Azure Synapse Analytics


Problem

This tip is part of a series dedicated to building end-to-end Lakehouse solutions with Azure Synapse Analytics. In previous posts, we've explored ways to build ETL pipelines to ingest, transform, and load data into the Lakehouse. The Lakehouse concept is based on Delta Lake technology, which allows you to treat files in Azure Data Lake much like tables in a relational database. However, what makes Delta Lake stand out is its time travel functionality. In this tip, I'm going to demonstrate how to use Delta Lake's time travel functionality for ETL logging.

Solution

What is Delta Lake time travel?

Delta Lake technology uses transaction logging to preserve the history of changes to the underlying files. This allows you to go back in time and see previous states of the rows (see Introducing Delta Time Travel for Large Scale Data Lakes for more details). This feature can be used for many purposes, including auditing, troubleshooting, and logging. You can query rows by version, as well as by timestamp.

Let me illustrate this using the SalesOrderHeader Delta table I've created in this tip.

Add a Spark pool and create a Spark notebook with Spark SQL as the language (see the Microsoft documentation for more details). Add a cell with the following command to browse a few rows:

SELECT * FROM delta.`/delta/silver/SalesOrderHeader/` ORDER BY SalesOrderID LIMIT 5

Notice that I'm treating the location of the Delta table as if it were a table name in a relational database. There's an alternative way of querying Delta tables that involves registering them in Delta Lake's metastore first (see this Delta Lake documentation for more details); a quick sketch of that approach follows Figure 1 below.

Here are the query results:

Figure 1: Browse and select data
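As mentioned above, a minimal sketch of the metastore-based approach could look like this. The table name silver_SalesOrderHeader is a hypothetical choice; the statement only registers the existing Delta folder, it doesn't copy any data:

-- Hypothetical table name; registers the existing Delta folder in the metastore without copying data
CREATE TABLE IF NOT EXISTS silver_SalesOrderHeader
USING DELTA
LOCATION '/delta/silver/SalesOrderHeader/';

-- Once registered, the table can be queried by name instead of by path
SELECT * FROM silver_SalesOrderHeader ORDER BY SalesOrderID LIMIT 5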

Add another cell to simulate updating a single row in the table:

UPDATE delta.`/delta/silver/SalesOrderHeader/` SET RevisionNumber=3 WHERE SalesOrderID=71774

Now let's see the history of the changes to this table by using the following command:

DESCRIBE HISTORY delta.`/delta/silver/SalesOrderHeader/`

As you can see from the screenshot below, we've got multiple changes, with the top record being the most recent:

Figure 2: History of changes

Notice that the results also contain the transaction timestamp, operation type, and operation metrics columns, among other details. The operationMetrics column is in JSON format and includes more granular transaction details, such as the number of updated rows. As you can see from the query results, the earlier changes were caused by MERGE operations, because we selected an upsert method to populate these tables from the Mapping Data Flow. Let's also examine the operationMetrics details for the MERGE operation:

Figure 3: Operation metrics for UPDATE and MERGE

Notice that the structure of the operationMetrics column is slightly different for the MERGE operation. We can also get the table's past state as of version 23, using the following command:

SELECT * FROM delta.`/delta/silver/SalesOrderHeader@v23` WHERE SalesOrderID=71774

Here's the screenshot:

Figure 4: Table state as of version 23
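Time travel by timestamp works in a similar way: instead of a version suffix, the path can carry a timestamp in yyyyMMddHHmmssSSS format (see the Delta time travel article linked above). Here's a minimal sketch; the timestamp value is purely illustrative and needs to fall within your table's retained history:

-- Timestamp is illustrative; replace it with a point in time covered by your table's history
SELECT * FROM delta.`/delta/silver/SalesOrderHeader@20220601000000000` WHERE SalesOrderID=71774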

Finally, let's explore the DELETE command, using the following commands:

DELETE FROM delta.`/delta/silver/SalesOrderHeader/` WHERE SalesOrderID=71783;
DESCRIBE HISTORY delta.`/delta/silver/SalesOrderHeader/`

Here's the screenshot:

Figure 5: History after the delete
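As a side note, the same history that powers this logging also helps with troubleshooting: a row deleted by mistake can be re-inserted from an earlier version. Here's a minimal sketch, assuming version 25 is the last version that still contains the row (verify the actual pre-delete version against DESCRIBE HISTORY first):

-- Version 25 is an assumed pre-delete version; check DESCRIBE HISTORY for the real one
INSERT INTO delta.`/delta/silver/SalesOrderHeader/`
SELECT * FROM delta.`/delta/silver/SalesOrderHeader@v25` WHERE SalesOrderID=71783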

Now that we know how the time travel functionality works, let's see how this can be applied for ETL logging purposes.

The ETL logging notebook

Let's create another Spark notebook, this time with PySpark as the language. I'm going to use an alternative way of querying Delta Lake that involves the Delta Lake API. Add a parameters cell with the following parameters:

LoadZone='silver'
TableName='SalesOrderHeader'

Add another cell with the following commands:

from delta.tables import *
from pyspark.sql.functions import *
delta_table_path=''.join(['/delta/',LoadZone,'/',TableName,'/'])
deltaTable = DeltaTable.forPath(spark, delta_table_path)

These commands build the table path from the parameters and create a Delta table object based on that path.

Now we can use the history method and assign the results to a data frame, as follows:

dfLogs=deltaTable.history()
display(dfLogs)

Here's the screenshot with query results:

Figure 6: History data

Now let's run the following commands to get the MERGE operation stats:

dfMergeLogs=(dfLogs.withColumn('LoadZone',lit(LoadZone))
.withColumn('TableName',lit(TableName))
.selectExpr('readVersion','LoadZone','TableName','timestamp','operation',
'operationMetrics.numTargetRowsInserted as RowsInserted',
'operationMetrics.numTargetRowsUpdated as RowsUpdated',
'operationMetrics.numTargetRowsDeleted as RowsDeleted')
.filter("operation='MERGE'"))
display(dfMergeLogs)

The above command creates a new data frame by extracting granular transaction details from the operationMetrics column. Here's the screenshot:

Figure 7: New data frame

Let's get details for update and delete commands, using similar methods:

dfUpdateLogs=(dfLogs.withColumn('LoadZone',lit(LoadZone))
.withColumn('TableName',lit(TableName))
.selectExpr('readVersion','LoadZone','TableName','timestamp','operation',
'0 as RowsInserted',
'operationMetrics.numUpdatedRows as RowsUpdated',
'0 as RowsDeleted')
.filter("operation='UPDATE'"))
 
dfDeleteLogs=(dfLogs.withColumn('LoadZone',lit(LoadZone))
.withColumn('TableName',lit(TableName))
.selectExpr('readVersion','LoadZone','TableName','timestamp','operation',
'0 as RowsInserted',
'0 as RowsUpdated',
'operationMetrics.numDeletedRows as RowsDeleted')
.filter("operation='DELETE'"))

Finally, let's union all three data frames and order the results by change versions, using the following command:

dfAllLogs=dfMergeLogs.unionAll(dfUpdateLogs).unionAll(dfDeleteLogs).sort(desc('readVersion'))
display(dfAllLogs)

Here's the screenshot:

Figure 8: Union of data frames

And now we can persist the logs in a Delta table. First, run this command to create a database:

spark.sql("CREATE DATABASE IF NOT EXISTS ETL")

Then persist the data frame as a managed Delta table, using the following command:

dfAllLogs.write.format("delta").mode("append").saveAsTable("ETL.ExecLogs")

Let's do a quick validation:

Figure 9: Validate data
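The check shown in the screenshot can be reproduced with a query along these lines, for example from a SQL cell (the column names match the data frame we built above):

-- Quick sanity check on the persisted log table
SELECT LoadZone, TableName, readVersion, operation, RowsInserted, RowsUpdated, RowsDeleted, timestamp
FROM ETL.ExecLogs
ORDER BY readVersion DESC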

The last touch here would be adding the limit(1) method to the final write command, so that only the most recent change (the top row after the descending sort) is written and historical rows are not appended again on subsequent executions:

dfAllLogs.limit(1).write.format("delta").mode("append").saveAsTable("ETL.ExecLogs")

Congratulations, we've built a notebook with ETL logging!

The ETL logging orchestration

The notebook we've created above can be kicked off from a data integration pipeline, and we can create that pipeline directly from the notebook. Just use the Add to pipeline button, as follows:

Figure 10: Adding the notebook to a pipeline

Once inside the pipeline, select the Notebook activity, select the Spark pool, and add the required parameters, as follows:

Figure 11: Notebook activity settings

This activity could be included in a dedicated pipeline or be added to the existing ETL pipeline, depending on your overall ETL orchestration needs.

Next Steps





About the author
Fikrat Azizov has been working with SQL Server since 2002 and has earned two MCSE certifications. He's currently working as a Solutions Architect at Slalom Canada.

This author pledges the content of this article is based on professional experience and not AI generated.



Article Last Updated: 2022-06-10

Comments For This Article




Friday, June 10, 2022 - 9:01:59 AM - shaun
If it's already in the history of the delta table. Which is effectively a table. Why write it to another table?













