#if defined(__linux) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__CYGWIN__)
# define ENABLE_IPV6 1 // if you get compilation problems try to disable IPv6
#else
# define ENABLE_IPV6 0
#endif

#include "EXTERN.h"
#include "perl.h"
#include "XSUB.h"

/* system facilities used below: threads and signal masks, memory/string
 * functions, sleeping and wall-clock time, pipes, non-blocking fds, errno,
 * PTHREAD_STACK_MIN and raw sockets */
#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <inttypes.h>
#include <fcntl.h>
#include <errno.h>
#include <limits.h>

#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>

#include <netinet/in.h>
#include <arpa/inet.h>

#ifdef __linux
# include <linux/icmp.h>
#endif
#if ENABLE_IPV6 && !defined (__CYGWIN__)
# include <netinet/icmp6.h>
#endif

#define ICMP4_ECHO       8
#define ICMP4_ECHO_REPLY 0
#define ICMP6_ECHO       128
#define ICMP6_ECHO_REPLY 129

#define DRAIN_INTERVAL 1e-6 // how long to wait when sendto returns ENOBUFS, in seconds
#define MIN_INTERVAL   1e-6 // minimum packet send interval, in seconds

#define HDR_SIZE_IP4 20
#define HDR_SIZE_IP6 48

//TODO: xread/xwrite for atomicity? we currently rely on the fact that the pipe buffersize divides exactly by pointer sizes

/* an IPv4 or IPv6 address; IPv4 addresses occupy the last 4 bytes */
typedef uint8_t addr_tt[16];

static int thr_res[2]; // receive from worker(s)
static int icmp4_fd = -1, icmp6_fd = -1;

/*****************************************************************************/

typedef double tstamp;

/* Current wall-clock time in seconds (gettimeofday, microsecond resolution). */
static tstamp
NOW (void)
{
  struct timeval tv;

  gettimeofday (&tv, 0);

  return tv.tv_sec + tv.tv_usec * 1e-6;
}

/* Sleep for WAIT seconds; fractional values are honoured. */
static void
ssleep (tstamp wait)
{
#if defined (__SVR4) && defined (__sun)
  struct timeval tv;

  tv.tv_sec  = wait;
  tv.tv_usec = (wait - tv.tv_sec) * 1e6;

  select (0, 0, 0, 0, &tv);
#elif defined(_WIN32)
  Sleep ((unsigned long)(wait * 1e3));
#else
  struct timespec ts;

  ts.tv_sec  = wait;
  ts.tv_nsec = (wait - ts.tv_sec) * 1e9;

  nanosleep (&ts, 0);
#endif
}

/*****************************************************************************/

/* IPv4 header as delivered by the raw ICMPv4 socket; multi-byte fields are in
 * network byte order (tot_len is read through ntohs in _recv_icmp4). */
