diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
index 787b28552..4ed1cbfcf 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
@@ -45,6 +45,7 @@ public class KafkaConsumer {
     private LinkedList brokerList;
     private int brokerIndex;
     private Broker leaderBroker;
+    private short fetchResponseCode = 0;
 
     public KafkaConsumer(KafkaSpoutConfig config) {
         this.config = config;
@@ -82,14 +83,16 @@ public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOE
             }
         }
         if (fetchResponse.hasError()) {
-            short code = fetchResponse.errorCode(topic, partition);
-            if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) {
-                long startOffset = getOffset(topic, partition, config.startOffsetTime);
-                offset = startOffset;
+            fetchResponseCode = fetchResponse.errorCode(topic, partition);
+            if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) {
             }
+//            if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) {
+//                long startOffset = getOffset(topic, partition, config.startOffsetTime);
+//                offset = startOffset;
+//            }
             if(leaderBroker != null) {
                 LOG.error("fetch data from kafka topic[" + config.topic + "] host[" + leaderBroker.host() + ":" + leaderBroker.port() + "] partition["
-                        + partition + "] error:" + code);
+                        + partition + "] error:" + fetchResponseCode);
             }else {
 
             }
@@ -99,6 +102,12 @@ public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOE
             return msgs;
         }
     }
+
+    public short getAndResetFetchResponseCode(){
+        short code = this.fetchResponseCode;
+        this.fetchResponseCode = 0;
+        return code;
+    }
 
     private SimpleConsumer findLeaderConsumer(int partition) {
         try {
@@ -238,4 +247,4 @@ public void setLeaderBroker(Broker leaderBroker) {
         this.leaderBroker = leaderBroker;
     }
 
-}
+}
\ No newline at end of file
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
index 1d9432b7b..75e6ec206 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
@@ -71,9 +71,13 @@ public void deactivate() {
     @Override
     public void nextTuple() {
         Collection partitionConsumers = coordinator.getPartitionConsumers();
+        boolean isAllSleeping = true;
         for(PartitionConsumer consumer: partitionConsumers) {
-            EmitState state = consumer.emit(collector);
-            LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state);
+            if(!consumer.isSleepingConsumer() ){
+                isAllSleeping = false;
+                EmitState state = consumer.emit(collector);
+                LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state);
+            }
 //            if(state != EmitState.EMIT_MORE) {
 //                currentPartitionIndex = (currentPartitionIndex+1) % consumerSize;
 //            }
@@ -81,6 +85,13 @@ public void nextTuple() {
 //            break;
 //            }
         }
+        if(isAllSleeping){
+            try {
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
         long now = System.currentTimeMillis();
         if((now - lastUpdateMs) > config.offsetUpdateIntervalMs) {
             commitState();
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
index 4b8ad7f4d..d90258ff4 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
@@ -2,6 +2,7 @@
 
 import java.nio.ByteBuffer;
 
+
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -18,6 +19,7 @@
 import kafka.javaapi.message.ByteBufferMessageSet;
 import kafka.message.Message;
 import kafka.message.MessageAndOffset;
+import kafka.common.ErrorMapping;
 import backtype.storm.Config;
 import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.utils.Utils;
@@ -48,6 +50,7 @@ static enum EmitState {
     private long lastCommittedOffset;
     private ZkState zkState;
     private Map stormConf;
+    private long consumerSleepEndTime = 0;
 
     public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkState offsetState) {
         this.stormConf = conf;
@@ -68,16 +71,17 @@ public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkSta
         }
 
         try {
-            if (config.fromBeginning) {
-                emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime());
-            } else {
-                if (jsonOffset == null) {
-                    lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
+            if (jsonOffset == null) {
+                if (config.fromBeginning) {
+                    emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime());
                 } else {
-                    lastCommittedOffset = jsonOffset;
+                    lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
                 }
-                emittingOffset = lastCommittedOffset;
+            } else {
+                lastCommittedOffset = jsonOffset;
             }
+            emittingOffset = lastCommittedOffset;
+
         } catch (Exception e) {
             LOG.error(e.getMessage(), e);
         }
@@ -125,10 +129,18 @@ private void fillMessages() {
             msgs = consumer.fetchMessages(partition, emittingOffset + 1);
 
             if (msgs == null) {
-                LOG.error("fetch null message from offset {}", emittingOffset);
+                short fetchResponseCode = consumer.getAndResetFetchResponseCode();
+                if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) {
+                    this.emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime());
+                    LOG.warn("reset kafka offset {}", emittingOffset);
+                }else{
+                    this.consumerSleepEndTime = System.currentTimeMillis() + 100;
+                    LOG.warn("sleep until {}", consumerSleepEndTime);
+                }
+                LOG.warn("fetch null message from offset {}", emittingOffset);
                 return;
             }
-            
+
             int count = 0;
             for (MessageAndOffset msg : msgs) {
                 count += 1;
@@ -144,6 +156,10 @@
             LOG.error(e.getMessage(),e);
         }
     }
+
+    public boolean isSleepingConsumer(){
+        return System.currentTimeMillis() < this.consumerSleepEndTime;
+    }
 
     public void commitState() {
         try {
@@ -224,4 +240,4 @@ public KafkaConsumer getConsumer() {
     public void setConsumer(KafkaConsumer consumer) {
         this.consumer = consumer;
     }
-}
+}
\ No newline at end of file
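Note on the pattern this patch introduces: an empty fetch records the Kafka error code and a 100 ms sleep deadline on the PartitionConsumer, KafkaSpout.nextTuple() skips consumers for which isSleepingConsumer() is true, and the error code is read exactly once through getAndResetFetchResponseCode(). The sketch below exercises that logic in isolation; it is illustrative only, written in plain Java with no Storm or Kafka dependencies, and the BackoffGateDemo/Gate names are hypothetical rather than part of the patch.

// Illustrative sketch only: models the sleep-gate plus get-and-reset error-code
// pattern from the patch, without the Storm/Kafka surroundings. All names in
// this file are hypothetical; only the logic mirrors the diff.
public class BackoffGateDemo {

    // Mirrors PartitionConsumer.consumerSleepEndTime / isSleepingConsumer()
    // and KafkaConsumer.fetchResponseCode / getAndResetFetchResponseCode().
    static class Gate {
        private long sleepEndTime = 0;   // epoch millis until which we back off
        private short lastErrorCode = 0; // last fetch error code, 0 means none

        void recordEmptyFetch(short errorCode) {
            lastErrorCode = errorCode;
            sleepEndTime = System.currentTimeMillis() + 100; // back off for 100 ms
        }

        boolean isSleeping() {
            return System.currentTimeMillis() < sleepEndTime;
        }

        short getAndResetErrorCode() {
            short code = lastErrorCode;
            lastErrorCode = 0; // cleared after the first read
            return code;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Gate gate = new Gate();
        gate.recordEmptyFetch((short) 1);                                 // pretend a fetch failed
        System.out.println("sleeping?   " + gate.isSleeping());           // true
        System.out.println("error code: " + gate.getAndResetErrorCode()); // 1, then reset to 0
        Thread.sleep(150);                                                // wait out the backoff window
        System.out.println("sleeping?   " + gate.isSleeping());           // false again
    }
}

In the patch itself, KafkaSpout.nextTuple() additionally sleeps 100 ms when every partition consumer is in this sleeping state, so an idle topic does not spin the spout loop.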