Transactional ESB use cases made simple with the UltraESB

Sample Number

702

Demonstrates the use of JTA XA and JDBC/JMS local transactions, including asynchronous suspension and resumption, over the JBoss Application Server

Use Case

Demonstrates the ease of handling JTA transactions, non-JTA (i.e. local) transactions, or both at the same time with the UltraESB. Beyond ease of use, the ability to suspend a transaction on one thread and resume it on another, or to work with multiple transactions within a single message flow, makes demanding requirements much easier to meet. For comparison, see the articles describing a subset of this functionality with the JBoss ESB and the WSO2 ESB.

Introduction to the scenario

This example (UltraESB sample # 702) describes the use of JTA and non-JTA transactions with the UltraESB when deployed within a JEE application server. Sample # 703 simulates the same scenario without an application server deployment, using the Atomikos TransactionsEssentials libraries and ActiveMQ as the JMS provider.

This example reads a text JMS message from a queue 'OrderQueue' hosted by the JBoss application server within a JTA XA transaction. This transaction may later commit or roll back (a rollback reverts the read and triggers a redelivery), but we wish to log the receipt of every message into a database regardless. Hence, the JTA transaction is suspended, and an independent local database transaction is started to insert a record into the 'ORDERS_RECEIVED' table in an externally hosted Apache Derby / JavaDB database. The request received over JMS is then transformed into a simple SOAP request and sent to an external endpoint over HTTP.

The SOAP response is received asynchronously, possibly after a few seconds, without tying down any of the ESB worker threads while waiting, as the UltraESB uses Java NIO to send and receive over HTTP/S. The suspended JTA transaction is then resumed, and a database insert into the ORDERS_ACCEPTED table is performed within the scope of this XA transaction. The quoted price from the SOAP response is compared with the requested price, and the JTA transaction is either committed or rolled back. In the case of a rollback, both the insert into the ORDERS_ACCEPTED table and the JMS message read are rolled back, prompting a redelivery of the same message. However, the insert into ORDERS_RECEIVED remains intact, as it was performed within the scope of a separate [local] transaction.
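
The outcome of a single delivery attempt can be sketched as a small in-memory model. This is only an illustration of which writes survive a commit or a rollback; the class `TxOutcomeSketch` and its members are hypothetical names introduced here, and no UltraESB, JTA or JDBC API is involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the scenario; not part of the UltraESB API.
final class TxOutcomeSketch {

    /** What remains visible after one delivery attempt of an order message. */
    static final class Outcome {
        final List<String> ordersReceived = new ArrayList<>();
        final List<String> ordersAccepted = new ArrayList<>();
        boolean jmsMessageConsumed;
    }

    static Outcome deliverOnce(String symbol, int quantity,
                               double askingPrice, double quotedPrice) {
        Outcome outcome = new Outcome();

        // Local transaction: begun and committed while the XA transaction is
        // suspended, so this row is kept whatever happens later.
        outcome.ordersReceived.add(symbol + "|" + quantity + "|" + askingPrice);

        // Resumed XA transaction: the insert stays pending until the decision.
        String pendingAccepted = symbol + "|" + quantity + "|" + quotedPrice;

        if (quotedPrice < askingPrice) {
            // Commit: the pending insert and the JMS read both become permanent.
            outcome.ordersAccepted.add(pendingAccepted);
            outcome.jmsMessageConsumed = true;
        }
        // Otherwise rollback: the pending insert is dropped and the message
        // stays on the queue for redelivery.
        return outcome;
    }

    public static void main(String[] args) {
        Outcome rolledBack = deliverOnce("ADROIT", 25000, 89.50, 95.45);
        // prints: 1 received, 0 accepted, consumed=false
        System.out.println(rolledBack.ordersReceived.size() + " received, "
                + rolledBack.ordersAccepted.size() + " accepted, consumed="
                + rolledBack.jmsMessageConsumed);
    }
}
```

A quote below the asking price would instead leave one row in each list and mark the message as consumed.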


Refer to the complete configuration for this example, and note that it is less than a page of text (excluding the annotation comments) and is simple to understand, yet powerful in effect. There is no hidden source code being linked, and there are no compiled classes, other artifacts to 'deploy', or additional files used for the configuration. Let us now look into the details of the Proxy service definition developed to solve the above use case.

Salient points of the UltraESB Proxy Service

The Proxy service identified as "jmsProxyIN" is bound to the transport definition identified by "jms-lsnr", and the JMS destination JNDI name is given as "OrderQueue". Refer to the complete configuration for the listener definition, and note that it is almost identical to a Spring configuration of a DefaultMessageListenerContainer bean, supporting each of its attributes for more advanced use cases.

JTA XA and Local Transactions with Asynchronous Suspend and Resume

<u:proxy id="jmsProxyIN">
    <u:transport id="jms-lsnr">
        <u:property name="ultra.jms.destination" value="OrderQueue"/>
    </u:transport>
    <u:target>
        <u:inSequence>
            <u:java import="org.springframework.jdbc.core.simple.*;"><![CDATA[
                String msgBody = mediation.readPayloadAsString(msg);
                logger.info("New order message received : " + msgBody);

                // Simple extraction of message data
                String symbol = msgBody.split(" ")[0];
                int quantity = Integer.parseInt(msgBody.split(" ")[1]);
                double price = Double.parseDouble(msgBody.split(" ")[2]);

                // suspend the transaction against transaction manager "txManager"
                // Note: this is the TM used to read the JMS message (See JMS listener configuration)
                msg.suspendTransaction("txManager");

                // begin a new non-JTA transaction for use with the non-transactional Datasource and log
                // message properties and commit. Note that this commit is now independent of the JTA
                // transaction used to read the JMS message, which is now suspended
                msg.beginTransaction("nonTxManager");
                SimpleJdbcTemplate t = new SimpleJdbcTemplate(mediation.getDataSource("nonTxDS"));
                t.update("insert into ORDERS_RECEIVED values(?,?,?,?)",
                    new Object[] {msg.getMessageUUID().toString(), symbol, quantity, price});
                msg.commitTransaction();

                // Request for message properties to be made available with the asynchronous response
                msg.addResponseCorrelation("symbol", symbol);
                msg.addResponseCorrelation("quantity", quantity);
                msg.addResponseCorrelation("price", price);
                // create a stock quote request - Note: using a simple String message for simplicity,
                // JSR-223 scripting languages, XQuery, XSLT, Milyn etc are other options (see samples)
                msg.setContentType("text/xml");
                mediation.setPayloadFromString(msg,
                    "<soapenv:Envelope xmlns:soapenv=\"\"" +
                    " xmlns:soap=\"\">\n" +
                    "   <soapenv:Body>\n" +
                    "      <soap:getQuote>\n" +
                    "         <request>\n" +
                    "            <symbol>" + symbol + "</symbol>\n" +
                    "         </request>\n" +
                    "      </soap:getQuote>\n" +
                    "   </soapenv:Body>\n" +
                    "</soapenv:Envelope>");
            ]]></u:java>
        </u:inSequence>

        <u:inDestination>
            <u:address>http://localhost:9000/service/SimpleStockQuoteService</u:address>
        </u:inDestination>

        <u:outSequence>
            <u:java import="org.springframework.jdbc.core.simple.*;"><![CDATA[

                // resume the suspended transaction - this is the JTA txn that read the JMS message
                msg.resumeTransaction();
                // load message properties saved from the request
                String symbol = (String) msg.getMessageProperty("symbol");
                int quantity = (Integer) msg.getMessageProperty("quantity");
                double price = (Double) msg.getMessageProperty("price");
                // read quoted price from the response received
                double quotedPrice = Double.parseDouble(
                    mediation.extractAsStringUsingXPath(msg, "//last", null));

                // insert into ORDERS_ACCEPTED table - note, we are now within the resumed JTA txn
                SimpleJdbcTemplate t = new SimpleJdbcTemplate(mediation.getDataSource("txDS"));
                t.update("insert into ORDERS_ACCEPTED values(?,?,?)",
                    new Object[] {symbol, quantity, quotedPrice});

                // now, if the price quoted is less than the asking price, commit JTA transaction,
                // and make the update to the database and JMS message read persist, or rollback
                // everything and attempt again
                if (quotedPrice < price) {
                    logger.info("Placing order at : " +
                        quotedPrice + " for : " + quantity + " of : " + symbol);
                    msg.commitTransaction();
                } else {
                    logger.info("Cancelling order as quoted price for "
                        + symbol + " was : " + quotedPrice);
                    msg.rollbackTransaction();
                }
            ]]></u:java>
        </u:outSequence>
    </u:target>
</u:proxy>
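
The inline extraction of the symbol, quantity and price in the inSequence assumes a well-formed "<symbol> <quantity> <price>" payload. A minimal sketch of a stricter parser follows, for cases where malformed messages should fail fast with a clear error. The class `OrderMessage` is a hypothetical helper introduced here for illustration and is not part of the sample or of the UltraESB API.

```java
// Hypothetical helper; the sample itself splits the payload inline.
final class OrderMessage {
    final String symbol;
    final int quantity;
    final double price;

    private OrderMessage(String symbol, int quantity, double price) {
        this.symbol = symbol;
        this.quantity = quantity;
        this.price = price;
    }

    /** Parses a payload of the form "<symbol> <quantity> <price>". */
    static OrderMessage parse(String body) {
        if (body == null) {
            throw new IllegalArgumentException("Order payload is null");
        }
        // Split once, tolerating repeated or surrounding whitespace
        String[] parts = body.trim().split("\\s+");
        if (parts.length != 3) {
            throw new IllegalArgumentException(
                "Expected '<symbol> <quantity> <price>' but got: " + body);
        }
        // A non-numeric quantity or price raises NumberFormatException,
        // which is itself an IllegalArgumentException
        return new OrderMessage(parts[0],
            Integer.parseInt(parts[1]), Double.parseDouble(parts[2]));
    }

    public static void main(String[] args) {
        OrderMessage order = parse("ADROIT 25000 89.50");
        // prints: ADROIT 25000 89.5
        System.out.println(order.symbol + " " + order.quantity + " " + order.price);
    }
}
```

Splitting the payload once also avoids the three separate split calls of the inline version.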

Note: The above configuration can be edited with your preferred IDE, such as IntelliJ IDEA, Eclipse or NetBeans. For example, the free and open source community edition of IntelliJ IDEA provides context-aware pop-ups that ease the development of a configuration. Configurations can also be tested and step-through debugged within the IDE if required.

Deploying example # 702 over JBoss Application Server 5.1.0.GA

Setting up a Derby database

We will use Apache Derby / JavaDB, the database bundled with JDK 1.6.x, for this example; it may be substituted with a database of your choice if required. The following commands assume that the Sun JDK 1.6.x is installed at /opt/jdk, with JavaDB at /opt/jdk/db.

  1. Copy the Derby client JAR to the JBoss common library path - assuming JBoss 5.1.0 GA is installed at ~/java/jboss-5.1.0.GA

    asankha@asankha:/opt/jdk/db/lib$ cp derbyclient.jar ~/java/jboss-5.1.0.GA/common/lib
  2. Start the Derby network server with the default configuration (i.e. on port 1527)

    asankha@asankha:/opt/jdk/db/bin$ ./startNetworkServer
    Security manager installed using the Basic server security policy.
    Apache Derby Network Server - - (689064) started and ready to accept connections on port 1527 at 2010-05-02 15:50:04.878 GMT
  3. To create a new database called "database", run the command line utility "ij" as follows:

    asankha@asankha:/opt/jdk1.6.0_18/db/bin$ ./ij
    ij version 10.4
    ij> connect 'jdbc:derby://localhost:1527/database;create=true;user=admin;password=admin';
  4. From within the "ij" environment, execute the following to create the two tables.

    Creation of the Database Schema
    CREATE TABLE ORDERS_ACCEPTED (
            SYMBOL VARCHAR(25),
            QUANTITY INT,
            PRICE DOUBLE
    );
    CREATE TABLE ORDERS_RECEIVED (
            MESSAGE_ID VARCHAR(40),
            SYMBOL VARCHAR(25),
            QUANTITY INT,
            PRICE DOUBLE
    );
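
As an alternative to "ij", the tables can also be checked from Java over plain JDBC. The following is a minimal sketch, assuming Java 7 or later, derbyclient.jar on the classpath, and the Derby network server from step 2 running with the database, user and password used in step 3. The class `DerbyTableCheck` and its methods are hypothetical names introduced for this illustration.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical helper for verifying the sample tables; not part of the UltraESB.
final class DerbyTableCheck {

    /** Builds a Derby network client URL, e.g. jdbc:derby://localhost:1527/database */
    static String jdbcUrl(String host, int port, String database) {
        return "jdbc:derby://" + host + ":" + port + "/" + database;
    }

    /** Counts the rows of a table; the table name must be a trusted constant. */
    static int countRows(Connection connection, String table) throws SQLException {
        try (Statement statement = connection.createStatement();
             ResultSet rs = statement.executeQuery("select count(*) from " + table)) {
            rs.next();
            return rs.getInt(1);
        }
    }

    public static void main(String[] args) throws SQLException {
        // Host, port, database name and credentials as used in the steps above
        String url = jdbcUrl("localhost", 1527, "database");
        try (Connection connection = DriverManager.getConnection(url, "admin", "admin")) {
            System.out.println("ORDERS_RECEIVED rows: " + countRows(connection, "ORDERS_RECEIVED"));
            System.out.println("ORDERS_ACCEPTED rows: " + countRows(connection, "ORDERS_ACCEPTED"));
        }
    }
}
```

Both counts should be zero right after the schema is created, and a missing table surfaces as an SQLException.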

Setting up the JBoss JMS with the UltraESB, JMS Queues and Datasources

  1. Extract the UltraESB from the Zip distribution, copy all *.jar files from its lib directory to webapp/WEB-INF/lib, and then delete the jta-xx.jar from that directory. Note: if copying other JAR files, always skip optional/xercesImpl-x.x.x.jar, samples/servlet-api-x.x.jar, and the JARs from the 'lib/endorsed' sub directory (Refer to [1])

    asankha@asankha:~/java/ultraesb/lib$ mkdir ~/java/ultraesb/webapp/WEB-INF/lib
    asankha@asankha:~/java/ultraesb/lib$ cp *.jar ~/java/ultraesb/webapp/WEB-INF/lib
    asankha@asankha:~/java/ultraesb/lib$ rm ~/java/ultraesb/webapp/WEB-INF/lib/jta-*.jar
  2. Copy or move the webapp directory into the JBoss default/deploy directory as ultraesb.war

    asankha@asankha:~/java/ultraesb$ mv webapp/ ~/java/jboss-5.1.0.GA/server/default/deploy/ultraesb.war
  3. Copy the DatabaseNonXA-ds.xml, DatabaseXA-ds.xml and JmsXA-service.xml configuration files from <ULTRA_HOME>/samples/resources/misc to the JBoss server deploy directory. This will create the JBoss resources used, i.e. the JMS Queue, an XA Datasource and a non-XA Datasource (Note: We will use the default JmsXA connection factory from the 'default' JBoss server config).

    asankha@asankha:~/java/ultraesb/samples/resources/misc$ cp DatabaseNonXA-ds.xml ~/java/jboss-5.1.0.GA/server/default/deploy
    asankha@asankha:~/java/ultraesb/samples/resources/misc$ cp DatabaseXA-ds.xml ~/java/jboss-5.1.0.GA/server/default/deploy
    asankha@asankha:~/java/ultraesb/samples/resources/misc$ cp JmsXA-service.xml ~/java/jboss-5.1.0.GA/server/default/deploy
  4. Copy the sample # 702 configuration as the root configuration file to be loaded by the UltraESB deployed within JBoss. If you are using a version prior to v1.7.0, look for the bean definition "fileCache" and change its first constructor argument from "tmp" to "/tmp", as an absolute directory is required when deploying within an application server.

    asankha@asankha:~/java/ultraesb/samples/conf$ cp ultra-sample-702.xml ~/java/jboss-5.1.0.GA/server/default/deploy/ultraesb.war/WEB-INF/classes/conf/ultra-root.xml
  5. Edit the JBoss start script to pass a System property 'ultra.home' that points to the UltraESB home. (Refer to [1])

    JAVA_OPTS="$JAVA_OPTS -Dultra.home=/home/asankha/java/ultraesb"
  6. Start JBoss with the bin/ script, and observe that no errors are displayed at startup and that the UltraESB starts with the following message

    21:45:00,888 INFO [STDOUT] 2010-05-02 21:45:00,888 [-] [main] INFO ConfigurationImpl UltraESB/x.x.x (GA) started with root configuration..

Trying out the Sample

Running the Sample SOAP Service
  1. Copy the JBoss client JARs from ~/java/jboss-5.1.0.GA/client directory to the UltraESB lib/custom directory, and delete the jaxws-* JARs

    asankha@asankha:/home/asankha/java/jboss-5.1.0.GA/client$ cp * ~/java/ultraesb/lib/custom
    asankha@asankha:~/java/ultraesb/lib/custom$ rm jaxws-*
  2. Start the AdroitLogic ToolBox as <ULTRA_HOME>/bin/, then create a sample Jetty server (File→New→JettyServer) and start the default server on port 9000. This hosts the SimpleStockQuoteService against which the SOAP requests will be made

  3. Start a JMS client (File→New→JMS Client) and select JBoss as the JMS provider for the JNDI properties. Specify "queue/OrderQueue" as the JNDI name of the destination, and "XAConnectionFactory" as the JNDI name of the connection factory. Next, send a Text message with a payload of the form "<symbol> <quantity> <price>", e.g. "ADROIT 25000 89.50", and press send.

  4. Observe the log messages printed on the JBoss console as follows:

    22:11:15,347 INFO [STDOUT] 2010-05-02 22:11:15,347 [-] [primary-1] INFO sequence New order message received : ADROIT 25000 89.50
    22:11:16,262 INFO [STDOUT] 2010-05-02 22:11:16,260 [-] [primary-2] INFO sequence Cancelling order as quoted price for ADROIT was : 95.44758461644089
    22:11:17,285 INFO [STDOUT] 2010-05-02 22:11:17,285 [-] [primary-1] INFO sequence New order message received : ADROIT 25000 89.50
    22:11:17,322 INFO [STDOUT] 2010-05-02 22:11:17,321 [-] [primary-3] INFO sequence Cancelling order as quoted price for ADROIT was : 148.3290120602293
    22:11:18,329 INFO [STDOUT] 2010-05-02 22:11:18,329 [-] [primary-1] INFO sequence New order message received : ADROIT 25000 89.50
    22:11:18,362 INFO [STDOUT] 2010-05-02 22:11:18,362 [-] [primary-4] INFO sequence Cancelling order as quoted price for ADROIT was : 155.828062335257
    22:11:19,368 INFO [STDOUT] 2010-05-02 22:11:19,368 [-] [primary-1] INFO sequence New order message received : ADROIT 25000 89.50
    22:11:19,432 INFO [STDOUT] 2010-05-02 22:11:19,432 [-] [primary-5] INFO sequence Placing order at : 71.9998810093243 for : 25000 of : ADROIT
  5. Observe the two tables from "ij" as follows:

    ij> select * from ORDERS_RECEIVED;
    73988e12-1f1f-409b-807f-929956afee1b |ADROIT |25000 |89.5
    3e513faa-93b2-431f-8ea0-5e450f84b852 |ADROIT |25000 |89.5
    229b1c01-08e8-46f9-a4bb-77017f94627c |ADROIT |25000 |89.5
    815d72a8-af62-4bf6-94fc-924fe8036b30 |ADROIT |25000 |89.5
    4 rows selected
    ij> select * from ORDERS_ACCEPTED;
    ADROIT |25000 |71.9998810093243
    1 row selected
  6. Note that the ORDERS_RECEIVED table contains 4 rows: as per the above log messages, the message was delivered 4 times in this random scenario, and the JTA transaction was rolled back on the first 3 attempts. When a quote of less than 89.50 was finally received on the 4th attempt, the JMS message read committed successfully, together with the XA database insert into ORDERS_ACCEPTED. Experiment with more order request messages, and note that a price much less than 100 (say 9.90) will cause the message to be rolled back on each re-attempt until redelivery attempts are exhausted (default 15) and the message is moved to the dead letter queue by JBoss. A price much higher than 100 (say 210.33) will cause the order to be accepted on the first attempt.
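
The redelivery behaviour can be reasoned about with a small sketch that replays a sequence of quotes against an asking price. It is a plain Java simulation under stated assumptions: the class `RedeliverySketch` and its method are hypothetical names, the quotes are those printed in the log above, and the limit of 15 deliveries is the default mentioned in the text.

```java
// Hypothetical simulation of the redelivery loop; no JMS or UltraESB API is used.
final class RedeliverySketch {

    /**
     * Returns the 1-based delivery attempt on which a quote falls below the
     * asking price, or -1 if none of the supplied quotes is accepted within
     * maxDeliveries attempts (the message would then go to the dead letter queue).
     */
    static int acceptedOnAttempt(double askingPrice, double[] quotes, int maxDeliveries) {
        int limit = Math.min(quotes.length, maxDeliveries);
        for (int i = 0; i < limit; i++) {
            if (quotes[i] < askingPrice) {
                return i + 1; // commit: message consumed, ORDERS_ACCEPTED row kept
            }
            // rollback: only the ORDERS_RECEIVED row of this attempt survives
        }
        return -1;
    }

    public static void main(String[] args) {
        // Quotes as printed in the JBoss console log above
        double[] quotes = {95.44758461644089, 148.3290120602293,
                           155.828062335257, 71.9998810093243};
        // prints: Accepted on attempt 4
        System.out.println("Accepted on attempt " + acceptedOnAttempt(89.50, quotes, 15));
    }
}
```

Each attempt before the accepted one leaves one extra row in ORDERS_RECEIVED, which matches the 4 rows shown against a single row in ORDERS_ACCEPTED.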
