Version: 17.07
Supported Since: 17.01
Zenythz Corp. is a heavy machinery manufacturer that runs several factories in different parts of the country. As tooling for manufacturing heavy machinery is fairly expensive, Zenythz wishes to improve their tooling maintenance process by reporting the maintenance details of each machine, on a per-factory basis, daily, to the headquarters, which will maintain an aggregated record for each machine for further analysis.
At the end of each day’s work shift, each factory sends a CSV file to the headquarters via email (as an attachment), containing the maintenance details of each machine in the following format:
id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,10,0,0,0,0
2,8,1,2,0,100
In order to distinguish the maintenance emails from other emails sent to headquarters, each report email includes a special header, X-Zenythz-Factory, containing the ID of the respective factory.
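For instance, the relevant headers of a report email from factory 1 might look like the following (an illustrative sketch; the addresses are placeholders, and only the custom header matters to the flow):

From: factory1@zenythz.example
To: hq@zenythz.example
Subject: Daily maintenance report
X-Zenythz-Factory: 1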
An ESB instance running at headquarters is assigned the task of processing the sent attachments and aggregating the daily maintenance details of each machine into a central MySQL database.
Prerequisites

A MySQL installation, with access to a user account that has database creation privileges
An email account supporting IMAP (or POP3, in which case the settings in the sample would have to be modified accordingly)
UltraStudio Configuration

In order to implement the above use case, you must first select the following dependencies when creating a new Ultra project:

File Connector and Mail Connector from the connector list
Attachment Processing, Content Extraction, Database Operations, EIP Routing, Logging, Throttling, Transformation, and XML Operations from the processor list
If you have already created a project, you can add the above dependencies via the Component Registry: from the Tools menu, select Ultra Studio → Component Registry, and select the above dependencies from the Connectors and Processors lists.
Add a new integration flow named report-receiver for the first stage of processing, and then add the required components by going through the following steps in order.
Add a Mail Ingress Connector with the configuration shown below (assuming a Gmail email address; replace the account details as appropriate).
Next add a Logger to generate a log before attempting to save the attachment.
As the file content does not contain any reference to the originating factory, we shall save it under the name of the factory itself (i.e. the value of the X-Zenythz-Factory header) for correlation during further processing; for example, a report attachment from factory 1 would be saved as /tmp/test/1. For this, add an Attachment Saver with the configuration shown below.
Finally, complete the flow with a Successful Flow End element.
The completed integration flow should look like the diagram below. The configuration for each element is as follows; the numbering corresponds to the numbers shown in the diagram.
1. Mail Ingress Connector (Basic Tab)

| Parameter | Value |
| Protocol | imap |
| Host | imap.gmail.com |
| Port | 993 |
| User Name | (your email address) |
| Password | hq-password |
| Folder Name | INBOX |
Other Settings Map

| Key | Value |
| mail.imap.ssl.enable | true |
1. Mail Ingress Connector (Advanced Tab, Search Headers Map)

| Header | Value |
| X-Zenythz-Factory | 1 |
1. Mail Ingress Connector (Scheduling Tab)

| Parameter | Value |
| Cron Expression | 0 0 23 ? * 1-5 |
2. Logger

| Parameter | Value |
| Log Template | Saving report from @{message.id} as @{message.headers.X-Zenythz-Factory} |
| Log Level | INFO |
3. Attachment Saver

| Parameter | Value |
| Attachment Name Pattern | factory\d+\.csv |
| Destination | /tmp/test |
| Use Attachment Name | Off |
| File Name | @{message.headers.X-Zenythz-Factory} |
Add a new integration flow named report-processor for the second processing stage, and then add the required components by going through the following steps in order.
Add a NIO File Ingress Connector to pick up the saved report. After the file is picked, its name (which is now the factory ID) will be available via the message header ultra.file.name.
Add a Logger to log this ID before we start processing the report.
As the machine entries in the report can be processed as individual database update queries, we can use an XML splitter to process them in parallel. Since we need an XML payload for this, let’s first add a CSV-to-XML transformer to convert the CSV content to XML.
The resulting payload will resemble the following:
<factory>
  <machine>
    <id>1</id>
    <hours>10</hours>
    ...
  </machine>
  <machine>
    ...
  </machine>
</factory>
Next add an XML Splitter to split the converted payload at the nodes denoted by the XPath /factory/machine. From this point onwards, until an XML Aggregator is encountered on the flow, each processing element’s operations will be applied identically to the split payload segments, which are propagated as individual messages.
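For example, the first split message produced from the payload above would carry just the following fragment (illustrative):

<machine>
  <id>1</id>
  <hours>10</hours>
  ...
</machine>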
For logging purposes, let’s first extract the machine ID into a variable using an XPath String Extractor.
Next add a Logger to log the machine ID, as a confirmation before performing the actual database update.
For database access, a JDBC data source has to be added as a project resource. Switch to the project.xpml, right-click on the XML content, and from the context menu select Resource Template. In the pop-up window, select Datasource and click Next. Then fill in the values shown in the table below and click the Save button. We shall assume a MySQL database configured on port 3306 on localhost.
| Parameter | Value |
| Pool Name | data-src |
| Driver Class | com.mysql.jdbc.Driver |
| JDBC URL | jdbc:mysql://localhost:3306/zenythzdb?useSSL=false |
| Initial Size | 2 |
| Minimum Size | 1 |
| Maximum Total | 5 |
| Validation Query | SELECT 1 FROM DUAL |
| Username | zenythz |
| Password | zenythz |
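Before relying on the flow to reach the database, it may help to verify that the URL and credentials above work outside the ESB. The following minimal connectivity check is an illustrative sketch (the class name is hypothetical; it assumes the MySQL driver JAR is on the classpath and that the zenythzdb schema, created in the deployment steps below, already exists):

import java.sql.Connection;
import java.sql.DriverManager;

public class DbPing {
    public static void main(String[] args) throws Exception {
        // Same URL and credentials as the data source defined above
        try (Connection c = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/zenythzdb?useSSL=false",
                "zenythz", "zenythz")) {
            System.out.println("Connected: " + !c.isClosed());
        }
    }
}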
We can simplify the database update by utilizing a Database Mapping File to directly map the required data from the message into placeholders in a predefined parameterized query, rather than constructing the query by hand. In the mapping below, the PATH parameters are extracted from the (split) XML payload, the VARIABLE parameter reads the machine_id flow variable set earlier, and the HEADER parameter reads the ultra.file.name message header.
For this, add a mapping file report-mapper.dson by right-clicking the resources directory on the Project Structure pane and selecting New → DB Configuration, and populate it with the following JSON content:
{ "messageType" : "XML", "columnNamesPresent" : false, "parameters" : [ { "type" : "PATH", "name" : "hours", "path" : "/machine/hours" }, { "type" : "PATH", "name" : "restarts", "path" : "/machine/restarts" }, { "type" : "PATH", "name" : "minor_repairs", "path" : "/machine/minor_repairs" }, { "type" : "PATH", "name" : "major_repairs", "path" : "/machine/major_repairs" }, { "type" : "PATH", "name" : "extra_cost", "path" : "/machine/extra_cost" }, { "type" : "VARIABLE", "name" : "machine_id", "variableName" : "machine_id", "className" : "java.lang.String", "propagateToParent" : false }, { "type" : "HEADER", "name" : "factory_id", "headerName" : "ultra.file.name", "className" : "java.lang.String" } ] }
Now add a Database Processor element to perform the actual database update query, using the template we just defined.
Before we finish the flow, we need to aggregate the split message paths using an XML Aggregator (although the aggregation result would not be useful for us in this case).
Now end the flow using a Successful Flow End element.
The completed integration flow should look like the diagram below. The configuration for each element is as follows; the numbering corresponds to the numbers shown in the diagram.
1. NIO Ingress Connector (Basic Tab)

| Parameter | Value |
| File Path | /tmp/test |
| File Name Pattern Type | regex |
| File Name Pattern | \d+ |
1. NIO Ingress Connector (Advanced Tab)

| Parameter | Value |
| Polling Interval (ms) | 1000 |
| Move After Processing | On |
| Processed File Destination | /tmp/done |
| Failed File Destination | /tmp/error |
| Timestamp Format | yyyy-MM-dd_HH:mm:ss |
2. Logger

| Parameter | Value |
| Log Template | Processing file @{message.headers.ultra.file.name} |
| Log Level | INFO |
3. CSV to XML Transformer

| Parameter | Value |
| Root Element Name | factory |
| Row Element Name | machine |
4. XML Splitter

| Parameter | Value |
| Split Mode | PARALLEL |
| Split XPath | /factory/machine |
| Aggregation Timeout (ms) | 10000 |
5. Exceptional Flow End

| Parameter | Value |
| Name | XML Aggregation Timeout |
6. XPath String Extractor

| Parameter | Value |
| Variable Name | machine_id |
| XPath | /machine/id |
7. Logger

| Parameter | Value |
| Log Template | Updating machine @{variable.machine_id} from factory @{message.headers.ultra.file.name} |
| Log Level | INFO |
8. Database

| Parameter | Value |
| Mapping File | report-mapper.dson |
| SQL Statement | UPDATE machine SET hours = hours + @{hours}, restarts = restarts + @{restarts}, minor_repairs = minor_repairs + @{minor_repairs}, major_repairs = major_repairs + @{major_repairs}, extra_cost = extra_cost + @{extra_cost} WHERE machine_id = @{machine_id} AND factory_id = @{factory_id} |
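With this mapping, the first row of factory 1’s sample report (machine 1, 10 hours, no repairs) would resolve to a query equivalent to the following (illustrative):

UPDATE machine
SET hours = hours + 10, restarts = restarts + 0, minor_repairs = minor_repairs + 0,
    major_repairs = major_repairs + 0, extra_cost = extra_cost + 0
WHERE machine_id = 1 AND factory_id = 1;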
9. Aggregate Processor

| Parameter | Value |
| Aggregation XPath | /machine/id |
| Root Element Name | report |
Now you can run the Ultra Project and check the functionality of the integration flow. Create an UltraESB Server run configuration and start it.
UltraESB-X Configuration

When running the sample in the UltraESB-X distribution, you need to override the following properties in order for the sample to work. The properties file is located at $ULTRA_HOME/conf/projects/email-db-data-aggregation/default.properties. Refer to the Managing Project Properties documentation on how to override properties.
| Property | Description |
| ultracp.driver=com.mysql.jdbc.Driver | The database driver to be used |
| ultracp.url | URL of the database |
| ultracp.pool | The name of the database connection pool |
| ultracp.username | Database username |
| ultracp.password | Database password |
| ultracp.minSize | Minimum size of the database connection pool |
| ultracp.maxTotal | Maximum total number of connections in the pool |
| ultracp.initialSize | Initial number of connections to be created |
You will also have to add the mysql-connector-java-5.1.38.jar dependency (downloadable from here) to the $ULTRA_HOME/lib/custom directory.
After that, navigate to the $ULTRA_HOME/bin directory, where you can run the UltraESB-X distribution with the following command to start the engine with this sample project deployed.
./ultraesbx.sh -sample email-db-data-aggregation
As the privileged MySQL user, execute the following schema.sql script, found inside src/test/resources.
CREATE DATABASE zenythzdb;
USE zenythzdb;
CREATE TABLE machine (
factory_id SMALLINT,
machine_id SMALLINT,
hours SMALLINT,
restarts SMALLINT,
minor_repairs SMALLINT,
major_repairs SMALLINT,
extra_cost INT,
PRIMARY KEY (machine_id, factory_id)
);
GRANT ALL PRIVILEGES ON zenythzdb.* TO 'zenythz'@'localhost' IDENTIFIED BY 'zenythz';
INSERT INTO machine VALUES (1, 1, 0, 0, 0, 0, 0), (1, 2, 0, 0, 0, 0, 0), (1, 3, 0, 0, 0, 0, 0), (1, 4, 0, 0, 0, 0, 0),
(2, 1, 0, 0, 0, 0, 0), (2, 2, 0, 0, 0, 0, 0), (2, 5, 0, 0, 0, 0, 0);
The script creates the zenythzdb database with a machine table, and makes them accessible via a zenythz user.
As the flow only involves an update query, it cannot anticipate new machines being added to a factory (though it could be improved to incorporate new machine additions as well; see the sketch below). Hence the script also inserts an initial set of machine entries, containing nil (0) values for all data fields.
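One possible improvement along those lines (not part of the sample) would be to replace the UPDATE statement in the Database Processor with MySQL’s upsert form, so that a previously unseen machine gets inserted on its first report:

INSERT INTO machine (factory_id, machine_id, hours, restarts, minor_repairs, major_repairs, extra_cost)
VALUES (@{factory_id}, @{machine_id}, @{hours}, @{restarts}, @{minor_repairs}, @{major_repairs}, @{extra_cost})
ON DUPLICATE KEY UPDATE
  hours = hours + VALUES(hours),
  restarts = restarts + VALUES(restarts),
  minor_repairs = minor_repairs + VALUES(minor_repairs),
  major_repairs = major_repairs + VALUES(major_repairs),
  extra_cost = extra_cost + VALUES(extra_cost);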
Create the folders /tmp/test, /tmp/done and /tmp/error for the saving and archival of email attachments.
For testing the sample, you would require a mail sender that supports sending both attachments and custom headers. As most web-based email clients do not support custom headers during email composition, a simple SendMail.java program has been provided in the src/test/java directory of the sample project for this purpose.
Replace the string placeholders (and email provider information) as appropriate, and run the class (its main() method) to generate and send an email that can be used for testing the sample.
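If you prefer to write your own sender, the essential steps are adding the custom header and the CSV attachment. The following is a minimal sketch using the JavaMail API (the class name, addresses and credentials are placeholders, not the actual SendMail.java content; it assumes SMTP over SSL on Gmail):

import java.util.Properties;
import javax.mail.*;
import javax.mail.internet.*;

public class SendMailSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("mail.smtp.host", "smtp.gmail.com"); // assumed provider
        props.put("mail.smtp.port", "465");
        props.put("mail.smtp.ssl.enable", "true");
        props.put("mail.smtp.auth", "true");

        Session session = Session.getInstance(props, new Authenticator() {
            protected PasswordAuthentication getPasswordAuthentication() {
                return new PasswordAuthentication("factory1@example.com", "factory-password");
            }
        });

        MimeMessage message = new MimeMessage(session);
        message.setFrom(new InternetAddress("factory1@example.com"));
        message.setRecipients(Message.RecipientType.TO, "hq@example.com");
        message.setSubject("Daily maintenance report");
        // The custom header that identifies the originating factory
        message.addHeader("X-Zenythz-Factory", "1");

        // Attach the CSV report; the name must match the factory\d+\.csv pattern
        MimeBodyPart attachment = new MimeBodyPart();
        attachment.attachFile("src/test/resources/factory1.csv");
        Multipart multipart = new MimeMultipart();
        multipart.addBodyPart(attachment);
        message.setContent(multipart);

        Transport.send(message);
    }
}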
Since the email listener is configured to poll for new mail only once a day at 11 PM via a cron expression, you may need to change the expression to increase the polling frequency (e.g. 0 0/1 * 1/1 * ? *, which fires every minute) during testing.
Use the email client to send an appropriately composed email to the listened email address. Sample CSV files factory1.csv and factory2.csv are provided in src/test/resources.
factory1.csv:

id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,10,0,0,0,0
2,8,1,2,0,100
3,15,3,0,1,1000
4,12,3,5,0,800

factory2.csv:

id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,12,0,0,0,0
2,12,2,1,0,100
5,12,0,0,0,0
Observe the ESB log to track the reception and processing of machine maintenance records.
/tmp/done will contain all successfully processed reports, and /tmp/error will contain any reports that failed during processing, each prefixed by the received timestamp.
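Once both sample reports have been processed, you can also verify the aggregation directly in MySQL. Since all rows start at zero, the totals should simply match the CSV values; for example, machine 2 of factory 2 should show hours = 12, restarts = 2, minor_repairs = 1 and extra_cost = 100:

SELECT * FROM machine ORDER BY factory_id, machine_id;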