Analysis Software
Documentation for sPHENIX simulation software
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
rcdaqEventiterator.cc
Go to the documentation of this file. Or view the newest version in sPHENIX GitHub for file rcdaqEventiterator.cc
1 //
2 // rcdaqeventIterator mlp 4/19/1997
3 //
4 // this iterator reads events from a data file.
5 
6 #define MONITORINGPORT 9930
7 
8 #include "rcdaqEventiterator.h"
9 #include <stdio.h>
10 #include <iostream>
11 #include <stdlib.h>
12 
13 #include "oncsBuffer.h"
14 #include "gzbuffer.h"
15 #include "lzobuffer.h"
16 #include "Event.h"
17 
18 #include <stddef.h>
19 #include <string.h>
20 #include <sys/types.h>
21 #include <sys/stat.h>
22 #include <fcntl.h>
23 #include <unistd.h>
24 
25 
26 #include <netdb.h>
27 #include <netinet/in.h>
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 
31 
32 using namespace std;
33 
34 
35 // there are two similar constructors, one with just the
36 // filename, the other with an additional status value
37 // which is non-zero on return if anything goes wrong.
38 
40 {
 // NOTE(review): the signature line for this body is missing from this
 // listing; the cleanup below suggests this is the destructor -- confirm
 // against the real source file.
 //
 // Releases the resources held by the iterator: the socket descriptor,
 // the raw word array (allocated with new[] in the buffer reader) and the
 // buffer object pointed to by bptr.
 //
 // NOTE(review): the buffer-reading code in this file closes _sockfd after
 // each transfer without resetting it, so this close() may act on a
 // descriptor number that is already closed -- confirm.
41  if (_sockfd) close (_sockfd);
42  if (bp != NULL ) delete [] bp;
43  if (bptr != NULL ) delete bptr;
44 }
45 
46 
48 {
 // NOTE(review): the signature line for this body is missing from this
 // listing; presumably this is the constructor that takes no host
 // argument -- confirm against the real source file.
 //
 // Chooses the host to contact: "localhost" unless the RCDAQHOST
 // environment variable is set, in which case its value is used.
49  string host = "localhost";
50 
51  if ( getenv("RCDAQHOST") )
52  {
53  host = getenv("RCDAQHOST");
54  }
55 
 // The status written by setup() goes into a local and is not examined
 // here; setup() itself sets _defunct when the host lookup fails.
56  int status;
57  setup (host.c_str(), status);
58 }
59 
61 {
 // NOTE(review): the signature line for this body is missing from this
 // listing; presumably this is the constructor taking only the host
 // string `ip` -- confirm against the real source file.
 //
 // Forwards `ip` to setup(). The status written by setup() goes into a
 // local and is not examined here.
62  int status;
63  setup (ip, status);
64 }
65 
67 {
 // NOTE(review): the signature line for this body is missing from this
 // listing; presumably this is the constructor taking the host string
 // `ip` plus a caller-supplied `status` -- confirm against the real source.
 //
 // Forwards both to setup(), which sets status to 0 on success and to -2
 // when the host name cannot be resolved.
68  setup (ip, status);
69 }
70 
71 
72 int rcdaqEventiterator::setup(const char *ip, int &status)
73 {
74  _defunct = 0;
75 
76 
77  struct hostent *p_host;
78  p_host = gethostbyname(ip);
79 
80  // std::cout << __FILE__ << " " << __LINE__ << " " << ip << " " << p_host->h_name << " " << p_host->h_addr << std::endl;
81 
82 
83  if ( ! p_host )
84  {
85  status = -2;
86  _defunct = 1;
87  return -2;
88  }
89 
90  // std::cout << p_host->h_name << " " << p_host->h_addr << std::endl;
91 
92  bptr = 0;
93  bp = 0;
94  allocatedsize = 0;
95  _theIP = p_host->h_name;
96  status = 0;
97  last_read_status = 0;
98  current_index = 0;
99 
100  memset((char *) &server, 0, sizeof(server));
101  server.sin_family = AF_INET;
102  bcopy(p_host->h_addr, &(server.sin_addr.s_addr), p_host->h_length);
103  server.sin_port = htons(MONITORINGPORT);
104 
105  return 0;
106 }
107 
109 {
 // NOTE(review): the signature line for this body is missing from this
 // listing; it evidently receives an output stream named `os` -- confirm
 // the exact name and signature against the real source file.
 //
 // Writes the identification string from getIdTag() to `os`, appends
 // " *** defunct" when the iterator is marked defunct, and ends the line.
110  os << getIdTag();
111  if ( _defunct ) os << " *** defunct";
112  os << std::endl;
113 
114 };
115 
116 const char * rcdaqEventiterator::getIdTag () const
117 {
118  static char line[180];
119  strcpy (line, " -- rcdaqEventiterator reading from ");
120  strcat (line, _theIP.c_str());
121  return line;
122 };
123 
124 
125 // and, finally, the only non-constructor member function to
126 // retrieve events from the iterator.
127 
129 {
 // NOTE(review): the signature line for this body is missing from this
 // listing; from the returns below it yields an Event pointer (or NULL/0
 // when nothing can be delivered) -- confirm against the real source file.
 //
 // A defunct iterator (host lookup failed in setup()) never delivers events.
130  if ( _defunct ) return 0;
131  Event *evt = 0;
132 
133  // if we had a read error before, we just return
134  if (last_read_status) return NULL;
135 
136  // see if we have a buffer to read
137  if (bptr == 0)
138  {
139  if ( (last_read_status = read_next_buffer()) !=0 )
140  {
141  return NULL;
142  }
143  }
144 
 // Hand out events from the current buffer; once it is exhausted (or if
 // no buffer object exists yet) fetch the next one and try again.
 // NOTE(review): read_next_buffer() returns 0 on socket/connect/transfer
 // failures while leaving bptr at 0, so this loop keeps retrying until a
 // buffer arrives or a non-zero status is returned.
145  while (last_read_status == 0)
146  {
147  if (bptr) evt = bptr->getEvent();
148  if (evt) return evt;
149 
150  last_read_status = read_next_buffer();
151  }
152 
153  return NULL;
154 
155 }
156 
157 // -----------------------------------------------------
158 // this is a private function to read the next buffer
159 // if needed.
160 
162 {
 // NOTE(review): the signature line for this body is missing from this
 // listing; it is called as read_next_buffer() and its result is stored
 // in last_read_status -- confirm the signature against the real source.
 //
 // Fetches one buffer from the server over a fresh TCP connection:
 //   1. connect to the address prepared by setup()
 //   2. send a 4-byte maximum size in network byte order
 //   3. receive the 4-byte size of the buffer to come
 //   4. receive that many bytes into bp (growing bp if needed)
 //   5. send a 4-byte acknowledge value and close the connection
 // Returns 0 on every socket/connect/transfer failure below; otherwise
 // returns whatever buffer::makeBuffer() returns.

 // drop the buffer object from the previous round
163  if (bptr)
164  {
165  delete bptr;
166  bptr = 0;
167  }
168 
169  _sockfd = socket(AF_INET, SOCK_STREAM, 0);
170  if ( _sockfd < 0) return 0;
171 
172  if ( connect(_sockfd, (struct sockaddr*) &server, sizeof(server)) < 0 )
173  {
174  //std::cout << "error in connect" << std::endl;
 // NOTE(review): _sockfd keeps its old value after each close() in
 // this function, so later code that tests `_sockfd` still sees a
 // non-zero descriptor number -- confirm whether that is intended.
175  close (_sockfd);
176  usleep(1000); // we just slow down a bit to limit the rate of retries
177  return 0;
178  }
179 
180  // say that this is our max size
181  int flag = htonl(64*1024*1024);
182 
183  int status = writen (_sockfd,(char *) &flag, 4);
184  if ( status < 0)
185  {
186  close (_sockfd);
187  return 0;
188  }
189 
190 
 // the server answers with the byte count of the buffer it will send
 // NOTE(review): only status < 0 is treated as failure; a short read
 // (connection closed early) would leave sizetobesent partly unset.
191  int sizetobesent;
192  status = readn (_sockfd, (char *) &sizetobesent, 4);
193  if ( status < 0)
194  {
195  close (_sockfd);
196  return 0;
197  }
198 
 // NOTE(review): buffer_size comes straight from the peer and is not
 // range-checked before it is used for allocation and as a read length.
199  buffer_size = ntohl(sizetobesent);
200  int i;
 // Make sure bp can hold buffer_size bytes. The size is rounded up to
 // whole 8192-byte blocks; allocatedsize counts PHDWORD elements.
 // NOTE(review): the `*4` and `2048` below assume 4-byte PHDWORDs, and
 // the two branches agree only if BUFFERBLOCKSIZE is 8192 -- confirm.
201  if (bp)
202  {
203  if (buffer_size > allocatedsize*4)
204  {
205  delete [] bp;
206  i = (buffer_size +8191) /8192;
207  allocatedsize = i * 2048;
208  bp = new PHDWORD[allocatedsize];
209  }
210  }
211  else
212  {
213  i = (buffer_size +8191) /8192;
214  allocatedsize = i * BUFFERBLOCKSIZE/4;
215  bp = new PHDWORD[allocatedsize];
216 
217  }
218 
 // pull in the buffer payload itself
219  status = readn ( _sockfd, (char *) bp, buffer_size);
220  if ( status < 0)
221  {
222  close (_sockfd);
223  return 0;
224  }
225 
 // acknowledge the transfer with the value 101; the result of this
 // write is not checked
226  int ackvalue = htonl(101);
227  writen (_sockfd,(char *) &ackvalue, 4);
228  close (_sockfd);
229 
 // wrap the received words in a buffer object stored in bptr
 // NOTE(review): the length passed is allocatedsize (capacity), not the
 // amount actually received -- confirm that makeBuffer expects this.
230  return buffer::makeBuffer( bp, allocatedsize, &bptr);
231 
232 }
233 
234 int rcdaqEventiterator::readn (int fd, char *ptr, int nbytes)
235 {
236  int nleft, nread;
237  nleft = nbytes;
238  while ( nleft>0 )
239  {
240  nread = recv (fd, ptr, nleft, MSG_NOSIGNAL);
241  if ( nread < 0 )
242  {
243  return nread;
244  }
245  else if (nread == 0)
246  break;
247  nleft -= nread;
248  ptr += nread;
249  }
250  return (nbytes-nleft);
251 }
252 
253 
254 int rcdaqEventiterator::writen (int fd, char *ptr, int nbytes)
255 {
256  int nleft, nwritten;
257  nleft = nbytes;
258  while ( nleft>0 )
259  {
260  nwritten = send (fd, ptr, nleft, MSG_NOSIGNAL);
261  if ( nwritten < 0 )
262  {
263  return nwritten;
264  }
265  nleft -= nwritten;
266  ptr += nwritten;
267  }
268  return (nbytes-nleft);
269 }