KafkaSpout Integration: Core Storm APIs
The core-storm API represents a Kafka spout with the KafkaSpout class.

To initialize KafkaSpout, define an instance of SpoutConfig, a subclass of the KafkaConfig class, representing the configuration information needed to ingest data from a Kafka cluster. KafkaSpout requires an instance of the BrokerHosts interface.
BrokerHosts Interface
The BrokerHosts interface maps Kafka brokers to topic partitions. Constructors for KafkaSpout (and, for the Trident API, TridentKafkaConfig) require an implementation of the BrokerHosts interface.

The storm-kafka component provides two implementations of BrokerHosts, ZkHosts and StaticHosts:
- Use ZkHosts if you want to track broker-to-partition mapping dynamically. This class uses Kafka's ZooKeeper entries to track the mapping (a short sketch follows this list). You can instantiate an object as follows:

  public ZkHosts(String brokerZkStr, String brokerZkPath)
  public ZkHosts(String brokerZkStr)

  where:

  - brokerZkStr is the IP:port address for the ZooKeeper host; for example, localhost:2181.
  - brokerZkPath is the root directory under which topics and partition information are stored. The default is /brokers, which is the default used by Kafka.

  By default, broker-partition mapping refreshes every 60 seconds. If you want to change the refresh frequency, set host.refreshFreqSecs to your chosen value.
- Use StaticHosts for static broker-to-partition mapping. To construct an instance of this class, you must first construct an instance of GlobalPartitionInformation; for example:

  Broker brokerForPartition0 = new Broker("localhost");        // localhost:9092
  Broker brokerForPartition1 = new Broker("localhost", 9092);  // localhost:9092, with the port specified explicitly
  Broker brokerForPartition2 = new Broker("localhost:9092");   // localhost:9092 specified as one string
  GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
  partitionInfo.addPartition(0, brokerForPartition0);          // mapping from partition 0 to brokerForPartition0
  partitionInfo.addPartition(1, brokerForPartition1);          // mapping from partition 1 to brokerForPartition1
  partitionInfo.addPartition(2, brokerForPartition2);          // mapping from partition 2 to brokerForPartition2
  StaticHosts hosts = new StaticHosts(partitionInfo);
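Returning to the dynamic case, the following is a minimal sketch of constructing and tuning a ZkHosts instance; the ZooKeeper connection string and the 30-second refresh value are assumptions for illustration:

// Assumed ZooKeeper connection string for illustration.
String zkConnString = "zk1.example.com:2181,zk2.example.com:2181";

// Track broker-to-partition mapping dynamically through Kafka's default
// ZooKeeper path (/brokers).
ZkHosts hosts = new ZkHosts(zkConnString);

// Refresh the broker-partition mapping every 30 seconds instead of the
// 60-second default.
hosts.refreshFreqSecs = 30;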
KafkaConfig Class and SpoutConfig Subclass
Next, define an instance of SpoutConfig, a subclass of the KafkaConfig class.

KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology; SpoutConfig extends KafkaConfig, supporting additional fields for ZooKeeper connection info and for controlling behavior specific to KafkaSpout.
KafkaConfig implements the following constructors, each of which requires an implementation of the BrokerHosts interface:
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
KafkaConfig Parameters
- hosts: One or more hosts that are Kafka ZooKeeper broker nodes (see "BrokerHosts Interface").
- topic: Name of the Kafka topic that KafkaSpout will consume from.
- clientId: Optional parameter used as part of the ZooKeeper path, specifying where the spout's current offset is stored.
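Putting the constructor and its parameters together, a KafkaConfig instance might be created as follows; the connection string, topic name, and client ID are assumed values for illustration:

// Assumed values for illustration.
BrokerHosts hosts = new ZkHosts("localhost:2181");
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "test-topic", "storm-client");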
KafkaConfig Fields
- fetchSizeBytes: Number of bytes to attempt to fetch in one request to a Kafka server. The default is 1 MB.
- socketTimeoutMs: Number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds.
- bufferSizeBytes: Buffer size (in bytes) for network requests. The default is 1 MB.
- scheme: The interface that specifies how a ByteBuffer from a Kafka topic is transformed into a Storm tuple. The default implementation, RawMultiScheme, returns a tuple with no additional processing. The API provides many implementations of the Scheme interface, including:

  - storm.kafka.StringScheme
  - storm.kafka.KeyValueSchemeAsMultiScheme
  - storm.kafka.StringKeyValueScheme
  Important: In Apache Storm versions prior to 1.0, MultiScheme methods accepted a byte[] parameter instead of a ByteBuffer. In Storm version 1.0, MultiScheme and related scheme APIs changed; they now accept a ByteBuffer instead of a byte[]. As a result, Kafka spouts built with Storm versions earlier than 1.0 do not work with Storm versions 1.0 and later. When running topologies with Storm version 1.0 and later, ensure that your version of storm-kafka is at least 1.0. Rebuild pre-1.0 shaded topology .jar files that bundle storm-kafka classes with storm-kafka version 1.0 before running them in clusters with Storm 1.0 and later. (A sketch of a ByteBuffer-based scheme follows this field list.)
- ignoreZkOffsets: To force the spout to ignore any consumer state information stored in ZooKeeper, set ignoreZkOffsets to true. If true, the spout always begins reading from the offset defined by startOffsetTime. For more information, see "How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures."
- startOffsetTime: Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values:

  - kafka.api.OffsetRequest.EarliestTime() starts streaming from the beginning of the topic
  - kafka.api.OffsetRequest.LatestTime() streams only new messages
- maxOffsetBehind: Specifies how long a spout attempts to retry the processing of a failed tuple. If a failed tuple's offset lags more than maxOffsetBehind behind the spout's most recently emitted offset, the spout stops retrying the tuple. The default is Long.MAX_VALUE.
- useStartOffsetTimeIfOffsetOutOfRange: Controls whether a spout streams messages from the beginning of a topic when the spout receives an out-of-range offset error. The default value is true.
- metricsTimeBucketSizeInSecs: Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds.
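Continuing from the kafkaConfig instance sketched earlier, the following shows how a few of these fields might be set; the specific values are arbitrary examples, not recommendations:

// Read from the beginning of the topic, ignoring offsets saved in ZooKeeper.
kafkaConfig.ignoreZkOffsets = true;
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

// Fetch up to 2 MB per request (example value; the default is 1 MB).
kafkaConfig.fetchSizeBytes = 2 * 1024 * 1024;

// Report spout metrics every 30 seconds instead of the 60-second default.
kafkaConfig.metricsTimeBucketSizeInSecs = 30;

To make the ByteBuffer change described in the Important note concrete, here is a minimal sketch of a custom scheme targeting Storm 1.0 and later (hence the org.apache.storm package names); the class name and output field name are illustrative assumptions:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Hypothetical scheme that decodes each Kafka message as a UTF-8 string.
public class Utf8LineScheme implements Scheme {
    @Override
    public List<Object> deserialize(ByteBuffer buffer) {
        // Storm 1.0 and later pass a ByteBuffer rather than a byte[].
        return new Values(StandardCharsets.UTF_8.decode(buffer).toString());
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("line");
    }
}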
Instantiate SpoutConfig as follows:
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String nodeId)
SpoutConfig Parameters
- hosts: One or more hosts that are Kafka ZooKeeper broker nodes (see "BrokerHosts Interface").
- topic: Name of the Kafka topic that KafkaSpout will consume from.
- zkRoot: Root directory in ZooKeeper under which KafkaSpout consumer offsets are stored. The default is /brokers.
- nodeId: ZooKeeper node under which KafkaSpout stores offsets for each topic-partition. The node ID must be unique for each topology. The topology uses this path to recover in failure scenarios, or when there is maintenance that requires killing the topology.
zkRoot and nodeId are used to construct the ZooKeeper path where Storm stores the Kafka offset. You can find offsets at zkRoot+"/"+nodeId.

To start processing messages from where the last operation left off, use the same zkRoot and nodeId. To start from the beginning of the Kafka topic, set KafkaConfig.ignoreZkOffsets to true, as sketched below.
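As a brief sketch of the two behaviors, assuming hosts is a BrokerHosts instance and that the topic name, zkRoot, and nodeId values are hypothetical:

// Resume from the last stored offset: reuse the same zkRoot and nodeId
// ("/kafka-offsets" and "my-topology-id" are assumed example values).
SpoutConfig resumeConfig = new SpoutConfig(hosts, "test-topic", "/kafka-offsets", "my-topology-id");

// Same path, but replay the topic from the beginning instead.
SpoutConfig replayConfig = new SpoutConfig(hosts, "test-topic", "/kafka-offsets", "my-topology-id");
replayConfig.ignoreZkOffsets = true;
replayConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();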
Example
The following example illustrates the use of the KafkaSpout class and related interfaces:

BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + zkrootDir, node);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
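To use the spout in a topology, it is typically registered with a TopologyBuilder. The following is a minimal sketch in which the component names and the downstream PrintBolt class are hypothetical:

// Wire the Kafka spout into a topology; "kafka-spout", "print-bolt", and
// PrintBolt are hypothetical names for illustration.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 1);
builder.setBolt("print-bolt", new PrintBolt(), 1).shuffleGrouping("kafka-spout");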

