Skip to content

Commit

Permalink
add new -count option
Browse files Browse the repository at this point in the history
this makes it possible to use jobflow to cut a subset of line numbers
like a combination of `head` and `tail` utilities.

e.g.

    seq 100 | jobflow -skip=10 -count=10

will print the lines from 11-20
  • Loading branch information
rofl0r committed Dec 16, 2021
1 parent bd3b854 commit 6948c2a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 2 deletions.
14 changes: 12 additions & 2 deletions jobflow.c
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ typedef struct {
unsigned long numthreads;
unsigned long threads_running;
unsigned long skip;
unsigned long count;
unsigned long delayedspinup_interval; /* use a random delay until the queue gets filled for the first time.
the top value in ms can be supplied via a command line switch.
this option makes only sense if the interval is somewhat smaller than the
Expand Down Expand Up @@ -371,13 +372,15 @@ static int syntax(void) {
"until EOF is received. we call this 'pipe mode'.\n"
"\n"
"available options:\n\n"
"-skip N -threads N -resume -statefile=/tmp/state -delayedflush\n"
"-skip N -count N -threads N -resume -statefile=/tmp/state -delayedflush\n"
"-delayedspinup N -buffered -joinoutput -limits mem=16M,cpu=10\n"
"-eof=XXX\n"
"-exec ./mycommand {}\n"
"\n"
"-skip N\n"
" N=number of entries to skip\n"
"-count N\n"
" N=only process count lines (after skipping)\n"
"-threads N (alternative: -j N)\n"
" N=number of parallel processes to spawn\n"
"-resume\n"
Expand Down Expand Up @@ -453,6 +456,7 @@ static int parse_args(unsigned argc, char** argv) {
{"statefile", 0, 's', .dest.s = &prog_state.statefile },
{"eof", 0, 's', .dest.s = &prog_state.eof_marker },
{"skip", 0, 'i', .dest.i = &prog_state.skip },
{"count", 0, 'i', .dest.i = &prog_state.count },
{"resume", 0, 'b', .dest.b = &resume },
{"delayedflush", 0, 'b', .dest.b = &prog_state.delayedflush },
{"delayedspinup", 0, 'i', .dest.i = &prog_state.delayedspinup_interval },
Expand All @@ -463,6 +467,7 @@ static int parse_args(unsigned argc, char** argv) {
};

prog_state.numthreads = 1;
prog_state.count = -1UL;

for(i=1; i<argc; ++i) {
char *p = argv[i], *q = strchr(p, '=');
Expand Down Expand Up @@ -671,7 +676,8 @@ static char* mystrnrchr_chk(const char *in, int ch, size_t end) {
}

static int need_linecounter(void) {
return !!prog_state.skip || prog_state.statefile || prog_state.use_seqnr;
return !!prog_state.skip || prog_state.statefile ||
prog_state.use_seqnr || prog_state.count != -1UL;
}
static size_t count_linefeeds(const char *buf, size_t len) {
const char *p = buf, *e = buf+len;
Expand Down Expand Up @@ -723,7 +729,11 @@ static int dispatch_line(char* inbuf, size_t len, char** argv) {
}
if(!len) return 1;
}
} else if(prog_state.count != -1UL) {
if(!prog_state.count) return -1;
--prog_state.count;
}

if(!prog_state.cmd_startarg) {
write_all(1, inbuf, len);
return 1;
Expand Down
6 changes: 6 additions & 0 deletions test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ $JF -skip=5 < $(tmp).1 > $(tmp).2
tail -n 5 < $(tmp).1 > $(tmp).3
test_equal $(tmp).2 $(tmp).3

dotest "seq 10 catmode skip 5 count 3"
seq 10 > $(tmp).1
$JF -skip=5 -count=3 < $(tmp).1 > $(tmp).2
tail -n 5 < $(tmp).1 | head -n 3 > $(tmp).3
test_equal $(tmp).2 $(tmp).3

dotest "seq 10000 bulk skip 1337"
seq 10000 | sort -u > $(tmp).1
$JF -bulk=4K -skip=1337 -exec tests/stdin_printer.out < $(tmp).1 | sort -u > $(tmp).2
Expand Down

0 comments on commit 6948c2a

Please sign in to comment.