Skip to content

Commit b9ef5bc

Browse files
authored
Merge pull request #8 from D4-project/master
Redis_queue + multiprocess
2 parents e2a5d0f + 559b516 commit b9ef5bc

File tree

2 files changed

+36
-32
lines changed

2 files changed

+36
-32
lines changed

README.md

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,8 @@ files.sort()
6666
for rf in files:
6767
f = root + "/"+rf
6868
if f.endswith('pcap') == True:
69+
# Compressed files are supported too
70+
# if f.endswith('pcap.gz') == True:
6971
red.rpush("PCAPDJ_IN_QUEUE",f)
7072
```
7173

@@ -91,11 +93,11 @@ suricata -r /tmp/pcapbuffer
9193
```
9294

9395
Until now no packets are put in the buffer because pcapdj needs an
94-
authorization. PCAPDJ says that it is ready to process the pcapfile 1.pcap
95-
and that it waits for this authorization. For doing so, pcapdj puts the
96-
next file it wants to process in a queue called PCAPDJ_NEXT and it polls the
97-
key PCAPDJ_AUTH. The value of PCAPDJ_AUTH must correspond to the file pcapdj
98-
put previously in the queue PCAPDJ_NEXT.
96+
authorization. PCAPDJ says that it is ready to process the pcapfile 1.pcap and
97+
that it waits for this authorization. For doing so, pcapdj puts the next file
98+
it wants to process in a queue called PCAPDJ_NEXT and it searches for the given
99+
filename in the PCAPDJ_AUTH set. This way several pcadj processes can be managed
100+
by the same authorization script.
99101

100102
```
101103
[INFO] Next file to process /tmp/testpcaps/1.pcap
@@ -111,7 +113,7 @@ while True:
111113
pcapname = red.lpop("PCAPDJ_NEXT")
112114
if pcapname != None:
113115
print "Authorized file ",pcapname
114-
red.set("PCAPDJ_AUTH", pcapname)
116+
red.sadd("PCAPDJ_AUTH", pcapname)
115117
```
116118

117119
Wait until pcapdj and suricata are done

pcapdj.c

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,12 @@ statistics_t stats;
6464
void usage(void)
6565
{
6666

67-
printf("pcapdj [-h] -b namedpipe [-s redis_server] -p [redis_srv_port]\n\n");
67+
printf("pcapdj [-h] -b namedpipe [-s redis_server] -p [redis_srv_port] [-q redis_queue]\n\n");
6868
printf("Connects to the redis instance specified with by the redis_server\n");
6969
printf("and redis_srv_port.\n\n");
7070

71-
printf("Read a list of pcap-ng files from the queue PCAPDJ_IN_QUEUE.\n");
71+
printf("Read a list of pcap-ng files from the queue PCAPDJ_IN_QUEUE by default\n");
72+
printf("or the queue specified with the -q flag is set.\n");
7273
printf("Open the pcap-ng file and feed each packet to the fifo buffer\n");
7374
printf("specified by with the -b option. When a pcap file from the list\n");
7475
printf("has been transferred to the buffer update the queue PCAPDJ_PROCESSED\n");
@@ -176,11 +177,11 @@ void delete_next_file_queue(redisContext* ctx)
176177
}
177178

178179

179-
void delete_auth_file(redisContext* ctx)
180+
void delete_auth_file(redisContext* ctx, char* filename)
180181
{
181182
/* FIXME errors are ignored */
182183
redisReply * reply;
183-
reply = redisCommand(ctx, "DEL %s", AKEY);
184+
reply = redisCommand(ctx, "SREM %s %s", AKEY, filename);
184185
if (reply)
185186
freeReplyObject(reply);
186187
}
@@ -192,20 +193,15 @@ void wait_auth_to_proceed(redisContext* ctx, char* filename)
192193
/* If there is an error the program waits forever */
193194