typedef struct
{
  uint8_t version_ihl;
  uint8_t tos;
  uint16_t tot_len;

  uint16_t id;
  uint16_t flags;

  uint8_t ttl;
  uint8_t protocol;
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;

#if 0
typedef struct
{
  uint8_t version;
  uint8_t x1, x2, x3;

  uint16_t payload_len;
  uint8_t nxt_hdr;
  uint8_t hop_limit;

  uint8_t src[16];
  uint8_t dst[16];
} IP6HDR;
#endif

/*****************************************************************************/

/* One address range to ping, owned by a PINGER's heap. */
typedef struct
{
  tstamp next;     /* time at which the next address of this range is due */
  double interval; /* minimum time between two pings of this range, seconds */
  int addrlen;     /* 4 (IPv4) or 16 (IPv6) */
  addr_tt lo, hi;  /* next address to ping and last address, right-aligned */
  void *items;     /* not used in this file */
} RANGE;

/* One AnyEvent::FastPing object. The struct lives in the PV buffer of the
 * blessed SV created by _new. */
typedef struct
{
  RANGE **ranges;       /* binary min-heap ordered by RANGE.next */
  int rangecnt, rangemax;
  tstamp interval;      /* minimum time between any two packets of this pinger */
  tstamp maxrtt;        /* ping_proc sleeps this long after the last packet */
  uint16_t magic1;      /* sent as ICMP id */
  uint16_t magic2;      /* sent as ICMP seq */
  uint16_t magic3;      /* sent in PKT.magic */
  int id;               /* index into pingers[] */

  AV *recvq;            /* receive queue */
  int nextrecv;         /* next pinger id in the firstrecv list, or -1 */
  SV *recvcb;           /* callback invoked by recv_flush */

  pthread_t thrid;
  int running;
} PINGER;

static PINGER **pingers;
static int *pingerfree; /* freelist next */
static int pingercnt;
static int pingermax;
static int firstfree = -1;
static int firstrecv = -1; /* head of the list of pingers with queued replies */

/*****************************************************************************/

/* ICMP echo header (type..seq) followed by this module's payload. */
typedef struct
{
  uint8_t type, code;
  uint16_t cksum;
  uint16_t id, seq;
  uint16_t pinger;   /* PINGER.id of the sender */
  uint16_t magic;
  uint32_t stamp_hi; /* send timestamp, see ts_to_pkt */
  uint32_t stamp_lo;
} PKT;

/* True if the echoed id/seq/magic fields match the three magic values of PINGER. */
static int
pkt_is_valid_for (PKT *pkt, PINGER *pinger)
{
  return pkt->id    == pinger->magic1
      && pkt->seq   == pinger->magic2
      && pkt->magic == pinger->magic3;
}

static void
ts_to_pkt (PKT *pkt, tstamp ts)
{
  /* move 12 bits of seconds into the 32 bit fractional part */
  /* leaving 20 bits subsecond resolution and 44 bits of integers */
  /* (of which 32 are typically usable) */
  ts *= 1. / 4096.;

  pkt->stamp_hi = ts;
  pkt->stamp_lo = (ts - pkt->stamp_hi) * 4294967296.;
}

/* Inverse of ts_to_pkt. */
static tstamp
pkt_to_ts (PKT *pkt)
{
  return pkt->stamp_hi * 4096.
       + pkt->stamp_lo * (4096. / 4294967296.);
}

/* Compute the internet checksum over the whole PKT and store it in
 * pkt->cksum. Starting the sum at -cksum cancels whatever the cksum field
 * currently holds, since that field is summed again below. */
static void
pkt_cksum (PKT *pkt)
{
  uint_fast32_t sum = -pkt->cksum;
  const unsigned char *bytes = (const unsigned char *)pkt;
  size_t i;

  /* exactly sizeof (PKT) / 4 words (BOOT verifies sizeof (PKT) is a multiple
   * of 4); memcpy avoids an aliasing cast and never reads past the packet */
  for (i = 0; i < sizeof (*pkt) / 4; ++i)
    {
      uint32_t w;

      memcpy (&w, bytes + i * 4, sizeof (w));
      sum += (w & 0xffff) + (w >> 16);
    }

  sum = (sum >> 16) + (sum & 0xffff); /* add high 16 to low 16 */
  sum += sum >> 16; /* add carry */

  pkt->cksum = ~sum;
}

/*****************************************************************************/

static void
range_free (RANGE *self)
{
  free (self);
}

/* Increment a 16 byte big-endian address by one. The carry stops at byte 0,
 * so an all-ones address wraps to zero instead of indexing before the buffer. */
static void
inc_addr (addr_tt *addr)
{
  int len = sizeof (addr_tt) - 1;

  while (len >= 0 && !++(*addr)[len])
    --len;
}

/*****************************************************************************/

/* like sendto, but retries on failure */
/* (only ENOBUFS is retried, with a linearly growing sleep; other errors are ignored) */
static void
xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen)
{
  tstamp wait = 0;

  while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS)
    ssleep (wait += DRAIN_INTERVAL);
}

