Version: 17.01
Supported Since: 17.01
Zenythz Corp., a manufacturer of heavy machinery, runs several factories in different parts of the country. As tooling for manufacturing heavy machinery is fairly expensive, Zenythz wishes to improve its tooling maintenance process by reporting the maintenance details of each machine, on a per-factory basis, daily to the headquarters, which maintains an aggregated record for each machine for further analysis.
At the end of each day's work shift, each factory sends a CSV file to the headquarters via email (as an attachment), containing the maintenance details of each machine in the following format:
id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,10,0,0,0,0
2,8,1,2,0,100
In order to distinguish the maintenance emails from other emails sent to headquarters, each email includes a special header X-Zenythz-Factory containing the ID of the respective factory.
An ESB instance running at headquarters is assigned the task of processing the received attachments and aggregating the daily maintenance details of each machine into a central MySQL database.
We can break this workflow into 2 integration flows:
a Report Receiver for fetching relevant emails and saving attached reports to the filesystem
a Report Processor for processing saved reports and updating the database
To run the sample you will need:
A MySQL installation, with access to a user account with new database creation privileges
An email account supporting IMAP (or POP3, in which case the settings in the sample would have to be modified accordingly)
Add a new integration flow report-receiver for the first stage of processing:
Add the following resources in project.xpml as prerequisites for listening to emails:
Type |
Name |

Type |
Name |
(leave empty, just to ensure the presence of header) |
Add a Mail Ingress Connector with the following configurations (assuming a Gmail email address; replace account details as appropriate):
Basic |
Protocol |
Host |
Port |
User |
Password |
Folder |
Other Settings |
Advanced |
Search Headers |
Scheduling |
Polling CRON Expression (1) |
(1) check mail once a day at 11 PM, from Monday to Friday
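For illustration only (the exact value configured in the sample is not shown here), a Quartz-style CRON expression for such a schedule would be 0 0 23 ? * MON-FRI (seconds, minutes, hours, day-of-month, month, day-of-week); the exact syntax expected by the connector's scheduler may differ.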
Next add a Logger to generate a log before attempting to save the attachment:
Log Template |
Log Level |
As the file content does not contain any reference to the originating factory, we shall save it under the name of the factory itself (i.e. the value of the X-Zenythz-Factory header), for correlation during further processing.
For this, add an Attachment Saver with the following configuration:
Attachment Name Regex |
Directory Path |
Use Attachment Name as File Name |
File Name |
Finally, complete the flow with a Successful Flow End element.
The completed flow would resemble the following:
Add a new integration flow named report-processor for the second processing stage.
Add a NIO File Ingress Connector to pick the saved report:
Basic |
Root Path |
Pattern Syntax |
File Path Pattern |
Advanced |
Wait After Modification (1) |
Remove Original File (2) |
Move After Process |
Move After Failure |
Move Timestamp Format |
(1) As the NIO File Ingress Connector is event-driven, a sufficient delay should be allowed for the event notification system to reach stability after the file is modified
(2) Instead of deleting the received files, we shall store them in backup directories for archival purposes
After the file is picked, its name (which is now the factory ID) will be available via the message header ultra.file.name.
Add a Logger to log this ID before we start processing the report:
Log Template |
Log Level |
As the machine entries in the report can be processed as individual database update queries, we can use an XML Splitter to process them in parallel. Since this requires an XML payload, let's first add a CSV-to-XML Transformer to convert the CSV content to XML:
Root element name |
Row element name |
The resulting payload will resemble the following:
<factory>
  <machine>
    <id>1</id>
    <hours>10</hours>
    ...
  </machine>
  <machine>
    ...
  </machine>
</factory>
Next add an XML Splitter to split the converted payload at nodes denoted by the XPath /factory/machine:
Split Strategy |
Split XPath |
Aggregation Timeout |
From this point onwards, until an XML Aggregator is encountered on the flow, the operations of each processing element will be applied identically to the split payload segments, which are propagated as individual messages.
For logging purposes, let’s first extract the machine ID into a variable using an XPath String Extractor:
Variable Name |
XPath |
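As an illustration (the configured values are not shown above), an XPath of /machine/id applied to the split payload shown earlier would select the machine identifier, and naming the variable machine_id would line up with the variableName referenced by the mapping file defined later in this flow.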
Next add a Logger to log the machine ID, as a confirmation before performing the actual database update:
Log Template |
Log Level |
For database access, a JDBC data source has to be added as a project resource.
Switch to the project.xpml editor UI and add a Datasource using the Custom Template drop-down. We shall assume a MySQL database configured on port 3306 on localhost:
Resource ID |
Driver class name |
Database URL |
Initial connections |
Minimum connections |
Maximum connections |
Validation query |
Username |
Password |
We can simplify the database update by utilizing a Database Mapping File to directly map the required data from the message into placeholders in a predefined parameterized query, rather than constructing the query by hand. For this, add a mapping file report-mapper.dson by right-clicking the resources directory on the Project Structure pane and selecting New → DB Configuration, and populate it with the following JSON content:
{ "messageType" : "XML", "columnNamesPresent" : false, "parameters" : [ { "type" : "PATH", "name" : "hours", "path" : "/machine/hours" }, { "type" : "PATH", "name" : "restarts", "path" : "/machine/restarts" }, { "type" : "PATH", "name" : "minor_repairs", "path" : "/machine/minor_repairs" }, { "type" : "PATH", "name" : "major_repairs", "path" : "/machine/major_repairs" }, { "type" : "PATH", "name" : "extra_cost", "path" : "/machine/extra_cost" }, { "type" : "VARIABLE", "name" : "machine_id", "variableName" : "machine_id", "className" : "java.lang.String", "propagateToParent" : false }, { "type" : "HEADER", "name" : "factory_id", "headerName" : "ultra.file.name", "className" : "java.lang.String" } ] }
Now add a Database Processor element to perform the actual database update query, using the template we just defined (you may need to refresh the flow editor UI for the newly defined resources to become available):
Mapping file |
SQL query |
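The actual query used by the sample is not reproduced above. Purely as an illustration of the idea, an aggregating update built around the parameters of the mapping file could take a shape such as the following; the column names, named-placeholder syntax and add-rather-than-replace semantics are all assumptions here:

-- Illustrative sketch only; not the sample's actual query
UPDATE machine
   SET hours         = hours + :hours,
       restarts      = restarts + :restarts,
       minor_repairs = minor_repairs + :minor_repairs,
       major_repairs = major_repairs + :major_repairs,
       extra_cost    = extra_cost + :extra_cost
 WHERE id = :machine_id
   AND factory_id = :factory_id;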
Before we finish the flow, we need to aggregate the split message paths using an XML Aggregator (although the aggregation result would not be useful for us in this case):
Aggregation XPath |
Wrapper element name |
Now end the flow using a Successful Flow End element.
The complete flow, once constructed, would resemble the following:
As the privileged MySQL user, execute the schema.sql script inside src/main/resources.
The script creates the zenythdb database with a machine table, and makes them accessible via a zenythz user.
As the flow only involves an update query, it cannot handle new machines being added to a factory (although it could be improved to incorporate new machine additions as well). Hence the script also inserts an initial set of machine entries (containing nil (0) values for all data fields).
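For orientation, a minimal sketch of what such a schema could look like is shown below; the column names, types, seed rows and grants are assumptions derived from the mapping file and the description above, and the bundled schema.sql remains the authoritative version:

-- Sketch only; see src/main/resources/schema.sql for the actual script
CREATE DATABASE zenythdb;

CREATE TABLE zenythdb.machine (
    factory_id    VARCHAR(50) NOT NULL,  -- assumed: correlates with the saved report's file name
    id            INT         NOT NULL,  -- machine ID within the factory
    hours         INT         NOT NULL DEFAULT 0,
    restarts      INT         NOT NULL DEFAULT 0,
    minor_repairs INT         NOT NULL DEFAULT 0,
    major_repairs INT         NOT NULL DEFAULT 0,
    extra_cost    INT         NOT NULL DEFAULT 0,
    PRIMARY KEY (factory_id, id)
);

-- Seed entries with nil (0) values for the known machines (factory and machine IDs are placeholders)
INSERT INTO zenythdb.machine VALUES ('factory1', 1, 0, 0, 0, 0, 0), ('factory1', 2, 0, 0, 0, 0, 0);

-- A dedicated user for the ESB flows
CREATE USER 'zenythz'@'localhost' IDENTIFIED BY 'changeme';
GRANT SELECT, INSERT, UPDATE ON zenythdb.* TO 'zenythz'@'localhost';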
Create the folders /tmp/test, /tmp/done and /tmp/error for the saving and archival of email attachments.
For testing the sample, you would require a mail sender that supports sending of both attachments and custom headers. As most web-based email clients do not support custom headers during email composition, a simple SendMail.java program has been provided in the src/main/java directory of the sample project for this purpose. Replace the string placeholders (and email provider information) as appropriate, and run the class (its main() method) to generate and send an email that can be used for testing the sample.
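For reference, the essential steps of such a sender, i.e. attaching a CSV report and setting the X-Zenythz-Factory header, would resemble the JavaMail sketch below; the bundled SendMail.java may differ in its details, and the host, addresses, credentials and factory ID used here are placeholders:

import java.io.File;
import java.util.Properties;
import javax.mail.*;
import javax.mail.internet.*;

public class SendMailSketch {

    public static void main(String[] args) throws Exception {
        // SMTP settings; replace with your provider's details
        Properties props = new Properties();
        props.put("mail.smtp.host", "smtp.example.com");
        props.put("mail.smtp.port", "587");
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.starttls.enable", "true");

        Session session = Session.getInstance(props, new Authenticator() {
            protected PasswordAuthentication getPasswordAuthentication() {
                return new PasswordAuthentication("sender@example.com", "password");
            }
        });

        MimeMessage message = new MimeMessage(session);
        message.setFrom(new InternetAddress("sender@example.com"));
        message.setRecipients(Message.RecipientType.TO,
                InternetAddress.parse("headquarters@example.com"));
        message.setSubject("Daily maintenance report");
        // Custom header identifying the originating factory
        message.addHeader("X-Zenythz-Factory", "factory1");

        // Attach the CSV report
        MimeBodyPart attachment = new MimeBodyPart();
        attachment.attachFile(new File("src/main/resources/factory1.csv"));
        Multipart multipart = new MimeMultipart();
        multipart.addBodyPart(attachment);
        message.setContent(multipart);

        Transport.send(message);
    }
}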
Create an UltraESB Server run configuration and start the project
Use the email client to send an appropriately composed email to the listened email address.
Sample CSV files factory1.csv and factory2.csv are provided in src/main/resources.
Observe the ESB log to track the reception and processing of machine maintenance records.
/tmp/done will contain all successfully processed reports, and /tmp/error will contain any reports that failed during processing, each prefixed with the received timestamp.
When running the sample in the UltraESB-X distribution, you need to override the following properties in order for the sample to work. Refer to the Managing Project Properties documentation on how to override properties.
The database driver to be used |
URL of the database |
The name of the database connection pool |
Database username |
Database password |
Minimum size of the database pool |
Maximum total number of connections of the database pool |
Initial number of connections to be created |