Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rcdaq.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file rcdaq.cc
1 //#define WRITEPRDF
2 
3 #include <stdio.h>
4 #include <unistd.h>
5 #include <fcntl.h>
6 
7 #include <stdlib.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <signal.h>
11 #include <limits.h>
12 #include <errno.h>
13 #include <sys/stat.h>
14 #include <sys/file.h>
15 
16 #include <pthread.h>
17 
18 #include <iostream>
19 #include <iomanip>
20 #include <sstream>
21 
22 #include <errno.h>
23 #include <string.h>
24 #include <unistd.h>
25 #include <time.h>
26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/if_ether.h>
30 #include <netinet/in.h>
31 #include <netinet/ip.h>
32 #include <net/if.h>
33 #include <sys/ioctl.h>
34 #include <netpacket/packet.h>
35 #include <sys/socket.h>
36 #include <netdb.h>
37 
38 #include <vector>
39 #include <map>
40 #include <queue>
41 
42 
43 
44 #include "getopt.h"
45 
46 #include "daq_device.h"
47 #include "daqBuffer.h"
48 #include "eloghandler.h"
49 #include "TriggerHandler.h"
50 
51 #include "rcdaq.h"
52 #include "rcdaq_rpc.h"
53 #include "md5.h"
54 
55 #ifdef HAVE_MOSQUITTO_H
56 #include "MQTTConnection.h"
57 #endif
58 
59 
60 
61 int open_file_on_server(const int run_number);
63 int open_serverSocket(const char * host_name, const int port);
64 int server_send_beginrun_sequence(const char * filename, const int runnumber, int fd);
65 int server_send_rollover_sequence(const char * filename, int fd);
68 
69 void * mg_server (void *arg);
70 int mg_end();
71 int request_mg_update(const int what);
72 
73 
74 pthread_mutex_t WriteSem;
75 pthread_mutex_t WriteProtectSem;
76 
77 pthread_mutex_t MonitoringRequestSem;
78 pthread_mutex_t SendSem;
79 pthread_mutex_t SendProtectSem;
80 
81 pthread_mutex_t FdManagementSem;
82 
83 char pidfilename[128];
84 
85 // those are the "todo" definitions. DAQ can be woken up by a trigger
86 // and read something, or by a command and change its status.
87 
88 int servernumber = 0;
89 
90 #define DAQ_TRIGGER 0x01
91 #define DAQ_COMMAND 0x02
92 #define DAQ_SPECIAL 0x04
93 
94 // now there are a few commands which the DAQ process obeys.
95 
96 #define COMMAND_INIT 1
97 #define COMMAND_BEGIN 2
98 #define COMMAND_END 3
99 #define COMMAND_FINISH 4
100 #define COMMAND_OPENP 5
101 
102 // now there are a few actions for the DAQ process
103 // when DAQ-triggered
104 #define DAQ_INIT 1
105 #define DAQ_READ 2
106 
107 using namespace std;
108 
109 static std::map<string,string> RunTypes;
110 
111 static std::string TheFileRule = "rcdaq-%08d-%04d.evt";
112 static std::string TheRunType = " ";
115 
117 static int RunnumberfileIsSet = 0;
118 
120 static int RunnumberAppIsSet = 0;
121 
122 static std::string MyName = "";
123 
124 static int daq_open_flag = 0; //no files written unless asked for
125 static int daq_server_flag = 0; //no server access
126 static int daq_server_port = 0; // invalid port
127 static std::string daq_server_name = ""; // our server, if any
128 
129 static int file_is_open = 0;
130 static int server_is_open = 0;
131 static int current_filesequence = 0;
132 static int outfile_fd;
133 
134 #ifdef HAVE_MOSQUITTO_H
135 MQTTConnection *mqtt = 0;
136 std::string mqtt_host;
137 int mqtt_port = 0;
138 #endif
139 
141 
142 
143 static int RunControlMode = 0;
144 static int CurrentEventType = 0;
145 
146 static ElogHandler *ElogH =0;
148 
149 
150 typedef std::vector<daq_device *> devicevector;
151 typedef devicevector::iterator deviceiterator;
152 
153 #define MAXEVENTID 32
154 static int Eventsize[MAXEVENTID];
155 
156 // int Daq_Status;
157 
158 // #define DAQ_RUNNING 0x01
159 // #define DAQ_READING 0x02
160 // #define DAQ_ENDREQUESTED 0x04
161 // #define DAQ_PROTOCOL 0x10
162 // #define DAQ_BEGININPROGRESS 0x20
163 
164 // the original bit-wise status word manipulation wasn't particularly thread-safe.
165 // upgrading to individual variables (and we ditch "DAQ_READING")
166 
167 int DAQ_RUNNING = 0;
170 
173 
176 
177 int TheRun = 0;
178 time_t StartTime = 0;
180 
185 
186 
188 
189 static int TriggerControl = 0;
190 
191 int ThePort=8899;
192 
193 int TheServerFD = 0;
194 
196 
197 pthread_t ThreadId;
198 pthread_t ThreadMon;
199 pthread_t ThreadEvt;
200 pthread_t ThreadWeb = 0;
201 
203 
204 int end_thread = 0;
205 
206 pthread_mutex_t M_cout;
207 
208 
209 struct sockaddr_in si_mine;
210 
211 #define MONITORINGPORT 9930
212 
213 
215 
216 
217 
218 int NumberWritten = 0;
219 unsigned long long BytesInThisRun = 0;
220 unsigned long long BytesInThisFile = 0;
221 unsigned long long RolloverLimit = 0;
222 
223 unsigned long long run_volume, max_volume;
225 
226 time_t last_speed_time = 0;
228 
229 static time_t last_volume_time = 0;
230 double last_volume = 0;
231 
232 int EvtId = 0;
233 
234 
235 int max_seconds = 0;
236 int verbose = 0;
237 int runnumber=1;
238 int packetid = 1001;
240 
243 
245 
248 
250 {
251  return pidfilename;
252 }
253 
255 {
256  // we already have one
257  if ( TriggerH ) return -1;
258  TriggerH = th;
259  return 0;
260 }
261 
263 {
264  TriggerH = 0;
265  return 0;
266 }
267 
268 
269 void sig_handler(int i)
270 {
271  if (verbose)
272  {
273  pthread_mutex_lock(&M_cout);
274  cout << "**interrupt " << endl;
275  pthread_mutex_unlock(&M_cout);
276  }
277 
278  TriggerControl = 0;
279 }
280 
281 
283 {
284  if ( TriggerH) TriggerH->rearm();
285  return 0;
286 }
287 
289 {
290 
291  TriggerControl=1;
292  if ( TriggerH) TriggerH->enable();
293 
294  int status = pthread_create(&ThreadEvt, NULL,
295  EventLoop,
296  (void *) 0);
297 
298  if (status )
299  {
300  cout << "error in event thread create " << status << endl;
301  exit(0);
302  }
303  else
304  {
305  if ( TriggerH) TriggerH->rearm();
306  }
307 
308  return 0;
309 
310 }
311 
313 {
314  TriggerControl=0; // this makes the trigger process terminate
315  if ( TriggerH) TriggerH->disable();
316 
317  return 0;
318 }
319 
320 
321 int daq_setmaxevents (const int n, std::ostream& os)
322 {
323 
324  max_events =n;
325  return 0;
326 
327 }
328 
329 int daq_setmaxvolume (const int n_mb, std::ostream& os)
330 {
331  unsigned long long x = n_mb;
332  max_volume =x * 1024 *1024;
333  return 0;
334 
335 }
336 
337 int daq_setRunControlMode(const int flag, std::ostream& os)
338 {
339  if ( DAQ_RUNNING )
340  {
341  os << "Run is active" << endl;
342  return -1;
343  }
344  if (flag) RunControlMode = 1;
345  else RunControlMode = 0;
346  return 0;
347 }
348 
349 int daq_setrolloverlimit (const int n_gb, std::ostream& os)
350 {
351  if ( DAQ_RUNNING )
352  {
353  os << "Run is active" << endl;
354  return -1;
355  }
356  RolloverLimit = n_gb;
357  return 0;
358 
359 }
360 
361 int daq_setmaxbuffersize (const int n_kb, std::ostream& os)
362 {
363  if ( DAQ_RUNNING )
364  {
365  os << "Run is active" << endl;
366  return -1;
367  }
368 
369  Buffer1.setMaxSize(n_kb*1024);
370  Buffer2.setMaxSize(n_kb*1024);
371  return 0;
372 }
373 
374 int daq_setadaptivebuffering (const int usecs, std::ostream& os)
375 {
376  adaptivebuffering = usecs;
377  return 0;
378 }
379 
380 // this call was added to allow changing the format after the
381 // server was started. Before this was a compile-time option.
382 int daq_setEventFormat(const int f, std::ostream& os )
383 {
384  if ( daq_running() )
385  {
386  os << "Run is active" << endl;
387  return -1;
388  }
389 
390  if (DeviceList.size())
391  {
392  os << "Cannot switch format after devices are defined" << endl;
393  return -1;
394  }
395  int status = Buffer1.setEventFormat(f);
396  status |= Buffer2.setEventFormat(f);
397  return status;
398 }
399 
400 // this is a method for the devices to obtain
401 // the info which format we are writing
402 
404 {
405  return Buffer1.getEventFormat();
406 }
407 
408 int daq_getRunControlMode(std::ostream& os)
409 {
410  os << RunControlMode << endl;
411  return 0;
412 }
413 
414 // elog server setup
415 int daq_set_eloghandler( const char *host, const int port, const char *logname)
416 {
417 
418  if ( ElogH) delete ElogH;
419  ElogH = new ElogHandler (host, port, logname );
420 
421  setenv ( "DAQ_ELOGHOST", host , 1);
422  char str [128];
423  sprintf(str, "%d", port);
424  setenv ( "DAQ_ELOGPORT", str , 1);
425  setenv ( "DAQ_ELOGLOGBOOK", logname , 1);
426 
427  return 0;
428 }
429 
430 
431 
// Read exactly nbytes bytes from fd into ptr, looping over short reads.
//
// So that one call serves both cases, the descriptor is inspected first:
// sockets are read with recv(), everything else with read().
//
// Returns nbytes once all bytes have arrived. If read()/recv() returns 0
// or a negative value before that, this value is returned as-is, even when
// part of the data has already been stored in ptr.
int readn (int fd, char *ptr, const int nbytes)
{
  // Only trust statbuf when fstat() succeeded. On failure the descriptor
  // is treated as a non-socket, and the read() below reports the error.
  struct stat statbuf;
  const bool fd_is_socket = ( fstat(fd, &statbuf) == 0 ) && S_ISSOCK(statbuf.st_mode);

  int nleft = nbytes;
  while ( nleft > 0 )
    {
      const int nread = fd_is_socket ? recv (fd, ptr, nleft, MSG_NOSIGNAL)
                                     : read (fd, ptr, nleft);
      if ( nread <= 0 )
        {
          return nread;
        }

      nleft -= nread;
      ptr += nread;
    }

  return (nbytes-nleft);
}
460 
461 
// Write exactly nbytes bytes from ptr to fd, looping over short writes.
//
// In the interest of having just one "writen" call, the descriptor is
// inspected first: sockets are written with send(), everything else with
// write().
//
// Returns nbytes on success. If send()/write() reports an error, the
// error is printed with perror() and 0 is returned.
int writen (int fd, char *ptr, const int nbytes)
{
  // Only trust statbuf when fstat() succeeded. On failure the descriptor
  // is treated as a non-socket, and the write() below reports the error.
  struct stat statbuf;
  const bool fd_is_socket = ( fstat(fd, &statbuf) == 0 ) && S_ISSOCK(statbuf.st_mode);

  int nleft = nbytes;
  while ( nleft > 0 )
    {
      const int nwritten = fd_is_socket ? send (fd, ptr, nleft, MSG_NOSIGNAL)
                                        : write (fd, ptr, nleft);
      if ( nwritten < 0 )
        {
          perror ("writen");
          return 0;
        }

      nleft -= nwritten;
      ptr += nwritten;
    }

  return (nbytes-nleft);
}
493 
494 
495 
496 //-------------------------------------------------------------
497 
498 int open_file(const int run_number, int *fd)
499 {
500 
501  if ( file_is_open) return -1;
502 
503 
504  static char d[1024];
505  sprintf( d, TheFileRule.c_str(),
507 
508 
509  // test if the file exists, do not overwrite
510  int ifd = open(d, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
511  S_IRWXU | S_IROTH | S_IRGRP );
512  if (ifd < 0)
513  {
514  pthread_mutex_lock(&M_cout);
515  cout << " error opening file " << d << endl;
516  perror ( d);
517  pthread_mutex_unlock(&M_cout);
518 
519  *fd = 0;
520  return -1;
521  }
522 
523  *fd = ifd;
524  CurrentFilename = d;
526  md5_init(&md5state);
527 
528  file_is_open =1;
529 
530  // this is tricky. If we open the very first file, we latch in this event number.
531  // but if we need to roll over, we find that out when the current buffer is written,
532  // and the event nr is what's in this buffer, not the last written buffer.
533  // only for the real start this is right:
534  if (current_filesequence == 0)
535  {
538  }
539 
540 
541  daq_generate_json (0); // generate the "new file" report
542 
543  return 0;
544 }
545 
547 {
548 
549  if ( file_is_open) return -1;
550 
552  if (status) return -1;
553 
554  static char d[1024];
555  sprintf( d, TheFileRule.c_str(),
557 
558  status = server_send_beginrun_sequence(d, run_number, TheServerFD);
559  if ( status)
560  {
561  return -1;
562  }
563 
564  CurrentFilename = d;
566 
567  file_is_open =1;
568 
569  return 0;
570 }
571 
572 
573 // this function can be called directly from daq_end_interactive,
574 // or we can make a thread for the "immediate" option
575 void *daq_end_thread (void *arg)
576 {
577 
578  std::ostream *os = (std::ostream *) arg;
579 
580  // with an operator-induced daq_end, we make the event loop terminate
581  // and wait for it to be done.
582 
583  disable_trigger();
584 
585  // it is possible that we call daq_end before we ever started a run
586  if (ThreadEvt) pthread_join(ThreadEvt, NULL);
587 
588  daq_end(*os);
589  DAQ_ENDREQUESTED = 0;
590 
591  return 0;
592 
593 }
594 
595 
596 
597 
598 void *shutdown_thread (void *arg)
599 {
600 
601  unsigned long *t_args = (unsigned long *) arg;
602 
603 
604  pthread_mutex_lock(&M_cout);
605  cout << "shutting down... " << t_args[0] << " " << t_args[1] << endl;
606  int pid_fd = t_args[2];
607  TriggerControl = 0;
608  if ( TriggerH) delete TriggerH;
609  pthread_mutex_unlock(&M_cout);
610  // unregister out service
611  svc_unregister ( t_args[0], t_args[1]);
612 
613  flock(pid_fd, LOCK_UN);
614  unlink(pidfilename);
615 
616  sleep(2);
617  exit(0);
618 }
619 
620 
621 // this thread is watching for incoming requests for
622 // monitoring data
623 
624 std::queue<int> fd_queue; // this queue holds the monitoring requests
625 
626 
628 {
629 
630  struct sockaddr_in server_addr;
631  int sockfd;
632 
633  if ( (sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0 )
634  {
635  pthread_mutex_lock(&M_cout);
636  cout << "cannot create socket" << endl;
637  pthread_mutex_unlock(&M_cout);
638  }
639 
640  bzero( (char*)&server_addr, sizeof(server_addr));
641  server_addr.sin_family = PF_INET;
642  server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
643  server_addr.sin_port = htons(MONITORINGPORT + servernumber);
644 
645 
646  int status = bind(sockfd, (struct sockaddr*) &server_addr, sizeof(server_addr));
647  if (status < 0)
648  {
649  perror("bind");
650  }
651 
652  pthread_mutex_lock(&M_cout);
653  cout << "Listening for monitoring requests on port " << MONITORINGPORT + servernumber << endl;
654  pthread_mutex_unlock(&M_cout);
655 
656  listen(sockfd, 16);
657 
658  int dd_fd;
659  struct sockaddr_in out;
660 
661  while (sockfd > 0)
662  {
663 
664  socklen_t len = sizeof(out);
665  dd_fd = accept(sockfd, (struct sockaddr *) &out, &len);
666  if ( dd_fd < 0 )
667  {
668  pthread_mutex_lock(&M_cout);
669  cout << "error in accept socket" << endl;
670  pthread_mutex_unlock(&M_cout);
671  }
672  else
673  {
674  char *host = new char[64];
675 
676  getnameinfo((struct sockaddr *) &out, sizeof(struct sockaddr_in), host, 64,
677  NULL, 0, NI_NOFQDN);
678 
679  // time_t x = time(0);
680  // pthread_mutex_lock(&M_cout);
681  // cout << ctime(&x) << "new request for monitoring connection accepted from " << host << endl;
682  // pthread_mutex_unlock(&M_cout);
683 
684  pthread_mutex_lock(&FdManagementSem);
685  fd_queue.push(dd_fd);
686  pthread_mutex_unlock(&FdManagementSem);
687 
688  //kick off "the sender of monitoring data" thread
689  pthread_mutex_unlock(&MonitoringRequestSem);
690  }
691  }
692  return 0;
693 }
694 
695 void handler ( int sig)
696 {
697  //pthread_mutex_lock(&M_cout);
698  // cout << " in handler..." << endl;
699  //pthread_mutex_unlock(&M_cout);
700  pthread_mutex_unlock(&SendProtectSem);
701 }
702 
703 void *sendMonitorData( void *arg)
704 {
705 
706  int fd;
707  int status;
708 
709  while (1)
710  {
711  // we wait here until a monitoring request comes
712  //pthread_mutex_lock(&MonitoringRequestSem);
713 
714  // when we get here, there are requestst. Now wait for a buffer
715  // to be available:
716 
717  // pthread_mutex_lock(&M_cout);
718  // cout << " locking SendSem " << endl;
719  // pthread_mutex_unlock(&M_cout);
720 
721  pthread_mutex_lock( &SendSem);
722 
723  // Now we go through all requests in the queue
724  while ( !fd_queue.empty() )
725  {
726  // now we are manipulating the queue. To do that, we need to
727  // lock-protect this:
728 
729  pthread_mutex_lock(&FdManagementSem);
730  fd = fd_queue.front();
731  fd_queue.pop();
732 
733  pthread_mutex_unlock(&FdManagementSem);
734 
735  int max_length =0;
736 
737  // we always wait for a controlword which tells us what to do next.
738  if ( (status = readn (fd, (char *) &max_length, 4) ) <= 0)
739  {
740  max_length = 0;
741  }
742  else
743  {
744  max_length = ntohl(max_length);
745 
746  status = transportBuffer->sendData(fd, max_length);
747 
748  int reply;
749  if ( ! status && ( status = readn (fd, (char *) &reply, 4) ) <= 0)
750  {
751  // pthread_mutex_lock(&M_cout);
752  // time_t x = time(0);
753  // cout << ctime(&x) << " connection was broken for fd " << fd << endl;
754  // pthread_mutex_unlock(&M_cout);
755  }
756  else
757  {
758  // reply = ntohl(reply);
759  // pthread_mutex_lock(&M_cout);
760  // cout << " reply = " << reply << endl;
761  // pthread_mutex_unlock(&M_cout);
762  }
763  }
764  close(fd);
765  }
766  // pthread_mutex_lock(&M_cout);
767  // cout << " unlocking SendProtectSem " << endl;
768  // pthread_mutex_unlock(&M_cout);
769 
770  pthread_mutex_unlock(&SendProtectSem);
771  if ( end_thread) pthread_exit(0);
772 
773  }
774  return 0;
775 }
776 
777 void *writebuffers ( void * arg)
778 {
779 
780 
781  while(1)
782  {
783 
784  pthread_mutex_lock( &WriteSem); // we wait for an unlock
785 
786 
788  if ( daq_open_flag && outfile_fd)
789  {
790  unsigned int bytecount = transportBuffer->writeout(outfile_fd);
791  NumberWritten++;
792  BytesInThisRun += bytecount;
793  BytesInThisFile += bytecount;
794  }
795  else if ( daq_server_flag && TheServerFD)
796  {
797  unsigned int bytecount = transportBuffer->sendout(TheServerFD);
798  NumberWritten++;
799  BytesInThisRun += bytecount;
800  BytesInThisFile += bytecount;
801  }
802 
803  if ( end_thread) pthread_exit(0);
804 
805  pthread_mutex_unlock(&WriteProtectSem);
806  }
807  return 0;
808 }
809 
811 {
812 
813  // pthread_mutex_lock(&M_cout);
814  // cout << __LINE__ << " " << __FILE__ << " switching buffer" << endl;
815  // pthread_mutex_unlock(&M_cout);
816 
817  daqBuffer *spare;
818 
819  pthread_mutex_lock(&WriteProtectSem);
820  pthread_mutex_lock(&SendProtectSem);
821 
822  fillBuffer->addEoB();
823 
824  //switch buffers
825  spare = transportBuffer;
827  fillBuffer = spare;
828 
829  //+++
831 
832 
833 
834  // let's see if we need to roll over
836  {
837  unsigned int blength = transportBuffer->getLength();
838 
839  if ( blength + BytesInThisFile > RolloverLimit * 1024 * 1024 * 1024)
840  {
841 
842  if ( daq_server_flag)
843  {
844 
846  static char d[1024];
847  sprintf( d, TheFileRule.c_str(),
849 
850  cout << __FILE__ << " " << __LINE__ << " rolling over " << d << endl;
851 
853  CurrentFilename = d;
854  }
855  else // not server
856  {
857  close(outfile_fd);
858  file_is_open = 0;
859 
862 
864  int status = open_file ( TheRun, &outfile_fd);
865  if (status)
866  {
867  cout << MyHostName << "Could not open output file - Run " << TheRun << " file sequence " << current_filesequence<< endl;
868  }
869  }
870  // cout << MyHostName << " -- Rolling output file over at "
871  // << transportBuffer->getLength() + BytesInThisFile
872  // << " sequence: " << current_filesequence
873  // << " limit: " << RolloverLimit
874  // << " now: " << CurrentFilename
875  // << endl;
876  BytesInThisFile = 0;
877  }
878  }
879 
881  pthread_mutex_unlock(&WriteSem);
882  pthread_mutex_unlock(&SendSem);
883  return 0;
884 
885 }
886 
887 int daq_set_runnumberfile(const char *file, const int flag)
888 {
889  if ( flag)
890  {
891  TheRunnumberfile.clear();
892  RunnumberfileIsSet = 0;
893  return 0;
894  }
895 
897  RunnumberfileIsSet = 1;
898  FILE *fp = fopen(TheRunnumberfile.c_str(), "r");
899  int r = 0;
900  if (fp)
901  {
902  int status = fscanf(fp, "%d", &r);
903  if ( status != 1) r = 0;
904  if ( ! TheRun )
905  {
906  TheRun = r;
907  }
908  fclose(fp);
909  }
910 
911  return 0;
912 }
913 
915 {
916  if ( !RunnumberfileIsSet ) return 1;
917  if ( RunControlMode ) return 1;
918 
919  FILE *fp = fopen(TheRunnumberfile.c_str(), "w");
920  if (fp )
921  {
922  fprintf(fp, "%d\n", run);
923  fclose(fp);
924  }
925 
926  return 0;
927 }
928 
929 
930 int daq_set_runnumberApp(const char *file, const int flag)
931 {
932  if ( flag)
933  {
934  TheRunnumberApp.clear();
935  RunnumberAppIsSet = 0;
936  return 0;
937  }
938 
940  RunnumberAppIsSet = 1;
941  return 0;
942 }
943 
945 {
946  if ( ! RunnumberAppIsSet) return -1;
947  FILE *fp = popen(TheRunnumberApp.c_str(),"r");
948  if (fp == NULL)
949  {
950  std::cerr << "error running the runnumber app" << std::endl;
951  return -1;
952  }
953  char in[64];
954  int len = fread(in, 1, 64, fp);
955  pclose(fp);
956 
957  std::stringstream s (in);
958  int run;
959  if (! (s >> run) )
960  {
961  return -1;
962  }
963 
964  return run;
965 }
966 
967 
968 
969 int daq_set_filerule(const char *rule)
970 {
971  TheFileRule = rule;
972  return 0;
973 }
974 
975 int daq_set_name(const char *name)
976 {
977  MyName = name;
978  if (servernumber)
979  {
980  MyName = MyName + ":" + to_string(servernumber);
981  }
983  return 0;
984 }
985 
986 #ifdef HAVE_MOSQUITTO_H
987 int daq_set_mqtt_host(const char * host, const int port, std::ostream& os)
988 {
989  std::cout << __FILE__ << " " << __LINE__ << " mqtt host " << host << " port " << port << endl;
990  if (mqtt) delete mqtt;
991 
992  if (strcasecmp(host, "None") == 0) // delete existing def
993  {
994  mqtt = 0;
995  mqtt_host = "";
996  mqtt_port = 0;
997  return 0;
998  }
999 
1000  mqtt_host = host;
1001  mqtt = new MQTTConnection(mqtt_host, "rcdaq", port);
1002  if ( mqtt->Status())
1003  {
1004  delete mqtt;
1005  mqtt =0;
1006  mqtt_host = "";
1007  mqtt_port = 0;
1008  os << "Could not connect to host " << host << " on port " << port << endl;
1009  return 1;
1010  }
1011 
1012  mqtt_host = host;
1013  mqtt_port = port;
1014 
1015  return 0;
1016 }
1017 
1018 int daq_get_mqtt_host(std::ostream& os)
1019 {
1020  if (!mqtt)
1021  {
1022  os << " No MQTT host defined" << endl;
1023  return 1;
1024  }
1025  os << " Host " << mqtt->GetHostName() << " " << " port " << mqtt->GetPort() << endl;
1026  return 0;
1027 }
1028 #endif
1029 
1030 
1031 
1032 // this is selecting from any of the existing run types
1033 int daq_setruntype(const char *type, std::ostream& os )
1034 {
1035  std::string _type = type;
1036  std::map <string,string>::const_iterator iter = RunTypes.begin();
1037  for ( ; iter != RunTypes.end(); ++iter)
1038  {
1039  if ( iter->first == _type )
1040  {
1041  TheFileRule = iter->second;
1042  TheRunType = _type;
1043  return 0;
1044  }
1045  }
1046  os << " Run type " << type << " is not defined " << endl;
1047  return 1;
1048 }
1049 
1050 int daq_getruntype(const int flag, std::ostream& os)
1051 {
1052  std::map <string,string>::const_iterator iter = RunTypes.begin();
1053  for ( ; iter != RunTypes.end(); ++iter)
1054  {
1055  if ( iter->second == TheFileRule )
1056  {
1057  if ( flag == 2)
1058  {
1059  os << iter->first << " - " << iter->second << endl;
1060  return 0;
1061  }
1062  else
1063  {
1064  os << iter->first << endl;
1065  return 0;
1066  }
1067  }
1068  }
1069  return 0;
1070 }
1071 
1072 // this is defining a new run type (or re-defining an old one)
1073 int daq_define_runtype(const char *type, const char *rule)
1074 {
1075  std::string _type = type;
1076  // std::map <string,string>::const_iterator iter = RunTypes.begin();
1077  // for ( ; iter != RunTypes.end(); ++iter)
1078  // {
1079  // if ( iter->first == _type )
1080  // {
1081  // RunTypes[_type] = rule;
1082  // return 0;
1083  // }
1084  // }
1085  RunTypes[_type] = rule;
1086  return 0;
1087 }
1088 
1089 int daq_status_runtypes (std::ostream& os )
1090 {
1091  os << " -- defined Run Types: ";
1092  return daq_list_runtypes(2, os);
1093 }
1094 
1095 int daq_list_runtypes(const int flag, std::ostream& os)
1096 {
1097 
1098  if ( flag && RunTypes.size() == 0 )
1099  {
1100  os << " (none)" <<endl;
1101  return 0;
1102  }
1103  if (flag ==2) os << endl;
1104 
1105  std::map <string,string>::const_iterator iter = RunTypes.begin();
1106  for ( ; iter != RunTypes.end(); ++iter)
1107  {
1108  if (flag)
1109  {
1110  os << " " << setw(12) << iter->first << " - " << iter->second << endl;
1111  }
1112  else
1113  {
1114  os << iter->first << endl;
1115  }
1116 
1117  }
1118  return 0;
1119 }
1120 
1121 int daq_get_name (std::ostream& os )
1122 {
1123  os << MyName << endl;
1124  return 0;
1125 }
1126 
1128 {
1129  return MyName;
1130 }
1131 
1133 {
1134  return TheFileRule;
1135 }
1136 
1138 {
1139  return CurrentFilename;
1140 }
1141 
1143 {
1144  return PreviousFilename;
1145 }
1146 
1148 {
1149 
1150  time_t now_time = time(0);
1151  if ( now_time == last_volume_time) return 0;
1152  double time_delta = now_time - last_volume_time;
1153  double mb_per_second = ( run_volume - last_volume) / time_delta / 1024. /1204.;
1155  last_volume_time = now_time;
1156  return mb_per_second;
1157 }
1158 
1160 {
1161  time_t now_time = time(0);
1162  if ( now_time == last_speed_time) return 0;
1163  double time_delta = now_time - last_speed_time;
1164  double events_per_second = ( Event_number - last_event_nr) / time_delta;
1166  last_speed_time = now_time;
1167  return events_per_second;
1168 }
1169 
1170 
1171 void * daq_begin_thread( void *arg)
1172 {
1173  int irun = *(int*)arg;
1174  int status = daq_begin( irun, std::cout);
1175  if (status)
1176  {
1177  // not sure what to do exactly
1178  cout << __FILE__ << " " << __LINE__ << " asynchronous begin run failed" << endl;
1179  }
1180 
1181  DAQ_BEGININPROGRESS = 0;
1182 
1183  return 0;
1184 }
1185 
1186 int daq_begin_immediate(const int irun, std::ostream& os)
1187 {
1188  static unsigned int b_arg;
1189  b_arg = irun;
1190 
1191  if ( DAQ_RUNNING )
1192  {
1193  os << MyHostName << "Run is active" << endl;
1194  return -1;
1195  }
1196  if ( irun )
1197  {
1198  os << MyHostName << "Run " << irun << " begin requested" << endl;
1199  }
1200  else // I need to think about this a bit. We let the begin_run update the run number.
1201  {
1202  os << MyHostName << "Run " << TheRun+1 << " begin requested" << endl;
1203  }
1204 
1205  DAQ_BEGININPROGRESS = 1;
1206 
1207  pthread_t t;
1208 
1209  int status = pthread_create(&t, NULL,
1211  (void *) &b_arg);
1212  if (status )
1213  {
1214 
1215  cout << "begin_run failed " << status << endl;
1216  os << "begin_run failed " << status << endl;
1217  return -1;
1218  }
1219  return 0;
1220 }
1221 
1222 
1223 int daq_begin(const int irun, std::ostream& os)
1224 {
1225  if ( DAQ_RUNNING )
1226  {
1227  os << MyHostName << "Run is already active" << endl;;
1228  return -1;
1229  }
1230 
1231  if ( RunControlMode && irun ==0 )
1232  {
1233  os << MyHostName << " No automatic Run Numbers in Run Control Mode" << endl;;
1234  return -1;
1235  }
1236 
1238  {
1239  os << MyHostName << "*** Previous error with server connection" << endl;;
1240  return -1;
1241  }
1242 
1243  if (ThreadEvt) pthread_join(ThreadEvt, NULL);
1244 
1245 
1246  // set the status to "running"
1247  DAQ_RUNNING = 1;
1249 
1250  // if we are in run Control mode, we don't allow automatic run numbers
1251 
1252  if ( irun ==0)
1253  {
1254  if ( RunnumberAppIsSet)
1255  {
1256  int run = getRunNumberFromApp();
1257  if ( run < 0)
1258  {
1259  os << MyHostName << "Could not obtain a run number from " << TheRunnumberApp << ", run not started" << endl;
1260  DAQ_RUNNING = 0;
1261  return -1;
1262  }
1263  TheRun = run;
1264  }
1265  else
1266  {
1267  TheRun++;
1268  }
1269  }
1270  else
1271  {
1272  TheRun = irun;
1273  }
1274 
1275  //initialize the Buffer and event number
1276  Buffer_number = 1;
1277  Event_number = 1;
1281 
1282  // initialize the run/file volume
1283  BytesInThisRun = 0; // bytes actually written
1284  BytesInThisFile = 0;
1285 
1286 
1287  if ( daq_open_flag)
1288  {
1289  if ( daq_server_flag)
1290  {
1291 
1293  if ( !status)
1294  {
1295 
1296  if (ElogH) ElogH->BegrunLog( TheRun,"RCDAQ",
1298 
1300  last_bufferwritetime = time(0); // initialize this at begin-run
1301 
1302  }
1303  else
1304  {
1305  os << MyHostName << "Could not open remote output file - Run " << TheRun << " not started" << endl;;
1306  DAQ_RUNNING = 0;
1307  return -1;
1308  }
1309  }
1310 
1311  else // we log to a standard local file
1312  {
1313  int status = open_file ( TheRun, &outfile_fd);
1314  if ( !status)
1315  {
1316  if (ElogH) ElogH->BegrunLog( TheRun,"RCDAQ",
1318 
1320  last_bufferwritetime = time(0); // initialize this at begin-run
1321 
1322  }
1323  else
1324  {
1325  os << MyHostName << "Could not open output file - Run " << TheRun << " not started" << endl;;
1326  DAQ_RUNNING = 0;
1327  return -1;
1328  }
1329  }
1330 
1331 
1332  }
1333 
1334 
1335 
1336  // just to be safe, clear the "end requested" bit
1337  DAQ_ENDREQUESTED = 0;
1338 
1339  cout << "starting run " << TheRun << " at " << time(0) << endl;
1340  set_eventsizes();
1341  // initialize Buffer1 to be the fill buffer
1342  //fillBuffer = &Buffer1;
1343  //transportBuffer = &Buffer2;
1344 
1345  // a safety check: see that the buffers haven't been adjusted
1346  // to a smaller value than the event size
1347  int wantedmaxsize = 0;
1348  for (int i = 0; i< MAXEVENTID; i++)
1349  {
1350  if ( (4*Eventsize[i] + 4*32) > fillBuffer->getMaxSize()
1351  || (4*Eventsize[i] + 4*32) > transportBuffer->getMaxSize() )
1352  {
1353  int x = 4*Eventsize[i] + 4*32; // this is now in bytes
1354  if ( x > wantedmaxsize ) wantedmaxsize = x;
1355  }
1356  }
1357  if ( wantedmaxsize)
1358  {
1359  if ( fillBuffer->setMaxSize(wantedmaxsize) || transportBuffer->setMaxSize(wantedmaxsize))
1360  {
1361  os << MyHostName << "Cannot start run - event sizes larger than buffer, size "
1362  << wantedmaxsize/1024 << " Buffer size "
1363  << transportBuffer->getMaxSize()/1024 << endl;
1364  DAQ_RUNNING = 0;
1365  return -1;
1366  }
1367  // os << " Buffer size increased to " << transportBuffer->getMaxSize()/1024 << " KB"<< endl;
1368 
1369  }
1370 
1372  last_event_nr = 0;
1373  last_volume = 0;
1374 
1375 
1376  // here we sucessfully start a run. So now we set the env. variables
1377  char str[128];
1378  // RUNNUMBER
1379  sprintf( str, "%d", TheRun);
1380  setenv ( "DAQ_RUNNUMBER", str, 1);
1381  setenv ( "DAQ_FILERULE", TheFileRule.c_str() , 1);
1382 
1383  sprintf( str, "%ld", StartTime);
1384  setenv ( "DAQ_STARTTIME", str , 1);
1385 
1386  if ( daq_open_flag || daq_server_flag )
1387  {
1388  setenv ( "DAQ_FILENAME", CurrentFilename.c_str() , 1);
1389  }
1390 
1391 
1392  // we are opening a new file here, so we restart the MD5 calculation
1393  md5_init(&md5state);
1394 
1396 
1397  run_volume = 0;
1398 
1399  device_init();
1400 
1401  // readout the begin-run event
1403 
1404  //now enable the interrupts and reset the deadtime
1405  enable_trigger();
1406 
1408 
1409  os << MyHostName << "Run " << TheRun << " started" << endl;
1410 
1411  return 0;
1412 }
1413 
1414 int daq_end_immediate(std::ostream& os)
1415 {
1416  if ( ! (DAQ_RUNNING) )
1417  {
1418  os << MyHostName << "Run is not active" << endl;;
1419  return -1;
1420  }
1421  os << MyHostName << "Run " << TheRun << " end requested" << endl;
1422  DAQ_ENDREQUESTED = 1;
1423 
1424  pthread_t t;
1425 
1426  void * x = &os;
1427 
1428  //cout << __FILE__ << " " << __LINE__ << " calling end_thread as thread" << endl;
1429  int status = pthread_create(&t, NULL,
1431  x);
1432  if (status )
1433  {
1434 
1435  cout << "end_run failed " << status << endl;
1436  os << "end_run failed " << status << endl;
1437  }
1438  //cout << __FILE__ << " " << __LINE__ << " done setting up end_thread" << endl;
1439  return 0;
1440 }
1441 
1442 
1443 // this function is to hold further interactions until an asynchronous begin-run is over
1445 {
1446  while ( DAQ_BEGININPROGRESS ) usleep(10000);
1447  return 0;
1448 }
1449 
1450 // this function is to avoid a race condition with the asynchronous "end requested" feature
1452 {
1453 
1454  while ( DAQ_ENDREQUESTED )
1455  {
1456  usleep(10000);
1457  }
1458 
1459  return 0;
1460 }
1461 
1462 int daq_end_interactive(std::ostream& os)
1463 {
1464 
1465  void *x = &os;
1466  daq_end_thread (x);
1467  return 0;
1468 }
1469 
1470 
1471 
1472 int daq_end(std::ostream& os)
1473 {
1474 
1475  if ( ! (DAQ_RUNNING) )
1476  {
1477  os << MyHostName << "Run is not active" << endl;;
1478  return -1;
1479  }
1480 
1481  disable_trigger();
1482  device_endrun();
1483 
1485  switch_buffer(); // we force a buffer flush
1486 
1487  if ( file_is_open )
1488  {
1489 
1490  pthread_mutex_lock(&WriteProtectSem);
1491  pthread_mutex_unlock(&WriteProtectSem);
1492 
1493  double v = run_volume;
1494  v /= (1024*1024);
1495 
1496  if (ElogH) ElogH->EndrunLog( TheRun,"RCDAQ", Event_number, v, StartTime);
1497 
1498  if ( daq_server_flag)
1499  {
1502  {
1503  std::cout << __FILE__ << " " << __LINE__ << " error in closing connection... " << std::endl;
1504  }
1505  close(TheServerFD);
1506  TheServerFD = 0;
1507  }
1508  else
1509  {
1510  close (outfile_fd);
1511  daq_generate_json(1);
1512  outfile_fd = 0;
1513  }
1514  file_is_open = 0;
1515 
1516 
1517  }
1518 
1519 
1520 
1521  unsetenv ("DAQ_RUNNUMBER");
1522  unsetenv ("DAQ_FILENAME");
1523  unsetenv ("DAQ_STARTTIME");
1524 
1526  Event_number = 0;
1528  run_volume = 0; // volume in longwords
1529  BytesInThisRun = 0; // bytes actually written
1530  BytesInThisFile = 0;
1531  Buffer_number = 0;
1533  CurrentFilename = "";
1534  StartTime = 0;
1535  DAQ_RUNNING = 0;
1536 
1538  last_event_nr = 0;
1539  last_volume = 0;
1540 
1542 
1543  DAQ_ENDREQUESTED = 0;
1544 
1545  os << MyHostName << "Run " << TheRun << " ended" << endl;
1546 
1547  return 0;
1548 }
1549 
1550 int daq_fake_trigger (const int n, const int waitinterval)
1551 {
1552  int i;
1553  for ( i = 0; i < n; i++)
1554  {
1555 
1557 
1558 // pthread_mutex_lock(&M_cout);
1559 // cout << "trigger" << endl;
1560 // pthread_mutex_unlock(&M_cout);
1561 
1562  usleep (200000);
1563 
1564  }
1565  return 0;
1566 }
1567 
1568 
1569 void * EventLoop( void *arg)
1570 {
1571 
1572  // pthread_mutex_lock(&M_cout);
1573  // std::cout << __FILE__ << " " << __LINE__ << " event loop starting... " << std::endl;
1574  // pthread_mutex_unlock(&M_cout);
1575 
1576  int rstatus;
1577 
1578  while (TriggerControl)
1579  {
1580 
1581  // let's see if we have a TriggerHelper object
1582  if (TriggerH)
1583  {
1585  }
1586  else // we auto-generate a few triggers
1587  {
1588  CurrentEventType = 1;
1589  usleep (100000);
1590  }
1591 
1592 
1593  if (CurrentEventType)
1594  {
1595 
1596  if ( DAQ_RUNNING )
1597  {
1598 
1599  rstatus = readout(CurrentEventType);
1600 
1601  if ( rstatus) // we got an endrun signal
1602  {
1603  TriggerControl = 0;
1604  //reset_deadtime();
1605  cout << __LINE__ << " calling daq_end" << endl;
1606  daq_end ( std::cout);
1607  }
1608  else
1609  {
1610  rearm(DATAEVENT);
1611  reset_deadtime();
1612  }
1613  }
1614  else // no, we are not running
1615  {
1616  cout << "Run not active" << endl;
1617  // reset todo, and the DAQ_TRIGGER bit.
1618  TriggerControl = 0;
1619  }
1620  }
1621  }
1622 
1623  return 0;
1624 
1625 }
1626 
1627 
1629 {
1630  DeviceList.push_back (d);
1631  return 0;
1632 
1633 }
1634 
1635 
1637 {
1638 
1639  deviceiterator d_it;
1640 
1641  for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1642  {
1643  // cout << "calling init on ";
1644  //(*d_it)->identify();
1645  (*d_it)->init();
1646  }
1647 
1648  return 0;
1649 }
1650 
1652 {
1653 
1654  deviceiterator d_it;
1655 
1656  for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1657  {
1658  // cout << "calling init on ";
1659  //(*d_it)->identify();
1660  (*d_it)->endrun();
1661  }
1662 
1663  return 0;
1664 }
1665 
1666 
1667 int readout(const int etype)
1668 {
1669 
1670  // pthread_mutex_lock(&M_cout);
1671  // cout << " readout etype = " << etype << endl;
1672  // pthread_mutex_unlock(&M_cout);
1673 
1674  int len = EVTHEADERLENGTH;
1675 
1676  if (etype < 0 || etype>MAXEVENTID) return 0;
1677 
1678  deviceiterator d_it;
1679 
1680 
1681  int status = fillBuffer->nextEvent(etype,Event_number, Eventsize[etype]);
1682  if (status != 0)
1683  {
1684  switch_buffer();
1685  status = fillBuffer->nextEvent(etype,Event_number, Eventsize[etype]);
1686  }
1687  Event_number++;
1689 
1690  for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1691  {
1692  len += fillBuffer->addSubevent ( (*d_it) );
1693  }
1694 
1695  run_volume += 4*len;
1696 
1697  int returncode = 0;
1698 
1699  if ( DAQ_RUNNING )
1700  {
1701  if ( etype == DATAEVENT && max_volume > 0 && run_volume >= max_volume)
1702  {
1703  cout << " automatic end after " << max_volume /(1024*1024) << " Mb" << endl;
1704  returncode = 1;
1705  }
1706 
1707  if ( etype == DATAEVENT && max_events > 0 && Event_number >= max_events )
1708  {
1709  cout << " automatic end after " << max_events<< " events" << endl;
1710  returncode = 1;
1711  }
1712  }
1713 
1715  {
1716  switch_buffer();
1717  // cout << "adaptive buffer switching" << endl;
1718  }
1719 
1720  return returncode;
1721 }
1722 
1723 
1724 int rearm(const int etype)
1725 {
1726 
1727  if (etype < 0 || etype>MAXEVENTID) return 0;
1728 
1729  deviceiterator d_it;
1730 
1731  for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1732  {
1733  (*d_it)->rearm(etype);
1734  }
1735 
1736  return 0;
1737 }
1738 
1740 {
1741  int i;
1742  int size;
1743  deviceiterator d_it;
1744 
1745  for (i = 0; i< MAXEVENTID; i++)
1746  {
1747  size = EVTHEADERLENGTH;
1748 
1749 
1750  for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1751  {
1752  size += (*d_it)->max_length(i) ;
1753  }
1754 
1755  Eventsize[i] = size;
1756  if (size>EVTHEADERLENGTH)
1757  cout << "Event id " << i << " size " << size << endl;
1758  }
1759 }
1760 
1761 int daq_open (std::ostream& os)
1762 {
1763 
1764  if ( DAQ_RUNNING )
1765  {
1766  os << MyHostName << "Run is active" << endl;;
1767  return -1;
1768  }
1769 
1770  // if ( daq_server_flag)
1771  // {
1772  // os << "Server logging is enabled" << endl;;
1773  // return -1;
1774  // }
1775 
1777 
1778  daq_open_flag =1;
1779  return 0;
1780 }
1781 
1782 int daq_set_compression (const int flag, std::ostream& os)
1783 {
1784 
1785  if ( DAQ_RUNNING )
1786  {
1787  os << MyHostName << "Run is active" << endl;;
1788  return -1;
1789  }
1790  if (flag)
1791  {
1794  }
1795  else
1796  {
1799  }
1800 
1801  return 0;
1802 }
1803 
1804 
1805 int daq_set_server (const char *hostname, const int port, std::ostream& os)
1806 {
1807 
1808  if ( DAQ_RUNNING )
1809  {
1810  os << MyHostName << "Run is active" << endl;;
1811  return -1;
1812  }
1813 
1814  daq_server_name = hostname;
1815  if ( daq_server_name == "None")
1816  {
1817  daq_server_flag = 0;
1818  daq_server_name = "";
1819  daq_server_port = 0;
1820  return 0;
1821  }
1822 
1823  daq_server_flag = 1;
1824  daq_server_port = port;
1825  if ( ! daq_server_port) daq_server_port = 5001;
1826 
1827  return 0;
1828 }
1829 
1830 
1831 
1833 {
1834 
1835  if ( !daq_server_flag)
1836  {
1837  return -1;
1838  }
1839 
1841 
1842  int theport = daq_server_port;
1843  if ( ! theport) theport = 5001;
1844 
1845  TheServerFD = open_serverSocket(daq_server_name.c_str(), theport);
1846  if ( TheServerFD < 0)
1847  {
1848  if ( TheServerFD == -1)
1849  {
1850  cout << __FILE__<< " " << __LINE__ << " error connecting to server " << daq_server_name << " on port " << theport << endl;
1851  }
1852  else
1853  {
1854  cout << __FILE__<< " " << __LINE__ << " error connecting to server " << daq_server_name << " on port " << theport << " " << gai_strerror(TheServerFD) << endl;
1855  }
1856 
1857  TheServerFD = 0;
1859  return -1;
1860  }
1861 
1862  return 0;
1863 }
1864 
1865 
1866 int daq_shutdown(const unsigned long servernumber, const unsigned long versionnumber, const int pid_fd,
1867  std::ostream& os)
1868 {
1869 
1870  if ( DAQ_RUNNING )
1871  {
1872  os << MyHostName << "Run is active" << endl;;
1873  return -1;
1874  }
1875 
1876  if (daq_server_flag)
1877  {
1878  daq_close(std::cout);
1879  }
1880 
1881  static unsigned long t_args[3];
1882  t_args[0] = servernumber;
1883  t_args[1] = versionnumber;
1884  t_args[3] = pid_fd;
1885 
1886 
1887  pthread_t t;
1888 
1889  int status = pthread_create(&t, NULL,
1890  shutdown_thread,
1891  (void *) t_args);
1892 
1893  if (status )
1894  {
1895  cout << "cannot shut down " << status << endl;
1896  os << MyHostName << "cannot shut down " << status << endl;
1897  return -1;
1898  }
1899  os << " ";
1900  return 0;
1901 }
1902 
1903 int is_open()
1904 {
1905  return daq_open_flag;
1906 }
1907 
1909 {
1910  return daq_server_flag;
1911 }
1912 
1913 int daq_close (std::ostream& os)
1914 {
1915 
1916  if ( DAQ_RUNNING )
1917  {
1918  os << MyHostName << "Run is active" << endl;;
1919  return -1;
1920  }
1921 
1922  daq_open_flag =0;
1924 
1925  return 0;
1926 }
1927 
1928 int daq_server_close (std::ostream& os)
1929 {
1930 
1931  if ( DAQ_RUNNING )
1932  {
1933  os << MyHostName << "Run is active" << endl;;
1934  return -1;
1935  }
1937  {
1938  std::cout << __FILE__ << " " << __LINE__ << " error in closing connection... " << std::endl;
1939  }
1940  close(TheServerFD);
1941  TheServerFD = 0;
1942 
1943  daq_server_flag =0;
1944  return 0;
1945 }
1946 
1947 
1948 int daq_list_readlist(std::ostream& os)
1949 {
1950 
1951  deviceiterator d_it;
1952 
1953  for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1954  {
1955  (*d_it)->identify(os);
1956  }
1957 
1958  return 0;
1959 
1960 }
1961 
1962 int daq_clear_readlist(std::ostream& os)
1963 {
1964 
1965  if ( DAQ_RUNNING )
1966  {
1967  os << MyHostName << "Run is active" << endl;;
1968  return -1;
1969  }
1970 
1971  deviceiterator d_it;
1972 
1973  for ( d_it = DeviceList.begin(); d_it != DeviceList.end(); ++d_it)
1974  {
1975  delete (*d_it);
1976  }
1977 
1978  DeviceList.clear();
1979  os << MyHostName << "Readlist cleared" << endl;
1980 
1981  return 0;
1982 }
1983 
1984 
1985 int rcdaq_init( const int snumber, pthread_mutex_t &M)
1986 {
1987 
1988  int status;
1989 
1990  servernumber = snumber;
1991  ThePort += servernumber;
1992 
1993  char hostname[HOST_NAME_MAX];
1994  status = gethostname(hostname, HOST_NAME_MAX);
1995  if (!status)
1996  {
1997  shortHostName = hostname;
1998  MyHostName = hostname;
1999  if (servernumber)
2000  {
2003  }
2004  MyHostName += " - ";
2005  }
2007 
2008  // pthread_mutex_init(&M_cout, 0);
2009  M_cout = M;
2010 
2011  pthread_mutex_init( &WriteSem, 0);
2012  pthread_mutex_init( &WriteProtectSem, 0);
2013 
2014 
2015  pthread_mutex_init( &MonitoringRequestSem, 0);
2016  pthread_mutex_init( &SendSem, 0);
2017  pthread_mutex_init( &SendProtectSem, 0);
2018  pthread_mutex_init( &FdManagementSem,0);
2019 
2020  // pre-lock them except the "protect" ones
2021  pthread_mutex_lock( &MonitoringRequestSem);
2022  pthread_mutex_lock( &WriteSem);
2023  pthread_mutex_lock( &SendSem);
2024 
2025  ThreadEvt = 0;
2026 
2027  outfile_fd = 0;
2028 
2029  // we give the buffers our state variable
2032 
2033  fillBuffer = &Buffer1;
2035 
2036 
2037 #ifdef WRITEPRDF
2040 #endif
2041 
2042 
2043  status = pthread_create(&ThreadId, NULL,
2044  writebuffers,
2045  (void *) 0);
2046 
2047  if (status )
2048  {
2049  cout << "error in write thread create " << status << endl;
2050  exit(0);
2051  }
2052  else
2053  {
2054  pthread_mutex_lock(&M_cout);
2055  cout << "write thread created" << endl;
2056  pthread_mutex_unlock(&M_cout);
2057  }
2058 
2059  status = pthread_create(&ThreadMon, NULL,
2060  sendMonitorData,
2061  (void *) 0);
2062  if (status )
2063  {
2064  cout << "error in send monitor data thread create " << status << endl;
2065  exit(0);
2066  }
2067  else
2068  {
2069  pthread_mutex_lock(&M_cout);
2070  cout << "monitor thread created" << endl;
2071  pthread_mutex_unlock(&M_cout);
2072  }
2073 
2074  status = pthread_create(&ThreadMon, NULL,
2076  (void *) 0);
2077  if (status )
2078  {
2079  cout << "error in send monitor data thread create " << status << endl;
2080  exit(0);
2081  }
2082  else
2083  {
2084  pthread_mutex_lock(&M_cout);
2085  cout << "monitor request thread created" << endl;
2086  pthread_mutex_unlock(&M_cout);
2087  }
2088 
2089 
2090  // std::ostringstream outputstream;
2091  // daq_webcontrol ( ThePort, outputstream);
2093 
2094 
2095 
2096  return 0;
2097 }
2098 
2099 
2101 {
2102  if ( ! DAQ_RUNNING ) return -1;
2103  return TheRun;
2104 }
2105 
2106 int get_oldrunnumber() // like get_runnumber, but doesn't return -1 when stopped
2107 {
2108  return TheRun;
2109 }
2110 
2112 {
2113  if ( ! DAQ_RUNNING ) return 0;
2114  return Event_number;
2115 }
2117 {
2118  if ( ! DAQ_RUNNING ) return 0;
2119  double v = run_volume;
2120  return v / (1024*1024);
2121 }
2123 {
2124  if ( ! DAQ_RUNNING ) return 0;
2125  return time(0) - StartTime;
2126 }
2127 
2129 {
2130  return daq_open_flag;
2131 }
2133 {
2134  return daq_server_flag;
2135 }
2136 
2137 
2138 int daq_status( const int flag, std::ostream& os)
2139 {
2140 
2141  double volume = run_volume;
2142  volume /= (1024*1024);
2143 
2144  switch (flag)
2145  {
2146 
2147  case STATUSFORMAT_SHORT: // "short format"
2148 
2149  if ( DAQ_RUNNING )
2150  {
2151  os << TheRun << " " << Event_number -1 << " "
2152  << volume << " ";
2153  os << daq_open_flag << " ";
2154  os << daq_server_flag << " ";
2155  os << time(0) - StartTime << " ";
2156  os << get_current_filename() << " "
2157  << " \"" << MyName << "\"" << endl;
2158  }
2159  else
2160  {
2161  os << "-1 0 0 ";
2162  os << daq_open_flag << " ";
2163  os << daq_server_flag << " ";
2164  os << " 0 0"
2165  << " \"" << MyName << "\"" << endl;
2166 
2167  }
2168 
2169  break;
2170 
2171  case STATUSFORMAT_NORMAL:
2172  if ( DAQ_RUNNING )
2173  {
2174  os << MyHostName << "Run " << TheRun
2175  << " Event: " << Event_number -1
2176  << " Volume: " << volume;
2177  if ( daq_open_flag )
2178  {
2179  if ( daq_server_flag )
2180  {
2181  os << " Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port;
2182  }
2183  else
2184  {
2185  os << " Logging enabled";
2186  if ( Buffer1.getCompression() ) os << " compression enabled";
2187  }
2188  }
2189  else
2190  {
2191  os << " Logging disabled";
2192  }
2193  }
2194  else // not running
2195  {
2196  os << MyHostName << "Stopped";
2197  if ( daq_open_flag )
2198  {
2199  if ( daq_server_flag )
2200  {
2201  os << " Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port;
2202  }
2203  else
2204  {
2205  os << " Logging enabled";
2206  if ( Buffer1.getCompression() ) os << " compression enabled";
2207  }
2208  }
2209  else
2210  {
2211  if ( daq_server_flag )
2212  {
2213  os << " Logging disabled, remote server set " << daq_server_name << " Port " << daq_server_port;
2214  }
2215  else
2216  {
2217  os << " Logging disabled";
2218  }
2219  }
2220  }
2221 
2222  if ( RolloverLimit)
2223  {
2224  os << " File rollover: " << RolloverLimit << "GB";
2225  }
2226 
2227 
2228  os<< endl;
2229 
2230  break;
2231 
2232  default: // flag 2++
2233 
2234  if ( DAQ_RUNNING )
2235  {
2236  os << MyHostName << " running" << endl;
2237  //os << " " << MyHostName << ":" << endl;
2238  //os << " Running" << endl;
2239  os << " Run Number: " << TheRun << endl;
2240  os << " Event: " << Event_number << endl;;
2241  os << " Run Volume: " << volume << " MB"<< endl;
2242  os << " Filerule: " << daq_get_filerule() << endl;
2243  if ( RolloverLimit)
2244  {
2245  os << " File rollover: " << RolloverLimit << "GB" << endl;
2246  }
2247  //else
2248  // {
2249  // os << " File rollover: disabled" << endl;
2250  // }
2251 
2252  if ( daq_open_flag )
2253  {
2254  if ( daq_server_flag )
2255  {
2256  os << " Filename on server: " << get_current_filename() << endl;
2257  }
2258  else
2259  {
2260  os << " Filename: " << get_current_filename() << endl;
2261  }
2262  }
2263 
2264  os << " Duration: " << time(0) - StartTime << " s" <<endl;
2265 
2266  if (max_volume)
2267  {
2268  os << " Volume Limit: " << max_volume /(1024 *1024) << " Mb" << endl;
2269  }
2270  if (max_events)
2271  {
2272  os << " Event Limit: " << max_events << endl;
2273  }
2274  }
2275  else // not runnig
2276  {
2277  os << MyHostName << " Stopped" << endl;
2278  if ( daq_open_flag )
2279  {
2280  if ( daq_server_flag )
2281  {
2282  os << " Logging enabled via remote server " << daq_server_name << " Port " << daq_server_port << endl;
2283  }
2284  else
2285  {
2286  os << " Logging enabled";
2287  if ( Buffer1.getCompression() ) os << " compression enabled";
2288  os << endl;
2289  }
2290  }
2291  else
2292  {
2293  if ( daq_server_flag )
2294  {
2295  os << " Logging disabled, remote server set " << daq_server_name << " Port " << daq_server_port << endl;
2296  }
2297  else
2298  {
2299  os << " Logging disabled" << endl;
2300  }
2301  }
2302  os << " Filerule: " << daq_get_filerule() << endl;
2303 
2304  if ( RolloverLimit)
2305  {
2306  os << " File rollover: " << RolloverLimit << "GB" << endl;
2307  }
2308  //else
2309  // {
2310  // os << " File rollover: disabled" << endl;
2311  // }
2312 
2313 
2314  if (max_volume)
2315  {
2316  os << " Volume Limit: " << max_volume /(1024 *1024) << " Mb" << endl;
2317  }
2318  if (max_events)
2319  {
2320  os << " Event Limit: " << max_events << endl;
2321  }
2322  if ( TriggerH ) os << " have a trigger object" << endl;
2323  }
2324  if (RunControlMode)
2325  {
2326  os << " Run Control Mode enabled " << endl;
2327  }
2328 
2329  os << " Buffer Sizes: " << Buffer1.getMaxSize()/1024 << " KB";
2330  if ( adaptivebuffering)
2331  {
2332  os << " adaptive buffering: " << adaptivebuffering << " s";
2333  }
2334  os << endl;
2335 
2336  if ( ThePort)
2337  {
2338  os << " Web control Port: " << ThePort << endl;
2339  }
2340  else
2341  {
2342  os << " No Web control defined" << endl;
2343  }
2344 
2345  if ( daq_getEventFormat())
2346  {
2347  os << " Writing legacy format " << endl;
2348  }
2349 
2350  if ( ElogH)
2351  {
2352  os << " Elog: " << ElogH->getHost() << " " << ElogH->getLogbookName() << " Port " << ElogH->getPort() << endl;
2353  }
2354  else
2355  {
2356  os << " Elog: not defined" << endl;
2357  }
2358 #ifdef HAVE_MOSQUITTO_H
2359  if (mqtt)
2360  {
2361  os << " mqtt: " << mqtt->GetHostName() << " Port " << mqtt->GetPort() << endl;
2362  }
2363 #endif
2364 
2365  daq_status_runtypes ( os);
2366  daq_status_plugin(flag, os);
2367 
2368  break;
2369 
2370  }
2371 
2372  return 0;
2373 }
2374 
2375 int daq_webcontrol(const int port, std::ostream& os)
2376 {
2377 
2378  if ( port ==0)
2379  {
2380  ThePort=8899 + servernumber;
2381  }
2382  else
2383  {
2384  ThePort = port;
2385  }
2386 
2387  if ( ThreadWeb) // we had this thing running already
2388  {
2389  mg_end();
2390  pthread_join(ThreadWeb, NULL);
2391  ThreadWeb = 0;
2392  }
2393 
2394  int status = pthread_create(&ThreadWeb, NULL,
2395  mg_server,
2396  (void *) &ThePort);
2397 
2398  if (status )
2399  {
2400  os << MyHostName << "error in web service creation " << status << endl;
2401  ThePort=0;
2402  return -1;
2403  }
2404  else
2405  {
2406  os << MyHostName << "web service created" << endl;
2407  return 0;
2408  }
2409  return 0;
2410 
2411 }
2412 
2413 int daq_getlastfilename( std::ostream& os)
2414 {
2415  if (get_previous_filename().empty() ) return -1;
2416  os << get_previous_filename() << endl;
2417  return 0;
2418 }
2419 
2420 int daq_getlastevent_number( std::ostream& os)
2421 {
2422  os << Event_number_at_last_end << endl;
2423  return Event_number_at_last_end;
2424 }
2425 
2427 {
2428  if ( DAQ_RUNNING ) return 1;
2429  return 0;
2430 }
2431 
2432 
2433 // the routines to deal with the remote server if we are logging this way.
2434 
2435 // 1) we open a socket with open_serverSocket
2436 // 2) make the server open a file with server_send_beginrun_sequence
2437 // 3) ... send buffers
2438 // 4) send end-run with server_send_endrun_sequence
2439 // 5) rinse and repeat 2...4
2440 // 6) tell the server we are done with server_send_close_sequence
2441 
// Resolves `hostname` (IPv4 only) and connects a socket to it on `port`.
//
// The first address candidate for which socket() succeeds is used, and a
// single connect() attempt is made on it. The send buffer size is
// requested as 512 KB; a failure of that request is reported but is not
// fatal.
//
// Returns the connected socket descriptor on success. On failure returns
// a negative value: the getaddrinfo() error code if name resolution
// failed with a negative code, otherwise -1.
int open_serverSocket(const char * hostname, const int port)
{
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_INET;

  char port_str[32];
  snprintf(port_str, sizeof(port_str), "%d", port);

  struct addrinfo *result = NULL;
  const int status = getaddrinfo(hostname, port_str, &hints, &result);

  // getaddrinfo reports failure with any non-zero value
  if ( status != 0)
    {
      std::cout << __FILE__<< " " << __LINE__ << " " << hostname << " " << gai_strerror(status) << std::endl;
      // callers treat only negative values as errors, so never hand back a
      // positive error code that could be mistaken for a descriptor
      return ( status < 0 ) ? status : -1;
    }

  // take the first candidate that yields a socket; 0 is a valid descriptor
  int sockfd = -1;
  struct addrinfo *rp = NULL;
  for (rp = result; rp != NULL; rp = rp->ai_next)
    {
      sockfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
      if ( sockfd >= 0 )
        {
          break;
        }
    }

  // also covers an empty candidate list (rp == NULL, sockfd still -1)
  if ( sockfd < 0)
    {
      std::cout << __FILE__ << " " << __LINE__ << " error in socket" << std::endl;
      perror("socket");
      freeaddrinfo(result);
      return -1;
    }

  int xs = 512*1024;

  int s = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &xs, sizeof(int));
  if (s) std::cout << "setsockopt status = " << s << std::endl;

  if ( connect(sockfd, rp->ai_addr, rp->ai_addrlen) < 0 )
    {
      std::cout << __FILE__ << " " << __LINE__ << " error in connect" << std::endl;
      perror("connect");
      close(sockfd);   // do not leak the descriptor on a failed connect
      freeaddrinfo(result);
      return -1;
    }

  freeaddrinfo(result);
  return sockfd;
}
2502 
2503 int server_send_beginrun_sequence(const char * filename, const int runnumber, int fd)
2504 {
2505  int opcode;
2506  int status;
2507  int len;
2508 
2509  // std::cout << __FILE__ << " " << __LINE__ << " sending " << CTRL_SENDFILENAME << std::endl ;
2510  opcode = htonl(CTRL_SENDFILENAME);
2511  status = writen(fd, (char *) &opcode, sizeof(int));
2512  if ( status != sizeof(int)) return -1;
2513 
2514  len = strlen(filename);
2515  opcode = htonl(len);
2516  //std::cout << __FILE__ << " " << __LINE__ << " sending " << filename << " len = " << len<< std::endl ;
2517 
2518  status = writen(fd, (char *) &opcode, sizeof(int));
2519  if ( status != sizeof(int)) return -1;
2520 
2521  status = writen(fd, (char *) filename, len);
2522  if ( status != len) return -1;
2523 
2524  status = readn (fd, (char *) &opcode, sizeof(int) );
2525  if ( status != sizeof(int) || ntohl(opcode) != CTRL_REMOTESUCCESS )
2526  {
2527  perror("read_ack");
2528  return -1;
2529  }
2530 
2531  //std::cout << __FILE__ << " " << __LINE__ << " sending " << CTRL_BEGINRUN << std::endl ;
2532  opcode = htonl(CTRL_BEGINRUN);
2533  status = writen(fd, (char *) &opcode, sizeof(int));
2534  if ( status != sizeof(int)) return -1;
2535 
2536  //std::cout << __FILE__ << " " << __LINE__ << " sending " << runnumber << std::endl ;
2537  opcode = htonl(runnumber);
2538  status = writen(fd, (char *) &opcode, sizeof(int));
2539  if ( status != sizeof(int)) return -1;
2540 
2541  //std::cout << __FILE__ << " " << __LINE__ << " waiting for acknowledge... " << std::endl ;
2542  status = readn (fd, (char *) &opcode, sizeof(int) );
2543  //std::cout << __FILE__ << " " << __LINE__ << " returned status " << ntohl(opcode) << endl;
2544  if ( status != sizeof(int) )
2545  {
2546  perror("read_ack");
2547  return -1;
2548  }
2549  if (ntohl(opcode) != CTRL_REMOTESUCCESS )
2550  {
2551  return -1;
2552  }
2553 
2554  return 0;
2555 }
2556 
2558 {
2559  int opcode;
2560  int status;
2561  int len;
2562 
2563  // std::cout << __FILE__ << " " << __LINE__ << " sending " << CTRL_SENDFILENAME << std::endl ;
2564  opcode = htonl(CTRL_ROLLOVER);
2565  status = writen(fd, (char *) &opcode, sizeof(int));
2566  if ( status != sizeof(int)) return -1;
2567 
2568  len = strlen(filename);
2569  opcode = htonl(len);
2570  //std::cout << __FILE__ << " " << __LINE__ << " sending " << filename << " len = " << len<< std::endl ;
2571  status = writen(fd, (char *) &opcode, sizeof(int));
2572  if ( status != sizeof(int)) return -1;
2573 
2574  status = writen(fd, (char *) filename, len);
2575  if ( status != len) return -1;
2576 
2577  status = readn (fd, (char *) &opcode, sizeof(int) );
2578  if ( status != sizeof(int) || ntohl(opcode) != CTRL_REMOTESUCCESS )
2579  {
2580  perror("read_ack");
2581  return -1;
2582  }
2583 
2584  return 0;
2585 }
2586 
2587 
2589 {
2590  int opcode;
2591  int status;
2592 
2593  opcode = htonl(CTRL_ENDRUN);
2594  status = writen (fd, (char *)&opcode, sizeof(int) );
2595  if ( status != sizeof(int)) return -1;
2596 
2597  // std::cout << __FILE__ << " " << __LINE__ << " waiting for acknowledge... " << std::endl ;
2598  status = readn (fd, (char *) &opcode, sizeof(int) );
2599  if ( status != sizeof(int) || ntohl(opcode) != CTRL_REMOTESUCCESS )
2600  {
2601  perror("read_ack");
2602  return -1;
2603  }
2604  //std::cout << __FILE__ << " " << __LINE__ << " ok " << std::endl;
2605 
2606  return 0;
2607 }
2608 
2610 {
2611  int opcode;
2612  int status;
2613 
2614  opcode = htonl(CTRL_CLOSE);
2615  status = writen (fd, (char *)&opcode, sizeof(int) );
2616  if ( status != sizeof(int)) return -1;
2617 
2618  return 0;
2619 }
2620 
2621 // int update_fileSQLinfo()
2622 // {
2623 // int sfd = get_sqlfd();
2624 // md5_byte_t md5_digest[16];
2625 // char digest_string[33];
2626 
2627 // if ( sfd)
2628 // {
2629 // md5_finish(&md5state, md5_digest);
2630 // for ( int i=0; i< 16; i++)
2631 // {
2632 // sprintf ( &digest_string[2*i], "%02x", md5_digest[i]);
2633 // }
2634 // digest_string[32] = 0;
2635 
2636 // std::ostringstream out;
2637 // out << "update $FILETABLE set md5sum=\'" << digest_string << "\'"
2638 // << ",lastevent=" << Event_number_at_last_write -1
2639 // << ",events=" << Event_number_at_last_write - Event_number_at_last_open
2640 // << " where runnumber=" << TheRun
2641 // << " and filename=\'" << CurrentFilename << "\';" << std::endl;
2642 // write (sfd, out.str().c_str(), out.str().size());
2643 // }
2644 
2645 // return 0;
2646 // }
2647 
2648 // "what" refers to the various phases, new, update, end
2649 //int daq_generate_json (const int flag, const std::string what, const std::string type, std::ostream& os)
// Publishes a small JSON record about the current file through mqtt.
//
// flag == 0  "new" record: run number, host, run type, file name, file
//            sequence, first event number, time
// flag != 0  "update" record: run number, host, file name, MD5 digest of
//            md5state, last event number, number of events, time
//
// Does nothing when built without HAVE_MOSQUITTO_H or when no mqtt object
// is set. Always returns 0.
int daq_generate_json (const int flag)
{
#ifdef HAVE_MOSQUITTO_H

  if ( mqtt )
    {
      std::ostringstream msg;

      msg << "{\"file\": [" << std::endl;

      if ( flag != 0 ) // update an entry
        {
          // finish the digest and render its 16 bytes as 32 hex characters
          md5_byte_t raw_digest[16];
          md5_finish(&md5state, raw_digest);

          char hex_digest[33];
          for ( int k = 0; k < 16; ++k )
            {
              sprintf ( hex_digest + 2*k, "%02x", raw_digest[k]);
            }
          hex_digest[32] = 0;

          msg << " { \"what\":\"" << "update"
              << "\", \"runnumber\":" << TheRun << ","
              << " \"host\":\"" << shortHostName << "\","
              << " \"CurrentFileName\":\"" << CurrentFilename << "\","
              << " \"MD5\":\"" << hex_digest << "\","
              << " \"LastEventNr\":" << Event_number_at_last_write -1 << ","
              << " \"NrEvents\":" << Event_number_at_last_write - Event_number_at_last_open << ","
              << " \"time\":" << time(0) << " }" << std::endl;
        }
      else // we start a new entry
        {
          msg << " { \"what\":\"new\","
              << " \"runnumber\":" << TheRun << ","
              << " \"host\":\"" << shortHostName << "\","
              << " \"runtype\":\"" << TheRunType << "\","
              << " \"CurrentFileName\":\"" << CurrentFilename << "\","
              << " \"CurrentFileSequence\":" << current_filesequence << ","
              << " \"FirstEventNr\":" << Event_number_at_last_open << ","
              << " \"time\": " << time(0) << " }" << std::endl;
        }

      msg << "] }" << std::endl;

      mqtt->send(msg.str());
    }

#endif

  return 0;
}