/* Send PKT as an echo request to ADDR (ADDR_LEN 4 = IPv4, otherwise IPv6). */
static void
send_ping (PKT *pkt, addr_tt *addr, int addr_len)
{
  pkt->cksum = 0;

  if (addr_len == 4)
    {
      struct sockaddr_in sa;

      pkt->type = ICMP4_ECHO;
      pkt_cksum (pkt);

      memset (&sa, 0, sizeof (sa));
      sa.sin_family = AF_INET;
      sa.sin_port   = 0;

      memcpy (&sa.sin_addr,
              sizeof (addr_tt) - sizeof (sa.sin_addr) + (char *)addr,
              sizeof (sa.sin_addr));

      xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
    }
  else
    {
#if ENABLE_IPV6
      struct sockaddr_in6 sa;

      /* cksum is left 0 here - presumably filled in by the kernel for raw
       * ICMPv6 sockets (RFC 3542 section 3.1); confirm on exotic platforms */
      pkt->type = ICMP6_ECHO;

      memset (&sa, 0, sizeof (sa));
      sa.sin6_family   = AF_INET6;
      sa.sin6_port     = 0;
      sa.sin6_flowinfo = 0;
      sa.sin6_scope_id = 0;

      memcpy (&sa.sin6_addr,
              sizeof (addr_tt) - sizeof (sa.sin6_addr) + (char *)addr,
              sizeof (sa.sin6_addr));

      /* send the packet itself, not the pointer to it */
      xsendto (icmp6_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
#endif
    }
}

/* Restore the min-heap property after ranges[0] has been replaced or its
 * "next" time increased. Requires rangecnt >= 1. */
static void
downheap (PINGER *self)
{
  RANGE *elem = self->ranges [0]; /* always exists */
  int Nm1 = self->rangecnt - 1;
  int k = 0;

  for (;;)
    {
      int j = k * 2 + 1;

      if (j > Nm1)
        break;

      if (j < Nm1
          && self->ranges [j]->next > self->ranges [j + 1]->next)
        ++j;

      if (self->ranges [j]->next >= elem->next)
        break;

      self->ranges [k] = self->ranges [j];
      k = j;
    }

  self->ranges [k] = elem;
}

/* Move ranges[k] towards the root until the min-heap property holds. */
static void
upheap (PINGER *self, int k)
{
  RANGE *elem = self->ranges [k];

  while (k)
    {
      int j = (k - 1) >> 1;

      if (self->ranges [j]->next <= elem->next)
        break;

      self->ranges [k] = self->ranges [j];
      k = j;
    }

  self->ranges [k] = elem;
}

/* Pinger thread body. Pings every address of every range in order of
 * RANGE.next, honouring both the per-range interval and the pinger-wide
 * interval. It then sleeps maxrtt seconds and writes the pinger id
 * (uint16_t) to the thr_res pipe. */
static void *
ping_proc (void *self_)
{
  PINGER *self = (PINGER *)self_;
  PKT pkt;

  memset (&pkt, 0, sizeof (pkt));

  tstamp now  = NOW ();
  tstamp next = now; /* earliest time the pinger-wide interval allows a packet */

  pkt.code   = 0;
  pkt.id     = self->magic1;
  pkt.seq    = self->magic2;
  pkt.magic  = self->magic3;
  pkt.pinger = self->id;

  while (self->rangecnt)
    {
      RANGE *range = self->ranges [0];

      // ranges [0] is always the next range to ping
      tstamp wait = range->next - now;

      // compare with the global frequency limit
      {
        tstamp diff = next - now;

        if (wait < diff)
          wait = diff;
        else
          next = range->next;
      }

      if (wait > 0.)
        ssleep (wait);

      now = NOW ();

      ts_to_pkt (&pkt, now);

      send_ping (&pkt, &range->lo, range->addrlen);

      if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt)))
        {
          /* last address of this range sent: remove it from the heap and
           * sift the element moved to the root down so ranges [0] stays
           * the earliest range */
          self->ranges [0] = self->ranges [--self->rangecnt];
          range_free (range);

          if (self->rangecnt)
            downheap (self);
        }
      else
        {
          inc_addr (&range->lo);

          range->next = next;
          range->next += range->interval;
          downheap (self);
        }

      next += self->interval;
    }

  ssleep (self->maxrtt);

  {
    uint16_t id = self->id;

    write (thr_res [1], &id, sizeof (id));
  }

  return 0;
}

