Skip to content

Commit

Permalink
Fix an issue where the stdout of the retried command
Browse files Browse the repository at this point in the history
was not correctly captured on the second try.
  • Loading branch information
minfrin committed Feb 18, 2020
1 parent 8a720e1 commit 319318a
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 63 deletions.
8 changes: 7 additions & 1 deletion ChangeLog
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@

CHanges with v1.0.3
Changes with v1.0.4

*) Fix an issue where the stdout of the retried command
was not correctly captured on the second try. [Graham
Leggett]

Changes with v1.0.3

*) Switch from help2man to txt2man. [Graham Leggett]

Expand Down
2 changes: 1 addition & 1 deletion configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# Process this file with autoconf to produce a configure script.

AC_PREREQ(2.61)
AC_INIT(retry, 1.0.3, minfrin@sharp.fm)
AC_INIT(retry, 1.0.4, minfrin@sharp.fm)
AC_CONFIG_AUX_DIR(build-aux)
AC_CONFIG_MACRO_DIRS([m4])
AM_INIT_AUTOMAKE([dist-bzip2])
Expand Down
150 changes: 89 additions & 61 deletions retry.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,17 @@ static struct option long_options[] =
};

typedef struct pump_t {
struct pollfd *rpollfd;
struct pollfd *wpollfd;
void *base;
size_t len;
off_t offset;
int rfd;
int wfd;
int read_closed:1;
int write_closed:1;
int exit_on_close:1;
int no_close:1;
int send_eof:1;
} pump_t;

Expand Down Expand Up @@ -232,38 +237,44 @@ static int status_match(int status, const char *criteria)
return 1;
}

