From b639c1c8b6452f998de196c25182666bc32143cc Mon Sep 17 00:00:00 2001
From: leihui
Date: Fri, 3 Mar 2017 12:51:13 +0800
Subject: [PATCH 1/2] Fix the jstorm-kafka code lost in the jstorm code merge
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

See issue #440 for details.
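For readers skimming the flattened diff below, the substance of this patch is threefold: KafkaConsumer now records the error code of a failed fetch and exposes it through getAndResetFetchResponseCode(); PartitionConsumer reacts to that code by resetting its offset on OffsetOutOfRange, dropping the cached SimpleConsumer on NotLeaderForPartition, and otherwise backing off for 100 ms; and KafkaSpout.nextTuple() skips partitions that are backing off, sleeping once itself when every partition is asleep. The following is a minimal, dependency-free sketch of that back-off pattern, written for this description rather than taken from the patch; DemoPartitionConsumer and its members are hypothetical stand-ins for the real PartitionConsumer/KafkaSpout code in the hunks below.

import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for PartitionConsumer, reduced to its backoff state.
class DemoPartitionConsumer {
    private long sleepEndTime = 0; // plays the role of consumerSleepEndTime

    // Mirrors PartitionConsumer.isSleepingConsumer().
    boolean isSleeping() {
        return System.currentTimeMillis() < sleepEndTime;
    }

    // What fillMessages() does when a fetch returns nothing: back off 100 ms.
    void markSleeping() {
        sleepEndTime = System.currentTimeMillis() + 100;
    }
}

public class BackoffDemo {
    public static void main(String[] args) throws InterruptedException {
        List<DemoPartitionConsumer> consumers = Arrays.asList(
                new DemoPartitionConsumer(), new DemoPartitionConsumer());
        // Mirrors KafkaSpout.nextTuple(): poll only awake consumers, and
        // sleep once per call when every partition is backing off.
        boolean allSleeping = true;
        for (DemoPartitionConsumer c : consumers) {
            if (!c.isSleeping()) {
                allSleeping = false;
                c.markSleeping(); // stand-in for consumer.emit(collector)
            }
        }
        if (allSleeping) {
            Thread.sleep(100); // keeps an idle spout from busy-looping
        }
    }
}

The 100 ms back-off and sleep intervals come directly from the hunks below; everything else in the sketch is illustrative.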
---
 .../alibaba/jstorm/kafka/KafkaConsumer.java | 442 ++++++++++--------
 .../com/alibaba/jstorm/kafka/KafkaSpout.java | 88 ++--
 .../jstorm/kafka/PartitionConsumer.java | 35 +-
 3 files changed, 315 insertions(+), 250 deletions(-)

diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
index 787b28552..348c52dd3 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
@@ -34,208 +34,240 @@ */
public class KafkaConsumer { - private static Logger LOG = Logger.getLogger(KafkaConsumer.class); - - public static final int NO_OFFSET = -1; - - private int status; - private SimpleConsumer consumer = null; - - private KafkaSpoutConfig config; - private LinkedList<Host> brokerList; - private int brokerIndex; - private Broker leaderBroker; - - public KafkaConsumer(KafkaSpoutConfig config) { - this.config = config; - this.brokerList = new LinkedList<Host>(config.brokers); - this.brokerIndex = 0; - } - - public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOException { - - String topic = config.topic; - FetchRequest req = new FetchRequestBuilder().clientId(config.clientId).addFetch(topic, partition, offset, config.fetchMaxBytes) - .maxWait(config.fetchWaitMaxMs).build(); - FetchResponse fetchResponse = null; - SimpleConsumer simpleConsumer = null; - try { - simpleConsumer = findLeaderConsumer(partition); - if (simpleConsumer == null) { - // LOG.error(message); - return null; - } - fetchResponse = simpleConsumer.fetch(req); - } catch (Exception e) { - if (e instanceof ConnectException || e instanceof SocketTimeoutException || e instanceof IOException - || e instanceof UnresolvedAddressException) { - LOG.warn("Network error when fetching messages:", e); - if (simpleConsumer != null) { - String host = simpleConsumer.host(); - int port = simpleConsumer.port(); - simpleConsumer = null; - throw new KafkaException("Network error when fetching messages: " + host + ":" + port + " , " + e.getMessage(), e); - } - - } else { - throw new RuntimeException(e); - } - } - if (fetchResponse.hasError()) { - short code = fetchResponse.errorCode(topic, partition); - if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) { - long startOffset = getOffset(topic, partition, config.startOffsetTime); - offset = startOffset; - } - if(leaderBroker != null) { - LOG.error("fetch data from kafka topic[" + config.topic + "] host[" + leaderBroker.host() + ":" + leaderBroker.port() + "] partition[" - + partition + "] error:" + code); - }else { - - } - return null; - } else { - ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, partition); - return msgs; - } - } - - private SimpleConsumer findLeaderConsumer(int partition) { - try { - if (consumer != null) { - return consumer; - } - PartitionMetadata metadata = findLeader(partition); - if (metadata == null) { - leaderBroker = null; - consumer = null; - return null; - } - leaderBroker = metadata.leader(); - consumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), config.socketTimeoutMs, config.socketReceiveBufferBytes, - config.clientId); - - return consumer; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - return null; - } - - protected PartitionMetadata findLeader(int partition) { - PartitionMetadata returnMetaData = null; - int errors = 0; - int size = brokerList.size(); - - Host brokerHost = brokerList.get(brokerIndex); - try { - if (consumer == null) { - consumer = new SimpleConsumer(brokerHost.getHost(), brokerHost.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes, - config.clientId); - } - } catch (Exception e) { - LOG.warn(e.getMessage(), e); - consumer = null; - } - int i = brokerIndex; - loop: while (i < size && errors < size + 1) { - Host host = brokerList.get(i); - i = (i + 1) % size; - brokerIndex = i; // next index - try { - - if (consumer == null) { - consumer = new SimpleConsumer(host.getHost(), host.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes, - config.clientId); - } - List<String> topics = Collections.singletonList(config.topic); - TopicMetadataRequest req = new TopicMetadataRequest(topics); - kafka.javaapi.TopicMetadataResponse resp = null; - try { - resp = consumer.send(req); - } catch (Exception e) { - errors += 1; - - LOG.error("findLeader error, broker:" + host.toString() + ", will change to next broker index:" + (i + 1) % size); - if (consumer != null) { - consumer.close(); - consumer = null; - } - continue; - } - - List<TopicMetadata> metaData = resp.topicsMetadata(); - for (TopicMetadata item : metaData) { - for (PartitionMetadata part : item.partitionsMetadata()) { - if (part.partitionId() == partition) { - returnMetaData = part; - break loop; - } - } - } - - } catch (Exception e) { - LOG.error("Error communicating with Broker:" + host.toString() + ", find Leader for partition:" + partition); - } finally { - if (consumer != null) { - consumer.close(); - consumer = null; - } - } - } - - return returnMetaData; - } - - public long getOffset(String topic, int partition, long startOffsetTime) { - SimpleConsumer simpleConsumer = findLeaderConsumer(partition); - - if (simpleConsumer == null) { - LOG.error("Error consumer is null get offset from partition:" + partition); - return -1; - } - - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1)); - OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()); - - long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition); - if (offsets.length > 0) { - return offsets[0]; - } else { - return NO_OFFSET; - } - } - - public void close() { - if (consumer != null) { - consumer.close(); - } - } - - public SimpleConsumer getConsumer() { - return consumer; - } - - public void setConsumer(SimpleConsumer consumer) { - this.consumer = consumer; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public Broker getLeaderBroker() { - return leaderBroker; - } - - public void setLeaderBroker(Broker leaderBroker) { - this.leaderBroker = leaderBroker; - } - -} + private static Logger LOG = Logger.getLogger(KafkaConsumer.class); + + public static final int NO_OFFSET = -1; + + private int status; + private SimpleConsumer consumer = null; + + private KafkaSpoutConfig config; + private LinkedList<Host>
brokerList; + private int brokerIndex; + private Broker leaderBroker; + private short fetchResponseCode = 0; + + public KafkaConsumer(KafkaSpoutConfig config) { + this.config = config; + this.brokerList = new LinkedList<Host>(config.brokers); + this.brokerIndex = 0; + } + + public ByteBufferMessageSet fetchMessages(int partition, long offset) + throws IOException { + + String topic = config.topic; + FetchRequest req = new FetchRequestBuilder().clientId(config.clientId) + .addFetch(topic, partition, offset, config.fetchMaxBytes) + .maxWait(config.fetchWaitMaxMs).build(); + FetchResponse fetchResponse = null; + SimpleConsumer simpleConsumer = null; + try { + simpleConsumer = findLeaderConsumer(partition); + if (simpleConsumer == null) { + // LOG.error(message); + return null; + } + fetchResponse = simpleConsumer.fetch(req); + } catch (Exception e) { + if (e instanceof ConnectException + || e instanceof SocketTimeoutException + || e instanceof IOException + || e instanceof UnresolvedAddressException) { + LOG.warn("Network error when fetching messages:", e); + if (simpleConsumer != null) { + String host = simpleConsumer.host(); + int port = simpleConsumer.port(); + simpleConsumer = null; + throw new KafkaException( + "Network error when fetching messages: " + host + + ":" + port + " , " + e.getMessage(), e); + } + + } else { + throw new RuntimeException(e); + } + } + if (fetchResponse.hasError()) { + fetchResponseCode = fetchResponse.errorCode(topic, partition); + if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) { + } + // if (code == ErrorMapping.OffsetOutOfRangeCode() && + // config.resetOffsetIfOutOfRange) { + // long startOffset = getOffset(topic, partition, + // config.startOffsetTime); + // offset = startOffset; + // } + if (leaderBroker != null) { + LOG.error("fetch data from kafka topic[" + config.topic + + "] host[" + leaderBroker.host() + ":" + + leaderBroker.port() + "] partition[" + partition + + "] error:" + fetchResponseCode); + } else { + + } + return null; + } else { + ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, + partition); + return msgs; + } + } + + public short getAndResetFetchResponseCode() { + short code = this.fetchResponseCode; + this.fetchResponseCode = 0; + return code; + } + + private SimpleConsumer findLeaderConsumer(int partition) { + try { + if (consumer != null) { + return consumer; + } + PartitionMetadata metadata = findLeader(partition); + if (metadata == null) { + leaderBroker = null; + consumer = null; + return null; + } + leaderBroker = metadata.leader(); + consumer = new SimpleConsumer(leaderBroker.host(), + leaderBroker.port(), config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId); + + return consumer; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + return null; + } + + protected PartitionMetadata findLeader(int partition) { + PartitionMetadata returnMetaData = null; + int errors = 0; + int size = brokerList.size(); + + Host brokerHost = brokerList.get(brokerIndex); + try { + if (consumer == null) { + consumer = new SimpleConsumer(brokerHost.getHost(), + brokerHost.getPort(), config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId); + } + } catch (Exception e) { + LOG.warn(e.getMessage(), e); + consumer = null; + } + int i = brokerIndex; + loop: while (i < size && errors < size + 1) { + Host host = brokerList.get(i); + i = (i + 1) % size; + brokerIndex = i; // next index + try { + + if (consumer == null) { + consumer = new SimpleConsumer(host.getHost(), + host.getPort(),
config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId); + } + List<String> topics = Collections.singletonList(config.topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = null; + try { + resp = consumer.send(req); + } catch (Exception e) { + errors += 1; + + LOG.error("findLeader error, broker:" + host.toString() + + ", will change to next broker index:" + (i + 1) + % size); + if (consumer != null) { + consumer.close(); + consumer = null; + } + continue; + } + + List<TopicMetadata> metaData = resp.topicsMetadata(); + for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + if (part.partitionId() == partition) { + returnMetaData = part; + break loop; + } + } + } + + } catch (Exception e) { + LOG.error("Error communicating with Broker:" + host.toString() + + ", find Leader for partition:" + partition); + } finally { + if (consumer != null) { + consumer.close(); + consumer = null; + } + } + } + + return returnMetaData; + } + + public long getOffset(String topic, int partition, long startOffsetTime) { + SimpleConsumer simpleConsumer = findLeaderConsumer(partition); + + if (simpleConsumer == null) { + LOG.error("Error consumer is null get offset from partition:" + + partition); + return -1; + } + + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, + partition); + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( + startOffsetTime, 1)); + OffsetRequest request = new OffsetRequest(requestInfo, + kafka.api.OffsetRequest.CurrentVersion(), + simpleConsumer.clientId()); + + long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets( + topic, partition); + if (offsets.length > 0) { + return offsets[0]; + } else { + return NO_OFFSET; + } + } + + public void close() { + if (consumer != null) { + consumer.close(); + } + } + + public SimpleConsumer getConsumer() { + return consumer; + } + + public void setConsumer(SimpleConsumer consumer) { + this.consumer = consumer; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public Broker getLeaderBroker() { + return leaderBroker; + } + + public void setLeaderBroker(Broker leaderBroker) { + this.leaderBroker = leaderBroker; + } + +} \ No newline at end of file
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
index 1d9432b7b..f5eb8da51 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
@@ -22,24 +22,25 @@ public class KafkaSpout implements IRichSpout {
private static Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); protected SpoutOutputCollector collector; - + private long lastUpdateMs; PartitionCoordinator coordinator; - + private KafkaSpoutConfig config; - + private ZkState zkState; - + public KafkaSpout() { - + } - + public KafkaSpout(KafkaSpoutConfig config) { this.config = config; } - + @Override - public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + public void open(Map conf, TopologyContext context, + SpoutOutputCollector collector) { this.collector = collector; if (this.config == null) { config = new KafkaSpoutConfig(); @@ -52,62 +53,71 @@ public void open(Map conf, TopologyContext context,
SpoutOutputCollector collect @Override public void close() { - // TODO Auto-generated method stub - zkState.close(); + zkState.close(); } @Override public void activate() { - // TODO Auto-generated method stub - } @Override public void deactivate() { - // TODO Auto-generated method stub - } @Override public void nextTuple() { - Collection<PartitionConsumer> partitionConsumers = coordinator.getPartitionConsumers(); - for(PartitionConsumer consumer: partitionConsumers) { - EmitState state = consumer.emit(collector); - LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state); -// if(state != EmitState.EMIT_MORE) { -// currentPartitionIndex = (currentPartitionIndex+1) % consumerSize; -// } -// if(state != EmitState.EMIT_NONE) { -// break; -// } + Collection<PartitionConsumer> partitionConsumers = coordinator + .getPartitionConsumers(); + boolean isAllSleeping = true; + for (PartitionConsumer consumer : partitionConsumers) { + if (!consumer.isSleepingConsumer()) { + isAllSleeping = false; + EmitState state = consumer.emit(collector); + LOG.debug("====== partition " + consumer.getPartition() + + " emit message state is " + state); + } + // if(state != EmitState.EMIT_MORE) { + // currentPartitionIndex = (currentPartitionIndex+1) % consumerSize; + // } + // if(state != EmitState.EMIT_NONE) { + // break; + // } + } + if (isAllSleeping) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } } long now = System.currentTimeMillis(); - if((now - lastUpdateMs) > config.offsetUpdateIntervalMs) { - commitState(); - } - - + if ((now - lastUpdateMs) > config.offsetUpdateIntervalMs) { + commitState(); + } + } - + public void commitState() { - lastUpdateMs = System.currentTimeMillis(); - for(PartitionConsumer consumer: coordinator.getPartitionConsumers()) { + lastUpdateMs = System.currentTimeMillis(); + for (PartitionConsumer consumer : coordinator.getPartitionConsumers()) { consumer.commitState(); - } - + } + } @Override public void ack(Object msgId) { - KafkaMessageId messageId = (KafkaMessageId)msgId; - PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition()); + KafkaMessageId messageId = (KafkaMessageId) msgId; + PartitionConsumer consumer = coordinator.getConsumer(messageId + .getPartition()); consumer.ack(messageId.getOffset()); } @Override public void fail(Object msgId) { - KafkaMessageId messageId = (KafkaMessageId)msgId; - PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition()); + KafkaMessageId messageId = (KafkaMessageId) msgId; + PartitionConsumer consumer = coordinator.getConsumer(messageId + .getPartition()); consumer.fail(messageId.getOffset()); } @@ -120,7 +130,5 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { public Map getComponentConfiguration() { return null; } - - } \ No newline at end of file
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
index 4b8ad7f4d..4edd7b64c 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
@@ -18,6 +18,7 @@
import kafka.javaapi.message.ByteBufferMessageSet; import kafka.message.Message; import kafka.message.MessageAndOffset; +import kafka.common.ErrorMapping; import backtype.storm.Config; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.utils.Utils;
@@ -48,6 +49,7 @@ static enum EmitState { private long lastCommittedOffset; private ZkState zkState; private Map stormConf; + private long consumerSleepEndTime = 0; public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkState offsetState) { this.stormConf = conf; @@ -122,13 +124,28 @@ private void fillMessages() { ByteBufferMessageSet msgs; try { long start = System.currentTimeMillis(); - msgs = consumer.fetchMessages(partition, emittingOffset + 1); - + if(config.fromBeginning){ + msgs = consumer.fetchMessages(partition, emittingOffset); + }else { + msgs = consumer.fetchMessages(partition, emittingOffset + 1); + } + if (msgs == null) { - LOG.error("fetch null message from offset {}", emittingOffset); + short fetchResponseCode = consumer.getAndResetFetchResponseCode(); + if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) { + this.emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime()); + LOG.warn("reset kafka offset {}", emittingOffset); + }else if(fetchResponseCode == ErrorMapping.NotLeaderForPartitionCode()){ + consumer.setConsumer(null); + LOG.warn("current consumer is not leader, reset kafka simpleConsumer"); + }else{ + this.consumerSleepEndTime = System.currentTimeMillis() + 100; + LOG.warn("sleep until {}", consumerSleepEndTime); + } + LOG.warn("fetch null message from offset {}", emittingOffset); return; } - + int count = 0; for (MessageAndOffset msg : msgs) { count += 1; @@ -138,12 +155,20 @@ private void fillMessages() { LOG.debug("fillmessage fetched a message:{}, offset:{}", msg.message().toString(), msg.offset()); } long end = System.currentTimeMillis(); + if(count == 0){ + this.consumerSleepEndTime = System.currentTimeMillis() + 100; + LOG.warn("sleep until {}", consumerSleepEndTime); + } LOG.info("fetch message from partition:"+partition+", offset:" + emittingOffset+", size:"+msgs.sizeInBytes()+", count:"+count +", time:"+(end-start)); } catch (Exception e) { e.printStackTrace(); LOG.error(e.getMessage(),e); } } + + public boolean isSleepingConsumer(){ + return System.currentTimeMillis() < this.consumerSleepEndTime; + } public void commitState() { try { @@ -224,4 +249,4 @@ public KafkaConsumer getConsumer() { public void setConsumer(KafkaConsumer consumer) { this.consumer = consumer; } -} +} \ No newline at end of file

From a201d98653d3742e966c0b558e20661129de2344 Mon Sep 17 00:00:00 2001
From: leihui
Date: Sat, 4 Mar 2017 22:15:03 +0800
Subject: [PATCH 2/2] Adjust formatting
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../alibaba/jstorm/kafka/KafkaConsumer.java | 447 +++++++++---------
 .../com/alibaba/jstorm/kafka/KafkaSpout.java | 79 ++--
 .../jstorm/kafka/PartitionConsumer.java | 31 +-
 3 files changed, 264 insertions(+), 293 deletions(-)

diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
index 348c52dd3..4ed1cbfcf 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaConsumer.java
@@ -34,240 +34,217 @@ */
public class KafkaConsumer { - private static Logger LOG = Logger.getLogger(KafkaConsumer.class); - - public static final int NO_OFFSET = -1; - - private int status; - private SimpleConsumer consumer = null; - - private KafkaSpoutConfig config; - private LinkedList<Host>
brokerList; - private int brokerIndex; - private Broker leaderBroker; - private short fetchResponseCode = 0; - - public KafkaConsumer(KafkaSpoutConfig config) { - this.config = config; - this.brokerList = new LinkedList<Host>(config.brokers); - this.brokerIndex = 0; - } - - public ByteBufferMessageSet fetchMessages(int partition, long offset) - throws IOException { - - String topic = config.topic; - FetchRequest req = new FetchRequestBuilder().clientId(config.clientId) - .addFetch(topic, partition, offset, config.fetchMaxBytes) - .maxWait(config.fetchWaitMaxMs).build(); - FetchResponse fetchResponse = null; - SimpleConsumer simpleConsumer = null; - try { - simpleConsumer = findLeaderConsumer(partition); - if (simpleConsumer == null) { - // LOG.error(message); - return null; - } - fetchResponse = simpleConsumer.fetch(req); - } catch (Exception e) { - if (e instanceof ConnectException - || e instanceof SocketTimeoutException - || e instanceof IOException - || e instanceof UnresolvedAddressException) { - LOG.warn("Network error when fetching messages:", e); - if (simpleConsumer != null) { - String host = simpleConsumer.host(); - int port = simpleConsumer.port(); - simpleConsumer = null; - throw new KafkaException( - "Network error when fetching messages: " + host - + ":" + port + " , " + e.getMessage(), e); - } - - } else { - throw new RuntimeException(e); - } - } - if (fetchResponse.hasError()) { - fetchResponseCode = fetchResponse.errorCode(topic, partition); - if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) { - } - // if (code == ErrorMapping.OffsetOutOfRangeCode() && - // config.resetOffsetIfOutOfRange) { - // long startOffset = getOffset(topic, partition, - // config.startOffsetTime); - // offset = startOffset; - // } - if (leaderBroker != null) { - LOG.error("fetch data from kafka topic[" + config.topic - + "] host[" + leaderBroker.host() + ":" - + leaderBroker.port() + "] partition[" + partition - + "] error:" + fetchResponseCode); - } else { - - } - return null; - } else { - ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, - partition); - return msgs; - } - } - - public short getAndResetFetchResponseCode() { - short code = this.fetchResponseCode; - this.fetchResponseCode = 0; - return code; - } - - private SimpleConsumer findLeaderConsumer(int partition) { - try { - if (consumer != null) { - return consumer; - } - PartitionMetadata metadata = findLeader(partition); - if (metadata == null) { - leaderBroker = null; - consumer = null; - return null; - } - leaderBroker = metadata.leader(); - consumer = new SimpleConsumer(leaderBroker.host(), - leaderBroker.port(), config.socketTimeoutMs, - config.socketReceiveBufferBytes, config.clientId); - - return consumer; - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } - return null; - } - - protected PartitionMetadata findLeader(int partition) { - PartitionMetadata returnMetaData = null; - int errors = 0; - int size = brokerList.size(); - - Host brokerHost = brokerList.get(brokerIndex); - try { - if (consumer == null) { - consumer = new SimpleConsumer(brokerHost.getHost(), - brokerHost.getPort(), config.socketTimeoutMs, - config.socketReceiveBufferBytes, config.clientId); - } - } catch (Exception e) { - LOG.warn(e.getMessage(), e); - consumer = null; - } - int i = brokerIndex; - loop: while (i < size && errors < size + 1) { - Host host = brokerList.get(i); - i = (i + 1) % size; - brokerIndex = i; // next index - try { - - if (consumer == null) { - consumer = new SimpleConsumer(host.getHost(), - host.getPort(),
config.socketTimeoutMs, - config.socketReceiveBufferBytes, config.clientId); - } - List<String> topics = Collections.singletonList(config.topic); - TopicMetadataRequest req = new TopicMetadataRequest(topics); - kafka.javaapi.TopicMetadataResponse resp = null; - try { - resp = consumer.send(req); - } catch (Exception e) { - errors += 1; - - LOG.error("findLeader error, broker:" + host.toString() - + ", will change to next broker index:" + (i + 1) - % size); - if (consumer != null) { - consumer.close(); - consumer = null; - } - continue; - } - - List<TopicMetadata> metaData = resp.topicsMetadata(); - for (TopicMetadata item : metaData) { - for (PartitionMetadata part : item.partitionsMetadata()) { - if (part.partitionId() == partition) { - returnMetaData = part; - break loop; - } - } - } - - } catch (Exception e) { - LOG.error("Error communicating with Broker:" + host.toString() - + ", find Leader for partition:" + partition); - } finally { - if (consumer != null) { - consumer.close(); - consumer = null; - } - } - } - - return returnMetaData; - } - - public long getOffset(String topic, int partition, long startOffsetTime) { - SimpleConsumer simpleConsumer = findLeaderConsumer(partition); - - if (simpleConsumer == null) { - LOG.error("Error consumer is null get offset from partition:" - + partition); - return -1; - } - - TopicAndPartition topicAndPartition = new TopicAndPartition(topic, - partition); - Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); - requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo( - startOffsetTime, 1)); - OffsetRequest request = new OffsetRequest(requestInfo, - kafka.api.OffsetRequest.CurrentVersion(), - simpleConsumer.clientId()); - - long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets( - topic, partition); - if (offsets.length > 0) { - return offsets[0]; - } else { - return NO_OFFSET; - } - } - - public void close() { - if (consumer != null) { - consumer.close(); - } - } - - public SimpleConsumer getConsumer() { - return consumer; - } - - public void setConsumer(SimpleConsumer consumer) { - this.consumer = consumer; - } - - public int getStatus() { - return status; - } - - public void setStatus(int status) { - this.status = status; - } - - public Broker getLeaderBroker() { - return leaderBroker; - } - - public void setLeaderBroker(Broker leaderBroker) { - this.leaderBroker = leaderBroker; - } + private static Logger LOG = Logger.getLogger(KafkaConsumer.class); + + public static final int NO_OFFSET = -1; + + private int status; + private SimpleConsumer consumer = null; + + private KafkaSpoutConfig config; + private LinkedList<Host> brokerList; + private int brokerIndex; + private Broker leaderBroker; + private short fetchResponseCode = 0; + + public KafkaConsumer(KafkaSpoutConfig config) { + this.config = config; + this.brokerList = new LinkedList<Host>(config.brokers); + this.brokerIndex = 0; + } + + public ByteBufferMessageSet fetchMessages(int partition, long offset) throws IOException { + + String topic = config.topic; + FetchRequest req = new FetchRequestBuilder().clientId(config.clientId).addFetch(topic, partition, offset, config.fetchMaxBytes) + .maxWait(config.fetchWaitMaxMs).build(); + FetchResponse fetchResponse = null; + SimpleConsumer simpleConsumer = null; + try { + simpleConsumer = findLeaderConsumer(partition); + if (simpleConsumer == null) { + // LOG.error(message); + return null; + } + fetchResponse = simpleConsumer.fetch(req); + } catch (Exception e) { + if (e instanceof ConnectException || e instanceof SocketTimeoutException || e instanceof IOException + || e instanceof
UnresolvedAddressException) { + LOG.warn("Network error when fetching messages:", e); + if (simpleConsumer != null) { + String host = simpleConsumer.host(); + int port = simpleConsumer.port(); + simpleConsumer = null; + throw new KafkaException("Network error when fetching messages: " + host + ":" + port + " , " + e.getMessage(), e); + } + + } else { + throw new RuntimeException(e); + } + } + if (fetchResponse.hasError()) { + fetchResponseCode = fetchResponse.errorCode(topic, partition); + if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) { + } +// if (code == ErrorMapping.OffsetOutOfRangeCode() && config.resetOffsetIfOutOfRange) { +// long startOffset = getOffset(topic, partition, config.startOffsetTime); +// offset = startOffset; +// } + if(leaderBroker != null) { + LOG.error("fetch data from kafka topic[" + config.topic + "] host[" + leaderBroker.host() + ":" + leaderBroker.port() + "] partition[" + + partition + "] error:" + fetchResponseCode); + }else { + + } + return null; + } else { + ByteBufferMessageSet msgs = fetchResponse.messageSet(topic, partition); + return msgs; + } + } + + public short getAndResetFetchResponseCode(){ + short code = this.fetchResponseCode; + this.fetchResponseCode = 0; + return code; + } + + private SimpleConsumer findLeaderConsumer(int partition) { + try { + if (consumer != null) { + return consumer; + } + PartitionMetadata metadata = findLeader(partition); + if (metadata == null) { + leaderBroker = null; + consumer = null; + return null; + } + leaderBroker = metadata.leader(); + consumer = new SimpleConsumer(leaderBroker.host(), leaderBroker.port(), config.socketTimeoutMs, config.socketReceiveBufferBytes, + config.clientId); + + return consumer; + } catch (Exception e) { + LOG.error(e.getMessage(), e); + } + return null; + } + + protected PartitionMetadata findLeader(int partition) { + PartitionMetadata returnMetaData = null; + int errors = 0; + int size = brokerList.size(); + + Host brokerHost = brokerList.get(brokerIndex); + try { + if (consumer == null) { + consumer = new SimpleConsumer(brokerHost.getHost(), brokerHost.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes, + config.clientId); + } + } catch (Exception e) { + LOG.warn(e.getMessage(), e); + consumer = null; + } + int i = brokerIndex; + loop: while (i < size && errors < size + 1) { + Host host = brokerList.get(i); + i = (i + 1) % size; + brokerIndex = i; // next index + try { + + if (consumer == null) { + consumer = new SimpleConsumer(host.getHost(), host.getPort(), config.socketTimeoutMs, config.socketReceiveBufferBytes, + config.clientId); + } + List<String> topics = Collections.singletonList(config.topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + kafka.javaapi.TopicMetadataResponse resp = null; + try { + resp = consumer.send(req); + } catch (Exception e) { + errors += 1; + + LOG.error("findLeader error, broker:" + host.toString() + ", will change to next broker index:" + (i + 1) % size); + if (consumer != null) { + consumer.close(); + consumer = null; + } + continue; + } + + List<TopicMetadata> metaData = resp.topicsMetadata(); + for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + if (part.partitionId() == partition) { + returnMetaData = part; + break loop; + } + } + } + + } catch (Exception e) { + LOG.error("Error communicating with Broker:" + host.toString() + ", find Leader for partition:" + partition); + } finally { + if (consumer != null) { + consumer.close(); + consumer = null; + } + } + } + + return
returnMetaData; + } + + public long getOffset(String topic, int partition, long startOffsetTime) { + SimpleConsumer simpleConsumer = findLeaderConsumer(partition); + + if (simpleConsumer == null) { + LOG.error("Error consumer is null get offset from partition:" + partition); + return -1; + } + + TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition); + Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>(); + requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1)); + OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), simpleConsumer.clientId()); + + long[] offsets = simpleConsumer.getOffsetsBefore(request).offsets(topic, partition); + if (offsets.length > 0) { + return offsets[0]; + } else { + return NO_OFFSET; + } + } + + public void close() { + if (consumer != null) { + consumer.close(); + } + } + + public SimpleConsumer getConsumer() { + return consumer; + } + + public void setConsumer(SimpleConsumer consumer) { + this.consumer = consumer; + } + + public int getStatus() { + return status; + } + + public void setStatus(int status) { + this.status = status; + } + + public Broker getLeaderBroker() { + return leaderBroker; + } + + public void setLeaderBroker(Broker leaderBroker) { + this.leaderBroker = leaderBroker; + } } \ No newline at end of file
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
index f5eb8da51..75e6ec206 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/KafkaSpout.java
@@ -22,25 +22,24 @@ public class KafkaSpout implements IRichSpout {
private static Logger LOG = LoggerFactory.getLogger(KafkaSpout.class); protected SpoutOutputCollector collector; - + private long lastUpdateMs; PartitionCoordinator coordinator; - + private KafkaSpoutConfig config; - + private ZkState zkState; - + public KafkaSpout() { - + } - + public KafkaSpout(KafkaSpoutConfig config) { this.config = config; } - + @Override - public void open(Map conf, TopologyContext context, - SpoutOutputCollector collector) { + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; if (this.config == null) { config = new KafkaSpoutConfig(); @@ -53,37 +52,40 @@ public void open(Map conf, TopologyContext context, @Override public void close() { - zkState.close(); + // TODO Auto-generated method stub + zkState.close(); } @Override public void activate() { + // TODO Auto-generated method stub + } @Override public void deactivate() { + // TODO Auto-generated method stub + } @Override public void nextTuple() { - Collection<PartitionConsumer> partitionConsumers = coordinator - .getPartitionConsumers(); + Collection<PartitionConsumer> partitionConsumers = coordinator.getPartitionConsumers(); boolean isAllSleeping = true; - for (PartitionConsumer consumer : partitionConsumers) { - if (!consumer.isSleepingConsumer()) { + for(PartitionConsumer consumer: partitionConsumers) { + if(!consumer.isSleepingConsumer() ){ isAllSleeping = false; EmitState state = consumer.emit(collector); - LOG.debug("====== partition " + consumer.getPartition() + + " emit message state is " + state); + LOG.debug("====== partition "+ consumer.getPartition() + " emit message state is "+state); } - // if(state != EmitState.EMIT_MORE) { - // currentPartitionIndex = (currentPartitionIndex+1) % consumerSize; - // } - 
// if(state != EmitState.EMIT_NONE) { - // break; - // } +// if(state != EmitState.EMIT_MORE) { +// currentPartitionIndex = (currentPartitionIndex+1) % consumerSize; +// } +// if(state != EmitState.EMIT_NONE) { +// break; +// } } - if (isAllSleeping) { + if(isAllSleeping){ try { Thread.sleep(100); } catch (InterruptedException e) { @@ -91,33 +93,32 @@ public void nextTuple() { } } long now = System.currentTimeMillis(); - if ((now - lastUpdateMs) > config.offsetUpdateIntervalMs) { - commitState(); - } - + if((now - lastUpdateMs) > config.offsetUpdateIntervalMs) { + commitState(); + } + + } - + public void commitState() { - lastUpdateMs = System.currentTimeMillis(); - for (PartitionConsumer consumer : coordinator.getPartitionConsumers()) { + lastUpdateMs = System.currentTimeMillis(); + for(PartitionConsumer consumer: coordinator.getPartitionConsumers()) { consumer.commitState(); - } - + } + } @Override public void ack(Object msgId) { - KafkaMessageId messageId = (KafkaMessageId) msgId; - PartitionConsumer consumer = coordinator.getConsumer(messageId - .getPartition()); + KafkaMessageId messageId = (KafkaMessageId)msgId; + PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition()); consumer.ack(messageId.getOffset()); } @Override public void fail(Object msgId) { - KafkaMessageId messageId = (KafkaMessageId) msgId; - PartitionConsumer consumer = coordinator.getConsumer(messageId - .getPartition()); + KafkaMessageId messageId = (KafkaMessageId)msgId; + PartitionConsumer consumer = coordinator.getConsumer(messageId.getPartition()); consumer.fail(messageId.getOffset()); } @@ -130,5 +131,7 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) { public Map getComponentConfiguration() { return null; } + + } \ No newline at end of file
diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
index 4edd7b64c..d90258ff4 100644
--- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
+++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java
@@ -2,6 +2,7 @@
import java.nio.ByteBuffer; + import java.util.Arrays; import java.util.HashMap; import java.util.LinkedList; @@ -70,16 +71,17 @@ public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkSta } try { - if (config.fromBeginning) { - emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime()); - } else { - if (jsonOffset == null) { - lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime()); + if (jsonOffset == null) { + if (config.fromBeginning) { + emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.EarliestTime()); } else { - lastCommittedOffset = jsonOffset; + lastCommittedOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime()); } - emittingOffset = lastCommittedOffset; + } else { + lastCommittedOffset = jsonOffset; } + emittingOffset = lastCommittedOffset; + } catch (Exception e) { LOG.error(e.getMessage(), e); } @@ -124,20 +126,13 @@ private void fillMessages() { ByteBufferMessageSet msgs; try { long start = System.currentTimeMillis(); - if(config.fromBeginning){ - msgs = consumer.fetchMessages(partition, emittingOffset); - }else { - msgs = consumer.fetchMessages(partition, emittingOffset + 1); - } - + msgs =
consumer.fetchMessages(partition, emittingOffset + 1); + if (msgs == null) { short fetchResponseCode = consumer.getAndResetFetchResponseCode(); if (fetchResponseCode == ErrorMapping.OffsetOutOfRangeCode()) { this.emittingOffset = consumer.getOffset(config.topic, partition, kafka.api.OffsetRequest.LatestTime()); LOG.warn("reset kafka offset {}", emittingOffset); - }else if(fetchResponseCode == ErrorMapping.NotLeaderForPartitionCode()){ - consumer.setConsumer(null); - LOG.warn("current consumer is not leader, reset kafka simpleConsumer"); }else{ this.consumerSleepEndTime = System.currentTimeMillis() + 100; LOG.warn("sleep until {}", consumerSleepEndTime); @@ -155,10 +150,6 @@ private void fillMessages() { LOG.debug("fillmessage fetched a message:{}, offset:{}", msg.message().toString(), msg.offset()); } long end = System.currentTimeMillis(); - if(count == 0){ - this.consumerSleepEndTime = System.currentTimeMillis() + 100; - LOG.warn("sleep until {}", consumerSleepEndTime); - } LOG.info("fetch message from partition:"+partition+", offset:" + emittingOffset+", size:"+msgs.sizeInBytes()+", count:"+count +", time:"+(end-start)); } catch (Exception e) { e.printStackTrace();