194195
do {
195-
reply = redisCommand(ctx,"GET %s",AKEY);
196+
reply = redisCommand(ctx,"SISMEMBER %s %s",AKEY, filename);
196197
if (reply){
197-
if (reply->type == REDIS_REPLY_STRING) {
198-
/* Delete the authorized key. So in the next
199-
* iteration the AUTH_KEY is not there anymore and
200-
* the error message is not reated all the times
201-
*/
202-
delete_auth_file(ctx);
203-
if (!strncmp(reply->str, filename, strlen(filename))) {
198+
if (reply->type == REDIS_REPLY_INTEGER) {
199+
/* Delete the filename from the set if found */
200+
if (reply->integer == 1){
201+
delete_auth_file(ctx, filename);
204202
fprintf(stderr, "[INFO] Got authorization to process %s\n",filename);
205203
freeReplyObject(reply);
206204
return;
207-
}else{
208-
fprintf(stderr,"[ERROR] Got the wrong authorization. Waited for (%s). Got %s.\n", filename, reply->str);
209205
}
210206
}
211207
freeReplyObject(reply);
@@ -257,7 +253,7 @@ void process_file(redisContext* ctx, wtap_dumper* dumper, char* filename)
257253
}
258254
}
259255

260-
int process_input_queue(wtap_dumper *dumper, char* redis_server, int redis_srv_port)
256+
int process_input_queue(wtap_dumper *dumper, char* redis_server, int redis_srv_port, char* redis_queue)
261257
{
262258
redisContext* ctx;
263259
redisReply* reply;
@@ -268,10 +264,9 @@ int process_input_queue(wtap_dumper *dumper, char* redis_server, int redis_srv_p
268264
fprintf(stderr,"[ERROR] Could not connect to redis. %s.\n", ctx->errstr);
269265
return EXIT_FAILURE;
270266
}
271-
272267

273268
do {
274-
reply = redisCommand(ctx,"LPOP %s", PQUEUE);
269+
reply = redisCommand(ctx,"LPOP %s", redis_queue);
275270
if (!reply){
276271
fprintf(stderr,"[ERROR] Redis error %s\n",ctx->errstr);
277272
return EXIT_FAILURE;
@@ -320,36 +315,40 @@ void init(void)
320315
int main(int argc, char* argv[])
321316
{
322317

323-
int opt;
324-
int r;
318+
int opt, r, redis_srv_port, write_err;
325319
char* redis_server;
326-
int redis_srv_port;
327-
char *namedpipe;
320+
char* namedpipe;
321+
char* redis_queue;
328322
FILE *fifo;
329323
wtap_dumper *pdh = NULL;
330324
wtap_dump_params params = WTAP_DUMP_PARAMS_INIT;
331-
int write_err;
332325

333326
init();
334327

335328
namedpipe = calloc(128,1);
336329
assert(namedpipe);
337-
330+
331+
redis_queue = calloc(128,1);
332+
assert(redis_queue);
333+
338334
redis_server = calloc(64,1);
339335
assert(redis_server);
340336

341337
redis_srv_port = 6379;
342-
while ((opt = getopt(argc, argv, "b:hs:p:")) != -1) {
338+
while ((opt = getopt(argc, argv, "b:hs:p:q:")) != -1) {
343339
switch (opt) {
344340
case 's':
345-
strncpy(redis_server,optarg,64);
341+
strncpy(redis_server, optarg, 64);
346342
break;
347343
case 'p':
348344
redis_srv_port = atoi(optarg);
349345
break;
350346
case 'b':
351347
strncpy(namedpipe , optarg, 128);
352348
break;
349+
case 'q':
350+
strncpy(redis_queue , optarg, 128);
351+
break;
353352
case 'h':
354353
usage();
355354
return EXIT_SUCCESS;
@@ -365,6 +364,8 @@ int main(int argc, char* argv[])
365364
fprintf(stderr,"[ERROR] A named pipe must be specified\n");
366365
return EXIT_FAILURE;
367366
}
367+
if (!redis_queue[0])
368+
strncpy(redis_queue,PQUEUE,128);
368369

369370
fifo = fopen(namedpipe, "wb");
370371
if (fifo == NULL) {
@@ -373,13 +374,14 @@ int main(int argc, char* argv[])
373374

374375
fprintf(stderr, "[INFO] redis_server = %s\n",redis_server);
375376
fprintf(stderr, "[INFO] redis_port = %d\n",redis_srv_port);
377+
fprintf(stderr, "[INFO] redis_queue = %s\n", redis_queue);
376378
fprintf(stderr, "[INFO] named pipe = %s\n", namedpipe);
377379
fprintf(stderr, "[INFO] pid = %d\n",(int)getpid());
378380

379381
params.encap = WTAP_ENCAP_ETHERNET;
380382
pdh = wtap_dump_fdopen(fileno(fifo), WTAP_FILE_TYPE_SUBTYPE_PCAPNG, WTAP_UNCOMPRESSED, &params, &write_err);
381383
if (pdh != NULL){
382-
r = process_input_queue(pdh, redis_server, redis_srv_port);
384+
r = process_input_queue(pdh, redis_server, redis_srv_port, redis_queue);
383385
if (r == EXIT_FAILURE) {
384386
fprintf(stderr,"[ERROR] Something went wrong in during processing");
385387
}else{

0 commit comments

Comments
 (0)