Build real-time data pipelines with Azure Event Hub, Stream Analytics and Cosmos DB
By: Fikrat Azizov | Updated: 2021-03-23 | Comments | Related: > Azure Cosmos DB
Azure Cosmos DB is a low-latency, highly available and globally distributed NoSQL database, that Microsoft recommends as a database platform for high-velocity data from IoT devices. However, real-time data ingestion requires services like Azure Event Hubs, Azure IoT Hubs to serve as a gateway for streaming data. How do you build a real-time data ingestion pipeline from Azure Event Hubs into Azure Cosmos DB, with minimal coding?
Overview of Azure Cosmos DB, Azure Event Hubs and Azure Stream Analytics
Azure Cosmos DB is Microsoft’s managed NoSQL service, which supports non-tabular data models, like JSON, key-value, graph, and column-family type documents. It provides globally distributed, scalable, and low-latency data service that can serve high-concurrency and fast applications. Azure Cosmos DB guarantees low-latency read and writes, which allows building flexible near-real-time applications, based on the network of connected databases around the world (you can read more about Azure Cosmos DB here).
Azure Event Hubs is a big data streaming platform and event ingestion service. It’s one of the recommended services to receive, transform and redirect fast-arriving streaming data (see here for more details). Although Event Hubs can direct its output into few Azure services, Azure Cosmos DB is not among its outputs. Therefore, some intermediate service needs to be built, to serve as a bridge between these two services, and Azure Stream Analytics can be used in this role. What makes Azure Stream Analytics attractive as the integration component for the streaming data, is its powerful SQL-based query engine and its ability to write into multiple Azure services, including Azure Cosmos DB, Azure Synapse Analytics, Azure SQL DB, storage, etc. (see this article for more details).
In this tip, we will build a solution that ingests the Twitter feeds from Twitter API’s into Azure Event Hubs, and then deliver them into Azure Cosmos DB, using Azure Stream Analytics.
Create a data ingestion pipeline into Azure Event Hubs
I’ll use the TwitterClientCore application, described in this Microsoft article to ingest data into Azure Event Hubs. This application listens to the Twitter feeds related to a configurable list of keywords, like Microsoft, Azure, Skype, etc.
Please create the Azure Cosmos DB, Event Hubs, Azure Stream Analytics and Twitter accounts and follow the steps in this Microsoft article, to configure, compile and start the TwitterClientCore application.
Once the application started, it will receive the feeds and display the number of events sent to the Event Hubs, as follows:
At this point, we will be able to observe incoming traffic on Event Hubs’ dashboards:
Configure Azure Cosmos DB container
Open your Azure Cosmos DB account and add a container, using the Add Container button:
Provide the database and container names and partitioning field. Notice that the partitioning field should include the name of the existing column from the data source (I have selected the lang column, which is part of Twitter feed payloads). Select the Analytical store option, which would allow us to analyze the data from Synapse Analytics in future (see Explore Azure Cosmos Databases with Azure Synapse Analytics for more info on this option). Here is the screenshot with the container settings:
Configure Azure Stream Analytics
The Azure Stream Analytics job requires an input, an output, and a SQL query to transform the data.
Let us start by creating an Event Hub input. Open the Azure Stream Analytics account, navigate to the Inputs tab, and add a new Event Hub input:
Provide the input name (twitter-eh in my example), select Event Hub namespace/name and the policy with the Send privileges. Ensure that the GZip option is selected, as the compression method. Here is the screenshot:
Next, navigate to the Outputs tab, add the Cosmos DB output:
Select the Cosmos DB account name, database and provide the container name we created earlier, as follows:
Next, navigate to the Query tab, add the following query (replace the input/output names, to match your endpoint names):
SELECT * INTO [cosmosdb-output] FROM [twitter-eh] PARTITION BY [lang]
Notice that we have included a partitioning column, as the Cosmos DB container requires this parameter. The Input preview table in the center of the screen, will show a sample data from the Event Hub:
Test the query and save it, using the buttons, illustrated in Figure 9.
Next, navigate to the Overview page, and start the job:
The job status will be shown, under the Start button. It will take a few minutes for the job to move from the Starting stage to the Running stage and after that, you will be able to see the execution stats in the Monitoring pane charts, as follows:
Now we can check the data in Cosmos DB container. Let us navigate to the Data Explorer tab on the Cosmos DB account page, expand the database/container names and review some of the recently uploaded items:
- Read: Welcome to Azure Cosmos DB
- Read: Quickstart: Create a Stream Analytics job by using the Azure portal
- Read: Azure Cosmos DB output from Azure Stream Analytics
About the author
View all my tips
Article Last Updated: 2021-03-23