Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sfs.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file sfs.cc
1 
2 #include <string.h>
3 #include <sys/types.h>
4 #include <sys/stat.h>
5 #include <fcntl.h>
6 
7 #include <sys/time.h>
8 #include <unistd.h>
9 #include <stdlib.h>
10 #include <errno.h>
11 #include <unistd.h>
12 #include <signal.h>
13 #include <dlfcn.h>
14 #include <sys/ipc.h>
15 
16 #include <pthread.h>
17 
18 #ifdef HAVE_BSTRING_H
19 #include <bstring.h>
20 #else
21 #include <strings.h>
22 #endif
23 
24 #include <sys/time.h>
25 
26 #include <sys/socket.h>
27 #include <net/if.h>
28 #include <netdb.h>
29 
30 #include <sys/wait.h>
31 #include <sys/resource.h>
32 
33 
34 #include <sys/types.h>
35 
36 #ifdef SunOS
37 #include <sys/filio.h>
38 #endif
39 
40 #include <sys/stat.h>
41 
42 #include <netinet/in.h>
43 #include <arpa/inet.h>
44 #include <netdb.h>
45 #include <sys/ioctl.h>
46 
47 #include <stdio.h>
48 #include <iostream>
49 #include <iomanip>
50 
51 #ifdef HAVE_GETOPT_H
52 #include <getopt.h>
53 #endif
54 
55 
// Basic fixed-purpose integer aliases. PHDWORD is the element type of the
// receive/write buffers allocated in handle_this_child().
56 typedef unsigned int PHDWORD;
57 typedef unsigned short SWORD;
58 typedef unsigned char BYTE;
59 typedef unsigned int UINT;
60 
// writebuffers() rounds every buffer it writes up to a whole multiple of this
// many bytes.
61 #define BUFFERBLOCKSIZE 8192U
62 
63 
// Control words received from the client (converted with ntohl) and
// dispatched on in handle_this_child().
64 #define CTRL_BEGINRUN 1
65 #define CTRL_ENDRUN 2
66 #define CTRL_DATA 3
67 #define CTRL_CLOSE 4
68 #define CTRL_SENDFILENAME 5
69 #define CTRL_ROLLOVER 6
70 
// Status words sent back to the client after a request has been handled.
71 #define CTRL_REMOTESUCCESS 100
72 #define CTRL_REMOTEFAIL 101
73 
// Values stored in the "role" field of a buffer structure: the buffer is
// either being filled from the socket or being written out.
74 #define ROLE_RECEIVED 0
75 #define ROLE_WRITTEN 1
76 
77 
78 
79 // initial buffer size: 16 MBytes, expressed as a count of 4-byte words (16M / 4)
80 #define INITIAL_SIZE (4*1024*1024)
81 
82 
83 typedef struct
84 {
85  int dirty;
86  int role;
87  int bytecount;
91 
// ---- process-wide state shared by main(), the per-connection child and the
// ---- writer thread
92 int run_number = 0;
// verbosity level; incremented once for every -v on the command line
93 int verbose = 0;
// listening socket created in main()
94 int sockfd = 0;
// socket of the accepted client connection (set by accept() in main())
95 int dd_fd = 0;
96 
97 
// taken around diagnostic output (presumably so that messages from different
// threads do not interleave)
98 pthread_mutex_t M_cout;
99 
// M_write and M_done hand buffers back and forth between the receiving code
// and the writer thread:
//  - M_write is locked once in main(); handle_this_child() unlocks it after
//    each buffer swap and writebuffers() locks it to wait for the next buffer.
//  - M_done is locked by handle_this_child() before it swaps buffers or closes
//    the file at end of run, and unlocked by writebuffers() after each write.
100 pthread_mutex_t M_write;
101 pthread_mutex_t M_done;
102 
103 
// id of the writer thread started in handle_this_child()
104 pthread_t ThreadId;
105 pthread_t tid;
// descriptor of the current output file; -1 until a file has been opened
106 int output_fd = -1;
107 
// TCP port to listen on; changed by -p or by a positional argument
108 int the_port = 5001;
109 
// set by -x: receive data but never open or write an output file
110 int do_not_write = 0;
111 
112 int RunIsActive = 0;
113 int NumberWritten = 0;
114 int file_open = 0;
115 
116 
// read/write exactly the requested number of bytes unless an error occurs
117 int readn(int , char *, int);
118 int writen(int , char *, int);
119 
120 #if defined(SunOS) || defined(Linux)
121 void sig_handler(int);
122 #else
123 void sig_handler(...);
124 #endif
125 
// writer thread body, per-connection protocol loop, and interface lookup
126 void *writebuffers ( void * arg);
127 int handle_this_child( pid_t pid, const std::string &host);
128 in_addr_t find_address_from_interface(const char *);
129 
// closes the client connection and terminates the process
130 void cleanup(const int exitstatus);
131 
132 
135 
136 
137 using namespace std;
138 
139 
140 void exitmsg()
141 {
142  cout << "** This is the Super Fast Server :-)." << endl;
143  cout << "** usage: sfs " << endl;
144  cout << " -d disable database logging [ db not yet implemented ]" << endl;
145  cout << " -b interface bind only to this interface" << endl;
146  cout << " -p number use this port (default 5001)" << endl;
147  cout << " -v increase verbosity" << endl;
148  cout << " -x do not write data to disk (testing)" << endl;
149  cout << " Examples:" << endl;
150  cout << " sfs -b ens801f0 -- listen only on that interface" << endl;
151  cout << " sfs -p 5002 -- listen on port 5002" << endl;
152 
153  exit(0);
154 }
155 
156 
157 //char *s_opcode[] = {
158 // "Invalid code",
159 // "CTRL_BEGINRUN",
160 // "CTRL_ENDRUN",
161 // "CTRL_DATA",
162 // "CTRL_CLOSE",
163 // "CTRL_SENDFILENAME"};
164 
166 
167 int main( int argc, char* argv[])
168 {
169 
170 
171 #if defined(SunOS) || defined(Linux)
172  struct sockaddr client_addr;
173 #else
174  struct sockaddr_in client_addr;
175 #endif
176  struct sockaddr_in server_addr;
177 
178 #if defined(SunOS)
179  int client_addr_len = sizeof(client_addr);
180 #else
181  unsigned int client_addr_len = sizeof(client_addr);
182 #endif
183 
184  pthread_mutex_init(&M_cout, 0);
185  pthread_mutex_init(&M_write, 0);
186  pthread_mutex_init(&M_done, 0);
187 
188  pthread_mutex_lock(&M_write);
189 
190  // by default, we listen on all interfaces
191  in_addr_t listen_address = htonl(INADDR_ANY);
192 
193  char c;
194 
195  while ((c = getopt(argc, argv, "hvdxb:p:")) != EOF)
196  {
197  switch (c)
198  {
199 
200  case 'v': // verbose
201  verbose++;
202  break;
203 
204  case 'h':
205  exitmsg();
206  break;
207 
208  case 'b': // bind to this interface
209  listen_address = find_address_from_interface(optarg);
210  listen_interface = optarg;
211  break;
212 
213  case 'p': // port number
214  if ( !sscanf(optarg, "%d", &the_port) ) exitmsg();
215  break;
216 
217  case 'd': // no database
218  // databaseflag=1;
219  // cout << "database access enabled" << endl;
220  break;
221 
222  case 'x': // no writing
223  do_not_write = 1;
224  break;
225 
226 
227  }
228  }
229 
230 
231 
232  if (argc > optind)
233  {
234  sscanf (argv[optind],"%d", &the_port);
235  }
236 
237  // ------------------------
238  // now set up the sockets
239 
240  if ( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0 )
241  {
242  cleanup(1);
243  }
244  // int xs = 64*1024+21845;
245  int xs = 1024*1024;
246 
247  int s = setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF,
248  &xs, sizeof(xs));
249 
250  if (s)
251  {
252  perror("Setsockopt:");
253  }
254 
255  memset( &server_addr, 0, sizeof(server_addr));
256  server_addr.sin_family = AF_INET;
257  server_addr.sin_addr.s_addr = listen_address;
258  server_addr.sin_port = htons(the_port);
259 
260  int i;
261 
262  if ( ( i = bind(sockfd, (struct sockaddr*) &server_addr, sizeof(server_addr))) < 0 )
263  {
264  perror(" bind ");
265  cleanup(1);
266  }
267 
268  if ( ( i = listen(sockfd, 100) ) )
269  {
270  perror(" listen ");
271  cleanup(1);
272  }
273 
274  if (verbose)
275  {
276  cout << " listening on port " << the_port;
277  if (! listen_interface.empty() ) cout << " on interface " << listen_interface;
278  cout << endl;
279  }
280 
281  signal(SIGCHLD, SIG_IGN);
282 
283  pid_t pid;
284  struct sockaddr_in out;
285 
286  std::string host;
287 
288  while (sockfd > 0)
289  {
290 
291  client_addr_len = sizeof(out);
292  dd_fd = accept(sockfd, (struct sockaddr *) &out, &client_addr_len);
293  if ( dd_fd < 0 )
294  {
295  pthread_mutex_lock(&M_cout);
296  cout << "error in accept socket" << endl;
297  pthread_mutex_unlock(&M_cout);
298  perror (" accept" );
299  cleanup(1);
300  exit(1);
301  }
302 
303 
304  char h[512];
305  getnameinfo((struct sockaddr *) &out, sizeof(struct sockaddr_in), h, 511,
306  NULL, 0, NI_NOFQDN);
307  host = h;
308 
309  if (verbose)
310  {
311  cout << " new connection from " << host << " at " << time(0) << endl;
312  }
313 
314 
315  if ( (pid = fork()) == 0 )
316  {
317  close(sockfd);
318  return handle_this_child( pid, host);
319  }
320  else
321  {
322  close (dd_fd);
323  }
324  }
325 }
326 
327 int handle_this_child( pid_t pid, const std::string &host)
328 {
329 
330 
331  int controlword;
332  int local_runnr = 0;
333 
334  bufferstructure B0;
335  bufferstructure B1;
336 
337  bf_being_received = &B0;
338  bf_being_written = &B1;
339  bufferstructure *bf_temp;
340 
341  B0.bf = new PHDWORD[INITIAL_SIZE];
343  B0.dirty = 0;
344  B0.role= ROLE_RECEIVED;
345  B0.bytecount = 0;
346 
347  B1.bf = new PHDWORD[INITIAL_SIZE];
349  B1.dirty = 0;
350  B1.role= ROLE_RECEIVED;
351  B1.bytecount = 0;
352 
353  int i;
354 
355  // we make a thread that will write out our buffers
356  i = pthread_create(&ThreadId, NULL,
357  writebuffers,
358  (void *) 0);
359  if (i )
360  {
361  cout << "error in thread create " << i << endl;
362  perror("Thread ");
363  cleanup(1);
364  }
365  // else
366  // {
367  // pthread_mutex_lock(&M_cout);
368  // cout << "write thread created" << endl;
369  // pthread_mutex_unlock(&M_cout);
370  // }
371 
372  // should be the default, but set it to blocking mode anyway
373  i = ioctl (dd_fd, FIONBIO, 0);
374 
375  // find out where we were contacted from
376 
377 
378  int status;
379  int xx;
380 
381  int go_on = 1;
382  while ( go_on)
383  {
384  if ( (status = readn (dd_fd, (char *) &xx, sizeof(xx)) ) <= 0)
385  {
386  cout << "error in read from socket" << endl;
387  perror ("read " );
388  cleanup(1);
389  }
390 
391  controlword = ntohl(xx);
392 
393  // cout << endl;
394  // cout << __FILE__ << " " << __LINE__ << " controlword = " << controlword << " ntew: " << xx << " " ;
395  // if ( controlword >=0 && controlword <=5) cout << s_opcode[controlword];
396  // cout << endl;
397 
398  char *p;
399  char filename[1024];
400  int value, len;
401 
402  switch (controlword)
403  {
404 
405  case CTRL_SENDFILENAME:
406  // read the length of what we are about to get
407  readn (dd_fd, (char *) &len, sizeof(int));
408  len = ntohl(len);
409  // acknowledge... or not
410  //cout << " filename len = " << len << endl;
411  if ( len >= 1023)
412  {
413  i = htonl(CTRL_REMOTEFAIL);
414  writen (dd_fd, (char *)&i, sizeof(i));
415  break;
416  }
417  value = readn (dd_fd, filename, len);
418  filename[value] = 0;
419  //cout << " filename is " << filename << endl;
420 
421  i = htonl(CTRL_REMOTESUCCESS);
422  writen (dd_fd, (char *)&i, sizeof(i));
423 
424  break;
425 
426  case CTRL_ROLLOVER:
427 
428  // after we receive CTRL_ROLLOVER, we get
429  // - the length of the filename
430  // the actual filename
431  // then we send back the status
432 
433  // read the length of what we are about to get
434  readn (dd_fd, (char *) &len, sizeof(int));
435  len = ntohl(len);
436  // acknowledge... or not
437  // cout << __FILE__ << " " << __LINE__ << " filename len = " << len << endl;
438  if ( len >= 1023)
439  {
440  i = htonl(CTRL_REMOTEFAIL);
441  writen (dd_fd, (char *)&i, sizeof(i));
442  break;
443  }
444  value = readn (dd_fd, filename, len);
445  filename[value] = 0;
446  // cout << __FILE__ << " " << __LINE__ << " filename is " << filename << endl;
447 
448  if (! do_not_write)
449  {
450  close ( output_fd);
451  output_fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL | O_LARGEFILE ,
452  S_IRWXU | S_IROTH | S_IRGRP );
453  if (output_fd < 0)
454  {
455  cerr << "file " << filename << " exists, I will not overwrite " << endl;
456  i = htonl(CTRL_REMOTEFAIL);
457  writen (dd_fd, (char *)&i, sizeof(i));
458  break;
459  }
460  if (verbose)
461  {
462  cout << " opened new rollover file " << filename << endl;
463  }
464  }
465 
466  i = htonl(CTRL_REMOTESUCCESS);
467  writen (dd_fd, (char *)&i, sizeof(i));
468 
469  break;
470 
471  case CTRL_BEGINRUN:
472 
473  readn (dd_fd, (char *) &local_runnr, sizeof(local_runnr) );
474  local_runnr = ntohl(local_runnr);
475  //cout << " runnumber = " << local_runnr << endl;
476 
477  if (! do_not_write)
478  {
479  output_fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_EXCL | O_LARGEFILE ,
480  S_IRWXU | S_IROTH | S_IRGRP );
481  if (output_fd < 0)
482  {
483  cerr << "file " << filename << " exists, I will not overwrite " << endl;
484  i = htonl(CTRL_REMOTEFAIL);
485  writen (dd_fd, (char *)&i, sizeof(i));
486  break;
487  }
488  if (verbose)
489  {
490  cout << " opened new file " << filename << endl;
491  }
492  }
493 
494  bf_being_received = &B0;
495  bf_being_written = &B1;
496 
497  bf_being_received->role = ROLE_RECEIVED;
498  bf_being_received->dirty=0;
499 
500  bf_being_written->role = ROLE_WRITTEN;
501  bf_being_written->dirty = 0;
502 
503  i = htonl(CTRL_REMOTESUCCESS);
504  writen (dd_fd, (char *)&i, sizeof(i));
505 
506  break;
507 
508 
509 
510 
511 
512 
513  case CTRL_DATA:
514  status = readn (dd_fd, (char *) &len, sizeof(i));
515  len = ntohl(len);
516  //cout << " data! len = " << len << endl;
517 
518  bf_being_received->bytecount = len;
519  if ( (len+sizeof(int)-1)/sizeof(int) > bf_being_received->buffersize)
520  {
521  delete [] bf_being_received->bf;
522  bf_being_received->buffersize = (len + sizeof(int)-1)/sizeof(int) + 2048;
523 
524  pthread_mutex_lock(&M_cout);
525  // cout << "expanding buffer to " << bf_being_received->buffersize << " for host " << host << endl;
526  pthread_mutex_unlock(&M_cout);
527 
528  bf_being_received->bf = new PHDWORD[ bf_being_received->buffersize];
529  }
530 
531  p = (char *) bf_being_received->bf;
532  status = readn (dd_fd, p, len );
533  if ( len != status)
534  {
535 
536  pthread_mutex_lock(&M_cout);
537  // cout << "error on transfer: Expected " << len
538  // << " got status " << status << " for host " << host << endl;
539  perror("ctrl_data");
540  pthread_mutex_unlock(&M_cout);
541  i = htonl(CTRL_REMOTEFAIL);
542  }
543  // store the actual byte length we received
544  bf_being_received->bytecount = len;
545  bf_being_received->dirty = 1;
546 
547  // cout << __FILE__ << " " << __LINE__ << " waiting for write thread " << endl;
548  pthread_mutex_lock(&M_done);
549  // and switch the buffers
550  bf_temp = bf_being_received;
551  bf_being_received = bf_being_written;
552  bf_being_written = bf_temp;
553  bf_being_written->role = ROLE_WRITTEN;
554  bf_being_received->role = ROLE_RECEIVED;
555 
556  // cout << __FILE__ << " " << __LINE__ << " switching buffers " << bf_being_received << " " << bf_being_received << endl;
557  pthread_mutex_unlock(&M_write);
558 
559  i = htonl(CTRL_REMOTESUCCESS);
560  writen (dd_fd, (char *)&i, sizeof(int));
561 
562  break;
563 
564  case CTRL_ENDRUN:
565  //cout << " endrun signal " << endl;
566 
567  pthread_mutex_lock(&M_done);
568  if (! do_not_write)
569  {
570  close (output_fd);
571  if (verbose)
572  {
573  cout << " closed file " << filename << " at " << time(0) << endl;
574  }
575  }
576 
577  i = htonl(CTRL_REMOTESUCCESS);
578  writen (dd_fd, (char *)&i, sizeof(i));
579  pthread_mutex_unlock(&M_done);
580 
581  break;
582 
583  case CTRL_CLOSE:
584  close ( dd_fd);
585 
586  // we set go_on to 0 so our loop stops and we return
587  go_on = 0;
588  if (verbose == 1)
589  {
590  cout << " closed connection from " << host << endl;
591  }
592  else if (verbose > 1)
593  {
594  cout << " closed connection from " << host << " at " << time(0) << endl;
595  }
596  break;
597 
598  }
599 
600  }
601 
602  if (verbose > 1) cout << " ending thread " << " at " << time(0) << endl;
603 
604  return 0;
605 }
606 
607 
608 
609 void *writebuffers ( void * arg)
610 {
611 
612  while(1)
613  {
614 
615  pthread_mutex_lock(&M_write);
616 
617  if (! do_not_write)
618  {
619  pthread_mutex_lock(&M_cout);
620  //cout << __LINE__ << " " << __FILE__ << " write thread unlocked " << endl;
621  pthread_mutex_unlock(&M_cout);
622 
623  int blockcount = ( bf_being_written->bytecount + BUFFERBLOCKSIZE -1)/BUFFERBLOCKSIZE;
624  int bytecount = blockcount*BUFFERBLOCKSIZE;
625 
626  pthread_mutex_lock(&M_cout);
627  //cout << __LINE__ << " " << __FILE__ << " write thread unlocked, block count " << blockcount << endl;
628  pthread_mutex_unlock(&M_cout);
629 
630  int bytes = writen ( output_fd, (char *) bf_being_written->bf , bytecount );
631  if ( bytes != bytecount)
632  {
633  pthread_mutex_lock(&M_cout);
634  cout << __LINE__ << " " << __FILE__ << " write error " << bytes << " " << bytecount << endl;
635  pthread_mutex_unlock(&M_cout);
636  bf_being_written->dirty = -1; // mark as "error"
637  }
638  else
639  {
640  // usleep(1000000);
641  bf_being_written->dirty = 0;
642  }
643  }
644  else
645  {
646  bf_being_written->dirty = 0;
647  }
648  pthread_mutex_unlock(&M_done);
649 
650  }
651  return 0;
652 }
653 
654 
655 
656 
// Read exactly nbytes bytes from fd into ptr.
//
// read() may deliver fewer bytes than requested (for example on a socket),
// so it is called repeatedly until everything has arrived. A read that was
// merely interrupted by a signal (EINTR) is retried instead of being
// reported as an error.
//
// Returns nbytes on success (0 if nbytes <= 0). If read() reports end of
// file or an error before all bytes have arrived, that result (0 or -1) is
// returned and the bytes read so far are not counted.
int readn (int fd, char *ptr, int nbytes)
{
  int nleft = nbytes;

  while ( nleft > 0 )
    {
      int nread = read (fd, ptr, nleft);
      if ( nread < 0 && errno == EINTR )
	{
	  continue;   // interrupted before any data arrived: try again
	}
      if ( nread <= 0 )
	{
	  return nread;
	}

#ifdef FRAGMENTMONITORING
      history[hpos++] = nread;
#endif
      nleft -= nread;
      ptr += nread;
    }

#ifdef FRAGMENTMONITORING
  if ( hpos >1 )
    {
      cout << "Fragmented transfer of " << nbytes << "bytes: ";
      for ( int i=0; i<hpos; i++)
	{
	  cout << " " << history[i]<< ",";
	}
      cout << endl;
    }
#endif
  return (nbytes-nleft);
}
691 
// Write exactly nbytes bytes from ptr to fd.
//
// write() may accept fewer bytes than requested, so it is called repeatedly
// until everything has been written. A write that was merely interrupted by
// a signal (EINTR) is retried.
//
// Returns nbytes on success (0 if nbytes <= 0) and the negative result of
// write() on error. Should write() accept no bytes at all, the loop stops
// and the number of bytes written so far is returned, so that callers
// comparing the result with nbytes see the shortfall instead of hanging.
int writen (int fd, char *ptr, int nbytes)
{
  int nleft = nbytes;

  while ( nleft > 0 )
    {
      int nwritten = write (fd, ptr, nleft);
      if ( nwritten < 0 )
	{
	  if ( errno == EINTR )
	    {
	      continue;   // interrupted before anything was written
	    }
	  return nwritten;
	}
      if ( nwritten == 0 )
	{
	  break;   // no progress possible
	}

      nleft -= nwritten;
      ptr += nwritten;
    }
  return (nbytes-nleft);
}
707 
708 
709 in_addr_t find_address_from_interface(const char *interface)
710 {
711  int fd;
712  struct ifreq ifr;
713 
714  // check the no one plays tricks with us...
715  if ( strlen(interface) >= IFNAMSIZ)
716  {
717  cerr << " Interface name too long, ignoring "<< endl;
718  return htonl(INADDR_ANY);
719  }
720 
721  fd = socket(AF_INET, SOCK_DGRAM, 0);
722 
723  // I want to find an IPv4 IP address
724  ifr.ifr_addr.sa_family = AF_INET;
725 
726  // I want IP address attached to "interface"
727  strncpy(ifr.ifr_name, interface, IFNAMSIZ-1);
728 
729  ioctl(fd, SIOCGIFADDR, &ifr);
730 
731  close(fd);
732  in_addr_t a = ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr.s_addr;
733 
734  if (verbose) cout << " binding to address " << inet_ntoa( ((struct sockaddr_in *)&ifr.ifr_addr)->sin_addr ) << endl;
735  return a;
736 }
737 
738 
739 
740 void cleanup( const int exitstatus)
741 {
742  close (dd_fd);
743 
744  exit(exitstatus);
745 }
746