Discussion:
Transition from kafka 0.8 to 0.10
(too old to reply)
Milind Vaidya
2018-09-17 22:16:35 UTC
Permalink
Hi

We had been using kafka 0.8 with Storm. It was upgraded to
kafka_2.11-0.10.0.1 and Storm 1.1.1 as of now. Though the libraries changed
the code pretty much remained the same.

Now we are trying to upgrade to version 1.2.2 of Storm and also look into
KafkaSpoutRetryService. This also leads to using new KafkaSpoutConfig.

What I fail to understand is where do I set properties related to zookeeper
such as zkRoot in this new config. I also did not find any way to set
following properties


public int fetchSizeBytes = 1024 * 1024;public int socketTimeoutMs =
10000;public int fetchMaxWait = 10000;public int bufferSizeBytes =
1024 * 1024;public MultiScheme scheme = new RawMultiScheme();public
boolean ignoreZkOffsets = false;public long startOffsetTime =
kafka.api.OffsetRequest.EarliestTime();public long maxOffsetBehind =
Long.MAX_VALUE;public boolean useStartOffsetTimeIfOffsetOutOfRange =
true;public int metricsTimeBucketSizeInSecs = 60;
Stig Rohde Døssing
2018-09-18 05:45:24 UTC
Permalink
The storm-kafka-client spout is a complete rewrite. The new spout stores
offsets in Kafka instead of Zookeeper, so you don't need to set any
Zookeeper configuration. You will need to migrate your committed offsets
using
https://github.com/apache/storm/tree/master/external/storm-kafka-migration
though.

Regarding how to set the properties you listed, take a look at
https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html.
There's a table at the bottom that compares the two spout configs. Most of
the configs are now set using KafkaConsumer properties, which you can set
using KafkaSpoutConfig.setProp. The properties are documented at
https://kafka.apache.org/documentation/#newconsumerconfigs.
Post by Milind Vaidya
Hi
We had been using kafka 0.8 with Storm. It was upgraded to
kafka_2.11-0.10.0.1 and Storm 1.1.1 as of now. Though the libraries changed
the code pretty much remained the same.
Now we are trying to upgrade to version 1.2.2 of Storm and also look into
KafkaSpoutRetryService. This also leads to using new KafkaSpoutConfig.
What I fail to understand is where do I set properties related to
zookeeper such as zkRoot in this new config. I also did not find any way to
set following properties
public int fetchSizeBytes = 1024 * 1024;public int socketTimeoutMs = 10000;public int fetchMaxWait = 10000;public int bufferSizeBytes = 1024 * 1024;public MultiScheme scheme = new RawMultiScheme();public boolean ignoreZkOffsets = false;public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();public long maxOffsetBehind = Long.MAX_VALUE;public boolean useStartOffsetTimeIfOffsetOutOfRange = true;public int metricsTimeBucketSizeInSecs = 60;
Milind Vaidya
2018-10-01 23:01:14 UTC
Permalink
Hi Stig,

