Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
daqBuffer.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file daqBuffer.cc
1 #include <daqBuffer.h>
2 
3 #include <daqONCSEvent.h>
4 #include <daqPRDFEvent.h>
5 
6 #include <signal.h>
7 #include <unistd.h>
8 #include <sys/types.h>
9 #include <sys/socket.h>
10 #include <sys/stat.h>
11 #include <string.h>
12 #include <lzo/lzo1x.h>
13 
14 using namespace std;
15 
17 
18 
19 int readn (int fd, char *ptr, const int nbytes);
20 int writen (int fd, char *ptr, const int nbytes);
21 
22 
23 // the constructor first ----------------
24 daqBuffer::daqBuffer (const int irun, const int length
25  , const int iseq, md5_state_t *md5state)
26 {
27  int *b = new int [length];
28  bptr = ( buffer_ptr ) b;
29 
30  data_ptr = &(bptr->data[0]);
31  max_length = length; // in 32bit units
32  max_size = max_length;
33  _broken = 0;
34 
35  current_event = 0;
36  current_etype = -1;
37 
38  // preset everything to ONCS format
40  currentBufferID = ONCSBUFFERHEADER;
41  _md5state = md5state;
42  wants_compression = 0;
43  wrkmem = 0;
44  outputarraylength = 0;
45  outputarray = 0;
46 
47  prepare_next (iseq, irun);
48 }
49 
50 
51 // the destructor... ----------------
53 {
54  int *b = (int *) bptr;
55  delete [] b;
56  if (outputarray) delete [] outputarray;
57  if (wrkmem) delete [] wrkmem;
58 }
59 
60 
61 
62 int daqBuffer::prepare_next( const int iseq
63  , const int irun)
64 {
65 
66  // cout << __FILE__ << " " << __LINE__ << " bptr: " << bptr << endl;
67 
68  if ( current_event) delete current_event;
69  current_event = 0;
70 
71  // re-initialize the event header length
72  bptr->Length = BUFFERHEADERLENGTH*4;
73  bptr->ID = currentBufferID;
74  bptr->Bufseq = iseq;
75  if (irun>0) bptr->Runnr = irun;
76 
77  current_index = 0;
78  left = max_size - BUFFERHEADERLENGTH - EOBLENGTH;
79  has_end = 0;
80 
81 
82  return 0;
83 }
84 
85 // ---------------------------------------------------------
86 int daqBuffer::nextEvent(const int etype, const int evtseq, const int evtsize)
87 {
88  if (current_event) delete current_event;
89  current_event = 0;
90  current_etype = -1;
91 
92  if (evtsize > left-EOBLENGTH) return -1;
93 
94  if ( format)
95  {
96  current_event = new daqPRDFEvent(&(bptr->data[current_index]), evtsize
97  ,bptr->Runnr, etype, evtseq);
98  left -= (EVTHEADERLENGTH + 8);
99  current_index += EVTHEADERLENGTH+8;
100  bptr->Length += (EVTHEADERLENGTH+8)*4;
101  }
102  else
103  {
104  current_event = new daqONCSEvent(&(bptr->data[current_index]), evtsize
105  ,bptr->Runnr, etype, evtseq);
107  current_index += EVTHEADERLENGTH;
108  bptr->Length += EVTHEADERLENGTH*4;
109  }
110 
111 
112  current_etype = etype;
113 
114  return 0;
115 }
116 
117 // ----------------------------------------------------------
119 {
120  unsigned int len;
121 
122  len = current_event->addSubevent(current_etype, dev);
123 
124  left -= len;
125  current_index += len;
126  bptr->Length += len*4;
127 
128  return len;
129 }
130 
131 // ----------------------------------------------------------
132 unsigned int daqBuffer::addEoB()
133 {
134  if (has_end) return -1;
135  bptr->data[current_index++] = 2;
136  bptr->data[current_index++] = 0;
137  bptr->Length += 2*4;
138 
139  has_end = 1;
140  if ( current_event) delete current_event;
141  current_event = 0;
142  return 0;
143 }
144 
145 // ----------------------------------------------------------
146 // int daqBuffer::transfer(dataProtocol * protocol)
147 // {
148 // if (protocol)
149 // return protocol->transfer((char *) bptr, bptr->Length);
150 // else
151 // return 0;
152 
153 // }
154 
155 unsigned int daqBuffer::writeout ( int fd)
156 {
157 
158  if ( _broken) return 0;
159  if (!has_end) addEoB();
160 
161  unsigned int bytes = 0;;
162 
163  if ( ! wants_compression)
164  {
165  int blockcount = ( getLength() + 8192 -1)/8192;
166  int bytecount = blockcount*8192;
167  bytes = writen ( fd, (char *) bptr , bytecount );
168  if ( _md5state)
169  {
170  //cout << __FILE__ << " " << __LINE__ << " updating md5 with " << bytes << " bytes" << endl;
171  md5_append(_md5state, (const md5_byte_t *)bptr,bytes );
172  }
173  return bytes;
174  }
175  else // we want compression
176  {
177  compress();
178  int blockcount = ( outputarray[0] + 8192 -1)/8192;
179  int bytecount = blockcount*8192;
180  bytes = writen ( fd, (char *) outputarray , bytecount );
181  if ( _md5state)
182  {
183  //cout << __FILE__ << " " << __LINE__ << " updating md5 with " << bytes << " bytes" << endl;
184  md5_append(_md5state, (const md5_byte_t *)outputarray,bytes );
185  }
186  return bytes;
187  }
188 }
189 
190 #define ACKVALUE 101
191 
192 unsigned int daqBuffer::sendout ( int fd )
193 {
194  if ( _broken) return 0;
195 
196  if (!has_end) addEoB();
197 
198  int total = getLength();
199 
200  //std::cout << __FILE__ << " " << __LINE__ << " sending opcode ctrl_data" << CTRL_DATA << std::endl ;
201  // send "CTRL_DATA" opcode in network byte ordering
202  int opcode = htonl(CTRL_DATA);
203  int status = writen(fd, (char *) &opcode, sizeof(int));
204 
205  // std::cout << __FILE__ << " " << __LINE__ << " sending buffer size " << total << std::endl ;
206 
207  // re-use variable to send the length in network byte ordering
208  opcode = htonl(total);
209  status |= writen(fd, (char *) &opcode, sizeof(int));
210 
211  // now send the actual data
212  char *p = (char *) bptr;
213  int sent = writen(fd,p,total);
214 
215  // wait for acknowledge... we re-use the opcode variable once more
216  readn (fd, (char *) &opcode, sizeof(int));
217  opcode = ntohl(opcode);
218  if ( opcode != CTRL_REMOTESUCCESS) return -1; // signal error
219 
220  return sent;
221 }
222 
223 // this is sending the monitoring data to a client
224 unsigned int daqBuffer::sendData ( int fd, const int max_length)
225 {
226  if ( _broken) return 0;
227 
228  if (!has_end) addEoB();
229 
230  int total = getLength();
231 
232  if ( total > max_length)
233  {
234  cout << "Monitoring: data size exceeds limit -- " << total << " limit: " << max_length << endl;
235  total = max_length;
236  }
237 
238  int ntotal = htonl(total);
239  int status = writen(fd, (char *) &ntotal, 4);
240 
241  char *p = (char *) bptr;
242  int sent = writen(fd,p,total);
243 
244  return sent;
245 }
246 
247 int daqBuffer::setCompression(const int flag)
248 {
249  if ( !flag)
250  {
251  wants_compression = 0;
252  return 0;
253  }
254  else
255  {
256  if ( ! lzo_initialized )
257  {
258  if (lzo_init() != LZO_E_OK)
259  {
260  std::cerr << "Could not initialize LZO" << std::endl;
261  _broken = 1;
262  }
263  lzo_initialized = 1;
264  }
265 
266  if ( !wrkmem)
267  {
268  wrkmem = (lzo_bytep) lzo_malloc(LZO1X_1_12_MEM_COMPRESS);
269  if (wrkmem)
270  {
271  memset(wrkmem, 0, LZO1X_1_12_MEM_COMPRESS);
272  }
273  else
274  {
275  std::cerr << "Could not allocate LZO memory" << std::endl;
276  _broken = 1;
277  return -1;
278  }
279  outputarraylength = max_length + 8192;
280  outputarray = new unsigned int[outputarraylength];
281  }
282  wants_compression = 1;
283  //cout << " LZO compression enabled" << endl;
284  return 0;
285  }
286 }
287 
289 {
290  if ( _broken) return -1;
291 
292  lzo_uint outputlength_in_bytes = outputarraylength*4-16;
293  lzo_uint in_len = getLength();
294 
295  lzo1x_1_12_compress( (lzo_byte *) bptr,
296  in_len,
297  (lzo_byte *)&outputarray[4],
298  &outputlength_in_bytes,wrkmem);
299 
300 
301  outputarray[0] = outputlength_in_bytes +4*BUFFERHEADERLENGTH;
302  outputarray[1] = LZO1XBUFFERMARKER;
303  outputarray[2] = bptr->Bufseq;
304  outputarray[3] = getLength();
305 
306  return 0;
307 }
308 
309 
310 // ----------------------------------------------------------
312 {
313  if (size < 0) return -1;
314  if (size == 0) max_size = max_length;
315  else
316  {
317  max_size = (size + 8191)/8192;
318  max_size *= 2048;
319 
320  if (max_size > max_length)
321  {
322  max_size = max_length;
323  return -2;
324  }
325  }
326  return 0;
327 }
328 
329 // ----------------------------------------------------------
331 {
332  return max_size*4;
333 }
334 
336  {
337  if (f)
338  {
340  currentBufferID = PRDFBUFFERHEADER;
341  }
342  else
343  {
345  currentBufferID = ONCSBUFFERHEADER;
346  }
347 
348  bptr->ID = currentBufferID;
349  return 0;
350  }
351 
352