
Kafka Egress Connector

Version: 17.07

Supported Since: 17.07.1

What is the Kafka Egress Connector?

The Kafka Egress Connector allows you to asynchronously publish messages to a remote Kafka topic and obtain the record metadata returned for each published record.

In order to use the Kafka Egress Connector, you must first select the Kafka Connector dependency from the connector list when you are creating an empty Ultra project. If you have already created a project, you can add the Kafka Connector dependency via the Component Registry: from the Tools menu, select Ultra Studio → Component Registry, and from the Connectors list, select the Kafka Connector dependency.
Figure: Kafka Egress Connector ports

Out Ports

On Exception

The message is sent to this out port if the Egress Connector fails to process the message for any reason

Response Processor

The message is sent to this out port once it has been successfully published to the Kafka destination

Side Ports

Connector Operation

This port is used to connect operational elements to the Egress Connector. By default, the user does not have to connect any operational element, and the default connector operation will be used.

Parameters

* marked fields are mandatory

Bootstrap Server *

Basic

A comma-separated list of bootstrap server hostname:port pairs for the producer to connect to Kafka

Topic Name *

Basic

The name of the topic to which the messages are sent/published

Key Type

Basic

The type of the key to be used for sending. Default is SEQUENTIAL, and the available options are as follows; a brief sketch of the SEQUENTIAL strategy appears after the list.

  • SEQUENTIAL - Key is generated as a static prefix appended by an auto generated sequence number

  • EXPRESSION - Key is generated as an extracted value from an expression

  • VALUE - Key is generated by a pre-defined static value
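
As a rough, hypothetical sketch (the class and field names below are illustrative only and not part of the connector's API), the SEQUENTIAL strategy amounts to something like the following:

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of SEQUENTIAL key generation: a static prefix followed
// by an auto-generated sequence number. Names are hypothetical.
public class SequentialKeyGenerator {

    private final String prefix;                         // corresponds to the "Key Prefix" parameter
    private final AtomicLong sequence = new AtomicLong();

    public SequentialKeyGenerator(String prefix) {
        this.prefix = prefix;
    }

    public String nextKey() {
        // e.g. "order-1", "order-2", ... for the prefix "order-"
        return prefix + sequence.incrementAndGet();
    }
}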

Key Prefix

Basic

This will be prepended to the auto generated sequence number to determine the key

Key Expression *

Basic

Expression out of which the key will be derived

Key Value *

Basic

Static key value to be used to send the message to the desired partition of the topic

Include Headers

Basic

Whether to include the headers of the message in the Kafka message that is sent

Send Asynchronously

Basic

Whether the sending mode is asynchronous; if disabled, the connector waits for the acknowledgements to complete the send

Acknowledgements

Basic

The acknowledgements configuration controls the criteria under which requests are considered complete. Default value is ALL, and the available options are as follows; the mapping to the standard Kafka producer acks property is sketched after the list.

  • ALL - The leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive

  • LEADER - The leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.

  • NONE - The producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent.
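
These options appear to correspond to the standard Kafka producer acks values all, 1 and 0 respectively. For reference, a minimal plain-Kafka sketch of the equivalent property (the broker address and serializers are placeholders, not connector settings):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksExample {

    public static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // ALL -> "all", LEADER -> "1", NONE -> "0"
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return new KafkaProducer<>(props);
    }
}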

Retry Count

Basic

Number of retries if the request fails; specifying retries as 0 disables retrying failed sends. Note that enabling retries also opens up the possibility of duplicates

Client Id

Basic

An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging

Transactional Id Value

Basic

The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions

Idempotency

Basic

When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. This is set to 'false' by default. Note that enabling idempotence requires max.in.flight.requests.per.connection to be set to 1 and retries cannot be zero. Additionally acks must be set to 'all'. If these values are left at their defaults, we will override the default to be suitable. If the values are set to something incompatible with the idempotent producer, a ConfigException will be thrown
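
For background, the equivalent settings on a plain Kafka producer look roughly as follows; the broker address, topic and transactional id are placeholders, and the connector manages these settings internally on your behalf:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentTransactionalProducerExample {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // placeholder broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");             // "Idempotency"
        props.put(ProducerConfig.ACKS_CONFIG, "all");                            // required when idempotence is enabled
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-tx-id");           // placeholder "Transactional Id Value"

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("test", "key", "value"));         // placeholder topic and record
            producer.commitTransaction();
        }
    }
}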

Buffer Memory

Advanced

This specifies the total amount of memory, in bytes, available to the producer for buffering. Default value is 33554432

Egress Timeout

Advanced

Timeout in milliseconds after which the egress message times out

Weight

Advanced

Weight of the Egress endpoint when used with a Load Balancer

Compression Type

Advanced

The compression type for all data generated by the producer. The default is none (i.e. no compression). Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). All the options available are as follows

  • none - no compression

  • gzip - GZip compression

  • snappy - Snappy compression

  • lz4 - LZ4 compression

Batch Size

Advanced

Buffer size, in bytes, used to batch requests on a per-partition basis. Default is 16384

Linger in Milliseconds

Advanced

The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle’s algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay)

Maximum Block in Milliseconds

Advanced

The maximum time in milliseconds to block while sending, after which a TimeoutException is thrown

Interceptor Classes

Advanced

A list of classes to use as interceptors. Implementing the ProducerInterceptor interface allows you to intercept (and possibly mutate) records received by the producer
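
As a hint of what such a class looks like, a minimal sketch of an interceptor implementing the standard Kafka ProducerInterceptor interface (the class name and logging behaviour are hypothetical):

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical interceptor that logs every record before it is sent.
public class LoggingProducerInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("Sending record to topic " + record.topic() + " with key " + record.key());
        return record;   // the record may also be mutated here before it is sent
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("Send failed: " + exception.getMessage());
        }
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}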

Metadata Max Age

Advanced

The period of time in milliseconds after which we force a refresh of metadata even if we haven’t seen any partition leadership changes to proactively discover any new brokers or partitions

Partitioner Class

Advanced

Partitioner class that implements the Partitioner interface. Default value is org.apache.kafka.clients.producer.internals.DefaultPartitioner
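
For reference, a custom partitioner is a class implementing org.apache.kafka.clients.producer.Partitioner; a minimal sketch follows (the class name and hashing strategy are just an example):

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical partitioner that picks a partition from the hash of the key's string form.
public class KeyHashPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (key == null) {
            return 0;   // all keyless records go to partition 0 in this example
        }
        return Math.abs(key.toString().hashCode() % numPartitions);
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}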

Kerberos Service Name

Kerberos

The Kerberos principal name that Kafka runs as

Kerberos KInit Command

Kerberos

Kerberos kinit command path. Default is /usr/bin/kinit

Kerberos Minimum Time Before Re-login

Kerberos

Login thread sleep time in milliseconds between refresh attempts

Kerberos Ticket Renew Jitter

Kerberos

Percentage of random jitter added to the renewal time. Default is 0.05

Kerberos Ticket Renew Window Factor

Kerberos

Login thread will sleep until the specified window factor of time from last refresh to ticket’s expiry has been reached, at which time it will try to renew the ticket. Default is 0.8

Metrics Reporters

Metrics

A list of classes to use as metrics reporters. Implementing the MetricsReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics
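
As an illustration of the extension point, a minimal sketch of a reporter implementing the standard Kafka MetricsReporter interface (the class name and logging behaviour are hypothetical):

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

// Hypothetical reporter that simply logs metric lifecycle events.
public class LoggingMetricsReporter implements MetricsReporter {

    @Override
    public void init(List<KafkaMetric> metrics) {
        metrics.forEach(m -> System.out.println("Initial metric: " + m.metricName()));
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        System.out.println("Metric added or changed: " + metric.metricName());
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        System.out.println("Metric removed: " + metric.metricName());
    }

    @Override
    public void close() { }

    @Override
    public void configure(Map<String, ?> configs) { }
}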

Sample Count

Metrics

The number of samples maintained to compute metrics. Default is 2

Recording Level

Metrics

The highest recording level for metrics. Default is INFO and all the options available are as follows

  • INFO

  • DEBUG

Sample Window

Metrics

The window of time, in milliseconds, over which a metrics sample is computed. Default is 30000

Request Timeout

Network

The configuration controls the maximum amount of time in milliseconds the client will wait for the response of a request. Default is 30000

Transaction Timeout

Network

The maximum amount of time in milliseconds that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. Default is 60000

Connection Maximum Idle Time

Network

Close idle connections after the number of milliseconds specified by this config. Default is 540000

Maximum Request Size

Network

The maximum size of a request in bytes. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests. This is also effectively a cap on the maximum record batch size. Default is 1048576

Receive Buffer Size

Network

The size of the TCP receive buffer (SO_RCVBUF) in bytes to use when reading data. If the value is -1, the OS default will be used. Default is 32768

Send Buffer Size

Network

The size of the TCP send buffer (SO_SNDBUF) in bytes to use when sending data. If the value is -1, the OS default will be used. Default is 131072

Reconnect Back-Off

Network

The base amount of time, in milliseconds, to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker. Default is 50

Maximum Reconnect Back-Off

Network

The maximum amount of time in milliseconds to wait when re-connecting to a broker that has repeatedly failed to connect. If provided, the backoff per host will increase exponentially for each consecutive connection failure, up to this maximum. After calculating the backoff increase, 20% random jitter is added to avoid connection storms. Default is 1000

Retry Back-Off

Network

The amount of time in milliseconds to wait before attempting to retry a failed request to a given topic partition. This avoids repeatedly sending requests in a tight loop under some failure scenarios. Default is 100

Maximum In-Flight Requests per Connection

Network

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries. Default is 5

Trust Keystore Location

SSL

The location of the trust store file used by the client to verify the broker's certificate

Trust Keystore Password

SSL

The password for the trust store file

Trust Keystore Type

SSL

The file format of the trust store file. Default is JKS

Identity Keystore Location

SSL

The location of the key store file used for two-way authentication of the client

Identity Keystore Password

SSL

The store password for the key store file. This is only needed if Identity Keystore Location is configured

Identity Keystore Type

SSL

The file format of the key store file. Default is JKS

Identity Key Password

SSL

The password of the private key in the key store file

Enabled SSL Protocols

SSL

A comma-separated list of protocols enabled for SSL connections, chosen from TLSv1, TLSv1.1, TLSv1.2, SSLv1, SSLv2 and SSLv3. Default is TLSv1.2,TLSv1.1,TLSv1

SSL Protocol

SSL

The SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities

SSL Provider

SSL

The name of the security provider used for SSL connections. Default value is the default security provider of the JVM

SSL Cipher Suites

SSL

A comma separated list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol. By default all the available cipher suites are supported

Endpoint Identification Algorithm

SSL

The endpoint identification algorithm to validate server hostname using server certificate

Keymanager Algorithm

SSL

The algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine (SunX509)

Trustmanager Algorithm

SSL

The algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine (PKIX)

Secure Random Implementation

SSL

The SecureRandom PRNG implementation to use for SSL cryptography operations

Security Protocol

Security

Protocol used to communicate with brokers. Default is PLAINTEXT and the options available are as follows

  • PLAINTEXT

  • SSL

  • SASL_PLAINTEXT

  • SASL_SSL

SASL Mechanism

Security

SASL mechanism used for client connections. This may be any mechanism for which a security provider is available. Default is GSSAPI

JAAS Config

Security

JAAS login context parameters for SASL connections in the format used by JAAS configuration files
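
For example, a JAAS configuration value for the SASL/PLAIN mechanism would look roughly like the following (the login module shown applies to SASL/PLAIN, and the credentials are placeholders):

org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice-secret";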

Sample Use Case

Prerequisite

You need to have Kafka and ZooKeeper up and running to proceed with this example. If you have not already installed Kafka, please follow steps 1 to 3 of the Kafka quickstart guide.

1. Publishing a record to a Kafka topic

First, let’s create our integration flow for this example and add an NIO HTTP Ingress Connector that listens on port 8280 with service path /publish for HTTP POST requests. We use this to inject messages into the UltraESB-X engine.

After that, we’ll add a Kafka Egress Connector to the flow to publish the message to a remote Kafka topic. Under Bootstrap Server, specify the comma-separated list of bootstrap servers; under Topic Name, specify the topic name; then save the configuration.

Figure: Kafka Egress Connector configuration

With the Kafka Egress Connector in place, the complete flow should look as follows.

Figure: Complete integration flow

After creating a run configuration and running the project, the integration flow is up and ready. Now we are going to inject a message into the UltraESB-X engine using the HTTP client in the Ultra Studio Toolbox, as follows.

Figure: Sending a request with the HTTP client
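
Alternatively, an equivalent request can be sent from the command line; assuming the project is running locally and the Ingress Connector exposes the endpoint directly at the configured service path, something like the following should work (the payload is arbitrary):

curl -X POST -d "Hello Kafka" http://localhost:8280/publish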

Next, we have to make sure that the message has been successfully published to the remote Kafka topic. Step 5 of the Kafka quickstart guide explains how to start a Kafka consumer and listen to messages with its command line client. If you run the same command without the --from-beginning flag,

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

you will be able to see the messages that are published to the remote topic.

Now, everything is ready and when we send the HTTP POST request, the command line Kafka consumer will print the message to the console as follows.

Figure: Kafka console consumer output