Advanced Schema Evolution using Databricks Auto Loader


By: Ron L'Esteve   |   Updated: 2021-08-30   |   Related: Azure Databricks


Problem

Event-driven ETL has long been a goal in the data engineering ecosystem, even more so as modern data architectures approach the Lakehouse paradigm, which builds an entire data warehousing ecosystem on a data lake. While there are numerous event-driven data ingestion patterns in Azure, managing changing schemas for streaming data has traditionally been a challenge. Setting up and managing Event Grid subscriptions and topics, and integrating them seamlessly with Spark, has also been difficult. Auto Loader provides a Structured Streaming source called cloudFiles that incrementally processes new files as they arrive in Azure Data Lake Storage Gen2, manages advanced schema evolution of this streaming data, and stores the data in a dataframe. How can we get started with Auto Loader and cloudFiles?

Solution

Auto Loader, available in Databricks runtime versions 7.2 and above, is designed for event-driven Structured Streaming ELT patterns and is constantly evolving and improving with each new runtime release. With the release of Databricks runtime version 8.2, Auto Loader's cloudFiles source now supports advanced schema evolution. With schema inference capabilities, there is no longer a need to identify and define a schema up front. In this article, I will demonstrate how to get started with Auto Loader cloudFiles through an end-to-end practical example of ingesting a data stream that has an evolving schema. Within this exercise, you will learn how to set up Auto Loader cloudFiles in Azure, work with evolving streaming data schemas, track changing schemas through versions captured in schema locations, and infer schemas or define them through schema hints.

Pre-requisites

There are a few set up steps that are required for Auto Loader cloudFiles to work effectively. In this section, you will learn how to create these pre-requisites which include generating the JSON files which will be used for this exercise, completing the necessary set-up with Azure portal and configuring Databricks secret scopes.

Prior to continuing this section, ensure that you have created the following Azure resources:

  1. Azure Databricks
  2. Azure Key Vault
  3. Azure Data Lake Storage Gen2

SQL

For the purpose of this exercise, let's use JSON files as the source files that will be ingested by Auto Loader cloudFiles, since this format demonstrates a semi-structured, complex and evolving format. You can prepare these source files by using the Adventure Works LT2019 database, which I have worked with and demonstrated how to import into SSMS in previous articles. Use the Customer table to generate three source files that you will feed into the streaming Auto Loader cloudFiles source.

Figure: Customer table schema

Begin by writing the following SQL query, which takes a handful of columns from the Customer table and returns each record as one JSON object per line. Save the results of this query as a JSON file, which you could call Customer1.json. You will also need to repeat this exercise two more times to create two additional customer JSON files. With each iteration of the code, you will need to add additional columns to the query to truly mimic an evolving schema.

Figure: SQL Query1 to create JSON1 file

Here is the SQL query which is used for Customer1.json.

SELECT
       (
              SELECT firstname,
                     lastname,
                     middlename,
                     title,
                     customerid
              FOR JSON PATH, WITHOUT_ARRAY_WRAPPER)
FROM   saleslt.customer

Similar to the previous query, here is the SQL query which is used for Customer2.json.

SELECT
       (
              SELECT firstname,
                     lastname,
                     middlename,
                     title,
                     customerid,
                     companyname,
                     emailaddress,
                     salesperson,
                     namestyle
              FOR JSON PATH, WITHOUT_ARRAY_WRAPPER)
FROM   saleslt.customer

Finally, here is the SQL query which is used for Customer3.json.

SELECT
       (
              SELECT firstname,
                     lastname,
                     middlename,
                     title,
                     customerid,
                     companyname,
                     emailaddress,
                     salesperson,
                     namestyle,
                     modifieddate,
                     phone,
                     rowguid
              FOR JSON PATH, WITHOUT_ARRAY_WRAPPER)
FROM   saleslt.customer

Now that you have created Customer1.json, Customer2.json, and Customer3.json files by using the source SQL queries within this section, you are ready to upload these JSON files into ADLS gen2.
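For readers without SQL Server at hand, the shape of the three files can be sketched in Python. The sample rows, the abbreviated column sets, and the helper name to_json_lines below are hypothetical; they only illustrate how each file carries a wider set of columns than the one before it, with one JSON object per line.

```python
import json

# Hypothetical sample rows; real values would come from SalesLT.Customer.
ROWS = [
    {"customerid": 1, "FirstName": "Ana", "LastName": "Diaz",
     "CompanyName": "Contoso", "Phone": "555-0100"},
    {"customerid": 2, "FirstName": "Lee", "LastName": "Wong",
     "CompanyName": "Fabrikam", "Phone": "555-0101"},
]

# Each later file selects more columns, mimicking an evolving schema.
# The column lists are shortened compared with the SQL queries above.
COLUMN_SETS = {
    "Customer1.json": ["FirstName", "LastName", "customerid"],
    "Customer2.json": ["FirstName", "LastName", "customerid", "CompanyName"],
    "Customer3.json": ["FirstName", "LastName", "customerid", "CompanyName", "Phone"],
}

def to_json_lines(rows, columns):
    """Return one JSON object per line, restricted to the given columns."""
    return "\n".join(json.dumps({c: row[c] for c in columns}) for row in rows)

files = {name: to_json_lines(ROWS, cols) for name, cols in COLUMN_SETS.items()}
print(files["Customer2.json"])
```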

Azure Data Lake Storage Gen2

Within ADLS gen2, you will need to create a few new folders. Create a Customer folder and load all of the Customer JSON files that you had created in the previous section into this folder.

Figure: ADLS2 customer JSON files loaded to raw zone

Also create a Customer_stream folder and load the Customer1.json file. All files that are added to this folder will be processed by the streaming Auto Loader cloudFiles source.

Figure: ADLS2 Customer1 file loaded to Customer_stream folder

Azure Portal

There are a few configurations that need to be completed in the Azure portal. Begin by navigating to Resource providers in your Azure Subscription and register Microsoft.EventGrid as a resource provider.

Figure: Register Event Grid as Resource Provider

Next, navigate to Azure Active Directory and register a new application. You will need the client ID, tenant ID and client secret of this new app and will need to also give this app access to ADLS gen2.

Figure: Register App that will have access to ADLS2

Once you finish registering this new app, navigate to Certificates and secrets and create a new client secret.

Figure: Generate a new secret for the newly registered App

Within ADLS gen2, navigate to Access Control (IAM) and add a new role assignment. Give the app that you previously registered contributor access to ADLS gen2.

Figure: Allow the App to access ADLS2 as a contributor

At this point, begin copying the credentials and keys from the various applications so that you can store them in a Key Vault. Databricks will have access to this Key Vault and Auto Loader will use these credentials to create Event Grid subscriptions and topics to process the incoming streaming data. Begin by navigating to the App that you registered in the previous section, copy the ClientID and TenantID, and save them in a notepad.

Figure: Copy the Client and Tenant IDs from the registered App

Next, navigate to the registered App's Certificates and secrets tab and create a new secret. Once created, copy the ClientSecret and paste this into the notepad as well.

Figure: Copy the client secret from the registered App

You will also need to navigate to the ADLS gen2 Access keys tab, copy the SASKey, and paste it into the notepad.

Figure: Copy the SASKey from the ADLS2 access keys tab

Within ADLS gen2, navigate to Shared access signature and ensure that the Allowed services, resource types, and permissions are configured accurately. Generate the connection string, copy it, and paste it into the notepad.

Figure: Copy the connection string from the SAS tab of ADLS2

There is one final step in the Azure portal pre-requisites section. Store all of the values that you have previously pasted into notepad as Key Vault secrets. Do this by navigating to the Key Vault's Secrets tab and creating the following secrets. You will also need to add your resource group and Subscription ID as secrets. Ensure that the following seven secrets are created and enabled.

Figure: Add the copied secrets to AKV

Databricks

In Databricks, create a new secret scope by navigating to https://<DATABRICKS-INSTANCE>#secrets/createScope and replace <DATABRICKS-INSTANCE> with your own Databricks URL instance. This URL will take you to the UI where you can create your secret scope. Paste the Key Vault URI and Resource ID from your Key Vault into the respective DNS Name and Resource ID section.

Figure: Create a secret scope in Databricks

Create a new Cluster with Databricks Runtime Version of 8.2, which supports the advanced schema evolution capabilities of Auto Loader cloudFiles.

Figure: Create 8.2 runtime cluster

To prevent any errors at run-time, also install the Event Hubs library to the cluster. This Maven library contains the following coordinates.

Figure: Install Event Hubs library to cluster

This concludes all of the required prerequisites in Databricks.

Run Auto Loader in Databricks

In this section, you will learn how to begin working with Auto Loader in a Databricks notebook.

Configuration Properties

To begin the process of configuring and running Auto Loader, set the following configuration, which specifies the sample size, as either a number of bytes or a number of files, that is read to infer the schema.

Figure: Set the cloudFiles config in notebook

Here is the code shown in the figure above. Note that you could use either the numBytes or numFiles properties.

#spark.conf.set("spark.databricks.cloudfiles.schemaInference.sampleSize.numBytes",10000000000)
spark.conf.set("spark.databricks.cloudfiles.schemaInference.sampleSize.numFiles",10)

This next block of code will obtain the list of secrets that you have created in your Key Vault.

Figure: Add the dbutils secrets to notebook

Here is the code that is used in the figure above. Since you have given Databricks access to the Key Vault secret scope, there should be no errors when you run this code.

subscriptionId = dbutils.secrets.get("akv-0011","subscriptionId")
tenantId = dbutils.secrets.get("akv-0011","tenantId")
clientId = dbutils.secrets.get("akv-0011","clientId")
clientSecret = dbutils.secrets.get("akv-0011","clientSecret")
resourceGroup = dbutils.secrets.get("akv-0011","resourceGroup")
queueconnectionString = dbutils.secrets.get("akv-0011","queueconnectionString")
SASKey = dbutils.secrets.get("akv-0011","SASKey")
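The seven lookups above follow one pattern, so they can also be sketched as a loop. The helper name load_secrets and the stand-in getter are assumptions made for illustration; in a notebook you would pass dbutils.secrets.get as the getter, as noted in the comment.

```python
# Secret names as used in the lookups above.
SECRET_NAMES = [
    "subscriptionId", "tenantId", "clientId", "clientSecret",
    "resourceGroup", "queueconnectionString", "SASKey",
]

def load_secrets(get_secret, scope, names=SECRET_NAMES):
    """Fetch every named secret from one scope and return them as a dict."""
    return {name: get_secret(scope, name) for name in names}

# In a notebook: secrets = load_secrets(dbutils.secrets.get, "akv-0011")
# Outside Databricks, a stand-in getter shows the shape of the result.
fake_store = {("akv-0011", n): f"<{n}>" for n in SECRET_NAMES}
secrets = load_secrets(lambda scope, key: fake_store[(scope, key)], "akv-0011")
print(sorted(secrets))
```

A missing secret surfaces as an error on the first failed lookup, which makes an incomplete Key Vault setup easy to spot before the stream starts.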

Rescue Data

This next block of code will build your cloudFiles config. Notice that the format is listed as JSON, but could just as easily be any other format. Define the schema location within your Customer_stream folder and set the schema evolution mode as rescue. There are additional options which have been commented out in this section, however we will cover some of these details in later sections.

Figure: Add the cloudFiles config settings to the notebook

Here is the code which is shown in the figure above. Since cloudFiles will automatically create Event Grid topics and subscriptions, it needs the credentials in this code to get access to the relevant Azure resources. The _checkpoint folder will store the schema metadata and will also keep track of multiple versions of the evolved schemas. The partitionColumns config provides the option to read Hive style partition folder structures. The schema evolution mode of 'failOnNewColumns' will simply fail the job when new columns are detected and will require manual intervention to define and update the new schema. We will not be exploring this option.

cloudfile = {
    "cloudFiles.subscriptionId": subscriptionId,
    "cloudFiles.connectionString": queueconnectionString,
    "cloudFiles.format": "json",
    "cloudFiles.tenantId": tenantId,
    "cloudFiles.clientId": clientId,
    "cloudFiles.clientSecret": clientSecret,
    "cloudFiles.resourceGroup": resourceGroup,
    "cloudFiles.useNotifications": "true",
    "cloudFiles.schemaLocation": "/mnt/raw/Customer_stream/_checkpoint/",
    "cloudFiles.schemaEvolutionMode": "rescue",
    # Options covered in later sections; uncomment as needed.
    #"cloudFiles.inferColumnTypes": "true",
    #"cloudFiles.schemaEvolutionMode": "failOnNewColumns",
    #"cloudFiles.schemaEvolutionMode": "addNewColumns",
    #"cloudFiles.partitionColumns": "",
}
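Because later sections switch between schema evolution modes, it can help to build the options from a small function rather than editing comments by hand. The following is a minimal sketch under that assumption; build_cloudfiles_options is a hypothetical helper, and the mode list combines the modes named in this article with "none", which the Auto Loader documentation also lists.

```python
# Modes named in this article, plus "none" from the Auto Loader documentation.
VALID_MODES = {"addNewColumns", "rescue", "failOnNewColumns", "none"}

def build_cloudfiles_options(schema_location, mode="rescue",
                             infer_types=False, **credentials):
    """Return a cloudFiles options dict for a JSON source (illustrative helper)."""
    if mode not in VALID_MODES:
        raise ValueError(f"Unknown schema evolution mode: {mode!r}")
    options = {
        "cloudFiles.format": "json",
        "cloudFiles.useNotifications": "true",
        "cloudFiles.schemaLocation": schema_location,
        "cloudFiles.schemaEvolutionMode": mode,
        "cloudFiles.inferColumnTypes": str(infer_types).lower(),
    }
    # Credential keywords such as tenantId or clientSecret get the cloudFiles prefix.
    options.update({f"cloudFiles.{k}": v for k, v in credentials.items()})
    return options

opts = build_cloudfiles_options(
    "/mnt/raw/Customer_stream/_checkpoint/",
    mode="addNewColumns",
    tenantId="<tenant-id>",  # placeholder value, not a real credential
)
print(opts["cloudFiles.schemaEvolutionMode"])
```

Validating the mode up front turns a misspelled option value into an immediate error instead of a surprise at stream start.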

With the AdditionalOptions properties, you can define schema hints, rescued data columns and more. In this code block, you are specifying which column to add the rescued data into.

Figure: Add additional options for rescue data

Here is the code which is shown in the figure above.

AdditionalOptions = {"rescuedDataColumn": "_rescued_data"}

In this next code block, set the ADLS gen2 config by adding the ADLS gen2 account and SAS Key.

Figure: Add the SAS key config to notebook

Here is the code which is shown in the figure above.

# Pass the SASKey variable retrieved from the secret scope, not a string literal.
spark.conf.set("fs.azure.account.key.adlsg2v001.dfs.core.windows.net", SASKey)

Run the following code to configure your data frame using the defined configuration properties. Notice that, by default, the columns are typed as 'string' in this mode. Nevertheless, cloudFiles is able to automatically infer the schema.

Figure: Read the stream and view the schema

Here is the code that is used in the figure above.

df = (spark.readStream.format("cloudFiles")
      .options(**cloudfile)
      .options(**AdditionalOptions)
      .load("abfss://data@adlsg2v001.dfs.core.windows.net/raw/Customer_stream/"))

Upon navigating to the Customer_stream folder in ADLS gen2, notice that there is now a new _checkpoint folder that is created as a result of running the code above.

Figure: The checkpoint folder has been created in the Customer_stream folder

Within the _schemas folder there is a file named 0. This file contains the initial metadata version of the schema that has been defined.

Figure: Schema file 0 has been created, which contains the initial schema

Upon opening the file, notice how it captures the JSON data frame schema structure as expected.

{"dataSchemaJson":"{\"type\":\"struct\",\"fields\":[{\"name\":\"FirstName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"LastName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"MiddleName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"customerid\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionSchemaJson":"{\"type\":\"struct\",\"fields\":[]}"}
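Note that dataSchemaJson holds the schema as a JSON string nested inside the outer JSON object, so it has to be parsed twice. The sketch below shows this on an abbreviated copy of the file above (two of the five fields); read_fields is a hypothetical helper name.

```python
import json

# Abbreviated copy of the version 0 file shown above (two of the five fields).
schema_file_line = (
    r'{"dataSchemaJson":"{\"type\":\"struct\",\"fields\":['
    r'{\"name\":\"FirstName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},'
    r'{\"name\":\"customerid\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}",'
    r'"partitionSchemaJson":"{\"type\":\"struct\",\"fields\":[]}"}'
)

def read_fields(line):
    """Return {column name: type} from one schema file line."""
    outer = json.loads(line)
    inner = json.loads(outer["dataSchemaJson"])  # the schema is JSON nested in a string
    return {field["name"]: field["type"] for field in inner["fields"]}

fields = read_fields(schema_file_line)
print(fields)
```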

Schema Hints

Navigate back to the Databricks notebook and to the code block which contains AdditionalOptions. This time, add the following schemaHints. This is particularly useful if you wish to explicitly define the schema of a particular column.

Figure: Add schema hints to alter schema

Here is the code that is used in the figure above. Remember to delete your _schemas folder so that the process can infer the schema from scratch once again.

AdditionalOptions = {
    "cloudFiles.schemaHints": "customerid int",
    "rescuedDataColumn": "_rescued_data",
}

Re-run the following code and notice that customerid has this time been inferred as an integer rather than a string.

Figure: View schema from schema hints

Here is the code that is used in the figure above.

df = (spark.readStream.format("cloudFiles")
      .options(**cloudfile)
      .options(**AdditionalOptions)
      .load("abfss://data@adlsg2v001.dfs.core.windows.net/raw/Customer_stream/"))
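Conceptually, a schema hint overrides the inferred type for the named column and leaves every other column alone. The following is a simplified model of that behavior, not Auto Loader's implementation; parse_hints and apply_hints are hypothetical helpers, and the parser assumes simple type names without commas.

```python
def parse_hints(hints):
    """Parse a hint string like "customerid int, Title string" into a dict.

    Assumes simple, comma-free type names; parameterized or nested types
    (for example decimal(10,2)) are out of scope for this sketch.
    """
    parsed = {}
    for part in hints.split(","):
        name, dtype = part.strip().split(None, 1)
        parsed[name] = dtype.strip()
    return parsed

def apply_hints(inferred, hints):
    """Override inferred column types with hinted ones; other columns are untouched."""
    return {**inferred, **parse_hints(hints)}

inferred = {"FirstName": "string", "customerid": "long"}
print(apply_hints(inferred, "customerid int"))
```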

Infer Column Types

Navigate to the cloudfile config code block and uncomment "cloudFiles.inferColumnTypes": "true", which provides the capability of automatically inferring the column types of your incoming data.

Figure: Enable inferColumnTypes to auto infer the schema

Additionally, this time, remove the schemaHints from the AdditionalOptions code block.

Figure: Remove the schema hints from the AdditionalOptions code block

Also, once again, remember to delete the contents of the _checkpoint folder so that you are re-inferring the schema from scratch.

Figure: Delete the checkpoint file to recreate the schema file

Re-run the following code and notice that this time, without schemaHints, customerid has been inferred as 'long'.
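The difference between the two settings can be illustrated with a small model of type inference over JSON lines. This is a simplified sketch rather than Auto Loader's actual algorithm: infer_type and infer_schema are hypothetical helpers, nulls fall back to 'string', and conflicting types across rows are not reconciled.

```python
import json

def infer_type(value):
    """Map a parsed JSON value to a Spark-style type name (simplified)."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return "boolean"
    if isinstance(value, int):
        return "long"
    if isinstance(value, float):
        return "double"
    return "string"

def infer_schema(json_lines, infer_column_types):
    """Infer {column: type}; with inference off, every column is a string."""
    schema = {}
    for line in json_lines:
        for name, value in json.loads(line).items():
            schema[name] = infer_type(value) if infer_column_types else "string"
    return schema

sample = ['{"FirstName": "Ana", "customerid": 1, "namestyle": false}']
print(infer_schema(sample, infer_column_types=True))
print(infer_schema(sample, infer_column_types=False))
```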

Figure: Read stream and notice schema is auto inferred

Go ahead and run the following command to initialize the stream and display the data.

Figure: Display the stream

As expected, notice that the data from Customer1.json is being displayed. Notice that the _rescued_data column is null since there is no data that needs to be rescued yet. When you begin adding new columns from the Customer2 and Customer3 JSON files, these additional columns will be captured in this column since the rescue data config property is enabled.

Figure: The data from the stream is displayed

Now it is time to add Customer2.json to our Customer_stream folder. Since Customer2 has more columns it should demonstrate how schema evolution is handled by Auto Loader cloudFiles.

Figure: Add the next Customer2.json file with additional columns

After navigating back to the streaming query, notice that the newly added columns from Customer2.json are all added to the _rescued_data column in the dataframe, as expected. Since Auto Loader is constantly watching for new files, there is no need to re-run this query since the update occurs every 5-10 seconds.

Figure: Stream will pick up the new columns in the rescued data column

When you check your _schemas folder, you will only see the version 0 schema struct file, which contains the original schema from Customer1.json. This is because the schema is still fixed and all new columns are added to the _rescued_data column.
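The rescue behavior described in this section can be modeled in a few lines of Python. This is a simplified illustration, not Auto Loader's implementation: the function name rescue and the sample records are hypothetical, and only unknown columns are modeled (the real rescued data column can carry additional detail).

```python
import json

def rescue(record, schema_columns):
    """Split a record into known columns plus a _rescued_data JSON string."""
    row = {col: record.get(col) for col in schema_columns}
    extras = {k: v for k, v in record.items() if k not in schema_columns}
    # Rows that match the schema keep a null (None) rescued column.
    row["_rescued_data"] = json.dumps(extras) if extras else None
    return row

schema_v0 = ["FirstName", "LastName", "customerid"]
print(rescue({"FirstName": "Ana", "LastName": "Diaz", "customerid": 1}, schema_v0))
print(rescue({"FirstName": "Lee", "LastName": "Wong", "customerid": 2,
              "CompanyName": "Fabrikam"}, schema_v0))
```

The schema list never changes in this model, which mirrors why only the version 0 schema file exists while rescue mode is active.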

Figure: A new schema file is created

Add New Columns

Now that you have seen how rescue data works, let's enable the schema evolution mode of 'addNewColumns' so that we can include the new columns and not simply bucket them into the _rescued_data column. This process should now also create a new version of the schema struct file in the _schemas folder.

Figure: Enable option to addNewColumns in code block

Notice how the stream failed this time. This is deliberate, since it follows the pattern of failing the job, updating the schema, and then including the new schema when the job is re-started. This is intended to follow the best practices of streaming architecture, which include automatic failures and re-tries.

Figure: Stream job will stop since it does not match schema

Re-start the streaming job and notice that this time the new additional columns from Customer2.json are included in the schema.
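The fail, update, and restart pattern can be sketched as a small simulation. Everything here is hypothetical and runs without Spark: SchemaChanged stands in for the error raised when new columns appear, and the restart replays all batches for simplicity, whereas a real stream resumes from its checkpoint.

```python
class SchemaChanged(Exception):
    """Hypothetical stand-in for the error raised when new columns appear."""

def run_stream(batches, schema):
    """Process batches until one contains a column missing from the schema."""
    processed = 0
    for batch in batches:
        new_cols = [c for c in batch if c not in schema]
        if new_cols:
            schema.extend(new_cols)  # record the evolved schema, then stop
            raise SchemaChanged(new_cols)
        processed += 1
    return processed

def run_with_restarts(batches, schema, max_restarts=3):
    """Restart the stream after each schema change, up to max_restarts times."""
    for attempt in range(max_restarts + 1):
        try:
            return run_stream(batches, schema), attempt
        except SchemaChanged:
            continue  # the schema was updated before the failure; try again
    raise RuntimeError("too many schema changes")

schema = ["FirstName", "customerid"]
batches = [["FirstName", "customerid"],
           ["FirstName", "customerid", "CompanyName"]]
processed, restarts = run_with_restarts(batches, schema)
print(processed, restarts, schema)
```

In a scheduled job, the same idea is usually delegated to the job's retry settings rather than a hand-written loop.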

Figure: Restart the job, re-run readStream, and notice new columns

Re-run the df.display() command and notice that the additional columns are displayed in the data frame.

Figure: New columns are added to display stream

Upon navigating back to the _schemas folder in ADLS gen2, notice that there is a new version 1 of the schema struct file.

Figure: A new schema file is created with new columns

Upon opening this file, notice that it contains the additional columns that were added from Customer2.json.

v1
{"dataSchemaJson":"{\"type\":\"struct\",\"fields\":[{\"name\":\"FirstName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"LastName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"MiddleName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"customerid\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CompanyName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"EmailAddress\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"SalesPerson\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"namestyle\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}}]}","partitionSchemaJson":"{\"type\":\"struct\",\"fields\":[]}"}

Similar to the previous demonstration, add the Customer3.json file to the Customer_stream folder in ADLS gen2 and notice once again that the stream will fail, as expected.

Figure: Adding a new file with additional columns will again fail the stream

Re-start the stream by running the following code and notice that the new columns from Customer3.json are included.

Figure: Restart the stream to see new columns come in

Also, notice that the new columns from Customer3.json are included in the streaming data frame.

Figure: New columns from the 3rd file are captured in the stream

Similar to the previous example, there is yet another schema struct file in the _schemas folder in ADLS gen2, which is version 2 and includes the additional columns from Customer3.json.

Figure: A third schema file has been created with the evolved schema

Upon opening the version 2 schema struct file, notice that it includes the new columns from Customer3.json.

v1
{"dataSchemaJson":"{\"type\":\"struct\",\"fields\":[{\"name\":\"FirstName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"LastName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"MiddleName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Title\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"customerid\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CompanyName\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"EmailAddress\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"SalesPerson\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"namestyle\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ModifiedDate\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"Phone\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"rowguid\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}","partitionSchemaJson":"{\"type\":\"struct\",\"fields\":[]}"}
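To see exactly what changed between schema versions, the field lists can be compared directly. The column names below are taken from the three schema files shown in this article; new_columns is a hypothetical helper.

```python
def new_columns(old_fields, new_fields):
    """Return columns present in new_fields but not in old_fields, in order."""
    return [name for name in new_fields if name not in old_fields]

# Column names taken from the schema files shown in this article.
v0 = ["FirstName", "LastName", "MiddleName", "Title", "customerid"]
v1 = v0 + ["CompanyName", "EmailAddress", "SalesPerson", "namestyle"]
v2 = v1 + ["ModifiedDate", "Phone", "rowguid"]

print(new_columns(v0, v1))
print(new_columns(v1, v2))
```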

Summary

In this article, I demonstrated how to configure and run Auto Loader in Azure Databricks by using the cloudFiles source. Specifically, you learned how to manage advanced schema evolution for streaming semi-structured JSON data. This modern streaming architectural pattern can be considered while building the Lakehouse architecture since it also supports writing the stream to Delta format. It can be used for both stream and batch ELT workload processing paradigms since the cluster will shut down when no files are in the queue to process and will re-start when new files arrive in the queue. Additionally, the ease of management and maintenance of Event Grid subscriptions and topics demonstrates Auto Loader's capabilities in the modern Data and Analytics platform, specifically as it relates to the Lakehouse paradigm.


About the author
Ron L'Esteve is a seasoned Data Architect who holds an MBA and MSF. Ron has over 15 years of consulting experience with Microsoft Business Intelligence, data engineering, emerging cloud and big data technologies.
