Lambda Architecture in Azure for Batch Processing
The Lambda architecture is a data-processing system designed to handle massive quantities of data by taking advantage of both batch (slow) and stream-processing (fast) methods. This approach to BIG DATA attempts to balance latency, throughput, and fault-tolerance by using batch processing lanes to provide comprehensive and accurate views of batch data, while simultaneously using real-time stream processing lanes to provide views of online data.
The simulated telemetry of an internet-connected soda vending machine is a great use case to show how each of these lanes can be designed in Azure.
How can we deploy and configure the right Azure components to generate and consume IoT data using a batch processing lane?
An Azure Stream Analytics job consists of an input, a query, and an output. Stream Analytics can read data from Azure Event Hubs and write data to Azure Blob Storage. A Data Factory pipeline can be used to read the data from the logical data lake and write the data to an Azure SQL database.
Our boss has asked us to continue our investigation on connecting the machines owned by Collegiate Vending, Inc. to the cloud to capture soda product sales. The ultimate goal is to save the messages in an Azure SQL database for analysis and reporting. Our fictitious company has a contract to distribute Coca-Cola products and vending machines to various educational institutions in the United States of America.
Our job is to use a test program that sends simulated soda machine telemetry to an event hub service. An Azure Stream Analytics job will save this data as a JSON document to Azure Blob Storage using a directory structure reflecting date and hour. Last but not least, an Azure Data Factory pipeline will read this data from blob storage and write the data to an SQL database.
One of the things I like about a logical data lake (blob storage) is that reprocessing can happen, if required. By contrast, the event hub only retains messages for a period of one to seven days.
When designing a complex system, a working knowledge of many different Azure Services is required. Please look up the areas of knowledge that you might be lacking to understand how the pieces of the puzzle go together. The image below is a screen shot of my Azure Portal Dashboard showing all the services involved in this solution.
The data flow diagram shows how simulated vending messages flow through the system at a batch rate of one hour. This rate can be shortened or lengthened if you want. I will be going over each component in the upcoming sections.
Azure Virtual Machine
Please refer to the prior article titled "Azure Event Hub Service Telemetry Example using PowerShell" for details behind the test program. The PowerShell script and supporting data files are seen below. There are two ways to automate this process in Azure.
One solution involves the creation of an Azure Virtual Machine with the Windows 10 operating system. The Windows Task Scheduler can be used to execute the process on an hourly schedule. Another solution involves creating an Azure Automation Runbook. However, there is no local storage associated with the runbook. The four JSON documents seen above will have to be placed into remote Azure Blob Storage. Also, the original PowerShell program will have to be modified to read the files from remote, not local storage.
Since the article is about building a Lambda Architecture for batch processing, I chose to implement the first solution. To follow naming conventions, my virtual machine is called vm4win10. Please type "task scheduler" to find the Windows program seen below.
The general tab has two items that the developer needs to configure. First, I chose to name the task "Simulate Soda Events". Second, I told the Windows program to execute under my user id even if I am disconnected from the computer.
The trigger tab is used to create a schedule to execute the program. I want the job to run five minutes past the hour. The job runs every day and every hour.
Last but not least, the actions tab is where you specify which program to execute. The actual PowerShell script is just a text file. Thus, we need to locate the interpreter for PowerShell. I found the 32-bit version of the program in the system32 directory. Make sure the name of the script is prefixed with the -File parameter. The start in directory should reflect the location of the PowerShell script. The image below shows the correct settings for this action.
The main window of the task scheduler library shows the results of the last executed task by name. We can see the task named "Simulate Soda Events" had a successful execution. The next component in our design is Azure Event Hub.
Azure Event Hub
Please refer to the prior article titled "The tale of two Azure Hubs" for details on how to deploy and configure an Azure Event Hub. One or more event hub instances can be deployed to a given event hubs namespace. Thus, two objects need to be deployed. The image below shows the event hub called ehub4tips2020 being deployed to a namespace called ehns4tips2020.
In the soda machine simulation script, 3397 machines were deployed at various institutions in the United States. Each vending machine has 45 slots. A total of 152,865 messages are sent to the event hub every hour. The image below shows the first two runs of our simulation script.
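The hourly message count follows directly from the two figures above. As a quick check, the arithmetic can be written out in a few lines of Python; the variable names are only illustrative, and the daily figure is a simple extrapolation that assumes every hourly run sends a full batch.

```python
# Figures quoted in the article; the variable names are illustrative.
machines = 3397
slots_per_machine = 45

# One message per slot per machine for every hourly run of the script.
messages_per_hour = machines * slots_per_machine

# Extrapolation, assuming all 24 hourly runs in a day complete.
messages_per_day = messages_per_hour * 24

print(f"{messages_per_hour:,} messages per hour")
print(f"{messages_per_day:,} messages per day")
```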
The process data button brings up a query designer to peek at the data in the event hub. The image below displays a subset of typical telemetry for a given machine.
At this point, we have to decide how to save the information in the Event Hub to Blob Storage. There are two ways to do this. First, the event hub can be configured to write messages to a given storage container. Please see MSDN article for details. However, a new directory and Avro-formatted file is created every five minutes. The second way to capture data is to write a Stream Analytics job to save the data in real time to blob storage. This job can be configured to create a new directory every hour. Please see MSDN article for details.
My choice was simple. The Avro file contains an array of messages. Azure Data Factory was unable to parse such a format. Without changing the original PowerShell script, this automatic save to blob storage is not useful.
Azure Stream Analytics
The deploy query button in the previous screen shot is the easiest way to create a stream analytics job. The job is named saj4tips2020 and is deployed in the same resource group as all the other objects. Both a new consumer group and a new policy are created to access the event hub.
A lot of fancy processing can take place within a Stream Analytics job. Windowing and aggregating of the event hub data can take place within the job. However, a word of caution to the developer: the event hub only retains up to 7 days of data. What happens if the query used in the job contains a bug? Once the retention period has passed, there is no way to retrieve the lost data.
The image below shows almost the same query seen in the event hub. However, the details of the output have been changed.
Using a little Microsoft Paint magic, I have put the complete output details together in one image. The output is now directed towards the storage account named sa4tips2020 and the storage container named sc4tips2020. I used the MSDN information to come up with a custom path pattern. The final output file is stored as a JSON document and contains the last hour of data.
Make sure you start the stream analytics job by pressing the play button. The image below shows the job has been running for at least 12 hours. The number of events equals the total number of vending machines times the number of slots per vending machine. The next task is to check Azure Blob Storage for the JSON files.
Azure Blob Storage
This section of the document will investigate the blob storage directory structure and output JSON files that have been created by the Azure Stream Analytics job. I chose to name the root directory logs since this was part of the sample path pattern supplied by Microsoft.
The directory structure is broken down by year, month, day and hour. I started collecting data at 2 a.m. UTC on March 28th.
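To make the folder layout concrete, here is a minimal Python sketch that maps a timestamp to its hourly folder. The function name hourly_blob_dir is hypothetical, the slash-separated layout under logs is an assumption about how the path pattern renders, and the year in the example is assumed for illustration only.

```python
from datetime import datetime, timezone

def hourly_blob_dir(ts: datetime, root: str = "logs") -> str:
    """Return the hourly folder for a timezone-aware timestamp.

    Assumed layout: <root>/<year>/<month>/<day>/<hour>, all in UTC.
    """
    ts = ts.astimezone(timezone.utc)  # normalise to UTC before formatting
    return f"{root}/{ts:%Y/%m/%d/%H}"

# First hour of collected data (the year is assumed for illustration).
start = datetime(2020, 3, 28, 2, tzinfo=timezone.utc)
print(hourly_blob_dir(start))
```

Passing timezone-aware values keeps the folder name in UTC no matter where the calling code runs.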
The resulting JSON file is about 34 MB in size. The name of the file is a random pattern followed by the correct extension. Let’s download the file and peek at the data.
I downloaded the JSON file to the c:\temp directory and opened it using Notepad++. It is not surprising that the messages generated by the PowerShell script are seen in this file. Everything is working out as planned.
If you want to see the real time writing of messages to the JSON file, look for a new file at 5 minutes past the hour. You will see the JSON file increase in size as messages are captured by the stream analytics job and written to blob storage.
Azure SQL Database
The Azure SQL Database schema is composed of 7 different objects. There are two schemas named active and stage. The slot events table in the stage schema is defined with variable length character fields. That way, the insertion of data from an Azure Data Factory copy activity never fails. On the other hand, the slot events table in the active schema is correctly typed. The clustered primary key index on this table is defined on the event date, machine id and slot id fields. The watermark table keeps track of the last run date. The image below shows the seven user defined objects. The complete code can be found here.
I am going to review three of the objects that I think are most interesting. The increment watermark stored procedure updates the process date by one hour each time it is called. Please see the code below for the definition.
-- Create proc.
CREATE PROCEDURE [stage].[increment_watermark]
AS
BEGIN
    UPDATE [stage].[current_watermark]
    SET [process_date_hour] = DATEADD(hh, 1, [process_date_hour])
END
GO
The cleaned slot events view tries to cast the variable length character data in the stage table into strongly typed fields that are compatible with the target table.
-- Create view
CREATE VIEW [stage].[cleaned_slot_events]
AS
SELECT
    try_cast([slot_id] as int) as slot_id,
    try_cast([machine_id] as int) as machine_id,
    try_cast([product_id] as int) as product_id,
    try_cast([event_dt] as datetime) as event_dt,
    try_cast([vend_cnt] as int) as vend_cnt
FROM [stage].[slot_events]
GO
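For readers less familiar with TRY_CAST, it returns NULL instead of raising an error when a value cannot be converted. The Python sketch below is only a rough analogue of that behaviour, not the view itself: the helper names, the sample row, and the timestamp format are all assumptions, and Python's int() is slightly more lenient than T-SQL about what it accepts.

```python
from datetime import datetime
from typing import Optional

def try_cast_int(value: Optional[str]) -> Optional[int]:
    """Return the value as an int, or None when it is not a valid integer."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

def try_cast_datetime(value: Optional[str],
                      fmt: str = "%Y-%m-%d %H:%M:%S") -> Optional[datetime]:
    """Return the value as a datetime, or None when it does not match fmt."""
    try:
        return datetime.strptime(value, fmt)
    except (TypeError, ValueError):
        return None

def clean_row(row: dict) -> dict:
    """Convert one all-text staging row into typed fields, like the view does."""
    return {
        "slot_id": try_cast_int(row.get("slot_id")),
        "machine_id": try_cast_int(row.get("machine_id")),
        "product_id": try_cast_int(row.get("product_id")),
        "event_dt": try_cast_datetime(row.get("event_dt")),
        "vend_cnt": try_cast_int(row.get("vend_cnt")),
    }

# Hypothetical staging row with one bad value (product_id is not numeric).
staged = {"slot_id": "7", "machine_id": "12", "product_id": "abc",
          "event_dt": "2020-03-28 02:05:00", "vend_cnt": "3"}
print(clean_row(staged))
```

A failed conversion shows up as None (NULL in the database) rather than stopping the load, which is why the staging table can accept any text.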
The upsert slot events stored procedure uses the MERGE statement to execute an update on matching records and an insert for unmatched records.
-- Create proc
CREATE PROCEDURE [stage].[upsert_slot_events]
AS
BEGIN
    -- Set no count
    SET NOCOUNT ON

    -- Merge the clean stage data with active table
    MERGE [active].[slot_events] AS trg
    USING ( SELECT * FROM [stage].[cleaned_slot_events] ) AS src
    ON src.[event_dt] = trg.[event_dt]
       and src.[machine_id] = trg.[machine_id]
       and src.[slot_id] = trg.[slot_id]

    -- Update condition
    WHEN MATCHED THEN
        UPDATE SET
            [slot_id] = src.[slot_id],
            [machine_id] = src.[machine_id],
            [product_id] = src.[product_id],
            [event_dt] = src.[event_dt],
            [vend_cnt] = src.[vend_cnt]

    -- Insert condition
    WHEN NOT MATCHED BY TARGET THEN
        INSERT ( [slot_id], [machine_id], [product_id], [event_dt], [vend_cnt] )
        VALUES ( src.[slot_id], src.[machine_id], src.[product_id], src.[event_dt], src.[vend_cnt] );
END
GO
In a nutshell, the database schema is set up to process new telemetry data every hour. This new data in the stage schema is upserted into the final table in the active schema. The watermark table allows the ETL program to be restarted from a prior date and hour at will. Now that we have the relational database schema worked out, we can focus on designing an Azure Data Factory pipeline to automate the reading of data from blob storage and the writing of data to the final active table.
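The restartable behaviour can be tried out locally without an Azure SQL database. The sketch below swaps in SQLite's INSERT ... ON CONFLICT clause as a stand-in for the T-SQL MERGE statement, so it illustrates the pattern rather than the article's actual procedures. The table names are flattened versions of the article's objects, the sample rows and starting watermark are hypothetical, and it assumes the SQLite library bundled with Python is version 3.24 or later.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE active_slot_events (
    event_dt   TEXT    NOT NULL,
    machine_id INTEGER NOT NULL,
    slot_id    INTEGER NOT NULL,
    product_id INTEGER,
    vend_cnt   INTEGER,
    PRIMARY KEY (event_dt, machine_id, slot_id)
);
CREATE TABLE current_watermark (process_date_hour TEXT NOT NULL);
INSERT INTO current_watermark VALUES ('2020-03-28 02:00:00');
""")

def upsert_slot_events(rows):
    """Insert new rows and update rows whose key already exists."""
    conn.executemany(
        """
        INSERT INTO active_slot_events
            (event_dt, machine_id, slot_id, product_id, vend_cnt)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT (event_dt, machine_id, slot_id)
        DO UPDATE SET product_id = excluded.product_id,
                      vend_cnt   = excluded.vend_cnt
        """,
        rows,
    )

def increment_watermark():
    """Advance the watermark by one hour, like stage.increment_watermark."""
    conn.execute(
        "UPDATE current_watermark "
        "SET process_date_hour = datetime(process_date_hour, '+1 hour')"
    )

# Hypothetical hourly batch: (event_dt, machine_id, slot_id, product_id, vend_cnt)
batch = [("2020-03-28 02:05:00", 1, 1, 10, 3),
         ("2020-03-28 02:05:00", 1, 2, 11, 0)]

upsert_slot_events(batch)
upsert_slot_events(batch)  # replaying the same hour adds no duplicates
upsert_slot_events([("2020-03-28 02:05:00", 1, 1, 10, 5)])  # corrected count
increment_watermark()

row_count = conn.execute("SELECT COUNT(*) FROM active_slot_events").fetchone()[0]
watermark = conn.execute("SELECT process_date_hour FROM current_watermark").fetchone()[0]
print(row_count, watermark)
```

Because the key matches on event date, machine id and slot id, loading the same folder twice leaves the row count unchanged, which is what makes resetting the watermark to an earlier hour safe.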
ADF - Linked Services & Datasets
Today, we are going to have a crash course on how to develop pipelines using Azure Data Factory. Since I am doing a high level overview of the components, I suggest you look at the MSDN documentation if you get stuck on the details. I deployed an Azure Data Factory instance called adf4tips2020 using the web portal.
The first task of any Extract, Transform and Load (ETL) program is the definition of connection strings. The Linked Services section of Azure Data Factory is where you define the connection information to existing data sources. Please note that I used a naming convention when I created the objects seen below. I found this link on the internet by Navin D. that does a good job at naming pipeline objects. I suggest that you adopt this naming convention or create your own.
The above image shows three linked services defined to solve this problem. The blob storage connector uses an internal account key provided by the subscription to authenticate to the storage. There are many times in which a user name and password are used to connect to a service such as Azure SQL Database. This password should never be stored as clear text. That is why the key vault connector is defined. Please remember, when using the key vault connector it is important to add the managed identity for adf4tips2020 to both RBAC and Access Policy security.
The above image shows the three objects that we need to define for our solution. Datasets are a named view of the information which can be accessed via linked services. We need to define views to the source files which are stored in Azure Blob Storage as well as the target table which is defined in Azure SQL Database.
First, let’s start by defining the location of our JSON files. Under the dataset folder in factory resources, click new dataset. Please choose Azure Blob Storage as the storage type and JSON as the file type. The image below shows the connection tab of the defined dataset. Make sure the correct linked service is chosen and browse to the correct storage container. At this point, we could just hardcode the file path and call it a day. However, parameterized datasets are much more powerful. You can skip to the next section, the parameters tab, to create a parameter named VAR_DIR. Use the "add dynamic content" link to access the expression editor. Select the newly defined variable as the expression.
Unfortunately, defining a parameterized dataset is a balancing act between tabs. I am creating the parameter named VAR_DIR which points to the logical path to look for JSON files. The directory path follows the pattern we defined in the Azure Stream Analytics job.
It is always good to test the dataset before moving on. Use the preview data to access the JSON file stored in blob storage. Please see image below. For every newly created object in Azure Data Factory, make sure you save the object. Then publish the object to make it part of the runtime environment.
Next, let’s define the location of the target table. Make sure that the correct linked service is chosen from the drop down menu. Enter the name of the staging table as the target. The preview data should show no records exist in the table.
To recap, linked services are used to define the connection information for a particular data source. The LS_AZ_SQLDB linked service allows a dataset to connect to an Azure SQL Database. Datasets provide a view of the data from a given data source. The DS_ASQL_SLOT_EVENTS dataset allows a copy activity to either read from or write to the staging table.
ADF – Pipelines & Triggers
In reality, Azure Data Factory (ADF) is an orchestration engine that shines at extracting (E) and loading (L) data. There are a ton of sources and destinations that can be used with the copy activity. Any transformations (T) have to be done by another system such as Databricks, SQL Server and/or Hadoop. This all changed in 2019 with the introduction of mapping or wrangling data flows that execute Spark scripts on a Databricks cluster.
A pipeline is a logical unit of work within ADF. It combines a bunch of activities in a certain order to implement an algorithm. A trigger can be event based or schedule based. Most people use schedule-based triggers, which are analogous to a job. The trigger tells when and how often a pipeline will execute. The image below uses 5 different activities to solve our business problem. The tested pipeline ran to a successful execution.
The PL_TRAN_SODA_TELEMETRY pipeline uses a local variable to hold the results from a table lookup. Please define the variable named DATA_DIR as part of the pipeline object. I defaulted the variable to the first directory in my logical data lake.
The lookup activity uses a query to pull the last process date from the watermark table. Make sure you pick the slot events data set. As a default, the query timeout is 120 minutes. I think we can cut that down to 2 minutes. Since there is only one record, check the first row only checkbox.
The set variable activity is used to format the return value of the lookup activity into a fully qualified path to the JSON file in blob storage. Thus, the data in the watermark table determines the folder in blob storage to load data from.
The copy activity is the workhorse of every ADF pipeline that involves some type of storage. It has three sections that need definition: source, target, and mapping. The image below details the source location or JSON files stored in blob storage. Notice that the DATA_DIR variable makes the data source dynamic. Make sure you select the wild card file path.
The image below shows the target location or the staging table in the Azure SQL Database. Since we are continuously loading data each hour, we need to execute a TRUNCATE TABLE command as a pre-copy script.
The mapping section of the copy activity determines which JSON field gets copied to which SQL column. It is important to note that JSON is not a strongly typed file format. Thus, all the data is coming in as strings.
Like I said before, any transformations have to take place in another system. I am using a stored procedure activity to MERGE the data from the staging table into the active table. The CLEANED_SLOT_EVENTS view already type casts the data in the staging table so that it matches the active table.
Last but not least, we need to advance the date time watermark by one hour. Again, another stored procedure activity is used to make this happen. Otherwise, we will keep on processing the same directory and same JSON file.
Triggers can be easily added to a current pipeline by selecting the add trigger button. Since I have a bunch of telemetry existing in the data lake, I am initially setting the recurrence window to every five minutes. Once the database is caught up in processing all the data lake data, I will set this value to 60 minutes.
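How long the catch-up phase takes depends on the size of the backlog. The following Python sketch is a simplified model, not a measurement: it assumes each pipeline run loads exactly one hourly folder, that a new folder becomes available exactly on the hour, and that runs never fail. The function name and the backlog sizes are hypothetical.

```python
def minutes_to_catch_up(backlog_hours: int,
                        run_every_min: int = 5,
                        new_folder_every_min: int = 60) -> int:
    """Simulate pipeline runs until no unprocessed hourly folders remain."""
    minute = 0
    pending = backlog_hours
    while pending > 0:
        minute += run_every_min
        pending -= 1  # one pipeline run loads one hourly folder
        if minute % new_folder_every_min == 0:
            pending += 1  # another hour of telemetry has landed in the lake
    return minute

# Hypothetical backlogs: half a day and six days of hourly folders.
for backlog in (12, 144):
    print(backlog, "hours of backlog ->", minutes_to_catch_up(backlog), "minutes")
```

With a five minute recurrence the backlog shrinks by about eleven folders per hour, so even several days of telemetry clear in well under a day before the trigger is switched back to 60 minutes.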
In summary, an Azure Data Factory pipeline uses a bunch of activities to achieve a business goal. In our case, we want to read the latest JSON file and append it to the active table. The MERGE statement is ideal since the operation is restartable when used with a watermark table entry.
I have been allowing the Windows Task Scheduler, the Stream Analytics job, and the Azure Data Factory trigger to run for 6 plus days. It’s always a good idea to look at the captured data to make sure the system is sound. The enclosed T-SQL script has two queries. The first query is to get a total of vending events per time zone (hour) as well as a running total. We know that Collegiate Vending caters to clients in several time zones.
The above image shows the results after the very first run. The grand total is the actual number of events or messages (152,865) we should receive every hour. I modified the query to save the results to a temporary table and then filtered for the time zone (hour) rows whose total does not equal the expected number of events. Rows 7 thru 15 are suspect, since every client should report in every hour of every day.
Why are we not seeing our total magic number?
If you do the math on the smallest total, the percent difference is less than 0.4 percent. However, something is going on here. More investigation is required. I suspect that the cleaned slot events view might produce null values that are rejected by the MERGE statement. Choosing a default value when a cast is unsuccessful might eliminate these issues.
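The same sanity check can be expressed outside the database. In the Python sketch below, the function name and the sample hourly totals are hypothetical; only the expected count of 152,865 comes from the article. It flags every hour whose total differs from the expected count and reports the shortfall as a percentage.

```python
from typing import Dict

EXPECTED_PER_HOUR = 3397 * 45  # 152,865 messages per hourly batch

def find_short_hours(hourly_totals: Dict[str, int]) -> Dict[str, float]:
    """Map each hour with an unexpected total to its percent shortfall."""
    return {
        hour: round(100.0 * (EXPECTED_PER_HOUR - total) / EXPECTED_PER_HOUR, 3)
        for hour, total in hourly_totals.items()
        if total != EXPECTED_PER_HOUR
    }

# Hypothetical totals: one complete hour and one hour missing 565 rows.
sample = {"2020-03-28 02": 152_865, "2020-03-28 03": 152_300}
suspect = find_short_hours(sample)
print(suspect)
```

Hours that match the expected total are dropped from the result, so an empty dictionary means every batch arrived complete.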
The last image shows some interesting statistics about our system. We can see that in less than 7 days (1 week) we have generated 21 M rows. Multiplying this number by 52 weeks results in over a billion rows in one year. This is truly a BIG DATA problem. We are going to have a lot of fun investigating how to handle this in future articles.
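The yearly projection is simple arithmetic, written out below in Python. The first figure uses the rounded 21 million rows quoted above; the second is an alternative estimate that assumes every hourly batch of 152,865 messages arrives for a full 365 days. Variable names are illustrative.

```python
# Projection from the observed volume (rounded figure from the article).
rows_first_week = 21_000_000
projected_from_sample = rows_first_week * 52

# Alternative estimate, assuming a full batch arrives every hour all year.
messages_per_hour = 3397 * 45
projected_steady_state = messages_per_hour * 24 * 365

print(f"{projected_from_sample:,} rows per year from the weekly sample")
print(f"{projected_steady_state:,} rows per year at the full hourly rate")
```

Both estimates land above one billion rows per year.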
Today we investigated how to create a batch processing lane for the Lambda Architecture. While there were six different components used in the design, it was not that difficult to set up and configure each component. I like the use of batch processing lanes since all the telemetry resides in a logical data lake, blob storage. That means the partitioning and windowing functions can be applied to the slot events table in the future. Setting a data retention policy when dealing with BIG DATA is a good idea. Any historical lookups can always be retrieved from the lake.
In the future, I will be spending more time talking about how to implement a spoke and hub model with Azure Data Factory. This is the central idea behind a data lake. If the ETL programs are written correctly, the producer or consumer can be changed without affecting the whole model. It is important to save the information in the lake in a consistent way. That means that if we write SAP data to the data lake today, we can change to using Microsoft Dynamics tomorrow. The data platform will not change as long as the same logical files (tables) like raw materials, product inventory, sales, etc. are used.
As a final note, I will go over how to implement a stream processing (fast lane) in a future article. We might even land the data in two different systems for different user groups. This is a common request.
- Changing the granularity of BIG DATA to save space
- Using incremental refresh with Power BI service
- Exploring full and incremental patterns with ADF
- Creating a self-made data lake with parquet files