From 597f0a0cbec52341e81cf7ea2b6fcb1cfb7c4747 Mon Sep 17 00:00:00 2001 From: dengbo Date: Tue, 25 Jul 2017 11:40:26 +0800 Subject: [PATCH] fix kafka spout race condition bug --- .../jstorm/kafka/PartitionConsumer.java | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java index 5ff061961..4fe8a8958 100644 --- a/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java +++ b/jstorm-utility/jstorm-kafka/src/main/java/com/alibaba/jstorm/kafka/PartitionConsumer.java @@ -40,7 +40,7 @@ import backtype.storm.utils.Utils; /** - * + * * @author feilaoda * */ @@ -53,13 +53,13 @@ static enum EmitState { private int partition; private KafkaConsumer consumer; - + private PartitionCoordinator coordinator; private KafkaSpoutConfig config; private LinkedList emittingMessages = new LinkedList(); - private SortedSet pendingOffsets = new TreeSet(); + private SortedSet pendingOffsets = Collections.synchronizedSortedSet(new TreeSet()); private SortedSet failedOffsets = new TreeSet(); private long emittingOffset; private long lastCommittedOffset; @@ -77,7 +77,6 @@ public PartitionConsumer(Map conf, KafkaSpoutConfig config, int partition, ZkSta try { Map json = offsetState.readJSON(zkPath()); if (json != null) { - // jsonTopologyId = (String)((Map)json.get("topology")); jsonOffset = (Long) json.get("offset"); } } catch (Throwable e) { @@ -140,19 +139,21 @@ private void fillMessages() { try { long start = System.currentTimeMillis(); msgs = consumer.fetchMessages(partition, emittingOffset + 1); - + if (msgs == null) { LOG.error("fetch null message from offset {}", emittingOffset); return; } - + int count = 0; - for (MessageAndOffset msg : msgs) { - count += 1; - emittingMessages.add(msg); - emittingOffset = msg.offset(); - 
pendingOffsets.add(emittingOffset); - LOG.debug("fillmessage fetched a message:{}, offset:{}", msg.message().toString(), msg.offset()); + synchronized (pendingOffsets) { + for (MessageAndOffset msg : msgs) { + count += 1; + emittingMessages.add(msg); + emittingOffset = msg.offset(); + pendingOffsets.add(emittingOffset); + LOG.debug("fillmessage fetched a message:{}, offset:{}", msg.message().toString(), msg.offset()); + } } long end = System.currentTimeMillis(); LOG.info("fetch message from partition:"+partition+", offset:" + emittingOffset+", size:"+msgs.sizeInBytes()+", count:"+count +", time:"+(end-start)); @@ -165,10 +166,16 @@ private void fillMessages() { public void commitState() { try { long lastOffset = 0; - if (pendingOffsets.isEmpty() || pendingOffsets.size() <= 0) { - lastOffset = emittingOffset; - } else { - lastOffset = pendingOffsets.first(); + synchronized (pendingOffsets) { + if (pendingOffsets.isEmpty() || pendingOffsets.size() <= 0) { + lastOffset = emittingOffset; + } else { + try { + lastOffset = pendingOffsets.first(); + } catch (NoSuchElementException e) { + lastOffset = emittingOffset; + } + } } if (lastOffset != lastCommittedOffset) { Map data = new HashMap(); @@ -188,7 +195,9 @@ public void commitState() { public void ack(long offset) { try { - pendingOffsets.remove(offset); + synchronized (pendingOffsets) { + pendingOffsets.remove(offset); + } } catch (Exception e) { LOG.error("offset ack error " + offset); }