amqp ingress connector

AMQP Ingress Connector

Version: 17.07

Supported Since: 17.01

What is AMQP Ingress Connector?

The AMQP Ingress Connector can be used to obtain messages from remote AMQP server. When using AMQP Ingress Connector, you can use either transactional or non-transactional flow as shown in below sample use cases

In order to use the AMQP Ingress Connector, you must first select the AMQP Connector dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the AMQP Connector dependency via Component Registry. From Tools menu, select Ultra Studio → Component Registry and from the Connectors list, select the AMQP Connector dependency.
amqp ingress connector 7

Out Ports

Processor

The message received to the Ingress Connector will be emitted through this outport

On Exception

The message will be sent to this outport if the Ingress Connector fails to process the message due to some reason

Parameters

* marked fields are mandatory

Queue Name *

Basic

The name of the queue to receive messages from

Connection Factory *

Basic

The ConnectionFactory to use for obtaining Connections

Transaction Manager

Transaction

The org.adroitlogic.x.base.trp.UltraPlatformTransactionManager instance to be used with the connector

Channel Transacted

Transaction

Flag to indicate that channels created by this component will be transactional. NOTE: It is a MUST to enable this property when you specify a Transaction Manager

Message Property Converter

Advanced

Resource for conversion between Spring AMQP Message Properties and RabbitMQ BasicProperties

Recovery Interval

Advanced

Specify the interval between recovery attempts, in milliseconds. The default is 5000 ms, that is, 5 seconds.

Receive Timeout

Advanced

The time (in milliseconds) that a consumer should wait for data. Default 1000 (1 second).

Shutdown Timeout

Advanced

Specify the phase in which this connector should be started and stopped. The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that. By default this value is Integer.MAX_VALUE meaning that this connector starts as late as possible and stops as soon as possible.

AMQP Error Handler

Advanced

Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default there will be no ErrorHandler so that error-level logging is the only result.

Sample Use case

First you need to specify a Connection Factory to connect to the AMQP Server. For this example let’s use Rabbit MQ and if you have not already installed Rabbit MQ, please follow the Installation Guide

First open the project.xpml file of the project and right click on it (on the XML content of the file). From the context menu, select Resource Template as shown in below figure

amqp egress connector 1

After that from the shown dialog box, select RabbitMQ with Transaction template.

amqp egress connector 2

Next you need to specify the required parameters as shown below. It is mandatory to specify a bean prefix and for that you can specify any value. Apart from that you need to specify the host and port of the RabbitMQ server.

amqp egress connector 3
1. Obtaining a message from AMQP queue

First Let’s add an AMQP Ingress Connector and as for the source queue specify your queue name. Then, under the connection factory, specify AMQP1-rabbitMQ-CachingConnectionFactory and save the configuration as shown below.

amqp ingress connector 3

After that, let’s add a console logger to log the payload of the message to the console and successfully complete the flow with a Successful Flow End element as shown below.

amqp ingress connector 4

Now after creating a run configuration and running the project, you will be able to see that the message has been obtained from the remote AMQP queue.

2. Obtaining a message from a AMQP queue with transactions

Now if we replace the Successful Flow End element with an Exceptional Flow End element and run the project, an exception will be thrown and the message flow will fail. But, the original message which we obtained from the remote queue will be lost as well. By using a Transactional flow we can configure the original message to be rolled back to the remote source queue in case of an exception, so that there will be no message loss.

In-order to enable transaction, first select the AMQP1-rabbitMQ-CachingConnectionFactory as the Transaction Manager for the AMQP Ingress Connector and set Channel Transacted as 'true' as shown below.