Azure Data Factory Until Activity Example

By:   |   Comments (3)   |   Related: > Azure Data Factory


Problem

In the previous post, Foreach activity, we discussed the ForEach activity designed to handle iterative processing logic, based on a collection of items. Azure Data Factory (ADF) also has another type of iteration activity, the Until activity which is based on a dynamic expression. We will discuss the Until activity, as well as the Wait activity which is frequently used alongside iteration activities.

Solution

Azure Data Factory Wait Activity

The Wait activity causes pipeline execution to pause for a specified period, before continuing with the execution of subsequent activities. This activity has a single parameter, waitTimeInSeconds, which identifies a wait period in seconds. We will be using this activity as part of the sample solution to demonstrate iteration logic in the next sections.

Azure Data Factory Until Activity

The Until activity is a compound activity. It executes its child activities in a loop, until one of the below conditions is met:

  • The condition it's associated with, evaluates to true
  • Its timeout period elapses

Like SSIS's For Loop Container, the Until activity's evaluation is based on a certain expression. However, there are some differences:

  • Unlike SSIS's For Loop Container, Until activity validates its condition at the end of the loop
  • Until activity continues the loop, while its condition evaluates to false and passes execution to subsequent activities, once this condition becomes true. SSIS's For Loop Container, on the contrary, continues its loop while its condition evaluates to true.

The Until activity's evaluation condition can include the outputs of other activities, pipeline parameters or variables.

When applied in combination with the Wait activity, Until activity allows creating loop condition to check the status of certain processes on a periodic basis, here are some examples:

  • Verify if new rows have been added into the database table
  • Verify if SQL job has been finished
  • Check if new files have been deposited into a certain folder

Azure Data Factory Data Integration Scenario

To demonstrate Until and Wait activities at work, I'm going to create an incremental copy flow with the following logic:

  • Perform initial synchronization of source and destination tables, using simple copy activity
  • Read from ID value, previously stored in the IDlookup table and assign it to LastID pipeline variable
  • On regular intervals, read last inserted ID from the source table and compare it against LastID variable, to see if the source table has received new rows
  • Upon arrival of the new rows, copy them into the destination table
  • Update ID lookup table with the latest available ID from the source table.

Prepare Source and Destination Databases

My data flow will read the DimCustomer table from AdventureWorksDW2016 database, hosted on my laptop and write into the same table within the DstDb Azure SQL database. Before proceeding with ADF development, let us prepare the source /destination tables.

First, let use query last inserted ID value from the source DimCustomer table, by executing the below script under the context of AdventureWorksDW2016 database:

USE [AdventureWorksDW2016]
GO
SELECT MAX([CustomerKey]) FROM [dbo].[DimCustomer]

This query has returned the value 29514 in my case.

Next, execute the below script in the same database to create stored procedure Usp_GetDeltaFromDimCustomer, which will be required for incremental copy purposes:

USE [AdventureWorksDW2016]
GO
CREATE procedure [dbo].[Usp_GetDeltaFromDimCustomer]
@lastId int
AS
Select * FROM DimCustomer WHERE CustomerKey>@lastId
GO

Please note the procedure Usp_GetDeltaFromDimCustomer has a  lastId parameter and is designed to read rows with the customer keys above this parameter value.

Next, let us switch to destination database DstDb and run below script to create IDLookup and DimCustomer tables:

CREATE TABLE [dbo].[IDLookup](
   [TableName] [varchar](100) NULL,
   [LastID] [int] NULL
) ON [PRIMARY]
GO
CREATE TABLE [dbo].[DimCustomer](
   [CustomerKey] [int] NOT NULL,
   [GeographyKey] [int] NULL,
   [CustomerAlternateKey] [nvarchar](15) NOT NULL,
   [Title] [nvarchar](8) NULL,
   [FirstName] [nvarchar](50) NULL,
   [MiddleName] [nvarchar](50) NULL,
   [LastName] [nvarchar](50) NULL,
   [NameStyle] [bit] NULL,
   [BirthDate] [date] NULL,
   [MaritalStatus] [nchar](1) NULL,
   [Suffix] [nvarchar](10) NULL,
   [Gender] [nvarchar](1) NULL,
   [EmailAddress] [nvarchar](50) NULL,
   [YearlyIncome] [money] NULL,
   [TotalChildren] [tinyint] NULL,
   [NumberChildrenAtHome] [tinyint] NULL,
   [EnglishEducation] [nvarchar](40) NULL,
   [SpanishEducation] [nvarchar](40) NULL,
   [FrenchEducation] [nvarchar](40) NULL,
   [EnglishOccupation] [nvarchar](100) NULL,
   [SpanishOccupation] [nvarchar](100) NULL,
   [FrenchOccupation] [nvarchar](100) NULL,
   [HouseOwnerFlag] [nchar](1) NULL,
   [NumberCarsOwned] [tinyint] NULL,
   [AddressLine1] [nvarchar](120) NULL,
   [AddressLine2] [nvarchar](120) NULL,
   [Phone] [nvarchar](20) NULL,
   [DateFirstPurchase] [date] NULL,
   [CommuteDistance] [nvarchar](15) NULL,
 CONSTRAINT [PK_DimCustomer_CustomerKey] PRIMARY KEY CLUSTERED 
([CustomerKey] ASC)) ON [PRIMARY]
GO

