By: Fikrat Azizov
Problem
In the previous post on the ForEach activity, we discussed how that activity handles iterative processing logic based on a collection of items. Azure Data Factory (ADF) also has another type of iteration activity, the Until activity, which is based on a dynamic expression. In this tip we will discuss the Until activity, as well as the Wait activity, which is frequently used alongside iteration activities.
Solution
Azure Data Factory Wait Activity
The Wait activity causes pipeline execution to pause for a specified period before continuing with the execution of subsequent activities. This activity has a single parameter, waitTimeInSeconds, which specifies the wait period in seconds. We will use this activity as part of the sample solution to demonstrate iteration logic in the next sections.
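For reference, here is a minimal sketch of how a Wait activity appears in a pipeline's JSON definition (the activity name is an arbitrary placeholder):

{
    "name": "Wait_AC",
    "type": "Wait",
    "typeProperties": {
        "waitTimeInSeconds": 30
    }
}

The wait period can also be supplied as a dynamic expression, so it can be parameterized if needed.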
Azure Data Factory Until Activity
The Until activity is a compound activity. It executes its child activities in a loop, until one of the below conditions is met:
- The condition associated with it evaluates to true
- Its timeout period elapses
Like SSIS's For Loop Container, the Until activity's evaluation is based on an expression. However, there are some differences:
- Unlike SSIS's For Loop Container, the Until activity evaluates its condition at the end of each iteration
- The Until activity continues looping while its condition evaluates to false and passes execution to subsequent activities once the condition becomes true. SSIS's For Loop Container, on the contrary, continues looping while its condition evaluates to true.
The Until activity's evaluation condition can include the outputs of other activities, pipeline parameters or variables.
When combined with the Wait activity, the Until activity allows you to build a polling loop that checks the status of certain processes on a periodic basis (see the sketch after this list). Here are some examples:
- Verify if new rows have been added to a database table
- Verify if a SQL job has finished
- Check if new files have been deposited into a certain folder
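To illustrate the pattern, below is a minimal sketch of an Until activity that wraps a Wait activity, so the loop condition is re-evaluated every 30 seconds. The activity names are placeholders, the expression is the one we will build later in this tip, and 7.00:00:00 is the default timeout:

{
    "name": "Until_AC",
    "type": "Until",
    "typeProperties": {
        "expression": {
            "value": "@greater(int(variables('CurrentID')), int(variables('LastID')))",
            "type": "Expression"
        },
        "timeout": "7.00:00:00",
        "activities": [
            {
                "name": "Wait_AC",
                "type": "Wait",
                "typeProperties": {
                    "waitTimeInSeconds": 30
                }
            }
        ]
    }
}

In a real pipeline, the activities array would also contain the activities that refresh whatever the expression checks, as we will see in the example below.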
Azure Data Factory Data Integration Scenario
To demonstrate Until and Wait activities at work, I'm going to create an incremental copy flow with the following logic:
- Perform the initial synchronization of the source and destination tables, using a simple Copy activity
- Read the ID value previously stored in the IDLookup table and assign it to the LastID pipeline variable
- At regular intervals, read the last inserted ID from the source table and compare it against the LastID variable, to see if the source table has received new rows
- Upon arrival of new rows, copy them into the destination table
- Update the IDLookup table with the latest available ID from the source table
Prepare Source and Destination Databases
My data flow will read the DimCustomer table from the AdventureWorksDW2016 database, hosted on my laptop, and write into the same table within the DstDb Azure SQL database. Before proceeding with ADF development, let us prepare the source and destination tables.
First, let us query the last inserted ID value from the source DimCustomer table, by executing the below script under the context of the AdventureWorksDW2016 database:
USE [AdventureWorksDW2016]
GO
SELECT MAX([CustomerKey]) FROM [dbo].[DimCustomer]
This query has returned the value 29514 in my case.
Next, execute the below script in the same database to create stored procedure Usp_GetDeltaFromDimCustomer, which will be required for incremental copy purposes:
USE [AdventureWorksDW2016]
GO
CREATE PROCEDURE [dbo].[Usp_GetDeltaFromDimCustomer]
    @lastId int
AS
SELECT * FROM DimCustomer WHERE CustomerKey > @lastId
GO
Please note that the procedure Usp_GetDeltaFromDimCustomer has a lastId parameter and is designed to read rows with customer keys above this parameter's value.
Next, let us switch to the destination database DstDb and run the below script to create the IDLookup and DimCustomer tables:
CREATE TABLE [dbo].[IDLookup](
    [TableName] [varchar](100) NULL,
    [LastID] [int] NULL
) ON [PRIMARY]
GO

CREATE TABLE [dbo].[DimCustomer](
    [CustomerKey] [int] NOT NULL,
    [GeographyKey] [int] NULL,
    [CustomerAlternateKey] [nvarchar](15) NOT NULL,
    [Title] [nvarchar](8) NULL,
    [FirstName] [nvarchar](50) NULL,
    [MiddleName] [nvarchar](50) NULL,
    [LastName] [nvarchar](50) NULL,
    [NameStyle] [bit] NULL,
    [BirthDate] [date] NULL,
    [MaritalStatus] [nchar](1) NULL,
    [Suffix] [nvarchar](10) NULL,
    [Gender] [nvarchar](1) NULL,
    [EmailAddress] [nvarchar](50) NULL,
    [YearlyIncome] [money] NULL,
    [TotalChildren] [tinyint] NULL,
    [NumberChildrenAtHome] [tinyint] NULL,
    [EnglishEducation] [nvarchar](40) NULL,
    [SpanishEducation] [nvarchar](40) NULL,
    [FrenchEducation] [nvarchar](40) NULL,
    [EnglishOccupation] [nvarchar](100) NULL,
    [SpanishOccupation] [nvarchar](100) NULL,
    [FrenchOccupation] [nvarchar](100) NULL,
    [HouseOwnerFlag] [nchar](1) NULL,
    [NumberCarsOwned] [tinyint] NULL,
    [AddressLine1] [nvarchar](120) NULL,
    [AddressLine2] [nvarchar](120) NULL,
    [Phone] [nvarchar](20) NULL,
    [DateFirstPurchase] [date] NULL,
    [CommuteDistance] [nvarchar](15) NULL,
    CONSTRAINT [PK_DimCustomer_CustomerKey] PRIMARY KEY CLUSTERED ([CustomerKey] ASC)
) ON [PRIMARY]
GO
Let's execute the below script to insert the last ID value obtained earlier, into the IDLookup table:
INSERT INTO [dbo].[IDLookup] ([TableName],[LastID]) VALUES ('DimCustomer', 29514)
Finally, let us create the stored procedure Usp_UpdateIDLookup, to update the IDLookup table, using the below script:
CREATE PROCEDURE [dbo].[Usp_UpdateIDLookup] (
    @TableName varchar(100),
    @LastId int
)
AS
BEGIN
    IF EXISTS (SELECT * FROM [dbo].[IDLookup] WHERE TableName = @TableName)
        UPDATE [dbo].[IDLookup] SET [LastID] = @LastId WHERE TableName = @TableName
    ELSE
        INSERT INTO [dbo].[IDLookup] VALUES (@TableName, @LastId)
END
GO
Create an ADF Pipeline
Now that we have prepared the source and destination databases, we can proceed with building the ADF data flow, as follows:
Create a new pipeline and add pipeline variables LastID and CurrentID of String type. We'll use LastID to store the value from the IDLookup table and CurrentID to store the latest ID obtained from the source table:
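For reference, a fragment of the pipeline's JSON showing how these variable declarations would look, assuming the names above:

"variables": {
    "LastID": {
        "type": "String"
    },
    "CurrentID": {
        "type": "String"
    }
}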
Next, add a Lookup activity to read the previously stored ID value from the IDLookup table (I've named it LastID_Lookup_AC), with the below settings:
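The exact settings are shown in the screenshot. Purely as a hypothetical sketch, assuming a dataset named IDLookup_DST_DS pointing to the DstDb database and a query that filters IDLookup by table name (both the dataset name and the query are my assumptions, not shown in the article), the Lookup's definition might look roughly like this:

{
    "name": "LastID_Lookup_AC",
    "type": "Lookup",
    "typeProperties": {
        "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "SELECT [LastID] FROM [dbo].[IDLookup] WHERE [TableName] = 'DimCustomer'"
        },
        "dataset": {
            "referenceName": "IDLookup_DST_DS",
            "type": "DatasetReference"
        },
        "firstRowOnly": true
    }
}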
Next, add a Set Variable activity (I have named it Set_LastID_AC) to assign the value obtained from the LastID_Lookup_AC activity to the LastID variable; we can use the expression @string(activity('LastID_Lookup_AC').output.firstRow.LastID) to achieve that. Here's how your screen should look at this point:
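A minimal sketch of this Set Variable activity in JSON, assuming it is linked to the Success end of LastID_Lookup_AC:

{
    "name": "Set_LastID_AC",
    "type": "SetVariable",
    "dependsOn": [
        {
            "activity": "LastID_Lookup_AC",
            "dependencyConditions": [ "Succeeded" ]
        }
    ],
    "typeProperties": {
        "variableName": "LastID",
        "value": {
            "value": "@string(activity('LastID_Lookup_AC').output.firstRow.LastID)",
            "type": "Expression"
        }
    }
}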
Next, add a Copy activity for the initial synchronization of the source and destination DimCustomer tables (I have named it Init_Sync_AC). Please note, I've added a purge query to its sink properties, to prevent it from failing due to PK violation errors:
Drag and drop an Until activity (I've named it Until_AC), link it to the Success ends of the Set_LastID_AC and Init_Sync_AC activities and enter the expression @greater(int(variables('CurrentID')),int(variables('LastID'))) into its Expression text box. This expression evaluates to true when the CurrentID variable is greater than the LastID variable (the CurrentID variable will be populated by subsequent activities):
Switch to the Activities tab and click the Add Activity button to start adding child activities. First, let's add a Lookup activity (I've named it GetCurrentID_AC) to read the latest ID from the source table. This activity is connected to the dataset DimCustomer_SRC_DS, pointing to my local AdventureWorksDW2016 database, and uses the below query as its source:
SELECT MAX([CustomerKey]) AS CurrentId FROM [dbo].[DimCustomer]
Here is the screenshot, with the required settings:
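In JSON terms, this Lookup might be defined roughly as follows; this is a sketch, and the shape of the source section may differ slightly depending on the linked service type:

{
    "name": "GetCurrentID_AC",
    "type": "Lookup",
    "typeProperties": {
        "source": {
            "type": "SqlSource",
            "sqlReaderQuery": "SELECT MAX([CustomerKey]) AS CurrentId FROM [dbo].[DimCustomer]"
        },
        "dataset": {
            "referenceName": "DimCustomer_SRC_DS",
            "type": "DatasetReference"
        },
        "firstRowOnly": true
    }
}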
Next, add a Set Variable activity (I've named it SetCurrentID_AC) to assign the value obtained from the GetCurrentID_AC activity to the CurrentID pipeline variable; we can use the expression @string(activity('GetCurrentID_AC').output.firstRow.CurrentId) to achieve that. Here's the screenshot at this point:
We can complete this chain by adding a Wait activity. Let's drag and drop it from the General group, connect it to the Success end of the SetCurrentID_AC activity and set its wait time setting to 30 seconds:
Now that we're done with the child components of the Until_AC activity, let's use the navigation link to return to the parent pipeline's design surface:
Next, let's clone (copy/paste) the Init_Sync_AC activity, link it to the Success end of the Until_AC activity, specify the Usp_GetDeltaFromDimCustomer procedure as its source, to ensure an incremental data read, and assign the expression @int(variables('LastID')) to its lastId parameter. This ensures that the stored procedure returns only rows with IDs greater than the value of the LastID variable:
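The key change from Init_Sync_AC is the source configuration. Here is a sketch of what the cloned Copy activity's source section might look like; the exact property layout may vary slightly between ADF versions:

"source": {
    "type": "SqlSource",
    "sqlReaderStoredProcedureName": "[dbo].[Usp_GetDeltaFromDimCustomer]",
    "storedProcedureParameters": {
        "lastId": {
            "type": "Int32",
            "value": {
                "value": "@int(variables('LastID'))",
                "type": "Expression"
            }
        }
    }
}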
Let's finalize the pipeline design by adding a Stored Procedure activity to update the IDLookup table with the last ID obtained from the source table. Please note, I've used the expression @int(variables('CurrentID')) to extract the value of the CurrentID variable:
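A rough sketch of this Stored Procedure activity's JSON; the activity name Update_IDLookup_AC and the linked service reference DstDb_LS are placeholders of mine, not names used in the article:

{
    "name": "Update_IDLookup_AC",
    "type": "SqlServerStoredProcedure",
    "linkedServiceName": {
        "referenceName": "DstDb_LS",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "storedProcedureName": "[dbo].[Usp_UpdateIDLookup]",
        "storedProcedureParameters": {
            "TableName": {
                "value": "DimCustomer",
                "type": "String"
            },
            "LastId": {
                "value": {
                    "value": "@int(variables('CurrentID'))",
                    "type": "Expression"
                },
                "type": "Int32"
            }
        }
    }
}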
Finally, let us start the pipeline in Debug mode, let it run for a few minutes and examine the execution logs in the Output window:
As you can see from the logs, once the initial steps completed, the iteration logic started, checking the source table for new rows on a periodic basis. This iteration will continue until the Until_AC activity's condition becomes true or its timeout value is reached.
To test the incremental logic I have outlined earlier, let's insert some rows into the source table, using the below script:
INSERT INTO DimCustomer
SELECT TOP 10
    [GeographyKey]
    ,[CustomerAlternateKey] + 'AAA'
    ,[Title]
    ,[FirstName]
    ,[MiddleName]
    ,[LastName]
    ,[NameStyle]
    ,[BirthDate]
    ,[MaritalStatus]
    ,[Suffix]
    ,[Gender]
    ,[EmailAddress]
    ,[YearlyIncome]
    ,[TotalChildren]
    ,[NumberChildrenAtHome]
    ,[EnglishEducation]
    ,[SpanishEducation]
    ,[FrenchEducation]
    ,[EnglishOccupation]
    ,[SpanishOccupation]
    ,[FrenchOccupation]
    ,[HouseOwnerFlag]
    ,[NumberCarsOwned]
    ,[AddressLine1]
    ,[AddressLine2]
    ,[Phone]
    ,[DateFirstPurchase]
    ,[CommuteDistance]
FROM [AdventureWorksDW2016].[dbo].[DimCustomer]
Soon after this row insertion, our iteration finished and the pipeline execution continued with subsequent activities, as expected:
I have attached JSON scripts for this pipeline here, for your reference.
Azure Data Factory Timeout Setting
As I mentioned earlier, the Until activity has a timeout setting, which prevents pipelines from needlessly long execution. The timeout is set to 7 days by default, but can be changed, if required:
Next Steps
- Read: Azure Data Factory ForEach Activity Example
- Read: Azure Data Factory Lookup Activity Example
- Read: Azure Data Factory Pipeline Variables
- Read: Azure Data Factory Stored Procedure Activity Transformation Activities
- Read more about the Until activity
- Read more about the For Loop Container