static int pump(const char *name, pump_t *pumps, struct pollfd *fds)
static int pump(const char *name, pump_t *pumps)
{
nfds_t nfds = PUMPS * 2;
int i;

while (1) {
int stay = 0;
nfds_t nfds = 0;
struct pollfd fds[PUMPS * 2] = { 0 };

for (i = 0; i < PUMPS; i++) {

fds[OFFSET(i, READ_FD)].events = 0;
fds[OFFSET(i, WRITE_FD)].events = 0;
pumps[i].rpollfd = NULL;
pumps[i].wpollfd = NULL;

if (!pumps[i].read_closed) {
fds[OFFSET(i, READ_FD)].events = POLLIN;
stay = 1;
pumps[i].rpollfd = fds + nfds;
fds[nfds].events = POLLIN;
fds[nfds].fd = pumps[i].rfd;
fds[nfds].revents = 0;
nfds++;
}

if (!pumps[i].write_closed
&& (pumps[i].send_eof || pumps[i].len > pumps[i].offset)) {
fds[OFFSET(i, WRITE_FD)].events = POLLOUT;
stay = 1;
pumps[i].wpollfd = fds + nfds;
fds[nfds].events = POLLOUT;
fds[nfds].fd = pumps[i].wfd;
fds[nfds].revents = 0;
nfds++;
}

if (pumps[i].read_closed && pumps[i].exit_on_close) {
stay = 0;
nfds = 0;
break;
}

}

if (!stay) {
if (!nfds) {
break;
}

Expand All @@ -277,77 +288,88 @@ static int pump(const char *name, pump_t *pumps, struct pollfd *fds)

for (i = 0; i < PUMPS; i++) {

if (fds[OFFSET(i, READ_FD)].revents & POLLIN) {
void *base;
ssize_t num;
if (pumps[i].rpollfd) {

base = realloc(pumps[i].base, pumps[i].len + BUFFER_SIZE);
if (!base) {
if (pumps[i].rpollfd->revents & POLLIN) {
void *base;
ssize_t num;

fprintf(stderr, "%s: Out of memory, giving up.\n", name);
base = realloc(pumps[i].base, pumps[i].len + BUFFER_SIZE);
if (!base) {

return -1;
}
pumps[i].base = base;
fprintf(stderr, "%s: Out of memory, giving up.\n",
name);

return -1;
}
pumps[i].base = base;

num = read(pumps[i].rfd, base + pumps[i].len, BUFFER_SIZE);

num = read(fds[OFFSET(i, READ_FD)].fd, base + pumps[i].len, BUFFER_SIZE);
if (num < 0) {

if (num < 0) {
fprintf(stderr, "%s: Could not read, giving up: %s\n",
name, strerror(errno));

fprintf(stderr, "%s: Could not read, giving up: %s\n", name,
strerror(errno));
return -1;
} else if (num == 0) {
pumps[i].read_closed = 1;
pumps[i].send_eof = 1;
close(pumps[i].rfd);
} else {
pumps[i].len += num;
}

return -1;
}
else if (num == 0) {

if ((pumps[i].rpollfd->revents & POLLHUP)
|| (pumps[i].rpollfd->revents & POLLERR)
|| (pumps[i].rpollfd->revents & POLLNVAL)) {

pumps[i].read_closed = 1;
pumps[i].send_eof = 1;
}
else {
pumps[i].len += num;
close(pumps[i].rfd);

}

}

if ((fds[OFFSET(i, READ_FD)].revents & POLLHUP) ||
(fds[OFFSET(i, READ_FD)].revents & POLLERR) ||
(fds[OFFSET(i, READ_FD)].revents & POLLNVAL)) {

pumps[i].read_closed = 1;
pumps[i].send_eof = 1;
if (pumps[i].wpollfd) {

}
if (pumps[i].wpollfd->revents & POLLOUT) {
ssize_t num;

if (fds[OFFSET(i, WRITE_FD)].revents & POLLOUT) {
ssize_t num;
num = write(pumps[i].wfd, pumps[i].base + pumps[i].offset,
pumps[i].len - pumps[i].offset);

num = write(fds[OFFSET(i, WRITE_FD)].fd,
pumps[i].base + pumps[i].offset,
pumps[i].len - pumps[i].offset);
if (num < 0) {

if (num < 0) {
fprintf(stderr, "%s: Could not write, giving up: %s\n",
name, strerror(errno));

fprintf(stderr, "%s: Could not write, giving up: %s\n", name,
strerror(errno));
return -1;
} else {
pumps[i].offset += num;
}

return -1;
}
else {
pumps[i].offset += num;
}
if (pumps[i].read_closed
&& pumps[i].offset == pumps[i].len) {

if (pumps[i].read_closed && pumps[i].offset == pumps[i].len) {
pumps[i].write_closed = 1;
close(pumps[i].wfd);

pumps[i].write_closed = 1;
}

}

}
if ((pumps[i].wpollfd->revents & POLLHUP)
|| (pumps[i].wpollfd->revents & POLLERR)
|| (pumps[i].wpollfd->revents & POLLNVAL)) {

if ((fds[OFFSET(i, WRITE_FD)].revents & POLLERR) ||
(fds[OFFSET(i, WRITE_FD)].revents & POLLNVAL)) {
pumps[i].write_closed = 1;
close(pumps[i].wfd);

pumps[i].write_closed = 1;
}

}

Expand Down Expand Up @@ -476,18 +498,17 @@ int main (int argc, char **argv)

/* parent */
else {
struct pollfd fds[PUMPS * 2];

/* handle stdin */

fds[OFFSET(STDIN_FD, READ_FD)].fd = STDIN_FD;
fds[OFFSET(STDIN_FD, WRITE_FD)].fd = inpair[WRITE_FD];
pumps[STDIN_FD].rfd = dup(STDIN_FD);
pumps[STDIN_FD].wfd = inpair[WRITE_FD];
close(inpair[READ_FD]);

/* handle stdout */

fds[OFFSET(STDOUT_FD, READ_FD)].fd = outpair[READ_FD];
fds[OFFSET(STDOUT_FD, WRITE_FD)].fd = STDOUT_FD;
pumps[STDOUT_FD].rfd = outpair[READ_FD];
pumps[STDOUT_FD].wfd = dup(STDOUT_FD);
close(outpair[WRITE_FD]);

/* prevent write to stdout, we will handle it later */
Expand All @@ -497,16 +518,19 @@ int main (int argc, char **argv)
pumps[STDOUT_FD].exit_on_close = 1;

/* pump all data */
if (pump(name, pumps, fds)) {
if (pump(name, pumps)) {
status = EXIT_FAILURE;
break;
}

close(inpair[WRITE_FD]);
close(outpair[READ_FD]);
close(pumps[STDIN_FD].rfd);
close(pumps[STDOUT_FD].wfd);

/* reset stdin in case we repeat the command */
pumps[STDIN_FD].offset = 0;
pumps[STDIN_FD].write_closed = 0;

/* wait for the child process to be done */
do {
Expand Down Expand Up @@ -543,6 +567,10 @@ int main (int argc, char **argv)
/* failure - write stdout to stderr */
fwrite(pumps[STDOUT_FD].base, pumps[STDOUT_FD].len, 1, stderr);

/* reset stdout for a go-around */
free(pumps[STDOUT_FD].base);
memset(&pumps[STDOUT_FD], 0, sizeof(pump_t));

if (delay) {
fprintf(stderr,
"%s: '%s' returned %d, backing off for %ld second%s and trying again...\n",
Expand Down

0 comments on commit 319318a

Please sign in to comment.