Version: 17.01
Supported Since: 17.01
The AMQP Ingress Connector can be used to obtain messages from remote AMQP server. When using AMQP Ingress Connector, you can use either transactional or non-transactional flow as shown in below sample use cases
In order to use the AMQP Ingress Connector, you must first select the "AMQP Connector" dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the "AMQP Connector" dependency via Component Registry. From Tools menu, select Ultra Studio → Component Registry and from the Connectors list, select the "AMQP Connector" dependency.
After that you need to specify a Connection Factory to connect to the AMQP Server. For this example let’s use Rabbit MQ and if you have not already installed Rabbit MQ, please follow the Installation Guide
Next, after opening the project.xpml file in your project, under Design view there is a dropdown button named "Custom Template" and under that you will see an item named "RabbitMQ with Transactions" as shown below.
Select that and fill the required parameters in the pop-up modal and save it. Now you can see that all the minimally required resources for the AMQP connector are added to the project.xpml
First Let’s add an AMQP Ingress Connector and as for the source queue specify your queue name. Then, under the connection factory, specify "rabbitMQ-CachingConnectionFactory" and save the configuration as shown below.
After that, let’s add a console logger to log the payload of the message to the console and successfully complete the flow with a “Successful Flow End” element as shown below.
Now after creating a run configuration [link to creating a run configuration] and running the project, you will be able to see that the message has been obtained from the remote AMQP queue.
Now if we replace the "Successful Flow End" element with an "Exceptional Flow End" element and run the project, an exception will be thrown and the message flow will fail. But, the original message which we obtained from the remote queue will be lost as well. By using a Transactional flow we can configure the original message to be rolledback to the remote source queue in case of an exception, so that there will be no message loss.
In-order to enable transaction, first select the “rabbitMQ-UltraTxnManager” as the Transaction Manager for the AMQP Ingress Connector and set Channel Transacted as 'true' as shown below.
Next insert a transactional scope as shown below
There are two elements to signify the start and the end of the transactional scope. Transaction is only valid within these two elements. It is not mandatory to keep the Transaction end element. Like in the above flow, if we remove the Transaction end element, the transaction will be available until the end of the flow. |
Now if you run the flow, you can see that the failed message is rolled back to the original queue and re-tried a finite number of times and moved to a dead-letter queue.
Processor |
The message received to the Ingress Connector will be emitted through this outport |
On Exception |
The message will be sent to this outport if the Ingress Connector fails to process the message due to some reason |
* marked fields are mandatory
Queue Name * |
Basic |
The name of the queue to receive messages from |
Connection Factory * |
Basic |
The ConnectionFactory to use for obtaining Connections |
Transaction Manager |
Transaction |
The org.adroitlogic.x.base.trp.UltraPlatformTransactionManager instance to be used with the connector |
Channel Transacted |
Transaction |
Flag to indicate that channels created by this component will be transactional. NOTE: It is a MUST to enable this property when you specify a Transaction Manager |
Message Property Converter |
Advanced |
Resource for conversion between Spring AMQP Message Properties and RabbitMQ BasicProperties |
Recovery Interval |
Advanced |
Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds. |
Receive Timeout |
Advanced |
The time (in milliseconds) that a consumer should wait for data. Default 1000 (1 second). |
Shutdown Timeout |
Advanced |
Specify the phase in which this connector should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this connector starts as late as possible and stops as soon as possible. |
AMQP Error Handler |
Advanced |
Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default there will be no ErrorHandler so that error-level logging is the only result. |