Build Azure Data Factory Pipeline Dependencies
In these series of posts, I am going to explore Azure Data Factory (ADF), compare its features against SQL Server Integration Services (SSIS) and show how to use it towards real-life data integration problems. In previous posts, we have created a simple pipeline containing a single activity. However, typical ETL jobs for data warehouses often involve multiple interrelated data flows, uploading data into various dimension and fact tables, where some tables need to be uploaded before the others. In this post, we will be exploring the ways to build pipeline dependencies.
Azure Data Factory Pipeline Dependencies
An ADF pipeline can contain more than one activity and those activities could be configured to be run in a certain order. Moreover, just like in SSIS, you can set precedence constraints to determine which activity would be executed next, depending on the completion status of the current activity. There are four dependency conditions available for activities, I will explain them using the below example:
- Success - If activity C has a Success dependency condition on activity A, it only runs, if activity A succeeds.
- Failure - If activity D has Failure dependency condition on activity A, it only runs, if activity A fails.
- Skipped - If activity C has Success dependency condition on activity A and activity F has a Skipped dependency on activity C, activity F will only run, if activity C is skipped, due to a failure condition on activity A.
- Completion - If activity E has a Completion dependency condition on activity A, activity E will run regardless of the completion status of activity A.
Creating Azure Data Factory Pipeline Dependencies
In this exercise, I will create the following ADF objects to demonstrate activity dependencies:
- Hosting pipeline - I have created a sample pipeline and named it as ExploreDependencies.
- Activity 'Copy DimCustomer' - This is a simple copy activity, which will copy DimCustomer table from AdventureWorksDW2016 database on my local machine to DstDb Azure SQL database. This table has PK constraint, which means that unless we purge it before each data copy, the activity 'Copy DimCustomer' should fail when running repeatedly.
- Stored Procedure activity - I have created stored procedure Usp_LogActivityStatus, which stores the pipeline name, activity name, dependency type and execution time in DstDb database. I will create four copies of Stored Procedure activity, all pointing to this stored procedure, but with different parameters.
- Activity dependencies - Finally, I will create dependencies between activity 'Copy DimCustomer' and all its dependents.
Let's first create the target DimCustomer table, activity log table ActivityLogs and related stored procedure Usp_LogActivityStatus in DstDb database, using the below scripts:
CREATE TABLE [dbo].[DimCustomer]( [CustomerKey] [int] IDENTITY(1,1) NOT NULL, [GeographyKey] [int] NULL, [CustomerAlternateKey] [nvarchar](15) NOT NULL, [Title] [nvarchar](8) NULL, [FirstName] [nvarchar](50) NULL, [MiddleName] [nvarchar](50) NULL, [LastName] [nvarchar](50) NULL, [NameStyle] [bit] NULL, [BirthDate] [date] NULL, [MaritalStatus] [nchar](1) NULL, [Suffix] [nvarchar](10) NULL, [Gender] [nvarchar](1) NULL, [EmailAddress] [nvarchar](50) NULL, [YearlyIncome] [money] NULL, [TotalChildren] [tinyint] NULL, [NumberChildrenAtHome] [tinyint] NULL, [EnglishEducation] [nvarchar](40) NULL, [SpanishEducation] [nvarchar](40) NULL, [FrenchEducation] [nvarchar](40) NULL, [EnglishOccupation] [nvarchar](100) NULL, [SpanishOccupation] [nvarchar](100) NULL, [FrenchOccupation] [nvarchar](100) NULL, [HouseOwnerFlag] [nchar](1) NULL, [NumberCarsOwned] [tinyint] NULL, [AddressLine1] [nvarchar](120) NULL, [AddressLine2] [nvarchar](120) NULL, [Phone] [nvarchar](20) NULL, [DateFirstPurchase] [date] NULL, [CommuteDistance] [nvarchar](15) NULL, CONSTRAINT [PK_DimCustomer_CustomerKey] PRIMARY KEY CLUSTERED ([CustomerKey] ASC ) ON [PRIMARY] ) ON [PRIMARY] GO --------------------------------------------- CREATE TABLE [dbo].[ActivityLogs]( [PipelineName] [varchar](100) NULL, [ACName] [varchar](100) NULL, [ExecStatus] [varchar](30) NULL, [ExecTime] [datetime] NULL ) ON [PRIMARY] GO --------------------------------------- create procedure Usp_LogActivityStatus (@PipelineName varchar(100), @ACName varchar(100), @ExecStatus varchar(30)) AS BEGIN INSERT INTO [dbo].[ActivityLogs] VALUES(@PipelineName,@ACName,@ExecStatus,GetDate()) END GO
Here are the steps required to create activities and dependencies between them:
Step 1 - Let's add a copy activity, named 'Copy_DimCustomer_AC' and configure it to copy DimCustomer table from on-premises machine to Azure SQL db (see my previous post Transfer Data to the Cloud Using Azure Data Factory for details):
Step 2 - Next, let's add a stored procedure activity, named 'Log_success_status_AC' and point it to stored procedure Usp_LogActivityStatus (see my previous post Azure Data Factory Stored Procedure Activity Transformation Activities for details on how to create Stored Procedure activity). I've assigned static string values 'Log_success_status_AC' and 'Success' to its first two parameters and dynamic expression @pipeline().Pipeline to the third parameter:
Step 3 - Next, let's copy 'Log_success_status_AC' activity, using the copy command and paste it using the paste command on adjacent empty space within the central panel:
Step 4 - I've named a new activity as Log_failure_status_AC and assigned values 'Log_failure_status_AC', 'Failure' and @pipeline().Pipeline to its parameters:
Step 5 - ADF adds a Success endpoint to any activity by default (see green point). To add other dependency endpoints, select activity 'Copy_DimCustomer_AC', click on '+' button and select required dependency type. Let's add Failure and Completion endpoints to this activity:
Step 6 - Now that we have dependency endpoints, let's click on the success endpoint (green) and drag-drop it onto Log_success_status_AC activity and then link the failure endpoint (orange) to the Log_failure_status_AC activity using the same method:
Step 7 - By now, we know how to duplicate activities and create dependencies, so let's create two more and link them to existing activities as follows:
- Activity Log_skipped_status_AC has Skipped dependency on activity Log_success_status_AC and has parameter values 'Log_skipped_status_AC' , 'Skipped' and '@pipeline().Pipeline'
- Activity Log_completed_status_AC has Completion dependency on activity Copy_DimCustomer_AC and has parameter values 'Log_completed_status_AC' , 'Completion' and '@pipeline().Pipeline'
Here is how your screen should look at the end of this exercise:
Finally, let's publish the changes.
Azure Data Factory Pipeline Execution and Analysis of Results
Let's kick-off the pipeline manually that we just created and open the monitoring page to see the execution results:
As you can see from execution details, activity Copy_DimCustomer_AC has been followed by Log_success_status_AC and Log_completed_status_AC activities, which is what we expected:
Here is the ActivityLogs table content:
Let's kick-off this pipeline again - I expect that activity Copy_DimCustomer_AC would fail this time, because of PK errors and I am curious to see what activities will run and fail. As you can see from the execution details, activity Copy_DimCustomer_AC has indeed failed, followed by successful executions of Log_completed_status_AC, Log_skipped_status_AC and Log_failure_status_AC activities, which is what we expected in a failure case:
Here is the ActivityLogs table content with all logs:
Azure Data Factory Multiple Dependencies - 'AND' or 'OR' Logic
I am curious to see how ADF activity dependencies work when an activity has multiple dependencies.
To investigate this, I added an extra activity Log_Completed_Success_AC and made it dependent on Log_Completed_AC and Log_Success_AC activities on success conditions, as shown below:
Before the next execution, I'll purge the target table using a truncate table [dbo].[DimCustomer] command, to ensure that both activities that affect activity Log_Completed_Success_AC are successful.
As you can see from the execution details, activity Log_Completed_Success_AC ran after a successful end for both activities it depended on:
Now, let us run the pipeline again, this time with the data on target DimCustomer table. As you can see from the execution details, activity Log_Completed_Success_AC did not run this time, because one of the activities it depends on, did not succeed:
This shows that an activity, depending on multiple activities would be invoked only when all of them succeed. This functionality is different from SSIS precedence constraints functionality which allows configuration based on a logical OR condition.
I have attached the JSON code of the pipeline ExploreDependencies in case you want to review it.
- Read: Transfer Data to the Cloud Using Azure Data Factory
- Read: Azure Data Factory Stored Procedure Activity Transformation Activities
- Read: Branching and chaining activities in a Data Factory pipeline
About the author
View all my tips