
Kafka Ingress Connector

Version: 17.07

Supported Since: 17.07.1

What is the Kafka Ingress Connector?

The Kafka Ingress Connector allows you to consume messages from a remote Kafka topic and inject them into the UltraESB engine.

In order to use the Kafka Ingress Connector, you must first select the Kafka Connector dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the Kafka Connector dependency via the Component Registry: from the Tools menu, select Ultra Studio → Component Registry, and from the Connectors list, select the Kafka Connector dependency.
[Figure: Kafka Ingress Connector ports]

Out Ports

Processor

The message received by the Ingress Connector is emitted through this out port.

On Exception

The message is sent to this out port if the Ingress Connector fails to process it for any reason.

Parameters

* marked fields are mandatory

Bootstrap Server *

Basic

A comma separated list of bootstrap server hostname:port pairs for the consumer(s) to connect to Kafka

Topic Name *

Basic

The name of the topic from which the messages are read/consumed

Group Id

Basic

The consumer group id of the Kafka consumer

Client Id

Basic

An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging
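
These Basic parameters map directly onto the standard Kafka consumer configuration keys (bootstrap.servers, group.id, client.id). For reference, below is a minimal sketch of an equivalent raw Kafka consumer, not the connector's internal implementation; the server, group, client and topic names are placeholders, and poll(Duration) assumes a reasonably recent Kafka client.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class BasicConsumerSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Bootstrap Server: comma separated hostname:port pairs
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // Group Id and Client Id of this consumer
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "example-client");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // Topic Name: the topic the messages are consumed from
                consumer.subscribe(Collections.singletonList("example-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                    }
                }
            }
        }
    }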

Consumer Count

Advanced

The number of concurrent consumer threads. Default is 1, which uses a single consumer.

Commit Automatically

Advanced

Whether offsets are committed automatically with a frequency controlled by the Auto Commit Interval. Default is true

Auto Commit Interval

Advanced

The interval in milliseconds at which the offsets are committed, if the Auto Commits are enabled. Default is 5000

Auto Offset Reset

Advanced

What to do when there is no initial offset in Kafka or if the current offset no longer exists on the server (e.g. because that data has been deleted). Default is LATEST and the available options are as follows (a sketch of the equivalent raw consumer settings appears after the list).

  • EARLIEST - Automatically reset the offset to the earliest offset

  • LATEST - Automatically reset the offset to the latest offset

  • NONE - If no previous offset is found for the consumer’s group, throws exception to the consumer

  • ERROR - Throws exception to the consumer
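
These offset-handling parameters correspond to the consumer keys enable.auto.commit, auto.commit.interval.ms and auto.offset.reset. Continuing the raw-consumer sketch above, a hedged illustration of manual committing when automatic commits are disabled (values are illustrative and process(record) is a hypothetical handler):

    // Offset handling, mirroring the parameters above
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");      // Commit Automatically
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");  // Auto Commit Interval (ignored when auto commit is off)
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");    // Auto Offset Reset

    // With auto commit disabled, commit once the polled records are processed
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        process(record);  // hypothetical processing step
    }
    consumer.commitSync();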

Exclude Internal Topics

Advanced

Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to true, the only way to receive records from an internal topic is to subscribe to it. Default is true.

Isolation Level

Advanced

Controls how to read messages written transactionally. Default is READ_UNCOMMITTED and the other options are as follows.

  • READ_COMMITTED - Receives only transactional messages which have been committed

  • READ_UNCOMMITTED - Receives all messages, even transactional messages which have been aborted

Non-transactional messages are received in both modes.

Maximum Poll Interval

Advanced

The maximum delay in milliseconds between invocations of poll when using consumer group management. Default is 300000

Maximum Poll Records

Advanced

The maximum number of records returned in a single call to poll(). Default is 500

Partition Assignment Strategy

Advanced

The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. Default is org.apache.kafka.clients.consumer.RangeAssignor

Automatically Check CRC32

Advanced

Automatically check the CRC32 of the records consumed. This ensures that no on-the-wire or on-disk corruption of the messages has occurred. The check adds some overhead, so it may be disabled when seeking maximum performance. Default is true

Interceptor Classes

Advanced

A comma separated list of classes to use as interceptors. Implementing the ConsumerInterceptor interface allows you to intercept (and possibly mutate) records received by the consumer
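
For illustration, below is a minimal sketch of such an interceptor; the class name CountingInterceptor is an assumption for this example, and its fully qualified name would be listed in this parameter.

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // Counts consumed records without mutating them
    public class CountingInterceptor implements ConsumerInterceptor<String, String> {
        private long total;

        @Override
        public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
            total += records.count();
            return records;  // a mutated/filtered copy may be returned instead
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { }

        @Override
        public void close() {
            System.out.println("records seen: " + total);
        }

        @Override
        public void configure(Map<String, ?> configs) { }
    }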

Metadata Maximum Age

Advanced

The period of time in milliseconds after which a refresh of metadata is forced, even if no partition leadership changes have been seen, to proactively discover any new brokers or partitions. Default is 300000

Kerberos Service Name

Kerberos

The Kerberos principal name that Kafka runs as

Kerberos KInit Command

Kerberos

Kerberos kinit command path. Default is /usr/bin/kinit

Kerberos Minimum Time Before Re-login

Kerberos

Login thread sleep time in milliseconds between refresh attempts. Default is 60000

Kerberos Ticket Renew Jitter

Kerberos

Percentage of random jitter added to the renewal time. Default is 0.05

Kerberos Ticket Renew Window Factor

Kerberos

Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket. Default is 0.8

Metrics Reporters

Metrics

A comma separated list of classes to use as metrics reporters. Implementing the MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics
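
A minimal sketch of a custom reporter follows; the class name LoggingMetricsReporter is an assumption for this example, and its fully qualified name would be listed in this parameter.

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.common.metrics.KafkaMetric;
    import org.apache.kafka.common.metrics.MetricsReporter;

    // Prints metric registrations and changes to the console
    public class LoggingMetricsReporter implements MetricsReporter {
        @Override
        public void init(List<KafkaMetric> metrics) {
            metrics.forEach(m -> System.out.println("metric: " + m.metricName()));
        }

        @Override
        public void metricChange(KafkaMetric metric) {
            System.out.println("metric changed: " + metric.metricName());
        }

        @Override
        public void metricRemoval(KafkaMetric metric) { }

        @Override
        public void close() { }

        @Override
        public void configure(Map<String, ?> configs) { }
    }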

Sample Count

Metrics

The number of samples maintained to compute metrics. Default is 2

Recording Level

Metrics

The highest recording level for metrics. Default is INFO and the available options are as follows.

  • INFO

  • DEBUG

Sample Window

Metrics

The window of time in milliseconds a metrics sample is computed over. Default is 30000

Session Timeout

Network

The session timeout of the Kafka connection in milliseconds. Default is 10000

Request Timeout

Network

Controls the maximum amount of time in milliseconds the client will wait for the response to a request. Default is 305000

Heart Beat Interval

Network

The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Default is 3000

Connection Maximum Idle Time

Network

Close idle connections after the number of milliseconds specified by this config. Default is 540000

Minimum Fetch Bytes

Network

The minimum amount of data the server should return for a fetch request. If insufficient data is available, the request will wait for that much data to accumulate before answering. Default is 1

Maximum Fetch Bytes

Network

The maximum amount of data the server should return for a fetch request. Default is 52428800

Maximum Bytes Fetch per Partition

Network

The maximum amount of data per-partition the server will return. Default is 1048576

Receive Buffer Size

Network

The size of the TCP receive buffer (SO_RCVBUF) in bytes to use when reading data. If the value is -1, the OS default will be used. Default is 65536

Send Buffer Size

Network

The size of the TCP send buffer (SO_SNDBUF) in bytes to use when sending data. If the value is -1, the OS default will be used. Default is 131072

Fetch Maximum Wait Time

Network

The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes. Default is 500

Reconnect Back-Off

Network

The base amount of time in milliseconds to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. Default is 50

Maximum Reconnect Back-Off

Network

The maximum amount of time in milliseconds to wait when re-connecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000

Retry Back-Off

Network

The amount of time in milliseconds to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100

Trust Keystore Location

SSL

The location of the trust store file used for two-way authentication for client

Trust Keystore Password

SSL

The password for the trust store file

Trust Keystore Type

SSL

The file format of the trust store file. Default is JKS

Identity Keystore Location

SSL

The location of the key store file used for two-way authentication for client

Identity Keystore Password

SSL

The store password for the key store file. This is only needed if Identity Keystore Location is configured

Identity Keystore Type

SSL

The file format of the key store file. Default is JKS

Identity Key Password

SSL

The password of the private key in the key store file
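
These keystore and trust store parameters correspond to the standard Kafka SSL configuration keys. Continuing the earlier raw-consumer sketch, a hedged example (all paths and passwords are placeholders):

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.config.SslConfigs;

    // SSL settings mirroring the parameters above
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
    props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/path/to/truststore.jks");
    props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "changeit");
    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/path/to/keystore.jks");  // two-way authentication only
    props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "changeit");
    props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "changeit");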

Enabled SSL Protocols

SSL

A comma separated list of protocols enabled for SSL connections, from TLSv1, TLSv1.1, TLSv1.2, SSLv2 and SSLv3. Default is TLSv1.2,TLSv1.1,TLSv1

SSL Protocol

SSL

The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities

SSL Provider

SSL

The name of the security provider used for SSL connections. Default value is the default security provider of the JVM

SSL Cipher Suites

SSL

A comma separated list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported

Endpoint Identification Algorithm

SSL

The endpoint identification algorithm to validate server hostname using server certificate

Keymanager Algorithm

SSL

The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine (SunX509)

Trustmanager Algorithm

SSL

The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine (PKIX)

Secure Random Implementation

SSL

The SecureRandom PRNG implementation to use for SSL cryptography operations

Security Protocol

Security

Protocol used to communicate with brokers. Default is PLAINTEXT and the other options are as follows

  • PLAINTEXT

  • SSL

  • SASL_PLAINTEXT

  • SASL_SSL

SASL Mechanism

Security

SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. Default is GSSAPI

JAAS Config

Security

JAAS login context parameters for SASL connections in the format used by JAAS configuration files
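
To show how these security parameters fit together, here is a hedged SASL/PLAIN example against the raw consumer configuration; the mechanism, username and password are placeholders, and the broker must be configured to match.

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.common.config.SaslConfigs;

    // SASL/PLAIN over an unencrypted connection, mirroring the parameters above
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        + "username=\"alice\" password=\"secret\";");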

Sample Use Case

Prerequisite

You need to have Kafka and ZooKeeper up and running to proceed with this example of using the Kafka Ingress Connector. If you have not already installed Kafka, please follow steps 1 to 3 of the Kafka quickstart guide.

1. Consuming a record from a Kafka topic

First, let’s create the integration flow for this example and add a Kafka Ingress Connector to it. As the Bootstrap Server, specify the comma separated list of bootstrap servers; under Topic Name, specify the topic to consume from, and save the configuration.

[Figure: Kafka Ingress Connector configuration]

After that, we’ll add a Logger Element to log the payload of the message to the console. Next include a Successful Flow End element and connect it to the Logger Element to complete the flow as shown below.

[Figure: Completed integration flow]

After creating a run configuration and running the project, the integration flow is up and ready. Step 4 of the Kafka quickstart guide explains how to send messages with its command-line client. If you follow that, you will see that the message has been consumed from the remote Kafka topic and logged to the console as follows.

[Figure: Console output of the consumed message]
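
If you prefer to publish the test record programmatically instead of using the console producer, the following is a minimal sketch using the standard Kafka Java producer; the broker address and topic name are assumptions that must match the connector configuration above.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                // One test record to the topic the ingress connector consumes from
                producer.send(new ProducerRecord<>("example-topic", "hello from Kafka"));
            }
        }
    }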