Version: 17.07
Supported Since: 17.01
The AMQP Ingress Connector can be used to obtain messages from remote AMQP server. When using AMQP Ingress Connector, you can use either transactional or non-transactional flow as shown in below sample use cases
In order to use the AMQP Ingress Connector, you must first select the AMQP Connector dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the AMQP Connector dependency via Component Registry. From Tools menu, select Ultra Studio → Component Registry and from the Connectors list, select the AMQP Connector dependency. |
Processor |
The message received to the Ingress Connector will be emitted through this outport |
On Exception |
The message will be sent to this outport if the Ingress Connector fails to process the message due to some reason |
* marked fields are mandatory
Queue Name * |
Basic |
The name of the queue to receive messages from |
Connection Factory * |
Basic |
The ConnectionFactory to use for obtaining Connections |
Transaction Manager |
Transaction |
The org.adroitlogic.x.base.trp.UltraPlatformTransactionManager instance to be used with the connector |
Channel Transacted |
Transaction |
Flag to indicate that channels created by this component will be transactional. NOTE: It is a MUST to enable this property when you specify a Transaction Manager |
Message Property Converter |
Advanced |
Resource for conversion between Spring AMQP Message Properties and RabbitMQ BasicProperties |
Recovery Interval |
Advanced |
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds. |
Receive Timeout |
Advanced |
The time (in milliseconds) that a consumer should wait for data. Default 1000 (1 second). |
Shutdown Timeout |
Advanced |
Specify the phase in which this connector should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this connector starts as late as possible and stops as soon as possible. |
AMQP Error Handler |
Advanced |
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default there will be no ErrorHandler so that error-level logging is the only result. |
First you need to specify a Connection Factory to connect to the AMQP Server. For this example let’s use Rabbit MQ and if you have not already installed Rabbit MQ, please follow the Installation Guide
First open the project.xpml file of the project and right click on it (on the XML content of the file). From the context menu, select Resource Template as shown in below figure
After that from the shown dialog box, select RabbitMQ with Transaction template.
Next you need to specify the required parameters as shown below. It is mandatory to specify a bean prefix and for that you can specify any value. Apart from that you need to specify the host and port of the RabbitMQ server.
First Let’s add an AMQP Ingress Connector and as for the source queue specify your queue name. Then, under the connection factory, specify AMQP1-rabbitMQ-CachingConnectionFactory and save the configuration as shown below.
After that, let’s add a console logger to log the payload of the message to the console and successfully complete the flow with a Successful Flow End element as shown below.
Now after creating a run configuration and running the project, you will be able to see that the message has been obtained from the remote AMQP queue.
Now if we replace the Successful Flow End element with an Exceptional Flow End element and run the project, an exception will be thrown and the message flow will fail. But, the original message which we obtained from the remote queue will be lost as well. By using a Transactional flow we can configure the original message to be rolled back to the remote source queue in case of an exception, so that there will be no message loss.
In-order to enable transaction, first select the AMQP1-rabbitMQ-CachingConnectionFactory as the Transaction Manager for the AMQP Ingress Connector and set Channel Transacted as 'true' as shown below.
Next insert a transactional scope as shown below
There are two elements to signify the start and the end of the transactional scope. Transaction is only valid within these two elements. It is not mandatory to keep the Transaction end element. Like in the above flow, if we remove the Transaction end element, the transaction will be available until the end of the flow. |
Now if you run the flow, you can see that the failed message is rolled back to the original queue and re-tried a finite number of times and moved to a dead-letter queue.