--- AnyEvent-FastPing/FastPing.xs 2011/01/31 05:35:48 1.10 +++ AnyEvent-FastPing/FastPing.xs 2011/02/01 04:06:24 1.11 @@ -46,12 +46,9 @@ #define HDR_SIZE_IP4 20 #define HDR_SIZE_IP6 48 -//TODO: xread/xwrite for atomicity? we currently rely on the fact that the pip buffersize divides exactly by pointer sizes - -typedef uint8_t addr_tt[16]; - -static int thr_res[2]; // receive from worker(s) -static int icmp4_fd = -1, icmp6_fd = -1; +static int thr_res[2]; // worker thread finished status +static int icmp4_fd = -1; +static int icmp6_fd = -1; /*****************************************************************************/ @@ -108,30 +105,20 @@ uint32_t dst; } IP4HDR; -#if 0 -typedef struct -{ - uint8_t version; - uint8_t x1, x2, x3; - - uint16_t payload_len; - uint8_t nxt_hdr; - uint8_t hop_limit; - - uint8_t src[16]; - uint8_t dst[16]; -} IP6HDR; -#endif - /*****************************************************************************/ +typedef uint8_t addr_tt[16]; + typedef struct { tstamp next; double interval; int addrlen; - addr_tt lo, hi; - void *items; + + addr_tt lo, hi; /* only if !addrcnt */ + + int addrcnt; + /* addrcnt addresses follow */ } RANGE; typedef struct @@ -168,9 +155,12 @@ { uint8_t type, code; uint16_t cksum; + uint16_t id, seq; + uint16_t pinger; uint16_t magic; + uint32_t stamp_hi; uint32_t stamp_lo; } PKT; @@ -230,33 +220,31 @@ free (self); } -static void -inc_addr (addr_tt *addr) -{ - int len = sizeof (addr_tt) - 1; - - while (!++(*addr)[len]) - --len; -} - -/*****************************************************************************/ - /* like sendto, but retries on failure */ static void xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen) { - tstamp wait = 0; + tstamp wait = DRAIN_INTERVAL / 2.; while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS) - ssleep (wait += DRAIN_INTERVAL); + ssleep (wait *= 2.); } -static void -send_ping (PKT *pkt, addr_tt *addr, int addr_len) +// ping current address, return true and increment if more to ping +static int +range_send_ping (RANGE *self, PKT *pkt) { + // send ping + uint8_t *addr; + + if (self->addrcnt) + addr = (self->addrcnt - 1) * self->addrlen + (uint8_t *)(self + 1); + else + addr = sizeof (addr_tt) - self->addrlen + self->lo; + pkt->cksum = 0; - if (addr_len == 4) + if (self->addrlen == 4) { struct sockaddr_in sa; @@ -266,9 +254,7 @@ sa.sin_family = AF_INET; sa.sin_port = 0; - memcpy (&sa.sin_addr, - sizeof (addr_tt) - sizeof (sa.sin_addr) + (char *)addr, - sizeof (sa.sin_addr)); + memcpy (&sa.sin_addr, addr, sizeof (sa.sin_addr)); xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa)); } @@ -284,15 +270,37 @@ sa.sin6_flowinfo = 0; sa.sin6_scope_id = 0; - memcpy (&sa.sin6_addr, - sizeof (addr_tt) - sizeof (sa.sin6_addr) + (char *)addr, - sizeof (sa.sin6_addr)); + memcpy (&sa.sin6_addr, addr, sizeof (sa.sin6_addr)); - xsendto (icmp6_fd, &pkt, sizeof (pkt), 0, &sa, sizeof (sa)); + xsendto (icmp6_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa)); #endif } + + // see if we have any more addresses + if (self->addrcnt) + { + if (!--self->addrcnt) + return 0; + } + else + { + if (!memcmp (&self->lo, &self->hi, sizeof (addr_tt))) + return 0; + + // increment self->lo + { + int len = sizeof (addr_tt) - 1; + + while (!++self->lo [len]) + --len; + } + } + + return 1; } +/*****************************************************************************/ + static void downheap (PINGER *self) { @@ -385,23 +393,19 @@ ts_to_pkt (&pkt, now); - send_ping (&pkt, &range->lo, range->addrlen); - - if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt))) + if (!range_send_ping (range, &pkt)) { self->ranges [0] = self->ranges [--self->rangecnt]; range_free (range); } else { - inc_addr (&range->lo); - range->next = next; range->next += range->interval; - - downheap (self); } + downheap (self); + next += self->interval; } @@ -505,6 +509,17 @@ free (self->ranges); } +static void +pinger_add_range (PINGER *self, RANGE *range) +{ + if (self->rangecnt == self->rangemax) + self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1)); + + self->ranges [self->rangecnt] = range; + upheap (self, self->rangecnt); + ++self->rangecnt; +} + /*****************************************************************************/ static void @@ -544,7 +559,7 @@ ENTER; SAVETMPS; - while (firstrecv >= 0) + do { dSP; PINGER *self = pingers [firstrecv]; @@ -558,6 +573,7 @@ PUTBACK; call_sv (self->recvcb, G_DISCARD | G_VOID); } + while (firstrecv >= 0); FREETMPS; LEAVE; @@ -650,13 +666,92 @@ PROTOTYPES: DISABLE void +_recv_icmp4 (...) + CODE: +{ + char buf [512]; + struct sockaddr_in sa; + int maxrecv; + + for (maxrecv = 256+1; --maxrecv; ) + { + PINGER *pinger; + IP4HDR *iphdr = (IP4HDR *)buf; + socklen_t sl = sizeof (sa); + int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl); + int hdrlen, totlen; + PKT *pkt; + + if (len <= HDR_SIZE_IP4) + break; + + hdrlen = (iphdr->version_ihl & 15) * 4; + totlen = ntohs (iphdr->tot_len); + + if (totlen > len + || iphdr->protocol != IPPROTO_ICMP + || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen) + continue; + + pkt = (PKT *)(buf + hdrlen); + + if (pkt->type != ICMP4_ECHO_REPLY + || pkt->pinger >= pingercnt + || !pingers [pkt->pinger]) + continue; + + pinger = pingers [pkt->pinger]; + + if (!pkt_is_valid_for (pkt, pinger)) + continue; + + recv_feed (pinger, &sa.sin_addr, 4, NOW () - pkt_to_ts (pkt)); + } + + recv_flush (); +} + +void +_recv_icmp6 (...) + CODE: +{ + struct sockaddr_in6 sa; + PKT pkt; + int maxrecv; + + for (maxrecv = 256+1; --maxrecv; ) + { + PINGER *pinger; + socklen_t sl = sizeof (sa); + int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl); + + if (len != sizeof (PKT)) + break; + + if (pkt.type != ICMP6_ECHO_REPLY + || pkt.pinger >= pingercnt + || !pingers [pkt.pinger]) + continue; + + pinger = pingers [pkt.pinger]; + + if (!pkt_is_valid_for (&pkt, pinger)) + continue; + + recv_feed (pinger, &sa.sin6_addr, 16, NOW () - pkt_to_ts (&pkt)); + } + + recv_flush (); +} + +void _new (SV *klass, UV magic1, UV magic2, UV magic3) PPCODE: { SV *pv = NEWSV (0, sizeof (PINGER)); PINGER *self = (PINGER *)SvPVX (pv); SvPOK_only (pv); - XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPV_nolen (klass), 1)))); + XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPVutf8_nolen (klass), 1)))); pinger_init (self); self->magic1 = magic1; self->magic2 = magic2; @@ -679,16 +774,24 @@ void pinger_stop (PINGER *self) -void _stop_id (UV id) +void +_stop_id (UV id) CODE: if (id < pingercnt && pingers [id]) pinger_stop (pingers [id]); -void interval (PINGER *self, NV interval) +void +interval (PINGER *self, NV interval) CODE: self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL; -void add_range (PINGER *self, SV *lo_, SV *hi_, NV interval) +void +max_rtt (PINGER *self, NV maxrtt) + CODE: + self->maxrtt = maxrtt; + +void +add_range (PINGER *self, SV *lo_, SV *hi_, NV interval) CODE: { STRLEN lo_len, hi_len; @@ -702,6 +805,9 @@ if (lo_len == 4 && icmp4_fd < 0) croak ("IPv4 support unavailable"); if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable"); + if (memcmp (lo, hi, lo_len) > 0) + croak ("AnyEvent::FastPing::add_range called with lo > hi"); + range = calloc (1, sizeof (RANGE)); range->next = 0; @@ -711,100 +817,59 @@ memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len); memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len); - if (self->rangecnt == self->rangemax) - self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1)); - - self->ranges [self->rangecnt] = range; - upheap (self, self->rangecnt); - ++self->rangecnt; + pinger_add_range (self, range); } void -on_recv (PINGER *self, SV *cb) - CODE: - SvREFCNT_dec (self->recvcb); - self->recvcb = newSVsv (cb); - -void -max_rtt (PINGER *self, NV maxrtt) - CODE: - self->maxrtt = maxrtt; - -void -_recv_icmp4 (...) +add_hosts (PINGER *self, SV *addrs, NV interval) CODE: { - char buf [512]; - struct sockaddr_in sa; + AV *av; + int i; + int cnt; + int addrlen; + RANGE *range; - for (;;) - { - PINGER *pinger; - IP4HDR *iphdr = (IP4HDR *)buf; - socklen_t sl = sizeof (sa); - int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl); - int hdrlen, totlen; - PKT *pkt; + if (!SvROK (addrs) || SvTYPE (SvRV (addrs)) != SVt_PVAV) + croak ("AnyEvent::FastPing::add_hosst expects an arrayref with binary IPv4 or IPv6 addresses"); - if (len <= HDR_SIZE_IP4) - break; + av = (AV *)SvRV (addrs); + cnt = av_len (av) + 1; - hdrlen = (iphdr->version_ihl & 15) * 4; - totlen = ntohs (iphdr->tot_len); + if (!cnt) + XSRETURN_EMPTY; - // packet corrupt? - if (totlen > len - || iphdr->protocol != IPPROTO_ICMP - || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen) - continue; + addrlen = SvCUR (*av_fetch (av, 0, 1)); - pkt = (PKT *)(buf + hdrlen); + if (addrlen != 4 && addrlen != 16) + croak ("AnyEvent::FastPing::add_hosts addresses must be specified as binary IPv4 or IPv6 addresses"); - if (pkt->type != ICMP4_ECHO_REPLY - || pkt->pinger >= pingercnt - || !pingers [pkt->pinger]) - continue; + for (i = cnt; --i; ) + { + SV *sv = *av_fetch (av, i, 1); - pinger = pingers [pkt->pinger]; + if (!sv_utf8_downgrade (sv, 1) || addrlen != SvCUR (sv)) + croak ("AnyEvent::FastPing::add_hosts addresses must all have the same size"); + } - if (!pkt_is_valid_for (pkt, pinger)) - continue; + range = calloc (1, sizeof (RANGE) + cnt * addrlen); - recv_feed (pinger, &sa.sin_addr, 4, NOW () - pkt_to_ts (pkt)); - } + range->next = 0; + range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL; + range->addrlen = addrlen; + range->addrcnt = cnt; - recv_flush (); + for (i = cnt; i--; ) + memcpy ((uint8_t *)(range + 1) + (cnt - 1 - i) * addrlen, + SvPVbyte_nolen (*av_fetch (av, i, 1)), + addrlen); + + pinger_add_range (self, range); } void -_recv_icmp6 (...) +on_recv (PINGER *self, SV *cb) CODE: -{ - struct sockaddr_in6 sa; - PKT pkt; - - for (;;) - { - PINGER *pinger; - socklen_t sl = sizeof (sa); - int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl); - - if (len != sizeof (PKT)) - break; - - if (pkt.type != ICMP6_ECHO_REPLY - || pkt.pinger >= pingercnt - || !pingers [pkt.pinger]) - continue; - - pinger = pingers [pkt.pinger]; - - if (!pkt_is_valid_for (&pkt, pinger)) - continue; - - recv_feed (pinger, &sa.sin6_addr, 16, NOW () - pkt_to_ts (&pkt)); - } - - recv_flush (); -} + SvREFCNT_dec (self->recvcb); + self->recvcb = newSVsv (cb);