Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
pmonitorInterface.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file pmonitorInterface.cc
1 /*
2  This is the interface from the monitoring to pmonitor
3 */
4 
5 #include "pmonitorInterface.h"
6 #include "HistoBinDefs.h"
7 #include "OnlMon.h"
8 #include "OnlMonDefs.h"
9 #include "OnlMonServer.h"
10 
11 #pragma GCC diagnostic push
12 #pragma GCC diagnostic ignored "-Wunused-parameter"
13 #include <Event/Event.h>
14 #include <pmonitor.h>
15 #pragma GCC diagnostic pop
16 
17 #include <Event/EventTypes.h>
18 #include <Event/msg_control.h>
19 #include <Event/msg_profile.h>
20 #include <Event/packet.h>
21 
22 #include <MessageTypes.h> // for kMESS_OBJECT, kMESS_STRING
23 #include <TClass.h>
24 #include <TH1.h>
25 #include <TInetAddress.h> // for TInetAddress
26 #include <TMessage.h>
27 #include <TROOT.h>
28 #include <TServerSocket.h>
29 #include <TSocket.h>
30 #include <TSystem.h>
31 #include <TThread.h>
32 
33 #include <pthread.h>
34 #include <sys/types.h> // for time_t
35 #include <unistd.h> // for sleep
36 #include <cstdio> // for printf, NULL
37 #include <cstdlib> // for exit
38 #include <cstring> // for strcmp
39 #include <iostream> // for operator<<, basic_ostream, endl, basic_o...
40 #include <sstream>
41 #include <string>
42 
43 //#define ROOTTHREAD
44 
45 #ifndef ROOTTHREAD
46 #define SERVER
47 #endif
48 
49 #ifdef SERVER
50 static void *server(void *);
51 int ServerThread = 0;
52 #endif
53 
54 #ifdef ROOTTHREAD
55 static void *server(void *);
56 static TThread *ServerThread = nullptr;
57 #endif
58 
59 #ifdef USE_MUTEX
60 pthread_mutex_t mutex;
61 #endif
62 
63 TH1 *FrameWorkVars = nullptr;
64 
65 //*********************************************************************
66 
67 int pinit()
68 {
69  OnlMonServer *Onlmonserver = OnlMonServer::instance();
70 #ifdef USE_MUTEX
71  Onlmonserver->GetMutex(mutex);
72 #endif
73  for (int i = 0; i < kMAXSIGNALS; i++)
74  {
75  gSystem->IgnoreSignal((ESignals) i);
76  }
77 #ifdef USE_MUTEX
78  pthread_mutex_lock(&mutex);
79 #endif
80 
81 #if defined(SERVER) || defined(ROOTTHREAD)
82 
83  pthread_t ThreadId = 0;
84  if (!ServerThread)
85  {
86  // std::cout << "creating server thread" << std::endl;
87 #ifdef SERVER
88 
89  ServerThread = pthread_create(&ThreadId, nullptr, server, (void *) nullptr);
90  Onlmonserver->SetThreadId(ThreadId);
91 #endif
92 #ifdef ROOTTHREAD
93 
94  ServerThread = new TThread(server, (void *) 0);
95  ServerThread->Run();
96 #endif
97  }
98 #endif
99  // for the timestamp we need doubles
100  FrameWorkVars = new TH1D("FrameWorkVars", "FrameWorkVars", NFRAMEWORKBINS, 0., NFRAMEWORKBINS);
101  Onlmonserver->registerCommonHisto(FrameWorkVars);
102 #ifdef USE_MUTEX
103  pthread_mutex_unlock(&mutex);
104 #endif
105 
106  return 0;
107 }
108 
109 //*********************************************************************
111 {
112  static uint64_t savetmpticks = 0x7FFFFFFF;
113  static uint64_t borticks = 0;
114  static uint64_t eorticks = 0;
115  static int eventcnt = 0;
116 
118  uint64_t tmpticks = evt->getTime();
119 
120  // first test if a new run has started and call BOR/EOR methods of monitors
121  if (se->RunNumber() == -1)
122  {
123  savetmpticks = 0x7FFFFFFF;
124  eventcnt = 0;
125  int newrun = evt->getRunNumber();
126 #ifdef USE_MUTEX
127  pthread_mutex_lock(&mutex);
128 #endif
129  FrameWorkVars->SetBinContent(RUNNUMBERBIN, (Stat_t) newrun);
130  se->BadEvents(0);
131  se->EventNumber(evt->getEvtSequence());
132  se->RunNumber(newrun);
133  // set ticks to the current event, so the begin run has the
134  // up to date time stamp, while the end run has the time stamp from
135  // the last event of the previous run
136  se->CurrentTicks(tmpticks);
137  se->BeginRun(newrun);
138  // set trigger mask in et pool frontend
139  borticks = se->BorTicks();
140  FrameWorkVars->SetBinContent(BORTIMEBIN, (Stat_t) borticks);
141 #ifdef USE_MUTEX
142  pthread_mutex_unlock(&mutex);
143 #endif
144  eorticks = borticks;
145  }
146  if (evt->getEvtLength() <= 0 || evt->getEvtLength() > 2500000)
147  {
148  std::ostringstream msg;
149  msg << __PRETTY_FUNCTION__ << "Discarding event with length "
150  << evt->getEvtLength();
151  send_message(MSG_SEV_WARNING, msg.str());
152  se->AddBadEvent();
153  return 0;
154  }
155 
156  int oldrun;
157  if ((oldrun = se->RunNumber()) != evt->getRunNumber())
158  {
159  // ROOT crashes when one thread updates histos while they are
160  // being saved, need mutex protection here
161 #ifdef USE_MUTEX
162  pthread_mutex_lock(&mutex);
163 #endif
164  FrameWorkVars->SetBinContent(EORTIMEBIN, (Stat_t) eorticks); // set EOR time
165  se->EndRun(oldrun);
166  se->WriteHistoFile();
167  se->Reset(); // reset all monitors
168  int newrun = evt->getRunNumber();
169  FrameWorkVars->SetBinContent(RUNNUMBERBIN, (Stat_t) newrun);
170  eorticks = tmpticks; // initialize eorticks
171  se->BadEvents(0);
172  se->RunNumber(newrun);
173  se->EventNumber(evt->getEvtSequence());
174  // set ticks to the current event, so the begin run has the
175  // up to date time stamp, while the end run has the time stamp from
176  // the last event of the previous run
177  se->CurrentTicks(tmpticks);
178  se->BeginRun(newrun);
179  borticks = se->BorTicks();
180  FrameWorkVars->SetBinContent(BORTIMEBIN, (Stat_t) borticks);
181  eventcnt = 0;
182 #ifdef USE_MUTEX
183  pthread_mutex_unlock(&mutex);
184 #endif
185  savetmpticks = 0x7FFFFFFF;
186  }
187 
188  se->CurrentTicks(tmpticks);
189  // check if we get an event which was earlier than the BOR timestamp
190  // save earliest time stamp and number of events with earlier timestamps
191  if (tmpticks < borticks)
192  {
193 #ifdef USE_MUTEX
194  pthread_mutex_lock(&mutex);
195 #endif
196  FrameWorkVars->AddBinContent(EARLYEVENTNUMBIN);
197  if (tmpticks < savetmpticks)
198  {
199  savetmpticks = tmpticks;
200  FrameWorkVars->SetBinContent(EARLYEVENTTIMEBIN, (Stat_t) tmpticks);
201  }
202 #ifdef USE_MUTEX
203  pthread_mutex_unlock(&mutex);
204 #endif
205  }
206  if (eorticks < se->CurrentTicks())
207  {
208  eorticks = se->CurrentTicks();
209  }
210  if (evt->getErrorCode())
211  {
212  std::ostringstream msg;
213  msg << __PRETTY_FUNCTION__ << " Event with error code: "
214  << evt->getErrorCode()
215  << " discarding event " << evt->getEvtSequence();
216  send_message(MSG_SEV_WARNING, msg.str());
217  se->AddBadEvent();
218  return 0;
219  }
220 
221  eventcnt++;
222 #ifdef USE_MUTEX
223  pthread_mutex_lock(&mutex);
224 #endif
225  FrameWorkVars->SetBinContent(CURRENTTIMEBIN, (Stat_t) se->CurrentTicks());
226  se->EventNumber(evt->getEvtSequence());
227  se->process_event(evt);
228 #ifdef USE_MUTEX
229  pthread_mutex_unlock(&mutex);
230 #endif
231  return 0;
232 }
233 
235 {
236  return 0;
237 }
238 
239 static void *server(void * /* arg */)
240 {
241  OnlMonServer *Onlmonserver = OnlMonServer::instance();
242  int MoniPort = OnlMonDefs::MONIPORT;
243  // int thread_arg[5];
244 #ifdef USE_MUTEX
245  pthread_mutex_lock(&mutex);
246 #endif
247  TServerSocket *ss = nullptr;
248  sleep(5);
249  do
250  {
251  if (ss)
252  {
253  delete ss;
254  }
255  ss = new TServerSocket(MoniPort, kTRUE);
256  // Accept a connection and return a full-duplex communication socket.
257  Onlmonserver->PortNumber(MoniPort);
258  if ((MoniPort - OnlMonDefs::MONIPORT) >= OnlMonDefs::NUMMONIPORT)
259  {
260  std::ostringstream msg;
261  msg << "Too many Online Monitors running on this machine, bailing out";
262  send_message(MSG_SEV_FATAL, msg.str());
263 
264  exit(1);
265  }
266  MoniPort++;
267  if (!ss->IsValid())
268  {
269  printf("Ignore ROOT error about socket in use, I try another one\n");
270  }
271  } while (!ss->IsValid()); // from do{}while
272 
273  // root keeps a list of sockets and tries to close them when quitting.
274  // this interferes with my own threading and makes valgrind crash
275  // The solution is to remove the TServerSocket *ss from roots list of
276  // sockets. Then it will leave this socket alone.
277  int isock = gROOT->GetListOfSockets()->IndexOf(ss);
278  gROOT->GetListOfSockets()->RemoveAt(isock);
279  sleep(10);
280 #ifdef USE_MUTEX
281  pthread_mutex_unlock(&mutex);
282 #endif
283 again:
284  TSocket *s0 = ss->Accept();
285  if (!s0)
286  {
287  std::cout << "Server socket " << OnlMonDefs::MONIPORT
288  << " in use, either go to a different node or" << std::endl
289  << "change MONIPORT in server/OnlMonDefs.h and recompile" << std::endl
290  << "server and client" << std::endl;
291  exit(1);
292  }
293  // mutex protected since writing of histo
294  // to outgoing buffer and updating by other thread do not
295  // go well together
296  if (Onlmonserver->Verbosity() > 2)
297  {
298  TInetAddress adr = s0->GetInetAddress();
299  std::cout << "got connection from " << std::endl;
300  adr.Print();
301  }
302  // std::cout << "try locking mutex" << std::endl;
303 #ifdef USE_MUTEX
304  pthread_mutex_lock(&mutex);
305 #endif
306  // std::cout << "got mutex" << std::endl;
307  handleconnection(s0);
308  // std::cout << "try releasing mutex" << std::endl;
309 #ifdef USE_MUTEX
310  pthread_mutex_unlock(&mutex);
311 #endif
312  // std::cout << "mutex released" << std::endl;
313  delete s0;
314  /*
315  if (!aargh)
316  {
317  std::cout << "making thread" << std::endl;
318  aargh = new TThread(handletest,(void *)0);
319  aargh->Run();
320  }
321  */
322  // std::cout << "closing socket" << std::endl;
323  // s0->Close();
324  goto again;
325 }
326 
/// Placeholder thread body that ignores its argument and does nothing.
void handletest(void * /* arg */)
{
}
332 
333 void handleconnection(void *arg)
334 {
335  TSocket *s0 = (TSocket *) arg;
336 
337  OnlMonServer *Onlmonserver = OnlMonServer::instance();
338  /*
339  int val;
340  s0->GetOption(kSendBuffer, val);
341  printf("sendbuffer size: %d\n", val);
342  s0->GetOption(kRecvBuffer, val);
343  printf("recvbuffer size: %d\n", val);
344  */
345  TMessage *mess = nullptr;
346  TMessage outgoing(kMESS_OBJECT);
347  while (true)
348  {
349  if (Onlmonserver->Verbosity() > 2)
350  {
351  std::cout << "Waiting for message" << std::endl;
352  }
353  s0->Recv(mess);
354  if (!mess)
355  {
356  std::cout << "Broken Connection, closing socket" << std::endl;
357  break;
358  }
359  if (mess->What() == kMESS_STRING)
360  {
361  char strchr[OnlMonDefs::MSGLEN];
362  mess->ReadString(strchr, OnlMonDefs::MSGLEN);
363  delete mess;
364  mess = nullptr;
365  std::string str = strchr;
366  if (Onlmonserver->Verbosity() > 2)
367  {
368  std::cout << "received message: " << str << std::endl;
369  }
370  if (str == "Finished")
371  {
372  break;
373  }
374  else if (str == "WriteRootFile")
375  {
376  Onlmonserver->WriteHistoFile();
377  s0->Send("Finished");
378  break;
379  }
380  else if (str == "Ack")
381  {
382  continue;
383  }
384  else if (str == "HistoList")
385  {
386  if (Onlmonserver->Verbosity() > 2)
387  {
388  std::cout << "number of histos: " << Onlmonserver->nHistos() << std::endl;
389  }
390  for (auto monitors = Onlmonserver->monibegin(); monitors != Onlmonserver->moniend(); ++monitors)
391  {
392  for (auto &histos : monitors->second)
393  {
394  std::string subsyshisto = monitors->first + ' ' + histos.first;
395  if (Onlmonserver->Verbosity() > 2)
396  {
397  std::cout << "subsystem: " << monitors->first << ", histo: " << histos.first << std::endl;
398  std::cout << " sending: \"" << subsyshisto << "\"" << std::endl;
399  }
400  s0->Send(subsyshisto.c_str());
401  int nbytes = s0->Recv(mess);
402  delete mess;
403  mess = nullptr;
404  if (nbytes <= 0)
405  {
406  std::ostringstream msg;
407 
408  msg << "Problem receiving message: return code: " << nbytes;
409  send_message(MSG_SEV_ERROR, msg.str());
410  }
411  }
412  }
413  s0->Send("Finished");
414  }
415  else if (str == "ALL")
416  {
417  if (Onlmonserver->Verbosity() > 2)
418  {
419  std::cout << "number of histos: " << Onlmonserver->nHistos() << std::endl;
420  }
421  for (unsigned int i = 0; i < Onlmonserver->nHistos(); i++)
422  {
423  TH1 *histo = Onlmonserver->getHisto(i);
424  if (histo)
425  {
426  outgoing.Reset();
427  outgoing.WriteObject(histo);
428  s0->Send(outgoing);
429  outgoing.Reset();
430  s0->Recv(mess);
431  delete mess;
432  mess = nullptr;
433  }
434  }
435  s0->Send("Finished");
436  }
437  else if (str.find("ISRUNNING") != std::string::npos)
438  {
439  std::string answer = "No";
440  unsigned int pos_space = str.find(' ');
441  std::string moniname = str.substr(pos_space + 1, str.size());
442  for (auto moniter = Onlmonserver->monitor_vec_begin(); moniter != Onlmonserver->monitor_vec_end(); ++moniter)
443  {
444  if ((*moniter)->Name() == moniname)
445  {
446  answer = "Yes";
447  break;
448  }
449  }
450  if (Onlmonserver->Verbosity() > 2)
451  {
452  std::cout << "got " << str << ", replied " << answer << std::endl;
453  }
454  s0->Send(answer.c_str());
455  }
456  else if (str == "LISTMONITORS")
457  {
458  s0->Send("go");
459  for (auto moniter = Onlmonserver->monitor_vec_begin(); moniter != Onlmonserver->monitor_vec_end(); ++moniter)
460  {
461  if (Onlmonserver->Verbosity() > 2)
462  {
463  std::cout << "sending " << (*moniter)->Name().c_str() << std::endl;
464  }
465  s0->Send((*moniter)->Name().c_str());
466  }
467  s0->Send("Finished");
468  break;
469  }
470  else if (str == "LIST")
471  {
472  s0->Send("go");
473  while (true)
474  {
475  char strmess[OnlMonDefs::MSGLEN];
476  s0->Recv(mess);
477  if (!mess)
478  {
479  break;
480  }
481  if (mess->What() == kMESS_STRING)
482  {
483  mess->ReadString(strmess, OnlMonDefs::MSGLEN);
484  delete mess;
485  mess = nullptr;
486  if (std::string(strmess) == "alldone")
487  {
488  break;
489  }
490  }
491  std::string str1(strmess);
492  unsigned int pos_space = str1.find(' ');
493  if (Onlmonserver->Verbosity() > 2)
494  {
495  std::cout << __PRETTY_FUNCTION__ << " getting subsystem " << str1.substr(0, pos_space) << ", histo " << str1.substr(pos_space + 1, str1.size()) << std::endl;
496  }
497  TH1 *histo = Onlmonserver->getHisto(str1.substr(0, pos_space), str1.substr(pos_space + 1, str1.size()));
498  if (histo)
499  {
500  outgoing.Reset();
501  outgoing.WriteObject(histo);
502  s0->Send(outgoing);
503  outgoing.Reset();
504  }
505  else
506  {
507  s0->Send("UnknownHisto");
508  }
509  // delete mess;
510  }
511  s0->Send("Finished");
512  }
513  else
514  {
515  std::string strstr(str);
516  unsigned int pos_space = str.find(' ');
517  TH1 *histo = Onlmonserver->getHisto(strstr.substr(0, pos_space), strstr.substr(pos_space + 1, str.size()));
518  if (histo)
519  {
520  // const char *hisname = histo->GetName();
521  outgoing.Reset();
522  outgoing.WriteObject(histo);
523  s0->Send(outgoing);
524  outgoing.Reset();
525  s0->Recv(mess);
526  delete mess;
527  s0->Send("Finished");
528  }
529  else
530  {
531  s0->Send("UnknownHisto");
532  }
533  }
534  }
535  else if (mess->What() == kMESS_OBJECT)
536  {
537  printf("got object of class: %s\n", mess->GetClass()->GetName());
538  delete mess;
539  }
540  else
541  {
542  printf("*** Unexpected message ***\n");
543  delete mess;
544  }
545  }
546 
547  // Close the socket.
548  s0->Close();
549  return;
550 }
551 
552 int send_message(const int severity, const std::string &msg)
553 {
554  // check $ONLINE_MAIN/include/msg_profile.h for MSG defs
555  // if you do not find your subsystem, do not initialize it and drop me a line
558  severity, "pmonitorInterface");
559  std::cout << *Message << msg << std::endl;
560  delete Message;
561  return 0;
562 }