Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dpipe.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file dpipe.cc
1 #include <stdlib.h>
2 #include <signal.h>
3 #include <dlfcn.h>
4 #include <sys/types.h>
5 #include <sys/stat.h>
6 #include <fcntl.h>
7 #include <unistd.h>
8 
9 
10 #include "fileEventiterator.h"
11 #include "testEventiterator.h"
12 #include "listEventiterator.h"
13 #include "rcdaqEventiterator.h"
14 #include "ogzBuffer.h"
15 #include "olzoBuffer.h"
16 #include "oamlBuffer.h"
17 #include "ophBuffer.h"
18 #include "dpipe_filter.h"
19 
20 #include "phenixTypes.h"
21 #include "oEvent.h"
22 #include "stdio.h"
23 #include "EventTypes.h"
24 #ifdef HAVE_GETOPT_H
25 #include <getopt.h>
26 #endif
27 
// Integer tags that main() stores in sourcetype / destinationtype to remember
// which kind of event source and destination was selected on the command line.
// Sources and destinations share this single numbering.
// NOTE(review): ETPOOL is the initial value of destinationtype and is selected
// by "-d d", but main() has no branch that handles it and reports
// "invalid destination"; DFILE is also used as the initial sourcetype, which the
// source switch in main() does not accept.
28 #define RCDAQEVENTITERATOR 1
29 #define FILEEVENTITERATOR 2
30 #define TESTEVENTITERATOR 3
31 #define ETPOOL 4
32 #define DFILE 5
33 #define DNULL 6
34 #define LISTEVENTITERATOR 7
35 #define OAML 8
36 
37 
// Forward declaration of the signal handler defined at the end of this file,
// so that main() can install it. It is declared with an int parameter on
// SunOS/Linux/OSF1 and with a variadic parameter list elsewhere; nothing is
// declared on WIN32.
38 #ifndef WIN32
39 #if defined(SunOS) || defined(Linux) || defined(OSF1)
40 void sig_handler(int);
41 #else
42 void sig_handler(...);
43 #endif
44 #endif
45 
46 
47 void exitmsg()
48 {
49  COUT << "** usage: dpipe -s -d -v -w -n -i -z -l -x source destination" << std::endl;
50  COUT << " dpipe -h for more help" << std::endl;
51  exit(0);
52 }
53 
55 {
56  COUT << "** cannot specify both -e and -c!" << std::endl;
57  COUT << " type dpipe -h for more help" << std::endl;
58  exit(0);
59 }
60 
62 {
63  COUT << "** cannot specify both -z and -l!" << std::endl;
64  COUT << " type dpipe -h for more help" << std::endl;
65  exit(0);
66 }
67 
68 void exithelp()
69 {
70  COUT << std::endl;
71  COUT << " dpipe reads events from one source and writes it to a destination. The source can" << std::endl;
72  COUT << " be any of the standard sources (ET pool, file, filelist, or test stream). The destination " << std::endl;
73  COUT << " can be a file, a ET pool, an AML server, or no destination at all (you cannot write to a test " << std::endl;
74  COUT << " stream). Like with a Unix pipe, you can move events through a chain of sources " << std::endl;
75  COUT << " and destinations, for example, from one ET pool into another, or into a file. " << std::endl;
76  COUT << std::endl;
77  COUT << " While the events move through dpipe, you can have them identify themselves. If the " << std::endl;
78  COUT << " destination is null, this is a simple way to sift through a stream of events " << std::endl;
79  COUT << " and look at their identification messages. " << std::endl;
80  COUT << std::endl;
81  COUT << " You can throttle the data flow with the -w (wait) option, and you can stop after" << std::endl;
82  COUT << " a given number of events with the -n option. " << std::endl;
83  COUT << std::endl;
84  COUT << " In order to write events from a ET pool called ONLINE to a file (d.evt), use" << std::endl;
85  COUT << std::endl;
86  COUT << " > dpipe -s etpool -d file ONLINE d.evt" << std::endl;
87  COUT << std::endl;
88  COUT << " if you want to see which events are coming, add -i:" << std::endl;
89  COUT << std::endl;
90  COUT << " > dpipe -s etpool -d file -i ONLINE d.evt" << std::endl;
91  COUT << std::endl;
92  COUT << " If the output is a aml server, specify the destination as hostname:port: " << std::endl;
93  COUT << " > dpipe -s f -d a -i filename phnxbox4.phenix.bnl.gov:8900" << std::endl << std::endl;
94  COUT << " Note that you can abbreviate the etpool, file, listfile, and Test to d, f, l, and T." << std::endl;
95  COUT << " > dpipe -s etpool -d file ONLINE d.evt" << std::endl;
96  COUT << " is equivalent to " << std::endl;
97  COUT << " > dpipe -s d -d f ONLINE d.evt" << std::endl;
98  COUT << std::endl;
99  COUT << " List of options: " << std::endl;
100  COUT << " -s [d or f or l or T] source is et pool, file, listfile, or Test stream" << std::endl;
101  COUT << " -b <size in MB> in case you write to a file, specify the buffer size (default 4MB)" << std::endl;
102  COUT << " -d [d or f or a or n] destination is et pool or file , aml server or nothing" << std::endl;
103  COUT << " -v verbose" << std::endl;
104  COUT << " -w (time in milliseconds> wait time interval (in ms) between events to throttle the data flow" << std::endl;
105  COUT << " -e <event number> start from event number" << std::endl;
106  COUT << " -c <number> get nth event (-e gives event with number n)" << std::endl;
107  COUT << " -n <number> stop after so many events" << std::endl;
108  COUT << " -i have each event identify itself" << std::endl;
109  COUT << " -z gzip-compress each output buffer" << std::endl;
110  COUT << " -l LZO-compress each output buffer" << std::endl;
111  COUT << " -x sharedlibrary.so load a plugin that can select events" << std::endl;
112  COUT << " -h this message" << std::endl << std::endl;
113  exit(0);
114 }
115 
116 
117 // The global pointer to the Eventiterator (we must be able to
118 // get at it in the signal handler)
120 
121 
// Set by the -x option in main(): sharedlib points at the option argument
// (the plugin path) and load_lib is set to 1. Neither is read anywhere in the
// visible part of this file.
122 char *sharedlib;
123 int load_lib = 0;
124 
126 
127 
128 
// NOTE(review): the header line of this function (Doxygen line 129) is missing
// from this listing; presumably it is the filter registration hook that takes
// the new filter as a pointer parameter named T -- confirm against
// dpipe_filter.h.
// Installs T as the global event filter. A previously installed filter is
// deleted first, so the global pointer takes over ownership of T.
130 {
131  if ( filter ) delete filter;
132  filter = T;
133 
134 }
135 
// NOTE(review): the header line of this function (Doxygen line 136) is missing
// from this listing; presumably it is the counterpart of the registration hook
// and takes the filter as a pointer parameter named T -- confirm against
// dpipe_filter.h.
// Clears the global filter pointer, but only if T is the filter currently
// installed. The filter object itself is not deleted here.
137 {
138  if ( filter && T == filter )
139  {
140  filter = 0;
141  }
142 
143 }
144 
145 
146 
147 
148 int
149 main(int argc, char *argv[])
150 {
151  int c;
152  int status = 0;
153 
154  int sourcetype =DFILE;
155  int destinationtype = ETPOOL;
156  int waitinterval = 0;
157  int verbose = 0;
158  int identify = 0;
159  int maxevents = 0;
160  int eventnr = 0;
161  int gzipcompress = 0;
162  int lzocompress = 0;
163  int eventnumber =0;
164  int countnumber =0;
165  void *voidpointer;
166 
167 
168  PHDWORD *buffer;
169  oBuffer *ob = 0;
170  int fd = 0;
171  int buffer_size = 256*1024*4 ; // makes 4MB (specifies how many dwords, so *4)
172 
173 
174  // initialize the it pointer to 0;
175  it = 0;
176 
177  // if (argc < 3) exitmsg();
178  // COUT << "parsing input" << std::endl;
179 
180 #ifndef WIN32
181  while ((c = getopt(argc, argv, "e:b:c:s:d:n:w:x:vhizl")) != EOF)
182  {
183  switch (c)
184  {
185  // the -s (source type) switch
186  case 'e':
187  if ( !sscanf(optarg, "%d", &eventnumber) ) exitmsg();
188  break;
189 
190  case 'b':
191  if ( !sscanf(optarg, "%d", &buffer_size) ) exitmsg();
192  buffer_size = buffer_size*256*1024;
193  break;
194 
195  case 'c':
196  if ( !sscanf(optarg, "%d", &countnumber) ) exitmsg();
197  break;
198 
199  case 's':
200  if ( *optarg == 'T' ) sourcetype = TESTEVENTITERATOR;
201  else if ( *optarg == 'f' ) sourcetype = FILEEVENTITERATOR;
202  else if ( *optarg == 'r' ) sourcetype = RCDAQEVENTITERATOR;
203  else if ( *optarg == 'l' ) sourcetype = LISTEVENTITERATOR;
204  else exitmsg();
205  break;
206 
207  // the -d (destination type) switch
208  case 'd':
209  if ( *optarg == 'd' ) destinationtype = ETPOOL;
210  else if ( *optarg == 'f' ) destinationtype = DFILE;
211  else if ( *optarg == 'n' ) destinationtype = DNULL;
212  else if ( *optarg == 'a' ) destinationtype = OAML;
213  else exitmsg();
214  break;
215 
216  case 'v': // verbose
217  verbose++;
218  break;
219 
220  case 'i': // identify
221  identify = 1;
222  break;
223 
224  case 'w': // wait interval
225  if ( !sscanf(optarg, "%d", &waitinterval) ) exitmsg();
226  break;
227 
228  case 'n': // number of events
229  if ( !sscanf(optarg, "%d", &maxevents) ) exitmsg();
230  break;
231 
232  case 'z': // gzip-compress
233  gzipcompress = 1;
234  break;
235 
236  case 'l': // lzo-compress
237  lzocompress = 1;
238  break;
239 
240  case 'x': // load a filter shared lib
241  voidpointer = dlopen(optarg, RTLD_GLOBAL | RTLD_NOW);
242  if (!voidpointer)
243  {
244  std::cout << "Loading of the filter library "
245  << optarg << " failed: " << dlerror() << std::endl;
246 
247  }
248  if (filter) std::cout <<" Filter \"" << filter->idString() << "\" registered" << std::endl;
249 
250  sharedlib=optarg;
251  load_lib=1;
252  break;
253 
254  case 'h':
255  exithelp();
256  break;
257 
258  default:
259  break;
260 
261  }
262  }
263 #else
264  char* pszParam; // gotten parameter
265  char chOpt;
266 
267  while ( (chOpt = GetOption(argc, argv, "e:b:c:s:d:n:w:vhiz", &pszParam) ) >1)
268  {
269  // COUT << "option is " << chOpt << std::endl;
270  // chOpt is valid argument
271  switch (chOpt)
272  {
273  case 'e':
274  //COUT << "parameter is " << pszParam << std::endl;
275  if ( !sscanf(pszParam, "%d", &eventnumber) ) exitmsg();
276  break;
277 
278  case 'b':
279  //COUT << "parameter is " << pszParam << std::endl;
280  if ( !sscanf(pszParam, "%d", &buffer_size) ) exitmsg();
281  buffer_size = buffer_size*256*1024;
282  break;
283 
284  case 'c':
285  //COUT << "parameter is " << pszParam << std::endl;
286  if ( !sscanf(pszParam, "%d", &countnumber) ) exitmsg();
287  break;
288 
289  case 's':
290  //COUT << "parameter is " << pszParam << std::endl;
291  if ( *pszParam == 'T' ) sourcetype = TESTEVENTITERATOR;
292  else if ( *pszParam == 'f' ) sourcetype = FILEEVENTITERATOR;
293  else exitmsg();
294  break;
295 
296  // the -d (destination type) switch
297  case 'd':
298  // COUT << "parameter is " << pszParam << std::endl;
299  if ( *pszParam == 'f' ) destinationtype = DFILE;
300  else if ( *pszParam == 'n' ) destinationtype = DNULL;
301  else exitmsg();
302  break;
303 
304  case 'v': // verbose
305  verbose = 1;
306  break;
307 
308  case 'i': // identify
309  identify = 1;
310  break;
311 
312  case 'w': // wait interval
313  if ( !sscanf(pszParam, "%d", &waitinterval) ) exitmsg();
314  break;
315 
316  case 'n': // number of events
317  //COUT << "parameter is " << pszParam << std::endl;
318  if ( !sscanf(pszParam, "%d", &maxevents) ) exitmsg();
319  break;
320 
321  case 'z': // gzip-compress
322  gzipcompress = 1;
323  break;
324 
325  case 'h':
326  exithelp();
327  break;
328 
329  default:
330  break;
331 
332  }
333  }
334 #endif
335 
336 
337  if ( eventnumber && countnumber) evtcountexitmsg();
338  if ( gzipcompress && lzocompress ) compressionexitmsg();
339 
340  // install some handlers for the most common signals
341 #ifndef WIN32
342  signal(SIGKILL, sig_handler);
343  signal(SIGTERM, sig_handler);
344  signal(SIGINT, sig_handler);
345 #endif
346 
347  // see if we can open the file
348  switch (sourcetype)
349  {
350  case RCDAQEVENTITERATOR:
351  it = new rcdaqEventiterator(argv[optind], status);
352  break;
353 
354  case TESTEVENTITERATOR:
355 
356  it = new testEventiterator();
357  status =0;
358  break;
359 
360  case FILEEVENTITERATOR:
361  it = new fileEventiterator(argv[optind], status);
362  break;
363 
364  case LISTEVENTITERATOR:
365  it = new listEventiterator(argv[optind], status);
366  break;
367 
368  default:
369  exitmsg();
370  break;
371  }
372 
373  if (status)
374  {
375  delete it;
376  COUT << "Could not open input stream" << std::endl;
377  exit(1);
378  }
379 
380  // set the verbosity of the iterator
381  it->setVerbosity(verbose);
382 
383  // now we'll see where the data go to
384 
385  if ( destinationtype == DNULL )
386  {
387  // identify = 1;
388  }
389 
390  else if ( destinationtype == DFILE)
391  {
392  buffer = new PHDWORD [buffer_size];
393 
394  unlink (argv[optind+1]);
395 
396  fd = open (argv[optind+1], O_WRONLY | O_CREAT | O_EXCL | O_LARGEFILE ,
397  S_IRWXU | S_IROTH | S_IRGRP );
398 
399  if ( fd < 0)
400  {
401  COUT << "Could not open file: " << argv[optind+1] << std::endl;
402  exit (1);
403  }
404  if (gzipcompress)
405  {
406  ob = new ogzBuffer (fd, buffer, buffer_size);
407  }
408  else if ( lzocompress)
409  {
410  ob = new olzoBuffer (fd, buffer, buffer_size);
411  }
412  else
413  {
414  ob = new ophBuffer (fd, buffer, buffer_size);
415  }
416 
417  }
418 
419 
420  else if ( destinationtype == OAML)
421  {
422  buffer = new PHDWORD [buffer_size];
423  ob = new oamlBuffer (argv[optind+1], buffer, buffer_size);
424  }
425 
426 
427  else
428  {
429  COUT << "invalid destination" << std::endl;
430  exitmsg();
431  }
432 
433 
434  // COUT << "waitinterval is " << waitinterval << std::endl;
435  // ok. now go through the events
436  Event *evt;
437  int take_this;
438  int count = 0;
439  //COUT << " max events = " << maxevents << std::endl;
440  while ( ( maxevents == 0 || eventnr < maxevents) &&
441  ( evt = it->getNextEvent()) )
442  {
443  take_this = 1;
444  count++;
445 
446  if ( eventnumber )
447  {
448  if ( evt->getEvtSequence() == eventnumber)
449  eventnumber = 0;
450  else
451  take_this = 0;
452  }
453 
454  if ( countnumber && count < countnumber)
455  take_this = 0;
456 
457  if (take_this)
458  {
459  if ( (! filter) || filter->select(evt) )
460  {
461  if (identify) evt->identify();
462  if ( destinationtype == DFILE || destinationtype == OAML )
463  {
464  status = ob->addEvent(evt);
465  }
466  if ( status )
467  {
468  COUT << "Error writing events " << std::endl;
469  break;
470  }
471  eventnr++;
472  }
473  }
474 
475  delete evt;
476 
477 #ifndef WIN32
478  if (waitinterval > 0) usleep(waitinterval*1000);
479 #else
480  if (waitinterval > 0) Sleep(waitinterval);
481 #endif
482 
483  }
484  delete it;
485 
486  if ( destinationtype == DFILE )
487  {
488  delete ob;
489 
490 #ifndef WIN32
491  close(fd);
492 #endif
493  }
494 
495 #ifndef WIN32
496  else if ( destinationtype == OAML )
497  {
498  delete ob;
499  }
500 #endif
501 
502 
503  return 0;
504 }
505 
506 #if defined(SunOS) || defined(Linux) || defined(OSF1)
507 void sig_handler(int i)
508 #else
509  void sig_handler(...)
510 #endif
511 {
512  if (it) delete it;
513  exit(0);
514 }
515 
516 
517