--- AnyEvent-FastPing/FastPing.xs 2011/01/30 02:03:35 1.9 +++ AnyEvent-FastPing/FastPing.xs 2011/01/31 05:35:48 1.10 @@ -50,6 +50,9 @@ typedef uint8_t addr_tt[16]; +static int thr_res[2]; // receive from worker(s) +static int icmp4_fd = -1, icmp6_fd = -1; + /*****************************************************************************/ typedef double tstamp; @@ -90,23 +93,6 @@ typedef struct { - int family; - addr_tt lo, hi; - double interval; - tstamp next; -} RANGE; - -typedef struct -{ - SV *id; - double interval; - int nranges; - RANGE *ranges; - uint32_t payload; -} REQ; - -typedef struct -{ uint8_t version_ihl; uint8_t tos; uint16_t tot_len; @@ -122,6 +108,7 @@ uint32_t dst; } IP4HDR; +#if 0 typedef struct { uint8_t version; @@ -134,28 +121,66 @@ uint8_t src[16]; uint8_t dst[16]; } IP6HDR; +#endif /*****************************************************************************/ -#define MAGIC 0xca4c +typedef struct +{ + tstamp next; + double interval; + int addrlen; + addr_tt lo, hi; + void *items; +} RANGE; + +typedef struct +{ + RANGE **ranges; + int rangecnt, rangemax; -static uint16_t magic; + tstamp interval; + tstamp maxrtt; + uint16_t magic1; + uint16_t magic2; + uint16_t magic3; + + int id; + + AV *recvq; /* receive queue */ + int nextrecv; + SV *recvcb; + + pthread_t thrid; + int running; +} PINGER; + +static PINGER **pingers; +static int *pingerfree; /* freelist next */ +static int pingercnt; +static int pingermax; +static int firstfree = -1; +static int firstrecv = -1; + +/*****************************************************************************/ typedef struct { uint8_t type, code; uint16_t cksum; uint16_t id, seq; - uint32_t payload; + uint16_t pinger; + uint16_t magic; uint32_t stamp_hi; uint32_t stamp_lo; } PKT; static int -pkt_is_valid (PKT *pkt) +pkt_is_valid_for (PKT *pkt, PINGER *pinger) { - return pkt->id == (uint16_t) magic - && pkt->seq == (uint16_t)~magic; + return pkt->id == pinger->magic1 + && pkt->seq == pinger->magic2 + && pkt->magic == pinger->magic3; } static void @@ -177,199 +202,370 @@ + pkt->stamp_lo * (4096. / 4294967296.); } +static void +pkt_cksum (PKT *pkt) +{ + uint_fast32_t sum = -pkt->cksum; + uint32_t *wp = (uint32_t *)pkt; + int len = sizeof (*pkt) / 4; + + do + { + uint_fast32_t w = *(volatile uint32_t *)wp++; + sum += (w & 0xffff) + (w >> 16); + } + while (len--); + + sum = (sum >> 16) + (sum & 0xffff); /* add high 16 to low 16 */ + sum += sum >> 16; /* add carry */ + + pkt->cksum = ~sum; +} + /*****************************************************************************/ -static pthread_t pthrid; -static int thr_send[2]; // send to worker -static int thr_recv[2]; // receive from worker +static void +range_free (RANGE *self) +{ + free (self); +} + +static void +inc_addr (addr_tt *addr) +{ + int len = sizeof (addr_tt) - 1; -static int icmp4_fd, icmp6_fd; + while (!++(*addr)[len]) + --len; +} -static AV *cbs; +/*****************************************************************************/ -static uint16_t -icmp_cksum (void *data, unsigned int len) +/* like sendto, but retries on failure */ +static void +xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen) { - register int sum = 0; - uint32_t *wp; + tstamp wait = 0; - for (wp = (uint32_t *)data; len; wp++, len -= 4) - sum += (*wp & 0xffff) + (*wp >> 16); + while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS) + ssleep (wait += DRAIN_INTERVAL); +} - sum = (sum >> 16) + (sum & 0xffff); /* add high 16 to low 16 */ - sum += sum >> 16; /* add carry */ +static void +send_ping (PKT *pkt, addr_tt *addr, int addr_len) +{ + pkt->cksum = 0; + + if (addr_len == 4) + { + struct sockaddr_in sa; + + pkt->type = ICMP4_ECHO; + pkt_cksum (pkt); + + sa.sin_family = AF_INET; + sa.sin_port = 0; + + memcpy (&sa.sin_addr, + sizeof (addr_tt) - sizeof (sa.sin_addr) + (char *)addr, + sizeof (sa.sin_addr)); + + xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa)); + } + else + { +#if ENABLE_IPV6 + struct sockaddr_in6 sa; + + pkt->type = ICMP6_ECHO; - return ~sum; + sa.sin6_family = AF_INET6; + sa.sin6_port = 0; + sa.sin6_flowinfo = 0; + sa.sin6_scope_id = 0; + + memcpy (&sa.sin6_addr, + sizeof (addr_tt) - sizeof (sa.sin6_addr) + (char *)addr, + sizeof (sa.sin6_addr)); + + xsendto (icmp6_fd, &pkt, sizeof (pkt), 0, &sa, sizeof (sa)); +#endif + } } static void -inc_addr (addr_tt *addr) +downheap (PINGER *self) { - int len = sizeof (addr_tt) - 1; + RANGE *elem = self->ranges [0]; /* always exists */ + int Nm1 = self->rangecnt - 1; + int j; + int k; - do + for (k = 0; ; ) { - if ((*addr)[len] != 0xff) - { - ++(*addr)[len]; - break; - } + int j = k * 2 + 1; + + if (j > Nm1) + break; + + if (j < Nm1 + && self->ranges [j]->next > self->ranges [j + 1]->next) + ++j; + + if (self->ranges [j]->next >= elem->next) + break; - (*addr)[len] = 0; + self->ranges [k] = self->ranges [j]; + + k = j; } - while (len--); + + self->ranges [k] = elem; +} + +static void +upheap (PINGER *self, int k) +{ + RANGE *elem = self->ranges [k]; + + while (k) + { + int j = (k - 1) >> 1; + + if (self->ranges [j]->next <= elem->next) + break; + + self->ranges [k] = self->ranges [j]; + + k = j; + } + + self->ranges [k] = elem; } static void * -ping_proc (void *unused) +ping_proc (void *self_) { + PINGER *self = (PINGER *)self_; PKT pkt; - struct sockaddr_in sa4; -#if ENABLE_IPV6 - struct sockaddr_in6 sa6; -#endif memset (&pkt, 0, sizeof (pkt)); - memset (&sa4, 0, sizeof (sa4)); - sa4.sin_family = AF_INET; - sa4.sin_port = 0; -#if ENABLE_IPV6 - memset (&sa6, 0, sizeof (sa6)); - sa6.sin6_family = AF_INET6; - sa6.sin6_port = 0; -#endif + tstamp now = NOW (); + tstamp next = now; + + pkt.code = 0; + pkt.id = self->magic1; + pkt.seq = self->magic2; + pkt.magic = self->magic3; + pkt.pinger = self->id; - for (;;) + while (self->rangecnt) { - REQ *req; - int len = read (thr_send [0], &req, sizeof (req)); + RANGE *range = self->ranges [0]; + int n, k; - tstamp now = NOW (); - tstamp next = now; + // ranges [0] is always the next range to ping + tstamp wait = range->next - now; - if (!len) - pthread_exit (0); - else if (len != sizeof (req)) + // compare with the global frequency limit + { + tstamp diff = next - now; + + if (wait < diff) + wait = diff; + else if (range) + next = range->next; + } + + if (wait > 0.) + ssleep (wait); + + now = NOW (); + + ts_to_pkt (&pkt, now); + + send_ping (&pkt, &range->lo, range->addrlen); + + if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt))) { - perror ("AnyEvent::FastPing: short read or read error"); - pthread_exit ((void *)-1); + self->ranges [0] = self->ranges [--self->rangecnt]; + range_free (range); } + else + { + inc_addr (&range->lo); - //TODO: bind to source address + range->next = next; + range->next += range->interval; - pkt.code = 0; - pkt.id = (uint16_t)magic; - pkt.seq = (uint16_t)~magic; - pkt.payload = req->payload; + downheap (self); + } - { - int r; - for (r = req->nranges; r--; ) - inc_addr (&req->ranges [r].hi); - } + next += self->interval; + } - while (req->nranges) - { - RANGE *range = req->ranges; - int n, k; + ssleep (self->maxrtt); - if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt))) - req->ranges [0] = req->ranges [--req->nranges]; - else - { - // ranges [0] is always the next range to ping - tstamp wait = range->next - now; - - // compare with the global frequency limit - { - tstamp diff = next - now; - - if (wait < diff) - wait = diff; - else if (range) - next = range->next; - } - - if (wait > 0.) - ssleep (wait); - - now = NOW (); - - ts_to_pkt (&pkt, now); - pkt.cksum = 0; - - if (range->family == AF_INET) - { - pkt.type = ICMP4_ECHO; - pkt.cksum = icmp_cksum (&pkt, sizeof (pkt)); - - memcpy (&sa4.sin_addr, - sizeof (addr_tt) - sizeof (sa4.sin_addr) + (char *)&range->lo, - sizeof (sa4.sin_addr)); - - if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0) - errno = 0; - } - else - { -#if ENABLE_IPV6 - pkt.type = ICMP6_ECHO; + { + uint16_t id = self->id; - memcpy (&sa6.sin6_addr, - sizeof (addr_tt) - sizeof (sa6.sin6_addr) + (char *)&range->lo, - sizeof (sa6.sin6_addr)); + write (thr_res [1], &id, sizeof (id)); + } - if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0) - errno = 0; -#endif - } + return 0; +} - if (errno == ENOBUFS) - ssleep (DRAIN_INTERVAL); - else - { - inc_addr (&range->lo); - - range->next = next; - range->next += range->interval; - } - - next += req->interval; - } - - // make a downheap operation - for (n = k = 0; ; ) - { - int j = k * 2 + 1; - - ++n; - - if (j >= req->nranges) - break; - else if (j < req->nranges - 1) - if (req->ranges [j].next > req->ranges [j + 1].next) - ++j; - - if (req->ranges [j].next >= req->ranges [k].next) - break; - - { - RANGE temp = req->ranges [k]; - req->ranges [k] = req->ranges [j]; - req->ranges [j] = temp; - } +/*****************************************************************************/ - k = j; - } +static void +pinger_start (PINGER *self) +{ + sigset_t fullsigset, oldsigset; + pthread_attr_t attr; + + if (self->running) + return; + + sigfillset (&fullsigset); + + pthread_attr_init (&attr); + pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN < sizeof (long) * 2048 ? sizeof (long) * 2048 : PTHREAD_STACK_MIN); + + pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); + + if (pthread_create (&self->thrid, &attr, ping_proc, (void *)self)) + croak ("AnyEvent::FastPing: unable to create pinger thread"); + + pthread_sigmask (SIG_SETMASK, &oldsigset, 0); + + self->running = 1; +} + +static void +pinger_stop (PINGER *self) +{ + if (!self->running) + return; + + self->running = 0; + pthread_cancel (self->thrid); + pthread_join (self->thrid, 0); +} + +static void +pinger_init (PINGER *self) +{ + memset (self, 0, sizeof (PINGER)); + + if (firstfree >= 0) + { + self->id = firstfree; + firstfree = pingerfree [firstfree]; + } + else if (pingercnt == 0xffff) + croak ("unable to create more than 65536 AnyEvent::FastPing objects"); + else + { + if (pingercnt == pingermax) + { + pingermax = pingermax * 2 + 16; + pingers = realloc (pingers , sizeof (pingers [0]) * pingermax); + pingerfree = realloc (pingerfree, sizeof (pingerfree [0]) * pingermax); } - write (thr_recv [1], &req, sizeof (req)); + self->id = pingercnt++; } - return 0; + pingers [self->id] = self; + + self->recvcb = &PL_sv_undef; + self->interval = MIN_INTERVAL; + self->maxrtt = 0.5; + self->rangemax = 16; + self->ranges = malloc (sizeof (self->ranges [0]) * self->rangemax); +} + +static void +pinger_free (PINGER *self) +{ + pinger_stop (self); + + pingers [self->id] = 0; + + SvREFCNT_dec (self->recvq); + SvREFCNT_dec (self->recvcb); + + pingerfree [self->id] = firstfree; + firstfree = self->id; + + while (self->rangecnt) + range_free (self->ranges [--self->rangecnt]); + + free (self->ranges); +} + +/*****************************************************************************/ + +static void +recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt) +{ + if (!self->recvq) + { + /* first seen this round */ + if (!SvOK (self->recvcb)) + return; + + self->recvq = newAV (); + + self->nextrecv = firstrecv; + firstrecv = self->id; + } + + { + AV *pkt = newAV (); + + av_extend (pkt, 2-1); + + AvARRAY (pkt)[0] = newSVpvn (addr, addrlen); + AvARRAY (pkt)[1] = newSVnv (rtt); + AvFILLp (pkt) = 2-1; + + av_push (self->recvq, newRV_noinc ((SV *)pkt)); + } +} + +static void +recv_flush (void) +{ + if (firstrecv < 0) + return; + + ENTER; + SAVETMPS; + + while (firstrecv >= 0) + { + dSP; + PINGER *self = pingers [firstrecv]; + firstrecv = self->nextrecv; + + self->nextrecv = -1; + + PUSHMARK (SP); + XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq))); + self->recvq = 0; + PUTBACK; + call_sv (self->recvcb, G_DISCARD | G_VOID); + } + + FREETMPS; + LEAVE; } +/*****************************************************************************/ + +#if 0 static void feed_reply (AV *res_av) { @@ -396,19 +592,16 @@ FREETMPS; LEAVE; } +#endif static void boot () { - sigset_t fullsigset, oldsigset; - pthread_attr_t attr; - - if (pipe (thr_send) < 0) - croak ("AnyEvent::FastPing: unable to create send pipe"); - - if (pipe (thr_recv) < 0) + if (pipe (thr_res) < 0) croak ("AnyEvent::FastPing: unable to create receive pipe"); + sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_res [0]); + icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP); fcntl (icmp4_fd, F_SETFL, O_NONBLOCK); #ifdef ICMP_FILTER @@ -432,37 +625,16 @@ # endif #endif - pthread_attr_init (&attr); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); -#ifdef PTHREAD_SCOPE_PROCESS - pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS); -#endif - - sigfillset (&fullsigset); - - pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); - - if (pthread_create (&pthrid, &attr, ping_proc, 0)) - croak ("AnyEvent::FastPing: unable to create pinger thread"); - - pthread_sigmask (SIG_SETMASK, &oldsigset, 0); - - sv_setiv (get_sv ("AnyEvent::FastPing::THR_REQ_FD", 1), thr_send [1]); - sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_recv [0]); - sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd); sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd); } -MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing +MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing PREFIX = pinger_ BOOT: { HV *stash = gv_stashpv ("AnyEvent::FastPing", 1); - cbs = get_av ("AnyEvent::FastPing::CB", 1); - magic = getpid () ^ MAGIC; - if (sizeof (PKT) & 3) croak ("size of PKT structure is not a multiple of 4"); @@ -477,128 +649,86 @@ PROTOTYPES: DISABLE -SV * -_req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id) - CODE: +void +_new (SV *klass, UV magic1, UV magic2, UV magic3) + PPCODE: { - AV *rav; - int nranges, i; - REQ *req; + SV *pv = NEWSV (0, sizeof (PINGER)); + PINGER *self = (PINGER *)SvPVX (pv); + SvPOK_only (pv); + XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPV_nolen (klass), 1)))); + pinger_init (self); + self->magic1 = magic1; + self->magic2 = magic2; + self->magic3 = magic3; +} + +void +_free (PINGER *self) + CODE: + pinger_free (self); - if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV) - croak ("address ranges must be given as arrayref with lo, hi pairs"); +IV +id (PINGER *self, ...) + CODE: + RETVAL = self->id; + OUTPUT: + RETVAL - rav = (AV *)SvRV (ranges); - nranges = av_len (rav) + 1; +void pinger_start (PINGER *self) - req = malloc (sizeof (REQ)); +void pinger_stop (PINGER *self) - if (interval < MIN_INTERVAL) - interval = MIN_INTERVAL; +void _stop_id (UV id) + CODE: + if (id < pingercnt && pingers [id]) + pinger_stop (pingers [id]); - req->id = newSVsv (id); - req->interval = interval; - req->payload = payload; - req->nranges = nranges; - req->ranges = (RANGE *)malloc (nranges * sizeof (RANGE)); +void interval (PINGER *self, NV interval) + CODE: + self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL; - while (nranges--) - { - SV *sv = *av_fetch (rav, nranges, 1); - SV *lo, *hi; - AV *av; - RANGE *r; - - if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV) - croak ("address range must be given as arrayref with lo, hi, interval arrayrefs"); - - av = (AV *)SvRV (sv); - r = req->ranges + nranges; - - lo = *av_fetch (av, 0, 1); - hi = *av_fetch (av, 1, 1); - - sv_utf8_downgrade (lo, 0); - sv_utf8_downgrade (hi, 0); - - memset (&r->lo, 0, sizeof (addr_tt)); - memset (&r->hi, 0, sizeof (addr_tt)); - - if (SvPOKp (lo) && SvPOKp (hi)) - { - if (SvCUR (lo) != SvCUR (hi)) - croak ("all addresses in range must be of the same size (either 4 or 16 bytes)"); - - if (SvCUR (lo) == 4) - { - r->family = AF_INET; - memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, SvPVX (lo), 4); - memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, SvPVX (hi), 4); - } - else if (SvCUR (lo) == 16) - { -#if ENABLE_IPV6 - r->family = AF_INET6; - memcpy (&r->lo, SvPVX (lo), sizeof (addr_tt)); - memcpy (&r->hi, SvPVX (hi), sizeof (addr_tt)); -#else - croak ("IPv6 not supported in this configuration"); -#endif - } - else - croak ("addresses in range must be either 4 (IPv4) or 16 (IPv6) bytes in length"); - } - else if (SvIOK (lo) && SvIOK (hi)) - { - uint32_t addr; - - r->family = AF_INET; - - addr = htonl (SvUV (lo)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, &addr, 4); - addr = htonl (SvUV (hi)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, &addr, 4); - } - else - croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets"); - - if (r->family == AF_INET) - { - if (icmp4_fd < 0) - croak ("AnyEvent::FastPing: IPv4 ping support not available on this system"); - } - else - { - if (icmp6_fd < 0) - croak ("AnyEvent::FastPing: IPv6 ping support not available on this system"); - } +void add_range (PINGER *self, SV *lo_, SV *hi_, NV interval) + CODE: +{ + STRLEN lo_len, hi_len; + char *lo = SvPVbyte (lo_, lo_len); + char *hi = SvPVbyte (hi_, hi_len); + RANGE *range; - r->interval = SvNV (*av_fetch (av, 2, 1)); + if (lo_len != hi_len || (lo_len != 4 && lo_len != 16)) + croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses"); - if (r->interval < req->interval) - r->interval = req->interval; + if (lo_len == 4 && icmp4_fd < 0) croak ("IPv4 support unavailable"); + if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable"); - r->next = 0.; - } + range = calloc (1, sizeof (RANGE)); - RETVAL = newSVpvn ((char *)&req, sizeof (req)); -} - OUTPUT: - RETVAL + range->next = 0; + range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL; + range->addrlen = lo_len; -SV * -_read_res () - CODE: -{ - REQ *req; + memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len); + memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len); - if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req)) - RETVAL = &PL_sv_undef; + if (self->rangecnt == self->rangemax) + self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1)); - RETVAL = req->id; - free (req->ranges); - free (req); + self->ranges [self->rangecnt] = range; + upheap (self, self->rangecnt); + ++self->rangecnt; } - OUTPUT: - RETVAL + +void +on_recv (PINGER *self, SV *cb) + CODE: + SvREFCNT_dec (self->recvcb); + self->recvcb = newSVsv (cb); + +void +max_rtt (PINGER *self, NV maxrtt) + CODE: + self->maxrtt = maxrtt; void _recv_icmp4 (...) @@ -606,16 +736,12 @@ { char buf [512]; struct sockaddr_in sa; - socklen_t sl = sizeof (sa); - AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ()); - tstamp now = NOW (); - - if (!res_av) - XSRETURN_UNDEF; for (;;) { + PINGER *pinger; IP4HDR *iphdr = (IP4HDR *)buf; + socklen_t sl = sizeof (sa); int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl); int hdrlen, totlen; PKT *pkt; @@ -634,20 +760,20 @@ pkt = (PKT *)(buf + hdrlen); - if (pkt->type != ICMP4_ECHO_REPLY || !pkt_is_valid (pkt)) + if (pkt->type != ICMP4_ECHO_REPLY + || pkt->pinger >= pingercnt + || !pingers [pkt->pinger]) continue; - { - AV *av = newAV (); - av_push (av, newSVpvn ((char *)&sa.sin_addr, 4)); - av_push (av, newSVnv (now - pkt_to_ts (pkt))); - av_push (av, newSVuv (pkt->payload)); + pinger = pingers [pkt->pinger]; + + if (!pkt_is_valid_for (pkt, pinger)) + continue; - av_push (res_av, newRV_noinc ((SV *)av)); - } + recv_feed (pinger, &sa.sin_addr, 4, NOW () - pkt_to_ts (pkt)); } - feed_reply (res_av); + recv_flush (); } void @@ -655,34 +781,30 @@ CODE: { struct sockaddr_in6 sa; - socklen_t sl = sizeof (sa); - AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ()); PKT pkt; - tstamp now = NOW (); - - if (!res_av) - XSRETURN_UNDEF; for (;;) { + PINGER *pinger; + socklen_t sl = sizeof (sa); int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl); if (len != sizeof (PKT)) break; - if (pkt.type != ICMP6_ECHO_REPLY || !pkt_is_valid (&pkt)) + if (pkt.type != ICMP6_ECHO_REPLY + || pkt.pinger >= pingercnt + || !pingers [pkt.pinger]) continue; - { - AV *av = newAV (); - av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16)); - av_push (av, newSVnv (now - pkt_to_ts (&pkt))); - av_push (av, newSVuv (pkt.payload)); + pinger = pingers [pkt.pinger]; + + if (!pkt_is_valid_for (&pkt, pinger)) + continue; - av_push (res_av, newRV_noinc ((SV *)av)); - } + recv_feed (pinger, &sa.sin6_addr, 16, NOW () - pkt_to_ts (&pkt)); } - feed_reply (res_av); + recv_flush (); }