Azure Data Factory ForEach Activity Example
Data integration flows often involve execution of the same tasks on many similar objects. A typical example could be - copying multiple files from one folder into another or copying multiple tables from one database into another. Azure Data Factory's (ADF) ForEach and Until activities are designed to handle iterative processing logic. We are going to discuss the ForEach activity in this article.
Azure Data Factory ForEach Activity
The ForEach activity defines a repeating control flow in your pipeline. This activity could be used to iterate over a collection of items and execute specified activities in a loop. This functionality is similar to SSIS's Foreach Loop Container.
ForEach activity's item collection can include outputs of other activities, pipeline parameters or variables of array type. This activity is a compound activity- in other words, it can include more than one activity.
Creating ForEach Activity in Azure Data Factory
In the previous two posts (here and here), we have started developing pipeline ControlFlow2_PL, which reads the list of tables from SrcDb database, filters out tables with the names starting with character 'P' and assigns results to pipeline variable FilteredTableNames. Here is the list of tables, which we get in this variable:
In this exercise, we will add ForEach activity to this pipeline, which will copy tables, listed in this variable into DstDb database.
Before we proceed further, Let's prepare target tables. First, Let's remove foreign key relationships between these tables in the destination database using below script, to prevent ForEach activity from failing:
ALTER TABLE [SalesLT].[Product] DROP CONSTRAINT [FK_Product_ProductCategory_ProductCategoryID] GO ALTER TABLE [SalesLT].[Product] DROP CONSTRAINT [FK_Product_ProductModel_ProductModelID] GO ALTER TABLE [SalesLT].[ProductCategory] DROP CONSTRAINT [FK_ProductCategory_ProductCategory_ParentProductCategoryID_ProductCategoryID] GO ALTER TABLE [SalesLT].[ProductModelProductDescription] DROP CONSTRAINT [FK_ProductModelProductDescription_ProductDescription_ProductDescriptionID] GO ALTER TABLE [SalesLT].[ProductModelProductDescription] DROP CONSTRAINT [FK_ProductModelProductDescription_ProductModel_ProductModelID] GO ALTER TABLE [SalesLT].[SalesOrderDetail] DROP CONSTRAINT [FK_SalesOrderDetail_Product_ProductID] GO
Next, let's create stored procedure to purge target tables, using below script. We'll need to call this procedure before each copy, to avoid PK errors:
CREATE PROCEDURE Usp_PurgeTargetTables AS BEGIN delete from [SalesLT].[Product] delete from [SalesLT].[ProductModelProductDescription] delete from [SalesLT].[ProductDescription] delete from [SalesLT].[ProductModel] delete from [SalesLT].[ProductCategory] END
Let's follow the below steps to add a ForEach activity to the ControlFlow2_PL pipeline:
Select pipeline ControlFlow2_PL, expand Iterations & Conditionals group on the Activities panel, drag-drop ForEach activity into the central panel and assign a name (I've named it as ForEach_AC):
Switch to the Settings tab and enter an expression @variables('FilteredTableNames') into Items text box:
Switch to Activities tab and click Add activity button:
Drag-drop copy activity to central panel (I've named it as CopyFiltered_AC), switch to Source tab and click '+New' button to start creating source dataset:
Next, create Azure SQL Db dataset, pointing to SrcDb database (I've named it as ASQLSrc_DS) and add dataset parameter TableName of string type:
Switch to Connection tab and enter an expression @dataset().TableName in the Table text box, which will ensure that table names for this dataset will be assigned dynamically, using dataset parameter:
Now that source dataset has been created, let's return to parent pipeline's design surface and enter an expression @item().name in the TableName text box. This expression will ensure that items from the ForEach activity's input list are mapped to its copy activity's source dataset:
Next, let's create parameterized Sink dataset for CopyFiltered_AC activity, using a similar method. Here is how your screen should look like:
Now that we've completed configuration of CopyFiltered_AC activity, let's switch to the parent pipeline's design surface, using navigation link at the top of the screen:
Next, let's add Stored Procedure activity (I've named it as SP_Purge_AC), pointing to the Usp_PurgeTargetTables procedure we created earlier and link it to Set_Variable_AC activity on Success criteria:
As the last configuration step, let's link activities SP_Purge_AC and ForEach_AC on Success criteria. This will ensure that target tables will be purged prior to the beginning of copy activities:
Finally, let's start the pipeline in Debug mode and examine execution logs in the Output window to ensure that five copy activities (one per each item from FilteredTableNames variable list) have finished successfully:
We can also examine the input of the ForEach activity, using the Input button and confirm that it received five items:
Since pipeline works as expected, we can publish all the changes now.
I have attached JSON scripts for this pipeline here.
Optional attributes of ForEach activity in Azure Data Factory
ForEach activity has few optional attributes, which allow controlling parallelism degree of its child activities. Here are those attributes:
- Sequential - This setting instructs ForEach activity to run its child activities in sequential order, one at a time
- Batch Count - This setting allows specifying parallelism degree of ForEach activity's child activities
Here is the screenshot with these attributes:
- Read: Azure Data Factory Lookup Activity Example
- Read: Azure Data Factory Filter Activity and Debugging Capabilities
- Read: Azure Data Factory Pipeline Variables
- Read: ForEach activity in Azure Data Factory
About the author
View all my tips