Sample | 704
Level |
Description | JMS transactional reads coupled with JDBC updates and file system changes using JTA XA transactions
This sample shows a transactional JMS read combined with transactional JDBC updates under JTA XA transactions. It also describes how file system updates made during a transaction are undone when that transaction rolls back.
A JMS destination is polled, and each message is saved into a database table within a JTA transaction T1. A second transaction T2 is then started, in which the payload is saved to the file system and further database tables are read and updated, as shown below. If T2 is rolled back, the file system changes need to be rolled back as well.
We’ve used the Atomikos library for standalone JTA support, with ActiveMQ as the JMS provider and Apache Derby as the database in this example. This solution can easily be moved to a JBoss application server and run against a WebSphere JMS destination, an Oracle database, and so on, with few configuration changes.
The UltraESB configuration can easily host an embedded ActiveMQ server - if desired - with just a couple of lines of Spring configuration. In this example, we define the following Spring configuration fragment within the ultra-root.xml for this purpose.
Starting an Embedded ActiveMQ instance
<amq:broker useJmx="false" persistent="false">
  <amq:transportConnectors>
    <amq:transportConnector uri="tcp://localhost:61616"/>
  </amq:transportConnectors>
</amq:broker>
Note the use of the Spring Framework and Java code for the configuration. This lets you use stable technology you are already familiar with, instead of learning a new XML configuration language to configure a vendor-specific ESB implementation. The mediation may be specified in Java, Groovy, JavaScript, Ruby or any other JSR 223 scripting language supported by JDK 7 or later, and the UltraESB compiles it into byte code for optimal execution. This frees the developer from compile, bundle and build cycles when deploying changes, and allows one to just write and execute. The option to deploy custom Java code, compiled classes/JAR files or Spring beans is also available for those who prefer to keep the source hidden.
Proxy Service Definition
1   <u:proxy id="message-processor">
2     <u:transport id="jms-lsnr">
3       <u:property name="ultra.jms.destination" value="QUEUE.IN"/>
4     </u:transport>
5     <u:target>
6       <u:inSequence>
7         <u:java import="org.springframework.jdbc.support.*;
8           org.springframework.jdbc.core.simple.*; org.springframework.jdbc.core.*; java.sql.*;"><![CDATA[
9
10          final String clientName = "ermanno"; //msg.getFirstTransportHeader("client");
11          final String payload = mediation.readPayloadAsString(msg);
12          logger.info("Received message from client {} within transaction t1 as : {}", clientName, payload);
13
14          KeyHolder keyHolder = new GeneratedKeyHolder();
15          JdbcTemplate t = new JdbcTemplate(mediation.getDataSource("dataSource"));
16          t.update(new PreparedStatementCreator() {
17              public PreparedStatement createPreparedStatement(Connection connection) throws SQLException {
18                  PreparedStatement ps = connection.prepareStatement(
19                      "INSERT INTO APP.MESSAGE (CLIENT_NAME, PAYLOAD) VALUES (?,?)", Statement.RETURN_GENERATED_KEYS);
20                  ps.setString(1, clientName);
21                  ps.setString(2, payload);
22                  return ps;
23              }
24          }, keyHolder);
25          Integer generatedId = new Integer(keyHolder.getKey().intValue());
26
27          t.update("INSERT INTO APP.HISTORY (FK_MESSAGE_ID, TIMESTAMP, STATE) VALUES (?,?,?)",
28              new Object[] {generatedId, new java.util.Date(), "RECEIVED"});
29
30          // simulate a failure if the payload is "fail"
31          if ("fail".equals(payload)) {
32              msg.rollbackTransaction();
33              logger.info("Rolled back JTA transaction : t1");
34          } else {
35              msg.commitTransaction();
36              logger.info("Committed JTA transaction : t1");
37          }
38
39          logger.info("Starting transaction : t2");
40          msg.beginTransaction("txManager");
41          boolean clientExists = t.queryForInt(
42              "SELECT COUNT(*) FROM APP.CLIENT WHERE CLIENT_NAME = ?", new Object[] {clientName}) == 1;
43          logger.info("Client Exists : {}", clientExists);
44          final String filePath = "/tmp/" + (clientExists ? "correct" : "error") + "/";
45          final String fileName = "message_" + generatedId;
46
47          try {
48              mediation.savePayloadToFile(msg, filePath + fileName);
49              t.update("INSERT INTO APP.FILE (FK_MESSAGE_ID, FILE_PATH, FILE_NAME) VALUES (?,?,?)",
50                  new Object[] {generatedId, filePath, fileName});
51              t.update("INSERT INTO APP.HISTORY (FK_MESSAGE_ID, TIMESTAMP, STATE) VALUES (?,?,?)",
52                  new Object[] {generatedId, new java.util.Date(), "CENSUS_OK"});
53              msg.commitTransaction();
54          } catch (Exception e) {
55              logger.error("Rolling back transaction by removing file : {}", filePath + fileName);
56              java.io.File f = new java.io.File(filePath + fileName);
57              if (f.exists()) {
58                  if (!f.delete()) {
59                      f.deleteOnExit();
60                  }
61              }
62              msg.rollbackTransaction();
63          }
64          logger.info("Processing of message : {} is complete", generatedId);
65        ]]></u:java>
66      </u:inSequence>
67    </u:target>
68  </u:proxy>
Note the catch block (lines 54-63), which deletes the saved file if transaction T2 has to be rolled back. We’ve included lines 30-37 to make it easy to simulate a failure case by setting the payload text to "fail". (See the end of this page for more details.)
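The file system is not an XA resource here, so the file written during T2 has to be undone by hand when the transaction fails. The following is a minimal standalone sketch of that compensation pattern, not the UltraESB API: the class name `FileCompensationSketch`, the `DatabaseWork` interface and the `saveThenUpdate` method are hypothetical names introduced for illustration, and the database work is simulated by a callback.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch: undo a file write by hand when the surrounding unit of work fails. */
final class FileCompensationSketch {

    /** Hypothetical stand-in for the JDBC updates made inside T2. */
    interface DatabaseWork {
        void run() throws Exception;
    }

    /**
     * Writes the payload, then runs the database work. If that work throws,
     * the file is deleted again so that no orphan file is left behind.
     *
     * @return true if both steps succeeded, false if the file was removed
     */
    static boolean saveThenUpdate(Path target, String payload, DatabaseWork work) throws IOException {
        Files.write(target, payload.getBytes(StandardCharsets.UTF_8));
        try {
            work.run();
            return true;   // the proxy above calls msg.commitTransaction() at this point
        } catch (Exception e) {
            Files.deleteIfExists(target);
            return false;  // the proxy above calls msg.rollbackTransaction() at this point
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("sample704");
        Path ok = dir.resolve("message_1");
        Path bad = dir.resolve("message_2");

        boolean first = saveThenUpdate(ok, "Hello World", () -> { });
        boolean second = saveThenUpdate(bad, "Hello World", () -> {
            throw new IllegalStateException("simulated JDBC failure");
        });

        System.out.println(first + " " + Files.exists(ok));    // true true
        System.out.println(second + " " + Files.exists(bad));  // false false
    }
}
```

Deleting the file before signalling the rollback mirrors the order used in the proxy's catch block, so a failed delete can still be noticed while the transaction is open.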
As this example is written against the Atomikos TransactionsEssentials library, which provides standalone JTA capabilities without the use of a Java EE application server, you will need to download the transactions-essentials-all.jar file and place it into the lib/custom directory.
Windows platforms may cause file locking problems. It is recommended that the UltraESB is used on a Linux/Unix platform whenever the file system transports or file system changes are involved. Generally, the UltraESB can be tuned for better performance on Linux systems than on Windows.
Next, start a Derby database from your Sun JDK’s JDK_HOME/db/bin directory by executing the "startNetworkServer" script.
Starting a Derby / JavaDB Database
asankha@asankha:/opt/jdk/db/bin$ ./startNetworkServer
Security manager installed using the Basic server security policy.
Apache Derby Network Server - 10.4.2.0 - (689064) started and ready to accept connections on port 1527 at 2010-10-19 16:04:17.511 GMT
From the same directory, execute the Derby "ij" script, and connect to the sample database that will be used for the example. Note: A new database will be created during this process.
asankha@asankha:/opt/jdk1.6.0_18/db/bin$ ./ij
ij version 10.4
ij> connect 'jdbc:derby://localhost:1527/ermanno;create=true;user=ermanno;password=ermanno';
Now copy the contents of the sample-704-schema.sql file into the clipboard, and paste it into the "ij" console to create the sample tables and schema.
Create the Database Schema
CREATE TABLE APP.CLIENT (
    ID INTEGER GENERATED ALWAYS AS IDENTITY(START WITH 1, INCREMENT BY 1) NOT NULL,
    CLIENT_NAME VARCHAR(25)
);
CREATE TABLE APP.FILE (
    ID INTEGER GENERATED ALWAYS AS IDENTITY(START WITH 1, INCREMENT BY 1) NOT NULL,
    FK_MESSAGE_ID INTEGER NOT NULL,
    FILE_NAME VARCHAR(80) NOT NULL,
    FILE_PATH VARCHAR(80) NOT NULL
);
CREATE TABLE APP.HISTORY (
    ID INTEGER GENERATED ALWAYS AS IDENTITY(START WITH 1, INCREMENT BY 1) NOT NULL,
    FK_MESSAGE_ID INTEGER NOT NULL,
    TIMESTAMP TIMESTAMP NOT NULL,
    STATE VARCHAR(25) NOT NULL
);
CREATE TABLE APP.MESSAGE (
    ID INTEGER GENERATED ALWAYS AS IDENTITY(START WITH 1, INCREMENT BY 1) NOT NULL,
    CLIENT_NAME VARCHAR(25),
    PAYLOAD VARCHAR(2000)
);
Next, create the two directories "/tmp/correct" and "/tmp/error" to hold the saved message files.
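The proxy picks one of these two directories depending on whether the client is known, and expects both to exist already. As a rough illustration of that path selection, the sketch below resolves the target file and creates the directory if it is missing. The class `TargetDirectorySketch` and the method `targetFile` are hypothetical names that are not part of the sample, and a temporary base directory is used instead of /tmp so the example can run anywhere.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

/** Sketch: choose "correct" or "error" for a message and make sure the directory exists. */
final class TargetDirectorySketch {

    /**
     * Mirrors the path selection in the proxy: known clients go to "correct",
     * unknown clients go to "error", and the file is named message_<id>.
     */
    static Path targetFile(Path base, boolean clientExists, int messageId) throws IOException {
        Path dir = base.resolve(clientExists ? "correct" : "error");
        Files.createDirectories(dir);  // does nothing if the directory already exists
        return dir.resolve("message_" + messageId);
    }

    public static void main(String[] args) throws IOException {
        // The sample itself uses /tmp as the base directory.
        Path base = Files.createTempDirectory("sample704");
        System.out.println(targetFile(base, true, 1));
        System.out.println(targetFile(base, false, 2));
    }
}
```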
To start the UltraESB configuration, execute sample 704 as follows:
asankha@asankha:~/java/ultraesb-1.6.1/bin$ ./ultraesb.sh -sample 704
To send a sample JMS message, start the SOA Toolbox with bin/toolbox.sh from another terminal. From the GUI, select File→New→JMS Client. Specify the Destination JNDI name as "dynamicQueues/QUEUE.IN", and select the ActiveMQ radio button for the JNDI properties. Select a Text payload, type "Hello World" as the payload body, and press the Send button.
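You will see from the log messages that the JMS message is read and the database tables are updated. The message will be saved into the /tmp/error directory, as the client entry does not exist. Try creating an entry in the APP.CLIENT table for the client "ermanno", whose name is hard coded in the configuration, for example by running INSERT INTO APP.CLIENT (CLIENT_NAME) VALUES ('ermanno'); from the "ij" console. Messages sent after that should be saved into /tmp/correct instead. As an exercise, you may uncomment the msg.getFirstTransportHeader("client") call and set a JMS message property 'client' via the SOA Toolbox.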
Type "fail" as the text message payload and send a JMS message. The logs will show the message being processed, rolled back and reprocessed until it is moved into the dead letter queue. Notice that the database changes to the MESSAGE table roll back with the JMS rollback (JTA transaction T1), but the message is still saved into the file system and the FILE table etc. are updated (JTA transaction T2).
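The outcome of the "fail" case can be easier to follow as a toy model. The sketch below is not broker or UltraESB code. It assumes that T2 succeeds on every delivery attempt and that the broker allows a fixed number of redeliveries before dead-lettering; the value 6 used in `main` is an assumption, so check the redelivery policy of your own broker. The names `RedeliverySketch`, `Outcome` and `deliver` are hypothetical.

```java
/** Toy model of how sample 704 behaves across JMS redeliveries; not broker code. */
final class RedeliverySketch {

    /** What is left behind once the message is either consumed or dead-lettered. */
    static final class Outcome {
        final int deliveryAttempts;
        final int messageRows;      // rows kept by T1 (MESSAGE table)
        final int fileRows;         // rows committed by T2 (FILE table)
        final boolean deadLettered;

        Outcome(int deliveryAttempts, int messageRows, int fileRows, boolean deadLettered) {
            this.deliveryAttempts = deliveryAttempts;
            this.messageRows = messageRows;
            this.fileRows = fileRows;
            this.deadLettered = deadLettered;
        }
    }

    /** Replays the proxy logic for one message until it is consumed or dead-lettered. */
    static Outcome deliver(String payload, int maxRedeliveries) {
        int attempts = 0;
        int messageRows = 0;
        int fileRows = 0;
        while (true) {
            attempts++;
            // T1: the JMS read and the MESSAGE insert commit or roll back together.
            boolean t1Commits = !"fail".equals(payload);
            if (t1Commits) {
                messageRows++;
            }
            // T2: independent of T1, assumed to commit on every attempt.
            fileRows++;
            if (t1Commits) {
                return new Outcome(attempts, messageRows, fileRows, false);
            }
            if (attempts > maxRedeliveries) {
                return new Outcome(attempts, messageRows, fileRows, true);
            }
        }
    }

    public static void main(String[] args) {
        Outcome ok = deliver("Hello World", 6);
        Outcome failed = deliver("fail", 6);
        // prints "1 1 1 false"
        System.out.println(ok.deliveryAttempts + " " + ok.messageRows + " " + ok.fileRows + " " + ok.deadLettered);
        // prints "7 0 7 true"
        System.out.println(failed.deliveryAttempts + " " + failed.messageRows + " " + failed.fileRows + " " + failed.deadLettered);
    }
}
```

Under these assumptions every redelivery leaves another committed T2 result behind, which is why the FILE table and the file system keep changing even though the MESSAGE table stays empty.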