Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,9 @@ public AppendMessageResult appendMessagesInner(final MessageExt messageExt, fina
int endpos = currentPos + msgLen;
// alignment end position
int extraAppendSize = UNSAFE_PAGE_SIZE - endpos % UNSAFE_PAGE_SIZE;
if (extraAppendSize == UNSAFE_PAGE_SIZE) {
extraAppendSize = 0;
}
int actualAppendSize = msgLen + extraAppendSize;

this.fileChannel.position(currentPos);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1699,9 +1699,6 @@ public String getServiceName() {
public void run() {
setState(AbstractStateService.START);
TimerMessageStore.LOGGER.info(this.getServiceName() + " service start");
//Mark different rounds
boolean isRound = true;
Map<String, MessageExt> avoidDeleteLose = new HashMap<>();
while (!this.isStopped()) {
try {
setState(AbstractStateService.WAITING);
Expand All @@ -1718,18 +1715,9 @@ public void run() {
MessageExt msgExt = getMessageByCommitOffset(tr.getOffsetPy(), tr.getSizePy());
if (null != msgExt) {
if (needDelete(tr.getMagic()) && !needRoll(tr.getMagic())) {
//Clearing is performed once in each round.
//The deletion message is received first and the common message is received once
if (!isRound) {
isRound = true;
for (MessageExt messageExt : avoidDeleteLose.values()) {
addMetric(messageExt, 1);
}
avoidDeleteLose.clear();
}
if (msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY) != null && tr.getDeleteList() != null) {

avoidDeleteLose.put(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY), msgExt);
//Execute metric plus one for messages that fail to be deleted
addMetric(msgExt, 1);
tr.getDeleteList().add(msgExt.getProperty(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
}
tr.idempotentRelease();
Expand All @@ -1739,13 +1727,9 @@ public void run() {
if (null == uniqueKey) {
LOGGER.warn("No uniqueKey for msg:{}", msgExt);
}
//Mark ready for next round
if (isRound) {
isRound = false;
}
if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0
&& tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey, storeConfig.isAppendTopicForTimerDeleteKey()))) {
avoidDeleteLose.remove(uniqueKey);
if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0 && tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey, storeConfig.isAppendTopicForTimerDeleteKey()))) {
//Normally, it cancels out with the +1 above
addMetric(msgExt, -1);
doRes = true;
tr.idempotentRelease();
perfCounterTicks.getCounter("dequeue_delete").flow(1);
Expand Down
Loading