ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/ermyth/src/datastream.C
Revision: 1.7
Committed: Sun Sep 16 18:54:44 2007 UTC (16 years, 8 months ago) by pippijn
Content type: text/plain
Branch: MAIN
CVS Tags: HEAD
Changes since 1.6: +10 -5 lines
Log Message:
#defines to enum

File Contents

# Content
1 /**
2 * datastream.C: Handling of streams and packet queues.
3 *
4 * Copyright © 2007 Pippijn van Steenhoven / The Ermyth Team
5 * Rights to this code are as documented in COPYING.
6 *
7 *
8 * Portions of this file were derived from sources bearing the following license:
9 * Rights to this code are documented in doc/pod/license.pod.
10 * Copyright © 2005-2006 Atheme Development Group
11 */
12
13 static char const rcsid[] = "$Id: datastream.C,v 1.6 2007-09-05 11:23:15 pippijn Exp $";
14
15 #include "atheme.h"
16 #include "datastream.h"
17 #include "connection.h"
18
19 void
20 sendq_add (connection_t *cptr, char *buf, int len)
21 {
22 packetqueue *sq;
23 int l;
24 int pos = 0;
25
26 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
27 {
28 slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
29 return;
30 }
31
32 if (!cptr->sendq.empty ())
33 {
34 sq = cptr->sendq.back ();
35 l = SENDQSIZE - sq->firstfree;
36 if (l > len)
37 l = len;
38 memcpy (sq->buf + sq->firstfree, buf + pos, l);
39 sq->firstfree += l;
40 pos += l;
41 len -= l;
42 }
43
44 while (len > 0)
45 {
46 sq = new packetqueue;
47 l = SENDQSIZE - sq->firstfree;
48 if (l > len)
49 l = len;
50 memcpy (sq->buf + sq->firstfree, buf + pos, l);
51 sq->firstfree += l;
52 pos += l;
53 len -= l;
54 cptr->sendq.push_back (sq);
55 }
56 }
57
58 void
59 sendq_add_eof (connection_t *cptr)
60 {
61 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
62 {
63 slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
64 return;
65 }
66 cptr->flags |= CF_SEND_EOF;
67 }
68
69 void
70 sendq_flush (connection_t *cptr)
71 {
72 packetqueue *sq;
73 int l;
74
75 while (!cptr->sendq.empty ())
76 {
77 sq = cptr->sendq.front ();
78
79 if (sq->firstused == sq->firstfree)
80 break;
81
82 if ((l = write (cptr->fd, sq->buf + sq->firstused, sq->firstfree - sq->firstused)) == -1)
83 {
84 if (errno != EAGAIN)
85 {
86 slog (LG_DEBUG, "sendq_flush(): write error %d (%s) on connection %s[%d]", errno, strerror (errno), cptr->name, cptr->fd);
87 cptr->flags |= CF_DEAD;
88 }
89 return;
90 }
91
92 sq->firstused += l;
93 if (sq->firstused == sq->firstfree)
94 {
95 if (cptr->sendq.size () > 1)
96 {
97 delete sq;
98 cptr->sendq.pop_front ();
99 }
100 else
101 /* keep one struct packetqueue */
102 sq->firstused = sq->firstfree = 0;
103 }
104 else
105 return;
106 }
107 if (cptr->flags & CF_SEND_EOF)
108 {
109 /* shut down write end, kill entire connection
110 * * only when the other side acknowledges -- jilles */
111 #ifdef SHUT_WR
112 shutdown (cptr->fd, SHUT_WR);
113 #else
114 shutdown (cptr->fd, 1);
115 #endif
116 cptr->flags |= CF_SEND_DEAD;
117 }
118 }
119
120 bool
121 sendq_nonempty (connection_t *cptr)
122 {
123 packetqueue *sq;
124
125 if (cptr->flags & CF_SEND_DEAD)
126 return false;
127 if (cptr->flags & CF_SEND_EOF)
128 return true;
129 if (cptr->sendq.empty ())
130 return false;
131 sq = cptr->sendq.front ();
132 return sq->firstfree > sq->firstused;
133 }
134
135 int
136 recvq_length (connection_t *cptr)
137 {
138 int l = 0;
139 connection_t::queue::iterator it = cptr->recvq.begin ();;
140 connection_t::queue::iterator et = cptr->recvq.end ();
141 packetqueue *sq;
142
143 while (it != et)
144 {
145 sq = *it;
146 l += sq->firstfree - sq->firstused;
147 ++it;
148 }
149
150 return l;
151 }
152
153 void
154 recvq_put (connection_t *cptr)
155 {
156 packetqueue *sq = NULL;
157 int l, ll;
158
159 if (cptr->flags & (CF_DEAD | CF_SEND_DEAD))
160 {
161 /* If CF_SEND_DEAD:
162 * The client closed the connection or sent some
163 * data we don't care about, be done with it.
164 * If CF_DEAD:
165 * Connection died earlier, be done with it now.
166 * -- jilles
167 */
168 errno = 0;
169 cptr->close ();
170 return;
171 }
172
173 if (!cptr->recvq.empty ())
174 {
175 sq = cptr->recvq.back ();
176 l = SENDQSIZE - sq->firstfree;
177 if (l == 0)
178 sq = NULL;
179 }
180 if (sq == NULL)
181 {
182 sq = new packetqueue;
183 sq->firstused = sq->firstfree = 0;
184 l = SENDQSIZE;
185 cptr->recvq.push_back (sq);
186 }
187 errno = 0;
188
189 l = read (cptr->fd, sq->buf + sq->firstfree, l);
190 if (l == 0 || (l < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EALREADY && errno != EINTR && errno != ENOBUFS))
191 {
192 if (l == 0)
193 slog (LG_INFO, "recvq_put(): fd %d closed the connection", cptr->fd);
194 else
195 slog (LG_INFO, "recvq_put(): lost connection on fd %d", cptr->fd);
196 cptr->close ();
197 return;
198 }
199 else if (l > 0)
200 sq->firstfree += l;
201
202 if (cptr->recvq_handler)
203 {
204 l = recvq_length (cptr);
205 do /* call handler until it consumes nothing */
206 {
207 cptr->recvq_handler (cptr);
208 ll = l;
209 l = recvq_length (cptr);
210 }
211 while (ll != l && l != 0);
212 }
213 return;
214 }
215
216 int
217 recvq_get (connection_t *cptr, char *buf, int len)
218 {
219 packetqueue *sq;
220 int l;
221 char *p = buf;
222
223 while (!cptr->recvq.empty ())
224 {
225 sq = cptr->recvq.front ();
226
227 if (sq->firstused == sq->firstfree || len <= 0)
228 break;
229
230 l = sq->firstfree - sq->firstused;
231 if (l > len)
232 l = len;
233 memcpy (p, sq->buf + sq->firstused, l);
234
235 p += l;
236 len -= l;
237 sq->firstused += l;
238 if (sq->firstused == sq->firstfree)
239 {
240 if (cptr->recvq.size () > 1)
241 {
242 delete sq;
243 cptr->recvq.pop_front ();
244 }
245 else
246 /* keep one struct packetqueue */
247 sq->firstused = sq->firstfree = 0;
248 }
249 else
250 return p - buf;
251 }
252 return p - buf;
253 }
254
255 int
256 recvq_getline (connection_t *cptr, char *buf, int len)
257 {
258 packetqueue *sq, *sq2 = NULL;
259 int l = 0;
260 char *p = buf;
261 char *newline = NULL;
262 connection_t::queue::iterator it = cptr->recvq.begin ();
263 connection_t::queue::iterator et = cptr->recvq.end ();
264
265 while (it != et)
266 {
267 sq2 = *it;
268 l += sq2->firstfree - sq2->firstused;
269 newline = static_cast < char *>(memchr (sq2->buf + sq2->firstused, '\n', sq2->firstfree - sq2->firstused));
270 if (newline != NULL || l >= len)
271 break;
272 ++it;
273 }
274
275 if (newline == NULL && l < len)
276 return 0;
277
278 cptr->flags |= CF_NONEWLINE;
279 while (!cptr->recvq.empty ())
280 {
281 sq = cptr->recvq.front ();
282
283 if (sq->firstused == sq->firstfree || len <= 0)
284 break;
285
286 l = sq->firstfree - sq->firstused;
287 if (l > len)
288 l = len;
289 if (sq == sq2 && l >= newline - sq->buf - sq->firstused + 1)
290 cptr->flags &= ~CF_NONEWLINE, l = newline - sq->buf - sq->firstused + 1;
291 memcpy (p, sq->buf + sq->firstused, l);
292
293 p += l;
294 len -= l;
295 sq->firstused += l;
296 if (sq->firstused == sq->firstfree)
297 {
298 if (cptr->recvq.size () > 1)
299 {
300 delete sq;
301 cptr->recvq.pop_front ();
302 }
303 else
304 /* keep one struct packetqueue */
305 sq->firstused = sq->firstfree = 0;
306 }
307 else
308 return p - buf;
309 }
310 return p - buf;
311 }
312
313 void
314 sendqrecvq_free (connection_t *cptr)
315 {
316 connection_t::queue::iterator first = cptr->recvq.begin ();
317 connection_t::queue::iterator last = cptr->recvq.end ();
318 while (first != last)
319 {
320 delete *first;
321 ++first;
322 }
323 cptr->recvq.clear ();
324
325 first = cptr->sendq.begin ();
326 last = cptr->sendq.end ();
327 while (first != last)
328 {
329 delete *first;
330 ++first;
331 }
332 cptr->sendq.clear ();
333 }