Version: 17.07
Supported Since: 17.07.1
The Kafka Ingress Connector allows you to consume messages from a remote Kafka topic and inject those messages to the UltraESB engine.
In order to use the Kafka Ingress Connector, you must first select the Kafka Connector dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the Kafka Connector dependency via Component Registry. From Tools menu, select Ultra Studio → Component Registry and from the Connectors list, select the Kafka Connector dependency. |
Processor |
The message received to the Ingress Connector will be emitted through this out port |
On Exception |
The message will be sent to this out port if the Ingress Connector fails to process the message due to some reason |
* marked fields are mandatory
Bootstrap Server * |
Basic |
A comma separated list of bootstrap server hostname:port pairs for the consumer(s) to connect to Kafka |
Topic Name * |
Basic |
The name of the topic from which the messages are read/consumed |
Group Id |
Basic |
The consumer group id of the Kafka consumer |
Client Id |
Basic |
An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging |
Consumer Count |
Advanced |
Number of concurrent consumer threads. Default is 1, using single consumer. |
Commit Automatically |
Advanced |
Whether offsets are committed automatically with a frequency controlled by the Auto Commit Interval. Default is true |
Auto Commit Interval |
Advanced |
The interval in milliseconds at which the offsets are committed, if the Auto Commits are enabled. Default is 5000 |
Auto Offset Reset |
Advanced |
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). Default is LATEST and the other options are as follows.
|
Exclude Internal Topics |
Advanced |
Whether records from internal topics (such as offsets) should be exposed to the consumer. If set to true the only way to receive records from an internal topic is subscribing to it. Default is true. |
Isolation Level |
Advanced |
Controls how to read messages written transactionally. Default is READ_UNCOMMITTED and the other options are as follows.
Non-transactional messages are received in both the modes |
Maximum Poll Interval |
Advanced |
The maximum delay in milliseconds between invocations of poll when using consumer group management. Default is 300000 |
Maximum Poll Records |
Advanced |
The maximum number of records returned in a single call to poll(). Default is 500 |
Partition Assignment Strategy |
Advanced |
The class name of the partition assignment strategy that the client will use to distribute partition ownership amongst consumer instances when group management is used. Default is class org.apache.kafka.clients.consumer.RangeAssignor |
Automatically Check CRC32 |
Advanced |
Automatically check the CRC32 of the records consumed. This ensures no on-the-wire or on-disk corruption to the messages occurred. This check adds some overhead, so it may be disabled in cases seeking extreme performance. Default is true |
Interceptor Classes |
Advanced |
A comma separated list of classes to use as interceptors. Implementing the ConsumerInterceptor interface allows you to intercept (and possibly mutate) records received by the consumer |
Metadata Maximum Age |
Advanced |
The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions. Default is 300000 |
Kerberos Service Name |
Kerberos |
The Kerberos principal name that Kafka runs as |
Kerberos KInit Command |
Kerberos |
Kerberos kinit command path. Default is /usr/bin/kinit |
Kerberos Minimum Time Before Re-login |
Kerberos |
Login thread sleep time in milliseconds between refresh attempts. Default is 60000 |
Kerberos Ticket Renew Jitter |
Kerberos |
Percentage of random jitter added to the renewal time. Default is 0.05 |
Kerberos Ticket Renew Window Factor |
Kerberos |
Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket. Default is 0.8 |
Metrics Reporters |
Metrics |
A comma separated list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics |
Sample Count |
Metrics |
The number of samples maintained to compute metrics. Default is 2 |
Recording Level |
Metrics |
The highest recording level for metrics. Default is INFO and following are the available options.
|
Sample Window |
Metrics |
The window of time in milliseconds a metrics sample is computed over. Default is 30000 |
Session Timeout |
Network |
The session timeout of the Kafka connection in milliseconds. Default is 10000 |
Request Timeout |
Network |
The configuration controls the maximum amount of time in milliseconds the client will wait for the response of a request. Default is 305000 |
Heart Beat Interval |
Network |
The expected time in milliseconds between heartbeats to the consumer coordinator when using Kafka’s group management facilities. Default is 3000 |
Connection Maximum Idle Time |
Network |
Close idle connections after the number of milliseconds specified by this config. Default is 540000 |
Minimum Fetch Bytes |
Network |
The minimum amount of data the server should return for a fetch request. If insufficient data is available. the request will wait for that much data to accumulate before answering the request. Default is 1 |
Maximum Fetch Bytes |
Network |
The maximum amount of data the server should return for a fetch request. Default is 52428800 |
Maximum Bytes Fetch per Partition |
Network |
The maximum amount of data per-partition the server will return. Default is 1048576 |
Receive Buffer Size |
Network |
The size of the TCP receive buffer (SO_RCVBUF) in bytes to use when reading data. If the value is -1, the OS default will be used. Default is 65536 |
Send Buffer Size |
Network |
The size of the TCP send buffer (SO_SNDBUF) in bytes to use when sending data. If the value is -1, the OS default will be used. Default is 131072 |
Fetch Maximum Wait Time |
Network |
The maximum amount of time in milliseconds the server will block before answering the fetch request if there isn’t sufficient data to immediately satisfy the requirement given by fetch.min.bytes. Default is 500 |
Reconnect Back-Off |
Network |
The base amount of time in milliseconds to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. Default is 50 |
Maximum Reconnect Back-Off |
Network |
The maximum amount of time in milliseconds to wait when re-connecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000 |
Retry Back-Off |
Network |
The amount of time in milliseconds to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100 |
Trust Keystore Location |
SSL |
The location of the trust store file used for two-way authentication for client |
Trust Keystore Password |
SSL |
The password for the trust store file |
Trust Keystore Type |
SSL |
The file format of the trust store file. Default is JKS |
Identity Keystore Location |
SSL |
The location of the key store file used for two-way authentication for client |
Identity Keystore Password |
SSL |
The store password for the key store file. This is only needed if Identity Keystore Location is configured |
Identity Keystore Type |
SSL |
The file format of the key store file. Default is JKS |
Identity Key Password |
SSL |
The password of the private key in the key store file |
Enabled SSL Protocols |
SSL |
A comma separated list of protocols enabled for SSL connections from TLSv1, TLSv1.1 and TLSv1.2, SSLv1, SSLv2 and SSLv3. Default is TLSv1.2,TLSv1.1,TLSv1 |
SSL Protocol |
SSL |
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities |
SSL Provider |
SSL |
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM |
SSL Cipher Suites |
SSL |
A comma separated list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported |
Endpoint Identification Algorithm |
SSL |
The endpoint identification algorithm to validate server hostname using server certificate |
Keymanager Algorithm |
SSL |
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine (SunX509) |
Trustmanager Algorithm |
SSL |
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine (PKIX) |
Secure Random Implementation |
SSL |
The SecureRandom PRNG implementation to use for SSL cryptography operations |
Security Protocol |
Security |
Protocol used to communicate with brokers. Default is PLAINTEXT and the other options are as follows
|
SASL Mechanism |
Security |
SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. Default is GSSAPI |
JAAS Config |
Security |
JAAS login context parameters for SASL connections in the format used by JAAS configuration files |
You need to have Kafka and ZooKeeper up and running to proceed with this example using Kafka Ingress Connector. If you have not already installed Kafka, please follow the steps 1 to 3 of the quickstart guide.
First, let’s create our integration flow for this example and add a Kafka IngressConnector to that, and as for the bootstrap servers, specify the comma separated list of bootstrap servers and under topic name, specify the topic name and save the configuration.
After that, we’ll add a Logger Element to log the payload of the message to the console. Next include a Successful Flow End element and connect it to the Logger Element to complete the flow as shown below.
Now after creating a run configuration and running the project, the integration flow is up and ready. The step 4 of the Kafka quickstart guide explains how to send messages with its command line client. If you follow that, you will be able to see that the message has been consumed from the remote Kafka topic and logged into the console as follows.