Create Parameter Driven Databricks Engineering Notebooks

By: John Miner   |   Updated: 2023-10-23   |   Related: > Azure Databricks


Problem

The Delta Lakehouse design follows the medallion architecture for data quality. Usually, the data processing for the first two zones is similar in design. How can we write a Databricks Engineering Notebook to process all the files for a given source system?

Solution

The Databricks utility library (dbutils) has a set of methods that can be used to read and write widgets. Widgets allow the data engineer to pass parameters to the notebook so that the same code (notebook) can be used to process multiple files.
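
As a quick preview, the raw calls look like the cell below: create a text widget with a default value, then read that value back as a string. The widget name here is made up for illustration; the rest of the article wraps these calls in reusable helper functions.

#
#  preview - raw widget calls (illustrative widget name)
#
 
# create a text widget with a default value
dbutils.widgets.text("demo_name", "demo_value")
 
# read the current value back as a string
print(dbutils.widgets.get("demo_name"))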

Business Problem

Our manager at Adventure Works has asked us to look into creating a Delta Lakehouse for our company. We assume the data is already available in the raw zone in CSV format. These daily files represent a full dump of each table from our on-premises dimensional model stored in SQL Server. We want to save a copy of the daily file in Parquet format in the bronze zone. Additionally, we want to create a delta table in our silver zone so that we can query the information using Spark SQL.

Today, we will not investigate how to orchestrate data pipelines. However, Azure Data Factory is a good candidate for the Extract, Load, and Transform (ELT) design. The image below was taken from a Databricks Blog, pointing out that over 90 data sources can be read from and written to our data lake.

parameter driven notebook - ADF can be used to copy many types of data to/from data lake.

The first step in creating a data lake is to investigate how to pass parameters to a Python Notebook. The following topics will be covered in this article.

Task ID    Description
1          Abstracting the details
2          Working with variables
3          Working with documents
4          Using delta tables

At the end of the article, we will understand the pros and cons of each way to pass parameters.

Abstracting the Details

The best designs in life are based on simplicity. I try to follow the "Keep It Simple Stupid" (KISS) principle in my system designs. The phrase is usually traced back to U.S. Navy aircraft programs, where designers were urged to keep jets simple enough for field technicians to repair. Today's goal is to start creating a Python library (toolbox notebook) to be used when programming in Databricks.

The Python language supports classes. Therefore, we must choose between an object-oriented design, in which a class exposes methods, and classic functions that perform the business processing. Since the latter is simpler, we will abstract the calls to the Databricks Utilities library by creating user-defined functions.

The code snippet below removes a named widget from a notebook:

#
#  del_widget - remove name w/error protection
#
 
def del_widget(name):
  try:
    dbutils.widgets.remove(name)
  except:
    pass

The code snippet below adds a named widget to a notebook. There are four types of widgets: text, dropdown, combo box, and multiselect. The text box entry is the most flexible.

#
#  add_widget - add name/value w/error protection
#
 
def add_widget(name, value):
  try:
    dbutils.widgets.text(name, value)
  except:
    pass  
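
The helpers above only wrap the text widget. If you prefer to constrain the input to a fixed list of choices, the utility library exposes the other widget types directly. Below is a minimal sketch of a dropdown widget using the debug flag as an example; it is shown for completeness and is not part of the toolbox.

#
#  example - dropdown widget (sketch, not part of the toolbox)
#
 
# limit the debug flag to two valid choices
dbutils.widgets.dropdown("debug_flag", "true", ["true", "false"])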

The code snippet below gets the widget's value. The try/except block allows developers to control how errors are handled. In the first two functions, we do not care if creating or removing a widget fails. Thus, the pass keyword swallows the error since there is nothing we can do if the call to the library fails. On the other hand, if we encounter an error when retrieving a widget, we set the return value to None. This makes sense since we did not get a value.

#
#  get_widget - return value given name
#
 
def get_widget(name):
  try:
    return dbutils.widgets.get(name)
  except:
    return None

To recap, abstracting the details of the calls to the system utility library allows us to add error handling to our program. Additionally, any future logic changes can be made in one place. Next, we will investigate how to pass values to a notebook via widgets and store those values in variables.

Working with Variables

The widgets appear at the top of the notebook in Azure Databricks. If there are no defined widgets, the first cell of the notebook is shown. As a practice, I reserve the first cell for documenting the purpose of the (notebook) program. For a parent notebook, I use Markdown (the %md magic) to make it fancy. The image below shows the seven distinct tasks we will cover in this article.

parameter driven notebook - our notebook to test 3 ways to pass parameters

For child notebooks, I usually place normal Python comments in the first cell. We do not want the fancy markup to be shown in the child notebook since it introduces clutter into the parent notebook. The screenshot below shows some topics I will cover in the next article. We can see the first set of functions deals with widgets.

parameter driven notebook - common code in our toolbox notebook

The %run magic command executes the toolbox notebook code within our Spark session. Once it completes, any functions defined in that notebook are available to the calling notebook.

%run ./nb-toolbox-code

The assignment of one widget per variable is the easiest way to pass parameters. The code below deletes any existing widgets. This is a sample of information that might be needed to process files between the raw, bronze, and silver zones.

#
# V1 - drop parameters
#
 
del_widget("dat_folder_name")
del_widget("src_file_name")
del_widget("dst_file_name")
del_widget("lake_path")
del_widget("file_schema")
del_widget("partition_count")
del_widget("debug_flag")

The code below creates widgets for each parameter. Note: Widget values are strings, so any numeric values must be cast to their proper types before use. Additionally, the Spark schema is defined using SQL DDL syntax.

#
# V2 - add parameters
#
 
add_widget("dat_folder_name", "/dim/customer/")
add_widget("src_file_name", "DimCustomer.csv")
add_widget("dst_file_name", "customer.parquet")
add_widget("lake_path", "/mnt/advwrks/datalake")
add_widget("debug_flag", "true")
add_widget("partition_count", "2")
add_widget("file_schema", "AccountKey INT, ParentAccountKey INT, AccountCodeAlternateKey INT, ParentAccountCodeAlternateKey INT, AccountDescription STRING, AccountType STRING, Operator STRING, CustomMembers STRING, ValueType STRING, CustomMemberOptions STRING")

The result of our work is shown below, with the parameters having default values. The destruction and creation of the widgets should happen one time.

parameter driven notebook - passing parameters as widgets can get messy

The retrieval of a parameter into a variable is a one-to-one assignment. Since we used text widgets, the returned value is always a string.

#
#  V3 - Retrieve parameter
#
 
var_lake_path = get_widget("lake_path")
print(var_lake_path)

The image below shows the root path to our data lake for the Adventure Works company.

parameter driven notebook - reading a single widget is easy
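
One caveat: every widget value comes back as a string. Numeric and boolean parameters should be cast before use. The cell below is a minimal sketch using the widgets defined above.

#
#  V3B - cast string parameters to typed variables (sketch)
#
 
# the partition count is needed as an integer
var_partition_count = int(get_widget("partition_count"))
 
# the debug flag is needed as a boolean
var_debug_flag = (get_widget("debug_flag") == "true")
 
print(var_partition_count, var_debug_flag)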

To recap, the three Python functions we created in the previous section were used to delete, add, and get values from widgets. Every design pattern has pros and cons. When using variables, it is easy to set and get a given parameter. However, the maintenance might be unmanageable if the number of parameters is huge.

Working with Documents

JavaScript Object Notation (JSON) was developed to transmit data between two different systems. Therefore, a JSON document can encode a large number of key/value pairs as a single string. The code snippet below creates a variable named app_data, which contains many more parameters that might be required when processing data from the raw to silver zones. This data type is called a dictionary in Python.

#
#  D1 - define JSON document
#
 
app_data = {
    "hive_database": "advwrks",
    "lake_path": "/mnt/advwrks/datalake",
    "folder_path": "/dim/customer/",
    "raw_file_name": "DimCustomer",
    "raw_file_ext": "csv",
    "raw_file_header": "False",
    "raw_file_delimiter": "|",
    "br_file_name": "customer",
    "br_file_ext": "parquet",
    "br_file_header": None,
    "br_file_delimiter": None,
    "ag_file_name": "customer",
    "ag_file_ext": "delta",
    "ag_file_header": None,
    "ag_file_delimiter": None,
    "load_type": "full",
    "partition_cnt": 1,
    "file_schema": "AccountKey INT, ParentAccountKey INT, AccountCodeAlternateKey INT, ParentAccountCodeAlternateKey INT, AccountDescription STRING, AccountType STRING, Operator STRING, CustomMembers STRING, ValueType STRING, CustomMemberOptions STRING"
     }
     

The JSON library has useful methods to convert a dictionary to a string and vice versa. In the code below, the dictionary is converted to a string using the dumps method.

#
# D2 - add parameter
#
 
import json
add_widget("app_config", json.dumps(app_data))

The image below shows the widget called app_config, which contains a JSON document.

parameter driven notebook - reducing the number of widgets by using JSON
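
In a scheduled run, this widget is populated by the caller. For instance, a parent notebook (or a workflow task) could pass the whole document as one parameter. The cell below is a minimal sketch; the child notebook name is hypothetical.

#
#  example - pass the JSON document to a child notebook (sketch)
#
 
import json
 
# run a hypothetical child notebook with a 10 minute timeout,
# passing the entire configuration as a single widget value
ret = dbutils.notebook.run("./nb-process-file", 600, {"app_config": json.dumps(app_data)})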

The code snippet below uses the loads method to convert the string retrieved from the widget into a dictionary. The print statements show two parameters describing the file in the bronze zone.

#
# D3 - convert JSON to a dictionary
#
 
import json
dict_app_config = json.loads(get_widget("app_config"))
print(dict_app_config['br_file_name'])
print(dict_app_config['br_file_ext'])
type(dict_app_config)

We can see in the following image that the customer file will be saved as a parquet file in the bronze quality zone.

parameter driven notebook - the output is a dictionary and we can get the bronze file and extension properties
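
A nice side effect of the JSON approach is that numeric values keep their Python type after the round trip through the dumps and loads methods, while plain widget values always come back as strings. A quick check based on the dictionary above:

#
#  D4 - numeric types survive the JSON round trip
#
 
# partition_cnt was declared as an integer in app_data
print(type(dict_app_config['partition_cnt']))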

Every design has pros and cons. When using JSON documents, a huge number of key/value pairs can be passed as a single widget. This is great! If we were to schedule this common data engineering notebook, the workflow would have to maintain these JSON documents. In practice, this requires copying the information to a text editor, making the required document changes, and saving the updated information into the workflow parameter. It can be difficult to maintain the documents. Is there a better way to store and retrieve parameters for a given notebook?

Using Delta Tables

The relational database management system (RDBMS) has been popular since the 1970s. In Apache Spark, a hive database (schema) is where tables are registered. Of the file formats natively supported by Databricks, Delta is the one that provides ACID transaction properties. Therefore, we want to use a DELTA table to store and retrieve the metadata used by our engineering notebook. As cloud architects, we could instead use a database such as Azure SQL Database. However, that creates a dependency between the two services: if the database is down, the data engineering programs will fail. Additionally, we must decide whether to store the delta table in existing (managed) or remote (unmanaged) storage.
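
The difference shows up in the CREATE TABLE statement. The cell below is a minimal sketch with made-up table names: a managed table lets the hive metastore control where the data files live, while an unmanaged (external) table adds a LOCATION clause pointing at a path we control in the data lake.

#
#  example - managed vs. unmanaged delta tables (sketch, made-up names)
#
 
# managed - data files live under the hive warehouse directory
spark.sql("CREATE TABLE IF NOT EXISTS demo_managed (job_key STRING) USING DELTA")
 
# unmanaged (external) - data files live at a path we choose in the data lake
spark.sql("CREATE TABLE IF NOT EXISTS demo_unmanaged (job_key STRING) USING DELTA LOCATION '/mnt/advwrks/datalake/etl/demo'")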

The code below removes an existing hive database if it exists and then creates a hive database named "etl_jobs." Note: The spark.sql command allows the developer to execute Spark SQL.

#
# T1 - drop + create database 
#
 
# define vars
database_name = "etl_jobs"
 
# Drop the database
sql_stmt = "DROP SCHEMA IF EXISTS {} CASCADE;".format(database_name)
spark.sql(sql_stmt)
 
# Create the database
sql_stmt = "CREATE DATABASE IF NOT EXISTS {}".format(database_name)
spark.sql(sql_stmt)

We need to account for all data processing cases to make the code restartable. Before creating the table, we must ensure that any previous version of it is gone. Thus, we remove the existing folder and hive table before creating a new unmanaged table. I used the remove command from the utilities library to erase the folder.

#
# T2 - drop + create table
#
 
# define vars
table_name = "meta_data"
file_path = "/mnt/advwrks/datalake/etl"
 
# remove dir if exists
try:
  dbutils.fs.rm(file_path, recurse=True)
except:
  pass
 
# Drop hive table
sql_stmt = "DROP TABLE {}.{}".format(database_name, table_name)
try:
  spark.sql(sql_stmt)
except:
  pass
 
# Create hive table
sql_stmt = """
  CREATE TABLE IF NOT EXISTS {}.{} 
  (
    job_key STRING,
    hive_database STRING,
    lake_path STRING,
    folder_path STRING,
    raw_file_name STRING,
    raw_file_ext STRING,
    raw_file_header BOOLEAN,
    raw_file_delimiter STRING,
    br_file_name STRING,
    br_file_ext STRING,
    br_file_header BOOLEAN,
    br_file_delimiter STRING,
    ag_file_name STRING,
    ag_file_ext STRING,
    ag_file_header BOOLEAN,
    ag_file_delimiter STRING,
    load_type STRING,
    partition_cnt STRING,
    partition_col STRING,
    file_schema STRING
  )
  LOCATION '{}'
  """.format(database_name, table_name, file_path)
spark.sql(sql_stmt)

Now that we have a delta table, we want to define a job key that can be used to retrieve the metadata for a notebook run. Therefore, it is important to designate a column that acts as the primary key. The CREATE TABLE syntax does allow primary key constraints to be declared; however, these constraints are informational only and are not enforced.
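
Since uniqueness is not enforced by the engine, a cheap sanity check in code can catch duplicate keys before they cause trouble. The cell below is a minimal sketch that assumes the table has already been populated.

#
#  example - verify a job key maps to a single row (sketch)
#
 
# count the rows for a given key; a well-formed table returns exactly one
sql_stmt = "SELECT COUNT(*) AS cnt FROM etl_jobs.meta_data WHERE job_key = 'customer_ingestion'"
cnt = spark.sql(sql_stmt).collect()[0]["cnt"]
 
if cnt > 1:
  print("warning - duplicate metadata rows found for this job key")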

The code below destroys our job_key widget:

#
# T3A - drop parameter
#
 
del_widget("job_key")

The code below creates our job_key widget:

#
# T3B - add parameter
#
 
add_widget("job_key", "customer_ingestion")

In this article, we have been moving the maintenance of parameters from notebooks to JSON documents. How can we make this process even easier?

We can use our SQL skills to move this maintenance to Delta tables. The image below shows a very simple parameter being passed to the notebook, the primary key for a given row of metadata in our Delta table. The maintenance nightmare in the workflows (jobs) section of Databricks is eliminated. All metadata is inserted, updated, or deleted in the DELTA table. Now we only need to maintain our table!

parameter driven notebook - using delta tables allows us just to pass the job primary key.

The %sql magic command can be used to execute Spark SQL statements. In short, the statement is converted to text internally and passed to the spark.sql command. Use the INSERT statement below to add the first entry to the table.

%sql
 
-- Customers - metadata entry
 
INSERT INTO etl_jobs.meta_data
(
    job_key,
    hive_database,
    lake_path,
    folder_path,
    raw_file_name,
    raw_file_ext,
    raw_file_header,
    raw_file_delimiter,
    br_file_name,
    br_file_ext,
    br_file_header,
    br_file_delimiter,
    ag_file_name,
    ag_file_ext,
    ag_file_header,
    ag_file_delimiter,
    load_type,
    partition_cnt,
    partition_col,
    file_schema
  )
  VALUES
  (
    "customer_ingestion",
    "advwrks",
    "/mnt/advwrks/datalake",
    "/dim/customer/",
    "DimCustomer",
    "csv",
    True,
    "|",
    "customer",
    "parquet",
    False,
    "",
    "customer",
    "delta",
    False,
    "",
    "full",
    1,
    "",
    "AccountKey INT, ParentAccountKey INT, AccountCodeAlternateKey INT, ParentAccountCodeAlternateKey INT, AccountDescription STRING, AccountType STRING, Operator STRING, CustomMembers STRING, ValueType STRING, CustomMemberOptions STRING"
  )

The image below shows that one row was inserted after executing the Spark SQL statement.

parameter driven notebook - inserting a row has a returned result set

The Spark SQL language does support the TRUNCATE TABLE statement. This is great if you want to repopulate a table from scratch.

%sql
 
-- Clear table?
 
TRUNCATE TABLE etl_jobs.meta_data
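
As noted earlier, the %sql magic is just a convenience. The same statement can be issued from Python with spark.sql, which is handy when the database or table name comes from a parameter. A minimal sketch using the variables defined above:

#
#  example - truncate the table via spark.sql (sketch)
#
 
spark.sql("TRUNCATE TABLE {}.{}".format(database_name, table_name))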

The Spark SQL language does support the SELECT statement so that we can look at the record (tuple of data) we inserted into the table.

%sql
 
-- Show inserted data
 
SELECT * FROM etl_jobs.meta_data;

The output from executing the SELECT statement is a dataframe shown in a grid control. Since there are many columns in this table, there is a scroll bar when you click the output.

parameter driven notebook - front part of job row

In the next article, I will talk about writing generic Python functions to read and write data files. The first pass at writing data files will use a full load pattern. This means the folder and files are removed before writing. The column name for this parameter is load_type and has a value of "full."

parameter driven notebook - back part of job row
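
The next article will cover these read and write functions in detail, but the essence of a full load is short enough to sketch here. The cell below is a rough illustration only; the paths, delimiter, and header flag are hypothetical and would come from the metadata row in practice.

#
#  example - essence of a full load pattern (sketch, hypothetical paths)
#
 
# read the raw file; the options would be driven by the metadata row
df = spark.read.format("csv") \
  .option("header", True) \
  .option("delimiter", "|") \
  .load("/mnt/advwrks/datalake/raw/dim/customer/")
 
# full load - remove any existing bronze output, then write it fresh
bronze_path = "/mnt/advwrks/datalake/bronze/dim/customer/"
try:
  dbutils.fs.rm(bronze_path, recurse=True)
except:
  pass
 
df.write.format("parquet").mode("overwrite").save(bronze_path)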

How can we change this metadata to support "incremental" file loading? We can use the UPDATE command that is supported by Spark SQL.

%sql
 
-- Update the record
 
update etl_jobs.meta_data as m
set m.load_type = 'incremental'
where m.job_key = 'customer_ingestion';

The output from this action is a dataframe showing how many rows were affected.

parameter driven notebook - the update command returns a result set

If we execute the SELECT statement again and scroll right on the record, we can see that the column has been updated with the new value.

parameter driven notebook - the data in the only row has been updated

The Python code below retrieves the single record for data from the meta_data table in the etl_jobs database, where the job_key matches our input parameter.

#
# T4 - retrieve parameters
#
 
# select specified row
sql_stmt = "SELECT * FROM etl_jobs.meta_data WHERE job_key = '{}'".format(get_widget("job_key"))
 
# exec sql stmt, convert to pandas, convert to dict of lists
dict_app_config = ((spark.sql(sql_stmt)).toPandas()).to_dict(orient = 'list')
   
# print the dict
print(dict_app_config['br_file_name'][0])
print(dict_app_config['br_file_ext'][0])
type(dict_app_config)

The magic happens with the toPandas and to_dict function calls. The output of the spark.sql call is a Spark dataframe. The Pandas to_dict method turns the result set into a dictionary of lists (arrays). Thus, if we had two rows of data in the dataframe, position zero would hold the first row and position one the second. Note how the dict_app_config values are indexed by position (element zero) in the code above.
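
If only one row is ever expected, the dictionary of lists can be flattened into simple key/value pairs, which makes the rest of the notebook read a little more naturally. A minimal sketch:

#
#  T5 - flatten the single-row result into scalars (sketch)
#
 
# take the first element of each list to get plain values
app_config = {key: val[0] for key, val in dict_app_config.items()}
 
print(app_config['br_file_name'])
print(app_config['load_type'])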

The image below shows the same results as the JSON documents test. However, the maintenance of the data has been moved from a TEXT document to a DELTA table. A negative feature of this design is the code to set up and maintain the meta_data table. A positive design feature is the fact that only a job_key is needed to retrieve all the parameters for a complex program.

parameter driven notebook - retrieving data from a delta file requires more code

Summary

The ability to pass parameters to a notebook allows a developer to take a static program and make it dynamic. Today, we abstracted the Databricks utility library with three user-defined functions: delete widget, create widget, and get widget value. These functions were used to look at three ways to pass data to the notebook: multiple variables, one JSON document, and one table primary key. Working with variables can result in a lot of maintenance inside your notebook. If you choose to work with JSON documents, one variable can be used to pass all data. However, the maintenance is now moved to the document. Since we must pass unique parameters for each scheduled workflow, our new maintenance location is the job editor. The last design pattern was to store all the metadata in a DELTA table. I prefer this pattern since the notebook and job editor only need to hold a single mnemonic value (the job key). All the maintenance is within a table. As a data engineer, creating and maintaining tables is second nature.

To recap, there are many ways to pass parameters to a Python notebook. Please be mindful of where the maintenance will occur, choosing the design that simplifies maintenance. Enclosed is a zip file that has the Python notebook code that was covered today. Next time, we will discuss how to create read and write functions for parameter-driven notebooks, allowing us to quickly fill in the bronze and silver zones from files in the raw zone.

Next Steps
  • Moving Data between Medallion Zones with Databricks
  • Incremental loading data with Delta Tables





About the author
John Miner is a Data Architect at Insight Digital Innovation helping corporations solve their business needs with various data platform solutions.

This author pledges the content of this article is based on professional experience and not AI generated.



Article Last Updated: 2023-10-23
