#if defined(__linux) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__CYGWIN__) # define ENABLE_IPV6 1 // if you get compilation problems try to disable IPv6 #else # define ENABLE_IPV6 0 #endif #include "EXTERN.h" #include "perl.h" #include "XSUB.h" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifdef __linux # include #endif #if ENABLE_IPV6 && !defined (__CYGWIN__) # include #endif #define ICMP4_ECHO 8 #define ICMP4_ECHO_REPLY 0 #define ICMP6_ECHO 128 #define ICMP6_ECHO_REPLY 129 #define DRAIN_INTERVAL 1e-6 // how long to wait when sendto returns ENOBUFS, in seconds #define MIN_INTERVAL 1e-6 // minimum packet send interval, in seconds #define HDR_SIZE_IP4 20 #define HDR_SIZE_IP6 48 //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pip buffersize divides exactly by pointer sizes typedef uint8_t addr_tt[16]; /*****************************************************************************/ typedef double tstamp; static tstamp NOW (void) { struct timeval tv; gettimeofday (&tv, 0); return tv.tv_sec + tv.tv_usec * 1e-6; } static void ssleep (tstamp wait) { #if defined (__SVR4) && defined (__sun) struct timeval tv; tv.tv_sec = wait; tv.tv_usec = (wait - tv.tv_sec) * 1e6; select (0, 0, 0, 0, &tv); #elif defined(_WIN32) Sleep ((unsigned long)(delay * 1e3)); #else struct timespec ts; ts.tv_sec = wait; ts.tv_nsec = (wait - ts.tv_sec) * 1e9; nanosleep (&ts, 0); #endif } /*****************************************************************************/ typedef struct { int family; addr_tt lo, hi; double interval; tstamp next; } RANGE; typedef struct { SV *id; double interval; int nranges; RANGE *ranges; uint32_t payload; } REQ; typedef struct { uint8_t version_ihl; uint8_t tos; uint16_t tot_len; uint16_t id; uint16_t flags; uint8_t ttl; uint8_t protocol; uint16_t cksum; uint32_t src; uint32_t dst; } IP4HDR; typedef struct { uint8_t version; uint8_t x1, x2, x3; uint16_t payload_len; uint8_t nxt_hdr; uint8_t hop_limit; uint8_t src[16]; uint8_t dst[16]; } IP6HDR; /*****************************************************************************/ #define MAGIC 0xca4c static uint16_t magic; typedef struct { uint8_t type, code; uint16_t cksum; uint16_t id, seq; uint32_t payload; uint32_t stamp_hi; uint32_t stamp_lo; } PKT; static int pkt_is_valid (PKT *pkt) { return pkt->id == (uint16_t) magic && pkt->seq == (uint16_t)~magic; } static void ts_to_pkt (PKT *pkt, tstamp ts) { /* move 12 bits of seconds into the 32 bit fractional part */ /* leaving 20 bits subsecond resolution and 44 bits of integers */ /* (of which 32 are typically usable) */ ts *= 1. / 4096.; pkt->stamp_hi = ts; pkt->stamp_lo = (ts - pkt->stamp_hi) * 4294967296.; } static tstamp pkt_to_ts (PKT *pkt) { return pkt->stamp_hi * 4096. + pkt->stamp_lo * (4096. / 4294967296.); } /*****************************************************************************/ static pthread_t pthrid; static int thr_send[2]; // send to worker static int thr_recv[2]; // receive from worker static int icmp4_fd, icmp6_fd; static AV *cbs; static uint16_t icmp_cksum (void *data, unsigned int len) { register int sum = 0; uint32_t *wp; for (wp = (uint32_t *)data; len; wp++, len -= 4) sum += (*wp & 0xffff) + (*wp >> 16); sum = (sum >> 16) + (sum & 0xffff); /* add high 16 to low 16 */ sum += sum >> 16; /* add carry */ return ~sum; } static void inc_addr (addr_tt *addr) { int len = sizeof (addr_tt) - 1; do { if ((*addr)[len] != 0xff) { ++(*addr)[len]; break; } (*addr)[len] = 0; } while (len--); } static void * ping_proc (void *unused) { PKT pkt; struct sockaddr_in sa4; #if ENABLE_IPV6 struct sockaddr_in6 sa6; #endif memset (&pkt, 0, sizeof (pkt)); memset (&sa4, 0, sizeof (sa4)); sa4.sin_family = AF_INET; sa4.sin_port = 0; #if ENABLE_IPV6 memset (&sa6, 0, sizeof (sa6)); sa6.sin6_family = AF_INET6; sa6.sin6_port = 0; #endif for (;;) { REQ *req; int len = read (thr_send [0], &req, sizeof (req)); tstamp now = NOW (); tstamp next = now; if (!len) pthread_exit (0); else if (len != sizeof (req)) { perror ("AnyEvent::FastPing: short read or read error"); pthread_exit ((void *)-1); } //TODO: bind to source address pkt.code = 0; pkt.id = (uint16_t)magic; pkt.seq = (uint16_t)~magic; pkt.payload = req->payload; { int r; for (r = req->nranges; r--; ) inc_addr (&req->ranges [r].hi); } while (req->nranges) { RANGE *range = req->ranges; int n, k; if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt))) req->ranges [0] = req->ranges [--req->nranges]; else { // ranges [0] is always the next range to ping tstamp wait = range->next - now; // compare with the global frequency limit { tstamp diff = next - now; if (wait < diff) wait = diff; else if (range) next = range->next; } if (wait > 0.) ssleep (wait); now = NOW (); ts_to_pkt (&pkt, now); pkt.cksum = 0; if (range->family == AF_INET) { pkt.type = ICMP4_ECHO; pkt.cksum = icmp_cksum (&pkt, sizeof (pkt)); memcpy (&sa4.sin_addr, sizeof (addr_tt) - sizeof (sa4.sin_addr) + (char *)&range->lo, sizeof (sa4.sin_addr)); if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0) errno = 0; } else { #if ENABLE_IPV6 pkt.type = ICMP6_ECHO; memcpy (&sa6.sin6_addr, sizeof (addr_tt) - sizeof (sa6.sin6_addr) + (char *)&range->lo, sizeof (sa6.sin6_addr)); if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0) errno = 0; #endif } if (errno == ENOBUFS) ssleep (DRAIN_INTERVAL); else { inc_addr (&range->lo); range->next = next; range->next += range->interval; } next += req->interval; } // make a downheap operation for (n = k = 0; ; ) { int j = k * 2 + 1; ++n; if (j >= req->nranges) break; else if (j < req->nranges - 1) if (req->ranges [j].next > req->ranges [j + 1].next) ++j; if (req->ranges [j].next >= req->ranges [k].next) break; { RANGE temp = req->ranges [k]; req->ranges [k] = req->ranges [j]; req->ranges [j] = temp; } k = j; } } write (thr_recv [1], &req, sizeof (req)); } return 0; } static void feed_reply (AV *res_av) { dSP; SV *res = sv_2mortal (newRV_inc ((SV *)res_av)); int i; if (av_len (res_av) < 0) return; ENTER; SAVETMPS; for (i = av_len (cbs) + 1; i--; ) { SV *cb = *av_fetch (cbs, i, 1); PUSHMARK (SP); XPUSHs (res); PUTBACK; call_sv (cb, G_DISCARD | G_VOID); } FREETMPS; LEAVE; } static void boot () { sigset_t fullsigset, oldsigset; pthread_attr_t attr; if (pipe (thr_send) < 0) croak ("AnyEvent::FastPing: unable to create send pipe"); if (pipe (thr_recv) < 0) croak ("AnyEvent::FastPing: unable to create receive pipe"); icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP); fcntl (icmp4_fd, F_SETFL, O_NONBLOCK); #ifdef ICMP_FILTER { struct icmp_filter oval; oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY); setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval); } #endif #if ENABLE_IPV6 icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6); fcntl (icmp6_fd, F_SETFL, O_NONBLOCK); # ifdef ICMP6_FILTER { struct icmp6_filter oval; ICMP6_FILTER_SETBLOCKALL (&oval); ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval); setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval); } # endif #endif pthread_attr_init (&attr); pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); #ifdef PTHREAD_SCOPE_PROCESS pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS); #endif sigfillset (&fullsigset); pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset); if (pthread_create (&pthrid, &attr, ping_proc, 0)) croak ("AnyEvent::FastPing: unable to create pinger thread"); pthread_sigmask (SIG_SETMASK, &oldsigset, 0); sv_setiv (get_sv ("AnyEvent::FastPing::THR_REQ_FD", 1), thr_send [1]); sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_recv [0]); sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd); sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd); } MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing BOOT: { HV *stash = gv_stashpv ("AnyEvent::FastPing", 1); cbs = get_av ("AnyEvent::FastPing::CB", 1); magic = getpid () ^ MAGIC; if (sizeof (PKT) & 3) croak ("size of PKT structure is not a multiple of 4"); boot (); newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0)); newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0)); newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT))); newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT))); } PROTOTYPES: DISABLE SV * _req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id) CODE: { AV *rav; int nranges, i; REQ *req; if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV) croak ("address ranges must be given as arrayref with lo, hi pairs"); rav = (AV *)SvRV (ranges); nranges = av_len (rav) + 1; req = malloc (sizeof (REQ)); if (interval < MIN_INTERVAL) interval = MIN_INTERVAL; req->id = newSVsv (id); req->interval = interval; req->payload = payload; req->nranges = nranges; req->ranges = (RANGE *)malloc (nranges * sizeof (RANGE)); while (nranges--) { SV *sv = *av_fetch (rav, nranges, 1); SV *lo, *hi; AV *av; RANGE *r; if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV) croak ("address range must be given as arrayref with lo, hi, interval arrayrefs"); av = (AV *)SvRV (sv); r = req->ranges + nranges; lo = *av_fetch (av, 0, 1); hi = *av_fetch (av, 1, 1); sv_utf8_downgrade (lo, 0); sv_utf8_downgrade (hi, 0); memset (&r->lo, 0, sizeof (addr_tt)); memset (&r->hi, 0, sizeof (addr_tt)); if (SvPOKp (lo) && SvPOKp (hi)) { if (SvCUR (lo) != SvCUR (hi)) croak ("all addresses in range must be of the same size (either 4 or 16 bytes)"); if (SvCUR (lo) == 4) { r->family = AF_INET; memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, SvPVX (lo), 4); memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, SvPVX (hi), 4); } else if (SvCUR (lo) == 16) { #if ENABLE_IPV6 r->family = AF_INET6; memcpy (&r->lo, SvPVX (lo), sizeof (addr_tt)); memcpy (&r->hi, SvPVX (hi), sizeof (addr_tt)); #else croak ("IPv6 not supported in this configuration"); #endif } else croak ("addresses in range must be either 4 (IPv4) or 16 (IPv6) bytes in length"); } else if (SvIOK (lo) && SvIOK (hi)) { uint32_t addr; r->family = AF_INET; addr = htonl (SvUV (lo)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, &addr, 4); addr = htonl (SvUV (hi)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, &addr, 4); } else croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets"); if (r->family == AF_INET) { if (icmp4_fd < 0) croak ("AnyEvent::FastPing: IPv4 ping support not available on this system"); } else { if (icmp6_fd < 0) croak ("AnyEvent::FastPing: IPv6 ping support not available on this system"); } r->interval = SvNV (*av_fetch (av, 2, 1)); if (r->interval < req->interval) r->interval = req->interval; r->next = 0.; } RETVAL = newSVpvn ((char *)&req, sizeof (req)); } OUTPUT: RETVAL SV * _read_res () CODE: { REQ *req; if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req)) RETVAL = &PL_sv_undef; RETVAL = req->id; free (req->ranges); free (req); } OUTPUT: RETVAL void _recv_icmp4 (...) CODE: { char buf [512]; struct sockaddr_in sa; socklen_t sl = sizeof (sa); AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ()); tstamp now = NOW (); if (!res_av) XSRETURN_UNDEF; for (;;) { IP4HDR *iphdr = (IP4HDR *)buf; int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl); int hdrlen, totlen; PKT *pkt; if (len <= HDR_SIZE_IP4) break; hdrlen = (iphdr->version_ihl & 15) * 4; totlen = ntohs (iphdr->tot_len); // packet corrupt? if (totlen > len || iphdr->protocol != IPPROTO_ICMP || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen) continue; pkt = (PKT *)(buf + hdrlen); if (pkt->type != ICMP4_ECHO_REPLY || !pkt_is_valid (pkt)) continue; { AV *av = newAV (); av_push (av, newSVpvn ((char *)&sa.sin_addr, 4)); av_push (av, newSVnv (now - pkt_to_ts (pkt))); av_push (av, newSVuv (pkt->payload)); av_push (res_av, newRV_noinc ((SV *)av)); } } feed_reply (res_av); } void _recv_icmp6 (...) CODE: { struct sockaddr_in6 sa; socklen_t sl = sizeof (sa); AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ()); PKT pkt; tstamp now = NOW (); if (!res_av) XSRETURN_UNDEF; for (;;) { int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl); if (len != sizeof (PKT)) break; if (pkt.type != ICMP6_ECHO_REPLY || !pkt_is_valid (&pkt)) continue; { AV *av = newAV (); av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16)); av_push (av, newSVnv (now - pkt_to_ts (&pkt))); av_push (av, newSVuv (pkt.payload)); av_push (res_av, newRV_noinc ((SV *)av)); } } feed_reply (res_av); }