/*****************************************************************************/

/* Start the pinger thread unless it is already running. The thread is created
 * with all signals blocked, so it inherits a full signal mask. */
static void
pinger_start (PINGER *self)
{
  sigset_t fullsigset, oldsigset;
  pthread_attr_t attr;
  int err;

  if (self->running)
    return;

  sigfillset (&fullsigset);

  pthread_attr_init (&attr);
  pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN < sizeof (long) * 2048 ? sizeof (long) * 2048 : PTHREAD_STACK_MIN);

  pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
  err = pthread_create (&self->thrid, &attr, ping_proc, (void *)self);

  /* restore the caller's mask before a possible croak - otherwise croak
   * would leave every signal blocked in the interpreter thread */
  pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
  pthread_attr_destroy (&attr);

  if (err)
    croak ("AnyEvent::FastPing: unable to create pinger thread");

  self->running = 1;
}

/* Cancel and join the pinger thread if it is running. */
static void
pinger_stop (PINGER *self)
{
  if (!self->running)
    return;

  self->running = 0;
  pthread_cancel (self->thrid);
  pthread_join (self->thrid, 0);
}

/* Zero SELF, assign it an id (reusing a freed one if possible), register it
 * in pingers[] and set default interval/maxrtt and an empty range heap. */
static void
pinger_init (PINGER *self)
{
  memset (self, 0, sizeof (PINGER));

  if (firstfree >= 0)
    {
      self->id = firstfree;
      firstfree = pingerfree [firstfree];
    }
  else if (pingercnt == 0xffff)
    croak ("unable to create more than 65536 AnyEvent::FastPing objects");
  else
    {
      if (pingercnt == pingermax)
        {
          pingermax = pingermax * 2 + 16;
          pingers    = realloc (pingers   , sizeof (pingers    [0]) * pingermax);
          pingerfree = realloc (pingerfree, sizeof (pingerfree [0]) * pingermax);
        }

      self->id = pingercnt++;
    }

  pingers [self->id] = self;

  self->recvcb   = &PL_sv_undef;
  self->interval = MIN_INTERVAL;
  self->maxrtt   = 0.5;
  self->rangemax = 16;
  self->ranges   = malloc (sizeof (self->ranges [0]) * self->rangemax);
}

/* Stop the thread, unregister SELF, release its Perl values and put its id
 * on the freelist; frees all remaining ranges. */
static void
pinger_free (PINGER *self)
{
  pinger_stop (self);

  pingers [self->id] = 0;

  SvREFCNT_dec (self->recvq);
  SvREFCNT_dec (self->recvcb);

  pingerfree [self->id] = firstfree;
  firstfree = self->id;

  while (self->rangecnt)
    range_free (self->ranges [--self->rangecnt]);

  free (self->ranges);
}

/*****************************************************************************/

/* Queue one reply ([address bytes, rtt]) for SELF. On the first reply of a
 * round SELF is linked into the firstrecv list; replies are dropped when no
 * callback is set. */
static void
recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt)
{
  if (!self->recvq)
    {
      /* first seen this round */
      if (!SvOK (self->recvcb))
        return;

      self->recvq = newAV ();

      self->nextrecv = firstrecv;
      firstrecv = self->id;
    }

  {
    AV *pkt = newAV ();

    av_extend (pkt, 2-1);

    AvARRAY (pkt)[0] = newSVpvn (addr, addrlen);
    AvARRAY (pkt)[1] = newSVnv (rtt);
    AvFILLp (pkt) = 2-1;

    av_push (self->recvq, newRV_noinc ((SV *)pkt));
  }
}

