Getting Started with Azure Synapse Analytics Hyperspace for Spark Indexing


By: Ron L'Esteve   |   Updated: 2021-02-02   |   Related: > Azure


Problem

While Spark offers tremendous value in the advanced analytics and big data spaces, it currently has a few known limitations around indexing when compared to SQL Server's indexing systems and processes. Spark is not well suited to b-tree indexing and single-record lookups, and while Spark partitioning addresses some of these limitations, it only helps when queries filter on the partitioned columns. When users query the data with a different search predicate than what was partitioned, the result is a full scan of the data along with in-memory filtering on the Spark cluster, which is quite inefficient.

In my previous article, Getting Started with Delta Lake Using Azure Data Factory, I covered some of the benefits and capabilities of Delta Lake. While the Delta format offers Z-ordering, which is the rough equivalent of a SQL Server clustered index, what other options do we have for non-clustered indexing in Spark to improve query performance and reduce operational overhead costs?

Solution

Microsoft recently announced a new open source indexing subsystem for Apache Spark called Hyperspace. While it is still early in its development, currently at version 0.2, which only supports Spark 2.4, there is huge potential for this Spark indexing subsystem, particularly within the Azure Synapse Analytics landscape, since it comes built into Synapse workspaces.

Similar to a SQL Server non-clustered index, Hyperspace will:

  1. Create an index across a specified DataFrame,
  2. Create a separate, optimized and re-organized data store for the columns being indexed, and
  3. Include additional columns in that optimized and re-organized data store, much like a non-clustered SQL Server index.

This article will explore creating a dataset in a Synapse workspace along with a Hyperspace index, and then compare a query against Hyperspace-indexed versus non-indexed tables to observe the performance optimizations.

Pre-Requisites

Create a Synapse Analytics Workspace:

Prior to working with Hyperspace, a Synapse Analytics Workspace will need to be created. This quick-start describes the steps to create an Azure Synapse workspace by using the Azure portal: Quickstart: Create a Synapse workspace.

synapse workspace

Create a Spark Pool:

In addition to a Synapse workspace, a Spark Pool will be needed. For more information related to creating a Spark Pool, see: Quickstart: Create a new Apache Spark pool using the Azure portal.

For this demo, I have created a pool with a Medium node size (8 vCores / 64 GB).

Create a new synapse spark pool

Choose a Dataset:

Finally, we'll need a dataset to work with. While big datasets are always preferable, they come with storage and compute overhead costs, so for this demo I have used a subset of the NYC Taxi & Limousine Commission yellow taxi trip records. While the entire record set spans 1.5 billion records (50 GB) from 2009 to 2018, I have chosen a one-week subset of the data, from 2018-05-01 to 2018-05-08, which is around 2.1 million records. Note that this dataset and more are also available in the Synapse Workspace Knowledge Center.

NYC Taxi DataSet

Import a Dataset

Now that we have established our pre-requisites, the following code will import the NycTlcYellow data from Azure Open Datasets, filter it to the desired date range, and load it into a Spark DataFrame.

from azureml.opendatasets import NycTlcYellow
 
from datetime import datetime
from dateutil import parser
 
end_date = parser.parse('2018-05-08 00:00:00')
start_date = parser.parse('2018-05-01 00:00:00')
 
nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
nyc_tlc_df = nyc_tlc.to_spark_dataframe()
NYC Taxi Dataset

After the job succeeds, we can run a select on the VendorID column to confirm that we have approximately 2.1M records, as expected.

Count of records in the dataset

Create Parquet Files

Now that we have a Spark DataFrame containing the data, the following code will create parquet files in the linked ADLS2 account.

nyc_tlc_df.createOrReplaceTempView('nyc_tlc_df')
nyc_tlc_df.write.parquet('nyc_tlc_df_parquet', mode='overwrite')
Create Parquet files in datalake

As expected, we can verify in ADLS2 that snappy-compressed parquet files have indeed been created in the nyc_tlc_df_parquet folder.

NYC Data

Run a Query without an Index

Next, let's run the following query on the data frame to obtain a benchmark on how long the aggregate query takes to complete.

