Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
eventServer.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file eventServer.cc
1 
2 
3 #define coutfl cout << __FILE__<< " " << __LINE__ << " "
4 #define cerrfl cerr << __FILE__<< " " << __LINE__ << " "
5 
6 
7 #include <iostream>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <string.h>
11 #include <sys/types.h>
12 #include <sys/socket.h>
13 #include <arpa/inet.h>
14 #include <netinet/in.h>
15 #include <pthread.h>
16 
17 #ifdef HAVE_GETOPT_H
18 #include "getopt.h"
19 #endif
20 
21 #include <map>
22 
23 
24 #include "testEventiterator.h"
25 #include "fileEventiterator.h"
26 #include "rcdaqEventiterator.h"
27 #include "oncsEventiterator.h"
28 
29 using namespace std;
30 
31 #define PORT 8080
32 #define MAXSIZE 5120
33 
34 map<int, Event*> EventMap;
36 
37 
38 #define RCDAQEVENTITERATOR 1
39 #define FILEEVENTITERATOR 2
40 #define TESTEVENTITERATOR 3
41 #define ONCSEVENTITERATOR 4
42 
43 void exitmsg()
44 {
45  cout << "** usage: gl1_server -nifTrOvh datastream" << std::endl;
46  cout << " type gl1_server -h for more help" << std::endl;
47  exit(0);
48 }
49 
50 void exithelp()
51 {
52 
53  cout << std::endl;
54  cout << " gl1_server serves events from a given datastream" << std::endl;
55  cout << std::endl;
56  cout << " List of options: " << std::endl;
57  cout << " -d <number> depth of buffer" << std::endl;
58  cout << " -f (stream is a file)" << std::endl;
59  cout << " -T (stream is a test stream)" << std::endl;
60  cout << " -r (stream is a rcdaq monitoring stream)" << std::endl;
61  cout << " -O (stream is a legacy ONCS format file)" << std::endl;
62  cout << " -h this message" << std::endl;
63  cout << endl;
64  cout << " debug options" << endl;
65  cout << " -s <number> sleep so many ticks (in units of usleep)" << std::endl;
66  cout << " -i <print event identity>" << std::endl;
67  cout << " -v verbose" << std::endl;
68  exit(0);
69 }
70 
71 unsigned int depth = 1000;
72 int go_on = 1;
73 int identify = 0;
74 int verbose = 0;
75 int repeatcount =1;
77 int sleeptime = 0;
78 int old_runnumber = -9999;
79 
80 pthread_mutex_t MapSem;
81 
82 
83 
84 void * EventLoop( void *arg)
85 {
86 
87  if ( identify) it->identify();
88 
89  while ( go_on)
90  {
91  Event *e = it->getNextEvent();
92  if ( ! e)
93  {
94  go_on = 0;
95  return 0;
96 
97  }
98 
99  pthread_mutex_lock( &MapSem);
100  map<int, Event*>::iterator it = EventMap.begin();
101 
102  // if we find that our run number has changed, we clear out what we have
103  if ( old_runnumber != e->getRunNumber())
104  {
106  for ( ; it != EventMap.end(); ++it)
107  {
108  delete it->second;
109  }
110  EventMap.clear();
111  }
112 
113  // if the next event inserted would exceed the envisioned depth, we remove the oldest
114  if (EventMap.size() >= depth)
115  {
116  map<int, Event*>::iterator it = EventMap.begin();
117  if ( verbose ) coutfl << "erasing event " << it->first << " depth = " << EventMap.size() << endl;
118  delete it->second;
119  EventMap.erase(it);
120  }
121 
122  // ok, so now we insert...
123  EventMap[e->getEvtSequence()] = e;
124 
125  // and unlock the map
126  pthread_mutex_unlock( &MapSem);
127 
128  if ( identify )
129  {
130  e->identify();
131  }
132 
133  if (sleeptime) usleep(sleeptime);
134  }
135  return 0;
136 }
137 
138 
139 int
140 main(int argc, char *argv[])
141 {
142  int sockfd;
143  int buffer[MAXSIZE];
144 
145  int c;
146 
147 
148  int status = -1;
149 
150  pthread_mutex_init( &MapSem, 0);
151 
152  while ((c = getopt(argc, argv, "d:s:ifTrOvh")) != EOF)
153  switch (c)
154  {
155  case 'd':
156  if ( !sscanf(optarg, "%d", &depth) ) exitmsg();
157  break;
158 
159  case 's':
160  if ( !sscanf(optarg, "%d", &sleeptime) ) exitmsg();
161  break;
162 
163  case 'i':
164  identify = 1;
165  break;
166 
167  case 'T':
169  break;
170 
171  case 'f':
173  break;
174 
175  case 'r':
177  break;
178 
179  case 'O':
181  break;
182 
183  case 'v': // verbose
184  verbose++;
185  break;
186 
187  case 'h':
188  exithelp();
189  break;
190  }
191 
192  switch (ittype)
193  {
194  case RCDAQEVENTITERATOR:
195  if ( optind+1>argc)
196  {
197  std::string host = "localhost";
198 
199  if ( getenv("RCDAQHOST") )
200  {
201  host = getenv("RCDAQHOST");
202  }
203 
204  it = new rcdaqEventiterator(host.c_str(), status);
205 
206  }
207  else
208  {
209  it = new rcdaqEventiterator(argv[optind], status);
210  }
211  break;
212 
213  case TESTEVENTITERATOR:
214  it = new testEventiterator();
215  status =0;
216  break;
217 
218  case FILEEVENTITERATOR:
219  if ( optind+1>argc) exitmsg();
220  it = new fileEventiterator(argv[optind], status);
221  break;
222 
223  case ONCSEVENTITERATOR:
224  if ( optind+1>argc) exitmsg();
225  it = new oncsEventiterator(argv[optind], status);
226  break;
227 
228  status = 1;
229  break;
230 
231  default:
232  exitmsg();
233  break;
234  }
235 
236  if (status)
237  {
238  delete it;
239  it = 0;
240  cout << "Could not open input stream" << std::endl;
241  exit(1);
242  }
243 
244 
245  pthread_t ThreadEvt;
246 
247  status = pthread_create(&ThreadEvt, NULL,
248  EventLoop,
249  (void *) 0);
250 
251  if (status )
252  {
253  cout << "error in event thread create " << status << endl;
254  exit(0);
255  }
256 
257 
258 
259  struct sockaddr_in servaddr, cliaddr;
260 
261  // Creating socket file descriptor
262  if ( (sockfd = socket(AF_INET, SOCK_DGRAM, 0)) < 0 )
263  {
264  perror("socket creation failed");
265  exit(EXIT_FAILURE);
266  }
267 
268  memset(&servaddr, 0, sizeof(servaddr));
269  memset(&cliaddr, 0, sizeof(cliaddr));
270 
271  // Filling server information
272  servaddr.sin_family = AF_INET; // IPv4
273  servaddr.sin_addr.s_addr = INADDR_ANY;
274  servaddr.sin_port = htons(PORT);
275 
276  // Bind the socket with the server address
277  if ( bind(sockfd, (const struct sockaddr *)&servaddr,
278  sizeof(servaddr)) < 0 )
279  {
280  perror("bind failed");
281  exit(EXIT_FAILURE);
282  }
283 
284  socklen_t len;
285  int n;
286 
287  len = sizeof(cliaddr); //len is value/result
288 
289  int recbuffer[10];
290 
291  while (1)
292  {
293  n = recvfrom(sockfd, (char *)recbuffer, 2*sizeof(int),
294  MSG_WAITALL, ( struct sockaddr *) &cliaddr,
295  &len);
296  if ( verbose )
297  {
298  cout << "request from " << inet_ntoa(cliaddr.sin_addr) << " requesting " << recbuffer[0] << endl;
299  }
300 
301  pthread_mutex_lock( &MapSem);
302 
303  map<int, Event*>::iterator it = EventMap.find(recbuffer[0]);
304  if ( it == EventMap.end() )
305  {
306  pthread_mutex_unlock( &MapSem);
307 
308  buffer[0] = 0;
309  sendto(sockfd, (const char *) buffer, sizeof(int),
310  MSG_CONFIRM, (const struct sockaddr *) &cliaddr,
311  len);
312  }
313  else
314  {
315  int nw;
316  (it->second)->Copy(buffer,MAXSIZE,&nw,"");
317  pthread_mutex_unlock( &MapSem);
318  sendto(sockfd, (const char *) buffer, nw*sizeof(int),
319  MSG_CONFIRM, (const struct sockaddr *) &cliaddr,
320  len);
321  }
322  }
323  return 0;
324 }