-
Notifications
You must be signed in to change notification settings - Fork 1
/
queue.c
170 lines (137 loc) · 3.86 KB
/
queue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
/*-------------------------------------------------------------------------
*
* queue.c: Job queue with thread pooling.
*
* Copyright (c) 2009-2011, NIPPON TELEGRAPH AND TELEPHONE CORPORATION
*
*-------------------------------------------------------------------------
*/
#include "pg_rman.h"

#include <errno.h>

#include "pgut/pgut-pthread.h"
/*
 * State of a job queue and the worker threads that serve it.
 *
 * Worker threads are created on demand by JobQueue_push(), up to 'maximum',
 * and run worker_thread().  JobQueue_push(), JobQueue_wait() and
 * worker_thread() touch the fields below only while holding 'mutex'.
 */
struct JobQueue
{
/* protects the queue data */
pthread_mutex_t mutex;
/*
 * Signaled by JobQueue_push() when a job is added while some worker is
 * idle, and broadcast by JobQueue_free() to wake workers for termination.
 * Idle workers wait on it.
 */
pthread_cond_t anyjobs;
/*
 * Broadcast by a worker that finds the job list empty when every worker
 * is then idle.  JobQueue_wait() waits on it.
 */
pthread_cond_t nojobs;
/* worker thread handles (pthread_t values stored as list pointers) */
List *threads;
/* pending jobs, appended at the tail and taken from the head */
List *jobs;
/* maximum allowed threads; set from JobQueue_new()'s nthreads */
volatile int maximum;
/* number of workers currently parked waiting on 'anyjobs' */
volatile int idle;
/* set to true by JobQueue_free(); workers leave their loop once they see it */
volatile bool terminated;
};
/* Start routine of each worker thread; JobQueue_push() passes the queue as arg. */
static void *worker_thread(void *arg);
JobQueue *
JobQueue_new(int nthreads)
{
JobQueue *queue;
Assert(nthreads >= 1);
queue = pgut_new(JobQueue);
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->anyjobs, NULL);
pthread_cond_init(&queue->nojobs, NULL);
queue->threads = NIL;
queue->jobs = NIL;
queue->maximum = nthreads;
queue->idle = 0;
queue->terminated = false;
return queue;
}
/*
 * Append a job to the queue and make sure some worker will pick it up.
 *
 * Job must be allocated with malloc. The ownership will be granted to
 * the queue: the worker that runs the job frees it afterwards.
 *
 * If a worker is idle it is woken up; otherwise a new worker thread is
 * created, as long as fewer than queue->maximum threads exist.  When all
 * workers are busy and the maximum is reached, the job simply stays in the
 * list until a worker finishes its current job.
 *
 * Reports ERROR if a new worker thread cannot be created.
 */
void
JobQueue_push(JobQueue *queue, Job *job)
{
	Assert(queue);
	Assert(!queue->terminated);
	Assert(job);
	Assert(job->routine);

	pgut_mutex_lock(&queue->mutex);

	queue->jobs = lappend(queue->jobs, job);

	if (queue->idle > 0)
		pthread_cond_signal(&queue->anyjobs);
	else if (list_length(queue->threads) < queue->maximum)
	{
		pthread_t	th;
		int			rc;

		rc = pthread_create(&th, NULL, worker_thread, queue);
		if (rc != 0)
		{
			/* Do not raise the error while still holding the queue lock. */
			pthread_mutex_unlock(&queue->mutex);

			/*
			 * pthread_create() returns the error number and does not set
			 * errno, so publish it for errcode_errno() (presumably
			 * errno-based -- confirm against pgut).
			 */
			errno = rc;
			ereport(ERROR,
				(errcode_errno(),
				 errmsg("could not create thread: ")));

			/* Only reached if ERROR returns; 'th' is not valid here. */
			return;
		}

		queue->threads = lappend(queue->threads, (void *) th);
		Assert(list_length(queue->threads) <= queue->maximum);
	}

	pthread_mutex_unlock(&queue->mutex);
}
/*
 * Block the caller until the queue has drained, i.e. there is no pending
 * job and every worker thread is idle.
 *
 * Workers broadcast 'nojobs' when they run out of work; the condition is
 * re-evaluated after each wakeup.
 */
void
JobQueue_wait(JobQueue *queue)
{
	Assert(queue);
	Assert(!queue->terminated);

	pgut_mutex_lock(&queue->mutex);
	for (;;)
	{
		bool		drained;

		drained = (queue->jobs == NIL &&
				   queue->idle >= list_length(queue->threads));
		if (drained)
			break;

		pgut_cond_wait(&queue->nojobs, &queue->mutex);
	}
	pthread_mutex_unlock(&queue->mutex);
}
/*
 * Shut down the worker threads and release the queue.
 *
 * Accepts NULL as a no-op.  Jobs still pending once the workers have
 * exited are freed without being run.
 */
void
JobQueue_free(JobQueue *queue)
{
	ListCell   *lc;

	if (!queue)
		return;

	Assert(!queue->terminated);

	/* Raise the termination flag and wake every parked worker. */
	pgut_mutex_lock(&queue->mutex);
	queue->terminated = true;
	pthread_cond_broadcast(&queue->anyjobs);
	pthread_mutex_unlock(&queue->mutex);

	/*
	 * Join every worker; a worker in the middle of a job is waited for.
	 * XXX: cancel thread for long running jobs?
	 */
	foreach(lc, queue->threads)
	{
		pthread_join((pthread_t) lfirst(lc), NULL);
	}

	/* Drop leftover jobs (ideally there are none) and the handle list. */
	list_free_deep(queue->jobs);
	list_free(queue->threads);

	pthread_cond_destroy(&queue->anyjobs);
	pthread_cond_destroy(&queue->nojobs);
	pthread_mutex_destroy(&queue->mutex);

	free(queue);
}
/*
 * Body of a worker thread.  arg is the JobQueue this worker serves.
 *
 * The worker repeatedly takes the first pending job, runs it with the
 * queue mutex released, and frees it.  When no job is pending it counts
 * itself as idle and sleeps on 'anyjobs'.  It leaves the loop as soon as it
 * observes queue->terminated while holding the mutex.  Always returns NULL.
 */
static void *
worker_thread(void *arg)
{
	JobQueue   *q = (JobQueue *) arg;

	pgut_mutex_lock(&q->mutex);

	while (!q->terminated)
	{
		if (q->jobs != NIL)
		{
			/* Detach the head job, then run it without holding the lock. */
			Job		   *current = linitial(q->jobs);

			q->jobs = list_delete_first(q->jobs);
			pthread_mutex_unlock(&q->mutex);

			current->routine(current);
			free(current);

			pgut_mutex_lock(&q->mutex);
			continue;
		}

		/* Nothing to do: park this worker until jobs arrive or we stop. */
		q->idle++;

		/* notify waiters once every worker has run out of work */
		if (q->idle >= list_length(q->threads))
			pthread_cond_broadcast(&q->nojobs);

		pgut_cond_wait(&q->anyjobs, &q->mutex);
		q->idle--;

		/*
		 * The loop condition re-checks termination, and the job list is
		 * re-checked too: another worker might have taken the job already.
		 */
	}

	pthread_mutex_unlock(&q->mutex);
	return NULL;
}