Let's execute the below script to insert the last ID value obtained earlier, into the IDLookup table:

INSERT INTO [dbo].[IDLookup] ([TableName],[LastID]) VALUES ('DimCustomer', 29514)

Finally, let us create stored procedure Usp_UpdateIDLookup, to update IDLookup table, using the below script:

CREATE PROCEDURE [dbo].[Usp_UpdateIDLookup]
(@TableName varchar(100),@LastId int)
AS
BEGIN
IF EXISTS (SELECT * FROM [dbo].[IDLookup] WHERE TableName=@TableName)
  UPDATE [dbo].[IDLookup] SET [LastID]=@LastId WHERE TableName=@TableName
ELSE
  INSERT INTO [dbo].[IDLookup]  VALUES (@TableName, @LastId )  
END
GO

Create an ADF Pipeline

Now that we have prepared the source and destination databases, we can proceed with building ADF data flow, as follows:

Create a new pipeline and add pipeline variables LastID and CurrentID of string type. We'll use LastID to store values from IDLookup table and CurrentID to store latest ID obtained from the source table:

azure data factory until activity

Next, add a Lookup activity to read previously stored ID value from the IDLookup table (I've named it as LastID_Lookup_AC), with the below settings:

azure data factory until activity

Next, add a Set Variable activity (I have named it as Set_LastID_AC), to assign the value obtained from the LastID_Lookup_AC activity to the LastID variable and we can use an expression @string(activity('LastID_Lookup_AC').output.firstRow.LastID) to achieve that. Here's how your screen should look at this point:

azure data factory until activity

Next, add a Copy activity for the initial source and destination synchronization for DimCustomer table (I have named it as Init_Sync_AC). Please note, I've added a purge query to its sink properties, to prevent it from failing due to PK errors:

azure data factory until activity

Drag and drop an Until activity (I've named it as Until_AC), link it to the Success ends of the Set_LastID_AC and Init_Sync_AC activities and enter an expression @greater(int(variables('CurrentID')),int(variables('LastID'))) to its expression text box. This expression evaluates to true, if CurrentID variable is greater than LastID variable (the variable CurrentID will be populated by subsequent activities):

azure data factory until activity

Switch to Activities tab and click the Add Activity button, to start adding child activities. First, let's add a Lookup activity (I've named it as GetCurrentID_AC), to read the latest ID from the source table. This activity will be connected to dataset DimCustomer_SRC_DS, pointing to my local AdventureWorksDW2016 database and uses the below query as a source

SELECT MAX([CustomerKey]) AS CurrentId FROM [dbo].[DimCustomer]

Here is the screenshot, with the required settings:

azure data factory until activity

Next, add a Set Variable activity (I've named it as SetCurrentID_AC), to assign the value obtained from the GetCurrentID_AC activity to CurrentID pipeline variable - we can use an expression @string(activity('GetCurrentID_AC').output.firstRow.CurrentId) to achieve that. Here's the screenshot at this point:

azure data factory until activity

We can complete this chain by adding a Wait activity. Let's drag and drop it from the General group, connect it to the Success end of the SetCurrentID_AC activity and set its wait time setting to 30 seconds:

azure data factory until activity

Now that we're done with the child components of the Until_AC activity, let's use the navigation link to return to the parent pipeline's design surface:

azure data factory until activity

Next, let's clone (copy/paste) Init_Sync_AC activity, link it to the Success end of the Until_AC activity, specify its source query as Usp_GetDeltaFromDimCustomer procedure, to ensure incremental data read and assign an expression @int(variables('LastID')) to its lastId parameter. This will ensure that this stored procedure returns rows with the ID's greater than the value of LastID variable:

azure data factory until activity

Let's finalize the pipeline design, by adding a stored procedure to update IDLookup table with the last ID, obtained from the source table. Please note, I've used an expression @int(variables('CurrentID')) to extract value of CurrentID variable:

azure data factory until activity

Finally, let us start the pipeline in Debug mode, let it run for few minutes and examine the execution logs in the Output window:

azure data factory until activity

As you can notice from the logs, once the initial steps complete, the iteration logic started which checks the source table for new rows on a periodic basis. This iteration will continue until Until_AC activity's condition becomes true or its timeout value is reached.

To test the incremental logic I have outlined earlier, let's insert some rows into the source table, using the below script:

Insert into DimCustomer
SELECT top 10 [GeographyKey]
      ,[CustomerAlternateKey]+'AAA'
      ,[Title]
      ,[FirstName]
      ,[MiddleName]
      ,[LastName]
      ,[NameStyle]
      ,[BirthDate]
      ,[MaritalStatus]
      ,[Suffix]
      ,[Gender]
      ,[EmailAddress]
      ,[YearlyIncome]
      ,[TotalChildren]
      ,[NumberChildrenAtHome]
      ,[EnglishEducation]
      ,[SpanishEducation]
      ,[FrenchEducation]
      ,[EnglishOccupation]
      ,[SpanishOccupation]
      ,[FrenchOccupation]
      ,[HouseOwnerFlag]
      ,[NumberCarsOwned]
      ,[AddressLine1]
      ,[AddressLine2]
      ,[Phone]
      ,[DateFirstPurchase]
      ,[CommuteDistance]
  FROM [AdventureWorksDW2016].[dbo].[DimCustomer]

Soon after this row insertion, our iteration finished and the pipeline execution continued with subsequent activities, as expected:

azure data factory until activity

I have attached JSON scripts for this pipeline here, for your reference.

Azure Data Factory Timeout Setting

As I mentioned earlier, the Until activity has a timeout setting which prevents pipelines from long, needless execution. The timeout setting is set to 7 days by default, but can be changed, if required:

azure data factory until activity
Next Steps


sql server categories

sql server webinars

subscribe to mssqltips

sql server tutorials

sql server white papers

next tip



About the author
MSSQLTips author Fikrat Azizov Fikrat Azizov has been working with SQL Server since 2002 and has earned two MCSE certifications. He’s currently working as a Solutions Architect at Slalom Canada.

This author pledges the content of this article is based on professional experience and not AI generated.

View all my tips



Comments For This Article




Tuesday, June 14, 2022 - 2:15:01 AM - Amol Back To Top (90158)
I have created a pipeline with "Until" activity to execute API call until API call return response code as 6. This response code is assigned to a variable. If the variable value is equal to 6 the loop will exit.

Sometime API call returns the response code 6 within 2-3 minutes (depends on number of orders to process) in that case "Until" activity takes 1 minute to exit the loop after receiving the response 6. Sometime API returns response code 6 after one hour but "Until" activity break the loop after 20 minutes. As the API call execute for more time the time taken to exit the loop also increases.

Sometimes all API calls runs for 8 hours and even the last activity inside the loop successfully got executed, but the "Until" activity runs for another 2 hours without doing nothing. any solution for this would be appreciated.

Saturday, October 26, 2019 - 5:44:58 PM - Fikrat Back To Top (82908)

Hi Avik,

Yes, the method you described here would certainly serve the purpose of creating incremental copy logic. However, my purpose in this article was to demonstrate Until component in action, although I should admit- this is not the simplest way to create incremental copy logic.

Hope that helps,

Fikrat


Thursday, October 24, 2019 - 10:43:28 AM - Avik Sadhu Back To Top (82886)

It's a good article but just wondering isn't it add more steps for an incremental load activity to the dedstination dimesntion table? If I use a look up and unmatched row I insert and matched row I insert to a temp table in destination DB and then do a set based update from there it'll serve the same purpose. Or even we use a merge keyword in a stored procedue and used in inside a stored procedure task. Kindly rectify me if I'm wrong















get free sql tips
agree to terms