from pyspark.sql import functions as F
df = nyc_tlc_df.groupBy("passengerCount").agg(F.avg('tripDistance').alias('AvgTripDistance'), F.sum('tripDistance').alias('SumTripDistance'))
display(df)

Based on the results, this query took approximately 12 seconds to execute with 2 Spark executors and 8 cores on the 2.1M-record DataFrame.

Test running a query with non-indexed tables

Here are the results of the query.

Result of query

To view more details related to the query, let's open the following Spark UI.

Open the spark UI

Below are the details of the query execution. As we can see, the job took approximately 11 seconds and no indexed tables were used.

Details of the Query

As an alternative to using the Spark UI for query execution details, you could run df.explain() in a code cell to see the query plan within the notebook itself.

Import Hyperspace

Now that we have established a benchmark query without any indexed tables, let's demonstrate how to get started with Hyperspace in the Synapse workspace.

By running the following code, we will import Hyperspace.

from hyperspace import Hyperspace
hs = Hyperspace(spark)
Import the Hyperspace

Read the Parquet Files to a Data Frame

Next, we'll need to read the parquet files back into a DataFrame, because the Hyperspace index-creation process requires the source files to be persisted on disk. Hopefully a future release will also support creating indexes on in-memory DataFrames.

df=spark.read.parquet("/user/trusted-service-user/nyc_tlc_df_parquet/")
Read the parquet files to a data frame

Create a Hyperspace Index

The next step is to create a Hyperspace index with the following code. Note that in IndexConfig, the first argument ("vendorID") is simply the index name; the indexed column is passengerCount and the included column is tripDistance, both of which are used in my aggregate query.

from hyperspace import IndexConfig
hs.createIndex(df, IndexConfig("vendorID", ["passengerCount"], ["tripDistance"]))
Create a hyperspace index

The Hyperspace indexing subsystem will automatically create a collection of snappy-compressed files in an index folder in ADLS2. While this adds additional storage overhead costs, the benefits of a performant, optimized query may outweigh them. Hopefully a future release can make this process more in-memory driven.

Index files are auto created

Re-Run the Query with Hyperspace Index

Now that we have created a Hyperspace Index, let's re-run our original query to explore the execution time and query details.

from pyspark.sql import functions as F
# Query the parquet-backed DataFrame (df) so the Hyperspace index can be applied
indexed_query = df.groupBy("passengerCount").agg(F.avg('tripDistance').alias('AvgTripDistance'), F.sum('tripDistance').alias('SumTripDistance'))
display(indexed_query)

This time, the query only took ~2 seconds versus the original ~12 seconds. While the performance gains are harder to notice on a relatively small 2.1M-record set, the benefits will be more notable when optimizing extremely large data sets.

Re run the query with indexes
The rerun query results are the same

Once again, let's open the Spark UI to view the query details.

Open the UI for more Query Details

As expected, the query details show us that both the included columns and the indexed columns were used by this query.

Note also that there is an additional 'Details' section in the Spark UI to view more detail about the Query.

The details of the re-run Query

Other Hyperspace Management APIs

There are a few other Hyperspace management APIs that can be used directly from the Synapse workspace. The following code will display the indexes that have been created.

display(hs.indexes())
Display the indexes that were created

Additionally, the following code lists other APIs to refresh, delete (soft-delete), restore, and vacuum (hard-delete) Hyperspace indexes.

# Refreshes the given index if the source data changes.
hs.refreshIndex("index")

# Soft-deletes the given index; does not physically remove it from the filesystem.
hs.deleteIndex("index")

# Restores the soft-deleted index.
hs.restoreIndex("index")

# Hard-deletes the given index and physically removes it from the filesystem.
hs.vacuumIndex("index")

Summary

In this article, we explored how to get started with creating Spark indexes in an Azure Synapse workspace by using the Hyperspace index management subsystem. For more details on Hyperspace, including benefits, the future roadmap, limitations, and big data performance benchmark tests, check out Open-sourcing Hyperspace v0.1: An Indexing Subsystem for Apache Spark.


About the author
MSSQLTips author Ron L'Esteve Ron L'Esteve is a seasoned Data Architect who holds an MBA and MSF. Ron has over 15 years of consulting experience with Microsoft Business Intelligence, data engineering, emerging cloud and big data technologies.
