Version: 17.07
Supported Since: 17.07.1
Hogwarts has been in deep waters lately. After Dolores Umbridge took over the headmastership, not even a pixie could flutter without her knowing. She made sure that the school’s webmaster, Prof. Quirinus Quirrell, was also dancing under her Imperius. All he had to do was monitor the web activity on the news articles and find out which ones were gaining momentum. The original website looks as follows, with all read counts at 0.
Quirrell delegates this task to one of the Slytherin students, Gregory Goyle. As the solution, Goyle has decided to use an ESB to integrate the web server with a Kafka server, providing real-time activity tracking. When a reader requests the full news article as below, the request is used to populate an internal statistics engine that processes read counts.
The requirements can be broken down into three key segments.
1. Retrieve the full news article and record the impression
2. Persist the records, passing data into Hadoop or data warehousing systems for real-time processing and reporting
3. Update the read count back on the server
UltraStudio Configuration
To implement the above use case, you must first select the following dependencies when creating a new Ultra project.
- HTTP NIO Connector and Kafka Connector from the connector list
- Flow Control, Payload Manipulation, String Manipulation, Transport Header Manipulation, and XML Operations from the processor list
If you have already created a project, you can add the above dependencies via the Component Registry: from the Tools menu, select Ultra Studio → Component Registry, and select the above dependencies from the Connectors and Processors lists.
To implement the above use case, first create the integration flow named “retrieve-article-and-record-impression-flow”. Then add the required components by going through the following steps in order.
1. Add an NIO HTTP Ingress Connector from the connectors list, to be triggered when a news article’s “Read full article” button is clicked on the web server. The connector’s basic parameters should be filled as shown below to listen on port 8280 and service path /hogwarts/news.
2. The request message injected into the flow has a query parameter id that identifies each news item; for further processing, this value needs to be extracted. Add an Extract HTTP Query Parameter processing element to extract the query parameter value and assign it to a scope variable named key.
3. Next, add a String Payload Setter processing element to compose a mock payload to be sent as the response to the full-article request; in our case it combines the extracted unique ID with a static article body.
4. This message is then persisted to a Kafka topic using a Kafka Egress Connector, with basic parameters as shown below (a sketch of the equivalent produce call follows the configuration tables).
5. Finally, an Add New Transport Header processing element is added to enable CORS; this is needed only to make the example work from a browser and is not part of the integration logic.
The completed integration flow for segment 1 should look like below.
Configuration for each element is as follows. The numbering corresponds to the numbers shown in the above diagram.
1. NIO HTTP Ingress Connector
| Parameter | Value |
|---|---|
| HTTP Port | 8280 |
| Service Path | /hogwarts/news |
2. Extract HTTP Query Parameter
| Parameter | Value |
|---|---|
| Query Parameter Name | id |
| Variable Name | key |
3. String Payload Setter
| Parameter | Value |
|---|---|
| Payload | <message><id>@{variable.key}</id><article>During the 1997-1998 school year, Amycus Carrow was appointed Defence Against the Dark Arts Professor at Hogwarts, and changed the subject to just Dark Arts. The seventh years learned how to cast the Cruciatus Curse (which was to be practised on students who earned detention), and possibly how to conjure Fiendfyre and other Unforgivable Curses; Vincent Crabbe and Gregory Goyle were said to be top students due to their inclination to cruelty. However, Crabbe apparently did not pay attention to how to control or stop Fiendfyre, which was what led to his demise later that year.</article></message> |
4. Kafka Egress Connector
| Parameter | Value |
|---|---|
| Bootstrap Servers | localhost:9092 |
| Topic Name | XNewsTopic |
5. Add New Transport Header
| Parameter | Value |
|---|---|
| Header Name | Access-Control-Allow-Origin |
| Header Value | * |
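Configured this way, the Kafka Egress Connector publishes each composed <message> payload to the XNewsTopic topic on the broker at localhost:9092. As a point of reference, the following is a minimal sketch of the equivalent produce call using the standard Apache Kafka Java client; the connector performs all of this internally, and the class name and sample id value here are purely illustrative.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NewsTopicProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same values as the Kafka Egress Connector's basic parameters above
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        String newsId = "1"; // hypothetical value of the extracted ?id= query parameter
        String payload = "<message><id>" + newsId + "</id><article>...</article></message>";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The connector performs the equivalent of this send for every request
            producer.send(new ProducerRecord<>("XNewsTopic", payload));
        }
    }
}
```

The record is sent without a key, so partition assignment is left to the client, which is sufficient for this sample’s volume.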
The second segment can be implemented by creating the next integration flow, named “persist-record-to-stat-engine-flow”. Then add the required components by going through the following steps in order.
1. Add a Kafka Ingress Connector from the connectors list, to be triggered when a new record is published to the Kafka topic that was used to persist the messages in segment 1. Whenever it consumes a new record, the message is injected into this flow (a sketch of an equivalent consumer loop follows these steps).
2. Next, we extract the value of the id element in the message payload, which was set in the first segment. Add an XPath String Extractor processing element to extract this value and assign it to a scope variable named id (a stand-alone illustration of this extraction follows the configuration tables).
3. Then this value is used to keep the read count of each individual news article in an internal statistics engine, using the custom processing element Impression Injector.
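To see what the Kafka Ingress Connector does conceptually, here is a minimal sketch of an equivalent consumer loop written with the standard Apache Kafka Java client; the group id and class name are assumptions for illustration, as the connector manages its own consumer internally.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewsTopicConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker and topic as configured for the Kafka Ingress Connector below
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "news-stat-engine"); // hypothetical consumer group
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("XNewsTopic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each value is the <message> XML composed in segment 1;
                    // the connector injects it into the flow as a new message
                    System.out.println("Consumed: " + record.value());
                }
            }
        }
    }
}
```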
The completed integration flow for segment 2 should look like below.
Configuration for each element is as follows. The numbering corresponds to the numbers shown in the above diagram.
1. Kafka Ingress Connector
| Parameter | Value |
|---|---|
| Bootstrap Servers | localhost:9092 |
| Topic Name | XNewsTopic |
2. XPath String Extractor
| Parameter | Value |
|---|---|
| Variable Name | id |
| XPath Expression | /message/id |
3. Impression Injector
| Parameter | Value |
|---|---|
| ID | @{variable.id} |
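The XPath String Extractor above applies the expression /message/id to the consumed record. As a stand-alone illustration (not the processor’s actual implementation), the same extraction can be reproduced with the JDK’s built-in XPath API:

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathFactory;

import org.w3c.dom.Document;

public class XPathExtractionSketch {
    public static void main(String[] args) throws Exception {
        // A sample record value as composed in segment 1
        String xml = "<message><id>1</id><article>...</article></message>";
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));

        // Same expression as the XPath String Extractor's configuration above
        String id = XPathFactory.newInstance().newXPath().evaluate("/message/id", doc);
        System.out.println("Extracted id: " + id); // prints "Extracted id: 1"
    }
}
```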
The final requirement is to update the read count of each news article. In segment 3, we implement that by creating another integration flow, named “retrieve-read-count-flow”. Then add the required components by going through the following steps in order.
1. Add an NIO HTTP Ingress Connector from the connectors list to retrieve the read count of each news article from the internal statistics engine. This connector listens on port 8280 and service path /hogwarts/stats.
2. The request message injected into the flow has a query parameter id that identifies each news item; it needs to be extracted to retrieve the read count of the particular news article. Add an Extract HTTP Query Parameter processing element to extract the query parameter value and assign it to a scope variable named key.
3. Next, to retrieve the read count for the news article, a Read Count to Payload Setter custom processing element is added. This processor queries the read count from the statistics engine based on the unique key (a conceptual sketch of such a counter store follows these steps).
4. Finally, an Add New Transport Header processing element is added to enable CORS, just like in segment 1; it is not part of the integration logic.
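The Impression Injector (segment 2) and the Read Count to Payload Setter (segment 3) are custom processing elements whose source is not shown in this article, but conceptually they share a thread-safe counter store keyed by the news item ID. Here is a minimal sketch of such a store; the names and structure are assumptions rather than the sample’s actual code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

public class StatEngineSketch {
    // One atomic counter per news item ID
    private final ConcurrentMap<String, AtomicLong> readCounts = new ConcurrentHashMap<>();

    /** What the Impression Injector does for each record consumed in segment 2. */
    public void recordImpression(String id) {
        readCounts.computeIfAbsent(id, k -> new AtomicLong()).incrementAndGet();
    }

    /** What the Read Count to Payload Setter queries in segment 3. */
    public long readCount(String id) {
        AtomicLong count = readCounts.get(id);
        return count == null ? 0L : count.get();
    }
}
```

Segment 2 drives the increment side for every consumed record, while segment 3 reads the current value and writes it into the HTTP response payload.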
The completed integration flow for segment 3 should look like below.
Configuration for each element is as follows. The numbering corresponds to the numbers shown in the above diagram.
1. NIO HTTP Ingress Connector

| Parameter | Value |
|---|---|
| HTTP Port | 8280 |
| Service Path | /hogwarts/stats |

2. Extract HTTP Query Parameter

| Parameter | Value |
|---|---|
| Query Parameter Name | id |
| Variable Name | key |

3. Read Count to Payload Setter

| Parameter | Value |
|---|---|
| Key | @{variable.key} |

4. Add New Transport Header

| Parameter | Value |
|---|---|
| Header Name | Access-Control-Allow-Origin |
| Header Value | * |
Now you can run the Ultra project and check the functionality of the integration flows. Create an UltraESB Server run configuration and start it.
UltraESB-X Configuration
When running the sample in the UltraESB-X distribution, you need to override the following properties in order for the sample to work. The properties file is located at $ULTRA_HOME/conf/projects/real-time-activity-tracking-with-kafka/default.properties.
Refer to the Managing Project Properties documentation on how to override properties.
| Property | Description |
|---|---|
| persist-record-to-stat-engine-flow.news-kafka-listener.bootstrapServers | Comma-separated list of bootstrap servers for the Kafka consumer in segment 2 |
| persist-record-to-stat-engine-flow.news-kafka-listener.topicName | Topic name for the Kafka consumer in segment 2 |
| retrieve-article-and-record-impression-flow.news-kafka-reporter.bootstrapServers | Comma-separated list of bootstrap servers for the Kafka producer in segment 1 |
| retrieve-article-and-record-impression-flow.news-kafka-reporter.topicName | Topic name for the Kafka producer in segment 1 |
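For example, when running against a local single-broker Kafka installation with the topic used in this article, the overridden entries in default.properties would look like the following; adjust the host, port, and topic name to match your environment.

```
persist-record-to-stat-engine-flow.news-kafka-listener.bootstrapServers=localhost:9092
persist-record-to-stat-engine-flow.news-kafka-listener.topicName=XNewsTopic
retrieve-article-and-record-impression-flow.news-kafka-reporter.bootstrapServers=localhost:9092
retrieve-article-and-record-impression-flow.news-kafka-reporter.topicName=XNewsTopic
```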
After that, navigate to the $ULTRA_HOME/bin directory. Then run the UltraESB-X distribution with the following command to start the engine with this sample project deployed.
./ultraesbx.sh -sample real-time-activity-tracking-with-kafka
Now you can run the integration flows and visit http://localhost:5000 in a browser to open the web server. Try reading the full article of a few news summaries there and notice the change in the read counts.
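You can also exercise the two HTTP endpoints directly, without going through the web page. The following is a minimal sketch using the JDK 11+ HttpClient; id=1 is a hypothetical news item ID.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SampleRequestSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Segment 1: fetch the full article, which also records an impression
        HttpResponse<String> article = client.send(
                HttpRequest.newBuilder(URI.create("http://localhost:8280/hogwarts/news?id=1")).build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(article.body()); // the <message> payload composed by the flow

        // Segment 3: fetch the current read count for the same article
        HttpResponse<String> stats = client.send(
                HttpRequest.newBuilder(URI.create("http://localhost:8280/hogwarts/stats?id=1")).build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(stats.body());
    }
}
```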
Prof. Quirrell presented the final web server, which looked as follows with the read counts, and explained the persisted data available for real-time analytics and other processing.
For this solution, Gregory Goyle was well rewarded by the headmistress with a prefectship.