/** * datastream.C: Handling of streams and packet queues. * * Copyright © 2007 Pippijn van Steenhoven / The Ermyth Team * Rights to this code are as documented in COPYING. * * * Portions of this file were derived from sources bearing the following license: * Rights to this code are documented in doc/pod/license.pod. * Copyright © 2005-2006 Atheme Development Group */ static char const rcsid[] = "$Id: datastream.C,v 1.7 2007/09/16 18:54:44 pippijn Exp $"; #include "atheme.h" #include "datastream.h" #include "connection.h" void sendq_add (connection_t *cptr, char *buf, int len) { packetqueue *sq; int l; int pos = 0; if (cptr->flags & (CF_DEAD | CF_SEND_EOF)) { slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd); return; } if (!cptr->sendq.empty ()) { sq = cptr->sendq.back (); l = SENDQSIZE - sq->firstfree; if (l > len) l = len; memcpy (sq->buf + sq->firstfree, buf + pos, l); sq->firstfree += l; pos += l; len -= l; } while (len > 0) { sq = new packetqueue; l = SENDQSIZE - sq->firstfree; if (l > len) l = len; memcpy (sq->buf + sq->firstfree, buf + pos, l); sq->firstfree += l; pos += l; len -= l; cptr->sendq.push_back (sq); } } void sendq_add_eof (connection_t *cptr) { if (cptr->flags & (CF_DEAD | CF_SEND_EOF)) { slog (LG_DEBUG, "sendq_add(): attempted to send to fd %d which is already dead", cptr->fd); return; } cptr->flags |= CF_SEND_EOF; } void sendq_flush (connection_t *cptr) { packetqueue *sq; int l; while (!cptr->sendq.empty ()) { sq = cptr->sendq.front (); if (sq->firstused == sq->firstfree) break; if ((l = write (cptr->fd, sq->buf + sq->firstused, sq->firstfree - sq->firstused)) == -1) { if (errno != EAGAIN) { slog (LG_DEBUG, "sendq_flush(): write error %d (%s) on connection %s[%d]", errno, strerror (errno), cptr->name, cptr->fd); cptr->flags |= CF_DEAD; } return; } sq->firstused += l; if (sq->firstused == sq->firstfree) { if (cptr->sendq.size () > 1) { delete sq; cptr->sendq.pop_front (); } else /* keep one struct packetqueue */ sq->firstused = sq->firstfree = 0; } else return; } if (cptr->flags & CF_SEND_EOF) { /* shut down write end, kill entire connection * * only when the other side acknowledges -- jilles */ #ifdef SHUT_WR shutdown (cptr->fd, SHUT_WR); #else shutdown (cptr->fd, 1); #endif cptr->flags |= CF_SEND_DEAD; } } bool sendq_nonempty (connection_t *cptr) { packetqueue *sq; if (cptr->flags & CF_SEND_DEAD) return false; if (cptr->flags & CF_SEND_EOF) return true; if (cptr->sendq.empty ()) return false; sq = cptr->sendq.front (); return sq->firstfree > sq->firstused; } int recvq_length (connection_t *cptr) { int l = 0; connection_t::queue::iterator it = cptr->recvq.begin ();; connection_t::queue::iterator et = cptr->recvq.end (); packetqueue *sq; while (it != et) { sq = *it; l += sq->firstfree - sq->firstused; ++it; } return l; } void recvq_put (connection_t *cptr) { packetqueue *sq = NULL; int l, ll; if (cptr->flags & (CF_DEAD | CF_SEND_DEAD)) { /* If CF_SEND_DEAD: * The client closed the connection or sent some * data we don't care about, be done with it. * If CF_DEAD: * Connection died earlier, be done with it now. * -- jilles */ errno = 0; cptr->close (); return; } if (!cptr->recvq.empty ()) { sq = cptr->recvq.back (); l = SENDQSIZE - sq->firstfree; if (l == 0) sq = NULL; } if (sq == NULL) { sq = new packetqueue; sq->firstused = sq->firstfree = 0; l = SENDQSIZE; cptr->recvq.push_back (sq); } errno = 0; l = read (cptr->fd, sq->buf + sq->firstfree, l); if (l == 0 || (l < 0 && errno != EWOULDBLOCK && errno != EAGAIN && errno != EALREADY && errno != EINTR && errno != ENOBUFS)) { if (l == 0) slog (LG_INFO, "recvq_put(): fd %d closed the connection", cptr->fd); else slog (LG_INFO, "recvq_put(): lost connection on fd %d", cptr->fd); cptr->close (); return; } else if (l > 0) sq->firstfree += l; if (cptr->recvq_handler) { l = recvq_length (cptr); do /* call handler until it consumes nothing */ { cptr->recvq_handler (cptr); ll = l; l = recvq_length (cptr); } while (ll != l && l != 0); } return; } int recvq_get (connection_t *cptr, char *buf, int len) { packetqueue *sq; int l; char *p = buf; while (!cptr->recvq.empty ()) { sq = cptr->recvq.front (); if (sq->firstused == sq->firstfree || len <= 0) break; l = sq->firstfree - sq->firstused; if (l > len) l = len; memcpy (p, sq->buf + sq->firstused, l); p += l; len -= l; sq->firstused += l; if (sq->firstused == sq->firstfree) { if (cptr->recvq.size () > 1) { delete sq; cptr->recvq.pop_front (); } else /* keep one struct packetqueue */ sq->firstused = sq->firstfree = 0; } else return p - buf; } return p - buf; } int recvq_getline (connection_t *cptr, char *buf, int len) { packetqueue *sq, *sq2 = NULL; int l = 0; char *p = buf; char *newline = NULL; connection_t::queue::iterator it = cptr->recvq.begin (); connection_t::queue::iterator et = cptr->recvq.end (); while (it != et) { sq2 = *it; l += sq2->firstfree - sq2->firstused; newline = static_cast < char *>(memchr (sq2->buf + sq2->firstused, '\n', sq2->firstfree - sq2->firstused)); if (newline != NULL || l >= len) break; ++it; } if (newline == NULL && l < len) return 0; cptr->flags |= CF_NONEWLINE; while (!cptr->recvq.empty ()) { sq = cptr->recvq.front (); if (sq->firstused == sq->firstfree || len <= 0) break; l = sq->firstfree - sq->firstused; if (l > len) l = len; if (sq == sq2 && l >= newline - sq->buf - sq->firstused + 1) cptr->flags &= ~CF_NONEWLINE, l = newline - sq->buf - sq->firstused + 1; memcpy (p, sq->buf + sq->firstused, l); p += l; len -= l; sq->firstused += l; if (sq->firstused == sq->firstfree) { if (cptr->recvq.size () > 1) { delete sq; cptr->recvq.pop_front (); } else /* keep one struct packetqueue */ sq->firstused = sq->firstfree = 0; } else return p - buf; } return p - buf; } void sendqrecvq_free (connection_t *cptr) { connection_t::queue::iterator first = cptr->recvq.begin (); connection_t::queue::iterator last = cptr->recvq.end (); while (first != last) { delete *first; ++first; } cptr->recvq.clear (); first = cptr->sendq.begin (); last = cptr->sendq.end (); while (first != last) { delete *first; ++first; } cptr->sendq.clear (); }