-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdsink.cpp
1586 lines (1462 loc) · 49 KB
/
dsink.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/*
Simple data sink code for dserver
SvirLex, ITEP, 2016
*/
#define _FILE_OFFSET_BITS 64
#include <arpa/inet.h>
#include <ctype.h>
#include <errno.h>
#include <fcntl.h>
#include <libconfig.h>
#include <netdb.h>
#include <netinet/in.h>
#include <readline/history.h>
#include <readline/readline.h>
#include <signal.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include "dsink.h"
#include "dmodule.h"
#include "recformat.h"
/* Global variables */
/* Static configuration of dsink. ReadConf() zeroes this structure and then fills it from the configuration file. */
struct cfg_struct {
int InPort; // Data port, default 0xA336
int OutPort; // Out port, default 0xB230
int udpPort; // Slow control UDP port, default 15629
char MyName[MAXSTR]; // The server host name
char SlaveList[MAXCON][MAXSTR]; // Crate host names list
int NSlaves; // Number of crates (valid entries in SlaveList)
char SlaveCMD[MAXSTR]; // Command to start slave (run on the crate via ssh, see OpenSlaves)
int TriggerMasterCrate; // Trigger master crate (index in slave list)
int TriggerMasterModule; // Trigger master module
int VetoMasterCrate; // Veto master crate (index in slave list)
int VetoMasterModule; // Veto master module
char LogFile[MAXSTR]; // dsink log file name
char LogTermCMD[MAXSTR]; // Command to start log view in a separate window; used as a format with the log file name as its argument
char XilinxFirmware[MAXSTR]; // Xilinx firmware
// char InitScript[MAXSTR]; // initialize modules
// char StartScript[MAXSTR]; // put vme into acquire mode. Arguments: server, port
// char StopScript[MAXSTR]; // stop acquire mode
// char InhibitScript[MAXSTR]; // Inhibit triggers
// char EnableScript[MAXSTR]; // Enables triggers
int MaxEvent; // Size of the event cache (number of event slots)
char CheckDiskScript[MAXSTR]; // The script is called before a new file is written in auto mode
char AutoName[MAXSTR]; // Auto file name format (takes the file number as its argument)
int AutoTime; // Time limit for a file in auto mode (original note: "half an hour") - units not visible here, TODO confirm
int AutoSize; // Size limit for a file in auto mode, in MBytes (2^20 bytes)
char ConfSavePattern[MAXSTR]; // Pattern to copy configuration when dsink reads it
char LogSavePattern[MAXSTR]; // Pattern to rename the old log file before compression
int PeriodicTriggerPeriod; // Period of the pulser trigger, ms. 0 - disabled, Maximum - 2^13-1.
char PlatformPositionFile[MAXSTR]; // File with platform position (internal units = 1/6 of mm)
int MaxInitAttempts; // Maximum number of attempts for automatic init
int LiftTimeout; // Timeout after lift was moved (s)
} Config;
/* Run-time state of dsink: open files, sockets, slave processes, event cache and statistics. */
struct run_struct {
FILE *fLog; // Log file; Log() falls back to stdout while this is NULL
pid_t fLogPID; // Log file browser PID
FILE *fData; // Data file, NULL when no file is open
char fDataName[MAXSTR]; // Data file name
struct rec_header_struct fHead; // Record header to write data file
void *wData; // Memory for write data
int wDataSize; // Size of wData, bytes
int fdPort; // Listening socket for input data connections
struct slave_struct Slave[MAXCON]; // Slave VME connections
con_struct Con[MAXCON]; // Data connections; fd < 0 marks a closed one awaiting CleanCon()
int NCon; // Number of data connections
int fdOut; // Listening socket for output data connections
struct client_struct Client[MAXCON]; // Output data clients; fd == 0 marks a free slot
int iStop; // Quit flag
int iRun; // DAQ running flag
int iAuto; // Auto change file mode
int Initialized; // VME modules initialized
Dmodule *WFD[MAXWFD]; // Class WFD modules for data processing, indexed by (module number - 1); NULL - not configured
struct event_struct *Evt; // Pointer to event cache
struct event_struct *EvtCopy; // Pointer to event cache copy (for rotation)
int lTokenBase; // Long token of the first event in the cache
long TypeStat[8]; // Record types statistics
long RecStat[2]; // Events/selftriggers in the output file
time_t LastFileStarted; // time() when the last file was started
long FileCounter; // Bytes to the last file
int RestartFlag; // Flag restart
int SuspendFlag; // Flag to suspend run while platform is moving
time_t ReleaseTime; // Time to continue after suspend
int udpPort; // Slow control UDP socket descriptor (used with select()/read())
} Run;
/* Functions */
/*
    Add one record to the event it belongs to.
    num  - module number; it replaces the token in the 12 LS bits of data[1];
    info - block descriptor: lToken selects the event slot, data[0] carries the record length.
    The slot is Run.Evt[lToken - Run.lTokenBase]. A record below the cache base is dropped
    with an error; a record beyond Config.MaxEvent sets Run.RestartFlag and is dropped.
    On allocation failure Run.iStop is set and the record is dropped.
*/
void Add2Event(int num, struct blkinfo_struct *info)
{
    int k;
    char *ptr;
    int new_len;
    int new_size;

    // Find our event buffer
    k = info->lToken - Run.lTokenBase;
    if (k < 0) {
        Log(TXT_ERROR "DSINK: Internal error - negative shift in Event cache: long token = %d token base = %d Module %d.\n",
            info->lToken, Run.lTokenBase, num);
        return;
    }
    if (k >= Config.MaxEvent) {
        Log(TXT_ERROR "DSINK: Event cache of %d events looks too small: long token = %d token base = %d Module %d.\n",
            Config.MaxEvent, info->lToken, Run.lTokenBase, num);
        Run.RestartFlag = 1;
        return;
    }
    // Record size in bytes: 9 LS bits of data[0] hold (number of shorts - 1)
    new_len = (int)(((info->data[0] & 0x1FF) + 1) * sizeof(short));
    // Check memory: grow in MCHUNK steps until the record really fits,
    // a single step is not guaranteed to be enough for every MCHUNK value
    new_size = Run.Evt[k].size;
    while (new_len + Run.Evt[k].len > new_size) new_size += MCHUNK;
    if (new_size != Run.Evt[k].size) {
        ptr = (char *)realloc(Run.Evt[k].data, new_size);
        if (!ptr) {
            Log(TXT_FATAL "DSINK: Out of memory: %m\n");
            Run.iStop = 1;
            return;     // the old buffer is still valid and owned by the cache
        }
        Run.Evt[k].data = ptr;
        Run.Evt[k].size = new_size;
    }
    // Put module number in the place of token - 12 LS bits of data[1]
    info->data[1] &= 0xF000;
    info->data[1] |= num & 0xFFF;
    // Store the data
    memcpy(Run.Evt[k].data + Run.Evt[k].len, info->data, new_len);
    Run.Evt[k].len += new_len;
}
/*
    Create a TCP socket with SO_KEEPALIVE set, bind it to the given port on all
    interfaces and start listening (backlog MAXCON).
    Return the listening descriptor, or a negative value on any failure.
*/
int BindPort(int port)
{
    int fd, on, irc;
    struct sockaddr_in name;

    fd = socket(PF_INET, SOCK_STREAM, 0);
    if (fd < 0) {
        // reported through Log() like every other failure here
        Log(TXT_FATAL "DSINK: Can not create socket.: %m\n");
        return fd;
    }
    on = 1;
    irc = setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on));
    if (irc) {
        Log(TXT_ERROR "DSINK: setsockopt error: %m\n");
        close(fd);
        return -1;
    }
    memset(&name, 0, sizeof(name));     // do not pass stack garbage in the padding to bind()
    name.sin_family = AF_INET;
    name.sin_port = htons(port);
    name.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, (struct sockaddr *)&name, sizeof(name)) < 0) {
        Log(TXT_FATAL "DSINK: Can not bind to port %d: %m\n", port);
        close(fd);
        return -1;
    }
    if (listen(fd, MAXCON) < 0) {
        Log(TXT_FATAL "DSINK: Can not listen to port %d: %m\n", port);
        close(fd);
        return -1;
    }
    return fd;
}
/*
    Create a UDP socket and bind it to the given port on all interfaces.
    Return the socket descriptor, or a negative value on failure.
*/
int BindUdpPort(int port)
{
    int fd;
    struct sockaddr_in name;

    fd = socket(PF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        Log(TXT_FATAL "DSINK: Can not create socket.: %m\n");
        return fd;
    }
    memset(&name, 0, sizeof(name));     // do not pass stack garbage in the padding to bind()
    name.sin_family = AF_INET;
    name.sin_port = htons(port);
    name.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, (struct sockaddr *)&name, sizeof(name)) < 0) {
        Log(TXT_FATAL "DSINK: Can not bind to port %d: %m\n", port);
        close(fd);
        return -1;
    }
    return fd;
}
/* Find the minimal long delimiter among the configured modules and pass it to FlushEvents() */
void CheckReadyEvents(void)
{
    int minDelim;
    int m;

    minDelim = 0x7FFFFFFF;  // large positive number
    for (m = 0; m < MAXWFD; m++) {
        if (!Run.WFD[m]) continue;      // module is not configured
        if (Run.WFD[m]->GetLongDelim() < minDelim) {
            minDelim = Run.WFD[m]->GetLongDelim();
        }
    }
    FlushEvents(minDelim);
}
/*
    Clean Con array from closed connections (those with fd < 0).
    The live entries are compacted to the front in their original order and Run.NCon
    is updated. Every entry is examined exactly once, so runs of adjacent closed
    connections are removed in a single call.
*/
void CleanCon(void)
{
    int src, dst;

    dst = 0;
    for (src = 0; src < Run.NCon; src++) {
        if (Run.Con[src].fd < 0) continue;      // closed - drop it
        if (dst != src) memcpy(&Run.Con[dst], &Run.Con[src], sizeof(Run.Con[0]));
        dst++;
    }
    Run.NCon = dst;
}
/*
    Push data for client to its ring FIFO. Do nothing if the data does not fit.
    One byte is always kept free, so rptr == wptr means an empty FIFO.
*/
void ClientPush(struct client_struct *client, char *data, int len)
{
    int room;   // free space in the FIFO
    int head;   // part of the data that goes before the wrap point

    room = client->rptr - client->wptr;
    if (room <= 0) room += FIFOSIZE;
    if (len >= room) return;    // no room left

    head = FIFOSIZE - client->wptr;
    if (len <= head) head = len;
    memcpy(client->fifo + client->wptr, data, head);
    // the rest (if any) wraps around to the beginning of the buffer
    if (len > head) memcpy(client->fifo, data + head, len - head);

    client->wptr += len;
    if (client->wptr >= FIFOSIZE) client->wptr -= FIFOSIZE;
}
/*
    Send data from fifo to TCP client.
    At most one contiguous piece of the ring buffer is written per call (up to the
    write pointer or up to the end of the buffer).
    EAGAIN/EWOULDBLOCK leave the FIFO untouched - the data will be retried later.
    Any other write error drops the client: the socket is closed, the FIFO is freed
    and the slot is marked free (fd = 0).
*/
void ClientSend(struct client_struct *client)
{
    int flen, irc;

    flen = (client->wptr > client->rptr) ? client->wptr - client->rptr : FIFOSIZE - client->rptr;
    irc = write(client->fd, &client->fifo[client->rptr], flen);
    if (irc < 0) {
        // nothing was sent - the read pointer must not move
        if (errno == EAGAIN || errno == EWOULDBLOCK) return;
        Log(TXT_WARN "DSINK: Client %s send error: %m\n", My_inet_ntoa(client->ip));
        shutdown(client->fd, 2);
        close(client->fd);      // shutdown() alone does not release the descriptor
        free(client->fifo);
        client->fifo = NULL;
        client->fd = 0;
        return;
    }
    client->rptr += irc;
    if (client->rptr == FIFOSIZE) client->rptr = 0;
}
/* Close Data File */
void CloseDataFile(void)
{
char cmd[MAXSTR];
if (!Run.fData) return;
fclose(Run.fData);
snprintf(cmd, MAXSTR, "echo %s >> %s", Run.fDataName, DATA_LOG);
if (system(cmd)) Log(TXT_ERROR "DSINK: Can not write to " DATA_LOG);
Log(TXT_INFO "DSINK: File %s closed. %d events, %d SelfTriggers, %Ld bytes in %d s.\n",
Run.fDataName, Run.RecStat[0], Run.RecStat[1], Run.FileCounter, time(NULL) - Run.LastFileStarted);
Run.fData = NULL;
}
/* Close command connections */
void CloseSlaves(void)
{
int i;
for (i=0; i<Config.NSlaves; i++) if (Run.Slave[i].in.f) {
if (Run.Slave[i].PID) {
fprintf(Run.Slave[i].in.f, "q\n");
fprintf(Run.Slave[i].in.f, "q\n");
}
fclose(Run.Slave[i].in.f);
if (Run.Slave[i].out.f) fclose(Run.Slave[i].out.f);
if (Run.Slave[i].err.f) fclose(Run.Slave[i].err.f);
}
sleep(1);
for (i=0; i<Config.NSlaves; i++)
if (Run.Slave[i].PID && !waitpid(Run.Slave[i].PID, NULL, WNOHANG)) kill(Run.Slave[i].PID, SIGTERM);
}
/*
One pass of the main event loop: wait in select() for up to 2 s and serve whatever is ready.
AcceptCommands - when nonzero the readline input stream is watched as well and
typed characters are passed to readline.
Watched for reading: the two listening sockets, the slow control socket, all data
connections and stdout/stderr pipes of the running slaves.
Watched for writing: output clients with a non-empty FIFO.
On timeout (nothing ready) all complete events are flushed with FlushEvents(-1).
*/
void DoSelect(int AcceptCommands)
{
fd_set set;
fd_set wset;
struct timeval tm;
int i, irc;
tm.tv_sec = 2; // 2 s
tm.tv_usec = 0;
// Build the read set
FD_ZERO(&set);
if (AcceptCommands) FD_SET(fileno(rl_instream), &set);
FD_SET(Run.fdPort, &set);
FD_SET(Run.fdOut, &set);
FD_SET(Run.udpPort, &set);
for (i=0; i<Run.NCon; i++) FD_SET(Run.Con[i].fd, &set);
for (i=0; i<Config.NSlaves; i++) if (Run.Slave[i].PID) FD_SET(Run.Slave[i].out.fd[0], &set);
for (i=0; i<Config.NSlaves; i++) if (Run.Slave[i].PID) FD_SET(Run.Slave[i].err.fd[0], &set);
// Build the write set: only clients that have something in their FIFO
FD_ZERO(&wset);
for (i=0; i<MAXCON; i++) if (Run.Client[i].fd && Run.Client[i].rptr != Run.Client[i].wptr) FD_SET(Run.Client[i].fd, &wset);
irc = select(FD_SETSIZE, &set, &wset, NULL, &tm);
if (irc < 0) return;
// Detect slaves that have exited: a nonzero waitpid() result clears the PID, so the slave is no longer polled
for (i=0; i<Config.NSlaves; i++) if (Run.Slave[i].PID && waitpid(Run.Slave[i].PID, NULL, WNOHANG)) {
Log(TXT_ERROR "DSINK: no/lost connection to %s\n", Config.SlaveList[i]);
Run.Slave[i].PID = 0;
}
if (irc) {
if (AcceptCommands) if (FD_ISSET(fileno(rl_instream), &set)) rl_callback_read_char();
// New data connection: accept it if there is a free slot, otherwise accept and drop
if (FD_ISSET(Run.fdPort, &set)) {
if (Run.NCon < MAXCON) {
OpenCon(Run.fdPort, &Run.Con[Run.NCon]);
if (Run.Con[Run.NCon].fd > 0) Run.NCon++;
} else {
DropCon(Run.fdPort);
}
}
// New output client: a positive OpenClient() result means no free slot - drop the connection
if (FD_ISSET(Run.fdOut, &set) && OpenClient() > 0) DropCon(Run.fdOut);
if (FD_ISSET(Run.udpPort, &set)) ProcessSlow();
for (i=0; i<Run.NCon; i++) if (FD_ISSET(Run.Con[i].fd, &set)) GetAndWrite(i);
for (i=0; i<Config.NSlaves; i++) if (Run.Slave[i].PID && FD_ISSET(Run.Slave[i].out.fd[0], &set))
GetFromSlave(Config.SlaveList[i], &Run.Slave[i].out, &Run.Slave[i]);
for (i=0; i<Config.NSlaves; i++) if (Run.Slave[i].PID && FD_ISSET(Run.Slave[i].err.fd[0], &set))
GetFromSlave(Config.SlaveList[i], &Run.Slave[i].err, &Run.Slave[i]);
// GetAndWrite() marks failed connections with fd = -1; remove them before the next pass
CleanCon();
for (i=0; i<MAXCON; i++) if (Run.Client[i].fd && FD_ISSET(Run.Client[i].fd, &wset)) ClientSend(&Run.Client[i]);
} else {
// Timeout - no activity for 2 s
FlushEvents(-1);
}
}
/*
    Ignore new connection if too many. Should never happen.
    The pending connection on the listening socket fd is accepted and closed at once.
*/
void DropCon(int fd)
{
    struct sockaddr_in peer;
    socklen_t peerLen = sizeof(peer);
    int sock;

    sock = accept(fd, (struct sockaddr *)&peer, &peerLen);
    if (sock >= 0) {
        Log(TXT_WARN "DSINK: Too many connections dropping from %s\n", inet_ntoa(peer.sin_addr));
        close(sock);
        return;
    }
    Log(TXT_ERROR "DSINK: Connection accept error: %m\n");
}
/*
    Look for a line like:
        "Modules: 35 36 16 17 18 19 21 22 23 24 20"
    and set the presence status of every listed and configured WFD module.
    Parsing stops at the first item that does not convert to a positive number.
*/
void FindModulesLine(char *str)
{
    static const char tag[] = "Modules:";
    char *cur;
    int idx;

    cur = strstr(str, tag);
    if (!cur) return;
    cur += sizeof(tag) - 1;     // skip the tag itself
    while (cur) {
        idx = strtol(cur, &cur, 10) - 1;    // module numbers are 1-based
        if (idx < 0) break;
        if (idx < MAXWFD && Run.WFD[idx]) Run.WFD[idx]->SetStatus(1);
    }
}
/*
    Write out events from the beginning of the event cache and rotate the cache.
    lToken = -1 - write the leading run of non-empty events (up to the first empty slot);
    otherwise   - write the events with long token below lToken - 128.
    Written slots are emptied and moved to the end of the cache (their buffers are
    reused); Run.lTokenBase is advanced by the number of slots released.
*/
void FlushEvents(int lToken)
{
    int LastEvent;
    int i;

    if (lToken == -1) {
        for (LastEvent = 0; LastEvent < Config.MaxEvent; LastEvent++) if (!Run.Evt[LastEvent].len) break;
        // Empty cache (the usual case on an idle timeout): nothing to write and
        // nothing to rotate - skip copying the whole cache back and forth
        if (LastEvent == 0) return;
    } else {
        LastEvent = lToken - 128 - Run.lTokenBase;
        if (LastEvent <= 0) return;     // nothing to do
        if (LastEvent > Config.MaxEvent) {
            Log(TXT_ERROR "DSINK: Internal logic error: lToken(%d) - lTokenBase(%d) > MaxEvent(%d).\n",
                lToken, Run.lTokenBase, Config.MaxEvent);
            LastEvent = Config.MaxEvent;
        }
    }
    // Write events
    for (i=0; i<LastEvent; i++) {
        if (Run.Evt[i].len) {
            WriteEvent(Run.lTokenBase + i, &Run.Evt[i]);
            Run.Evt[i].len = 0;
        } else {
            Log(TXT_WARN "DSINK: Empty event with long token %d [arg=%d LastEvent=%d Base=%d]\n",
                Run.lTokenBase + i, lToken, LastEvent, Run.lTokenBase);
        }
    }
    // Rotate cache: the remaining events go to the front, the released slots to the back
    if (LastEvent < Config.MaxEvent) {
        memcpy(Run.EvtCopy, Run.Evt, Config.MaxEvent * sizeof(struct event_struct));
        memcpy(Run.Evt, &Run.EvtCopy[LastEvent], (Config.MaxEvent - LastEvent) * sizeof(struct event_struct));
        memcpy(&Run.Evt[Config.MaxEvent - LastEvent], Run.EvtCopy, LastEvent * sizeof(struct event_struct));
    }
    Run.lTokenBase += LastEvent;
}
/* Get Data */
void GetAndWrite(int num)
{
int irc;
struct con_struct *con;
con = &Run.Con[num];
if (con->len == 0) { // getting length
irc = read(con->fd, con->buf, sizeof(int));
if (irc == 0) {
Log(TXT_INFO "DSINK: Connection closed from %s\n", My_inet_ntoa(con->ip));
free(con->buf);
close(con->fd);
con->fd = -1;
return;
}
if (irc != sizeof(int) || con->header->len < sizeof(struct rec_header_struct) || con->header->len > MBYTE) {
Log(TXT_ERROR "DSINK: Connection closed or stream data error irc = %d len = %d from %s %m\n",
irc, con->header->len, My_inet_ntoa(con->ip));
free(con->buf);
close(con->fd);
con->fd = -1;
return;
}
con->len = irc;
} else { // getting body
irc = read(con->fd, &con->buf[con->len], con->header->len - con->len);
if (irc <= 0) {
Log(TXT_ERROR "DSINK: Connection closed unexpectingly or stream data error irc = %d at %s %m\n", irc, My_inet_ntoa(con->ip));
free(con->buf);
close(con->fd);
con->fd = -1;
return;
}
con->len += irc;
if (con->len == con->header->len) {
if (con->header->ip != INADDR_LOOPBACK) {
Log(TXT_ERROR "DSINK: Wrong data signature %X - sychronization lost ? @ %s\n", con->header->ip, My_inet_ntoa(con->ip));
free(con->buf);
close(con->fd);
con->fd = -1;
return;
}
con->header->ip = con->ip;
if (con->BlkCnt >= 0 && con->header->cnt != con->BlkCnt + 1) con->ErrCnt++;
con->BlkCnt = con->header->cnt;
con->len = 0;
con->cnt += con->header->len;
ProcessData(con->buf);
return;
}
}
return;
}
/*
    Get text output from slave and send it to log.
    name  - slave name used as the log prefix;
    p     - the slave's stdout or stderr pipe with its line buffer;
    slave - the slave the pipe belongs to.
    One character is read per call and collected in p->buf. A complete line (or a
    line that filled the buffer) is processed: a line starting with "__" is the
    return code of the last command - zero sends the next command from the FIFO,
    nonzero clears the FIFO; any other line is searched for the modules list and
    copied to the log.
*/
void GetFromSlave(char *name, struct pipe_struct *p, struct slave_struct *slave)
{
    int c;      // int, not char: EOF must be distinguishable from data
    int irc;

    c = getc(p->f);
    if (c == EOF) {
        // End of stream or read error - there is no character to store
        clearerr(p->f);
        if (p->wptr == 0) return;
        // flush the incomplete last line
        p->buf[p->wptr] = '\n';
        p->buf[p->wptr+1] = '\0';
    } else if (c == '\0' || c == '\n') {
        p->buf[p->wptr] = '\n';
        p->buf[p->wptr+1] = '\0';
    } else if (p->wptr == MAXSTR - 3) {
        // the buffer is full - force the end of line
        p->buf[p->wptr] = c;
        p->buf[p->wptr+1] = '\n';
        p->buf[p->wptr+2] = '\0';
    } else {
        p->buf[p->wptr] = c;
        p->wptr++;
        return;
    }
    if (p->buf[0] == '_' && p->buf[1] == '_') {
        irc = strtol(&p->buf[2], NULL, 0);
        slave->IsWaiting = 0;
        slave->LastResponse = irc;
        if (irc) {
            Log(TXT_ERROR "%s: The last command returned an error.\n", name);
            slave->CommandFifo[0] = '\0';
        } else {
            SendFromFifo(slave);
        }
    } else {
        FindModulesLine(p->buf);
        Log(TXT_INFO "%s: %s", name, p->buf);
    }
    p->wptr = 0;
}
/*
    Read the data log and find the next auto file number.
    The number is the first number met in the last line of DATA_LOG plus one,
    increased further until the file DATA_DIR + AutoName(number) does not exist.
    Also call the script to check/switch disk.
    Return the number, or -1 on error (the error is already logged).
*/
int GetNextAutoNumber(void)
{
    FILE *f;
    char str[MAXSTR];
    int irc;
    int num;
    char *ptr;

    // Get the last line from the data log
    f = popen("[ -f " DATA_LOG " ] && tail -1 " DATA_LOG, "r");
    if (!f) {
        Log(TXT_ERROR "DSINK: Internal error - can not get info from the data log\n");
        return -1;
    }
    irc = fread(str, 1, MAXSTR-1, f);
    str[irc] = '\0';
    pclose(f);      // a popen() stream must be closed with pclose(), which also reaps the child
    // Get file number - the first number in the file name
    for (ptr = str; ptr[0]; ptr++) if (isdigit((unsigned char)ptr[0])) break;
    num = (ptr[0]) ? strtol(ptr, NULL, 10) : 0;
    num++;
    // Make a file name and look for the first one not in use
    for (;;num++) {
        strcpy(str, DATA_DIR);
        // Config.AutoName is a format taken from the configuration file
        snprintf(&str[strlen(DATA_DIR)], MAXSTR - strlen(DATA_DIR), Config.AutoName, num);
        // Check if exists
        f = fopen(str, "rb");
        if (!f) break;
        fclose(f);
    }
    // Call disk check/switch script
    if (Config.CheckDiskScript[0] && system(Config.CheckDiskScript)) {
        Log(TXT_ERROR "DSINK: disk check error. No Space left ?\n");
        return -1;
    }
    return num;
}
/* Print Help: command line options and interactive commands, to stdout */
void Help(void)
{
    printf("Usage: dsink [-a] [-c config.conf] [-h]. Options:\n");
    printf("-a - auto start data taking;\n");
    printf("-c config.conf - use configuration file config.conf;\n");
    printf("-h - print this message and exit.\n");
    printf("Commands:\n");
    printf("\tcmd <vme>|* <uwfdtool command> - send command to vme crate;\n");
    printf("\tfile [<file_name>] - set file to write data;\n");
    printf("\thelp - print this message;\n");
    printf("\tinfo - print statistics;\n");
    printf("\tinit - init vme modules;\n");
    printf("\tlist - list connected slaves;\n");
    printf("\tpause - set inhibit;\n");
    printf("\tquit - Quit;\n");
    printf("\tresume - clear inhibit;\n");
    printf("\tstart/stop - start/stop data taking;\n");
    printf("The simplest way to write data in automatic mode is:\n");
    printf("\tstart\n");
    printf("\tfile auto\n");
    printf("\t\t... DAQ started - files and disks will be changed automatically\n");
}
/*
    Print run information to stdout: initialization status, the current data file,
    suspend state, configured modules, record type statistics and, for the modules
    that have any format error counted, the table of error counters.
*/
void Info(void)
{
    int i, j;
    int *cnt;
    long long BlkCnt;
    int flag;
    const char type_names[8][8] = {"SELF", "MAST", "TRIG", "RAW ", "HIST", "SYNC", "RSRV", "RSRV"};

    printf("System Initialization %s done.\n", (Run.Initialized) ? "" : TXT_BOLDRED "not" TXT_NORMAL);
    // All wide counters are cast to long long and printed with %lld:
    // %Ld is not a standard conversion for integers
    if (Run.fData) printf("File: %s: %lld bytes / %d records: %lld events + %lld SelfTriggers / %d s\n",
        Run.fDataName, (long long)ftello(Run.fData), Run.fHead.cnt, (long long)Run.RecStat[0], (long long)Run.RecStat[1],
        (int)(time(NULL) - Run.LastFileStarted));
    if (Run.SuspendFlag == 1) printf("Run suspended for platform moution.\n");
    if (Run.SuspendFlag == 2) printf("Run suspended for platform moution till %s", ctime(&Run.ReleaseTime));
    printf("Modules: ");
    for (i=0; i<MAXWFD; i++) if (Run.WFD[i]) printf("%d ", i+1);
    printf("\nRecord types: ");
    for (i=0; i<8; i++) printf("%s: %lld ", type_names[i], (long long)Run.TypeStat[i]);
    printf("\n");
    BlkCnt = 0;
    flag = 0;   // set when the table header has been printed
    for (i=0; i<MAXWFD; i++) if (Run.WFD[i]) {
        cnt = Run.WFD[i]->GetErrCnt();
        BlkCnt += Run.WFD[i]->GetBlkCnt();
        // only modules with at least one nonzero error counter get a table row
        for (j=0; j<=ERR_OTHER; j++) if (cnt[j]) break;
        if (j <= ERR_OTHER) {
            if (!flag) {
                printf("Format statistics (errors):\n");
                // 12345671234567890A1234567890A1234567890A1234567890A1234567890A1234567890A
                printf("Module ChanPar SumPar Token Delimiter SelfTrig Other Blocks\n");
                flag = 1;
            }
            printf(" %3d: ", i+1);
            for (j=0; j<=ERR_OTHER; j++) printf((cnt[j]) ? TXT_BOLDRED "%10d " TXT_NORMAL : "%10d ", cnt[j]);
            printf("%12lld\n", (long long)Run.WFD[i]->GetBlkCnt());
        }
    }
    printf("Grand total %lld blocks received.\n", BlkCnt);
}
/*
    Initialize VME modules in all crates.
    Up to Config.MaxInitAttempts attempts are made. In each attempt:
      - the firmware load command is sent to every running slave not yet programmed;
      - all configured modules must then be reported present, otherwise Init fails at once;
      - the init command is sent to the programmed slaves.
    Per slave state in Flag[]: 0 - to be programmed, 1 - programmed, 2 - initialized.
    A slave that fails the init command is programmed again in the next attempt.
    On success the inhibit on the VETO module is cleared and Run.Initialized is set.
*/
void Init(void)
{
    char cmd[MAXSTR];
    int i;
    int attempt;    // attempt counter - must not be reused inside the loop
    int missing;    // number of configured modules not reported present
    int done;
    int Flag[MAXCON];

    Run.Initialized = 0;
    memset(Flag, 0, sizeof(Flag));
    for (i=0; i<MAXWFD; i++) if (Run.WFD[i]) Run.WFD[i]->SetStatus(0); // clear presence status
    snprintf(cmd, MAXSTR, "p * %s;?", Config.XilinxFirmware);
    done = 0;
    for (attempt = 0; attempt < Config.MaxInitAttempts && !done; attempt++) {
        // Program the firmware and wait for all responses
        for (i=0; i<Config.NSlaves; i++) if (Run.Slave[i].PID && !Flag[i]) SendScript(&Run.Slave[i], cmd);
        for (i=0; i<Config.NSlaves; i++) while (!Flag[i] && Run.Slave[i].IsWaiting) DoSelect(0);
        for (i=0; i<Config.NSlaves; i++) if (!Flag[i] && !Run.Slave[i].LastResponse) Flag[i] = 1;
        // Presence status is set from slaves' output by FindModulesLine()
        missing = 0;
        for (i=0; i<MAXWFD; i++) if (Run.WFD[i] && !Run.WFD[i]->GetStatus()) {
            Log(TXT_ERROR "DSINK: Module %d not present.\n", i+1);
            printf(TXT_BOLDRED "Attention !!! Module %d not present." TXT_NORMAL "\n", i+1);
            missing++;
        }
        if (missing) {
            Log(TXT_ERROR "DSINK: Initialization failed.\n");
            printf(TXT_BOLDRED "Initialization failed." TXT_NORMAL "\n");
            return;
        }
        sleep(1);
        // Initialize the modules and wait for all responses
        for (i=0; i<Config.NSlaves; i++) if (Flag[i] == 1) SendScript(&Run.Slave[i], "i *;?");
        for (i=0; i<Config.NSlaves; i++) while (Flag[i] == 1 && Run.Slave[i].IsWaiting) DoSelect(0);
        for (i=0; i<Config.NSlaves; i++) if (Flag[i] == 1) Flag[i] = (Run.Slave[i].LastResponse) ? 0 : 2;
        // We are done when every slave is initialized
        for (i=0; i<Config.NSlaves; i++) if (Flag[i] != 2) break;
        if (i == Config.NSlaves) done = 1;
    }
    if (!done) {
        Log(TXT_ERROR "DSINK: Can not initialize the modules in %d attempts - check VME responses.\n", Config.MaxInitAttempts);
        printf(TXT_BOLDRED "Init failed. Check the system and try again." TXT_NORMAL "\n");
        return;
    }
    // Clear inhibit on VETO module
    snprintf(cmd, MAXSTR, "m %d 0;?", Config.VetoMasterModule);
    SendScript(&Run.Slave[Config.VetoMasterCrate], cmd);
    Run.Initialized = 1;
}
/*
    Write a message to the log file, or to stdout when the log file is not open.
    msg - printf-style format, followed by its arguments.
    The message is prefixed with the local date and time and the stream is
    flushed after every message.
*/
void Log(const char *msg, ...)
{
    char str[MAXSTR];
    time_t t;
    struct tm *lt;
    FILE *f;
    va_list ap;

    t = time(NULL);
    lt = localtime(&t);
    // no time stamp rather than garbage if the time can not be formatted
    if (!lt || !strftime(str, MAXSTR, "%F %T", lt)) str[0] = '\0';
    f = (Run.fLog) ? Run.fLog : stdout;
    fputs(str, f);      // the time stamp is data, not a format string
    va_start(ap, msg);
    vfprintf(f, msg, ap);
    va_end(ap);
    fflush(f);
}
/*
    Convert an IPv4 address kept in an int (network byte order) to dotted text.
    The result is in the static buffer of inet_ntoa() and is overwritten by the next call.
*/
char *My_inet_ntoa(int num)
{
    struct in_addr ia;

    memset(&ia, 0, sizeof(ia));
    ia.s_addr = (in_addr_t)num;
    return inet_ntoa(ia);
}
/* Open client data connection to the server. Return 0 on success. */
int OpenClient(void)
{
struct sockaddr_in addr;
socklen_t len;
int i, irc, descr;
// Search for empty slot
for (i=0; i<MAXCON; i++) if (!Run.Client[i].fd) break;
if (i == MAXCON) return 100;
len = sizeof(addr);
irc = accept(Run.fdOut, (struct sockaddr *)&addr, &len);
if (irc <= 0) {
Log(TXT_ERROR "DSINK: Client connection accept error: %m\n");
return -10;
}
Run.Client[i].fd = irc;
Run.Client[i].ip = addr.sin_addr.s_addr;
Run.Client[i].rptr = 0;
Run.Client[i].wptr = 0;
// Set client non-blocking
irc = fcntl(Run.Client[i].fd, F_GETFL, 0);
if (irc == -1) {
close(Run.Client[i].fd);
Run.Client[i].fd = 0;
Log(TXT_ERROR "DSINK: Client fcntl(F_GETFL) error: %m\n");
return -15;
}
descr = irc | O_NONBLOCK;
irc = fcntl(Run.Client[i].fd, F_SETFL, descr);
if (irc == -1) {
close(Run.Client[i].fd);
Run.Client[i].fd = 0;
Log(TXT_ERROR "DSINK: Client fcntl(F_SETFL) error: %m\n");
return -16;
}
Run.Client[i].fifo = (char *) malloc(FIFOSIZE);
if (!Run.Client[i].fifo) {
close(Run.Client[i].fd);
Run.Client[i].fd = 0;
Log(TXT_ERROR "DSINK: Client FIFO allocation error: %m\n");
return -20;
}
Log(TXT_INFO "DSINK: client connection from %s accepted\n", inet_ntoa(addr.sin_addr));
return 0;
}
/*
    Accept a slave data connection on the listening socket fd and set up *con for it:
    a receive buffer of MBYTE bytes, the peer address and the block counter.
    *con is zeroed first, so on failure con->fd stays 0.
*/
void OpenCon(int fd, con_struct *con)
{
    struct sockaddr_in peer;
    socklen_t peerLen = sizeof(peer);
    int sock;
    char *mem;

    memset(con, 0, sizeof(con_struct));
    sock = accept(fd, (struct sockaddr *)&peer, &peerLen);
    if (sock < 0) {
        Log(TXT_ERROR "DSINK: Connection accept error: %m\n");
        return;
    }
    mem = (char *)malloc(MBYTE);
    if (!mem) {
        Log(TXT_ERROR "DSINK: Can not allocate buffer of %d bytes: %m\n", MBYTE);
        close(sock);
        return;
    }
    con->buf = mem;
    con->header = (struct rec_header_struct *) mem;    // the record header is at the start of the buffer
    con->fd = sock;
    con->ip = peer.sin_addr.s_addr;
    con->BlkCnt = -1;   // no block received yet
    Log(TXT_INFO "DSINK: data connection from %s accepted\n", inet_ntoa(peer.sin_addr));
}
/*
    Open file to write data. The current data file, if any, is closed first.
    name: NULL or ""  - just close, no new file;
          "auto"      - automatic mode: the name is DATA_DIR + Config.AutoName with the next free number;
          "/..."      - absolute path, used as is;
          other       - file in DATA_DIR.
    The file is opened for append. On failure Run.fData stays NULL and auto mode is off.
    On success the file statistics are reset and the platform position is written.
*/
void OpenDataFile(const char *name)
{
    int irc;

    CloseDataFile();
    Run.fData = NULL;
    Run.iAuto = 0;
    if (!name || !name[0]) {
        Run.fDataName[0] = '\0';
        return;
    } else if (!strcmp(name, "auto")) {
        irc = GetNextAutoNumber();
        if (irc < 0) return;    // error message was already printed
        Run.iAuto = 1;
        strcpy(Run.fDataName, DATA_DIR);
        snprintf(&Run.fDataName[strlen(DATA_DIR)], MAXSTR - strlen(DATA_DIR), Config.AutoName, irc);
    } else if (name[0] == '/') {
        // strncpy() does not terminate a source that fills the buffer - terminate after the copy
        strncpy(Run.fDataName, name, MAXSTR - 1);
        Run.fDataName[MAXSTR - 1] = '\0';
    } else {
        snprintf(Run.fDataName, MAXSTR, "%s%s", DATA_DIR, name);
    }
    Run.fData = fopen(Run.fDataName, "ab");     // we append to data files
    if (!Run.fData) {
        Log(TXT_ERROR "DSINK: Can not open data file to write: %s (%m)\n", Run.fDataName);
        Run.fDataName[0] = '\0';
        Run.iAuto = 0;      // reset auto flag
        return;
    }
    Run.fHead.cnt = 0;
    Run.fHead.ip = INADDR_LOOPBACK;
    memset(Run.RecStat, 0, sizeof(Run.RecStat));
    Run.LastFileStarted = time(NULL);
    Run.FileCounter = 0;
    WritePlatformPosition();
}
/* Open log file and xterm */
int OpenLog(void)
{
char cmd[MAXSTR];
Run.fLog = fopen(Config.LogFile, "at");
if (!Run.fLog) {
Log(TXT_FATAL "DSINK: Can not open log-file: %s\n", Config.LogFile);
return -10;
}
snprintf(cmd, MAXSTR, Config.LogTermCMD, Config.LogFile);
Run.fLogPID = StartProcess(cmd);
if (Run.fLogPID < 0) return -11;
return 0;
}
/*
    Open command connection to VME crates and start uwfdtool in them.
    For every crate in Config.SlaveList three pipes are created (stdin, stdout,
    stderr of the slave) and "ssh -x <crate> <Config.SlaveCMD>" is started in a
    child process with the pipes as its standard streams. Our ends are wrapped
    into FILE streams; stdout and stderr ones are unbuffered, because they are
    read one character at a time after select().
    Return 0 on success, negative on the first failure.
*/
int OpenSlaves(void)
{
    int i;
    pid_t pid;

    for (i=0; i<Config.NSlaves; i++) {
        if (TEMP_FAILURE_RETRY(pipe(Run.Slave[i].in.fd))) {
            Log(TXT_FATAL "DSINK: Can not create in pipe for %s: %m\n", Config.SlaveList[i]);
            return -1;
        }
        if (TEMP_FAILURE_RETRY(pipe(Run.Slave[i].out.fd))) {
            Log(TXT_FATAL "DSINK: Can not create out pipe for %s: %m\n", Config.SlaveList[i]);
            return -2;
        }
        if (TEMP_FAILURE_RETRY(pipe(Run.Slave[i].err.fd))) {
            Log(TXT_FATAL "DSINK: Can not create error pipe for %s: %m\n", Config.SlaveList[i]);
            return -3;
        }
        // We write to the slave's stdin and read from its stdout and stderr
        Run.Slave[i].in.f = fdopen(Run.Slave[i].in.fd[1], "wt");
        if (!Run.Slave[i].in.f) {
            Log(TXT_FATAL "DSINK: fdopen for stdin failed for %s: %m\n", Config.SlaveList[i]);
            return -30;
        }
        Run.Slave[i].out.f = fdopen(Run.Slave[i].out.fd[0], "rt");
        if (!Run.Slave[i].out.f) {
            Log(TXT_FATAL "DSINK: fdopen for stdout failed for %s: %m\n", Config.SlaveList[i]);
            return -31;
        }
        Run.Slave[i].err.f = fdopen(Run.Slave[i].err.fd[0], "rt");
        if (!Run.Slave[i].err.f) {
            Log(TXT_FATAL "DSINK: fdopen for stderr failed for %s: %m\n", Config.SlaveList[i]);
            return -32;
        }
        if (setvbuf(Run.Slave[i].out.f, NULL, _IONBF, 0)) {
            Log(TXT_FATAL "DSINK: stdout setting no buffering mode failed for %s: %m\n", Config.SlaveList[i]);
            return -33;
        }
        if (setvbuf(Run.Slave[i].err.f, NULL, _IONBF, 0)) {
            Log(TXT_FATAL "DSINK: stderr setting no buffering mode failed for %s: %m\n", Config.SlaveList[i]);
            return -34;
        }
        pid = fork();
        if ((int) pid < 0) {    // error
            Log(TXT_FATAL "DSINK: Can not fork for %s: %m\n", Config.SlaveList[i]);
            return -10;
        } else if ((int) pid == 0) {    // child process
            // Make the pipes our standard streams and close the original descriptors
            dup2(Run.Slave[i].in.fd[0], STDIN_FILENO);
            TEMP_FAILURE_RETRY(close(Run.Slave[i].in.fd[0]));
            TEMP_FAILURE_RETRY(close(Run.Slave[i].in.fd[1]));
            dup2(Run.Slave[i].out.fd[1], STDOUT_FILENO);
            TEMP_FAILURE_RETRY(close(Run.Slave[i].out.fd[0]));
            TEMP_FAILURE_RETRY(close(Run.Slave[i].out.fd[1]));
            dup2(Run.Slave[i].err.fd[1], STDERR_FILENO);
            TEMP_FAILURE_RETRY(close(Run.Slave[i].err.fd[0]));
            TEMP_FAILURE_RETRY(close(Run.Slave[i].err.fd[1]));
            execl(SSH, SSH, "-x", Config.SlaveList[i], Config.SlaveCMD, NULL);
            Log(TXT_FATAL "DSINK: Can not do ssh %s: %s (%m)\n", Config.SlaveList[i], Config.SlaveCMD); // we shouldn't get here after execl
            // _exit(), not exit(): the child must not run the parent's atexit handlers
            // or flush the stdio buffers inherited from the parent
            _exit(-20);
        } else {    // main process
            Run.Slave[i].PID = pid;
            // Close the ends that belong to the child
            TEMP_FAILURE_RETRY(close(Run.Slave[i].in.fd[0]));
            TEMP_FAILURE_RETRY(close(Run.Slave[i].out.fd[1]));
            TEMP_FAILURE_RETRY(close(Run.Slave[i].err.fd[1]));
        }
    }
    return 0;
}
/*
    Process data received: one complete record in buf, starting with its header.
    Only REC_WFDDATA records from configured modules are processed; the module
    serial number is taken from the record type. The data is added to the module
    object and the blocks it returns are distributed: self triggers are written
    at once, the rest goes to the event cache; a delimiter block triggers the check
    for complete events.
    Nothing is done while a restart is pending. An int exception from the module
    code sets Run.iStop.
*/
void ProcessData(char *buf)
{
    struct rec_header_struct *header;
    struct blkinfo_struct *info;
    int num;

    if (Run.RestartFlag) return;
    header = (struct rec_header_struct *)buf;
    if ((header->type & 0xFFFF0000) != REC_WFDDATA) return;     // we ignore other records here
    num = header->type & REC_SERIALMASK;
    // Valid serial numbers are 1..MAXWFD: module num lives in Run.WFD[num-1]
    if (num == 0 || num > MAXWFD) {
        Log(TXT_WARN "Out of range module serial number %d met (1-%d)\n", num, MAXWFD);
        return;
    }
    if (!Run.WFD[num-1]) return;    // we ignore not configured modules
    try {
        Run.WFD[num-1]->Add(buf, header->len);  // store data
        for (;;) {      // distribute data over events
            info = Run.WFD[num-1]->Get();
            if (!info) break;
            if (info->type == TYPE_SELF) {
                WriteSelfTrig(num, info);
            } else {
                Add2Event(num, info);
                if (info->type == TYPE_DELIM) CheckReadyEvents();
            }
            Run.TypeStat[info->type & 7]++;
        }
    } catch (const int irc) {
        Log(TXT_FATAL "Exception %d signalled.\n", irc);
        Run.iStop = 1;
    }
}
/*
    Receive one slow control record from the UDP socket, pass it to the output with
    a REC_SLOW header and act on the records we know.
    The record is: total length in bytes, source id, data. Records that are too
    short or whose length field does not match the size received are ignored.
    LIFT_SRC records control the run suspension in auto mode: nonzero data[0]
    suspends the run, zero releases it after Config.LiftTimeout seconds.
*/
void ProcessSlow(void)
{
    struct {
        int len;
        int src;
        int data[0x3FFE];   // udp record is 64k maximum
    } buf;
    int irc;
    void *ptr;

    // receive the record
    irc = read(Run.udpPort, &buf, sizeof(buf));
    // Compare as signed int: against the unsigned sizeof() a read error (-1) would
    // look like a huge length and get past the check
    if (irc < (int)(2 * sizeof(int)) || irc != buf.len) return;     // strange record - nothing to do (error?)
    // save to data file: our record header replaces the two leading words
    irc += sizeof(struct rec_header_struct) - 2 * sizeof(int);
    if (Run.wDataSize < irc) {
        ptr = realloc(Run.wData, irc);
        if (!ptr) {
            Log(TXT_ERROR "DSINK: Memory allocation failure of %d bytes in ProcessSlow %m.\n", irc);
            goto skip_slow_write;
        }
        Run.wData = ptr;
        Run.wDataSize = irc;
    }
    Run.fHead.len = irc;
    Run.fHead.type = REC_SLOW | (buf.src & REC_SLOWMASK);
    Run.fHead.time = time(NULL);
    memcpy(Run.wData, &Run.fHead, sizeof(struct rec_header_struct));
    if (irc > (int)sizeof(struct rec_header_struct))
        memcpy((char *)Run.wData + sizeof(struct rec_header_struct), buf.data, irc - sizeof(struct rec_header_struct));
    WriteAndSend();
skip_slow_write:
    // Analyze the record
    switch(buf.src) {
    case LIFT_SRC:
        // ignore "stopped" when we are not suspended, and everything when not in auto mode
        if ((!(Run.SuspendFlag & SUSPEND_FLAG) && !buf.data[0]) || !Run.iAuto) break;
        if (buf.data[0]) {
            Run.SuspendFlag = SUSPEND_FLAG;
        } else {
            Run.SuspendFlag = RELEASE_FLAG;
            Run.ReleaseTime = time(NULL) + Config.LiftTimeout;
        }
        break;
    default:    // skip unknown records
        break;
    }
}
/* Read configuration file */
int ReadConf(const char *fname)
{
config_t cnf;
int tmp;
int i;
char *stmp;
char *tok;
config_setting_t *ptr;
char cmd[MAXSTR];
memset(&Config, 0, sizeof(Config));
config_init(&cnf);
if (config_read_file(&cnf, fname) != CONFIG_TRUE) {
Log(TXT_FATAL "DSINK: Configuration error in file %s at line %d: %s\n", fname, config_error_line(&cnf), config_error_text(&cnf));
config_destroy(&cnf);
return -10;
}
// int InPort; // Data port 0xA336
Config.InPort = (config_lookup_int(&cnf, "Sink.InPort", &tmp)) ? tmp : 0xA336;
// int OutPort; // Out port 0xB230
Config.OutPort = (config_lookup_int(&cnf, "Sink.OutPort", &tmp)) ? tmp : 0xB230;
// int udpPort; // udp port 15629
Config.udpPort = (config_lookup_int(&cnf, "Sink.udpPort", &tmp)) ? tmp : 15629;
// char MyName[MAXSTR]; // The server host name
strncpy(Config.MyName, (config_lookup_string(&cnf, "Sink.MyName", (const char **) &stmp)) ? stmp : "dserver.danss.local", MAXSTR);
// char SlaveList[MAXCON][MAXSTR]; // Crate host names list
if (!config_lookup_string(&cnf, "Sink.SlaveList", (const char **) &stmp))
stmp = (char *)"vme01.danss.local vme02.danss.local vme03.danss.local vme04.danss.local";
tok = strtok(stmp, " \t,");
for (i=0; i<MAXCON; i++) {
if (!tok || !tok[0]) break;
strncpy(Config.SlaveList[i], tok, MAXSTR);
tok = strtok(NULL, " \t,");