26 #include <sys/types.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/if_ether.h>
30 #include <netinet/in.h>
31 #include <netinet/ip.h>
33 #include <sys/ioctl.h>
34 #include <netpacket/packet.h>
35 #include <sys/socket.h>
52 #include "rcdaq_rpc.h"
55 #ifdef HAVE_MOSQUITTO_H
90 #define DAQ_TRIGGER 0x01
91 #define DAQ_COMMAND 0x02
92 #define DAQ_SPECIAL 0x04
96 #define COMMAND_INIT 1
97 #define COMMAND_BEGIN 2
99 #define COMMAND_FINISH 4
100 #define COMMAND_OPENP 5
134 #ifdef HAVE_MOSQUITTO_H
153 #define MAXEVENTID 32
211 #define MONITORINGPORT 9930
273 pthread_mutex_lock(&
M_cout);
274 cout <<
"**interrupt " << endl;
275 pthread_mutex_unlock(&
M_cout);
300 cout <<
"error in event thread create " << status << endl;
331 unsigned long long x = n_mb;
341 os <<
"Run is active" << endl;
353 os <<
"Run is active" << endl;
365 os <<
"Run is active" << endl;
386 os <<
"Run is active" << endl;
392 os <<
"Cannot switch format after devices are defined" << endl;
421 setenv (
"DAQ_ELOGHOST", host , 1);
423 sprintf(str,
"%d", port);
424 setenv (
"DAQ_ELOGPORT", str , 1);
425 setenv (
"DAQ_ELOGLOGBOOK", logname , 1);
432 int readn (
int fd,
char *ptr,
const int nbytes)
439 int fd_is_socket = 0;
440 if ( S_ISSOCK(statbuf.st_mode) ) fd_is_socket = 1;
446 if ( fd_is_socket) nread = recv (fd, ptr, nleft, MSG_NOSIGNAL);
447 else nread =
read (fd, ptr, nleft);
458 return (nbytes-nleft);
470 int fd_is_socket = 0;
471 if ( S_ISSOCK(statbuf.st_mode) ) fd_is_socket = 1;
479 if ( fd_is_socket) nwritten = send (fd, ptr, nleft, MSG_NOSIGNAL);
480 else nwritten =
write (fd, ptr, nleft);
491 return (nbytes-nleft);
510 int ifd =
open(d, O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
511 S_IRWXU | S_IROTH | S_IRGRP );
514 pthread_mutex_lock(&
M_cout);
515 cout <<
" error opening file " << d << endl;
517 pthread_mutex_unlock(&
M_cout);
552 if (status)
return -1;
578 std::ostream *
os = (std::ostream *) arg;
601 unsigned long *t_args = (
unsigned long *) arg;
604 pthread_mutex_lock(&
M_cout);
605 cout <<
"shutting down... " << t_args[0] <<
" " << t_args[1] << endl;
609 pthread_mutex_unlock(&
M_cout);
611 svc_unregister ( t_args[0], t_args[1]);
613 flock(pid_fd, LOCK_UN);
630 struct sockaddr_in server_addr;
633 if ( (sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0 )
635 pthread_mutex_lock(&
M_cout);
636 cout <<
"cannot create socket" << endl;
637 pthread_mutex_unlock(&
M_cout);
640 bzero( (
char*)&server_addr,
sizeof(server_addr));
641 server_addr.sin_family = PF_INET;
642 server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
646 int status = bind(sockfd, (
struct sockaddr*) &server_addr,
sizeof(server_addr));
652 pthread_mutex_lock(&
M_cout);
654 pthread_mutex_unlock(&
M_cout);
659 struct sockaddr_in out;
664 socklen_t len =
sizeof(
out);
665 dd_fd = accept(sockfd, (
struct sockaddr *) &out, &len);
668 pthread_mutex_lock(&
M_cout);
669 cout <<
"error in accept socket" << endl;
670 pthread_mutex_unlock(&
M_cout);
674 char *host =
new char[64];
676 getnameinfo((
struct sockaddr *) &out,
sizeof(
struct sockaddr_in), host, 64,
738 if ( (status =
readn (fd, (
char *) &max_length, 4) ) <= 0)
744 max_length = ntohl(max_length);
749 if ( ! status && ( status =
readn (fd, (
char *) &reply, 4) ) <= 0)
850 cout << __FILE__ <<
" " << __LINE__ <<
" rolling over " << d << endl;
882 pthread_mutex_unlock(&
SendSem);
902 int status = fscanf(fp,
"%d", &r);
903 if ( status != 1) r = 0;
950 std::cerr <<
"error running the runnumber app" << std::endl;
954 int len = fread(in, 1, 64, fp);
957 std::stringstream
s (in);
986 #ifdef HAVE_MOSQUITTO_H
989 std::cout << __FILE__ <<
" " << __LINE__ <<
" mqtt host " << host <<
" port " << port << endl;
990 if (mqtt)
delete mqtt;
992 if (strcasecmp(host,
"None") == 0)
1002 if ( mqtt->Status())
1008 os <<
"Could not connect to host " << host <<
" on port " << port << endl;
1022 os <<
" No MQTT host defined" << endl;
1025 os <<
" Host " << mqtt->GetHostName() <<
" " <<
" port " << mqtt->GetPort() << endl;
1036 std::map <string,string>::const_iterator iter =
RunTypes.begin();
1037 for ( ; iter !=
RunTypes.end(); ++iter)
1039 if ( iter->first == _type )
1046 os <<
" Run type " << type <<
" is not defined " << endl;
1052 std::map <string,string>::const_iterator iter =
RunTypes.begin();
1053 for ( ; iter !=
RunTypes.end(); ++iter)
1059 os << iter->first <<
" - " << iter->second << endl;
1064 os << iter->first << endl;
1091 os <<
" -- defined Run Types: ";
1098 if ( flag &&
RunTypes.size() == 0 )
1100 os <<
" (none)" <<endl;
1103 if (flag ==2) os << endl;
1105 std::map <string,string>::const_iterator iter =
RunTypes.begin();
1106 for ( ; iter !=
RunTypes.end(); ++iter)
1110 os <<
" " << setw(12) << iter->first <<
" - " << iter->second << endl;
1114 os << iter->first << endl;
1150 time_t now_time =
time(0);
1155 last_volume_time = now_time;
1156 return mb_per_second;
1161 time_t now_time =
time(0);
1166 last_speed_time = now_time;
1167 return events_per_second;
1173 int irun = *(
int*)arg;
1178 cout << __FILE__ <<
" " << __LINE__ <<
" asynchronous begin run failed" << endl;
1188 static unsigned int b_arg;
1198 os <<
MyHostName <<
"Run " << irun <<
" begin requested" << endl;
1209 int status = pthread_create(&t, NULL,
1215 cout <<
"begin_run failed " << status << endl;
1216 os <<
"begin_run failed " << status << endl;
1227 os <<
MyHostName <<
"Run is already active" << endl;;
1233 os <<
MyHostName <<
" No automatic Run Numbers in Run Control Mode" << endl;;
1239 os <<
MyHostName <<
"*** Previous error with server connection" << endl;;
1305 os <<
MyHostName <<
"Could not open remote output file - Run " <<
TheRun <<
" not started" << endl;;
1325 os <<
MyHostName <<
"Could not open output file - Run " <<
TheRun <<
" not started" << endl;;
1339 cout <<
"starting run " <<
TheRun <<
" at " <<
time(0) << endl;
1347 int wantedmaxsize = 0;
1354 if ( x > wantedmaxsize ) wantedmaxsize =
x;
1361 os <<
MyHostName <<
"Cannot start run - event sizes larger than buffer, size "
1362 << wantedmaxsize/1024 <<
" Buffer size "
1379 sprintf( str,
"%d",
TheRun);
1380 setenv (
"DAQ_RUNNUMBER", str, 1);
1381 setenv (
"DAQ_FILERULE",
TheFileRule.c_str() , 1);
1384 setenv (
"DAQ_STARTTIME", str , 1);
1418 os <<
MyHostName <<
"Run is not active" << endl;;
1429 int status = pthread_create(&t, NULL,
1435 cout <<
"end_run failed " << status << endl;
1436 os <<
"end_run failed " << status << endl;
1477 os <<
MyHostName <<
"Run is not active" << endl;;
1503 std::cout << __FILE__ <<
" " << __LINE__ <<
" error in closing connection... " << std::endl;
1521 unsetenv (
"DAQ_RUNNUMBER");
1522 unsetenv (
"DAQ_FILENAME");
1523 unsetenv (
"DAQ_STARTTIME");
1553 for ( i = 0; i <
n; i++)
1605 cout << __LINE__ <<
" calling daq_end" << endl;
1616 cout <<
"Run not active" << endl;
1703 cout <<
" automatic end after " <<
max_volume /(1024*1024) <<
" Mb" << endl;
1709 cout <<
" automatic end after " <<
max_events<<
" events" << endl;
1733 (*d_it)->rearm(etype);
1752 size += (*d_it)->max_length(i) ;
1757 cout <<
"Event id " << i <<
" size " << size << endl;
1766 os <<
MyHostName <<
"Run is active" << endl;;
1787 os <<
MyHostName <<
"Run is active" << endl;;
1810 os <<
MyHostName <<
"Run is active" << endl;;
1843 if ( ! theport) theport = 5001;
1850 cout << __FILE__<<
" " << __LINE__ <<
" error connecting to server " <<
daq_server_name <<
" on port " << theport << endl;
1854 cout << __FILE__<<
" " << __LINE__ <<
" error connecting to server " <<
daq_server_name <<
" on port " << theport <<
" " << gai_strerror(
TheServerFD) << endl;
1872 os <<
MyHostName <<
"Run is active" << endl;;
1881 static unsigned long t_args[3];
1883 t_args[1] = versionnumber;
1889 int status = pthread_create(&t, NULL,
1895 cout <<
"cannot shut down " << status << endl;
1896 os <<
MyHostName <<
"cannot shut down " << status << endl;
1918 os <<
MyHostName <<
"Run is active" << endl;;
1933 os <<
MyHostName <<
"Run is active" << endl;;
1938 std::cout << __FILE__ <<
" " << __LINE__ <<
" error in closing connection... " << std::endl;
1955 (*d_it)->identify(os);
1967 os <<
MyHostName <<
"Run is active" << endl;;
1979 os <<
MyHostName <<
"Readlist cleared" << endl;
1993 char hostname[HOST_NAME_MAX];
1994 status = gethostname(hostname, HOST_NAME_MAX);
2016 pthread_mutex_init( &
SendSem, 0);
2023 pthread_mutex_lock( &
SendSem);
2043 status = pthread_create(&
ThreadId, NULL,
2049 cout <<
"error in write thread create " << status << endl;
2054 pthread_mutex_lock(&
M_cout);
2055 cout <<
"write thread created" << endl;
2056 pthread_mutex_unlock(&
M_cout);
2059 status = pthread_create(&
ThreadMon, NULL,
2064 cout <<
"error in send monitor data thread create " << status << endl;
2069 pthread_mutex_lock(&
M_cout);
2070 cout <<
"monitor thread created" << endl;
2071 pthread_mutex_unlock(&
M_cout);
2074 status = pthread_create(&
ThreadMon, NULL,
2079 cout <<
"error in send monitor data thread create " << status << endl;
2084 pthread_mutex_lock(&
M_cout);
2085 cout <<
"monitor request thread created" << endl;
2086 pthread_mutex_unlock(&
M_cout);
2120 return v / (1024*1024);
2142 volume /= (1024*1024);
2147 case STATUSFORMAT_SHORT:
2157 <<
" \"" <<
MyName <<
"\"" << endl;
2165 <<
" \"" <<
MyName <<
"\"" << endl;
2171 case STATUSFORMAT_NORMAL:
2176 <<
" Volume: " << volume;
2185 os <<
" Logging enabled";
2191 os <<
" Logging disabled";
2205 os <<
" Logging enabled";
2217 os <<
" Logging disabled";
2239 os <<
" Run Number: " <<
TheRun << endl;
2241 os <<
" Run Volume: " << volume <<
" MB"<< endl;
2268 os <<
" Volume Limit: " <<
max_volume /(1024 *1024) <<
" Mb" << endl;
2272 os <<
" Event Limit: " <<
max_events << endl;
2286 os <<
" Logging enabled";
2299 os <<
" Logging disabled" << endl;
2316 os <<
" Volume Limit: " <<
max_volume /(1024 *1024) <<
" Mb" << endl;
2320 os <<
" Event Limit: " <<
max_events << endl;
2322 if (
TriggerH ) os <<
" have a trigger object" << endl;
2326 os <<
" Run Control Mode enabled " << endl;
2338 os <<
" Web control Port: " <<
ThePort << endl;
2342 os <<
" No Web control defined" << endl;
2347 os <<
" Writing legacy format " << endl;
2356 os <<
" Elog: not defined" << endl;
2358 #ifdef HAVE_MOSQUITTO_H
2361 os <<
" mqtt: " << mqtt->GetHostName() <<
" Port " << mqtt->GetPort() << endl;
2400 os <<
MyHostName <<
"error in web service creation " << status << endl;
2406 os <<
MyHostName <<
"web service created" << endl;
2448 struct addrinfo hints;
2449 memset(&hints, 0,
sizeof(
struct addrinfo));
2451 hints.ai_family = AF_INET;
2452 struct addrinfo *result, *rp;
2455 sprintf(port_str,
"%d", port);
2458 int status = getaddrinfo(hostname, port_str,
2464 cout << __FILE__<<
" " << __LINE__ <<
" " << hostname <<
" " << gai_strerror(status) << endl;
2468 for (rp = result; rp != NULL; rp = rp->ai_next)
2471 if ( (sockfd = socket(result->ai_family, result->ai_socktype,
2472 result->ai_protocol) ) > 0 )
2480 std::cout << __FILE__ <<
" " << __LINE__ <<
" error in socket" << std::endl;
2482 freeaddrinfo(result);
2488 int s = setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &xs,
sizeof(
int));
2489 if (s) std::cout <<
"setsockopt status = " << s << std::endl;
2491 if ( connect(sockfd, rp->ai_addr, rp->ai_addrlen) < 0 )
2493 std::cout << __FILE__ <<
" " << __LINE__ <<
" error in connect" << std::endl;
2495 freeaddrinfo(result);
2499 freeaddrinfo(result);
2511 status =
writen(fd, (
char *) &opcode,
sizeof(
int));
2512 if ( status !=
sizeof(
int))
return -1;
2514 len = strlen(filename);
2515 opcode = htonl(len);
2518 status =
writen(fd, (
char *) &opcode,
sizeof(
int));
2519 if ( status !=
sizeof(
int))
return -1;
2521 status =
writen(fd, (
char *) filename, len);
2522 if ( status != len)
return -1;
2524 status =
readn (fd, (
char *) &opcode,
sizeof(
int) );
2533 status =
writen(fd, (
char *) &opcode,
sizeof(
int));
2534 if ( status !=
sizeof(
int))
return -1;
2537 opcode = htonl(runnumber);
2538 status =
writen(fd, (
char *) &opcode,
sizeof(
int));
2539 if ( status !=
sizeof(
int))
return -1;
2542 status =
readn (fd, (
char *) &opcode,
sizeof(
int) );
2544 if ( status !=
sizeof(
int) )
2565 status =
writen(fd, (
char *) &opcode,
sizeof(
int));
2566 if ( status !=
sizeof(
int))
return -1;
2568 len = strlen(filename);
2569 opcode = htonl(len);
2571 status =
writen(fd, (
char *) &opcode,
sizeof(
int));
2572 if ( status !=
sizeof(
int))
return -1;
2574 status =
writen(fd, (
char *) filename, len);
2575 if ( status != len)
return -1;
2577 status =
readn (fd, (
char *) &opcode,
sizeof(
int) );
2594 status =
writen (fd, (
char *)&opcode,
sizeof(
int) );
2595 if ( status !=
sizeof(
int))
return -1;
2598 status =
readn (fd, (
char *) &opcode,
sizeof(
int) );
2615 status =
writen (fd, (
char *)&opcode,
sizeof(
int) );
2616 if ( status !=
sizeof(
int))
return -1;
2652 #ifdef HAVE_MOSQUITTO_H
2654 if ( ! mqtt)
return 0;
2656 std::ostringstream
out;
2661 out <<
"{\"file\": [" << endl;
2662 out <<
" { \"what\":\"new\","
2663 <<
" \"runnumber\":" <<
TheRun <<
","
2669 <<
" \"time\": " <<
time(0) <<
" }" << endl;
2670 out <<
"] }" << endl;
2675 char digest_string[33];
2678 for (
int i=0;
i< 16;
i++)
2680 sprintf ( &digest_string[2*
i],
"%02x", md5_digest[i]);
2682 digest_string[32] = 0;
2684 out <<
"{\"file\": [" << endl;
2685 out <<
" { \"what\":\"" <<
"update"
2686 <<
"\", \"runnumber\":" <<
TheRun <<
","
2689 <<
" \"MD5\":\"" << digest_string <<
"\","
2692 <<
" \"time\":" <<
time(0) <<
" }" << endl;
2693 out <<
"] }" << endl;
2696 mqtt->send(out.str());