diff --git a/src/job.c b/src/job.c index 7a88bf2..a342a62 100644 --- a/src/job.c +++ b/src/job.c @@ -430,7 +430,7 @@ int processJobs(struct aeEventLoop *eventLoop, long long id, void *clientData) { int period = 100; /* 100 ms default period. */ int max = 10000; /* 10k jobs * 1000 milliseconds = 10M jobs/sec max. */ mstime_t now = mstime(), latency; - skiplistNode *current, *next; + skiplistNode *current; DISQUE_NOTUSED(eventLoop); DISQUE_NOTUSED(id); DISQUE_NOTUSED(clientData); @@ -449,8 +449,9 @@ int processJobs(struct aeEventLoop *eventLoop, long long id, void *clientData) { latencyStartMonitor(latency); server.mstime = now; /* Update it since it's used by processJob(). */ - current = server.awakeme->header->level[0].forward; - while(current && max--) { + skiplistIter iter; + skiplistRewind(server.awakeme, &iter); + while((current = skiplistNext(&iter)) != NULL && max--) { job *j = current->obj; #ifdef DEBUG_SCHEDULER @@ -464,16 +465,15 @@ int processJobs(struct aeEventLoop *eventLoop, long long id, void *clientData) { #endif if (j->awakeme > now) break; - next = current->level[0].forward; processJob(j); - current = next; } /* Try to block between 1 and 100 millseconds depending on how near * in time is the next async event to process. Note that because of * received commands or change in state jobs state may be modified so * we set a max time of 100 milliseconds to wakeup anyway. */ - current = server.awakeme->header->level[0].forward; + skiplistRewind(server.awakeme, &iter); + current = skiplistNext(&iter); if (current) { job *j = current->obj; period = server.mstime-j->awakeme; diff --git a/src/queue.c b/src/queue.c index 5827135..9452ade 100644 --- a/src/queue.c +++ b/src/queue.c @@ -767,11 +767,18 @@ void qpeekCommand(client *c) { newjobs = 1; } + skiplistIter iter; skiplistNode *sn = NULL; queue *q = lookupQueue(c->argv[1]); if (q != NULL) - sn = newjobs ? q->sl->tail : q->sl->header->level[0].forward; + { + if (newjobs) + skiplistRewindTail(server.awakeme, &iter); + else + skiplistRewind(server.awakeme, &iter); + sn = skiplistNext(&iter); + } if (sn == NULL) { addReply(c,shared.emptymultibulk); @@ -785,10 +792,7 @@ void qpeekCommand(client *c) { addReplyBulkCBuffer(c,j->id,JOB_ID_LEN); addReplyBulkCBuffer(c,j->body,sdslen(j->body)); returned++; - if (newjobs) - sn = sn->backward; - else - sn = sn->level[0].forward; + sn = skiplistNext(&iter); } setDeferredMultiBulkLength(c,deflen,returned); } diff --git a/src/skiplist.c b/src/skiplist.c index 0234896..06f301d 100644 --- a/src/skiplist.c +++ b/src/skiplist.c @@ -288,12 +288,11 @@ int main(void) { /* The following should fail. */ printf("\nInsert %s again: %p\n\n", words[2], skiplistInsert(sl,words[2])); - skiplistNode *x; - x = sl->header; - x = x->level[0].forward; - while(x) { - printf("%s\n", x->obj); - x = x->level[0].forward; + skiplistIter skiplist_iter; + skiplistNode *node; + skiplistRewind(sl, &skiplist_iter); + while ((node = skiplistNext(&skiplist_iter)) != NULL) { + printf("%s\n", node->obj); } printf("Searching for 'hello': %p\n", skiplistFind(sl,"hello")); diff --git a/src/skiplist.h b/src/skiplist.h index cba3415..b68e028 100644 --- a/src/skiplist.h +++ b/src/skiplist.h @@ -51,6 +51,11 @@ typedef struct skiplist { int level; } skiplist; +typedef struct skiplistIter { + skiplistNode *next; + int direction; +} skiplistIter; + skiplist *skiplistCreate(int (*compare)(const void *, const void *)); void skiplistFree(skiplist *sl); skiplistNode *skiplistInsert(skiplist *sl, void *obj); @@ -60,4 +65,30 @@ void *skiplistPopHead(skiplist *sl); void *skiplistPopTail(skiplist *sl); unsigned long skiplistLength(skiplist *sl); +/* Directions for iterators */ +#define SL_START_HEAD 0 +#define SL_START_TAIL 1 + +/* Create an iterator in the skiplist private iterator structure + * skiplistIter should be a pointer point to the struct. + * */ +#define skiplistRewind(skiplist, skiplistIter) do { \ + (skiplistIter)->next = (skiplist)->header->level[0].forward; \ + (skiplistIter)->direction = SL_START_HEAD; \ +} while(0) + +#define skiplistRewindTail(skiplist, skiplistIter) do { \ + (skiplistIter)->next = (skiplist)->tail; \ + (skiplistIter)->direction = SL_START_TAIL; \ +} while(0) + +inline skiplistNode *skiplistNext(skiplistIter *iter) { + skiplistNode *current = iter->next; + if (current != NULL) { + iter->next = iter->direction == SL_START_HEAD ? + current->level[0].forward : current->backward; + } + return current; +} + #endif