ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/ermyth/src/datastream.C
Revision: 1.5
Committed: Thu Aug 30 19:56:24 2007 UTC (16 years, 9 months ago) by pippijn
Content type: text/plain
Branch: MAIN
Changes since 1.4: +14 -14 lines
Log Message:
- put faultcodes into their own namespace
- removed old files
- limited header garbage in atheme.h
- macros to inline bools for connection_t::is_*
- put some connection_t functions into the connection_t class

File Contents

# Content
1 /*
2 * Copyright © 2005-2006 Atheme Development Group
3 * Rights to this code are documented in doc/pod/license.pod.
4 *
5 * Handling of streams and packet queues.
6 */
7
/* CVS/RCS "$Id$" keyword string identifying this file's revision. */
static const char rcsid[] = "$Id: datastream.C,v 1.4 2007-08-28 17:08:12 pippijn Exp $";
9
10 #include "atheme.h"
11 #include "datastream.h"
12 #include "connection.h"
13
14 void
15 sendq_add (connection_t *cptr, char *buf, int len)
16 {
17 packetqueue *sq;
18 int l;
19 int pos = 0;
20
21 if (cptr->flags & (connection_t::CF_DEAD | connection_t::CF_SEND_EOF))
22 {
23 slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
24 return;
25 }
26
27 if (!cptr->sendq.empty ())
28 {
29 sq = cptr->sendq.back ();
30 l = SENDQSIZE - sq->firstfree;
31 if (l > len)
32 l = len;
33 memcpy (sq->buf + sq->firstfree, buf + pos, l);
34 sq->firstfree += l;
35 pos += l;
36 len -= l;
37 }
38
39 while (len > 0)
40 {
41 sq = new packetqueue;
42 l = SENDQSIZE - sq->firstfree;
43 if (l > len)
44 l = len;
45 memcpy (sq->buf + sq->firstfree, buf + pos, l);
46 sq->firstfree += l;
47 pos += l;
48 len -= l;
49 cptr->sendq.push_back (sq);
50 }
51 }
52
53 void
54 sendq_add_eof (connection_t *cptr)
55 {
56 if (cptr->flags & (connection_t::CF_DEAD | connection_t::CF_SEND_EOF))
57 {
58 slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
59 return;
60 }
61 cptr->flags |= connection_t::CF_SEND_EOF;
62 }
63
64 void
65 sendq_flush (connection_t *cptr)
66 {
67 packetqueue *sq;
68 int l;
69
70 while (!cptr->sendq.empty ())
71 {
72 sq = cptr->sendq.front ();
73
74 if (sq->firstused == sq->firstfree)
75 break;
76
77 if ((l = write (cptr->fd, sq->buf + sq->firstused, sq->firstfree - sq->firstused)) == -1)
78 {
79 if (errno != EAGAIN)
80 {
81 slog (LG_DEBUG, "sendq_flush(): write error %d (%s) on connection %s[%d]", errno, strerror (errno), cptr->name, cptr->fd);
82 cptr->flags |= connection_t::CF_DEAD;
83 }
84 return;
85 }
86
87 sq->firstused += l;
88 if (sq->firstused == sq->firstfree)
89 {
90 if (cptr->sendq.size () > 1)
91 {
92 delete sq;
93 cptr->sendq.pop_front ();
94 }
95 else
96 /* keep one struct packetqueue */
97 sq->firstused = sq->firstfree = 0;
98 }
99 else
100 return;
101 }
102 if (cptr->flags & connection_t::CF_SEND_EOF)
103 {
104 /* shut down write end, kill entire connection
105 * * only when the other side acknowledges -- jilles */
106 #ifdef SHUT_WR
107 shutdown (cptr->fd, SHUT_WR);
108 #else
109 shutdown (cptr->fd, 1);
110 #endif
111 cptr->flags |= connection_t::CF_SEND_DEAD;
112 }
113 }
114
115 bool
116 sendq_nonempty (connection_t *cptr)
117 {
118 packetqueue *sq;
119
120 if (cptr->flags & connection_t::CF_SEND_DEAD)
121 return false;
122 if (cptr->flags & connection_t::CF_SEND_EOF)
123 return true;
124 if (cptr->sendq.empty ())
125 return false;
126 sq = cptr->sendq.front ();
127 return sq->firstfree > sq->firstused;
128 }
129
130 int
131 recvq_length (connection_t *cptr)
132 {
133 int l = 0;
134 connection_t::queue::iterator it = cptr->recvq.begin ();;
135 connection_t::queue::iterator et = cptr->recvq.end ();
136 packetqueue *sq;
137
138 while (it != et)
139 {
140 sq = *it;
141 l += sq->firstfree - sq->firstused;
142 ++it;
143 }
144
145 return l;
146 }
147
148 void
149 recvq_put (connection_t *cptr)
150 {
151 packetqueue *sq = NULL;
152 int l, ll;
153
154 if (cptr->flags & (connection_t::CF_DEAD | connection_t::CF_SEND_DEAD))
155 {
156 /* If CF_SEND_DEAD:
157 * The client closed the connection or sent some
158 * data we don't care about, be done with it.
159 * If CF_DEAD:
160 * Connection died earlier, be done with it now.
161 * -- jilles
162 */
163 errno = 0;
164 cptr->close ();
165 return;
166 }
167
168 if (!cptr->recvq.empty ())
169 {
170 sq = cptr->recvq.back ();
171 l = SENDQSIZE - sq->firstfree;
172 if (l == 0)
173 sq = NULL;
174 }
175 if (sq == NULL)
176 {
177 sq = new packetqueue;
178 sq->firstused = sq->firstfree = 0;
179 l = SENDQSIZE;
180 cptr->recvq.push_back (sq);
181 }
182 errno = 0;
183
184 l = read (cptr->fd, sq->buf + sq->firstfree, l);
185 if (l == 0 || (l < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EALREADY && errno != EINTR && errno != ENOBUFS))
186 {
187 if (l == 0)
188 slog (LG_INFO, "recvq_put(): fd %d closed the connection", cptr->fd);
189 else
190 slog (LG_INFO, "recvq_put(): lost connection on fd %d", cptr->fd);
191 cptr->close ();
192 return;
193 }
194 else if (l > 0)
195 sq->firstfree += l;
196
197 if (cptr->recvq_handler)
198 {
199 l = recvq_length (cptr);
200 do /* call handler until it consumes nothing */
201 {
202 cptr->recvq_handler (cptr);
203 ll = l;
204 l = recvq_length (cptr);
205 }
206 while (ll != l && l != 0);
207 }
208 return;
209 }
210
211 int
212 recvq_get (connection_t *cptr, char *buf, int len)
213 {
214 packetqueue *sq;
215 int l;
216 char *p = buf;
217
218 while (!cptr->recvq.empty ())
219 {
220 sq = cptr->recvq.front ();
221
222 if (sq->firstused == sq->firstfree || len <= 0)
223 break;
224
225 l = sq->firstfree - sq->firstused;
226 if (l > len)
227 l = len;
228 memcpy (p, sq->buf + sq->firstused, l);
229
230 p += l;
231 len -= l;
232 sq->firstused += l;
233 if (sq->firstused == sq->firstfree)
234 {
235 if (cptr->recvq.size () > 1)
236 {
237 delete sq;
238 cptr->recvq.pop_front ();
239 }
240 else
241 /* keep one struct packetqueue */
242 sq->firstused = sq->firstfree = 0;
243 }
244 else
245 return p - buf;
246 }
247 return p - buf;
248 }
249
250 int
251 recvq_getline (connection_t *cptr, char *buf, int len)
252 {
253 packetqueue *sq, *sq2 = NULL;
254 int l = 0;
255 char *p = buf;
256 char *newline = NULL;
257 connection_t::queue::iterator it = cptr->recvq.begin ();
258 connection_t::queue::iterator et = cptr->recvq.end ();
259
260 while (it != et)
261 {
262 sq2 = *it;
263 l += sq2->firstfree - sq2->firstused;
264 newline = static_cast < char *>(memchr (sq2->buf + sq2->firstused, '\n', sq2->firstfree - sq2->firstused));
265 if (newline != NULL || l >= len)
266 break;
267 ++it;
268 }
269
270 if (newline == NULL && l < len)
271 return 0;
272
273 cptr->flags |= connection_t::CF_NONEWLINE;
274 while (!cptr->recvq.empty ())
275 {
276 sq = cptr->recvq.front ();
277
278 if (sq->firstused == sq->firstfree || len <= 0)
279 break;
280
281 l = sq->firstfree - sq->firstused;
282 if (l > len)
283 l = len;
284 if (sq == sq2 && l >= newline - sq->buf - sq->firstused + 1)
285 cptr->flags &= ~connection_t::CF_NONEWLINE, l = newline - sq->buf - sq->firstused + 1;
286 memcpy (p, sq->buf + sq->firstused, l);
287
288 p += l;
289 len -= l;
290 sq->firstused += l;
291 if (sq->firstused == sq->firstfree)
292 {
293 if (cptr->recvq.size () > 1)
294 {
295 delete sq;
296 cptr->recvq.pop_front ();
297 }
298 else
299 /* keep one struct packetqueue */
300 sq->firstused = sq->firstfree = 0;
301 }
302 else
303 return p - buf;
304 }
305 return p - buf;
306 }
307
308 void
309 sendqrecvq_free (connection_t *cptr)
310 {
311 connection_t::queue::iterator first = cptr->recvq.begin ();
312 connection_t::queue::iterator last = cptr->recvq.end ();
313 while (first != last)
314 {
315 delete *first;
316 ++first;
317 }
318 cptr->recvq.clear ();
319
320 first = cptr->sendq.begin ();
321 last = cptr->sendq.end ();
322 while (first != last)
323 {
324 delete *first;
325 ++first;
326 }
327 cptr->sendq.clear ();
328 }