-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.c
138 lines (123 loc) · 2.56 KB
/
worker.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include "crbehave_private.h"
#include <poll.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <err.h>
#include <assert.h>
#include <sys/wait.h>
#include <string.h>
/*
 * Bookkeeping for one forked worker process.
 */
struct crbehave_worker {
/* Process id returned by fork() for this worker; used to waitpid() on it. */
pid_t pid;
/* Caller-supplied number for this worker; printed in failure reports. */
int sno;
};
/*
 * Parallel arrays allocated by init_workers(): pollfds[i] holds the read
 * end of the pipe connected to the process described by workers[i].
 */
static struct pollfd *pollfds;
static struct crbehave_worker *workers;
/*
 * nworkers is the number of workers currently queued (entries in use in
 * both arrays); maxworkers is the capacity passed to init_workers().
 */
static int nworkers, maxworkers;
/*
 * Allocates the bookkeeping arrays for up to n concurrent workers.
 *
 * Returns 0 on success and -1 if n is negative or an allocation fails.
 * On failure nothing is left allocated and the capacity stays at 0, so
 * crbehave_queue_worker() will never index an unallocated array.
 */
int
init_workers(int n)
{
	/* Only publish the capacity once both arrays really exist. */
	maxworkers = 0;

	/* A negative count would turn into a huge size_t inside calloc(). */
	if (n < 0)
		return -1;

	pollfds = calloc((size_t)n, sizeof(*pollfds));
	workers = calloc((size_t)n, sizeof(*workers));
	if (pollfds == NULL || workers == NULL) {
		/* Do not leave a half-initialized state behind. */
		free(pollfds);
		free(workers);
		pollfds = NULL;
		workers = NULL;
		return -1;
	}

	maxworkers = n;
	return 0;
}
/*
 * Releases the arrays allocated by init_workers() and resets the
 * pointers, so calling this more than once is harmless.
 *
 * This only frees memory: it does not close any pipe descriptors or
 * wait for workers that are still queued.
 */
void
free_workers(void)
{
	/* free(NULL) is a no-op, so no NULL guards are needed. */
	free(pollfds);
	pollfds = NULL;

	free(workers);
	workers = NULL;
}
/*
 * Forks a worker process that runs workfunc(fd, data), where fd is the
 * write end of a pipe.  The parent keeps the read end and polls it in
 * crbehave_reap_workers().  sno is stored with the worker and used to
 * label failure reports for it.
 *
 * Returns 1 if the worker was started, 0 if the queue is full.
 * Terminates the process via err(3) if pipe(2) or fork(2) fails.
 */
int
crbehave_queue_worker(int sno, void (*workfunc)(int, void *), void *data)
{
	int fds[2];
	pid_t pid;

	if (nworkers >= maxworkers)
		return 0;

	if (pipe(fds) != 0)
		err(1, "pipe");

	/*
	 * Flush before forking: the child flushes stdout on its way out,
	 * so anything still buffered here would otherwise be printed twice.
	 */
	fflush(stdout);

	pid = fork();
	if (pid == -1) {
		/*
		 * Must not be recorded as a worker: waitpid(-1, ...) would
		 * later reap an arbitrary child.
		 */
		err(1, "fork");
	}

	if (pid == 0) {
		/* Child: only the write end is needed. */
		close(fds[0]);
		workfunc(fds[1], data);
		fflush(stdout);
		close(fds[1]);
		/* _exit() so the parent's atexit handlers do not run here. */
		_exit(0);
	}

	/* Parent: keep the read end and remember who is behind it. */
	close(fds[1]);
	pollfds[nworkers].fd = fds[0];
	pollfds[nworkers].events = POLLIN;
	workers[nworkers].pid = pid;
	workers[nworkers].sno = sno;
	nworkers++;

	return 1;
}
/*
 * Waits (without timeout) until at least one worker's pipe is readable
 * or has been closed, then handles every worker that poll(2) reported:
 *
 *  - reads a single status byte from its pipe; the byte '1' increments
 *    *pass, any other byte or a missing byte increments *fail;
 *  - closes the pipe and waits for the process to exit;
 *  - prints a line if the process was killed by a signal or exited
 *    with a non-zero status;
 *  - removes the worker from the queue.
 *
 * Terminates the process via err(3)/errx(3) if poll(2) fails or reports
 * a bad descriptor.
 *
 * Returns the number of workers still left.
 */
int
crbehave_reap_workers(int *pass, int *fail)
{
	int nready;
	int i;
	int status;
	ssize_t n;
	unsigned char c;

	if (nworkers == 0)
		return nworkers;

	nready = poll(pollfds, nworkers, -1);
	if (nready == -1)
		err(1, "poll");
	if (nready == 0)
		errx(1, "time out");

	while (nready--) {
		/* Find the first remaining entry that poll() flagged. */
		for (i = 0; i < nworkers; i++) {
			if ((pollfds[i].revents & (POLLERR|POLLNVAL)))
				errx(1, "bad fd %d", pollfds[i].fd);
			if ((pollfds[i].revents & (POLLIN|POLLHUP)))
				break;
		}
		assert(i != nworkers);
		if (i == nworkers)
			break;

		n = read(pollfds[i].fd, &c, 1);
		if (n != 1) {
			/* EOF without a status byte counts as a failure. */
			if (n < 0)
				warn("read");
			(*fail)++;
		} else if (c != '1') {
			(*fail)++;
		} else {
			(*pass)++;
		}

		close(pollfds[i].fd);
		pollfds[i].fd = -1;
		pollfds[i].events = 0;

		/* Only look at status if waitpid() actually filled it in. */
		if (waitpid(workers[i].pid, &status, 0) == -1)
			warn("waitpid");
		else if (WIFSIGNALED(status))
			printf("%3d\tfail\tterminated by signal %d\n",
			    workers[i].sno, WTERMSIG(status));
		else if (WIFEXITED(status) && WEXITSTATUS(status) != 0)
			printf("%3d\tfail\texited with abnormal status %d\n",
			    workers[i].sno, WEXITSTATUS(status));

		/*
		 * Close the gap in both arrays.  The revents of the entries
		 * that move down are preserved, so the next iteration still
		 * finds the other ready workers.
		 */
		nworkers--;
		memmove(&pollfds[i], &pollfds[i + 1],
		    sizeof(*pollfds) * (size_t)(nworkers - i));
		memmove(&workers[i], &workers[i + 1],
		    sizeof(*workers) * (size_t)(nworkers - i));
	}

	return nworkers;
}