Data Aggregation via Email and Database

Version: 17.01

Supported Since: 17.01

Scenario Description

Zenythz Corp., a manufacturer of heavy machinery, runs several factories in different parts of the country. As tooling for manufacturing heavy machinery is fairly expensive, Zenythz wishes to improve its tooling maintenance process by reporting the maintenance details of each machine, on a per-factory basis, daily to the headquarters, which will maintain an aggregated record for each machine for further analysis.

Solution

After the end of each day’s work shift, each factory sends a CSV file to the headquarters via email (as an attachment), containing the maintenance details of each machine in the following format:

id,hours,restarts,minor_repairs,major_repairs,extra_cost
1,10,0,0,0,0
2,8,1,2,0,100

In order to distinguish the maintenance emails from other emails sent to the headquarters, each email includes a special header X-Zenythz-Factory containing the ID of the respective factory.

An ESB instance running at the headquarters is assigned the task of processing the received attachments and aggregating the daily maintenance details of each machine into a central MySQL database.

Implementation

We can break this workflow into 2 integration flows:

  • a Report Receiver for fetching relevant emails and saving attached reports to the filesystem

  • a Report Processor for processing saved reports and updating the database

Prerequisites

  • A MySQL installation, with access to a user account with new database creation privileges

  • An email account supporting IMAP (or POP3, in which case the settings in the sample would have to be modified accordingly)

Report Receiver

Add a new integration flow named report-receiver for the first stage of processing:

  1. Add the following resources in project.xpml as prerequisites for listening to emails:

    Type: Map
    Name: mail-props
    Entries:
      mail.imap.ssl.enable: true

    Type: Map
    Name: mail-headers
    Entries:
      X-Zenythz-Factory: (leave the value empty, just to ensure the presence of the header)

  2. Add a Mail Ingress Connector with the following configurations (assuming a Gmail email address; replace account details as appropriate):

    Basic
      Protocol: imap
      Host: imap.gmail.com
      Port: 993
      User: hq-email@gmail.com
      Password: hq-password
      Folder: INBOX
      Other Settings: mail-props (Map)

    Advanced
      Search Headers: mail-headers (Map)

    Scheduling
      Polling CRON Expression (1): 0 0 23 ? * 1-7

    (1) check mail once a day at 11 PM, on every day of the week

    (A plain-JavaMail sketch of this header-based filtering is shown after this list.)

  3. Next add a Logger to generate a log before attempting to save the attachment:

    Log Template: Saving report from @{message.id} as @{message.headers.X-Zenythz-Factory}

    Log Level: INFO

  4. As the file content does not contain any reference to the originating factory, we shall save it under the name of the factory itself (i.e. the value of the X-Zenythz-Factory header), for correlation during further processing. For this, add an Attachment Saver with the following configuration:

    Attachment Name Regex: factory\d+\.csv

    Directory Path: /tmp/test

    Use Attachment Name as File Name: Off

    File Name: @{message.headers.X-Zenythz-Factory}

  5. Finally, complete the flow with a Successful Flow End element.
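
The header-based filtering performed by the Mail Ingress Connector (via the mail-headers map and the Search Headers setting) can be pictured with plain JavaMail. The sketch below is only an illustration of the concept, not the connector’s implementation; the account details are the same placeholders used above:

    import java.util.Properties;
    import javax.mail.Folder;
    import javax.mail.Message;
    import javax.mail.Session;
    import javax.mail.Store;
    import javax.mail.search.HeaderTerm;

    public class HeaderFilterSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("mail.imap.ssl.enable", "true");   // same setting as in the mail-props map
            Session session = Session.getInstance(props);

            Store store = session.getStore("imap");
            store.connect("imap.gmail.com", 993, "hq-email@gmail.com", "hq-password");

            Folder inbox = store.getFolder("INBOX");
            inbox.open(Folder.READ_ONLY);

            // keep only messages carrying the X-Zenythz-Factory header (with any value)
            Message[] reports = inbox.search(new HeaderTerm("X-Zenythz-Factory", ""));
            for (Message m : reports) {
                System.out.println("Maintenance report from factory "
                        + m.getHeader("X-Zenythz-Factory")[0]);
            }

            inbox.close(false);
            store.close();
        }
    }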

The completed flow would resemble the following:

[report-receiver flow diagram]

Report Processor

Add a new integration flow named report-processor for the second processing stage.

  1. Add a NIO File Ingress Connector to pick up the saved report:

    Basic
      Root Path: /tmp/test
      Pattern Syntax: regex
      File Path Pattern: \d+

    Advanced
      Wait After Modification (1): 1000
      Remove Original File (2): On
      Move After Process: /tmp/done
      Move After Failure: /tmp/error
      Move Timestamp Format: yyyy-MM-dd_HH:mm:ss

    (1) as the NIO File Ingress Connector is event-driven, a sufficient delay should be allowed for the event notification system to reach stability

    (2) instead of deleting the received files, we shall move them to backup directories for archival purposes

  2. After the file is picked up, its name (which is now the factory ID) will be available via the message header ultra.file.name. Add a Logger to log this ID before we start processing the report:

    Log Template: Processing file @{message.headers.ultra.file.name}

    Log Level: INFO

  3. As the machine entries in the report can be processed as individual database update queries, we can use an XML Splitter to process them in parallel. As we need an XML payload for this, let’s first add a CSV-to-XML transformer to convert the CSV content to XML:

    Root element name: factory

    Row element name: machine

    The resulting payload will resemble the following:

    <factory>
        <machine>
            <id>1</id>
            <hours>10</hours>
            ...
        </machine>
        <machine>
        ...
    </factory>
  4. Next add an XML Splitter to split the converted payload at nodes denoted by the XPath /factory/machine:

    Split Strategy: PARALLEL

    Split XPath: /factory/machine

    Aggregation Timeout: 10000

    From this point onwards, until an XML Aggregator is encountered in the flow, each processing element is applied identically to every split payload segment, with each segment being propagated as an individual message.

  5. For logging purposes, let’s first extract the machine ID into a variable using an XPath String Extractor:

    Variable Name: machine_id

    XPath: /machine/id

  6. Next add a Logger to log the machine ID, as a confirmation before performing the actual database update:

    Log Template: Updating machine @{variable.machine_id} from factory @{message.headers.ultra.file.name}

    Log Level: INFO

  7. For database access, a JDBC data source has to be added as a project resource. Switch to the project.xpml editor UI and add a Datasource using the Custom Template drop-down. We shall assume a MySQL database configured on port 3306 on localhost:

    Resource ID: data-src

    Driver class name: com.mysql.jdbc.Driver

    Database URL: jdbc:mysql://localhost:3306/zenythzdb?useSSL=false

    Initial connections: 2

    Minimum connections: 1

    Maximum connections: 5

    Validation query: SELECT 1 FROM DUAL

    Username: zenythz

    Password: zenythz

  8. We can simplify the database update by utilizing a Database Mapping File to directly map the required data from the message into placeholders in a predefined parameterized query, rather than constructing the query by hand. For this, add a mapping file report-mapper.dson by right-clicking the resources directory on the Project Structure pane and selecting New → DB Configuration, and populate it with the following JSON content:

    {
      "messageType" : "XML",
      "columnNamesPresent" : false,
      "parameters" : [ {
        "type" : "PATH",
        "name" : "hours",
        "path" : "/machine/hours"
      }, {
        "type" : "PATH",
        "name" : "restarts",
        "path" : "/machine/restarts"
      }, {
        "type" : "PATH",
        "name" : "minor_repairs",
        "path" : "/machine/minor_repairs"
      }, {
        "type" : "PATH",
        "name" : "major_repairs",
        "path" : "/machine/major_repairs"
      }, {
        "type" : "PATH",
        "name" : "extra_cost",
        "path" : "/machine/extra_cost"
      }, {
        "type" : "VARIABLE",
        "name" : "machine_id",
        "variableName" : "machine_id",
        "className" : "java.lang.String",
        "propagateToParent" : false
      }, {
        "type" : "HEADER",
        "name" : "factory_id",
        "headerName" : "ultra.file.name",
        "className" : "java.lang.String"
      } ]
    }
  9. Now add a Database Processor element to perform the actual database update query, using the template we just defined (you may need to refresh the flow editor UI for the newly defined resources to become available):

    Mapping file: report-mapper.dson

    SQL query:

    UPDATE machine SET hours = hours + @{hours}, restarts = restarts + @{restarts}, minor_repairs = minor_repairs + @{minor_repairs}, major_repairs = major_repairs + @{major_repairs}, extra_cost = extra_cost + @{extra_cost} WHERE machine_id = @{machine_id} AND factory_id = @{factory_id}

    (A plain-JDBC illustration of the statement that effectively gets executed for each split message is shown after this list.)

  10. Before we finish the flow, we need to aggregate the split message paths using an XML Aggregator (although the aggregation result would not be useful for us in this case):

    Aggregation XPath: /machine/id

    Wrapper element name: report

  11. Now end the flow using a Successful Flow End element.
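
As a clarification of step 9, the plain-JDBC sketch below shows roughly the statement that effectively gets executed for one split <machine> message, with the mapping-file parameters bound to the query placeholders. It is only an illustration under the sample’s assumptions (database zenythzdb, user and password zenythz); the actual execution is handled by the Database Processor itself:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    public class ReportUpdateSketch {
        public static void main(String[] args) throws Exception {
            // illustrative values for one <machine> segment received from factory "1"
            int hours = 10, restarts = 0, minorRepairs = 0, majorRepairs = 0, extraCost = 0;
            String machineId = "1", factoryId = "1";

            String sql = "UPDATE machine SET hours = hours + ?, restarts = restarts + ?, "
                    + "minor_repairs = minor_repairs + ?, major_repairs = major_repairs + ?, "
                    + "extra_cost = extra_cost + ? WHERE machine_id = ? AND factory_id = ?";

            try (Connection con = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/zenythzdb?useSSL=false", "zenythz", "zenythz");
                 PreparedStatement ps = con.prepareStatement(sql)) {
                ps.setInt(1, hours);
                ps.setInt(2, restarts);
                ps.setInt(3, minorRepairs);
                ps.setInt(4, majorRepairs);
                ps.setInt(5, extraCost);
                ps.setString(6, machineId);   // bound as java.lang.String, as in the mapping file
                ps.setString(7, factoryId);
                System.out.println("Rows updated: " + ps.executeUpdate());
            }
        }
    }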

The complete flow, once constructed, would resemble the following:

[report-processor flow diagram]

Testing

Setting up the Database

As the privileged MySQL user, execute the schema.sql script inside src/main/resources. The script creates the zenythzdb database with a machine table, and makes them accessible to a zenythz user. As the flow only involves an update query, it cannot handle new machines being added to a factory (although it can be improved to incorporate new machine additions as well). Hence the script also inserts an initial set of machine entries (with zero values for all data fields).
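
The bundled schema.sql is the authoritative definition; the JDBC sketch below merely illustrates an equivalent setup, assuming column names that match the CSV fields and the update query used in the flow (the privileged credentials and seeded rows are placeholders):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class SchemaSetupSketch {
        public static void main(String[] args) throws Exception {
            // connect as a privileged MySQL user (replace the credentials)
            try (Connection con = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/?useSSL=false", "root", "root-password");
                 Statement st = con.createStatement()) {

                st.executeUpdate("CREATE DATABASE IF NOT EXISTS zenythzdb");
                st.executeUpdate("CREATE TABLE IF NOT EXISTS zenythzdb.machine ("
                        + "machine_id VARCHAR(10), factory_id VARCHAR(10), "
                        + "hours INT, restarts INT, minor_repairs INT, "
                        + "major_repairs INT, extra_cost INT, "
                        + "PRIMARY KEY (machine_id, factory_id))");

                // seed an initial set of machines with zero values, e.g. machines 1 and 2 of factory 1
                st.executeUpdate("INSERT IGNORE INTO zenythzdb.machine VALUES ('1', '1', 0, 0, 0, 0, 0)");
                st.executeUpdate("INSERT IGNORE INTO zenythzdb.machine VALUES ('2', '1', 0, 0, 0, 0, 0)");

                // make the data accessible to the zenythz user referenced by the data source
                st.executeUpdate("CREATE USER IF NOT EXISTS 'zenythz'@'localhost' IDENTIFIED BY 'zenythz'");
                st.executeUpdate("GRANT ALL ON zenythzdb.* TO 'zenythz'@'localhost'");
            }
        }
    }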

Preparing the filesystem

Create the folders /tmp/test, /tmp/done and /tmp/error for the saving and archival of email attachments.

Emulating factory emails

For testing the sample, you will require a mail sender that supports both attachments and custom headers. As most web-based email clients do not support adding custom headers during email composition, a simple SendMail.java program is provided in the src/main/java directory of the sample project for this purpose. Replace the string placeholders (and email provider information) as appropriate, and run the class (its main() method) to generate and send an email that can be used for testing the sample.
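
The bundled SendMail.java should be treated as the authoritative version; the sketch below (not necessarily identical to the bundled program) only illustrates what such a sender looks like using the classic javax.mail (JavaMail) API over Gmail SMTP. The addresses, password, subject and attachment path are placeholders to be replaced:

    import java.util.Properties;
    import javax.activation.DataHandler;
    import javax.activation.FileDataSource;
    import javax.mail.Authenticator;
    import javax.mail.Message;
    import javax.mail.Multipart;
    import javax.mail.PasswordAuthentication;
    import javax.mail.Session;
    import javax.mail.Transport;
    import javax.mail.internet.InternetAddress;
    import javax.mail.internet.MimeBodyPart;
    import javax.mail.internet.MimeMessage;
    import javax.mail.internet.MimeMultipart;

    public class SendMailSketch {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("mail.smtp.host", "smtp.gmail.com");
            props.put("mail.smtp.port", "587");
            props.put("mail.smtp.auth", "true");
            props.put("mail.smtp.starttls.enable", "true");

            Session session = Session.getInstance(props, new Authenticator() {
                protected PasswordAuthentication getPasswordAuthentication() {
                    return new PasswordAuthentication("factory-email@gmail.com", "factory-password");
                }
            });

            MimeMessage message = new MimeMessage(session);
            message.setFrom(new InternetAddress("factory-email@gmail.com"));
            message.setRecipients(Message.RecipientType.TO, InternetAddress.parse("hq-email@gmail.com"));
            message.setSubject("Daily maintenance report");
            // the custom header used by the Mail Ingress Connector to identify maintenance emails
            message.setHeader("X-Zenythz-Factory", "1");

            MimeBodyPart text = new MimeBodyPart();
            text.setText("Maintenance report for factory 1");

            // attach the CSV report; the file name must match the Attachment Name Regex (factory\d+\.csv)
            MimeBodyPart attachment = new MimeBodyPart();
            attachment.setDataHandler(new DataHandler(new FileDataSource("src/main/resources/factory1.csv")));
            attachment.setFileName("factory1.csv");

            Multipart multipart = new MimeMultipart();
            multipart.addBodyPart(text);
            multipart.addBodyPart(attachment);
            message.setContent(multipart);

            Transport.send(message);
        }
    }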

Running the sample

  • Create an UltraESB Server run configuration and start the project

  • Use the email client to send an appropriately composed email to the listened email address. Sample CSV files factory1.csv and factory2.csv are provided in src/main/resources.

  • Observe the ESB log to track the reception and processing of machine maintenance records.

  • /tmp/done will contain all successfully processed reports, and /tmp/error will contain any reports that failed while processing, each prefixed by the received timestamp.

Property Configuration

When running the sample in the UltraESB-X distribution, you need to override the following properties in order for the sample to work.

Refer to the Managing Project Properties documentation on how to override these properties.

ultracp.driver: The database driver to be used (e.g. com.mysql.jdbc.Driver)

ultracp.url: URL of the database

ultracp.pool: The name of the database connection pool

ultracp.username: Database username

ultracp.password: Database password

ultracp.minSize: Minimum size of the database connection pool

ultracp.maxTotal: Maximum total number of connections in the database pool

ultracp.initialSize: Initial number of connections to be created
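
As an example, with the local MySQL setup assumed in this sample, the overridden properties could look like the following (the pool name is assumed to match the data-src resource ID; adjust all values to your environment):

    ultracp.driver=com.mysql.jdbc.Driver
    ultracp.url=jdbc:mysql://localhost:3306/zenythzdb?useSSL=false
    # pool name assumed to match the data source resource ID
    ultracp.pool=data-src
    ultracp.username=zenythz
    ultracp.password=zenythz
    ultracp.minSize=1
    ultracp.maxTotal=5
    ultracp.initialSize=2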
