ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/ermyth/src/datastream.C
Revision: 1.1
Committed: Thu Jul 19 08:24:57 2007 UTC (16 years, 10 months ago) by pippijn
Content type: text/plain
Branch: MAIN
Log Message:
initial import. the most important changes since Atheme are:
- fixed many memory leaks
- fixed many bugs
- converted to C++ and use more STL containers
- added a (not very enhanced yet) perl module
- greatly improved XML-RPC speed
- added a JSON-RPC module with code from json-cpp
- added a valgrind memcheck module to operserv
- added a more object oriented base64 implementation
- added a specialised unit test framework
- improved stability
- use gettimeofday() if available
- reworked adding/removing commands
- MemoServ IGNORE DEL can now remove indices

File Contents

# Content
1 /*
2 * Copyright © 2005-2006 Atheme Development Group
3 * Rights to this code are documented in doc/LICENSE.
4 *
5 * Handling of streams and packet queues.
6 */
7
8 static char const rcsid[] = "$Id";
9
10 #include "atheme.h"
11 #include "datastream.h"
12 #include "connection.h"
13
14 void
15 sendq_add (connection_t *cptr, char *buf, int len)
16 {
17 packetqueue *sq;
18 int l;
19 int pos = 0;
20
21 return_if_fail (cptr != NULL);
22
23 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
24 {
25 slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
26 return;
27 }
28
29 if (!cptr->sendq.empty ())
30 {
31 sq = cptr->sendq.back ();
32 l = SENDQSIZE - sq->firstfree;
33 if (l > len)
34 l = len;
35 memcpy (sq->buf + sq->firstfree, buf + pos, l);
36 sq->firstfree += l;
37 pos += l;
38 len -= l;
39 }
40
41 while (len > 0)
42 {
43 sq = new packetqueue;
44 l = SENDQSIZE - sq->firstfree;
45 if (l > len)
46 l = len;
47 memcpy (sq->buf + sq->firstfree, buf + pos, l);
48 sq->firstfree += l;
49 pos += l;
50 len -= l;
51 cptr->sendq.push_back (sq);
52 }
53 }
54
55 void
56 sendq_add_eof (connection_t *cptr)
57 {
58 return_if_fail (cptr != NULL);
59
60 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
61 {
62 slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
63 return;
64 }
65 cptr->flags |= CF_SEND_EOF;
66 }
67
68 void
69 sendq_flush (connection_t *cptr)
70 {
71 packetqueue *sq;
72 int l;
73
74 return_if_fail (cptr != NULL);
75
76 while (!cptr->sendq.empty ())
77 {
78 sq = cptr->sendq.front ();
79
80 if (sq->firstused == sq->firstfree)
81 break;
82
83 if ((l = write (cptr->fd, sq->buf + sq->firstused, sq->firstfree - sq->firstused)) == -1)
84 {
85 if (errno != EAGAIN)
86 {
87 slog (LG_DEBUG, "sendq_flush(): write error %d (%s) on connection %s[%d]", errno, strerror (errno), cptr->name, cptr->fd);
88 cptr->flags |= CF_DEAD;
89 }
90 return;
91 }
92
93 sq->firstused += l;
94 if (sq->firstused == sq->firstfree)
95 {
96 if (cptr->sendq.size () > 1)
97 {
98 delete sq;
99 cptr->sendq.pop_front ();
100 }
101 else
102 /* keep one struct packetqueue */
103 sq->firstused = sq->firstfree = 0;
104 }
105 else
106 return;
107 }
108 if (cptr->flags & CF_SEND_EOF)
109 {
110 /* shut down write end, kill entire connection
111 * * only when the other side acknowledges -- jilles */
112 #ifdef SHUT_WR
113 shutdown (cptr->fd, SHUT_WR);
114 #else
115 shutdown (cptr->fd, 1);
116 #endif
117 cptr->flags |= CF_SEND_DEAD;
118 }
119 }
120
121 bool
122 sendq_nonempty (connection_t *cptr)
123 {
124 packetqueue *sq;
125
126 if (cptr->flags & CF_SEND_DEAD)
127 return false;
128 if (cptr->flags & CF_SEND_EOF)
129 return true;
130 if (cptr->sendq.empty ())
131 return false;
132 sq = cptr->sendq.front ();
133 return sq->firstfree > sq->firstused;
134 }
135
136 int
137 recvq_length (connection_t *cptr)
138 {
139 int l = 0;
140 connection_t::queue::iterator it, it_end;
141 packetqueue *sq;
142
143 it_end = cptr->recvq.end ();
144 for (it = cptr->recvq.begin (); it != it_end; ++it)
145 {
146 sq = *it;
147 l += sq->firstfree - sq->firstused;
148 }
149
150 return l;
151 }
152
153 void
154 recvq_put (connection_t *cptr)
155 {
156 packetqueue *sq = NULL;
157 int l, ll;
158
159 return_if_fail (cptr != NULL);
160
161 if (cptr->flags & (CF_DEAD | CF_SEND_DEAD))
162 {
163 /* If CF_SEND_DEAD:
164 * The client closed the connection or sent some
165 * data we don't care about, be done with it.
166 * If CF_DEAD:
167 * Connection died earlier, be done with it now.
168 * -- jilles
169 */
170 errno = 0;
171 connection_close (cptr);
172 return;
173 }
174
175 if (!cptr->recvq.empty ())
176 {
177 sq = cptr->recvq.back ();
178 l = SENDQSIZE - sq->firstfree;
179 if (l == 0)
180 sq = NULL;
181 }
182 if (sq == NULL)
183 {
184 sq = new packetqueue;
185 sq->firstused = sq->firstfree = 0;
186 l = SENDQSIZE;
187 cptr->recvq.push_back (sq);
188 }
189 errno = 0;
190
191 l = read (cptr->fd, sq->buf + sq->firstfree, l);
192 if (l == 0 || (l < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EALREADY && errno != EINTR && errno != ENOBUFS))
193 {
194 if (l == 0)
195 slog (LG_INFO, "recvq_put(): fd %d closed the connection", cptr->fd);
196 else
197 slog (LG_INFO, "recvq_put(): lost connection on fd %d", cptr->fd);
198 connection_close (cptr);
199 return;
200 }
201 else if (l > 0)
202 sq->firstfree += l;
203
204 if (cptr->recvq_handler)
205 {
206 l = recvq_length (cptr);
207 do /* call handler until it consumes nothing */
208 {
209 cptr->recvq_handler (cptr);
210 ll = l;
211 l = recvq_length (cptr);
212 }
213 while (ll != l && l != 0);
214 }
215 return;
216 }
217
218 int
219 recvq_get (connection_t *cptr, char *buf, int len)
220 {
221 packetqueue *sq;
222 int l;
223 char *p = buf;
224
225 return_val_if_fail (cptr != NULL, 0);
226
227 while (!cptr->recvq.empty ())
228 {
229 sq = cptr->recvq.front ();
230
231 if (sq->firstused == sq->firstfree || len <= 0)
232 break;
233
234 l = sq->firstfree - sq->firstused;
235 if (l > len)
236 l = len;
237 memcpy (p, sq->buf + sq->firstused, l);
238
239 p += l;
240 len -= l;
241 sq->firstused += l;
242 if (sq->firstused == sq->firstfree)
243 {
244 if (cptr->recvq.size () > 1)
245 {
246 delete sq;
247 cptr->recvq.pop_front ();
248 }
249 else
250 /* keep one struct packetqueue */
251 sq->firstused = sq->firstfree = 0;
252 }
253 else
254 return p - buf;
255 }
256 return p - buf;
257 }
258
259 int
260 recvq_getline (connection_t *cptr, char *buf, int len)
261 {
262 packetqueue *sq, *sq2 = NULL;
263 int l = 0;
264 char *p = buf;
265 char *newline = NULL;
266 connection_t::queue::iterator it, it_end;
267
268 return_val_if_fail (cptr != NULL, 0);
269
270 it_end = cptr->recvq.end ();
271 for (it = cptr->recvq.begin (); it != it_end; ++it)
272 {
273 sq2 = *it;
274 l += sq2->firstfree - sq2->firstused;
275 newline = static_cast < char *>(memchr (sq2->buf + sq2->firstused, '\n', sq2->firstfree - sq2->firstused));
276 if (newline != NULL || l >= len)
277 break;
278 }
279 if (newline == NULL && l < len)
280 return 0;
281
282 cptr->flags |= CF_NONEWLINE;
283 while (!cptr->recvq.empty ())
284 {
285 sq = cptr->recvq.front ();
286
287 if (sq->firstused == sq->firstfree || len <= 0)
288 break;
289
290 l = sq->firstfree - sq->firstused;
291 if (l > len)
292 l = len;
293 if (sq == sq2 && l >= newline - sq->buf - sq->firstused + 1)
294 cptr->flags &= ~CF_NONEWLINE, l = newline - sq->buf - sq->firstused + 1;
295 memcpy (p, sq->buf + sq->firstused, l);
296
297 p += l;
298 len -= l;
299 sq->firstused += l;
300 if (sq->firstused == sq->firstfree)
301 {
302 if (cptr->recvq.size () > 1)
303 {
304 delete sq;
305 cptr->recvq.pop_front ();
306 }
307 else
308 /* keep one struct packetqueue */
309 sq->firstused = sq->firstfree = 0;
310 }
311 else
312 return p - buf;
313 }
314 return p - buf;
315 }
316
317 void
318 sendqrecvq_free (connection_t *cptr)
319 {
320 while (!cptr->recvq.empty ())
321 {
322 delete cptr->recvq.front ();
323 cptr->recvq.pop_front ();
324 }
325
326 while (!cptr->sendq.empty ())
327 {
328 delete cptr->sendq.front ();
329 cptr->sendq.pop_front ();
330 }
331 }