Azure Data Factory Pipeline Logging Error Details


By: Ron L'Esteve   |   Updated: 2021-01-20


Problem

In my previous article, Logging Azure Data Factory Pipeline Audit Data, I discussed a variety of methods for capturing Azure Data Factory pipeline logs and persisting the data to either a SQL Server table or Azure Data Lake Storage Gen2. While capturing pipeline log data is valuable when the pipeline activities succeed, how can we also capture and persist error details related to Azure Data Factory pipelines when activities within the pipeline fail?

Solution

In this article, I will cover how to capture and persist Azure Data Factory pipeline errors to an Azure SQL Database table. Additionally, we will recap the pipeline parameter process that I discussed in my previous articles to demonstrate how the pipeline_errors, pipeline_log, and pipeline_parameter tables relate to each other.

Explore and Understand the Metadata-Driven ETL Approach

Before continuing with the demonstration, I recommend reading my previous articles as a prerequisite to gain background on the end-to-end metadata-driven ETL process.

To recap the tables needed for this process, the diagram below illustrates how the pipeline_parameter, pipeline_log, and pipeline_errors tables are interconnected.

diagram showing relationship between tables.

Create a Parameter Table

The following script will create the pipeline_parameter table, with column parameter_id as the primary key. Note that this table drives the metadata-driven ETL approach.

SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [dbo].[pipeline_parameter](
   [PARAMETER_ID] [int] IDENTITY(1,1) NOT NULL,
   [server_name] [nvarchar](500) NULL,
   [src_type] [nvarchar](500) NULL,
   [src_schema] [nvarchar](500) NULL,
   [src_db] [nvarchar](500) NULL,
   [src_name] [nvarchar](500) NULL,
   [dst_type] [nvarchar](500) NULL,
   [dst_name] [nvarchar](500) NULL,
   [include_pipeline_flag] [nvarchar](500) NULL,
   [partition_field] [nvarchar](500) NULL,
   [process_type] [nvarchar](500) NULL,
   [priority_lane] [nvarchar](500) NULL,
   [pipeline_date] [nvarchar](500) NULL,
   [pipeline_status] [nvarchar](500) NULL,
   [load_synapse] [nvarchar](500) NULL,
   [load_frequency] [nvarchar](500) NULL,
   [dst_folder] [nvarchar](500) NULL,
   [file_type] [nvarchar](500) NULL,
   [lake_dst_folder] [nvarchar](500) NULL,
   [spark_flag] [nvarchar](500) NULL,
   [dst_schema] [nvarchar](500) NULL,
   [distribution_type] [nvarchar](500) NULL,
   [load_sqldw_etl_pipeline_date] [datetime] NULL,
   [load_sqldw_etl_pipeline_status] [nvarchar](500) NULL,
   [load_sqldw_curated_pipeline_date] [datetime] NULL,
   [load_sqldw_curated_pipeline_status] [nvarchar](500) NULL,
   [load_delta_pipeline_date] [datetime] NULL,
   [load_delta_pipeline_status] [nvarchar](500) NULL,
PRIMARY KEY CLUSTERED 
(
   [PARAMETER_ID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO

Create a Log Table

This next script will create the pipeline_log table for capturing the Data Factory success logs. In this table, column log_id is the primary key and column parameter_id is a foreign key with a reference to column parameter_id from the pipeline_parameter table.

SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [dbo].[pipeline_log](
   [LOG_ID] [int] IDENTITY(1,1) NOT NULL,
   [PARAMETER_ID] [int] NULL,
   [DataFactory_Name] [nvarchar](500) NULL,
   [Pipeline_Name] [nvarchar](500) NULL,
   [RunId] [nvarchar](500) NULL,
   [Source] [nvarchar](500) NULL,
   [Destination] [nvarchar](500) NULL,
   [TriggerType] [nvarchar](500) NULL,
   [TriggerId] [nvarchar](500) NULL,
   [TriggerName] [nvarchar](500) NULL,
   [TriggerTime] [nvarchar](500) NULL,
   [rowsCopied] [nvarchar](500) NULL,
   [DataRead] [int] NULL,
   [No_ParallelCopies] [int] NULL,
   [copyDuration_in_secs] [nvarchar](500) NULL,
   [effectiveIntegrationRuntime] [nvarchar](500) NULL,
   [Source_Type] [nvarchar](500) NULL,
   [Sink_Type] [nvarchar](500) NULL,
   [Execution_Status] [nvarchar](500) NULL,
   [CopyActivity_Start_Time] [nvarchar](500) NULL,
   [CopyActivity_End_Time] [nvarchar](500) NULL,
   [CopyActivity_queuingDuration_in_secs] [nvarchar](500) NULL,
   [CopyActivity_transferDuration_in_secs] [nvarchar](500) NULL,
 CONSTRAINT [PK_pipeline_log] PRIMARY KEY CLUSTERED 
(
   [LOG_ID] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO
 
ALTER TABLE [dbo].[pipeline_log]  WITH CHECK ADD FOREIGN KEY([PARAMETER_ID])
REFERENCES [dbo].[pipeline_parameter] ([PARAMETER_ID])
ON UPDATE CASCADE
GO

Create an Error Table

This next script will create a pipeline_errors table which will be used to capture the Data Factory error details from failed pipeline activities. In this table, column error_id is the primary key and column parameter_id is a foreign key with a reference to column parameter_id from the pipeline_parameter table.

SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE TABLE [dbo].[pipeline_errors](
   [error_id] [int] IDENTITY(1,1) NOT NULL,
   [parameter_id] [int] NULL,
   [DataFactory_Name] [nvarchar](500) NULL,
   [Pipeline_Name] [nvarchar](500) NULL,
   [RunId] [nvarchar](500) NULL,
   [Source] [nvarchar](500) NULL,
   [Destination] [nvarchar](500) NULL,
   [TriggerType] [nvarchar](500) NULL,
   [TriggerId] [nvarchar](500) NULL,
   [TriggerName] [nvarchar](500) NULL,
   [TriggerTime] [nvarchar](500) NULL,
   [No_ParallelCopies] [int] NULL,
   [copyDuration_in_secs] [nvarchar](500) NULL,
   [effectiveIntegrationRuntime] [nvarchar](500) NULL,
   [Source_Type] [nvarchar](500) NULL,
   [Sink_Type] [nvarchar](500) NULL,
   [Execution_Status] [nvarchar](500) NULL,
   [ErrorDescription] [nvarchar](max) NULL,
   [ErrorCode] [nvarchar](500) NULL,
   [ErrorLoggedTime] [nvarchar](500) NULL,
   [FailureType] [nvarchar](500) NULL,
 CONSTRAINT [PK_pipeline_error] PRIMARY KEY CLUSTERED 
(
   [error_id] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY] TEXTIMAGE_ON [PRIMARY]
GO
 
ALTER TABLE [dbo].[pipeline_errors]  WITH CHECK ADD FOREIGN KEY([parameter_id])
REFERENCES [dbo].[pipeline_parameter] ([PARAMETER_ID])
ON UPDATE CASCADE
GO
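If you would like to see these foreign keys at work without provisioning an Azure SQL Database, the following is a minimal sketch using Python's built-in sqlite3 module. It is a stand-in, not the T-SQL above: the tables are trimmed to a few columns, SQLite's INTEGER PRIMARY KEY AUTOINCREMENT replaces IDENTITY(1,1), and SQLite only enforces foreign keys once PRAGMA foreign_keys is switched on. The helper names build_schema and try_insert_error, and the PLACEHOLDER error code, are hypothetical.

```python
import sqlite3
from typing import Optional


def build_schema() -> sqlite3.Connection:
    """Create trimmed-down SQLite versions of the three tables (not the real T-SQL DDL)."""
    conn = sqlite3.connect(":memory:")
    # SQLite enforces foreign keys only when this pragma is enabled on the connection.
    conn.execute("PRAGMA foreign_keys = ON")
    conn.executescript(
        """
        CREATE TABLE pipeline_parameter (
            parameter_id INTEGER PRIMARY KEY AUTOINCREMENT,
            src_name     TEXT,
            dst_name     TEXT
        );
        CREATE TABLE pipeline_log (
            log_id       INTEGER PRIMARY KEY AUTOINCREMENT,
            parameter_id INTEGER REFERENCES pipeline_parameter (parameter_id)
                         ON UPDATE CASCADE,
            Source       TEXT,
            RunId        TEXT
        );
        CREATE TABLE pipeline_errors (
            error_id     INTEGER PRIMARY KEY AUTOINCREMENT,
            parameter_id INTEGER REFERENCES pipeline_parameter (parameter_id)
                         ON UPDATE CASCADE,
            Source       TEXT,
            RunId        TEXT,
            ErrorCode    TEXT
        );
        """
    )
    return conn


def try_insert_error(conn: sqlite3.Connection, parameter_id: Optional[int]) -> bool:
    """Insert one error row; return False if the foreign key rejects parameter_id."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO pipeline_errors (parameter_id, Source, ErrorCode) "
                "VALUES (?, ?, ?)",
                (parameter_id, "MyErrorTable", "PLACEHOLDER"),
            )
        return True
    except sqlite3.IntegrityError:
        return False


conn = build_schema()
with conn:
    conn.execute(
        "INSERT INTO pipeline_parameter (src_name, dst_name) "
        "VALUES ('MyErrorTable', 'MyErrorTable')"
    )
print(try_insert_error(conn, 1))     # True: parameter_id 1 exists
print(try_insert_error(conn, 99))    # False: no such parameter_id
print(try_insert_error(conn, None))  # True: NULL foreign keys are allowed
```

The NULL case is worth noticing: neither sp_UpdateLogTable nor sp_UpdateErrorTable in this tip inserts a parameter_id, so the rows they write will have NULL in that column, which the foreign key permits.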

Create a Stored Procedure to Update the Log Table

Now that we have all the necessary SQL tables in place, we can begin creating the stored procedures. Let's begin with the following script, which creates a stored procedure that inserts data from a successful pipeline run into the pipeline_log table. Note that this stored procedure will be called from the Data Factory pipeline at run time.

SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO
 
CREATE PROCEDURE [dbo].[sp_UpdateLogTable]
@DataFactory_Name VARCHAR(250),
@Pipeline_Name VARCHAR(250),
@RunId VARCHAR(250),
@Source VARCHAR(300),
@Destination VARCHAR(300),
@TriggerType VARCHAR(300),
@TriggerId VARCHAR(300),
@TriggerName VARCHAR(300),
@TriggerTime VARCHAR(500),
@rowsCopied VARCHAR(300),
@DataRead INT,
@No_ParallelCopies INT,
@copyDuration_in_secs VARCHAR(300),
@effectiveIntegrationRuntime VARCHAR(300),
@Source_Type VARCHAR(300),
@Sink_Type VARCHAR(300),
@Execution_Status VARCHAR(300),
@CopyActivity_Start_Time VARCHAR(500),
@CopyActivity_End_Time VARCHAR(500),
@CopyActivity_queuingDuration_in_secs VARCHAR(500),
@CopyActivity_transferDuration_in_secs VARCHAR(500)
AS
INSERT INTO [pipeline_log]
(
      [DataFactory_Name]
      ,[Pipeline_Name]
      ,[RunId]
      ,[Source]
      ,[Destination]
      ,[TriggerType]
      ,[TriggerId]
      ,[TriggerName]
      ,[TriggerTime]
      ,[rowsCopied]
      ,[DataRead]
      ,[No_ParallelCopies]
      ,[copyDuration_in_secs]
      ,[effectiveIntegrationRuntime]
      ,[Source_Type]
      ,[Sink_Type]
      ,[Execution_Status]
      ,[CopyActivity_Start_Time]
      ,[CopyActivity_End_Time]
      ,[CopyActivity_queuingDuration_in_secs]
      ,[CopyActivity_transferDuration_in_secs]
)
VALUES
(
@DataFactory_Name
      ,@Pipeline_Name
      ,@RunId
      ,@Source
      ,@Destination
      ,@TriggerType
      ,@TriggerId
      ,@TriggerName
      ,@TriggerTime
      ,@rowsCopied
      ,@DataRead
      ,@No_ParallelCopies
      ,@copyDuration_in_secs
      ,@effectiveIntegrationRuntime
      ,@Source_Type
      ,@Sink_Type
      ,@Execution_Status
      ,@CopyActivity_Start_Time
      ,@CopyActivity_End_Time
      ,@CopyActivity_queuingDuration_in_secs
      ,@CopyActivity_transferDuration_in_secs
)
GO

Create a Stored Procedure to Update the Errors Table

Next, let's run the following script, which creates a stored procedure that inserts detailed error data from a failed pipeline run into the pipeline_errors table. Note that this stored procedure will also be called from the Data Factory pipeline at run time.

SET ANSI_NULLS ON
GO
 
SET QUOTED_IDENTIFIER ON
GO 
 
CREATE PROCEDURE [dbo].[sp_UpdateErrorTable]
   @DataFactory_Name [nvarchar](500) NULL,
   @Pipeline_Name [nvarchar](500) NULL,
   @RunId [nvarchar](500) NULL,
   @Source [nvarchar](500) NULL,
   @Destination [nvarchar](500) NULL,
   @TriggerType [nvarchar](500) NULL,
   @TriggerId [nvarchar](500) NULL,
   @TriggerName [nvarchar](500) NULL,
   @TriggerTime [nvarchar](500) NULL,
   @No_ParallelCopies [int] NULL,
   @copyDuration_in_secs [nvarchar](500) NULL,
   @effectiveIntegrationRuntime [nvarchar](500) NULL,
   @Source_Type [nvarchar](500) NULL,
   @Sink_Type [nvarchar](500) NULL,
   @Execution_Status [nvarchar](500) NULL,
   @ErrorDescription [nvarchar](max) NULL,
   @ErrorCode [nvarchar](500) NULL,
   @ErrorLoggedTime [nvarchar](500) NULL,
   @FailureType [nvarchar](500) NULL
AS
INSERT INTO [pipeline_errors]
 
(
    [DataFactory_Name],
   [Pipeline_Name],
   [RunId],
   [Source],
   [Destination],
   [TriggerType],
   [TriggerId],
   [TriggerName],
   [TriggerTime],
   [No_ParallelCopies],
   [copyDuration_in_secs],
   [effectiveIntegrationRuntime],
   [Source_Type],
   [Sink_Type],
   [Execution_Status],
   [ErrorDescription],
   [ErrorCode],
   [ErrorLoggedTime],
   [FailureType]
)
VALUES
(
   @DataFactory_Name,
   @Pipeline_Name,
   @RunId,
   @Source,
   @Destination,
   @TriggerType,
   @TriggerId,
   @TriggerName,
   @TriggerTime,
   @No_ParallelCopies,
   @copyDuration_in_secs,
   @effectiveIntegrationRuntime,
   @Source_Type,
   @Sink_Type,
   @Execution_Status,
   @ErrorDescription,
   @ErrorCode,
   @ErrorLoggedTime,
   @FailureType
)
GO

Create a Source Error SQL Table

Recall from my previous article, Azure Data Factory Pipeline to fully Load all SQL Server Objects to ADLS Gen2, that we moved a source SQL Server table into Azure Data Lake Storage Gen2 and ultimately into Synapse DW. To test error logging in this process, we need a known failure within the Data Factory pipeline. It is generally known that a varchar(max) column containing more than 8,000 characters will fail when being loaded into Synapse DW, since varchar(max) is an unsupported data type. This makes it a good use case for an error test.

The following table, dbo.MyErrorTable, contains two columns, with col1 defined as varchar(max).

datatypes in the error table

In dbo.MyErrorTable, I inserted a large block of text, arbitrarily choosing the sample text for Roma: The Novel of Ancient Rome by Steven Saylor. After some editing, I confirmed that col1 contains 8001 words, which exceeds the 8,000-character threshold and is sure to fail my Azure Data Factory pipeline and create a record in the pipeline_errors table.

error table
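Before relying on a load failure, it can help to know which rows are actually over the limit. The sketch below, in plain Python, flags string values longer than 8,000 characters. The function name find_oversized and the sample rows are hypothetical, and len() counts characters rather than bytes. Against the real table, a comparable T-SQL check would be a filter such as WHERE LEN(col1) > 8000.

```python
from typing import Iterable, List, Tuple

# Character threshold discussed in this tip, kept configurable.
MAX_CHARS = 8000


def find_oversized(rows: Iterable[dict], column: str, limit: int = MAX_CHARS) -> List[Tuple[int, int]]:
    """Return (row_index, length) for every value in `column` longer than `limit` characters."""
    oversized = []
    for i, row in enumerate(rows):
        value = row.get(column)
        # NULLs (None) and missing columns cannot exceed the limit, so skip them.
        if value is not None and len(value) > limit:
            oversized.append((i, len(value)))
    return oversized


rows = [
    {"col1": "short text"},
    {"col1": "x" * 8001},  # one character over the limit
    {"col1": None},
]
print(find_oversized(rows, "col1"))  # [(1, 8001)]
```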

Add Records to Parameter Table

Now that we've identified the source SQL tables to run through the process, I'll add them to the pipeline_parameter table. For this demonstration, I have added the error table that we created in the previous step along with a regular table that we expect to succeed, so that we can demonstrate both the success and failure paths of the end-to-end logging process.

pipeline_parameter

Verify the Azure Data Lake Storage Gen2 Folders and Files

After running the pipeline to load my SQL tables to Azure Data Lake Storage Gen2, we can see that the destination ADLS Gen2 container now has both tables in Snappy-compressed parquet format.

my error tables

As an additional verification step, we can see that the folder contains the expected parquet file.

my error table

Configure the Pipeline Lookup Activity

It's now time to build and configure the ADF pipeline. My previous article, Load Data Lake files into Azure Synapse Analytics Using Azure Data Factory, covers the details of how to build this pipeline. To recap the process, the SELECT query within the Lookup activity gets the list of parquet files that need to be loaded to Synapse DW and passes them to the ForEach loop, which loads each parquet file into Synapse DW.

Synapse
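To make the hand-off from the Lookup to the ForEach loop concrete, here is a rough Python model using sqlite3. With First row only unchecked, a Lookup activity returns its rows in an output.value array, and the ForEach loop iterates over that array, exposing each row as item(); that is why the stored procedure mappings in this tip can use item().src_name and item().dst_name. The load_synapse = '1' filter and the lookup_activity helper are hypothetical stand-ins, not the actual query from my previous article.

```python
import sqlite3


def lookup_activity(conn: sqlite3.Connection) -> dict:
    """Model a Lookup with 'First row only' off: matching rows come back under 'value'."""
    conn.row_factory = sqlite3.Row  # lets each row be converted to a dict by column name
    # Hypothetical filter; substitute the SELECT query your Lookup actually uses.
    rows = conn.execute(
        "SELECT src_name, dst_name FROM pipeline_parameter "
        "WHERE load_synapse = '1' ORDER BY parameter_id"
    ).fetchall()
    value = [dict(row) for row in rows]
    return {"count": len(value), "value": value}


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE pipeline_parameter ("
    "parameter_id INTEGER PRIMARY KEY, src_name TEXT, dst_name TEXT, load_synapse TEXT)"
)
conn.executemany(
    "INSERT INTO pipeline_parameter (src_name, dst_name, load_synapse) VALUES (?, ?, ?)",
    [
        ("MyTable", "MyTable", "1"),
        ("MyErrorTable", "MyErrorTable", "1"),
        ("OtherTable", "OtherTable", "0"),
    ],
)

output = lookup_activity(conn)
# The ForEach loop walks output['value']; each dict plays the role of item().
for item in output["value"]:
    print(item["src_name"], "->", item["dst_name"])
```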

Configure the Pipeline Foreach Loop Activity

The ForEach loop contains the Copy-Table activity, which takes the parquet files and loads them into Synapse DW while auto-creating the tables. If the Copy-Table activity succeeds, the pipeline logs the run data to the pipeline_log table. If the Copy-Table activity fails, it logs the error details to the pipeline_errors table instead.
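The success and failure branches behave much like a try/except around each copy. The following Python sketch models only that control flow; CopyFailed, run_foreach, fake_copy, and the placeholder error values are hypothetical. Unlike ADF, which runs ForEach iterations in parallel unless the Sequential option is checked, this sketch processes the items one at a time.

```python
from typing import Callable, Dict, List


class CopyFailed(Exception):
    """Hypothetical stand-in for a failed Copy-Table run, carrying an ADF-style error dict."""

    def __init__(self, error: Dict[str, str]):
        super().__init__(error.get("message", ""))
        self.error = error


def run_foreach(
    items: List[dict],
    copy_table: Callable[[dict], dict],
    log_success: Callable[[dict, dict], None],
    log_failure: Callable[[dict, Dict[str, str]], None],
) -> None:
    """Run the copy for each item, then follow either the Succeeded or the Failed path."""
    for item in items:
        try:
            output = copy_table(item)
        except CopyFailed as exc:
            log_failure(item, exc.error)  # failure path -> sp_UpdateErrorTable
        else:
            log_success(item, output)     # success path -> sp_UpdateLogTable


def fake_copy(item: dict) -> dict:
    """Pretend copy: the table with the oversized column fails, the other succeeds."""
    if item["src_name"] == "MyErrorTable":
        raise CopyFailed(
            {"errorCode": "PLACEHOLDER", "message": "copy failed", "failureType": "UserError"}
        )
    return {"rowsCopied": 10}


log_rows, error_rows = [], []
run_foreach(
    [{"src_name": "MyTable"}, {"src_name": "MyErrorTable"}],
    fake_copy,
    lambda item, out: log_rows.append(item["src_name"]),
    lambda item, err: error_rows.append(item["src_name"]),
)
print(log_rows, error_rows)  # ['MyTable'] ['MyErrorTable']
```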

Configure Stored Procedure to Update the Log Table

Notice that the sp_UpdateLogTable stored procedure that we created earlier will be called by the success Stored Procedure activity.

Stored procedure

Below are the stored procedure parameters that will update the pipeline_log table; they can be imported directly from the stored procedure.

procedure parameters

The following values will need to be entered into the stored procedure parameter values.

Name Values
DataFactory_Name @{pipeline().DataFactory}
Pipeline_Name @{pipeline().Pipeline}
RunId @{pipeline().RunId}
Source @{item().src_name}
Destination @{item().dst_name}
TriggerType @{pipeline().TriggerType}
TriggerId @{pipeline().TriggerId}
TriggerName @{pipeline().TriggerName}
TriggerTime @{pipeline().TriggerTime}
rowsCopied @{activity('Copy-Table').output.rowsCopied}
DataRead @{activity('Copy-Table').output.dataRead}
No_ParallelCopies @{activity('Copy-Table').output.usedParallelCopies}
copyDuration_in_secs @{activity('Copy-Table').output.copyDuration}
effectiveIntegrationRuntime @{activity('Copy-Table').output.effectiveIntegrationRuntime}
Source_Type @{activity('Copy-Table').output.executionDetails[0].source.type}
Sink_Type @{activity('Copy-Table').output.executionDetails[0].sink.type}
Execution_Status @{activity('Copy-Table').output.executionDetails[0].status}
CopyActivity_Start_Time @{activity('Copy-Table').output.executionDetails[0].start}
CopyActivity_End_Time @{utcnow()}
CopyActivity_queuingDuration_in_secs @{activity('Copy-Table').output.executionDetails[0].detailedDurations.queuingDuration}
CopyActivity_transferDuration_in_secs @{activity('Copy-Table').output.executionDetails[0].detailedDurations.transferDuration}
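Each activity('Copy-Table').output expression above is simply a path into the JSON that the Copy activity returns. The Python sketch below walks a subset of those paths through a trimmed sample output to show which value lands in each sp_UpdateLogTable parameter. The sample output and its values, and the resolve and build_log_params helpers, are illustrative assumptions; check the Output pane of your own Copy activity run for the real payload. Values are converted with str() to approximate @{...} string interpolation.

```python
import re
from typing import Any, Dict

# Matches either a property name or a [n] array index within one dotted segment.
_TOKEN = re.compile(r"([A-Za-z_][A-Za-z0-9_]*)|\[(\d+)\]")

# Subset of the mapping above: stored procedure parameter -> path under output.
LOG_PARAM_PATHS = {
    "rowsCopied": "rowsCopied",
    "No_ParallelCopies": "usedParallelCopies",
    "copyDuration_in_secs": "copyDuration",
    "effectiveIntegrationRuntime": "effectiveIntegrationRuntime",
    "Source_Type": "executionDetails[0].source.type",
    "Sink_Type": "executionDetails[0].sink.type",
    "Execution_Status": "executionDetails[0].status",
    "CopyActivity_Start_Time": "executionDetails[0].start",
    "CopyActivity_queuingDuration_in_secs": "executionDetails[0].detailedDurations.queuingDuration",
    "CopyActivity_transferDuration_in_secs": "executionDetails[0].detailedDurations.transferDuration",
}


def resolve(obj: Any, path: str) -> Any:
    """Follow a path such as 'executionDetails[0].sink.type' through dicts and lists."""
    for segment in path.split("."):
        for name, index in _TOKEN.findall(segment):
            obj = obj[name] if name else obj[int(index)]
    return obj


def build_log_params(output: Dict[str, Any]) -> Dict[str, str]:
    """Evaluate every mapped path and stringify the result."""
    return {param: str(resolve(output, path)) for param, path in LOG_PARAM_PATHS.items()}


# Illustrative, trimmed Copy activity output (values invented for the example).
sample_output = {
    "rowsCopied": 1000,
    "usedParallelCopies": 1,
    "copyDuration": 12,
    "effectiveIntegrationRuntime": "AutoResolveIntegrationRuntime",
    "executionDetails": [
        {
            "source": {"type": "AzureBlobFS"},
            "sink": {"type": "AzureSqlDW"},
            "status": "Succeeded",
            "start": "2021-01-20T10:00:00Z",
            "detailedDurations": {"queuingDuration": 3, "transferDuration": 8},
        }
    ],
}

params = build_log_params(sample_output)
print(params["Sink_Type"], params["CopyActivity_queuingDuration_in_secs"])  # AzureSqlDW 3
```

In this sketch a path that is missing from the payload raises KeyError, which is a quick way to spot a mistyped property name before pasting it into the pipeline.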

Configure Stored Procedure to Update the Error Table

The last activity within the ForEach loop is the failure Stored Procedure activity, which calls the sp_UpdateErrorTable stored procedure that we created earlier.

Stored Procedure

Below are the stored procedure parameters that will update the pipeline_errors table; they can also be imported directly from the stored procedure.

user properties

The following values will need to be entered into the stored procedure parameter values.

Name Values
DataFactory_Name @{pipeline().DataFactory}
Pipeline_Name @{pipeline().Pipeline}
RunId @{pipeline().RunId}
Source @{item().src_name}
Destination @{item().dst_name}
TriggerType @{pipeline().TriggerType}
TriggerId @{pipeline().TriggerId}
TriggerName @{pipeline().TriggerName}
TriggerTime @{pipeline().TriggerTime}
No_ParallelCopies @{activity('Copy-Table').output.usedParallelCopies}
copyDuration_in_secs @{activity('Copy-Table').output.copyDuration}
effectiveIntegrationRuntime @{activity('Copy-Table').output.effectiveIntegrationRuntime}
Source_Type @{activity('Copy-Table').output.executionDetails[0].source.type}
Sink_Type @{activity('Copy-Table').output.executionDetails[0].sink.type}
Execution_Status @{activity('Copy-Table').output.executionDetails[0].status}
ErrorCode @{activity('Copy-Table').error.errorCode}
ErrorDescription @{activity('Copy-Table').error.message}
ErrorLoggedTime @{utcnow()}
FailureType @concat(activity('Copy-Table').error.message,'failureType:',activity('Copy-Table').error.failureType)
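The error-specific rows read from activity('Copy-Table').error rather than from output. The Python sketch below mirrors those four rows for one failed run, which makes it easy to see what the concat() expression stores in FailureType. The build_error_params helper, the error dictionary, and its values are hypothetical placeholders.

```python
from datetime import datetime, timezone
from typing import Any, Dict


def build_error_params(error: Dict[str, Any], logged_time: str) -> Dict[str, str]:
    """Mirror the ErrorCode, ErrorDescription, ErrorLoggedTime and FailureType rows."""
    return {
        "ErrorCode": str(error["errorCode"]),
        "ErrorDescription": error["message"],
        "ErrorLoggedTime": logged_time,
        # concat() joins its arguments with no separator, so the message,
        # the literal 'failureType:' and the failure type run together.
        "FailureType": error["message"] + "failureType:" + error["failureType"],
    }


# Placeholder error payload; a real run supplies its own code and message.
sample_error = {"errorCode": "PLACEHOLDER", "message": "Copy failed.", "failureType": "UserError"}

now_utc = datetime.now(timezone.utc).isoformat()  # rough analogue of utcnow()
params = build_error_params(sample_error, now_utc)
print(params["FailureType"])  # Copy failed.failureType:UserError
```

If you would rather keep only the failure category in that column, mapping FailureType to @{activity('Copy-Table').error.failureType} on its own would avoid repeating the message that ErrorDescription already stores.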

Run the Pipeline

Now that we have configured the pipeline, it is time to run it. As we can see from the debug-mode Output log, one table succeeded and the other failed, as expected.

Details of the pipeline

Verify the Results

Finally, let's verify the results in the pipeline_log table. As we can see, the pipeline_log table has captured one record, for the source table MyTable.

pipeline log table

And the pipeline_errors table now has one record for MyErrorTable, along with detailed error codes, descriptions, messages and more.

pipeline error table
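To review both tables for a single run side by side, you can query them together. The sketch below uses Python's sqlite3 with trimmed tables and made-up rows so that it runs anywhere; the same SELECT should carry over to the Azure SQL tables with little change, although that is an assumption worth testing. It joins on Source = src_name rather than on parameter_id, because the two stored procedures in this tip insert Source (from item().src_name) but never populate parameter_id. The run_outcomes name and the 'run-1' RunId are hypothetical.

```python
import sqlite3
from typing import List, Tuple

# One row per table touched by the run, labelled by which logging table it landed in.
OUTCOMES_SQL = """
    SELECT p.src_name, 'Succeeded' AS outcome
    FROM pipeline_log AS l
    JOIN pipeline_parameter AS p ON p.src_name = l.Source
    WHERE l.RunId = ?
    UNION ALL
    SELECT p.src_name, 'Failed'
    FROM pipeline_errors AS e
    JOIN pipeline_parameter AS p ON p.src_name = e.Source
    WHERE e.RunId = ?
    ORDER BY 1
"""


def run_outcomes(conn: sqlite3.Connection, run_id: str) -> List[Tuple[str, str]]:
    """Return (src_name, outcome) for every table touched by one pipeline run."""
    return conn.execute(OUTCOMES_SQL, (run_id, run_id)).fetchall()


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE pipeline_parameter (parameter_id INTEGER PRIMARY KEY, src_name TEXT);
    CREATE TABLE pipeline_log (log_id INTEGER PRIMARY KEY, Source TEXT, RunId TEXT);
    CREATE TABLE pipeline_errors (error_id INTEGER PRIMARY KEY, Source TEXT, RunId TEXT);
    INSERT INTO pipeline_parameter (src_name) VALUES ('MyTable'), ('MyErrorTable');
    INSERT INTO pipeline_log (Source, RunId) VALUES ('MyTable', 'run-1');
    INSERT INTO pipeline_errors (Source, RunId) VALUES ('MyErrorTable', 'run-1');
    """
)
print(run_outcomes(conn, "run-1"))
# [('MyErrorTable', 'Failed'), ('MyTable', 'Succeeded')]
```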

As a final check, when I navigate to Synapse DW, I can see that both tables have been auto-created, even though one load failed and the other succeeded.

synapse

However, data was loaded only into MyTable; MyErrorTable contains no data because its load failed.

table verified
About the author
Ron L'Esteve is a seasoned Data Architect who holds an MBA and MSF. Ron has over 15 years of consulting experience with Microsoft Business Intelligence, data engineering, emerging cloud and big data technologies.