/* Call every queued pinger's callback with a reference to its array of
 * [address, rtt] array references, emptying the firstrecv list. */
static void
recv_flush (void)
{
  if (firstrecv < 0)
    return;

  ENTER;
  SAVETMPS;

  while (firstrecv >= 0)
    {
      dSP;
      PINGER *self = pingers [firstrecv];
      firstrecv = self->nextrecv;

      self->nextrecv = -1;

      PUSHMARK (SP);
      XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq)));
      self->recvq = 0;
      PUTBACK;
      call_sv (self->recvcb, G_DISCARD | G_VOID);
    }

  FREETMPS;
  LEAVE;
}

/*****************************************************************************/

#if 0
static void
feed_reply (AV *res_av)
{
  dSP;
  SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
  int i;

  if (av_len (res_av) < 0)
    return;

  ENTER;
  SAVETMPS;

  for (i = av_len (cbs) + 1; i--; )
    {
      SV *cb = *av_fetch (cbs, i, 1);

      PUSHMARK (SP);
      XPUSHs (res);
      PUTBACK;
      call_sv (cb, G_DISCARD | G_VOID);
    }

  FREETMPS;
  LEAVE;
}
#endif

/* Create the worker result pipe and the non-blocking raw ICMP sockets (with
 * kernel-side reply filters where available), and publish the descriptors in
 * $AnyEvent::FastPing::THR_RES_FD, ::ICMP4_FD and ::ICMP6_FD. A socket that
 * cannot be created leaves its fd at -1. */
static void
boot (void)
{
  if (pipe (thr_res) < 0)
    croak ("AnyEvent::FastPing: unable to create receive pipe");

  sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_res [0]);

  icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
  fcntl (icmp4_fd, F_SETFL, O_NONBLOCK);
#ifdef ICMP_FILTER
  {
    struct icmp_filter oval;
    oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
    setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
  }
#endif

#if ENABLE_IPV6
  icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
  fcntl (icmp6_fd, F_SETFL, O_NONBLOCK);
# ifdef ICMP6_FILTER
  {
    struct icmp6_filter oval;
    ICMP6_FILTER_SETBLOCKALL (&oval);
    ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
    setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
  }
# endif
#endif

  sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
  sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
}

/*****************************************************************************/
/* XS glue. PINGER * arguments are presumably converted from the blessed
 * object by the distribution's typemap (not visible here). */

MODULE = AnyEvent::FastPing		PACKAGE = AnyEvent::FastPing		PREFIX = pinger_

BOOT:
{
        HV *stash = gv_stashpv ("AnyEvent::FastPing", 1);

        /* pkt_cksum sums whole 32 bit words */
        if (sizeof (PKT) & 3)
          croak ("size of PKT structure is not a multiple of 4");

        boot ();

        newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
        newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));

        newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
        newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
}

PROTOTYPES: DISABLE

void
_new (SV *klass, UV magic1, UV magic2, UV magic3)
	PPCODE:
{
        /* the PINGER struct lives inside the PV buffer of the blessed SV */
        SV *pv = NEWSV (0, sizeof (PINGER));
        PINGER *self = (PINGER *)SvPVX (pv);

        SvPOK_only (pv);
        XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPV_nolen (klass), 1))));
        pinger_init (self);
        self->magic1 = magic1;
        self->magic2 = magic2;
        self->magic3 = magic3;
}

void
_free (PINGER *self)
	CODE:
        pinger_free (self);

IV
id (PINGER *self, ...)
	CODE:
        RETVAL = self->id;
	OUTPUT:
        RETVAL

void pinger_start (PINGER *self)

void pinger_stop (PINGER *self)

