Implement Incremental Ingestion for Large Fact Tables on Lakehouse using Synapse Mapping Data Flow - Part 2

By: Fikrat Azizov   |   Updated: 2022-04-21



Problem

This tip is part of a series of posts dedicated to building end-to-end Lakehouse solutions with Azure Synapse Analytics. In the previous post, Incremental Ingestion of Fact Tables on Lakehouse using Azure Synapse Analytics Mapping Data Flow - Part 1, we explored ways to build incremental data upload logic and created a mapping data flow that fetches the data for two fact tables for a specified date range. In this tip, we'll create a data integration pipeline that divides the historical data into smaller pieces based on fixed time intervals and calls the mapping data flow for each interval.

Solution

Data preparation

Our incremental logic will be based on the ModifiedDate column. Because the original Adventure Works data has a single value in this column, let's connect to Azure SQL DB and run the query below to introduce date variations spanning a few weeks:

UPDATE [SalesLT].[SalesOrderHeader] SET ModifiedDate = '2022-01-01' WHERE SalesOrderID < 71831;
UPDATE [SalesLT].[SalesOrderHeader] SET ModifiedDate = '2022-01-08' WHERE SalesOrderID BETWEEN 71831 AND 71901;
UPDATE [SalesLT].[SalesOrderHeader] SET ModifiedDate = '2022-01-17' WHERE SalesOrderID BETWEEN 71902 AND 71920;
UPDATE [SalesLT].[SalesOrderHeader] SET ModifiedDate = '2022-01-25' WHERE SalesOrderID > 71920;

Let's also run the Azure SQL DB to Raw and Raw to Bronze pipelines we created in earlier posts, to bring the data into the Bronze layer. Next, run the following SQL script to confirm that the Bronze table has received the latest updates:

SELECT
    TOP 100 *
FROM
    OPENROWSET(
        BULK 'https://synstg.blob.core.windows.net/syn-fs/delta/bronze/SalesOrderHeader/',
        FORMAT = 'DELTA'
    ) AS [result]
ORDER BY ModifiedDate DESC

Here is the screenshot with the results:

sql script

Figure 1

Data Integration Pipeline Design

Let's create a data integration pipeline and add two parameters of the string type: MyStartDate and MyEndDate. These parameters are needed to receive the trigger window timestamps, and they will later be passed to the data flow parameters WindowStart and WindowEnd. Here's the pipeline with the parameters:

data integration pipeline

Figure 2

Next, add a data flow activity and point it to the data flow BronzeSilverSalesOrders created in the previous post:

add a data flow activity

Figure 3

Next, navigate to the Parameters tab and provide the following pipeline expressions as the parameter values:

  • WindowStart - @pipeline().parameters.MyStartDate
  • WindowEnd - @pipeline().parameters.MyEndDate

These expressions will pass the pipeline parameters to the mapping data flow.

Here is the screenshot:

pipeline parameters

Figure 4

Pipeline Orchestration

As mentioned earlier, our purpose is to design an orchestration that can handle both historical and future time periods. Of the various trigger types available in Azure Synapse Analytics, only tumbling window triggers allow creating backdated window ranges. Because my historical data spans several weeks in January 2022, I'm going to create a weekly trigger starting on January 1, 2022.

So, let's create a trigger, as follows:

pipeline orchestration

Figure 5

Select the tumbling window trigger type, set the start date to '01/01/2022', select the hourly frequency, enter 168 hours (that is, seven days) as the interval, and ensure that the Start trigger on creation checkbox is selected, as follows:
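To make the effect of these settings concrete, here is a minimal Python sketch of how a 168-hour tumbling window starting on January 1, 2022 slices the timeline. It is only an illustration of the arithmetic, not the trigger's actual implementation; the function name tumbling_windows and the cut-off date of February 1, 2022 (standing in for "now") are assumptions made for this example.

```python
from datetime import datetime, timedelta

def tumbling_windows(start, interval_hours, until):
    """Return (window_start, window_end) pairs for every window
    that has fully elapsed by `until`."""
    step = timedelta(hours=interval_hours)
    windows = []
    window_start = start
    # A window is only included once its end time has been reached.
    while window_start + step <= until:
        windows.append((window_start, window_start + step))
        window_start += step
    return windows

# Hypothetical cut-off: pretend the trigger is published on 2022-02-01.
windows = tumbling_windows(datetime(2022, 1, 1), 168, datetime(2022, 2, 1))
for window_start, window_end in windows:
    print(window_start.date(), "->", window_end.date())
```

Under these assumptions the sketch yields four back-to-back weekly windows, the first of which covers January 1 to January 8. Each window's end is the next window's start, so the windows neither overlap nor leave gaps.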

tumbling trigger type

Figure 6

Expand the Advanced panel and limit the max concurrency setting to one. This ensures that multiple instances of the pipeline do not run simultaneously, which is important because concurrent runs writing to the same Delta Lake table can conflict with each other. Here's the screenshot:

tumbling trigger type

Figure 7

Confirm your changes and move to the trigger parameters screen. Enter the following expressions for the pipeline parameters:

  • MyStartDate - @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd')
  • MyEndDate - @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd')

These expressions convert the trigger's window boundaries into string values and assign them to the corresponding pipeline parameters. Save the trigger settings, commit and publish your changes.
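The following Python sketch mimics what these two expressions produce for the first four weekly windows, and checks which window each of the ModifiedDate values from the data preparation step would land in. The helper names to_param and window_for are hypothetical, and the half-open comparison (start inclusive, end exclusive) is an assumption for illustration; how the data flow from Part 1 treats the window boundaries is not restated in this tip.

```python
from datetime import datetime, timedelta

def to_param(ts):
    # Rough Python equivalent of formatDateTime(ts, 'yyyy-MM-dd').
    return ts.strftime("%Y-%m-%d")

first_start = datetime(2022, 1, 1)   # trigger start date
step = timedelta(hours=168)          # trigger interval

# One dictionary per pipeline run, holding the two pipeline parameters.
runs = [
    {
        "MyStartDate": to_param(first_start + i * step),
        "MyEndDate": to_param(first_start + (i + 1) * step),
    }
    for i in range(4)
]

def window_for(modified_date, runs):
    """Return the run whose window contains modified_date, or None.
    Assumes a half-open window: start <= date < end."""
    for run in runs:
        # yyyy-MM-dd strings sort in chronological order.
        if run["MyStartDate"] <= modified_date < run["MyEndDate"]:
            return run
    return None

for modified_date in ["2022-01-01", "2022-01-08", "2022-01-17", "2022-01-25"]:
    print(modified_date, "->", window_for(modified_date, runs))
```

Note that '2022-01-08' is both the end of the first window and the start of the second. With the half-open comparison assumed here it belongs only to the second window; a filter that is inclusive on both ends would pick those rows up twice, so it is worth checking how the boundaries are compared in the data flow.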

Once the trigger is published, we can navigate to the Monitor page and find multiple instances of this pipeline running sequentially, as follows:

find instances of this pipeline running sequentially

Figure 8

We can also see details of each execution, by opening the corresponding link and going into data flow details:

details of each execution

Figure 9

The data flow execution results for the Jan 1 - Jan 8 window look like this:

data flow execution results

Figure 10

Finally, let's validate the data in the destination Delta table by running the following SQL script:

SELECT
    TOP 100 *
FROM
    OPENROWSET(
        BULK 'https://synstg.blob.core.windows.net/syn-fs/delta/silver/SalesOrderHeader/',
        FORMAT = 'DELTA'
    ) AS [result]

Here is the screenshot with the results:

sql script

Figure 11

This concludes the design of a data pipeline that ingests the historical data incrementally. The same trigger will also continue to fire at weekly intervals for future periods.

About the author
Fikrat Azizov has been working with SQL Server since 2002 and has earned two MCSE certifications. He's currently working as a Solutions Architect at Slalom Canada.

Article Last Updated: 2022-04-21