sqs ingress connector

SQS Ingress Connector

Version: 17.07

Supported Since: 17.07

What is SQS Ingress Connector?

SQS Ingress Connector can be used to pull SQS messages from Amazon AWS SQS queues. The default behaviour of this connector is that it will pull messages from the SQS queue and persist temporarily in the local file system and inject it to the UltraESB-X engine. After successfully completing the integration flow, it will then delete the original SQS message from the queue.

AWS SQS credentials associated with an account which has write permission to the source queue will be necessary when configuring this connector.
In order to use the SQS Ingress Connector, you must first select the AWS Connector dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the AWS Connector dependency via Component Registry. From Tools menu, select Ultra Studio → Component Registry and from the Connectors list, select the AWS Connector dependency.
sqs ingress ports

Out Ports

Processor

The message will be emitted from this out port after a message has been obtained from the SQS queue. The payload of the message will be the message which has been pulled from the SQS queue.

On Exception

The message will be emitted from this out port if the processing element fails to prepare a message from the object pulled from the SQS queue.

Parameters

* marked fields are mandatory

SQS Region *

Basic

The region of the source SQS queue

Temporary buffer location *

Basic

Location in local file system to temporarily store the message which has been pulled from the SQS queue

Url of the Source queue *

Basic

URL of the AWS SQS queue, which is located in the configured AWS SQS region.

Use profile Credentials *

Basic

If selected, profile credentials will be used, credentials will be picked from ~/.aws/credentials (Linux/Mac) or C:\Users\USER_NAME\.aws\credentials (Windows)

AWS Access Key Id

Basic

AWS Access Key Id is required only if profile credentials are not going to be used.

AWS Access Secret Key

Basic

AWS Access Secret Key is required only if profile credentials are not going to be used.

Visibility timeout

Basic

The duration (in seconds) that the received messages are hidden from subsequent retrieve requests after being retrieved by UltraESB.

Wait time

Basic

The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than Wait time.

Scheduler Configuration

Scheduling

Bean reference of the scheduler configuration bean which should be declared as a resource in the project.xpml file. By default there is internal scheduler configuration within the framework which will be shared by all the polling connectors. If you need to configure higher level of concurrent processing threads which will fetch the messages from the SQS queue, you can configure your own thread pool configuration while declaring the parameters of the scheduler configuration bean as below.

<x:resources>
    <x:resource id="custom-scheduler-config">
        <bean id="schedulerConfigBean" class="org.adroitlogic.x.base.trp.SchedulerConfig">
            <constructor-arg name="name" value="my-custom-scheduler"/>
            <property name="schedulerThreadCount" value="4"/>
            <property name="pollingThreadCoreSize" value="4"/>
            <property name="pollingThreadMaxSize" value="10"/>
            <property name="pollingQueueSize" value="25"/>
            <property name="pollingKeepAliveTime" value="5000"/>
        </bean>
    </x:resource>
</x:resources>

In this configuration,

  • schedulerThreadCount - is the number of threads which will be used to schedule the number of threads to be used to handle scheduled polling tasks in this SQS Ingress Connector. Generally this value should be smaller value since polling threads don’t do heavy task in this, rather it’s just initiating the polling task for the scheduling iteration and handover the file fetching and processing task to a separate executor service. This executor service can be configured by using next four parameters of above scheduling configuration bean.

  • pollingThreadCoreSize - is the core size of the SQS message fetching thread pool

  • pollingThreadMaxSize - is the maximum number of threads of the SQS message fetching thread pool

  • pollingQueueSize - is the queue size of the SQS message fetching thread pool

  • pollingKeepAliveTime - is the keep alive time of the SQS message fetching thread pool

Polling Cron Expression

Scheduling

Cron expression for the SQS message polling schedule. Cron expression should be a valid Quartz cron expression since the Framework is underneath using Quartz to extract the schedule from the cron expression.

Polling Start Delay

Scheduling

Delay in milliseconds to start the polling schedule. Any iteration which comes within this time period from the startup time of the framework, won’t be considered as a valid file polling iteration.

Polling Repeat Interval

Scheduling

Interval in milliseconds for the next iteration of the polling schedule. This will be considered if there isn’t a configured cron schedule.

Polling Repeat Count

Scheduling

Number of iterations which should go through the polling schedule. If this is set to 1 which means only the first iteration of the polling schedule will be considered as a valid file polling iteration and all other iterations of the schedule will be ignored. By default, this value is set to -1 which means it will consider all the iterations of the polling schedule as a valid iteration.

Concurrent Polling Count

Scheduling

Maximum number of concurrent threads which can be used to poll the configured SQS queue to fetch messages. By default, this value is 1.

Concurrent Execution Count

Scheduling

Maximum number of concurrent threads which can be used to process the fetched objects from the SQS queue. By default, this value is 4.

Sample Use Case

sqs ingress sample 1
Figure 1. Integration flow to pull MP3s from SQS

In this scenario, assume KarokeMagic is a company which provides an online tool to remove vocals from mp3 files. Since this process has to run complex - resource consuming algorithms on mp3 files, company decides to enqueue all user requests and process each request leisurely using their high performance processing nodes cluster. After processing a request, the node which performed the processing sends a notification to the user via an email. Requirements of this setup are,

  • Cluster should not process same mp3 file twice.

  • Cluster should process mp3 files in the order they were enqueued (FIFO)

sqs ingress sample config
Figure 2. Configuration of SQS ingress connector to pull MP3s

According to this configuration, SQS ingress connector will temporarily buffer incoming mp3 files at /tmp before injecting them to the UltraESB. Since visibility timeout has set to 300 seconds, once a node in the cluster pull a mp3 file, it will not be visible to the peers for 6 minutes. During this time period, the 2nd processing element (MP3 processor) is expected to remove vocals and send the notification to the user via an email.

Product and company names and marks mentioned are the property of their respective owners and are mentioned for identification purposes only.

In this topic
In this topic
Contact Us