ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/ermyth/src/datastream.C
Revision: 1.6
Committed: Wed Sep 5 11:23:15 2007 UTC (16 years, 8 months ago) by pippijn
Content type: text/plain
Branch: MAIN
Changes since 1.5: +12 -12 lines
Log Message:
removed GPLed code and put license back to BSD

File Contents

# User Rev Content
1 pippijn 1.1 /*
2     * Copyright © 2005-2006 Atheme Development Group
3 pippijn 1.2 * Rights to this code are documented in doc/pod/license.pod.
4 pippijn 1.1 *
5     * Handling of streams and packet queues.
6     */
7    
8 pippijn 1.6 static char const rcsid[] = "$Id: datastream.C,v 1.5 2007-08-30 19:56:24 pippijn Exp $";
9 pippijn 1.1
10     #include "atheme.h"
11     #include "datastream.h"
12     #include "connection.h"
13    
14     void
15     sendq_add (connection_t *cptr, char *buf, int len)
16     {
17     packetqueue *sq;
18     int l;
19     int pos = 0;
20    
21 pippijn 1.6 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
22 pippijn 1.1 {
23     slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
24     return;
25     }
26    
27     if (!cptr->sendq.empty ())
28     {
29     sq = cptr->sendq.back ();
30     l = SENDQSIZE - sq->firstfree;
31     if (l > len)
32     l = len;
33     memcpy (sq->buf + sq->firstfree, buf + pos, l);
34     sq->firstfree += l;
35     pos += l;
36     len -= l;
37     }
38    
39     while (len > 0)
40     {
41     sq = new packetqueue;
42     l = SENDQSIZE - sq->firstfree;
43     if (l > len)
44     l = len;
45     memcpy (sq->buf + sq->firstfree, buf + pos, l);
46     sq->firstfree += l;
47     pos += l;
48     len -= l;
49     cptr->sendq.push_back (sq);
50     }
51     }
52    
53     void
54     sendq_add_eof (connection_t *cptr)
55     {
56 pippijn 1.6 if (cptr->flags & (CF_DEAD | CF_SEND_EOF))
57 pippijn 1.1 {
58     slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd);
59     return;
60     }
61 pippijn 1.6 cptr->flags |= CF_SEND_EOF;
62 pippijn 1.1 }
63    
64     void
65     sendq_flush (connection_t *cptr)
66     {
67     packetqueue *sq;
68     int l;
69    
70     while (!cptr->sendq.empty ())
71     {
72     sq = cptr->sendq.front ();
73    
74     if (sq->firstused == sq->firstfree)
75     break;
76    
77     if ((l = write (cptr->fd, sq->buf + sq->firstused, sq->firstfree - sq->firstused)) == -1)
78     {
79     if (errno != EAGAIN)
80     {
81     slog (LG_DEBUG, "sendq_flush(): write error %d (%s) on connection %s[%d]", errno, strerror (errno), cptr->name, cptr->fd);
82 pippijn 1.6 cptr->flags |= CF_DEAD;
83 pippijn 1.1 }
84     return;
85     }
86    
87     sq->firstused += l;
88     if (sq->firstused == sq->firstfree)
89     {
90     if (cptr->sendq.size () > 1)
91     {
92     delete sq;
93     cptr->sendq.pop_front ();
94     }
95     else
96     /* keep one struct packetqueue */
97     sq->firstused = sq->firstfree = 0;
98     }
99     else
100     return;
101     }
102 pippijn 1.6 if (cptr->flags & CF_SEND_EOF)
103 pippijn 1.1 {
104     /* shut down write end, kill entire connection
105     * * only when the other side acknowledges -- jilles */
106     #ifdef SHUT_WR
107     shutdown (cptr->fd, SHUT_WR);
108     #else
109     shutdown (cptr->fd, 1);
110     #endif
111 pippijn 1.6 cptr->flags |= CF_SEND_DEAD;
112 pippijn 1.1 }
113     }
114    
115     bool
116     sendq_nonempty (connection_t *cptr)
117     {
118     packetqueue *sq;
119    
120 pippijn 1.6 if (cptr->flags & CF_SEND_DEAD)
121 pippijn 1.1 return false;
122 pippijn 1.6 if (cptr->flags & CF_SEND_EOF)
123 pippijn 1.1 return true;
124     if (cptr->sendq.empty ())
125     return false;
126     sq = cptr->sendq.front ();
127     return sq->firstfree > sq->firstused;
128     }
129    
130     int
131     recvq_length (connection_t *cptr)
132     {
133     int l = 0;
134 pippijn 1.4 connection_t::queue::iterator it = cptr->recvq.begin ();;
135     connection_t::queue::iterator et = cptr->recvq.end ();
136 pippijn 1.1 packetqueue *sq;
137    
138 pippijn 1.4 while (it != et)
139 pippijn 1.1 {
140     sq = *it;
141     l += sq->firstfree - sq->firstused;
142 pippijn 1.4 ++it;
143 pippijn 1.1 }
144    
145     return l;
146     }
147    
148     void
149     recvq_put (connection_t *cptr)
150     {
151     packetqueue *sq = NULL;
152     int l, ll;
153    
154 pippijn 1.6 if (cptr->flags & (CF_DEAD | CF_SEND_DEAD))
155 pippijn 1.1 {
156     /* If CF_SEND_DEAD:
157     * The client closed the connection or sent some
158     * data we don't care about, be done with it.
159     * If CF_DEAD:
160     * Connection died earlier, be done with it now.
161     * -- jilles
162     */
163     errno = 0;
164 pippijn 1.5 cptr->close ();
165 pippijn 1.1 return;
166     }
167    
168     if (!cptr->recvq.empty ())
169     {
170     sq = cptr->recvq.back ();
171     l = SENDQSIZE - sq->firstfree;
172     if (l == 0)
173     sq = NULL;
174     }
175     if (sq == NULL)
176     {
177     sq = new packetqueue;
178     sq->firstused = sq->firstfree = 0;
179     l = SENDQSIZE;
180     cptr->recvq.push_back (sq);
181     }
182     errno = 0;
183    
184     l = read (cptr->fd, sq->buf + sq->firstfree, l);
185     if (l == 0 || (l < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EALREADY && errno != EINTR && errno != ENOBUFS))
186     {
187     if (l == 0)
188     slog (LG_INFO, "recvq_put(): fd %d closed the connection", cptr->fd);
189     else
190     slog (LG_INFO, "recvq_put(): lost connection on fd %d", cptr->fd);
191 pippijn 1.5 cptr->close ();
192 pippijn 1.1 return;
193     }
194     else if (l > 0)
195     sq->firstfree += l;
196    
197     if (cptr->recvq_handler)
198     {
199     l = recvq_length (cptr);
200     do /* call handler until it consumes nothing */
201     {
202     cptr->recvq_handler (cptr);
203     ll = l;
204     l = recvq_length (cptr);
205     }
206     while (ll != l && l != 0);
207     }
208     return;
209     }
210    
211     int
212     recvq_get (connection_t *cptr, char *buf, int len)
213     {
214     packetqueue *sq;
215     int l;
216     char *p = buf;
217    
218     while (!cptr->recvq.empty ())
219     {
220     sq = cptr->recvq.front ();
221    
222     if (sq->firstused == sq->firstfree || len <= 0)
223     break;
224    
225     l = sq->firstfree - sq->firstused;
226     if (l > len)
227     l = len;
228     memcpy (p, sq->buf + sq->firstused, l);
229    
230     p += l;
231     len -= l;
232     sq->firstused += l;
233     if (sq->firstused == sq->firstfree)
234     {
235     if (cptr->recvq.size () > 1)
236     {
237     delete sq;
238     cptr->recvq.pop_front ();
239     }
240     else
241     /* keep one struct packetqueue */
242     sq->firstused = sq->firstfree = 0;
243     }
244     else
245     return p - buf;
246     }
247     return p - buf;
248     }
249    
250     int
251     recvq_getline (connection_t *cptr, char *buf, int len)
252     {
253     packetqueue *sq, *sq2 = NULL;
254     int l = 0;
255     char *p = buf;
256     char *newline = NULL;
257 pippijn 1.4 connection_t::queue::iterator it = cptr->recvq.begin ();
258     connection_t::queue::iterator et = cptr->recvq.end ();
259 pippijn 1.1
260 pippijn 1.4 while (it != et)
261 pippijn 1.1 {
262     sq2 = *it;
263     l += sq2->firstfree - sq2->firstused;
264     newline = static_cast < char *>(memchr (sq2->buf + sq2->firstused, '\n', sq2->firstfree - sq2->firstused));
265     if (newline != NULL || l >= len)
266     break;
267 pippijn 1.4 ++it;
268 pippijn 1.1 }
269 pippijn 1.4
270 pippijn 1.1 if (newline == NULL && l < len)
271     return 0;
272    
273 pippijn 1.6 cptr->flags |= CF_NONEWLINE;
274 pippijn 1.1 while (!cptr->recvq.empty ())
275     {
276     sq = cptr->recvq.front ();
277    
278     if (sq->firstused == sq->firstfree || len <= 0)
279     break;
280    
281     l = sq->firstfree - sq->firstused;
282     if (l > len)
283     l = len;
284     if (sq == sq2 && l >= newline - sq->buf - sq->firstused + 1)
285 pippijn 1.6 cptr->flags &= ~CF_NONEWLINE, l = newline - sq->buf - sq->firstused + 1;
286 pippijn 1.1 memcpy (p, sq->buf + sq->firstused, l);
287    
288     p += l;
289     len -= l;
290     sq->firstused += l;
291     if (sq->firstused == sq->firstfree)
292     {
293     if (cptr->recvq.size () > 1)
294     {
295     delete sq;
296     cptr->recvq.pop_front ();
297     }
298     else
299     /* keep one struct packetqueue */
300     sq->firstused = sq->firstfree = 0;
301     }
302     else
303     return p - buf;
304     }
305     return p - buf;
306     }
307    
308     void
309     sendqrecvq_free (connection_t *cptr)
310     {
311 pippijn 1.4 connection_t::queue::iterator first = cptr->recvq.begin ();
312     connection_t::queue::iterator last = cptr->recvq.end ();
313     while (first != last)
314 pippijn 1.1 {
315 pippijn 1.4 delete *first;
316     ++first;
317 pippijn 1.1 }
318 pippijn 1.4 cptr->recvq.clear ();
319 pippijn 1.1
320 pippijn 1.4 first = cptr->sendq.begin ();
321     last = cptr->sendq.end ();
322     while (first != last)
323 pippijn 1.1 {
324 pippijn 1.4 delete *first;
325     ++first;
326 pippijn 1.1 }
327 pippijn 1.4 cptr->sendq.clear ();
328 pippijn 1.1 }