9 #include <sys/socket.h>
12 #include <lzo/lzo1x.h>
// Forward declaration of readn; the definition is not part of this view.
// Parameters: fd is passed through from the caller's descriptor, ptr is the
// destination buffer, nbytes the requested byte count.
// In the visible code it is called once to fetch a sizeof(int) reply word
// that is then converted with ntohl().
// NOTE(review): presumably loops until nbytes have been read or an error
// occurs -- confirm against the definition.
19 int readn (
int fd,
char *ptr,
const int nbytes);
// Forward declaration of writen; the definition is not part of this view.
// Parameters: fd is the descriptor to write to, ptr the source buffer,
// nbytes the number of bytes to send.
// Callers in this file store the return value as a byte/sent count and also
// OR it into a status word.
// NOTE(review): the exact return convention (bytes written vs. error code)
// is not visible here -- confirm against the definition.
20 int writen (
int fd,
char *ptr,
const int nbytes);
30 data_ptr = &(bptr->data[0]);
32 max_size = max_length;
42 wants_compression = 0;
44 outputarraylength = 0;
47 prepare_next (iseq, irun);
54 int *
b = (
int *) bptr;
56 if (outputarray)
delete [] outputarray;
57 if (wrkmem)
delete [] wrkmem;
68 if ( current_event)
delete current_event;
73 bptr->ID = currentBufferID;
75 if (irun>0) bptr->Runnr = irun;
88 if (current_event)
delete current_event;
96 current_event =
new daqPRDFEvent(&(bptr->data[current_index]), evtsize
97 ,bptr->Runnr, etype, evtseq);
104 current_event =
new daqONCSEvent(&(bptr->data[current_index]), evtsize
105 ,bptr->Runnr, etype, evtseq);
112 current_etype = etype;
122 len = current_event->addSubevent(current_etype, dev);
125 current_index += len;
126 bptr->Length += len*4;
134 if (has_end)
return -1;
135 bptr->data[current_index++] = 2;
136 bptr->data[current_index++] = 0;
140 if ( current_event)
delete current_event;
158 if ( _broken)
return 0;
159 if (!has_end) addEoB();
161 unsigned int bytes = 0;;
163 if ( ! wants_compression)
165 int blockcount = ( getLength() + 8192 -1)/8192;
166 int bytecount = blockcount*8192;
167 bytes =
writen ( fd, (
char *) bptr , bytecount );
178 int blockcount = ( outputarray[0] + 8192 -1)/8192;
179 int bytecount = blockcount*8192;
180 bytes =
writen ( fd, (
char *) outputarray , bytecount );
194 if ( _broken)
return 0;
196 if (!has_end) addEoB();
198 int total = getLength();
208 opcode = htonl(total);
209 status |=
writen(fd, (
char *) &opcode,
sizeof(
int));
212 char *
p = (
char *) bptr;
213 int sent =
writen(fd,p,total);
216 readn (fd, (
char *) &opcode,
sizeof(
int));
217 opcode = ntohl(opcode);
226 if ( _broken)
return 0;
228 if (!has_end) addEoB();
230 int total = getLength();
232 if ( total > max_length)
234 cout <<
"Monitoring: data size exceeds limit -- " << total <<
" limit: " << max_length << endl;
238 int ntotal = htonl(total);
241 char *
p = (
char *) bptr;
242 int sent =
writen(fd,p,total);
251 wants_compression = 0;
256 if ( ! lzo_initialized )
258 if (lzo_init() != LZO_E_OK)
260 std::cerr <<
"Could not initialize LZO" << std::endl;
268 wrkmem = (lzo_bytep) lzo_malloc(LZO1X_1_12_MEM_COMPRESS);
271 memset(wrkmem, 0, LZO1X_1_12_MEM_COMPRESS);
275 std::cerr <<
"Could not allocate LZO memory" << std::endl;
279 outputarraylength = max_length + 8192;
280 outputarray =
new unsigned int[outputarraylength];
282 wants_compression = 1;
290 if ( _broken)
return -1;
292 lzo_uint outputlength_in_bytes = outputarraylength*4-16;
293 lzo_uint in_len = getLength();
295 lzo1x_1_12_compress( (lzo_byte *) bptr,
297 (lzo_byte *)&outputarray[4],
298 &outputlength_in_bytes,wrkmem);
303 outputarray[2] = bptr->Bufseq;
304 outputarray[3] = getLength();
313 if (size < 0)
return -1;
314 if (size == 0) max_size = max_length;
317 max_size = (size + 8191)/8192;
320 if (max_size > max_length)
322 max_size = max_length;
348 bptr->ID = currentBufferID;