Data Aggregation via Email and Database

Version: 17.07

Supported Since: 17.01

Use case description

Zenythz Corp., a manufacturer of heavy machinery, runs several factories in different parts of the country. As tooling for manufacturing heavy machinery is fairly expensive, Zenythz wishes to improve its tooling maintenance process by reporting the maintenance details of each machine, on a per-factory basis, daily to the headquarters, which will maintain an aggregated record for each machine for further analysis.

Proposed Solution

At the end of each day’s work shift, each factory sends a CSV file to the headquarters via email (as an attachment), containing the maintenance details of each machine in the following format:

id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,10,0,0,0,0
2,8,1,2,0,100

In order to distinguish the maintenance emails from other emails sent to headquarters, the email includes a special header X-Zenythz-Factory containing the ID of the respective factory.

An ESB instance running at the headquarters is tasked with processing the received attachments and aggregating the daily maintenance details of each machine into a central MySQL database.

Implementation of the Solution

Prerequisites

  • A MySQL installation, with access to a user account with new database creation privileges

  • An email account supporting IMAP (or POP3, in which case the settings in the sample would have to be modified accordingly)

To implement the above use case, you must first select the following dependencies when creating a new Ultra project.

  • File Connector and Mail Connector from the connector list

  • Attachment Processing, Content Extraction, Database Operations, EIP Routing, Logging, Throttling, Transformation, and XML Operations from the processor list

If you have already created a project, you can add the above dependencies via the Component Registry: from the Tools menu, select Ultra Studio → Component Registry, and select the above dependencies from the Connectors and Processors lists.

Implementation

We can break this workflow into two integration flows:

  • a Report Receiver for fetching relevant emails and saving attached reports to the filesystem

  • a Report Processor for processing saved reports and updating the database

Report Receiver

Add a new integration flow named report-receiver for the first stage of processing, and then add the required components by going through the following steps in order.

  1. Add a Mail Ingress Connector with the configuration shown below (assuming a Gmail email address; replace the account details as appropriate).

  2. Next add a Logger to generate a log before attempting to save the attachment.

  3. As the file content does not contain any reference to the originating factory, we shall save it under the name of the factory itself (i.e. the value of the X-Zenythz-Factory header), for correlation during further processing. For this, add an Attachment Saver with the configuration shown below.

  4. Finally, complete the flow with a Successful Flow End element.

The completed integration flow should look like the one below.

[Figure: completed report-receiver integration flow]

The configuration for each element is given below. The numbering corresponds to the numbers shown in the above diagram.


1. Mail Ingress Connector (Basic Tab)

   Protocol: imap
   Host: imap.gmail.com
   Port: 993
   User: (the listened email address)
   Password: hq-password
   Folder: INBOX

   Other Settings Map:
   mail.imap.ssl.enable: true

1. Mail Ingress Connector (Advanced Tab, Search Headers Map)

   X-Zenythz-Factory: 1

1. Mail Ingress Connector (Scheduling Tab)

   Polling CRON Expression (1): 0 0 23 ? * 1-5

2. Logger

   Log Template: Saving report from @{message.id} as @{message.headers.X-Zenythz-Factory}
   Log Level: INFO

3. Attachment Saver

   Attachment Name Regex: factory\d+\.csv
   Directory Path: /tmp/test
   Use Attachment Name as File Name: Off
   File Name: @{message.headers.X-Zenythz-Factory}
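
Before wiring the flow, it can help to confirm that the mailbox settings above are reachable outside the ESB. The following standalone Java sketch is not part of the sample project; it assumes the JavaMail (javax.mail) library is on the classpath, and the account details in it are placeholders. It opens the INBOX over IMAP with SSL and prints the message count.

import java.util.Properties;
import javax.mail.Folder;
import javax.mail.Session;
import javax.mail.Store;

public class ImapCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Same setting used in the Mail Ingress Connector's Other Settings Map
        props.put("mail.imap.ssl.enable", "true");

        Session session = Session.getInstance(props);
        Store store = session.getStore("imap");
        // Placeholder credentials: replace with the listened account's details
        store.connect("imap.gmail.com", 993, "hq-account@gmail.com", "hq-password");

        Folder inbox = store.getFolder("INBOX");
        inbox.open(Folder.READ_ONLY);
        System.out.println("Messages in INBOX: " + inbox.getMessageCount());

        inbox.close(false);
        store.close();
    }
}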

Report Processor

Add a new integration flow named report-processor for the second processing stage, and then add the required components by going through the following steps in order.

  1. Add a NIO File Ingress Connector to pick up the saved reports.

  2. After the file is picked, its name (which is now the factory ID) will be available via the message header ultra.file.name. Add a Logger to log this ID before we start processing the report.

  3. As the machine entries in the report can be processed as individual database update queries, we can use an XML Splitter to process them in parallel. As we need an XML payload for this, let’s first add a CSV-to-XML Transformer to convert the CSV content to XML.

    The resulting payload will resemble the following:

    <factory>
        <machine>
            <id>1</id>
            <hours>10</hours>
            ...
        </machine>
        <machine>
        ...
    </factory>
  4. Next add an XML Splitter to split the converted payload at the nodes denoted by the XPath /factory/machine. From this point onwards, until an XML Aggregator is encountered on the flow, the processing element operations will be applied identically to the split payload segments, which are propagated as individual messages.

  5. For logging purposes, let’s first extract the machine ID into a variable using an XPath String Extractor.

  6. Next add a Logger to log the machine ID, as a confirmation before performing the actual database update.

  7. For database access, a JDBC data source has to be added as a project resource. Switch to project.xpml, right-click on the XML content, and select Resource Template from the context menu. In the pop-up window, select Datasource and click Next. Then fill in the values shown in the table below and click the Save button. We shall assume a MySQL database configured on port 3306 on localhost (a standalone connectivity check is sketched after this list).

    Resource ID: data-src
    Driver class name: com.mysql.jdbc.Driver
    Database URL: jdbc:mysql://localhost:3306/zenythzdb?useSSL=false
    Initial connections: 2
    Minimum connections: 1
    Maximum connections: 5
    Validation query: SELECT 1 FROM DUAL
    Username: zenythz
    Password: zenythz

  8. We can simplify the database update by utilizing a Database Mapping File to directly map the required data from the message into placeholders in a predefined parameterized query, rather than constructing the query by hand. For this, add a mapping file report-mapper.dson by right-clicking the resources directory on the Project Structure pane and selecting New → DB Configuration, and populate it with the following JSON content:

    {
      "messageType" : "XML",
      "columnNamesPresent" : false,
      "parameters" : [ {
        "type" : "PATH",
        "name" : "hours",
        "path" : "/machine/hours"
      }, {
        "type" : "PATH",
        "name" : "restarts",
        "path" : "/machine/restarts"
      }, {
        "type" : "PATH",
        "name" : "minor_repairs",
        "path" : "/machine/minor_repairs"
      }, {
        "type" : "PATH",
        "name" : "major_repairs",
        "path" : "/machine/major_repairs"
      }, {
        "type" : "PATH",
        "name" : "extra_cost",
        "path" : "/machine/extra_cost"
      }, {
        "type" : "VARIABLE",
        "name" : "machine_id",
        "variableName" : "machine_id",
        "className" : "java.lang.String",
        "propagateToParent" : false
      }, {
        "type" : "HEADER",
        "name" : "factory_id",
        "headerName" : "ultra.file.name",
        "className" : "java.lang.String"
      } ]
    }
  9. Now add a Database Processor element to perform the actual database update query, using the template we just defined.

  10. Before we finish the flow, we need to aggregate the split message paths using an XML Aggregator (although the aggregation result would not be useful to us in this case).

  11. Now end the flow using a Successful Flow End element.
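
As a quick sanity check of the data source settings configured in step 7, the following standalone Java sketch (not part of the sample project; it assumes the MySQL JDBC driver is on the classpath and that the database and user created by the schema.sql script in the Testing section already exist) opens a connection with the same URL and credentials and runs the validation query.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class DataSourceCheck {
    public static void main(String[] args) throws Exception {
        // Optional with a JDBC 4 driver such as Connector/J 5.1.38, but harmless
        Class.forName("com.mysql.jdbc.Driver");

        // Same URL and credentials as the data-src project resource
        String url = "jdbc:mysql://localhost:3306/zenythzdb?useSSL=false";
        try (Connection conn = DriverManager.getConnection(url, "zenythz", "zenythz");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT 1 FROM DUAL")) {
            if (rs.next()) {
                System.out.println("Validation query returned: " + rs.getInt(1));
            }
        }
    }
}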

The completed integration flow should look like the one below.

[Figure: completed report-processor integration flow]

The configuration for each element is given below. The numbering corresponds to the numbers shown in the above diagram.


1. NIO Ingress Connector (Basic Tab)

   Root Path: /tmp/test
   Pattern Syntax: regex
   File Path Pattern: \d+

1. NIO Ingress Connector (Advanced Tab)

   Wait After Modification (1): 1000
   Remove Original File (2): On
   Move After Process: /tmp/done
   Move After Failure: /tmp/error
   Move Timestamp Format: yyyy-MM-dd_HH:mm:ss

2. Logger

   Log Template: Processing file @{message.headers.ultra.file.name}
   Log Level: INFO

3. CSV to XML Transformer

   Root element name: factory
   Row element name: machine

4. XML Splitter

   Split Strategy: PARALLEL
   Split XPath: /factory/machine
   Aggregation Timeout: 10000

5. Exceptional Flow End

   Exception Message: XML Aggregation Timeout

6. XPath String Extractor

   Variable Name: machine_id
   XPath: /machine/id
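
To see what this element evaluates against each split segment, the following standalone Java sketch (an illustration only, using the JDK's built-in XML APIs rather than the ESB's own implementation, with a segment shaped like the CSV-to-XML output shown earlier) applies the same /machine/id expression to one <machine> fragment.

import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

public class XPathCheck {
    public static void main(String[] args) throws Exception {
        // One split segment, corresponding to the first row of the sample CSV
        String segment = "<machine><id>1</id><hours>10</hours><restarts>0</restarts>"
                + "<minor_repairs>0</minor_repairs><major_repairs>0</major_repairs>"
                + "<extra_cost>0</extra_cost></machine>";

        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new InputSource(new StringReader(segment)));

        XPath xpath = XPathFactory.newInstance().newXPath();
        // Same expression configured for the XPath String Extractor
        System.out.println("machine_id = " + xpath.evaluate("/machine/id", doc));
    }
}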

7. Logger

   Log Template: Updating machine @{variable.machine_id} from factory @{message.headers.ultra.file.name}
   Log Level: INFO

8. Database

   Mapping file: report-mapper.dson
   SQL query: UPDATE machine SET hours = hours + @{hours}, restarts = restarts + @{restarts}, minor_repairs = minor_repairs + @{minor_repairs}, major_repairs = major_repairs + @{major_repairs}, extra_cost = extra_cost + @{extra_cost} WHERE machine_id = @{machine_id} AND factory_id = @{factory_id}
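
As an illustration of how the mapping file resolves the placeholders, consider the second row of the sample report shown in the use case description (machine 2: 8 hours, 1 restart, 2 minor repairs, 0 major repairs, extra cost 100) arriving from factory 1. The update the Database Processor effectively performs is equivalent to the following (in practice the values are bound as parameters of the parameterized query rather than inlined):

UPDATE machine
SET hours = hours + 8,
    restarts = restarts + 1,
    minor_repairs = minor_repairs + 2,
    major_repairs = major_repairs + 0,
    extra_cost = extra_cost + 100
WHERE machine_id = 2 AND factory_id = 1;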

9. Aggregate Processor

   Aggregation XPath: /machine/id
   Wrapper element name: report


Now you can run the Ultra project and check the functionality of the integration flows. Create an UltraESB Server run configuration and start it.

Property Configuration

When running the sample in the UltraESB-X distribution, you need to override the following properties in order for the sample to work. The properties file is located at $ULTRA_HOME/conf/projects/email-db-data-aggregation/default.properties.

Refer to Managing Project Properties documentation on how to override properties.

ultracp.driver - The database driver to be used (com.mysql.jdbc.Driver)
ultracp.url - URL of the database
ultracp.pool - The name of the database connection pool
ultracp.username - Database username
ultracp.password - Database password
ultracp.minSize - Minimum size of the database connection pool
ultracp.maxTotal - Maximum total number of connections in the database pool
ultracp.initialSize - Initial number of connections to be created
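
A possible set of overrides in default.properties, assuming the local MySQL setup and the data-src resource described above, is sketched below. The value of ultracp.pool in particular is an assumption (taken here from the data source resource ID) and depends on your project configuration.

ultracp.driver=com.mysql.jdbc.Driver
ultracp.url=jdbc:mysql://localhost:3306/zenythzdb?useSSL=false
# Assumption: the pool is named after the data source resource ID
ultracp.pool=data-src
ultracp.username=zenythz
ultracp.password=zenythz
ultracp.minSize=1
ultracp.maxTotal=5
ultracp.initialSize=2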

You will also have to add the mysql-connector-java-5.1.38.jar dependency (downloadable from here) to $ULTRA_HOME/lib/custom directory.

After that, navigate to the $ULTRA_HOME/bin directory. You can then run the UltraESB-X distribution with the following command to start the engine with this sample project deployed.

./ultraesbx.sh -sample email-db-data-aggregation

Testing the Integration Project

Setting up the Database

As the privileged MySQL user, execute the following schema.sql script (located inside src/test/resources).

CREATE DATABASE zenythzdb;
USE zenythzdb;
CREATE TABLE machine (
    factory_id SMALLINT,
    machine_id SMALLINT,
    hours SMALLINT,
    restarts SMALLINT,
    minor_repairs SMALLINT,
    major_repairs SMALLINT,
    extra_cost INT,
    PRIMARY KEY (machine_id, factory_id)
);
GRANT ALL PRIVILEGES ON zenythzdb.* TO 'zenythz'@'localhost' IDENTIFIED BY 'zenythz';

INSERT INTO machine VALUES (1, 1, 0, 0, 0, 0, 0), (1, 2, 0, 0, 0, 0, 0), (1, 3, 0, 0, 0, 0, 0), (1, 4, 0, 0, 0, 0, 0),
(2, 1, 0, 0, 0, 0, 0), (2, 2, 0, 0, 0, 0, 0), (2, 5, 0, 0, 0, 0, 0);

The script creates the zenythzdb database with a machine table, and makes them accessible via a zenythz user. As the flow only involves an update query, it cannot handle new machines being added to a factory (but it can be improved to incorporate new machine additions as well). Hence the script also inserts an initial set of machine entries (containing nil (0) values for all data fields).
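
If a new machine is later installed at a factory, a matching row would have to exist before its maintenance figures can be aggregated. For example, registering a hypothetical new machine 6 at factory 1 (following the same column order as the script above) would look like this:

INSERT INTO machine VALUES (1, 6, 0, 0, 0, 0, 0);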

Preparing the filesystem

Create the folders /tmp/test, /tmp/done and /tmp/error for the saving and archival of email attachments.

Emulating factory emails

For testing the sample, you will require a mail sender that supports sending both attachments and custom headers. As most web-based email clients do not support custom headers during email composition, a simple SendMail.java program is provided in the src/test/java directory of the sample project for this purpose. Replace the string placeholders (and email provider information) as appropriate, and run the class (main() method) to generate and send an email that can be used for testing the sample.
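
The provided SendMail.java is the reference implementation. As an illustration of the essential pieces only (a hedged sketch assuming the JavaMail / javax.mail library, a Gmail SMTP account, and placeholder credentials, addresses and file paths), a minimal sender that sets the X-Zenythz-Factory header and attaches a report could look like this:

import java.io.File;
import java.util.Properties;
import javax.mail.Message;
import javax.mail.Session;
import javax.mail.Transport;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeBodyPart;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMultipart;

public class FactoryReportSender {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("mail.smtp.host", "smtp.gmail.com");
        props.put("mail.smtp.port", "465");
        props.put("mail.smtp.auth", "true");
        props.put("mail.smtp.ssl.enable", "true");

        // Placeholder credentials: replace with the sending factory's account
        Session session = Session.getInstance(props, new javax.mail.Authenticator() {
            protected javax.mail.PasswordAuthentication getPasswordAuthentication() {
                return new javax.mail.PasswordAuthentication("factory1@example.com", "factory1-password");
            }
        });

        MimeMessage msg = new MimeMessage(session);
        msg.setFrom(new InternetAddress("factory1@example.com"));
        msg.setRecipients(Message.RecipientType.TO, InternetAddress.parse("hq-account@gmail.com"));
        msg.setSubject("Daily maintenance report");
        // The header used by the Mail Ingress Connector to identify maintenance emails
        msg.addHeader("X-Zenythz-Factory", "1");

        // Attach the sample report CSV
        MimeBodyPart attachment = new MimeBodyPart();
        attachment.attachFile(new File("src/test/resources/factory1.csv"));

        MimeMultipart content = new MimeMultipart();
        content.addBodyPart(attachment);
        msg.setContent(content);

        Transport.send(msg);
        System.out.println("Report email sent");
    }
}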

Running the sample

Since the email listener is configured to poll for new mail only once a day at 11 PM via a cron expression, you may need to change the expression to increase the polling frequency (e.g. 0 0/1 * 1/1 * ? * to poll every minute) during testing.

  1. Use the email client to send an appropriately composed email to the listened email address. Sample CSV files factory1.csv and factory2.csv are provided in src/test/resources:

factory1.csv:

id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,10,0,0,0,0
2,8,1,2,0,100
3,15,3,0,1,1000
4,12,3,5,0,800

factory2.csv:

id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,12,0,0,0,0
2,12,2,1,0,100
5,12,0,0,0,0
  2. Observe the ESB log to track the reception and processing of machine maintenance records.

  3. /tmp/done will contain all successfully processed reports, and /tmp/error will contain any reports that failed while processing, each prefixed by the received timestamp.
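
Finally, the aggregated values can be verified directly in the database. For example, after both sample reports above have been processed, the following query should show the row for factory 1, machine 2 with 8 hours, 1 restart, 2 minor repairs and an extra cost of 100 added to its initially zeroed counters:

SELECT * FROM machine ORDER BY factory_id, machine_id;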
