AMQP Ingress Connector

Version: 17.01

Supported Since: 17.01

What is AMQP Ingress Connector?

The AMQP Ingress Connector can be used to obtain messages from a remote AMQP server. You can use it in either a transactional or a non-transactional flow, as shown in the sample use cases below.

Sample Use Cases

To use the AMQP Ingress Connector, you must first select the "AMQP Connector" dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the "AMQP Connector" dependency via the Component Registry: from the Tools menu, select Ultra Studio → Component Registry, and then select the "AMQP Connector" dependency from the Connectors list.

After that, you need to specify a Connection Factory to connect to the AMQP server. For this example, let’s use RabbitMQ. If you have not already installed RabbitMQ, please follow the Installation Guide.

Next, open the project.xpml file in your project. In the Design view there is a dropdown button named "Custom Template", and under it you will see an item named "RabbitMQ with Transactions", as shown below.

amqp ingress connector 1

Select that item, fill in the required parameters in the pop-up modal, and save it. Now you can see that all the minimally required resources for the AMQP connector have been added to project.xpml.

amqp ingress connector 2

1. Obtaining a message from an AMQP queue

First, let’s add an AMQP Ingress Connector and specify your queue name as the source queue. Then, for the connection factory, specify "rabbitMQ-CachingConnectionFactory" and save the configuration as shown below.

amqp ingress connector 3

After that, let’s add a console logger to log the payload of the message to the console and successfully complete the flow with a “Successful Flow End” element as shown below.

amqp ingress connector 4

Now after creating a run configuration [link to creating a run configuration] and running the project, you will be able to see that the message has been obtained from the remote AMQP queue.

2. Obtaining a message from an AMQP queue with transactions

Now, if we replace the "Successful Flow End" element with an "Exceptional Flow End" element and run the project, an exception will be thrown and the message flow will fail. However, the original message obtained from the remote queue will be lost as well. By using a transactional flow, we can configure the original message to be rolled back to the remote source queue in case of an exception, so that no message is lost.
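The difference between the two flows can be illustrated with a small in-memory model. This is only a sketch: the `deque` stands in for the remote AMQP queue, and `consume`, `failing_flow`, and the `transactional` flag are hypothetical names used for illustration, not part of the connector's API.

```python
from collections import deque


def failing_flow(message):
    """Stand-in for a flow that ends with an "Exceptional Flow End" element."""
    raise RuntimeError(f"flow failed for {message!r}")


def consume(source_queue, flow, transactional):
    """Take one message off the queue and run it through the flow.

    Without a transaction the message is gone once it has been taken.
    With a transaction a failure puts it back at the head of the queue.
    """
    message = source_queue.popleft()
    try:
        flow(message)
    except Exception:
        if transactional:
            source_queue.appendleft(message)  # roll back to the source queue
        return False
    return True


plain = deque(["order-1"])
consume(plain, failing_flow, transactional=False)
print(list(plain))       # [] : the message is lost

transacted = deque(["order-1"])
consume(transacted, failing_flow, transactional=True)
print(list(transacted))  # ['order-1'] : the message is back on the queue
```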

To enable transactions, first select “rabbitMQ-UltraTxnManager” as the Transaction Manager for the AMQP Ingress Connector and set Channel Transacted to 'true', as shown below.

amqp ingress connector 5

Next, insert a transactional scope as shown below.

amqp ingress connector 6

There are two elements that mark the start and the end of the transactional scope. The transaction is only valid between these two elements. The Transaction end element is not mandatory: if we remove it from the above flow, the transaction remains active until the end of the flow.
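The scoping rule can be sketched as a function over a list of element names. The names `TXN_BEGIN`, `TXN_END`, and `elements_in_transaction`, as well as the element labels used in the sample flows, are hypothetical; they only model the rule described above and are not connector identifiers.

```python
TXN_BEGIN = "Transaction Scope Start"  # hypothetical label for the start element
TXN_END = "Transaction Scope End"      # hypothetical label for the end element


def elements_in_transaction(flow):
    """Return the flow elements that execute inside the transactional scope.

    The scope opens at TXN_BEGIN and closes at TXN_END. If TXN_END is
    missing, the scope stays open until the end of the flow.
    """
    inside = []
    active = False
    for element in flow:
        if element == TXN_BEGIN:
            active = True
        elif element == TXN_END:
            active = False
        elif active:
            inside.append(element)
    return inside


with_end = [TXN_BEGIN, "Logger", TXN_END, "Flow End"]
without_end = [TXN_BEGIN, "Logger", "Flow End"]
print(elements_in_transaction(with_end))     # ['Logger']
print(elements_in_transaction(without_end))  # ['Logger', 'Flow End']
```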

Now, if you run the flow, you can see that the failed message is rolled back to the original queue, retried a finite number of times, and then moved to a dead-letter queue.
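The retry and dead-letter behaviour described above can be modelled in memory as follows. This is a sketch under stated assumptions: `MAX_ATTEMPTS = 3` is an assumed value (this page does not state the actual limit), and `drain`, `reject_bad`, and the message names are hypothetical. The real behaviour depends on your broker and project configuration.

```python
from collections import deque

MAX_ATTEMPTS = 3  # assumed value; the actual limit depends on your setup


def drain(source_queue, dead_letter_queue, flow, max_attempts=MAX_ATTEMPTS):
    """Process every message, retrying failures up to max_attempts times."""
    attempts = {}
    while source_queue:
        message = source_queue.popleft()
        attempts[message] = attempts.get(message, 0) + 1
        try:
            flow(message)
        except Exception:
            if attempts[message] < max_attempts:
                source_queue.appendleft(message)   # rolled back, will be retried
            else:
                dead_letter_queue.append(message)  # retries exhausted
    return attempts


def reject_bad(message):
    """Hypothetical flow that fails for messages whose name starts with 'bad'."""
    if message.startswith("bad"):
        raise ValueError(message)


source = deque(["bad-1", "ok-1"])
dead_letters = deque()
attempts = drain(source, dead_letters, reject_bad)
print(list(dead_letters))  # ['bad-1']
print(attempts)            # {'bad-1': 3, 'ok-1': 1}
```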

Out Ports

Processor

The message received by the Ingress Connector will be emitted through this out port.

On Exception

The message will be sent to this out port if the Ingress Connector fails to process the message for any reason.
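The choice between the two out ports can be pictured as a simple dispatch. In this sketch, `emit` and the `accept` callable are hypothetical stand-ins for whatever the connector does internally to turn an AMQP delivery into a flow message; only the port names come from this page.

```python
def emit(raw_message, accept):
    """Return (out_port, message) for a delivery arriving at the connector.

    `accept` is a hypothetical callable that converts the raw delivery
    into a flow message and raises an exception if it cannot.
    """
    try:
        return ("Processor", accept(raw_message))
    except Exception:
        return ("On Exception", raw_message)


print(emit(b"hello", bytes.decode))  # ('Processor', 'hello')
print(emit(b"\xff", bytes.decode))   # ('On Exception', b'\xff')
```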

Parameters

* marked fields are mandatory

Queue Name *

Basic

The name of the queue to receive messages from

Connection Factory *

Basic

The ConnectionFactory to use for obtaining Connections

Transaction Manager

Transaction

The org.adroitlogic.x.base.trp.UltraPlatformTransactionManager instance to be used with the connector

Channel Transacted

Transaction

Flag indicating that channels created by this component will be transactional. NOTE: This property must be enabled when you specify a Transaction Manager.

Message Property Converter

Advanced

Resource for conversion between Spring AMQP Message Properties and RabbitMQ BasicProperties

Recovery Interval

Advanced

Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.

Receive Timeout

Advanced

The time (in milliseconds) that a consumer should wait for data. The default is 1000 ms (1 second).

Shutdown Timeout

Advanced

Specify the phase in which this connector should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this connector starts as late as possible and stops as soon as possible.

AMQP Error Handler

Advanced

Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default there will be no ErrorHandler so that error-level logging is the only result.
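The rules stated in the parameter descriptions above (mandatory fields, documented defaults, and the Channel Transacted requirement) can be summarised in a small validation sketch. The class `AmqpIngressSettings`, its field names, and the queue name "orders" are hypothetical and do not correspond to the connector's actual configuration API. The `False` default for `channel_transacted` is also an assumption, since this page does not state one.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class AmqpIngressSettings:
    queue_name: str                            # Queue Name *
    connection_factory: str                    # Connection Factory *
    transaction_manager: Optional[str] = None  # Transaction Manager
    channel_transacted: bool = False           # Channel Transacted (assumed default)
    recovery_interval_ms: int = 5000           # Recovery Interval (documented default)
    receive_timeout_ms: int = 1000             # Receive Timeout (documented default)

    def problems(self):
        """Return a list of human-readable configuration problems."""
        found = []
        if not self.queue_name:
            found.append("Queue Name is mandatory")
        if not self.connection_factory:
            found.append("Connection Factory is mandatory")
        if self.transaction_manager and not self.channel_transacted:
            found.append(
                "Channel Transacted must be true when a Transaction Manager is set"
            )
        return found


ok = AmqpIngressSettings(
    "orders", "rabbitMQ-CachingConnectionFactory",
    transaction_manager="rabbitMQ-UltraTxnManager", channel_transacted=True,
)
bad = AmqpIngressSettings(
    "orders", "rabbitMQ-CachingConnectionFactory",
    transaction_manager="rabbitMQ-UltraTxnManager",
)
print(ok.problems())   # []
print(bad.problems())  # ['Channel Transacted must be true when a Transaction Manager is set']
```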
