Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rcdaq_mg_server.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file rcdaq_mg_server.cc
1 
2 #include "mongoose.h"
3 #include "rcdaq.h"
4 
5 #include <iostream>
6 #include <sstream>
7 #include <sys/stat.h>
8 
9 using namespace std;
10 
11 int update(struct mg_connection *nc, const char *key, const double value);
12 int update(struct mg_connection *nc, const char *key, const int value);
13 int update(struct mg_connection *nc, const char *key, const char *value);
14 int trigger_updates(struct mg_connection *nc);
15 
16 void initial_ws_update (struct mg_connection *nc);
17 void send_ws_updates (struct mg_connection *nc);
18 void send_updates (struct mg_connection *nc);
19 void send_status (struct mg_connection *nc, std::string out);
20 void send_error (struct mg_connection *nc, std::string out);
21 void send_nothing (struct mg_connection *nc);
22 void * mg_server (void *arg);
23 
24 static void broadcast(struct mg_connection *nc, char *str, const int len);
25 
26 
28 
30 
31 pthread_mutex_t M_ws_send;
32 
33 int error_flag = 0;
35 
36 static int last_runstate;
37 static int last_runnumber;
38 static int last_eventnumber;
39 static double last_runvolume;
40 static int last_runduration;
41 static int last_openflag;
42 static int last_serverflag;
44 static int speed_request_flag = 0;
45 
46 static int speed_update_interval =9;
47 
48 
49 
50 int mg_end()
51 {
52  end_web_thread = 1;
53  return 0;
54 }
55 
56 int request_mg_update (const int what)
57 {
58  switch (what)
59  {
60  case MG_REQUEST_NAME:
62  return 0;
63  break;
64 
65  case MG_REQUEST_SPEED:
67  return 0;
68  break;
69 
70  default:
71  break;
72  }
73  return -1;
74 }
75 
76 
77 // static int is_websocket(const struct mg_connection *nc)
78 // {
79 // return nc->flags & MG_F_IS_WEBSOCKET;
80 // }
81 
83 {
84  stringstream out;
85 
86  if ( error_flag)
87  {
88  return error_string;
89  }
90  else if ( daq_running() )
91  {
92  out << "Running for " << get_runduration() << "s" << ends;
93  }
94  else
95  {
96  out << "Stopped Run " << get_oldrunnumber() << ends;
97  }
98  return out.str();
99 }
100 
102 {
103  stringstream out;
104  if ( get_openflag())
105  {
106  if ( daq_running() )
107  {
108  if ( get_serverflag() )
109  {
110  out << "File on server: " << get_current_filename();
111  return out.str();
112  }
113  else
114  {
115  out << "File: " << get_current_filename();
116  return out.str();
117  }
118  }
119  else
120  {
121  if ( get_serverflag() )
122  {
123  return "Logging enabled (Server)";
124  }
125  else
126  {
127  return "Logging enabled";
128  }
129  }
130  }
131  return "Logging disabled";
132 }
133 
134 
136 {
137  char str[2048];
138  int len;
139 
140  int openvalue = get_openflag() | get_serverflag();
141 
142  len = sprintf(str, "{ \"RunFlag\":%d, \"Status\":\"%s\" , \"RunNr\":%d , \"Events\":%d , \"Volume\":\"%f\", \"Duration\":%d, \"Logging\":\"%s\" ,\"Filename\":\"%s \" , \"OpenFlag\":%d , \"Name\":\"%s\" } "
143  , daq_running()
144  , get_statusstring().c_str()
145  , get_runnumber()
146  , get_eventnumber()
147  , get_runvolume()
148  , get_runduration()
149  , get_loggingstring().c_str()
150  , get_current_filename().c_str()
151  , openvalue
152  , daq_get_myname().c_str()
153  );
154  // cout << __FILE__ << " " << __LINE__ << " " << str << endl;
155 
157 // broadcast(nc, str, len);
158 
159 }
160 
161 static void broadcast(struct mg_connection *nc, char *str, const int len)
162 {
163  struct mg_connection *c;
164 
165  for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c))
166  {
167  if ( c != nc)
168  {
169  // cout << __FILE__ << " " << __LINE__ << " sending " << str << " to " << c << endl;
170 
171  // pthread_mutex_lock(&M_ws_send);
173  //pthread_mutex_unlock(&M_ws_send);
174  }
175  }
176 }
177 
178 
179 int update(struct mg_connection *nc, const char *key, const double value)
180 {
181  char str[512];
182  int len = sprintf(str, "{ \"%s\":%f }", key, value);
183 
184  broadcast(nc, str, len);
185  return 0;
186 }
187 
188 int update(struct mg_connection *nc, const char *key, const int value)
189 {
190  char str[512];
191  int len = sprintf(str, "{ \"%s\":%d }", key, value);
192 
193  broadcast(nc, str, len);
194  return 0;
195 }
196 
197 int update(struct mg_connection *nc, const char *key, const char *value)
198 {
199  char str[512];
200  int len = sprintf(str, "{ \"%s\":\"%s\" }", key, value);
201 
202  broadcast(nc, str, len);
203  return 0;
204 }
205 
207 {
208  if ( daq_running() )
209  {
210  // cout << __FILE__ << " " << __LINE__ << " in send_ws_updates" << endl;
211  update(nc, "Volume", get_runvolume());
212  update(nc, "Events", get_eventnumber());
213  update(nc, "Status", get_statusstring().c_str() );
214  }
215 
216 }
217 
218 void send_updates (struct mg_connection *nc)
219 {
220  char str[512];
221  int len;
222 
223 
224  len = sprintf(str, "{ \"Events\":%d , \"Volume\":\"%f\" , \"Status\":\"%s\" }"
225  , get_eventnumber()
226  , get_runvolume()
227  , get_statusstring().c_str()
228  );
229 
230  //cout << __FILE__ << " " << __LINE__ << " sending " << str << endl;
231  nc->flags |= MG_F_SEND_AND_CLOSE;
232  mg_printf(nc, "HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
233  "Content-Type: application/json\r\n\r\n%s",
234  len, str);
235 
236 }
237 
239 {
240 
241  // we eliminate the line break
242  ostringstream x;
243  x<< endl;
244 
245  out.replace(out.find(x.str()),x.str().length(),"");
246  //cout << __FILE__ << " " << __LINE__ << " " << out << endl;
247 
248 
249 
250  char str[512];
251  int len;
252  len = sprintf(str, "{ \"Status\":\"%s\" }"
253  , out.c_str() );
254 
255  // pthread_mutex_lock(&M_ws_send);
256  nc->flags |= MG_F_SEND_AND_CLOSE;
257  mg_printf(nc, "HTTP/1.0 200 OK\r\nContent-Length: %d\r\n"
258  "Content-Type: application/json\r\n\r\n%s",
259  len, str);
260  // pthread_mutex_unlock(&M_ws_send);
261 
262 }
263 
265 {
266 
267  // we eliminate the line break
268  ostringstream x;
269  x<< endl;
270 
271  out.replace(out.find(x.str()),x.str().length(),"");
272  //cout << __FILE__ << " " << __LINE__ << " " << out << endl;
273 
274 
275 
276  // pthread_mutex_lock(&M_ws_send);
277  nc->flags |= MG_F_SEND_AND_CLOSE;
278  mg_printf(nc, "{ \"Status\":\"%s\" }", out.c_str());
279  // pthread_mutex_unlock(&M_ws_send);
280 
281 }
282 
283 void send_nothing (struct mg_connection *nc)
284 {
285  // pthread_mutex_lock(&M_ws_send);
286  // printf(" sending HTTP/1.0 200 OK\r\nContent-Length: 0\r\nContent-Type: text/html\r\n\r\n");
287  nc->flags |= MG_F_SEND_AND_CLOSE;
288  mg_printf(nc, "HTTP/1.0 200 OK\r\nContent-Length: 0\r\n"
289  "Content-Type: text/html\r\n\r\n");
290  //pthread_mutex_unlock(&M_ws_send);
291 
292 }
293 
294 
295 
296 //int has_request=0;
297 
298 static void ev_handler(struct mg_connection *nc, int ev, void *ev_data)
299 {
300  struct http_message *hm = (struct http_message *) ev_data;
301  struct websocket_message *wm = (struct websocket_message *)ev_data;
302  int status;
303 
304  std::ostringstream out;
305 
306  // if (ev) printf (" ---- %d\n" ,ev);
307  // printf (" --bit 20 -- %lx\n" , nc->flags & MG_F_USER_1);
308  switch (ev)
309  {
311  {
312  /* New websocket connection. Tell everybody. */
313  nc->flags |= MG_F_USER_1;
314  break;
315  }
316 
318  {
319  struct mg_str msg = {(char *) wm->data, wm->size};
320  //cout << __FILE__ << " " << __LINE__ << " ws message " << wm->data << endl;
321 
322  if ( mg_vcmp ( &msg, "daq_begin") == 0)
323  {
324 
325  status = daq_begin(0,out);
326  if (status)
327  {
328  error_flag = 1;
329  error_string = out.str();
330  error_string.replace(error_string.find("\n"),1,"");
331 
332  }
333  else
334  {
335  error_flag = 0;
336  error_string = "";
337  }
338 
339  return;
340  }
341 
342  else if ( mg_vcmp ( &msg, "daq_end") == 0)
343  {
344  status = daq_end(out);
345  if (status)
346  {
347  send_error(nc,out.str() );
348  }
349  return;
350  }
351 
352  else if ( mg_vcmp ( &msg, "daq_open") == 0)
353  {
354  // cout << __FILE__ << " " << __LINE__ << " daq_open request" << endl;
355  daq_open();
356  return;
357  }
358 
359  else if ( mg_vcmp ( &msg, "daq_close") == 0)
360  {
361  // cout << __FILE__ << " " << __LINE__ << " daq_open request" << endl;
362  daq_close();
363  return;
364  }
365 
366  else if ( mg_vcmp ( &msg, "initial_update") == 0)
367  {
368  //cout << __FILE__ << " " << __LINE__ << "sending initial update" << endl;
369  initial_ws_update(nc);
370  return;
371  }
372 
373  break;
374  }
375 
376  case MG_EV_HTTP_REQUEST:
377  {
378  //cout << __FILE__ << " " << __LINE__ << " connection: " << nc << " uri: " << hm->uri.p << endl;
379 
380  if ( mg_vcmp ( &hm->uri, "/send_updates") == 0)
381  {
382  send_updates(nc);
383  return;
384  }
385 
386  else if ( mg_vcmp ( &hm->uri, "/daq_begin") == 0)
387  {
388  status = daq_begin(0,out);
389  if (status)
390  {
391  send_status(nc,out.str() );
392  }
393  else
394  {
395  send_nothing(nc);
396  }
397  return;
398  }
399 
400  else if ( mg_vcmp ( &hm->uri, "/daq_end") == 0)
401  {
402  status = daq_end(out);
403  if (status)
404  {
405  send_status(nc,out.str() );
406  }
407  else
408  {
409  send_nothing(nc);
410 
411  }
412  return;
413  }
414 
415  else if ( mg_vcmp ( &hm->uri, "/daq_open") == 0)
416  {
417  // cout << __FILE__ << " " << __LINE__ << " daq_open request" << endl;
418  daq_open();
419  send_nothing(nc);
420  return;
421  }
422 
423  else if ( mg_vcmp ( &hm->uri, "/daq_close") == 0)
424  {
425  // cout << __FILE__ << " " << __LINE__ << " daq_open request" << endl;
426  daq_close();
427  send_nothing(nc);
428  return;
429  }
430 
432  break;
433  }
434  case MG_EV_CLOSE:
435  {
436  // nc->flags |= MG_F_USER_1;
437  //has_request=0;
438  break;
439  }
440  default:
441  break;
442  }
443 
444 
445 }
446 
447 
448 
449 
451 {
452  if ( error_flag)
453  {
454  update(nc, "Status", get_statusstring().c_str());
455  error_flag = 0;
456  }
457 
458  if ( last_runstate != daq_running() )
459  {
461  update(nc, "LINE", __LINE__);
462  update(nc, "RunFlag", daq_running());
463  update(nc, "Status", get_statusstring().c_str());
464  update(nc, "RunNr", get_runnumber());
465  update(nc, "Logging", get_loggingstring().c_str());
466  }
467 
468  if ( last_openflag != get_openflag() )
469  {
471  update(nc, "Logging", get_loggingstring().c_str());
472  update(nc, "OpenFlag", get_openflag());
473  }
474  if ( last_serverflag != get_serverflag() )
475  {
477  update(nc, "Logging", get_loggingstring().c_str());
478  update(nc, "ServerFlag", get_serverflag());
479  }
480 
482  {
484  update(nc, "Name", daq_get_myname().c_str());
485  }
486 
487  if ( speed_request_flag )
488  {
489  speed_request_flag = 0;
490  update(nc, "MBps", daq_get_mb_per_second());
491  update(nc, "Evtps", daq_get_events_per_second());
492  }
493 
494  return 0;
495 }
496 
497 void * mg_server (void *arg)
498 {
499 
500  end_web_thread = 0;
501  struct mg_mgr mgr;
502  struct mg_connection *nc;
503 
504  int port = (int) *(int *)arg;
505  stringstream portstring;
506  portstring << port << ends;
507 
508  pthread_mutex_init( &M_ws_send, 0);
509 
510 
519  speed_request_flag = 0;
520 
521  // last_current_filename;
522 
523 
524  mg_mgr_init(&mgr, NULL);
525  nc = mg_bind(&mgr, portstring.str().c_str(), ev_handler);
526  if (nc == NULL)
527  {
528  cerr << __FILE__ << " " << __LINE__ << " Error starting server on port " << port << endl;
529  return 0;
530  }
531  // cout << __FILE__ << " " << __LINE__ << " web server started on port " << port << endl;
532 
535  s_http_server_opts.index_files = "control.html";
537 
538 
539  time_t last_time = time(0);
540  time_t last_time_for_speed = last_time;
541 
542  while(!end_web_thread)
543  {
544  mg_mgr_poll(&mgr, 1000);
545 
546  if ( time(0) - last_time_for_speed > speed_update_interval)
547  {
548  if (daq_running() ) speed_request_flag = 1;
549  last_time_for_speed = time(0);
550  }
551 
552  trigger_updates(nc);
553 
554  if ( time(0) - last_time > 1)
555  {
556  send_ws_updates(nc);
557  last_time = time(0);
558  }
559  }
560  // cout << __FILE__ << " " << __LINE__ << " web server is ending" << endl;
561 
562  return 0;
563 }