Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/job.c
Original file line number Diff line number Diff line change
Expand Up @@ -430,7 +430,7 @@ int processJobs(struct aeEventLoop *eventLoop, long long id, void *clientData) {
int period = 100; /* 100 ms default period. */
int max = 10000; /* 10k jobs * 1000 milliseconds = 10M jobs/sec max. */
mstime_t now = mstime(), latency;
skiplistNode *current, *next;
skiplistNode *current;
DISQUE_NOTUSED(eventLoop);
DISQUE_NOTUSED(id);
DISQUE_NOTUSED(clientData);
Expand All @@ -449,8 +449,9 @@ int processJobs(struct aeEventLoop *eventLoop, long long id, void *clientData) {

latencyStartMonitor(latency);
server.mstime = now; /* Update it since it's used by processJob(). */
current = server.awakeme->header->level[0].forward;
while(current && max--) {
skiplistIter iter;
skiplistRewind(server.awakeme, &iter);
while((current = skiplistNext(&iter)) != NULL && max--) {
job *j = current->obj;

#ifdef DEBUG_SCHEDULER
Expand All @@ -464,16 +465,15 @@ int processJobs(struct aeEventLoop *eventLoop, long long id, void *clientData) {
#endif

if (j->awakeme > now) break;
next = current->level[0].forward;
processJob(j);
current = next;
}

/* Try to block between 1 and 100 millseconds depending on how near
* in time is the next async event to process. Note that because of
* received commands or change in state jobs state may be modified so
* we set a max time of 100 milliseconds to wakeup anyway. */
current = server.awakeme->header->level[0].forward;
skiplistRewind(server.awakeme, &iter);
current = skiplistNext(&iter);
if (current) {
job *j = current->obj;
period = server.mstime-j->awakeme;
Expand Down
14 changes: 9 additions & 5 deletions src/queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -767,11 +767,18 @@ void qpeekCommand(client *c) {
newjobs = 1;
}

skiplistIter iter;
skiplistNode *sn = NULL;
queue *q = lookupQueue(c->argv[1]);

if (q != NULL)
sn = newjobs ? q->sl->tail : q->sl->header->level[0].forward;
{
if (newjobs)
skiplistRewindTail(server.awakeme, &iter);
else
skiplistRewind(server.awakeme, &iter);
sn = skiplistNext(&iter);
}

if (sn == NULL) {
addReply(c,shared.emptymultibulk);
Expand All @@ -785,10 +792,7 @@ void qpeekCommand(client *c) {
addReplyBulkCBuffer(c,j->id,JOB_ID_LEN);
addReplyBulkCBuffer(c,j->body,sdslen(j->body));
returned++;
if (newjobs)
sn = sn->backward;
else
sn = sn->level[0].forward;
sn = skiplistNext(&iter);
}
setDeferredMultiBulkLength(c,deflen,returned);
}
11 changes: 5 additions & 6 deletions src/skiplist.c
Original file line number Diff line number Diff line change
Expand Up @@ -288,12 +288,11 @@ int main(void) {
/* The following should fail. */
printf("\nInsert %s again: %p\n\n", words[2], skiplistInsert(sl,words[2]));

skiplistNode *x;
x = sl->header;
x = x->level[0].forward;
while(x) {
printf("%s\n", x->obj);
x = x->level[0].forward;
skiplistIter skiplist_iter;
skiplistNode *node;
skiplistRewind(sl, &skiplist_iter);
while ((node = skiplistNext(&skiplist_iter)) != NULL) {
printf("%s\n", node->obj);
}

printf("Searching for 'hello': %p\n", skiplistFind(sl,"hello"));
Expand Down
31 changes: 31 additions & 0 deletions src/skiplist.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ typedef struct skiplist {
int level;
} skiplist;

typedef struct skiplistIter {
skiplistNode *next;
int direction;
} skiplistIter;

skiplist *skiplistCreate(int (*compare)(const void *, const void *));
void skiplistFree(skiplist *sl);
skiplistNode *skiplistInsert(skiplist *sl, void *obj);
Expand All @@ -60,4 +65,30 @@ void *skiplistPopHead(skiplist *sl);
void *skiplistPopTail(skiplist *sl);
unsigned long skiplistLength(skiplist *sl);

/* Directions for iterators */
#define SL_START_HEAD 0
#define SL_START_TAIL 1

/* Create an iterator in the skiplist private iterator structure
* skiplistIter should be a pointer point to the struct.
* */
#define skiplistRewind(skiplist, skiplistIter) do { \
(skiplistIter)->next = (skiplist)->header->level[0].forward; \
(skiplistIter)->direction = SL_START_HEAD; \
} while(0)

#define skiplistRewindTail(skiplist, skiplistIter) do { \
(skiplistIter)->next = (skiplist)->tail; \
(skiplistIter)->direction = SL_START_TAIL; \
} while(0)

inline skiplistNode *skiplistNext(skiplistIter *iter) {
skiplistNode *current = iter->next;
if (current != NULL) {
iter->next = iter->direction == SL_START_HEAD ?
current->level[0].forward : current->backward;
}
return current;
}

#endif