Version: 17.07
Supported Since: 17.07.1
The Kafka Egress Connector allows you to asynchronously publish messages to a remote Kafka topic and get a hold of record metadata returned.
In order to use the Kafka Egress Connector, you must first select the Kafka Connector dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the Kafka Connector dependency via Component Registry. From Tools menu, select Ultra Studio → Component Registry and from the Connectors list, select the Kafka Connector dependency. |
On Exception |
The message will be sent to this out port if the Egress Connector failed to process the message due to some reason |
Response Processor |
The message will be sent to this out port if the message has been published to the Kafka destination successfully |
Connector Operation |
This port is used to connect operational elements to the Egress Connector. By-default, user does not have to connect any operational element and the default connector operation will be used. |
* marked fields are mandatory
Bootstrap Server * |
Basic |
A comma separated list of bootstrap server hostname:port pairs for the producer to connect to Kafka |
Topic Name * |
Basic |
The name of the topic to which the messages are sent/published |
Key Type |
Basic |
The type of the key to be used for sending. Default is SEQUENTIAL and all the options available are as follows
|
Key Prefix |
Basic |
This will be prepended to the auto generated sequence number to determine the key |
Key Expression * |
Basic |
Expression out of which the key will be derived |
Key Value * |
Basic |
Static key value to be used to send the message to the desired partition of the topic |
Include Headers |
Basic |
Whether to include headers of the message into the Kafka message sent |
Send Asynchronously |
Basic |
Whether the sending mode is asynchronous, if disabled waits for the ACKs to complete the send |
Acknowledgements |
Basic |
Acknowledgements config controls the criteria under which requests are considered complete. Default value is ALL and all the options available are as follows.
|
Retry Count |
Basic |
Number of retries if the request fails, specifying retries as 0 makes it not retry any message failure. Enabling retries also opens up the possibility of duplicates |
Client Id |
Basic |
An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging |
Transactional Id Value |
Basic |
The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions |
Idempotency |
Basic |
When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. This is set to 'false' by default. Note that enabling idempotence requires max.in.flight.requests.per.connection to be set to 1 and retries cannot be zero. Additionally acks must be set to 'all'. If these values are left at their defaults, we will override the default to be suitable. If the values are set to something incompatible with the idempotent producer, a ConfigException will be thrown |
Buffer Memory |
Advanced |
This specifies the total amount of memory available to the producer for buffering. Default value is 33554432 |
Egress Timeout |
Advanced |
Timeout in milliseconds after which the egress message timeouts |
Weight |
Advanced |
Weight of the Egress endpoint when used with a Load Balancer |
Compression Type |
Advanced |
The compression type for all data generated by the producer. The default is none (i.e. no compression). Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). All the options available are as follows
|
Batch Size |
Advanced |
Buffer size to batch requests on a per partition basis. Default is 16384 |
Linger in Milliseconds |
Advanced |
The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay) |
Maximum Block in Milliseconds |
Advanced |
The threshold for time to block for sending, after which it throws a TimeoutException |
Interceptor Classes |
Advanced |
A list of classes to use as interceptors. Implementing the ProducerInterceptor interface allows you to intercept (and possibly mutate) records received by the producer |
Metadata Max Age |
Advanced |
The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions |
Partitioner Class |
Advanced |
Partitioner class that implements the Partitioner interface. Default value is org.apache.kafka.clients.producer.internals.DefaultPartitioner |
Kerberos Service Name |
Kerberos |
The Kerberos principal name that Kafka runs as |
Kerberos KInit Command |
Kerberos |
Kerberos kinit command path. Default is /usr/bin/kinit |
Kerberos Minimum Time Before Re-login |
Kerberos |
Login thread sleep time in milliseconds between refresh attempts |
Kerberos Ticket Renew Jitter |
Kerberos |
Percentage of random jitter added to the renewal time. Default is 0.05 |
Kerberos Ticket Renew Window Factor |
Kerberos |
Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket. Default is 0.8 |
Metrics Reporters |
Metrics |
A list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics |
Sample Count |
Metrics |
The number of samples maintained to compute metrics. Default is 2 |
Recording Level |
Metrics |
The highest recording level for metrics. Default is INFO and all the options available are as follows
|
Sample Window |
Metrics |
The window of time a metrics sample is computed over. Default 30000 |
Request Timeout |
Network |
The configuration controls the maximum amount of time in milliseconds the client will wait for the response of a request. Default is 30000 |
Transaction Timeout |
Network |
The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. Default is 60000 |
Connection Maximum Idle Time |
Network |
Close idle connections after the number of milliseconds specified by this config. Default is 540000 |
Maximum Request Size |
Network |
The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Default is 1048576 |
Receive Buffer Size |
Network |
The size of the TCP receive buffer (SO_RCVBUF) in bytes to use when reading data. If the value is -1, the OS default will be used. Default is 32768 |
Send Buffer Size |
Network |
The size of the TCP send buffer (SO_SNDBUF) in bytes to use when sending data. If the value is -1, the OS default will be used. Default is 131072 |
Reconnect Back-Off |
Network |
The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. Default is 50 |
Maximum Reconnect Back-Off |
Network |
The maximum amount of time in milliseconds to wait when re-connecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000 |
Retry Back-Off |
Network |
The amount of time in milliseconds to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100 |
Maximum In-Flight Requests per Connection |
Network |
The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries. Default is 5 |
Trust Keystore Location |
SSL |
The location of the trust store file used for two-way authentication for client |
Trust Keystore Password |
SSL |
The password for the trust store file |
Trust Keystore Type |
SSL |
The file format of the trust store file. Default is JKS |
Identity Keystore Location |
SSL |
The location of the key store file used for two-way authentication for client |
Identity Keystore Password |
SSL |
The store password for the key store file. This is only needed if Identity Keystore Location is configured |
Identity Keystore Type |
SSL |
The file format of the key store file. Default is JKS |
Identity Key Password |
SSL |
The password of the private key in the key store file |
Enabled SSL Protocols |
SSL |
A comma separated list of protocols enabled for SSL connections from TLSv1, TLSv1.1 and TLSv1.2, SSLv1, SSLv2 and SSLv3. Default is TLSv1.2,TLSv1.1,TLSv1 |
SSL Protocol |
SSL |
The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities |
SSL Provider |
SSL |
The name of the security provider used for SSL connections. Default value is the default security provider of the JVM |
SSL Cipher Suites |
SSL |
A comma separated list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported |
Endpoint Identification Algorithm |
SSL |
The endpoint identification algorithm to validate server hostname using server certificate |
Keymanager Algorithm |
SSL |
The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine (SunX509) |
Trustmanager Algorithm |
SSL |
The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine (PKIX) |
Secure Random Implementation |
SSL |
The SecureRandom PRNG implementation to use for SSL cryptography operations |
Secure Random Implementation |
SSL |
The SecureRandom PRNG implementation to use for SSL cryptography operations |
Security Protocol |
Security |
Protocol used to communicate with brokers. Default is PLAINTEXT and the options available are as follows
|
SASL Mechanism |
Security |
SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. Default is GSSAPI |
JAAS Config |
Security |
JAAS login context parameters for SASL connections in the format used by JAAS configuration files |
You need to have Kafka and ZooKeeper up and running to proceed with this example using Kafka Egress Connector. If you have not already installed Kafka, please follow the steps 1 to 3 of quickstart guide.
First, let’s create our integration flow for this example and add an NIO HTTP Ingress Connector which listens on port 8280 and service path /publish for HTTP POST requests. We use this to inject messages into UltraESB-X engine.
After that, we’ll add a Kafka EgressConnector to the flow, and as for the bootstrap servers, specify the comma separated list of bootstrap servers and under topic name, specify the topic name and save the configuration.
After that, we’ll add a Kafka EgressConnector to publish the message to a remote Kafka topic and the complete flow should look as follows.
Now after creating a run configuration and running the project, the integration flow is up and ready. Now we are going to inject a message to the UltraESB-X engine using HTTP client in Ultra Studio Toolbox as follows.
But, we have to make sure that the message has been successfully published to the remote Kafka topic. The step 5 of the Kafka quickstart guide explains how to start a kafka consumer and listens to messages with its command line client. If you follow the same command without the flag --from-beginning,
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
you will be able to see the messages that will be published to the remote topic.
Now, everything is ready and when we send the HTTP POST request, the command line Kafka consumer will print the message to the console as follows.