Real Time Activity Tracking With Kafka

Version: 17.07

Supported Since: 17.07.1

Use case description

Hogwarts is in deep waters lately. After Dolores Umbridge took over the headmastership, not even a pixie could flutter without her knowing. She made sure that the school's webmaster, Prof. Quirinus Quirrell, was also dancing under her Imperius. All he had to do was monitor the web activity for the news articles and find out which ones were gaining momentum. The original website looks as follows, with every read count at 0.

[Figure: original website view]

Proposed Solution

Quirrell delegates this task to one of the Slytherin students, Gregory Goyle. Goyle decides to use an ESB to integrate the web server with a Kafka server and so provide real-time activity tracking. When a reader requests the full news article as shown below, the request is also used to populate an internal statistics engine that keeps the read counts.

[Figure: full article view]

The requirements can be broken down into 3 key segments.

  1. Retrieve the full news article and record the impression

  2. Persist the records, passing data into Hadoop or data warehousing systems for real-time processing and reporting

  3. Update the read count back on the server

UltraStudio Configuration

Implementation of the Solution

Prerequisite

To implement the above use case, first select the following dependencies when creating a new Ultra project.

  • HTTP NIO Connector and Kafka Connector from the connector list

  • Flow Control, Payload Manipulation, String Manipulation, Transport Header Manipulation, and XML Operations from the processor list

If you have already created a project, you can add the above dependencies via the Component Registry. From the Tools menu, select Ultra Studio → Component Registry, then select the above dependencies from the Connectors list and the Processors list.

Implementation

Segment 1

To implement the above use case, first create an integration flow named “retrieve-article-and-record-impression-flow”. Then add the required components by going through the following steps in order.

  1. Add an NIO HTTP Ingress Connector from the connectors list, to be triggered when a news article’s “Read full article” button is clicked on the web server. Fill in its basic parameters as shown below so that it listens on port 8280 and service path /hogwarts/news.

  2. The request message injected into the flow has a query parameter id that identifies each news item; it needs to be extracted for further processing. Add an Extract HTTP Query Parameter processing element to extract this query parameter value and assign it to a scope variable named key.

  3. Add a String Payload Setter processing element to compose the response to the full article request. In this example it is a mock payload made up of the unique ID extracted in the previous step and a static article text.

  4. Add a Kafka Egress Connector, with basic parameters as shown below, to persist this message to a Kafka topic.

  5. Finally, add an Add New Transport Header processing element to enable CORS. This is needed only to make the example work and is not part of the integration logic.
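Steps 2 and 3 can be pictured in plain Java as follows. This is only a sketch of what the two processing elements do conceptually, not of how UltraESB implements them. The class and method names (`ArticleRequestSketch`, `queryParam`, `composePayload`, `escapeXml`) are hypothetical, and the XML escaping is an added precaution that this guide does not mention.

```java
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper, for illustration only; not part of UltraESB.
final class ArticleRequestSketch {

    // Returns the first value of the named query parameter, if present.
    static Optional<String> queryParam(URI uri, String name) {
        String query = uri.getRawQuery();
        if (query == null) {
            return Optional.empty();
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String key = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            if (URLDecoder.decode(key, StandardCharsets.UTF_8).equals(name)) {
                return Optional.of(URLDecoder.decode(value, StandardCharsets.UTF_8));
            }
        }
        return Optional.empty();
    }

    // Builds the <message><id>..</id><article>..</article></message> payload
    // used in this guide from the extracted key and a static article text.
    static String composePayload(String key, String articleText) {
        return "<message><id>" + escapeXml(key) + "</id><article>"
                + escapeXml(articleText) + "</article></message>";
    }

    // Assumption: escape XML special characters so the payload stays well-formed.
    static String escapeXml(String s) {
        return s.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;");
    }

    public static void main(String[] args) {
        URI request = URI.create("http://localhost:8280/hogwarts/news?id=42");
        String key = queryParam(request, "id").orElse("unknown");
        System.out.println(composePayload(key, "Article text goes here."));
    }
}
```

In the flow itself the same substitution is expressed declaratively, through the `@{variable.key}` placeholder in the String Payload Setter configuration.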

The completed integration flow for segment 1 should look like the following.

[Figure: segment 1 complete flow]

The configuration for each element is given below. The numbering corresponds to the numbers shown in the above diagram.

Design View

1. NIO HTTP Ingress Connector

[Figure: segment 1, component 1 configuration]

2. Extract HTTP Query Parameter

[Figure: segment 1, component 2 configuration]

3. String Payload Setter

[Figure: segment 1, component 3 configuration]

4. Kafka Egress Connector

[Figure: segment 1, component 4 configuration]

5. Add New Transport Header

[Figure: segment 1, component 5 configuration]

Text View

1. NIO HTTP Ingress Connector

  • Http port: 8280
  • Service path: /hogwarts/news

2. Extract HTTP Query Parameter

  • Query parameter name: id
  • Variable Name: key

3. String Payload Setter

  • String Payload: <message><id>@{variable.key}</id><article>During the 1997-1998 school year, Amycus Carrow was appointed Defence Against the Dark Arts Professor at Hogwarts, and changed the subject to just Dark Arts. The seventh years learned how to cast the Cruciatus Curse (which was to be practised on students who earned detention), and possibly how to conjure Fiendfyre and other Unforgivable Curses; Vincent Crabbe and Gregory Goyle were said to be top students due to their inclination to cruelty. However, Crabbe apparently did not pay attention to how to control or stop Fiendfyre, which was what led to his demise later that year.</article></message>

4. Kafka Egress Connector

  • Bootstrap Servers: localhost:9092
  • Topic Name: XNewsTopic

5. Add New Transport Header

  • Transport Header Name: Access-Control-Allow-Origin
  • Value: *

Segment 2

The second segment is implemented by creating the next integration flow, named “persist-record-to-stat-engine-flow”. Add the required components by going through the following steps in order.

  1. Add a Kafka Ingress Connector from the connectors list, to be triggered when a new record is published to the Kafka topic that was used to persist the messages in segment 1. Whenever the connector consumes a new record, the message is injected into this flow.

  2. Extract the value of the id element that was set in the message payload in the first segment. Add an XPath String Extractor processing element to extract this value and assign it to a scope variable named id.

  3. Add the custom processing element Impression Injector, which uses this value to maintain the read count for each individual news article in an internal statistics engine.
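As a rough picture of steps 2 and 3, the sketch below extracts /message/id with the JDK's XPath API and counts impressions per article in memory. The internals of Impression Injector are not shown in this guide, so the map-based counter, the class name `ImpressionStatsSketch`, and its method names are all assumptions made for illustration.

```java
import java.io.StringReader;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

// Hypothetical in-memory stand-in for the statistics engine.
final class ImpressionStatsSketch {

    // One counter per news ID; LongAdder copes well with concurrent increments.
    private final ConcurrentHashMap<String, LongAdder> readCounts = new ConcurrentHashMap<>();

    // Evaluates the XPath /message/id against the record payload.
    static String extractId(String xmlPayload) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            return xpath.evaluate("/message/id", new InputSource(new StringReader(xmlPayload)));
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Payload is not the expected XML", e);
        }
    }

    // Records one impression for the given news ID.
    void recordImpression(String newsId) {
        readCounts.computeIfAbsent(newsId, k -> new LongAdder()).increment();
    }

    // Returns the current read count, or 0 if the article was never read.
    long readCount(String newsId) {
        LongAdder adder = readCounts.get(newsId);
        return adder == null ? 0L : adder.sum();
    }

    public static void main(String[] args) {
        ImpressionStatsSketch stats = new ImpressionStatsSketch();
        String record = "<message><id>7</id><article>...</article></message>";
        stats.recordImpression(extractId(record));
        System.out.println(stats.readCount("7"));
    }
}
```

The `readCount` lookup corresponds to what a later flow would need in order to report the count back to the web page.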

The completed integration flow for segment 2 should look like the following.

[Figure: segment 2 complete flow]

The configuration for each element is given below. The numbering corresponds to the numbers shown in the above diagram.

Design View

1. Kafka Ingress Connector

[Figure: segment 2, component 1 configuration]

2. XPath String Extractor

[Figure: segment 2, component 2 configuration]

3. Impression Injector

[Figure: segment 2, component 3 configuration]

Text View

1. Kafka Ingress Connector

  • Bootstrap Servers: localhost:9092
  • Topic Name: XNewsTopic

2. XPath String Extractor

  • Variable Name: id
  • XPath: /message/id

3. Impression Injector

  • News ID: @{variable.id}

Segment 3

The final requirement is to update the read count of each news article. Segment 3 implements this with another integration flow, named “retrieve-read-count-flow”. Add the required components by going through the following steps in order.

  1. Add an NIO HTTP Ingress Connector from the connectors list to serve the read count of each news article from the internal statistics engine. This connector listens on port 8280 and service path /hogwarts/stats.

  2. The request message injected into the flow has a query parameter id that identifies each news item; it needs to be extracted in order to look up the read count for that particular article. Add an Extract HTTP Query Parameter processing element to extract this query parameter value and assign it to a scope variable named key.

  3. Add the custom processing element Read Count to Payload Setter. This processor queries the statistics engine for the read count that belongs to the unique key.

  4. Finally, add an Add New Transport Header processing element to enable CORS, just as in segment 1. Again, this is not part of the integration logic.

The completed integration flow for segment 3 should look like the following.

[Figure: segment 3 complete flow]

The configuration for each element is given below. The numbering corresponds to the numbers shown in the above diagram.

Design View

1. NIO HTTP Ingress Connector

[Figure: segment 3, component 1 configuration]

2. Extract HTTP Query Parameter

[Figure: segment 3, component 2 configuration]

3. Read Count to Payload Setter

[Figure: segment 3, component 3 configuration]

4. Add New Transport Header

[Figure: segment 3, component 4 configuration]

Text View

1. NIO HTTP Ingress Connector

  • Http port: 8280
  • Service path: /hogwarts/stats

2. Extract HTTP Query Parameter

  • Query parameter name: id
  • Variable Name: key

3. Read Count to Payload Setter

  • News ID: @{variable.key}

4. Add New Transport Header

  • Transport Header Name: Access-Control-Allow-Origin
  • Value: *

Now you can run the Ultra Project and check the functionality of the integration flow. Create an UltraESB Server run configuration and start it.

UltraESB-X Configuration

Property Configuration

When running the sample in the UltraESB-X distribution, you need to override the following properties in order for the sample to work. The properties file is located at $ULTRA_HOME/conf/projects/real-time-activity-tracking-with-kafka/default.properties

Refer to the Managing Project Properties documentation for how to override properties.

  • persist-record-to-stat-engine-flow.news-kafka-listener.bootstrapServers: the comma-separated list of bootstrap servers for the Kafka consumer in Segment 2

  • persist-record-to-stat-engine-flow.news-kafka-listener.topicName: the topic name for the Kafka consumer in Segment 2

  • retrieve-article-and-record-impression-flow.news-kafka-reporter.bootstrapServers: the comma-separated list of bootstrap servers for the Kafka producer in Segment 1

  • retrieve-article-and-record-impression-flow.news-kafka-reporter.topicName: the topic name for the Kafka producer in Segment 1
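For illustration, the overridden entries might look like the following when a local Kafka broker is used. The values are assumptions carried over from the connector settings earlier in this guide (localhost:9092 and XNewsTopic); substitute the addresses and topic of your own Kafka setup.

```properties
# Assumed values for a single local broker; adjust to your environment.
persist-record-to-stat-engine-flow.news-kafka-listener.bootstrapServers=localhost:9092
persist-record-to-stat-engine-flow.news-kafka-listener.topicName=XNewsTopic
retrieve-article-and-record-impression-flow.news-kafka-reporter.bootstrapServers=localhost:9092
retrieve-article-and-record-impression-flow.news-kafka-reporter.topicName=XNewsTopic
```

The producer in Segment 1 and the consumer in Segment 2 need to point at the same topic, otherwise no impressions reach the statistics engine.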

After that, navigate to the $ULTRA_HOME/bin directory and run the UltraESB-X distribution with the following command to start the engine with this sample project deployed.

./ultraesbx.sh -sample real-time-activity-tracking-with-kafka

Testing the Integration Project

Now you can run the integration flow and visit http://localhost:5000 in a browser to open the web server's page. Try reading the full article for a few of the news summaries there and notice the change in the read counts.
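The two HTTP flows can also be exercised without the browser. The sketch below uses the JDK's built-in HTTP client (Java 11 or later) to request an article and then its read count, using the ports and service paths configured above. The class name `FlowSmokeTestSketch` and the news ID 1 are hypothetical, and since the response formats are not specified in this guide, the bodies are simply printed.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical smoke test for the /hogwarts/news and /hogwarts/stats flows.
final class FlowSmokeTestSketch {

    // Base address taken from the NIO HTTP Ingress Connector settings (port 8280).
    static final String BASE = "http://localhost:8280/hogwarts";

    // Builds e.g. http://localhost:8280/hogwarts/news?id=1
    static URI endpoint(String servicePath, String newsId) {
        return URI.create(BASE + servicePath + "?id="
                + URLEncoder.encode(newsId, StandardCharsets.UTF_8));
    }

    // Sends a GET request and returns the response body as text.
    static String get(HttpClient client, URI uri) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String newsId = args.length > 0 ? args[0] : "1"; // assumed example ID
        System.out.println(get(client, endpoint("/news", newsId)));
        System.out.println(get(client, endpoint("/stats", newsId)));
    }
}
```

Because each impression passes through Kafka before it is counted, the read count returned by the second request may briefly lag behind the first.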

Prof. Quirrell presented the final web server, which looked as follows with the read counts in place, and explained how the persisted data could feed real-time analytics and other processing.

[Figure: read count view]

For this solution, Gregory Goyle was well rewarded by the headmistress with a prefectship.
