By: John Miner | Updated: 2017-09-25 | Comments (4) | Related: More > Azure
Most companies are faced with the ever-growing big data problem. It is estimated that there will be 40 zettabytes of new data generated between 2012 to 2020. See the computer world article for details. Most of this data will be generated by sensors and machines. However, only a small portion of the data is available for users. How can IT professionals help business lines gather and process data from various sources?
There have been two schools of thought when dealing with big data. Please see the article from The Data Warehouse Institute on this subject.
Schema on write is represented by the traditional relational database. Raw data is ingested by an extract, transform and load (ETL) process. The data is stored in tables for quick retrieval. Each column in the table is declared as a particular data type. This ensures domain integrity of the information. Constraints are added to the tables to ensure entity and referential integrity. Typically, changes to these systems can be time consuming and costly. Only a portion of the total data owned by the company resides in the database.
Schema on read is represented by technologies such as Hadoop or PolyBase. These technologies process stored data as a simple text files. It is assumed that data integrity was applied during the generation of these text files. The actual definition of the table is applied during the read operation. Therefore, changes can be done quickly at the query level and do not take a-lot of time. Almost all the company’s data can be placed in blob storage or a logical data lake. Data Scientist can investigate this data to add predictive analytics to the company’s repertoire.
Today, we are going to talk about how to load data from Blob Storage into Azure SQL database using two Transact SQL commands that were updated with additional functionality in February 2017. Please see the SQL Server database engine team’s blog for details.
There are many financial companies that invest in the stock market via mutual funds. Our fictitious company named Big Jon Investments is such a company. They currently invest in the S&P 500 mutual fund but are thinking of investing in individual funds starting in 2017. The investments department will need historical data to make informed choices when picking stocks. Our boss has asked us to pull summarized daily trading data for the 505 stocks that make up the list. Last year’s historical data should be stored in comma delimited files and will be the start of a new data mart in Azure. We accomplished creating the comma delimited files in a prior tip.
Now, our boss wants us to copy the comma delimited files to Azure blob storage and load an Azure SQL database table with the combined results.
How can we solve this business problem?
We are going to use a combination of techniques that we learned in prior articles to accomplish the following tasks.
|1||Use PowerShell to sign into Azure.|
|2||Create and load an Azure blob container.|
|3||Create and define an Azure SQL database.|
|4||Defining an external data source.|
|5||Investigate the two Transact SQL commands|
Sign into Azure
We must log in with a valid subscription owner before we can do any work in the PowerShell ISE. You will be prompted for credentials which are the valid user name and password.
An account might have several subscriptions associated with it. The output below shows the subscriptions associated with [email protected] user name. Since the Developer Program Benefit only has $25 associated with it, we want to make sure any new deployments are on the Visual Studio Enterprise subscription. The last step is to choose the correct subscription to work with.
Create Blob Container
Most of the objects you create in Azure are contained inside what is called a resource group. For this project, I am going to create a resource group named rg4tips17, a storage account named sa4tips17 and a blob container named sc4tips17. Please see my article on “Using Azure to store and process large amounts of SQL data” for the PowerShell cmdlets to accomplish these tasks.
The image below shows the final blob container inside of the storage account.
How do we know the names of the files stored in the blob container?
Each time I load a bunch of files into the blob storage container, I am also going to post a packing list. This is a simple file that contains the file names of the data files to be loaded into the Azure SQL database. The image below shows the packing list file under the INBOUND directory.
The image below, taken from the PowerShell ISE command window, depicts each of the S&P 500 stock data files being loaded into the storage container. The PowerShell script used to create and load data into a blob storage container is enclosed for you to use.
Creating the Azure SQL Server
Almost everything in Azure is contained by a resource group. For this project, I am going to define a logical SQL database server named sql4tips17 inside the resource group named rg4tips17. The administrator account or user name for the server is jminer and the password is MS#tIpS$2017. Please see my article called “Deploying Azure SQL Database Using Resource Manager PowerShell cmdlets” for the details on these tasks.
The image below, taken from the PowerShell ISE command window, shows the Azure SQL server residing in the East US 2 region.
Security is a very important concern by every company. That is why Microsoft blocks all connections to the database by using a firewall. The image below shows a firewall rule named fr4laptop being added to allow my laptop access to the database server.
The enclosed PowerShell script contains the cmdlets used to create this server.
Defining the Azure SQL Database
I am going to use SQL Server Management Studio to manage my new Azure SQL server. We need to connect to the server to start crafting Transact SQL scripts. Choose the database engine as the server type. Enter the fully qualified name of the Azure SQL Server. Pick SQL Server authentication as the security option and supply the login/password of the server administrator. The image below shows a typical connect to server login window.
Right now, there are no custom user defined databases. By default, you should be in the master database. Execute the code snippet below to create the PORTFOLIO database.
I want to create two user defined schemas. The ACTIVE schema will have the combined data. The STAGE schema will contain intermediate results and auditing information.
There are some tables that we want to define. First, the [ACTIVE].[STOCKS] table will contain all of S&P 500 historical data for 2016. Second, the [STAGE].[STOCKS] table is a copy of the active table without the surrogate key column named ST_ID. I will explain later why we need this table.
-- -- Create ACTIVE table -- CREATE TABLE [ACTIVE].[STOCKS] ( ST_ID INT IDENTITY(1, 1) NOT NULL, ST_SYMBOL VARCHAR(32) NOT NULL, ST_DATE DATE NOT NULL, ST_OPEN REAL NULL, ST_HIGH REAL NULL, ST_LOW REAL NULL, ST_CLOSE REAL NULL, ST_ADJ_CLOSE REAL NULL, ST_VOLUME BIGINT NULL, CONSTRAINT [PK_STOCKS_ID] PRIMARY KEY CLUSTERED (ST_ID ASC) ); GO
Third, the table named [STAGE].[AUDIT] keeps track of the actions performed by our load process. I am adding a bunch of default constraints to capture key data of when the action was executed and who performed the action.
-- -- Create AUDIT table -- -- Create new table CREATE TABLE [STAGE].[AUDIT] ( AU_CHANGE_ID INT IDENTITY (1,1) NOT NULL, AU_CHANGE_DATE [datetime] NOT NULL, AU_CMD_TEXT [varchar](1024) NOT NULL, AU_CHANGE_BY [nvarchar](128) NOT NULL, AU_APP_NAME [nvarchar](128) NOT NULL, AU_HOST_NAME [nvarchar](128) NOT NULL, CONSTRAINT [PK_CHANGE_ID] PRIMARY KEY CLUSTERED (AU_CHANGE_ID ASC) ); GO -- Add defaults for key information ALTER TABLE [STAGE].[AUDIT] ADD CONSTRAINT [DF_CHANGE_DATE] DEFAULT (GETDATE()) FOR AU_CHANGE_DATE; ALTER TABLE [STAGE].[AUDIT] ADD CONSTRAINT [DF_CHANGE_TEXT] DEFAULT ('') FOR AU_CMD_TEXT; ALTER TABLE [STAGE].[AUDIT] ADD CONSTRAINT [DF_CHANGE_BY] DEFAULT (COALESCE(SUSER_SNAME(),'?')) FOR AU_CHANGE_BY; ALTER TABLE [STAGE].[AUDIT] ADD CONSTRAINT [DF_APP_NAME] DEFAULT (COALESCE(APP_NAME(),'?')) FOR AU_APP_NAME; ALTER TABLE [STAGE].[AUDIT] ADD CONSTRAINT [DF_HOST_NAME] DEFAULT (COALESCE(CAST(CONNECTIONPROPERTY('client_net_address') AS nvarchar(128)),'?')) FOR AU_HOST_NAME; GO
If you successfully executed the enclosed Transact SQL script, your object explorer window should have three newly defined tables.
Defining an external data source
Up to this point, I have not talked about anything new. In order for the BULK INSERT and OPENROWSET Transact SQL commands to access the Azure blob storage, we need to define an external data source. There are four simple steps that are required to create this object.
We need to create a master key from a given password. I suggest using a strong random password generated by one of the free online password sites such as https://www.random.org/passwords/. The screen shot below shows the newly created key.
The next step is to obtain a share access signature for our storage account. Use the Azure Portal to access the action menu for the storage account. Under the settings menu, select the share access signature window. Please note that I selected this signature to have full access to the blob storage account for one year. It is very important to note that the time is expressed in UTC format. Click the Generate SAS button to produce the token string. Copy and save the share access signature to a text file for later use.
The next step is to define a database credential called ‘SHARED ACCESS SIGNATURE’. The secret is the SAS token that we captured above. The screen shot below shows our new credential.
To finish this task, we want to create the external data source using objects we created beforehand. Make sure you choose the type as BLOB_STORAGE. The location is the fully qualified path to the Azure blob storage container. Use the newly defined database credential for access.
The OPENROWSET command
The OPENROWSET command can read both text and binary files from Azure Blob Storage. The T-SQL snippet below is our first try at reading the packing list file. We need to supply the path to blob storage file, the name of the data source and the large object binary (LOB) option. There are three valid options: BLOB – read in the file as a binary object, CLOB – read in the file as a character object, and NCLOB – read in the file as a Unicode object.
-- -- One big string -- SELECT * FROM OPENROWSET ( BULK 'INBOUND/PACKING-LIST.TXT', DATA_SOURCE = 'EDS_AZURE_4_STOCKS', SINGLE_CLOB ) AS RAW_DATA;
The results window in SSMS show our file as one big string. While this is interesting, we need to have the data in a tabular format. How can we transform the data into a table?
I talked about the STRING_SPLIT function in a prior article. If we replace line feeds with an empty string and split the data on the carriage return, we will end up with the correct results.
-- -- Packing List -- SELECT CAST(LIST_DATA.VALUE AS VARCHAR(256)) AS PKG_LIST FROM OPENROWSET ( BULK 'INBOUND/PACKING-LIST.TXT', DATA_SOURCE = 'EDS_AZURE_4_STOCKS', SINGLE_CLOB ) AS RAW_DATA CROSS APPLY STRING_SPLIT(REPLACE(REPLACE(RAW_DATA.BulkColumn, CHAR(10), 'ž'), CHAR(13), ''), 'ž') AS LIST_DATA;
The results window in SSMS shows our packing list file formatted as a table with each row representing a file to load.
This can get quite tedious. If we try to use this technique on the data files, we will have several iterations of splitting. First, we must split the one big string into rows. Next, we need to split each row into columns. Do not forget to throw out the header line. There must be a better way to process the data files.
The BULK INSERT command
The BULK INSERT command can read in a comma separated value (CSV) file from Azure Blob Storage. The T-SQL script below shows the format of this command. The FROM clause takes the path to the blob storage file as a parameter. The rest of the options are specified in the WITH clause. The external DATA SOURCE name is passed as a parameter. At this current time, the only documented FORMAT available is CSV. The CODEPAGE of 65001 is the same as UTF-8 encoding. The FIRSTROW option allows the developer to skip the header row and the TABLOCK option acquires a table lock during the load process.
Do you think this code below works?
-- -- Command fails -- BULK INSERT [ACTIVE].[STOCKS] FROM 'INBOUND/MSFT-FY2016.CSV' WITH ( DATA_SOURCE = 'EDS_AZURE_4_STOCKS', FORMAT = 'CSV', CODEPAGE = 65001, FIRSTROW = 2, TABLOCK );
The above command produces an error. That is because it is trying to match the number of fields in the file to the table. The surrogate key, defined as an identity column, is created each time a record is inserted. Therefore, there is a mismatch in columns between the source and destination.
This is one reason why I wanted a staging table. The ability to restart a ETL process without losing state is another reason. A staging table allows us to separate the bulk inserts from the final insert into the active table. Thus, if step one bulk insert fails, we can truncate the staging table, fix the offending file and restart the process.
The corrected T-SQL code below loads the stock data for Microsoft symbol during the calendar year of 2016 into a STAGE table. This statement works correctly since the eight columns found in the text file match the eight columns in the staging table.
-- -- Use staging table -- -- Clear data TRUNCATE TABLE [STAGE].[STOCKS]; -- Load data BULK INSERT [STAGE].[STOCKS] FROM 'INBOUND/MSFT-FY2016.CSV' WITH ( DATA_SOURCE = 'EDS_AZURE_4_STOCKS', FORMAT = 'CSV', CODEPAGE = 65001, FIRSTROW = 2, TABLOCK ); -- Show data SELECT * FROM [STAGE].[STOCKS];
The results window in SSMS shows the first ten rows stored in the table.
The load process that we are creating can process many files. If an error occurs, how do we track down which file caused the issue?
That is why it is important to use an audit table. The T-SQL code below tests the default constraints of the AUDIT table we previously built. We only need to pass the T-SQL command that we want recorded as a parameter. In our case, it will be a dynamic BULK INSERT command. For the unit test, I am going to pass the word ‘TEST’ as the parameter.
-- -- Use auditing table -- -- Insert test record INSERT INTO [STAGE].[AUDIT] (AU_CMD_TEXT) VALUES ('TEST') GO -- View test record SELECT * FROM [STAGE].[AUDIT] GO -- Clear table TRUNCATE TABLE [STAGE].[AUDIT]; GO
The results window in SSMS shows the information recorded in the [STAGE].[AUDIT] table.
Custom Stored Procedure
A custom stored procedure is a great way to package a bunch of T-SQL statements into a repeatable process. The [ACTIVE].[LOAD_FROM_BLOB_STORAGE] takes one parameter as input. The verbose flag indicates whether or not the code displays debugging information to the results window.
The following algorithm is used by this stored procedure.
|1.0||Truncate the staging table.|
|2.0||Open cursor on packing list file.|
|3.0||For each record in the file.|
|3.1||Create a dynamic SQL statement (BULK INSERT).|
|3.2||Execute SQL statement.|
|3.2||Insert statement into audit table.|
|4.0||Close the cursor.|
The T-SQL code below implements the algorithm as a stored procedure with additional TRY/CATCH logic just in-case an error occurs.
-- -- Load from blob storage -- -- Drop stored procedure DROP PROCEDURE IF EXISTS [ACTIVE].[LOAD_FROM_BLOB_STORAGE] GO -- Create stored procedure CREATE PROCEDURE [ACTIVE].[LOAD_FROM_BLOB_STORAGE] @VAR_VERBOSE_FLAG CHAR(1) = 'N' AS BEGIN -- Error handling variables DECLARE @VAR_ERR_NUM INT; DECLARE @VAR_ERR_LINE INT; DECLARE @VAR_ERR_MSG VARCHAR(1024); DECLARE @VAR_ERR_PROC VARCHAR(1024); -- Declare variables DECLARE @VAR_NEXT_FILE VARCHAR(256); DECLARE @VAR_AZURE_BLOB VARCHAR(256); DECLARE @VAR_SQL_STMT NVARCHAR(1024); -- No counting of rows SET NOCOUNT ON; -- Debugging IF (@VAR_VERBOSE_FLAG = 'Y') BEGIN PRINT '[LOAD_FROM_BLOB_STORAGE] - STARTING TO EXECUTE STORED PROCEDURE.'; PRINT ' '; END; -- ** ERROR HANDLING - START TRY ** BEGIN TRY -- Clear data TRUNCATE TABLE [STAGE].[STOCKS]; -- Define cursor DECLARE VAR_FILE_CURSOR CURSOR FOR SELECT CAST(LIST_DATA.VALUE AS VARCHAR(256)) AS PKG_LIST FROM OPENROWSET ( BULK 'INBOUND/PACKING-LIST.TXT', DATA_SOURCE = 'EDS_AZURE_4_STOCKS', SINGLE_CLOB ) AS RAW_DATA CROSS APPLY STRING_SPLIT(REPLACE(REPLACE(RAW_DATA.BulkColumn, CHAR(10), 'ž'), CHAR(13), ''), 'ž') AS LIST_DATA WHERE LTRIM(RTRIM(LIST_DATA.VALUE)) <> ''; -- Open cursor OPEN VAR_FILE_CURSOR; -- Get first row FETCH NEXT FROM VAR_FILE_CURSOR INTO @VAR_NEXT_FILE; SET @VAR_AZURE_BLOB = CHAR(39) + 'INBOUND/' + @VAR_NEXT_FILE + CHAR(39); -- While there is data WHILE (@@fetch_status = 0) BEGIN -- Debugging IF (@VAR_VERBOSE_FLAG = 'Y') BEGIN PRINT '[LOAD_FROM_BLOB_STORAGE] - LOADING FILE ' + @VAR_AZURE_BLOB + '.'; PRINT ' '; END; -- Create dynamic SQL statement SELECT @VAR_SQL_STMT = ' BULK INSERT [STAGE].[STOCKS] FROM ' + @VAR_AZURE_BLOB + ' WITH ( DATA_SOURCE = ''EDS_AZURE_4_STOCKS'', FORMAT = ''CSV'', CODEPAGE = 65001, FIRSTROW = 2, TABLOCK );' -- Debugging IF (@VAR_VERBOSE_FLAG = 'Y') BEGIN PRINT @VAR_SQL_STMT PRINT ' ' END; -- Execute Bulk Insert EXEC SP_EXECUTESQL @VAR_SQL_STMT; -- Insert test record INSERT INTO [STAGE].[AUDIT] (AU_CMD_TEXT) VALUES (@VAR_SQL_STMT); -- Grab the next record FETCH NEXT FROM VAR_FILE_CURSOR INTO @VAR_NEXT_FILE; SET @VAR_AZURE_BLOB = CHAR(39) + 'INBOUND/' + @VAR_NEXT_FILE + CHAR(39); END -- Close cursor CLOSE VAR_FILE_CURSOR; -- Release memory DEALLOCATE VAR_FILE_CURSOR; -- ** ERROR HANDLING - END TRY ** END TRY -- ** Error Handling - Begin Catch ** BEGIN CATCH -- Grab variables SELECT @VAR_ERR_NUM = ERROR_NUMBER(), @VAR_ERR_PROC = ERROR_PROCEDURE(), @VAR_ERR_LINE = ERROR_LINE(), @VAR_ERR_MSG = ERROR_MESSAGE(); -- Raise error RAISERROR ('An error occurred within a user transaction. Error Number : %d Error Message : %s Affected Procedure : %s Affected Line Number: %d' , 16, 1 , @VAR_ERR_NUM, @VAR_ERR_MSG, @VAR_ERR_PROC, @VAR_ERR_LINE); -- ** Error Handling - End Catch ** END CATCH END GO
The T-SQL code below contains two steps to load the STAGE table with new data and append the data to the ACTIVE table. If I was using SQL Server Agent to schedule a periodic job, these would be two steps in a typical job. Since Azure SQL database does not support the agent service, how can we schedule this job? I leave that answer for the next tip I provide.
At the end of the script is an aggregation query to validate the data we just loaded.
-- -- Test load process -- -- Import blob files into stage EXEC [ACTIVE].[LOAD_FROM_BLOB_STORAGE] @VAR_VERBOSE_FLAG = 'N'; GO -- Move from stage to active INSERT INTO [ACTIVE].[STOCKS] SELECT * FROM [STAGE].[STOCKS]; GO -- Validate data SELECT [ST_SYMBOL], COUNT(*) AS TRADING_DAYS FROM [ACTIVE].[STOCKS] GROUP BY [ST_SYMBOL] ORDER BY [ST_SYMBOL]; GO
The screen shot below shows the 505 bulk insert actions as well as the 1 test record. If we copy the AU_CMD_TEXT field to the Notepad++ application, we can see the details of the very last BULK INSERT statement.
The output below shows the results of the aggregation query. Why are there not 505 distinct company symbols in our table? Did something go wrong with the load process?
If we look at the source files on my laptop, we can see that 7 files do not have data. We need to research why these companies do not have data for the calendar year of 2016. In short, the load process works fine.
Right now, we are consuming all fields in the text file. Also, we can’t insert directly into the ACTIVE table since we are not naming the fields in the BULK INSERT command. How can we overcome these limitations?
Using a format file
The OPENROWSET command can use a format file to tell the engine exactly how to deal with the data file. You can skip a column, re-order the columns or re-name a column using this file. The screen shot below shows my format file for the S&P 500 stock data. The first line of the file defines the database version and the second line of the file states how many columns are in the file. The rest of the file describes each column in the file. We can set the data type, maximum column size, column delimiter, SQL table field position and name of each column. I am enclosing this file as a sample for you to use in the future.
I am going to use PowerShell to upload the format file into BLOB storage under the INBOUND directory. The output, taken from the PowerShell ISE environment, shows the successful posting of the file to Azure.
The T-SQL snippet below loads the ACTIVE table with the stock data. Please note we are using two more options with the OPENROWSET command. First, the FORMATFILE parameter specifies the location of the Azure blob file. Second, the FORMATFILE_DATA_SOURCE parameter uses the same external data file source for connecting to Azure blob storage.
-- -- Using a format file -- -- Insert directly into table INSERT INTO [ACTIVE].[STOCKS] SELECT REPLACE([SYMBOL], '"', '') AS ST_SYMBOL, REPLACE([DATE], '"', '') AS ST_DATE, [OPEN] AS ST_OPEN, [HIGH] AS ST_HIGH, [LOW] AS ST_LOW, [CLOSE] AS ST_CLOSE, [ADJCLOSE] AS ST_ADJCLOSE, [VOLUME] AS ST_VOLUME FROM OPENROWSET ( BULK 'INBOUND/MSFT-FY2016.CSV', DATA_SOURCE = 'EDS_AZURE_4_STOCKS', FORMATFILE = 'INBOUND/STOCK-DATA.FMT', FORMATFILE_DATA_SOURCE = 'EDS_AZURE_4_STOCKS', FIRSTROW = 2 ) AS RAW_DATA; GO -- Show the data SELECT TOP 10 * FROM [ACTIVE].[STOCKS]; GO
The output below shows the top 10 rows of the Microsoft stock data from 2016.
In a nutshell, format files allow you more control over how to read and use the data. The enclosed file contains all the T-SQL statements that were used for this proof of concept.
Big Jon’s Investments wanted to collect S&P 500 historical data from a third party and load it into an Azure SQL database. Azure blob storage was used as a logical data lake for the company. PowerShell was used to collect and post the historical data to blob storage. Microsoft provided the database developer with updated BULK INSERT and OPENROWSET Transact SQL commands. New syntax has been added to these commands to handle files located in blob storage.
The two Transact SQL commands depend on an external data source being defined. First, a needs to be defined for a given unique password. Second, a share access signature (SAS) needs to be created for the blob storage account. Third, a database credential should be defined using the SAS token. Last, the external data source is created for access to the blob storage container.
There are some outstanding items to address. First, how do we move all the data files and packing list to an archive directory after successful processing? Second, how can we schedule the load from the blob storage process? Third, what happens if the total amount of data exceeds the 4 TB limit for an Azure SQL database?
- These topics will be talked about in the future.
- Using Azure Automation to schedule a PowerShell workflow.
- Processing large amounts of data with Azure SQL Data Warehouse.
Last Updated: 2017-09-25
About the author
View all my tips