storm-kafka-client: repeated consumption of the last message
731635762
2018-11-20 01:53:38 UTC
Hi all, I use Storm + Kafka + Protobuf to build my stream processing system.

The problem is that KafkaTridentSpoutOpaque repeatedly consumes the last message. I want every message in Kafka to be consumed only once.

Here are some details:
---------------------Java dependencies----------

storm-kafka-client 1.2.2
storm-core 1.2.2
kafka_2.10 0.10.2.0


--------------------components-----------------

kafka_2.12-2.0.0
apache-storm-1.2.2



------------------------------------- build KafkaTridentSpoutOpaque instance code ----------------------


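import java.io.Serializable;
import java.util.List;

import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.kafka.spout.Func;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE;

// Builds the spout config: String keys, raw byte[] values (protobuf payloads).
protected static KafkaSpoutConfig<String, byte[]> newKafkaSpoutConfig(String bootstrapServers, String topic) {
    KafkaSpoutConfig.Builder<String, byte[]> builder = new KafkaSpoutConfig.Builder<>(bootstrapServers, topic);

    return builder.setProp(ConsumerConfig.GROUP_ID_CONFIG, "stormKafkaSpoutGroup")
            .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
            .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
            .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
            // Emit only the decoded value, as a single field named "str"
            .setRecordTranslator(new JustValueFunc(), new Fields("str"))
            // Resume from the last committed offset, or from the earliest offset if none exists
            .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
            .setProcessingGuarantee(AT_MOST_ONCE)
            .build();
}

private static KafkaTridentSpoutOpaque<String, byte[]> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, byte[]> spoutConfig) {
    return new KafkaTridentSpoutOpaque<>(spoutConfig);
}

// Record translator: parses each Kafka value as a protobuf message.
// PbMiddlewareTransfer.Record is the application's own generated protobuf class.
private static class JustValueFunc implements Func<ConsumerRecord<String, byte[]>, List<Object>>, Serializable {
    @Override
    public List<Object> apply(ConsumerRecord<String, byte[]> record) {
        Values res = null;
        try {
            res = new Values(PbMiddlewareTransfer.Record.parseFrom(record.value()));
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        // Note: returns null if the payload is not a valid protobuf message
        return res;
    }
}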
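The `.setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)` line decides where the spout starts reading each partition. As the storm-kafka-client documentation describes it, the `UNCOMMITTED_*` strategies resume from the last committed offset when one exists and otherwise fall back to the earliest (or latest) offset, while `EARLIEST` and `LATEST` start at the beginning or end of the partition regardless of earlier commits. The sketch below restates those four cases in plain Java. The enum `FirstPoll` and the class `StartOffset` are hypothetical names for illustration, not the library's API, and the model does not capture where the Trident spout actually stores its committed position.

```java
import java.util.OptionalLong;

// Hypothetical restatement of the four documented first-poll strategies;
// names mirror KafkaSpoutConfig.FirstPollOffsetStrategy, but this is not library code.
enum FirstPoll { EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST }

final class StartOffset {
    // committed: the committed offset for the partition, if any.
    // earliest/latest: the partition's current beginning and end offsets.
    static long choose(FirstPoll strategy, OptionalLong committed, long earliest, long latest) {
        switch (strategy) {
            case EARLIEST:
                return earliest;                   // ignores any committed offset
            case LATEST:
                return latest;                     // ignores any committed offset
            case UNCOMMITTED_EARLIEST:
                return committed.orElse(earliest); // resume if possible, else start at the beginning
            case UNCOMMITTED_LATEST:
                return committed.orElse(latest);   // resume if possible, else start at the end
            default:
                throw new AssertionError(strategy);
        }
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..99 (earliest = 0, next offset to write = 100).
        System.out.println(choose(FirstPoll.UNCOMMITTED_EARLIEST, OptionalLong.empty(), 0, 100)); // 0
        System.out.println(choose(FirstPoll.UNCOMMITTED_EARLIEST, OptionalLong.of(42), 0, 100));  // 42
        System.out.println(choose(FirstPoll.EARLIEST, OptionalLong.of(42), 0, 100));              // 0
    }
}
```

With `UNCOMMITTED_EARLIEST`, a restarted topology should only re-read records after the last committed position, so a message that is re-emitted for a long time suggests its position is not being committed or advanced.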


-------------------------------here is my topology code ---------------------------------------
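import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Debug;
import org.apache.storm.trident.spout.ITridentDataSource;

public static void main(String[] args) throws Exception {
    StormTopology topology = getTridentTopology();
    Config conf = new Config();
    conf.setNumWorkers(20);
    // For a Trident topology, this caps the number of batches in flight at once
    conf.setMaxSpoutPending(5000);
    StormSubmitter.submitTopology("storm-kafka-client-spout-test", conf, topology);
}

public static StormTopology getTridentTopology() {
    final TridentTopology tridentTopology = new TridentTopology();

    KafkaSpoutConfig<String, byte[]> spoutConfig = newKafkaSpoutConfig("192.168.0.202:9092", "test-2");
    ITridentDataSource spout = newKafkaTridentSpoutOpaque(spoutConfig);

    final Stream spoutStream = tridentTopology.newStream("spout", spout).parallelismHint(1);

    // Debug is a Trident filter that prints every tuple it sees, prefixed with the given label
    spoutStream.each(spoutStream.getOutputFields(), new Debug("##### fastest driver"));

    return tridentTopology.build();
}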




------------------------------log-------------------------
./6702/worker.log:2018-11-19 20:19:12.418 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:19:25.908 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:19:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:01.997 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:30.591 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:42.960 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:42 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:44.477 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:44 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:47.501 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:48.516 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:20:54.072 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:20:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:01.171 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:22:27.380 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:22:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:03.992 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:03 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:14.893 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:14 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:20.955 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:20 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:25.495 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:25 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:47.978 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:47 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:23:56.440 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:23:56 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:24:33.534 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:24:33 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:27:35.588 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:27:35 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:23.784 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:23 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:28:48.155 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:28:48 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:29:12.218 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:29:12 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:15.597 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:31:30.720 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:31:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:07.871 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:07 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:33:27.889 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:33:27 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:34:34.126 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:34:34 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:35:36.615 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:35:36 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:39:31.282 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:39:31 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:40:15.364 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:40:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:15.565 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:16.570 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:16 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:41:54.130 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:41:54 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:43:30.303 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:43:30 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:44:26.049 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:44:26 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:52:43.618 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:52:43 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:54:01.904 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:54:01 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:55:13.448 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:55:13 CST 2018> DEBUG(##### fastest driver): [metricID: 1
./6702/worker.log:2018-11-19 20:59:15.220 STDIO Thread-9-b-0-executor[23 23] [INFO] <Mon Nov 19 20:59:15 CST 2018> DEBUG(##### fastest driver): [metricID: 1


I produced just one message to Kafka, and the lines above are my topology's output for it. I expected exactly one line, but there are many, and the message keeps being re-emitted for about half an hour.
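To put numbers on "many": all 39 worker.log lines above came from the single message. The sketch below parses their timestamps (copied from the log excerpt; the class name `RepeatSpan` is made up for illustration) and computes the total span and the gaps between emissions. For this excerpt the span works out to about 40 minutes (first line 20:19:12.418, last 20:59:15.220), with gaps ranging from about 1 second to over 8 minutes, so the re-emission is irregular rather than periodic.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Measures how long the single Kafka message kept being re-emitted,
// using the Debug timestamps from worker.log (all on 2018-11-19, CST).
final class RepeatSpan {
    static final String[] TIMESTAMPS = {
        "20:19:12.418", "20:19:25.908", "20:20:01.997", "20:20:30.591", "20:20:42.960",
        "20:20:44.477", "20:20:47.501", "20:20:48.516", "20:20:54.072", "20:22:01.171",
        "20:22:27.380", "20:23:03.992", "20:23:14.893", "20:23:20.955", "20:23:25.495",
        "20:23:47.978", "20:23:56.440", "20:24:33.534", "20:27:35.588", "20:28:23.784",
        "20:28:48.155", "20:29:12.218", "20:31:15.597", "20:31:30.720", "20:33:07.871",
        "20:33:27.889", "20:34:34.126", "20:35:36.615", "20:39:31.282", "20:40:15.364",
        "20:41:15.565", "20:41:16.570", "20:41:54.130", "20:43:30.303", "20:44:26.049",
        "20:52:43.618", "20:54:01.904", "20:55:13.448", "20:59:15.220"
    };

    // Parses HH:mm:ss.SSS strings (ISO local time format).
    static List<LocalTime> times() {
        List<LocalTime> out = new ArrayList<>();
        for (String t : TIMESTAMPS) {
            out.add(LocalTime.parse(t));
        }
        return out;
    }

    // Time between the first and the last emission.
    static Duration span(List<LocalTime> ts) {
        return Duration.between(ts.get(0), ts.get(ts.size() - 1));
    }

    // Gaps between consecutive emissions; the log is already in time order.
    static List<Duration> gaps(List<LocalTime> ts) {
        List<Duration> out = new ArrayList<>();
        for (int i = 1; i < ts.size(); i++) {
            out.add(Duration.between(ts.get(i - 1), ts.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<LocalTime> ts = times();
        List<Duration> g = gaps(ts);
        System.out.println("emissions: " + ts.size());
        System.out.println("span:      " + span(ts));
        System.out.println("min gap:   " + g.stream().min(Duration::compareTo).get());
        System.out.println("max gap:   " + g.stream().max(Duration::compareTo).get());
    }
}
```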


Any help is appreciated. Thanks,
ChenBo
Shaik Asifullah
2018-11-20 02:58:55 UTC
Hi,
Setting the max spout pending value very high causes this. Try a low value,
say 1. My topology works well with max spout pending set to 10. More info
here: http://storm.apache.org/releases/1.1.2/FAQ.html
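For context on this suggestion: `topology.max.spout.pending` (set with `conf.setMaxSpoutPending`) limits how much work a spout may have outstanding. For a Trident spout it is the number of batches that may be in flight at once, and Trident commits those batches strictly in transaction-id order. The toy model below, using the hypothetical class `PendingWindow`, only illustrates that limit under those assumptions; it is not Storm's implementation and does not by itself explain the duplicate emissions.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a spout limited to maxSpoutPending in-flight batches.
// Hypothetical class for illustration; Storm's real batch coordinator is more involved.
final class PendingWindow {
    private final int maxSpoutPending;
    private final Deque<Long> inFlight = new ArrayDeque<>();
    private long nextTxId = 1;

    PendingWindow(int maxSpoutPending) {
        if (maxSpoutPending < 1) {
            throw new IllegalArgumentException("maxSpoutPending must be >= 1");
        }
        this.maxSpoutPending = maxSpoutPending;
    }

    // Starts a new batch if there is room; returns its transaction id, or -1 if the spout must wait.
    long tryEmit() {
        if (inFlight.size() >= maxSpoutPending) {
            return -1;
        }
        long txId = nextTxId++;
        inFlight.addLast(txId);
        return txId;
    }

    // Commits the oldest in-flight batch (batches commit in order) and frees one slot.
    long commitOldest() {
        Long txId = inFlight.pollFirst();
        if (txId == null) {
            throw new IllegalStateException("no batch in flight");
        }
        return txId;
    }

    int inFlightCount() {
        return inFlight.size();
    }

    public static void main(String[] args) {
        PendingWindow w = new PendingWindow(1);
        System.out.println(w.tryEmit());      // 1: first batch starts
        System.out.println(w.tryEmit());      // -1: window of 1 is full
        System.out.println(w.commitOldest()); // 1: batch 1 commits
        System.out.println(w.tryEmit());      // 2: batch 2 can start now
    }
}
```

With a value of 5000, as in the original topology, up to 5000 batches can be outstanding before the spout has to wait; with 1, each batch must commit before the next starts.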

Thanks,
Shaik Asifullah