ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/ermyth/src/datastream.C
Revision: 1.7
Committed: Sun Sep 16 18:54:44 2007 UTC (16 years, 8 months ago) by pippijn
Content type: text/plain
Branch: MAIN
CVS Tags: HEAD
Changes since 1.6: +10 -5 lines
Log Message:
#defines to enum

File Contents

# User Rev Content
1 pippijn 1.7 /**
2     * datastream.C: Handling of streams and packet queues.
3     *
4     * Copyright © 2007 Pippijn van Steenhoven / The Ermyth Team
5     * Rights to this code are as documented in COPYING.
6     *
7     *
8     * Portions of this file were derived from sources bearing the following license:
9     * Rights to this code are documented in doc/pod/license.pod.
10 pippijn 1.1 * Copyright © 2005-2006 Atheme Development Group
11     */
12    
13 pippijn 1.7 static char const rcsid[] = "$Id: datastream.C,v 1.6 2007-09-05 11:23:15 pippijn Exp $";
14 pippijn 1.1
15     #include "atheme.h"
16     #include "datastream.h"
17     #include "connection.h"
18    
19     void
20     sendq_add (connection_t *cptr, char *buf, int len)
21     {
22     packetqueue *sq;
23     int l;
24     int pos = 0;
25    
26 pippijn 1.6 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
27 pippijn 1.1 {
28     slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
29     return;
30     }
31    
32     if (!cptr->sendq.empty ())
33     {
34     sq = cptr->sendq.back ();
35     l = SENDQSIZE - sq->firstfree;
36     if (l > len)
37     l = len;
38     memcpy (sq->buf + sq->firstfree, buf + pos, l);
39     sq->firstfree += l;
40     pos += l;
41     len -= l;
42     }
43    
44     while (len > 0)
45     {
46     sq = new packetqueue;
47     l = SENDQSIZE - sq->firstfree;
48     if (l > len)
49     l = len;
50     memcpy (sq->buf + sq->firstfree, buf + pos, l);
51     sq->firstfree += l;
52     pos += l;
53     len -= l;
54     cptr->sendq.push_back (sq);
55     }
56     }
57    
58     void
59     sendq_add_eof (connection_t *cptr)
60     {
61 pippijn 1.6 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
62 pippijn 1.1 {
63     slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
64     return;
65     }
66 pippijn 1.6 cptr->flags |= CF_SEND_EOF;
67 pippijn 1.1 }
68    
69     void
70     sendq_flush (connection_t *cptr)
71     {
72     packetqueue *sq;
73     int l;
74    
75     while (!cptr->sendq.empty ())
76     {
77     sq = cptr->sendq.front ();
78    
79     if (sq->firstused == sq->firstfree)
80     break;
81    
82     if ((l = write (cptr->fd, sq->buf + sq->firstused, sq->firstfree - sq->firstused)) == -1)
83     {
84     if (errno != EAGAIN)
85     {
86     slog (LG_DEBUG, "sendq_flush(): write error %d (%s) on connection %s[%d]", errno, strerror (errno), cptr->name, cptr->fd);
87 pippijn 1.6 cptr->flags |= CF_DEAD;
88 pippijn 1.1 }
89     return;
90     }
91    
92     sq->firstused += l;
93     if (sq->firstused == sq->firstfree)
94     {
95     if (cptr->sendq.size () > 1)
96     {
97     delete sq;
98     cptr->sendq.pop_front ();
99     }
100     else
101     /* keep one struct packetqueue */
102     sq->firstused = sq->firstfree = 0;
103     }
104     else
105     return;
106     }
107 pippijn 1.6 if (cptr->flags & CF_SEND_EOF)
108 pippijn 1.1 {
109     /* shut down write end, kill entire connection
110     * * only when the other side acknowledges -- jilles */
111     #ifdef SHUT_WR
112     shutdown (cptr->fd, SHUT_WR);
113     #else
114     shutdown (cptr->fd, 1);
115     #endif
116 pippijn 1.6 cptr->flags |= CF_SEND_DEAD;
117 pippijn 1.1 }
118     }
119    
120     bool
121     sendq_nonempty (connection_t *cptr)
122     {
123     packetqueue *sq;
124    
125 pippijn 1.6 if (cptr->flags & CF_SEND_DEAD)
126 pippijn 1.1 return false;
127 pippijn 1.6 if (cptr->flags & CF_SEND_EOF)
128 pippijn 1.1 return true;
129     if (cptr->sendq.empty ())
130     return false;
131     sq = cptr->sendq.front ();
132     return sq->firstfree > sq->firstused;
133     }
134    
135     int
136     recvq_length (connection_t *cptr)
137     {
138     int l = 0;
139 pippijn 1.4 connection_t::queue::iterator it = cptr->recvq.begin ();;
140     connection_t::queue::iterator et = cptr->recvq.end ();
141 pippijn 1.1 packetqueue *sq;
142    
143 pippijn 1.4 while (it != et)
144 pippijn 1.1 {
145     sq = *it;
146     l += sq->firstfree - sq->firstused;
147 pippijn 1.4 ++it;
148 pippijn 1.1 }
149    
150     return l;
151     }
152    
153     void
154     recvq_put (connection_t *cptr)
155     {
156     packetqueue *sq = NULL;
157     int l, ll;
158    
159 pippijn 1.6 if (cptr->flags & (CF_DEAD | CF_SEND_DEAD))
160 pippijn 1.1 {
161     /* If CF_SEND_DEAD:
162     * The client closed the connection or sent some
163     * data we don't care about, be done with it.
164     * If CF_DEAD:
165     * Connection died earlier, be done with it now.
166     * -- jilles
167     */
168     errno = 0;
169 pippijn 1.5 cptr->close ();
170 pippijn 1.1 return;
171     }
172    
173     if (!cptr->recvq.empty ())
174     {
175     sq = cptr->recvq.back ();
176     l = SENDQSIZE - sq->firstfree;
177     if (l == 0)
178     sq = NULL;
179     }
180     if (sq == NULL)
181     {
182     sq = new packetqueue;
183     sq->firstused = sq->firstfree = 0;
184     l = SENDQSIZE;
185     cptr->recvq.push_back (sq);
186     }
187     errno = 0;
188    
189     l = read (cptr->fd, sq->buf + sq->firstfree, l);
190     if (l == 0 || (l < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EALREADY && errno != EINTR && errno != ENOBUFS))
191     {
192     if (l == 0)
193     slog (LG_INFO, "recvq_put(): fd %d closed the connection", cptr->fd);
194     else
195     slog (LG_INFO, "recvq_put(): lost connection on fd %d", cptr->fd);
196 pippijn 1.5 cptr->close ();
197 pippijn 1.1 return;
198     }
199     else if (l > 0)
200     sq->firstfree += l;
201    
202     if (cptr->recvq_handler)
203     {
204     l = recvq_length (cptr);
205     do /* call handler until it consumes nothing */
206     {
207     cptr->recvq_handler (cptr);
208     ll = l;
209     l = recvq_length (cptr);
210     }
211     while (ll != l && l != 0);
212     }
213     return;
214     }
215    
216     int
217     recvq_get (connection_t *cptr, char *buf, int len)
218     {
219     packetqueue *sq;
220     int l;
221     char *p = buf;
222    
223     while (!cptr->recvq.empty ())
224     {
225     sq = cptr->recvq.front ();
226    
227     if (sq->firstused == sq->firstfree || len <= 0)
228     break;
229    
230     l = sq->firstfree - sq->firstused;
231     if (l > len)
232     l = len;
233     memcpy (p, sq->buf + sq->firstused, l);
234    
235     p += l;
236     len -= l;
237     sq->firstused += l;
238     if (sq->firstused == sq->firstfree)
239     {
240     if (cptr->recvq.size () > 1)
241     {
242     delete sq;
243     cptr->recvq.pop_front ();
244     }
245     else
246     /* keep one struct packetqueue */
247     sq->firstused = sq->firstfree = 0;
248     }
249     else
250     return p - buf;
251     }
252     return p - buf;
253     }
254    
255     int
256     recvq_getline (connection_t *cptr, char *buf, int len)
257     {
258     packetqueue *sq, *sq2 = NULL;
259     int l = 0;
260     char *p = buf;
261     char *newline = NULL;
262 pippijn 1.4 connection_t::queue::iterator it = cptr->recvq.begin ();
263     connection_t::queue::iterator et = cptr->recvq.end ();
264 pippijn 1.1
265 pippijn 1.4 while (it != et)
266 pippijn 1.1 {
267     sq2 = *it;
268     l += sq2->firstfree - sq2->firstused;
269     newline = static_cast < char *>(memchr (sq2->buf + sq2->firstused, '\n', sq2->firstfree - sq2->firstused));
270     if (newline != NULL || l >= len)
271     break;
272 pippijn 1.4 ++it;
273 pippijn 1.1 }
274 pippijn 1.4
275 pippijn 1.1 if (newline == NULL && l < len)
276     return 0;
277    
278 pippijn 1.6 cptr->flags |= CF_NONEWLINE;
279 pippijn 1.1 while (!cptr->recvq.empty ())
280     {
281     sq = cptr->recvq.front ();
282    
283     if (sq->firstused == sq->firstfree || len <= 0)
284     break;
285    
286     l = sq->firstfree - sq->firstused;
287     if (l > len)
288     l = len;
289     if (sq == sq2 && l >= newline - sq->buf - sq->firstused + 1)
290 pippijn 1.6 cptr->flags &= ~CF_NONEWLINE, l = newline - sq->buf - sq->firstused + 1;
291 pippijn 1.1 memcpy (p, sq->buf + sq->firstused, l);
292    
293     p += l;
294     len -= l;
295     sq->firstused += l;
296     if (sq->firstused == sq->firstfree)
297     {
298     if (cptr->recvq.size () > 1)
299     {
300     delete sq;
301     cptr->recvq.pop_front ();
302     }
303     else
304     /* keep one struct packetqueue */
305     sq->firstused = sq->firstfree = 0;
306     }
307     else
308     return p - buf;
309     }
310     return p - buf;
311     }
312    
313     void
314     sendqrecvq_free (connection_t *cptr)
315     {
316 pippijn 1.4 connection_t::queue::iterator first = cptr->recvq.begin ();
317     connection_t::queue::iterator last = cptr->recvq.end ();
318     while (first != last)
319 pippijn 1.1 {
320 pippijn 1.4 delete *first;
321     ++first;
322 pippijn 1.1 }
323 pippijn 1.4 cptr->recvq.clear ();
324 pippijn 1.1
325 pippijn 1.4 first = cptr->sendq.begin ();
326     last = cptr->sendq.end ();
327     while (first != last)
328 pippijn 1.1 {
329 pippijn 1.4 delete *first;
330     ++first;
331 pippijn 1.1 }
332 pippijn 1.4 cptr->sendq.clear ();
333 pippijn 1.1 }