Thanks for that response. I was able to set up kafka-storm integration in
QA without zookeeper. I have a specific question related to
FIrstPollOffsetStrategy. The table at the end of the above link you sent
explains the mapping between startOffsetTime and forceFromStart to
FIrstPollOffsetStrategy. But the old KafkaSpout also had a provision by
which a unix timestamp aka seconds since epoch was also acceptable value.
This combined with ignoring offsets from ZK was convenient to have a finer
granularity of consuming the kafka offsets.The new spout has only 4 such
options. Is there a way to achieve the timestamp based consumption ? When
described, the kafka consumer group using command line the timestamp is
nowhere to be found in the fields.
Post by Stig Rohde Døssing
The storm-kafka-client spout is a complete rewrite. The new spout stores
offsets in Kafka instead of Zookeeper, so you don't need to set any
Zookeeper configuration. You will need to migrate your committed offsets
using
https://github.com/apache/storm/tree/master/external/storm-kafka-migration
though.
Regarding how to set the properties you listed, take a look at
https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html.
There's a table at the bottom that compares the two spout configs. Most of
the configs are now set using KafkaConsumer properties, which you can set
using KafkaSpoutConfig.setProp. The properties are documented at
https://kafka.apache.org/documentation/#newconsumerconfigs.
Post by Milind Vaidya
Hi
We had been using kafka 0.8 with Storm. It was upgraded to
kafka_2.11-0.10.0.1 and Storm 1.1.1 as of now. Though the libraries changed
the code pretty much remained the same.
Now we are trying to upgrade to version 1.2.2 of Storm and also look into
KafkaSpoutRetryService. This also leads to using new KafkaSpoutConfig.
What I fail to understand is where do I set properties related to
zookeeper such as zkRoot in this new config. I also did not find any way to
set following properties
public int fetchSizeBytes = 1024 * 1024;public int socketTimeoutMs = 10000;public int fetchMaxWait = 10000;public int bufferSizeBytes = 1024 * 1024;public MultiScheme scheme = new RawMultiScheme();public boolean ignoreZkOffsets = false;public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();public long maxOffsetBehind = Long.MAX_VALUE;public boolean useStartOffsetTimeIfOffsetOutOfRange = true;public int metricsTimeBucketSizeInSecs = 60;
Stig Rohde Døssing
2018-10-02 19:56:10 UTC
Permalink
See https://issues.apache.org/jira/browse/STORM-2992.
Post by Milind Vaidya
Hi Stig,
Thanks for that response. I was able to set up kafka-storm integration in
QA without zookeeper. I have a specific question related to
FIrstPollOffsetStrategy. The table at the end of the above link you sent
explains the mapping between startOffsetTime and forceFromStart to
FIrstPollOffsetStrategy. But the old KafkaSpout also had a provision by
which a unix timestamp aka seconds since epoch was also acceptable value.
This combined with ignoring offsets from ZK was convenient to have a finer
granularity of consuming the kafka offsets.The new spout has only 4 such
options. Is there a way to achieve the timestamp based consumption ? When
described, the kafka consumer group using command line the timestamp is
nowhere to be found in the fields.
On Mon, Sep 17, 2018 at 10:45 PM Stig Rohde DÞssing <
Post by Stig Rohde Døssing
The storm-kafka-client spout is a complete rewrite. The new spout stores
offsets in Kafka instead of Zookeeper, so you don't need to set any
Zookeeper configuration. You will need to migrate your committed offsets
using
https://github.com/apache/storm/tree/master/external/storm-kafka-migration
though.
Regarding how to set the properties you listed, take a look at
https://storm.apache.org/releases/2.0.0-SNAPSHOT/storm-kafka-client.html.
There's a table at the bottom that compares the two spout configs. Most of
the configs are now set using KafkaConsumer properties, which you can set
using KafkaSpoutConfig.setProp. The properties are documented at
https://kafka.apache.org/documentation/#newconsumerconfigs.
Post by Milind Vaidya
Hi
We had been using kafka 0.8 with Storm. It was upgraded to
kafka_2.11-0.10.0.1 and Storm 1.1.1 as of now. Though the libraries changed
the code pretty much remained the same.
Now we are trying to upgrade to version 1.2.2 of Storm and also look
into KafkaSpoutRetryService. This also leads to using new
KafkaSpoutConfig.
What I fail to understand is where do I set properties related to
zookeeper such as zkRoot in this new config. I also did not find any way to
set following properties
public int fetchSizeBytes = 1024 * 1024;public int socketTimeoutMs = 10000;public int fetchMaxWait = 10000;public int bufferSizeBytes = 1024 * 1024;public MultiScheme scheme = new RawMultiScheme();public boolean ignoreZkOffsets = false;public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();public long maxOffsetBehind = Long.MAX_VALUE;public boolean useStartOffsetTimeIfOffsetOutOfRange = true;public int metricsTimeBucketSizeInSecs = 60;
Loading...