Logging Azure Data Factory Pipeline Audit Data


By: Ron L'Esteve   |   Updated: 2020-02-18


Problem

In my last article, Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2, I discussed how to create a pipeline parameter table in Azure SQL DB and use it to drive the creation of snappy parquet files from on-premises SQL Server tables in Azure Data Lake Store Gen2. Now that I have a process for generating files in the lake, I would also like to track the activity of my pipeline runs and persist that log data. What options do I have for creating and storing this log data?

Solution

Azure Data Factory is a robust cloud-based E-L-T tool that is capable of accommodating multiple scenarios for logging pipeline audit data.

In this article, I will discuss three of these possible options, which include:

  1. Updating Pipeline Status and Datetime columns in a static pipeline parameter table using an ADF Stored Procedure activity
  2. Generating a metadata CSV file for every parquet file that is created and storing the logs in hierarchical folders in ADLS2
  3. Creating a pipeline log table in Azure SQL Database and storing the pipeline activity as records in the table

[Image: Logging pipeline activity options]

Prerequisites

Ensure that you have read and implemented Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2, as this demo will be building a pipeline logging process on the pipeline copy activity that was created in the article.

Option 1: Create a Stored Procedure Activity

The Stored Procedure Activity is one of the transformation activities that Data Factory supports. We will use the Stored Procedure Activity to invoke a stored procedure in Azure SQL Database. For more information on ADF Stored Procedure Activity, see Transform data by using the SQL Server Stored Procedure activity in Azure Data Factory.

For this scenario, I would like to maintain my Pipeline Execution Status and Pipeline Date detail as columns in my Pipeline Parameter table rather than having a separate log table. The downside to this method is that it will not retain historical log data, but will simply update the values based on a lookup of the incoming files to records in the pipeline parameter table. This gives a quick, yet not necessarily robust, method of viewing the status and load date across all items in the pipeline parameter table.

I'll begin by adding a Stored Procedure activity after my Copy-Table activity so that the stored procedure runs once per table as the process iterates over the tables.

[Image: Option 1, ADF Stored Procedure data flow]

Next, I will add the following stored procedure to my Azure SQL Database where my pipeline parameter table resides. This procedure simply looks up the destination table name in the pipeline parameter table and updates the status and datetime for each table once the Copy-Table activity is successful.

SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO 
 
CREATE PROCEDURE [dbo].[sql2adls_data_files_loaded] @dst_name NVARCHAR(500)
AS

SET NOCOUNT ON -- turns off messages sent back to client after DML is run, keep this here
DECLARE @Currentday DATETIME = GETDATE();
 
BEGIN TRY  
 
   BEGIN TRANSACTION -- BEGIN TRAN statement will increment transaction count from 0 to 1
      UPDATE [dbo].[pipeline_parameter] set pipeline_status = 'success', pipeline_datetime = @Currentday where dst_name = @dst_name ;
   COMMIT TRANSACTION -- COMMIT will decrement transaction count from 1 to 0 if dml worked
 
END TRY  
BEGIN CATCH  
   IF @@TRANCOUNT > 0  
      ROLLBACK  
 
   -- Return error information. 
   DECLARE @ErrorMessage nvarchar(4000),  @ErrorSeverity int;  
   SELECT @ErrorMessage = ERROR_MESSAGE(),@ErrorSeverity = ERROR_SEVERITY();  
   RAISERROR(@ErrorMessage, @ErrorSeverity, 1);  
END CATCH;  
 
GO 
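
The procedure above assumes that dbo.pipeline_parameter already contains the pipeline_status and pipeline_datetime columns. If the table was created without them, a minimal sketch like the following could add them. The column data types are assumptions chosen to fit the values the procedure writes; they are not taken from the original table definition.

```sql
-- Assumption: the data types below are illustrative, not the original DDL.
-- COL_LENGTH returns NULL when the column does not exist,
-- so each ALTER TABLE runs only if the column is missing.
IF COL_LENGTH('dbo.pipeline_parameter', 'pipeline_status') IS NULL
    ALTER TABLE [dbo].[pipeline_parameter] ADD pipeline_status NVARCHAR(500) NULL;

IF COL_LENGTH('dbo.pipeline_parameter', 'pipeline_datetime') IS NULL
    ALTER TABLE [dbo].[pipeline_parameter] ADD pipeline_datetime DATETIME NULL;
GO
```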

After creating my stored procedure, I can confirm that it has been created in my Azure SQL Database.

[Image: Verifying that the stored procedure is created]
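
The same check can be done with a query, and the procedure can be run by hand before it is wired into the pipeline. This is only a sketch: the value passed to @dst_name is a hypothetical placeholder and should be replaced with a dst_name that exists in the pipeline parameter table.

```sql
-- Returns one row if the procedure exists in the dbo schema.
SELECT name, create_date
FROM sys.procedures
WHERE name = 'sql2adls_data_files_loaded'
  AND SCHEMA_NAME(schema_id) = 'dbo';

-- 'MyDestinationTable' is a hypothetical dst_name used for illustration only.
EXEC [dbo].[sql2adls_data_files_loaded] @dst_name = N'MyDestinationTable';
```

Note that if no row matches the supplied name, the UPDATE inside the procedure affects zero rows and completes without raising an error.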

I will then return to my data factory pipeline and configure the stored procedure activity. In the Stored Procedure tab, I will select the stored procedure that I just created. I will also add a new stored procedure parameter that references my destination name, which I had configured in the copy activity.

[Image: Stored procedure configuration steps]

After saving, publishing and running the pipeline, I can see that my pipeline_datetime and pipeline_status columns have been updated as a result of the ADF Stored Procedure Activity.

[Image: Pipeline parameter table capturing pipeline status and date]
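
For a quick view of the status and load date across all items, queries along these lines could be run against the parameter table. They use only the columns that the stored procedure references; treating "not loaded today" as a useful check is my assumption.

```sql
-- Latest status and load time for every table in the parameter table.
SELECT dst_name, pipeline_status, pipeline_datetime
FROM [dbo].[pipeline_parameter]
ORDER BY pipeline_datetime DESC;

-- Tables that have never been loaded, or were not loaded today.
SELECT dst_name, pipeline_status, pipeline_datetime
FROM [dbo].[pipeline_parameter]
WHERE pipeline_datetime IS NULL
   OR pipeline_datetime < CAST(GETDATE() AS DATE);
```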

Option 2: Create a CSV Log file in Azure Data Lake Store2

Since my Copy-Table activity is generating snappy parquet files into hierarchical ADLS2 folders, I also want to create a metadata .csv file which contains the pipeline activity. For this scenario, I have set up an Azure Data Factory Event Grid to listen for metadata files and then kick off a process to transform my table and load it into a curated zone.

I will start by adding a Copy activity for creating my log files and connecting it to the Copy-Table activity. Similar to the previous process, this process will generate a .csv metadata file in a metadata folder per table.

[Image: Option 2, pipeline flow for creating the CSV log file]

To configure the source dataset, I will select my source on-premises SQL Server.

Next, I will add the following query as my source query. As we can see, this query combines pipeline system variables, Copy-Table activity outputs, and user-defined parameters.

[Image: Source dataset and query configuration for Option 2]

SELECT '@{pipeline().DataFactory}' as DataFactory_Name,
'@{pipeline().Pipeline}' as Pipeline_Name,
'@{pipeline().RunId}' as RunId,
'@{item().src_name}' as Source,
'@{item().dst_name}' as Destination,
'@{pipeline().TriggerType}' as TriggerType,
'@{pipeline().TriggerId}' as TriggerId,
'@{pipeline().TriggerName}' as TriggerName,
'@{pipeline().TriggerTime}' as TriggerTime,
'@{activity('Copy-Table').output.rowsCopied}' as rowsCopied,
'@{activity('Copy-Table').output.rowsRead}' as RowsRead,
'@{activity('Copy-Table').output.usedParallelCopies}' as No_ParallelCopies,
'@{activity('Copy-Table').output.copyDuration}' as copyDuration_in_secs,
'@{activity('Copy-Table').output.effectiveIntegrationRuntime}' as effectiveIntegrationRuntime,
'@{activity('Copy-Table').output.executionDetails[0].source.type}' as Source_Type,
'@{activity('Copy-Table').output.executionDetails[0].sink.type}' as Sink_Type,
'@{activity('Copy-Table').output.executionDetails[0].status}' as Execution_Status,
'@{activity('Copy-Table').output.executionDetails[0].start}' as CopyActivity_Start_Time,
'@{utcnow()}' as CopyActivity_End_Time,
'@{activity('Copy-Table').output.executionDetails[0].detailedDurations.queuingDuration}' as CopyActivity_queuingDuration_in_secs,
'@{activity('Copy-Table').output.executionDetails[0].detailedDurations.timeToFirstByte}' as CopyActivity_timeToFirstByte_in_secs,
'@{activity('Copy-Table').output.executionDetails[0].detailedDurations.transferDuration}' as CopyActivity_transferDuration_in_secs
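
The same pattern can be extended with other copy activity output properties. The sketch below is not part of the original pipeline and has not been run against it. It assumes that the copy activity output exposes dataRead and dataWritten (byte counts), and it uses the expression language's `?` operator together with coalesce() so that a missing timeToFirstByte value resolves to an empty string rather than failing the activity. The column aliases for the byte counts are hypothetical.

```sql
-- Assumption: dataRead and dataWritten are present in the Copy-Table output.
-- The ?. accessor returns null when timeToFirstByte is absent,
-- and coalesce() then substitutes an empty string.
SELECT '@{activity('Copy-Table').output.dataRead}' as DataRead_in_bytes,
'@{activity('Copy-Table').output.dataWritten}' as DataWritten_in_bytes,
'@{coalesce(activity('Copy-Table').output.executionDetails[0].detailedDurations?.timeToFirstByte, '')}' as CopyActivity_timeToFirstByte_in_secs
```

If these columns are added to the source query, the sink for Option 3 would need matching columns as well.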

My sink will be a csv dataset with a .csv extension.

[Image: Dataset configuration for the csv metadata files]

Below is the connection configuration that I will use for my csv dataset.

[Image: Parameters for the connection path used to generate metadata files]

The following parameterized path will ensure that the file is generated in the correct folder structure.

@{item().server_name}/@{item().src_db}/@{item().src_schema}/@{item().dst_name}/metadata/@{formatDateTime(utcnow(),'yyyy-MM-dd')}/@{item().dst_name}.csv
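
To preview the paths this expression would resolve to, the same concatenation can be reproduced in T-SQL. This is a sketch that assumes the ForEach items come from dbo.pipeline_parameter and that the table has server_name, src_db, src_schema, and dst_name columns matching the item() properties used above.

```sql
-- Assumption: the item() properties map to columns of dbo.pipeline_parameter.
-- FORMAT(GETUTCDATE(), 'yyyy-MM-dd') mirrors formatDateTime(utcnow(),'yyyy-MM-dd').
SELECT CONCAT(server_name, '/', src_db, '/', src_schema, '/', dst_name,
              '/metadata/', FORMAT(GETUTCDATE(), 'yyyy-MM-dd'),
              '/', dst_name, '.csv') AS metadata_file_path
FROM [dbo].[pipeline_parameter];
```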

After I save, publish, and run my pipeline, I can see that a metadata folder has been created in my Server>database>schema>Destination_table location.

[Image: Metadata folder is created]

When I open the metadata folder, I can see that there is a date folder for each day that the pipeline runs.

[Image: Date folder is created inside the metadata folder]

Finally, I can see that a metadata .csv file with the name of my table has been created.

[Image: Metadata file is created inside the date folder]

When I download and open the file, I can see that all of the query results have been populated in my .csv file.

[Image: Sample csv file containing all values]

Option 3: Create a log table in Azure SQL Database

My last scenario involves creating a log table in Azure SQL Database, where my parameter table resides, and then writing the pipeline activity data as records in that table.

Again, for this option, I will start by adding a copy data activity connected to my Copy-Table activity.

[Image: Option 3, pipeline flow for writing to the SQL log table]

Next, I will create the following table in my Azure SQL Database. This table will store and capture the pipeline and copy activity details.

SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [dbo].[pipeline_log](
   [DataFactory_Name] [nvarchar](500) NULL,
   [Pipeline_Name] [nvarchar](500) NULL,
   [RunId] [nvarchar](500) NULL,
   [Source] [nvarchar](500) NULL,
   [Destination] [nvarchar](500) NULL,
   [TriggerType] [nvarchar](500) NULL,
   [TriggerId] [nvarchar](500) NULL,
   [TriggerName] [nvarchar](500) NULL,
   [TriggerTime] [nvarchar](500) NULL,
   [rowsCopied] [nvarchar](500) NULL,
   [RowsRead] [int] NULL,
   [No_ParallelCopies] [int] NULL,
   [copyDuration_in_secs] [nvarchar](500) NULL,
   [effectiveIntegrationRuntime] [nvarchar](500) NULL,
   [Source_Type] [nvarchar](500) NULL,
   [Sink_Type] [nvarchar](500) NULL,
   [Execution_Status] [nvarchar](500) NULL,
   [CopyActivity_Start_Time] [datetime] NULL,
   [CopyActivity_End_Time] [datetime] NULL,
   [CopyActivity_queuingDuration_in_secs] [nvarchar](500) NULL,
   [CopyActivity_timeToFirstByte_in_secs] [nvarchar](500) NULL,
   [CopyActivity_transferDuration_in_secs] [nvarchar](500) NULL
) ON [PRIMARY]
GO

Similar to my last pipeline option, I will configure my on-premises SQL Server as my source and use the query provided in Option 2 as my source query.

[Image: Source dataset and query configuration for the SQL log table]

My sink will be a connection to the Azure SQL DB pipeline log table that I created earlier.

[Image: Sink dataset configuration for the SQL log table]

Below are the connection details for the Azure SQL DB pipeline log table.

[Image: Log table connection to Azure SQL DB]

When I save, publish and run my pipeline, I can see that the pipeline copy activity records have been captured in my dbo.pipeline_log table.

[Image: Log data in the SQL table after the Option 3 pipeline runs]
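
Once records accumulate, the log table can be summarized with ordinary queries. The following is a minimal sketch against the dbo.pipeline_log columns defined above. Because rowsCopied is stored as nvarchar, it is converted with TRY_CAST, which returns NULL for any value that is not numeric.

```sql
-- Copy activity history per destination table and execution status.
SELECT Destination,
       Execution_Status,
       COUNT(*) AS copy_runs,
       MAX(CopyActivity_End_Time) AS last_copy_end_time,
       SUM(TRY_CAST(rowsCopied AS BIGINT)) AS total_rows_copied
FROM [dbo].[pipeline_log]
GROUP BY Destination, Execution_Status
ORDER BY Destination, Execution_Status;
```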

About the author
Ron L'Esteve is a seasoned Data Architect who holds an MBA and MSF. Ron has over 15 years of consulting experience with Microsoft Business Intelligence, data engineering, emerging cloud and big data technologies.




Comments For This Article




Saturday, April 10, 2021 - 1:50:43 AM - VT
If the copy activity output does not include the time to first byte parameter, the pipeline fails saying that the parameter was not found. I am facing this case.

Wednesday, March 10, 2021 - 10:42:18 AM - Kevin
Hello Ron,

Great post, thanks for sharing! Can we also retrieve into our .csv file how many bytes of data were processed by the copy activity?

Wednesday, February 03, 2021 - 11:18:24 AM - Ron LEsteve
Hello Alex,

Yes of course, please see my related article on logging error details from ADF: https://www.mssqltips.com/sqlservertip/6712/azure-data-factory-pipeline-logging-error-details/

Sincerely,

Ron L'Esteve

Wednesday, February 03, 2021 - 10:17:12 AM - Alex
Hi, thank you for a great article! Is it possible to pass an error message the same way in case the copy activity fails?

Monday, May 18, 2020 - 11:15:08 PM - Patrick

If you continue an article from a previously published one, it would be nice to continue with the same table structure and DDLs.

Your previous article refers to a pipeline_parameter table, which has a different schema in this article.


