Transactions spanning multiple resources - an Example with JMS, JDBC and File systems with JTA

Sample

704

Level

Expert

Description

JMS transactional reads coupled with JDBC updates and file system changes using JTA XA transactions

Shows a JMS transactional read and JDBC transactional updates using JTA XA transactions. It also describes how updates made to the file system during a transaction are undone when that transaction is rolled back.

Overview of the Use case

A JMS destination is polled, and each message is saved into a database table within a JTA transaction T1. A second transaction T2 is then started, in which the payload is saved into the file system and further database tables are read and updated, as shown below. If T2 is rolled back, the file system changes need to be rolled back as well.

Figure: JMS, JDBC and JTA example

The final configuration

We’ve used the Atomikos library for standalone JTA support, and used an ActiveMQ JMS provider and an Apache Derby Database in this example. This solution can be easily switched over to a JBoss application server, and run against a WebSphere JMS destination and an Oracle database etc. with few configuration changes.

Hosting an embedded ActiveMQ instance

The UltraESB configuration can easily host an embedded ActiveMQ server - if desired - with just a couple of lines of Spring configuration. In this example, we define the following Spring configuration fragment within the ultra-root.xml for this purpose.

Starting an Embedded ActiveMQ instance

<amq:broker useJmx="false" persistent="false">
  <amq:transportConnectors>
    <amq:transportConnector uri="tcp://localhost:61616"/>
  </amq:transportConnectors>
</amq:broker>

Definition of the Proxy Service: "message-processor"

Note the use of the Spring Framework and Java coding for the configuration. This lets developers use stable technology they are already familiar with, instead of learning a new XML configuration language to configure a vendor-specific ESB implementation. The mediation may be specified in Java, Groovy, JavaScript, Ruby or any other JSR 223 scripting language supported by JDK 7 or later, and the UltraESB compiles it into byte code for optimal execution. This frees the developer from compile, bundle and build cycles to deploy changes, and allows one to just write and execute. The option to deploy custom Java code, compiled classes/JAR files or Spring beans is also available for those who prefer to keep the source hidden.

Proxy Service Definition

 1<u:proxy id="message-processor">
 2 <u:transport id="jms-lsnr">
 3     <u:property name="ultra.jms.destination" value="QUEUE.IN"/>
 4 </u:transport>
 5 <u:target>
 6 <u:inSequence>
 7     <u:java import="org.springframework.jdbc.support.*;
 8     org.springframework.jdbc.core.simple.*; org.springframework.jdbc.core.*; java.sql.*;"><![CDATA[
 9
10     final String clientName = "ermanno"; //msg.getFirstTransportHeader("client");
11     final String payload = mediation.readPayloadAsString(msg);
12     logger.info("Received message from client {} within transaction t1 as : {}", clientName, payload);
13
14     KeyHolder keyHolder = new GeneratedKeyHolder();
15     JdbcTemplate t = new JdbcTemplate(mediation.getDataSource("dataSource"));
16     t.update(new PreparedStatementCreator() {
17         public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
18             PreparedStatement ps = connection.prepareStatement(
19             "INSERT INTO APP.MESSAGE (CLIENT_NAME, PAYLOAD) VALUES (?,?)", Statement.RETURN_GENERATED_KEYS);
20             ps.setString(1, clientName);
21             ps.setString(2, payload);
22             return ps;
23         }
24     }, keyHolder);
25     Integer generatedId = new Integer(keyHolder.getKey().intValue());
26
27     t.update("INSERT INTO APP.HISTORY (FK_MESSAGE_ID, TIMESTAMP, STATE) VALUES (?,?,?)",
28         new Object[] {generatedId, new java.util.Date(), "RECEIVED"});
29
30     // simulate a failure if the payload is "fail"
31     if ("fail".equals(payload)) {
32         msg.rollbackTransaction();
33         logger.info("Rolled back JTA transaction : t1");
34     } else {
35         msg.commitTransaction();
36         logger.info("Committed JTA transaction : t1");
37     }
38
39     logger.info("Starting transaction : t2");
40     msg.beginTransaction("txManager");
41     boolean clientExists = t.queryForInt(
42         "SELECT COUNT(*) FROM APP.CLIENT WHERE CLIENT_NAME = ?", new Object[] {clientName}) == 1;
43     logger.info("Client Exists : {}", clientExists);
44     final String filePath = "/tmp/" + (clientExists ? "correct" : "error") + "/";
45     final String fileName = "message_" + generatedId;
46
47     try {
48         mediation.savePayloadToFile(msg, filePath + fileName);
49         t.update("INSERT INTO APP.FILE (FK_MESSAGE_ID, FILE_PATH, FILE_NAME) VALUES (?,?,?)",
50             new Object[] {generatedId, filePath, fileName});
51         t.update("INSERT INTO APP.HISTORY (FK_MESSAGE_ID, TIMESTAMP, STATE) VALUES (?,?,?)",
52             new Object[] {generatedId, new java.util.Date(), "CENSUS_OK"});
53         msg.commitTransaction();
54     } catch (Exception e) {
55         logger.error("Rolling back transaction by removing file : {}", filePath + fileName);
56         java.io.File f = new java.io.File(filePath + fileName);
57         if (f.exists()) {
58             if (!f.delete()) {
59                 f.deleteOnExit();
60             }
61         }
62         msg.rollbackTransaction();
63     }
64     logger.info("Processing of message : {} is complete", generatedId);
65     ]]></u:java>
66 </u:inSequence>
67 </u:target>
68</u:proxy>

Note the catch block (lines 54-63), which deletes the saved file if transaction T2 has to be rolled back. We’ve included lines 30-37 to simulate a failure case easily by setting the payload text to "fail". (See the section "Testing transaction rollback" below for more details.)
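The file system is not an XA resource here, so the rollback of the file write is a compensating action: the file is written first and deleted again if the rest of the transaction fails. The following is a minimal standalone sketch of that pattern in plain Java, not the UltraESB implementation. The class CompensatingFileWrite, the interface TransactionalWork and the method saveThenRun are hypothetical names introduced for illustration, and the "transactional work" is only a stand-in for the database updates made in T2.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Illustrative only: a file write paired with a compensating delete.
 * The class and method names are hypothetical and not part of the UltraESB API.
 */
class CompensatingFileWrite {

    /** Stands in for the database updates made inside T2. */
    interface TransactionalWork {
        void run() throws Exception;
    }

    /**
     * Writes the payload to the target file, then runs the transactional work.
     * If the work throws, the file is deleted so that the file system matches
     * the rolled-back database state.
     *
     * @return true if the work completed (commit), false if it was compensated (rollback)
     */
    static boolean saveThenRun(Path target, String payload, TransactionalWork work) throws IOException {
        Files.write(target, payload.getBytes(StandardCharsets.UTF_8));
        try {
            work.run();
            return true;                   // the caller would commit T2 here
        } catch (Exception e) {
            Files.deleteIfExists(target);  // undo the non-transactional file write
            return false;                  // the caller would roll back T2 here
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sample704");
        Path ok = dir.resolve("message_1");
        Path failed = dir.resolve("message_2");

        boolean committed = saveThenRun(ok, "Hello World", () -> { });
        boolean rolledBack = !saveThenRun(failed, "fail", () -> {
            throw new IllegalStateException("simulated database failure");
        });

        System.out.println("committed=" + committed + ", file kept=" + Files.exists(ok));
        System.out.println("rolledBack=" + rolledBack + ", file kept=" + Files.exists(failed));
    }
}
```

A compensating delete like this is not atomic with the database rollback: if the process dies between the failure and the delete, the file is left behind, so a periodic cleanup may still be worth considering.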

Executing the scenario

As this example is written against the Atomikos Transactions Essentials library, which provides standalone JTA capabilities without the use of a JEE application server, you will need to download the transactions-essentials-all.jar JAR and place it in the lib/custom directory.

Windows platforms may cause file locking problems. It is recommended that the UltraESB is used on a Linux/Unix platform whenever the file system transports or file system changes are involved. Generally, the UltraESB can be tuned for better performance on Linux systems than on Windows.

Next, start a Derby database from your Sun JDK’s JDK_HOME/db/bin directory by executing the "startNetworkServer" script.

Starting a Derby / JavaDB Database

asankha@asankha:/opt/jdk/db/bin$ ./startNetworkServer
Security manager installed using the Basic server security policy.
Apache Derby Network Server - 10.4.2.0 - (689064) started and ready to accept connections on port 1527 at 2010-10-19 16:04:17.511 GMT

From the same directory, execute the Derby "ij" script, and connect to the sample database that will be used for the example. Note: A new database will be created during this process.

asankha@asankha:/opt/jdk1.6.0_18/db/bin$ ./ij
ij version 10.4
ij> connect 'jdbc:derby://localhost:1527/ermanno;create=true;user=ermanno;password=ermanno';

Now copy the contents of the sample-704-schema.sql file into the clipboard, and paste it into the "ij" console to create the sample tables and schema.

Create the Database Schema

CREATE TABLE APP.CLIENT (
    ID            INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL,
    CLIENT_NAME   VARCHAR(25)
);

CREATE TABLE APP.FILE (
    ID            INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL,
    FK_MESSAGE_ID INTEGER NOT NULL,
    FILE_NAME     VARCHAR(80) NOT NULL,
    FILE_PATH     VARCHAR(80) NOT NULL
);

CREATE TABLE APP.HISTORY (
    ID            INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL,
    FK_MESSAGE_ID INTEGER NOT NULL,
    TIMESTAMP     TIMESTAMP NOT NULL,
    STATE         VARCHAR(25) NOT NULL
);

CREATE TABLE APP.MESSAGE (
    ID            INTEGER GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1) NOT NULL,
    CLIENT_NAME   VARCHAR(25),
    PAYLOAD       VARCHAR(2000)
);

Next create the two directories "/tmp/correct" and "/tmp/error" to place the saved message files.

To start the UltraESB configuration, execute sample 704 as follows:

asankha@asankha:~/java/ultraesb-1.6.1/bin$ ./ultraesb.sh -sample 704

To send a sample JMS message, start the SOA Toolbox with bin/toolbox.sh from another terminal. From the GUI, select File→New→JMS Client. Specify the Destination JNDI name as "dynamicQueues/QUEUE.IN", and select the ActiveMQ radio button for the JNDI properties. Select a Text payload, type "Hello World" as the payload body, and press the send button.

You will see the JMS message being read in the log messages, and the database tables being updated. The message will be saved into the /tmp/error directory, as the client entry does not exist. Try creating a client entry for the client "ermanno", which is hard-coded in the configuration, for example by running INSERT INTO APP.CLIENT (CLIENT_NAME) VALUES ('ermanno'); from the "ij" console. As an exercise, you may instead uncomment the header lookup in the configuration and set a JMS message property 'client' via the SOA Toolbox.

Testing transaction rollback

Type "fail" as the text message payload and send a JMS message. The logs will show the message being processed, rolled back and reprocessed until it is moved into the dead letter queue. Notice that the database changes to the MESSAGE table roll back with the JMS rollback (JTA transaction T1), but the message is still saved into the file system and the FILE table etc. is updated (JTA transaction T2).
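To make the outcome of this test easier to reason about, here is a small in-memory model of the two transactions, written as a hedged sketch rather than as UltraESB, JTA or JDBC code. The class TwoTransactionOutcome, its nested Tx class and the row labels are hypothetical; each string stands for one row inserted by the proxy service. The model covers a single delivery only and ignores the redelivery of the JMS message.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative in-memory model of the two transactions in the sample.
 * Nothing here is UltraESB, JTA or JDBC API; all names are hypothetical.
 */
class TwoTransactionOutcome {

    /** Rows become visible in 'committed' only when commit() is called. */
    static final class Tx {
        private final List<String> committed;
        private final List<String> pending = new ArrayList<>();

        Tx(List<String> committed) { this.committed = committed; }

        void insert(String row) { pending.add(row); }
        void commit() { committed.addAll(pending); pending.clear(); }
        void rollback() { pending.clear(); }
    }

    /** Processes one payload the way the proxy service does and returns the committed rows. */
    static List<String> process(String payload) {
        List<String> database = new ArrayList<>();

        // T1: store the message and its first history entry
        Tx t1 = new Tx(database);
        t1.insert("MESSAGE");
        t1.insert("HISTORY:RECEIVED");
        if ("fail".equals(payload)) {
            t1.rollback();   // in the sample, the JMS read is rolled back with it
        } else {
            t1.commit();
        }

        // T2: runs regardless of how T1 ended
        Tx t2 = new Tx(database);
        t2.insert("FILE");
        t2.insert("HISTORY:CENSUS_OK");
        t2.commit();

        return database;
    }

    public static void main(String[] args) {
        System.out.println(process("Hello World"));
        System.out.println(process("fail"));
    }
}
```

For the "fail" payload, only the rows written in T2 remain, which matches the behaviour described above: the T1 inserts disappear while the T2 inserts are kept.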