void
_stop_id (UV id)
	CODE:
        /* stop the pinger with this id, if it still exists */
        if (id < pingercnt && pingers [id])
          pinger_stop (pingers [id]);

void
interval (PINGER *self, NV interval)
	CODE:
        self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;

void
add_range (PINGER *self, SV *lo_, SV *hi_, NV interval)
	CODE:
{
        /* add the inclusive range lo..hi (binary IPv4 or IPv6 addresses of
         * equal length) to the heap, due immediately (next = 0) */
        STRLEN lo_len, hi_len;
        char *lo = SvPVbyte (lo_, lo_len);
        char *hi = SvPVbyte (hi_, hi_len);
        RANGE *range;

        if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
          croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");

        if (lo_len ==  4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
        if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");

        range = calloc (1, sizeof (RANGE));
        if (!range)
          croak ("AnyEvent::FastPing::add_range: out of memory");

        range->next = 0;
        range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
        range->addrlen = lo_len;

        /* addresses are stored right-aligned in the 16 byte buffers */
        memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
        memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);

        if (self->rangecnt == self->rangemax)
          {
            /* grow via a temporary so the existing heap survives a failure */
            RANGE **grown = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax * 2));

            if (!grown)
              {
                free (range);
                croak ("AnyEvent::FastPing::add_range: out of memory");
              }

            self->ranges = grown;
            self->rangemax *= 2;
          }

        self->ranges [self->rangecnt] = range;
        upheap (self, self->rangecnt);
        ++self->rangecnt;
}

void
on_recv (PINGER *self, SV *cb)
	CODE:
        SvREFCNT_dec (self->recvcb);
        self->recvcb = newSVsv (cb);

void
max_rtt (PINGER *self, NV maxrtt)
	CODE:
        self->maxrtt = maxrtt;

void
_recv_icmp4 (...)
	CODE:
{
        /* drain the non-blocking ICMPv4 socket, queue every valid echo reply
         * with its rtt, then deliver the queued replies to the callbacks */
        char buf [512];
        struct sockaddr_in sa;

        for (;;)
          {
            PINGER *pinger;
            IP4HDR *iphdr = (IP4HDR *)buf;
            socklen_t sl = sizeof (sa);
            int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
            int hdrlen, totlen;
            PKT *pkt;

            if (len <= HDR_SIZE_IP4)
              break;

            hdrlen = (iphdr->version_ihl & 15) * 4;
            totlen = ntohs (iphdr->tot_len);

            // packet corrupt?
            if (totlen > len
                || iphdr->protocol != IPPROTO_ICMP
                || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
              continue;

            pkt = (PKT *)(buf + hdrlen);

            if (pkt->type != ICMP4_ECHO_REPLY
                || pkt->pinger >= pingercnt
                || !pingers [pkt->pinger])
              continue;

            pinger = pingers [pkt->pinger];

            if (!pkt_is_valid_for (pkt, pinger))
              continue;

            recv_feed (pinger, &sa.sin_addr, 4, NOW () - pkt_to_ts (pkt));
          }

        recv_flush ();
}

void
_recv_icmp6 (...)
	CODE:
{
        /* drain the non-blocking ICMPv6 socket (no IP header is delivered),
         * queue every valid echo reply with its rtt, then deliver them */
        struct sockaddr_in6 sa;
        PKT pkt;

        for (;;)
          {
            PINGER *pinger;
            socklen_t sl = sizeof (sa);
            int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);

            if (len != sizeof (PKT))
              break;

            if (pkt.type != ICMP6_ECHO_REPLY
                || pkt.pinger >= pingercnt
                || !pingers [pkt.pinger])
              continue;

            pinger = pingers [pkt.pinger];

            if (!pkt_is_valid_for (&pkt, pinger))
              continue;

            recv_feed (pinger, &sa.sin6_addr, 16, NOW () - pkt_to_ts (&pkt));
          }

        recv_flush ();
}