ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FastPing.xs
Revision: 1.13
Committed: Sat Feb 5 23:37:21 2011 UTC (13 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-2_01
Changes since 1.12: +57 -16 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.8 #if defined(__linux) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__CYGWIN__)
2 root 1.5 # define ENABLE_IPV6 1 // if you get compilation problems try to disable IPv6
3 root 1.2 #else
4 root 1.5 # define ENABLE_IPV6 0
5 root 1.2 #endif
6 root 1.1
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11     #include <pthread.h>
12    
13     #include <stdio.h>
14     #include <stdlib.h>
15     #include <string.h>
16    
17     #include <time.h>
18     #include <poll.h>
19     #include <unistd.h>
20     #include <inttypes.h>
21     #include <fcntl.h>
22     #include <errno.h>
23 root 1.13 #include <limits.h>
24 root 1.1
25     #include <sys/types.h>
26     #include <sys/time.h>
27     #include <sys/socket.h>
28    
29     #include <netinet/in.h>
30     #include <arpa/inet.h>
31    
32     #ifdef __linux
33     # include <linux/icmp.h>
34     #endif
35 root 1.8 #if ENABLE_IPV6 && !defined (__CYGWIN__)
36 root 1.1 # include <netinet/icmp6.h>
37     #endif
38    
/* ICMP message types for echo requests and echo replies */
#define ICMP4_ECHO       8
#define ICMP4_ECHO_REPLY 0
#define ICMP6_ECHO       128
#define ICMP6_ECHO_REPLY 129

#define DRAIN_INTERVAL 1e-6 // how long to wait when sendto returns ENOBUFS, in seconds
#define MIN_INTERVAL   1e-6 // minimum packet send interval, in seconds

/* header sizes: added to sizeof (PKT) for the exported packet sizes, */
/* HDR_SIZE_IP4 is also the minimum accepted IPv4 header length on receive */
#define HDR_SIZE_IP4 20
#define HDR_SIZE_IP6 48
49    
/* pipe pair: worker threads write their pinger id to [1] when they finish, */
/* [0] is exported to perl as $AnyEvent::FastPing::THR_RES_FD */
static int thr_res[2];

/* raw ICMP sockets, -1 until boot () has opened them (or when opening failed) */
static int icmp4_fd = -1;
static int icmp6_fd = -1;
53 root 1.10
54 root 1.9 /*****************************************************************************/
55    
/* timestamps and durations are kept as seconds in a double */
typedef double tstamp;

/* return the current gettimeofday time as seconds (with microsecond fraction) */
static tstamp
NOW (void)
{
  struct timeval current;

  gettimeofday (&current, 0);

  return current.tv_sec + current.tv_usec * 1e-6;
}
67    
68 root 1.9 static void
69     ssleep (tstamp wait)
70     {
71     #if defined (__SVR4) && defined (__sun)
72     struct timeval tv;
73    
74     tv.tv_sec = wait;
75     tv.tv_usec = (wait - tv.tv_sec) * 1e6;
76    
77     select (0, 0, 0, 0, &tv);
78     #elif defined(_WIN32)
79     Sleep ((unsigned long)(delay * 1e3));
80     #else
81     struct timespec ts;
82    
83     ts.tv_sec = wait;
84     ts.tv_nsec = (wait - ts.tv_sec) * 1e9;
85    
86     nanosleep (&ts, 0);
87     #endif
88     }
89    
90     /*****************************************************************************/
91    
/* layout of the fixed 20 byte IPv4 header found at the start of received raw packets */
typedef struct
{
  uint8_t  version_ihl; /* the low 4 bits are the header length in 32 bit words */
  uint8_t  tos;
  uint16_t tot_len;     /* network byte order */

  uint16_t id;
  uint16_t flags;

  uint8_t  ttl;
  uint8_t  protocol;
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;
108    
109 root 1.11 /*****************************************************************************/
110 root 1.1
111 root 1.11 typedef uint8_t addr_tt[16];
112 root 1.9
113 root 1.10 typedef struct
114     {
115     tstamp next;
116 root 1.12 tstamp interval;
117 root 1.10 int addrlen;
118 root 1.11
119     addr_tt lo, hi; /* only if !addrcnt */
120    
121     int addrcnt;
122     /* addrcnt addresses follow */
123 root 1.10 } RANGE;
124    
125     typedef struct
126     {
127     RANGE **ranges;
128     int rangecnt, rangemax;
129 root 1.1
130 root 1.12 tstamp next;
131 root 1.10 tstamp interval;
132 root 1.12
133 root 1.10 tstamp maxrtt;
134 root 1.12
135 root 1.10 uint16_t magic1;
136     uint16_t magic2;
137     uint16_t magic3;
138    
139     int id;
140    
141     AV *recvq; /* receive queue */
142     int nextrecv;
143     SV *recvcb;
144    
145     pthread_t thrid;
146     int running;
147     } PINGER;
148    
149     static PINGER **pingers;
150     static int *pingerfree; /* freelist next */
151     static int pingercnt;
152     static int pingermax;
153     static int firstfree = -1;
154     static int firstrecv = -1;
155    
156     /*****************************************************************************/
157 root 1.1
/* the ICMP echo packet as sent and as expected back in replies */
typedef struct
{
  uint8_t  type, code;
  uint16_t cksum;

  uint16_t id, seq;  /* carry the pinger's magic1 and magic2 */

  uint16_t pinger;   /* id of the sending pinger object */
  uint16_t magic;    /* carries the pinger's magic3 */

  /* send timestamp, see ts_to_pkt/pkt_to_ts */
  uint32_t stamp_hi;
  uint32_t stamp_lo;
} PKT;
171    
172 root 1.9 static int
173 root 1.10 pkt_is_valid_for (PKT *pkt, PINGER *pinger)
174 root 1.9 {
175 root 1.10 return pkt->id == pinger->magic1
176     && pkt->seq == pinger->magic2
177     && pkt->magic == pinger->magic3;
178 root 1.9 }
179    
180     static void
181     ts_to_pkt (PKT *pkt, tstamp ts)
182     {
183     /* move 12 bits of seconds into the 32 bit fractional part */
184     /* leaving 20 bits subsecond resolution and 44 bits of integers */
185     /* (of which 32 are typically usable) */
186     ts *= 1. / 4096.;
187    
188     pkt->stamp_hi = ts;
189     pkt->stamp_lo = (ts - pkt->stamp_hi) * 4294967296.;
190     }
191    
192     static tstamp
193     pkt_to_ts (PKT *pkt)
194     {
195     return pkt->stamp_hi * 4096.
196     + pkt->stamp_lo * (4096. / 4294967296.);
197     }
198    
199 root 1.10 static void
200     pkt_cksum (PKT *pkt)
201     {
202     uint_fast32_t sum = -pkt->cksum;
203     uint32_t *wp = (uint32_t *)pkt;
204     int len = sizeof (*pkt) / 4;
205    
206     do
207     {
208     uint_fast32_t w = *(volatile uint32_t *)wp++;
209     sum += (w & 0xffff) + (w >> 16);
210     }
211     while (len--);
212    
213     sum = (sum >> 16) + (sum & 0xffff); /* add high 16 to low 16 */
214     sum += sum >> 16; /* add carry */
215    
216     pkt->cksum = ~sum;
217     }
218    
219 root 1.9 /*****************************************************************************/
220    
221 root 1.10 static void
222     range_free (RANGE *self)
223     {
224     free (self);
225     }
226    
227     /* like sendto, but retries on failure */
228     static void
229     xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen)
230 root 1.1 {
231 root 1.11 tstamp wait = DRAIN_INTERVAL / 2.;
232 root 1.10
233     while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS)
234 root 1.11 ssleep (wait *= 2.);
235 root 1.10 }
236    
237 root 1.11 // ping current address, return true and increment if more to ping
238     static int
239     range_send_ping (RANGE *self, PKT *pkt)
240 root 1.10 {
241 root 1.11 // send ping
242     uint8_t *addr;
243 root 1.13 int addrlen;
244 root 1.11
245     if (self->addrcnt)
246     addr = (self->addrcnt - 1) * self->addrlen + (uint8_t *)(self + 1);
247     else
248     addr = sizeof (addr_tt) - self->addrlen + self->lo;
249    
250 root 1.13 addrlen = self->addrlen;
251    
252     /* convert ipv4 mapped addresses - this only works for host lists */
253     /* this tries to match 0000:0000:0000:0000:0000:ffff:a.b.c.d */
254     /* efficiently but also with few insns */
255     if (addrlen == 16 && !addr [0] && icmp4_fd >= 0
256     && !( addr [ 1]
257     | addr [ 2] | addr [ 3]
258     | addr [ 4] | addr [ 5]
259     | addr [ 6] | addr [ 7]
260     | addr [ 8] | addr [ 9]
261     | (255-addr [10]) | (255-addr [11])))
262     {
263     addr += 12;
264     addrlen -= 12;
265     }
266    
267 root 1.10 pkt->cksum = 0;
268    
269 root 1.13 if (addrlen == 4)
270 root 1.10 {
271     struct sockaddr_in sa;
272    
273     pkt->type = ICMP4_ECHO;
274     pkt_cksum (pkt);
275    
276     sa.sin_family = AF_INET;
277     sa.sin_port = 0;
278    
279 root 1.11 memcpy (&sa.sin_addr, addr, sizeof (sa.sin_addr));
280 root 1.10
281     xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
282     }
283     else
284     {
285     #if ENABLE_IPV6
286     struct sockaddr_in6 sa;
287    
288     pkt->type = ICMP6_ECHO;
289 root 1.1
290 root 1.10 sa.sin6_family = AF_INET6;
291     sa.sin6_port = 0;
292     sa.sin6_flowinfo = 0;
293     sa.sin6_scope_id = 0;
294 root 1.1
295 root 1.11 memcpy (&sa.sin6_addr, addr, sizeof (sa.sin6_addr));
296 root 1.10
297 root 1.11 xsendto (icmp6_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
298 root 1.10 #endif
299     }
300 root 1.11
301     // see if we have any more addresses
302     if (self->addrcnt)
303     {
304     if (!--self->addrcnt)
305     return 0;
306     }
307     else
308     {
309     if (!memcmp (&self->lo, &self->hi, sizeof (addr_tt)))
310     return 0;
311    
312     // increment self->lo
313     {
314     int len = sizeof (addr_tt) - 1;
315    
316     while (!++self->lo [len])
317     --len;
318     }
319     }
320    
321     return 1;
322 root 1.10 }
323    
324 root 1.11 /*****************************************************************************/
325    
326 root 1.10 static void
327     downheap (PINGER *self)
328     {
329     RANGE *elem = self->ranges [0]; /* always exists */
330     int Nm1 = self->rangecnt - 1;
331     int j;
332     int k;
333    
334     for (k = 0; ; )
335     {
336     int j = k * 2 + 1;
337    
338     if (j > Nm1)
339     break;
340    
341     if (j < Nm1
342     && self->ranges [j]->next > self->ranges [j + 1]->next)
343     ++j;
344    
345     if (self->ranges [j]->next >= elem->next)
346     break;
347    
348     self->ranges [k] = self->ranges [j];
349    
350     k = j;
351     }
352 root 1.1
353 root 1.10 self->ranges [k] = elem;
354 root 1.1 }
355    
356     static void
357 root 1.10 upheap (PINGER *self, int k)
358 root 1.1 {
359 root 1.10 RANGE *elem = self->ranges [k];
360 root 1.1
361 root 1.10 while (k)
362 root 1.1 {
363 root 1.10 int j = (k - 1) >> 1;
364    
365     if (self->ranges [j]->next <= elem->next)
366     break;
367    
368     self->ranges [k] = self->ranges [j];
369 root 1.1
370 root 1.10 k = j;
371 root 1.1 }
372 root 1.10
373     self->ranges [k] = elem;
374 root 1.1 }
375    
376     static void *
377 root 1.10 ping_proc (void *self_)
378 root 1.1 {
379 root 1.10 PINGER *self = (PINGER *)self_;
380 root 1.1 PKT pkt;
381    
382     memset (&pkt, 0, sizeof (pkt));
383    
384 root 1.12 tstamp now = NOW ();
385 root 1.10
386 root 1.12 pkt.code = 0;
387     pkt.id = self->magic1;
388     pkt.seq = self->magic2;
389     pkt.magic = self->magic3;
390     pkt.pinger = self->id;
391    
392     if (self->next < now)
393     self->next = now;
394 root 1.1
395 root 1.10 while (self->rangecnt)
396 root 1.1 {
397 root 1.10 RANGE *range = self->ranges [0];
398 root 1.1
399 root 1.10 // ranges [0] is always the next range to ping
400     tstamp wait = range->next - now;
401 root 1.6
402 root 1.10 // compare with the global frequency limit
403     {
404 root 1.12 tstamp diff = self->next - now;
405 root 1.10
406     if (wait < diff)
407 root 1.12 wait = diff; // global rate limit overrides
408     else
409     self->next = range->next; // fast forward
410 root 1.10 }
411    
412     if (wait > 0.)
413     ssleep (wait);
414    
415     now = NOW ();
416    
417     ts_to_pkt (&pkt, now);
418    
419 root 1.11 if (!range_send_ping (range, &pkt))
420 root 1.1 {
421 root 1.10 self->ranges [0] = self->ranges [--self->rangecnt];
422     range_free (range);
423     }
424     else
425 root 1.12 range->next = self->next + range->interval;
426 root 1.10
427 root 1.11 downheap (self);
428 root 1.1
429 root 1.12 self->next += self->interval;
430     now = NOW ();
431 root 1.10 }
432    
433     ssleep (self->maxrtt);
434    
435     {
436     uint16_t id = self->id;
437 root 1.1
438 root 1.10 write (thr_res [1], &id, sizeof (id));
439     }
440    
441     return 0;
442     }
443    
444     /*****************************************************************************/
445    
446 root 1.13 /* NetBSD, Solaris... */
447     #ifndef PTHREAD_STACK_MIN
448     # define PTHREAD_STACK_MIN 0
449     #endif
450    
451 root 1.10 static void
452     pinger_start (PINGER *self)
453     {
454     sigset_t fullsigset, oldsigset;
455     pthread_attr_t attr;
456    
457     if (self->running)
458     return;
459    
460     sigfillset (&fullsigset);
461 root 1.1
462 root 1.10 pthread_attr_init (&attr);
463     pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN < sizeof (long) * 2048 ? sizeof (long) * 2048 : PTHREAD_STACK_MIN);
464    
465     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
466    
467     if (pthread_create (&self->thrid, &attr, ping_proc, (void *)self))
468     croak ("AnyEvent::FastPing: unable to create pinger thread");
469    
470     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
471    
472     self->running = 1;
473     }
474    
475     static void
476     pinger_stop (PINGER *self)
477     {
478     if (!self->running)
479     return;
480    
481     self->running = 0;
482     pthread_cancel (self->thrid);
483     pthread_join (self->thrid, 0);
484     }
485    
486     static void
487     pinger_init (PINGER *self)
488     {
489     memset (self, 0, sizeof (PINGER));
490 root 1.1
491 root 1.10 if (firstfree >= 0)
492     {
493     self->id = firstfree;
494     firstfree = pingerfree [firstfree];
495     }
496     else if (pingercnt == 0xffff)
497     croak ("unable to create more than 65536 AnyEvent::FastPing objects");
498     else
499     {
500     if (pingercnt == pingermax)
501 root 1.1 {
502 root 1.10 pingermax = pingermax * 2 + 16;
503     pingers = realloc (pingers , sizeof (pingers [0]) * pingermax);
504     pingerfree = realloc (pingerfree, sizeof (pingerfree [0]) * pingermax);
505     }
506    
507     self->id = pingercnt++;
508     }
509    
510     pingers [self->id] = self;
511    
512     self->recvcb = &PL_sv_undef;
513 root 1.12 self->next = 0.;
514 root 1.10 self->interval = MIN_INTERVAL;
515     self->maxrtt = 0.5;
516     self->rangemax = 16;
517     self->ranges = malloc (sizeof (self->ranges [0]) * self->rangemax);
518     }
519    
520     static void
521     pinger_free (PINGER *self)
522     {
523     pinger_stop (self);
524    
525     pingers [self->id] = 0;
526    
527     SvREFCNT_dec (self->recvq);
528     SvREFCNT_dec (self->recvcb);
529    
530     pingerfree [self->id] = firstfree;
531     firstfree = self->id;
532    
533     while (self->rangecnt)
534     range_free (self->ranges [--self->rangecnt]);
535    
536     free (self->ranges);
537     }
538    
539 root 1.11 static void
540     pinger_add_range (PINGER *self, RANGE *range)
541     {
542     if (self->rangecnt == self->rangemax)
543     self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1));
544    
545     self->ranges [self->rangecnt] = range;
546     upheap (self, self->rangecnt);
547     ++self->rangecnt;
548     }
549    
550 root 1.10 /*****************************************************************************/
551    
552     static void
553     recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt)
554     {
555     if (!self->recvq)
556     {
557     /* first seen this round */
558     if (!SvOK (self->recvcb))
559     return;
560    
561     self->recvq = newAV ();
562    
563     self->nextrecv = firstrecv;
564     firstrecv = self->id;
565     }
566 root 1.1
567 root 1.10 {
568     AV *pkt = newAV ();
569    
570     av_extend (pkt, 2-1);
571    
572     AvARRAY (pkt)[0] = newSVpvn (addr, addrlen);
573     AvARRAY (pkt)[1] = newSVnv (rtt);
574     AvFILLp (pkt) = 2-1;
575 root 1.1
576 root 1.10 av_push (self->recvq, newRV_noinc ((SV *)pkt));
577     }
578     }
579 root 1.1
580 root 1.10 static void
581     recv_flush (void)
582     {
583     if (firstrecv < 0)
584     return;
585    
586     ENTER;
587     SAVETMPS;
588 root 1.1
589 root 1.11 do
590 root 1.10 {
591     dSP;
592     PINGER *self = pingers [firstrecv];
593     firstrecv = self->nextrecv;
594 root 1.1
595 root 1.10 self->nextrecv = -1;
596 root 1.1
597 root 1.10 PUSHMARK (SP);
598     XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq)));
599     self->recvq = 0;
600     PUTBACK;
601     call_sv (self->recvcb, G_DISCARD | G_VOID);
602 root 1.1 }
603 root 1.11 while (firstrecv >= 0);
604 root 1.1
605 root 1.10 FREETMPS;
606     LEAVE;
607 root 1.1 }
608    
609 root 1.10 /*****************************************************************************/
610    
#if 0
/* disabled: calls every callback in "cbs" with a reference to res_av */
static void
feed_reply (AV *res_av)
{
  dSP;
  SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
  int i;

  /* nothing to report */
  if (av_len (res_av) < 0)
    return;

  ENTER;
  SAVETMPS;

  /* walk the callbacks from last to first */
  i = av_len (cbs) + 1;

  while (i--)
    {
      SV *cb = *av_fetch (cbs, i, 1);

      PUSHMARK (SP);
      XPUSHs (res);
      PUTBACK;
      call_sv (cb, G_DISCARD | G_VOID);
    }

  FREETMPS;
  LEAVE;
}
#endif
639 root 1.1
640     static void
641     boot ()
642     {
643 root 1.10 if (pipe (thr_res) < 0)
644     croak ("AnyEvent::FastPing: unable to create receive pipe");
645 root 1.1
646 root 1.10 sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_res [0]);
647 root 1.1
648     icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
649 root 1.2 fcntl (icmp4_fd, F_SETFL, O_NONBLOCK);
650 root 1.1 #ifdef ICMP_FILTER
651     {
652     struct icmp_filter oval;
653     oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
654     setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
655     }
656     #endif
657    
658 root 1.5 #if ENABLE_IPV6
659 root 1.1 icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
660 root 1.2 fcntl (icmp6_fd, F_SETFL, O_NONBLOCK);
661 root 1.1 # ifdef ICMP6_FILTER
662     {
663     struct icmp6_filter oval;
664     ICMP6_FILTER_SETBLOCKALL (&oval);
665     ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
666     setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
667     }
668     # endif
669     #endif
670    
671     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
672     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
673     }
674    
/* croak unless the pinger is stopped; expects a "PINGER *self" in scope. */
/* wrapped in do/while (0) so that "NOT_RUNNING;" is a single statement and */
/* cannot capture a following else */
#define NOT_RUNNING \
  do \
    { \
      if (self->running) \
        croak ("AnyEvent::FastPing object has been started - you have to stop it first before calling this method, caught"); \
    } \
  while (0)
678    
MODULE = AnyEvent::FastPing		PACKAGE = AnyEvent::FastPing		PREFIX = pinger_

BOOT:
{
        HV *stash = gv_stashpv ("AnyEvent::FastPing", 1);

        /* refuse to load if PKT is not a whole number of 32 bit words */
        if (sizeof (PKT) % 4 != 0)
          croak ("size of PKT structure is not a multiple of 4");

        boot ();

        /* a protocol counts as supported if its raw socket could be opened */
        newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
        newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));

        newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
        newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
}
696    
697     PROTOTYPES: DISABLE
698    
void
_recv_icmp4 (...)
	CODE:
{
        char buf [512];
        struct sockaddr_in sa;
        int remaining;

        /* read at most 256 packets per call, stop early when nothing usable is left */
        for (remaining = 256; remaining > 0; --remaining)
          {
            IP4HDR *iphdr = (IP4HDR *)buf;
            socklen_t sl  = sizeof (sa);
            int len       = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
            int hdrlen, totlen;
            PINGER *pinger;
            PKT *pkt;

            /* covers errors (-1) as well as packets too short to matter */
            if (len <= HDR_SIZE_IP4)
              break;

            hdrlen = (iphdr->version_ihl & 15) * 4;
            totlen = ntohs (iphdr->tot_len);

            /* must be a complete icmp packet with exactly one PKT as payload */
            if (totlen > len)
              continue;

            if (iphdr->protocol != IPPROTO_ICMP)
              continue;

            if (hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
              continue;

            pkt = (PKT *)(buf + hdrlen);

            /* must be an echo reply that names an existing pinger */
            if (pkt->type != ICMP4_ECHO_REPLY)
              continue;

            if (pkt->pinger >= pingercnt || !pingers [pkt->pinger])
              continue;

            pinger = pingers [pkt->pinger];

            if (!pkt_is_valid_for (pkt, pinger))
              continue;

            recv_feed (pinger, &sa.sin_addr, 4, NOW () - pkt_to_ts (pkt));
          }

        recv_flush ();
}
744    
void
_recv_icmp6 (...)
	CODE:
{
        struct sockaddr_in6 sa;
        PKT pkt;
        int remaining;

        /* read at most 256 packets per call */
        for (remaining = 256; remaining > 0; --remaining)
          {
            socklen_t sl = sizeof (sa);
            int len      = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
            PINGER *pinger;

            /* anything that is not exactly one PKT (including errors) ends the loop */
            if (len != sizeof (PKT))
              break;

            /* must be an echo reply that names an existing pinger */
            if (pkt.type != ICMP6_ECHO_REPLY)
              continue;

            if (pkt.pinger >= pingercnt || !pingers [pkt.pinger])
              continue;

            pinger = pingers [pkt.pinger];

            if (!pkt_is_valid_for (&pkt, pinger))
              continue;

            recv_feed (pinger, &sa.sin6_addr, 16, NOW () - pkt_to_ts (&pkt));
          }

        recv_flush ();
}
777    
void
_new (SV *klass, UV magic1, UV magic2, UV magic3)
	PPCODE:
{
        /* the PINGER struct lives inside the string buffer of the blessed scalar */
        SV *pv       = NEWSV (0, sizeof (PINGER));
        PINGER *self = (PINGER *)SvPVX (pv);
        HV *stash    = gv_stashpv (SvPVutf8_nolen (klass), 1);

        SvPOK_only (pv);
        XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), stash)));

        pinger_init (self);

        self->magic1 = magic1;
        self->magic2 = magic2;
        self->magic3 = magic3;
}
792    
void
_free (PINGER *self)
	CODE:
{
        /* stops the pinger and releases everything it owns */
        pinger_free (self);
}
797    
IV
id (PINGER *self, ...)
	CODE:
{
        /* index of this pinger in the global registry, also sent in every packet */
        RETVAL = self->id;
}
	OUTPUT:
        RETVAL
804    
void
pinger_start (PINGER *self)

void
pinger_stop (PINGER *self)
808 root 1.6
void
_stop_id (UV id)
	CODE:
{
        /* ids that were never handed out or have been freed are ignored */
        PINGER *pinger = id < pingercnt ? pingers [id] : 0;

        if (pinger)
          pinger_stop (pinger);
}
814 root 1.1
void
interval (PINGER *self, NV interval)
	CODE:
{
        NOT_RUNNING;

        /* never go below the minimum send interval */
        self->interval = MIN_INTERVAL;

        if (interval > MIN_INTERVAL)
          self->interval = interval;
}
820 root 1.1
void
max_rtt (PINGER *self, NV maxrtt)
	CODE:
{
        NOT_RUNNING;

        /* time the worker thread sleeps after the last ping before reporting completion */
        self->maxrtt = maxrtt;
}
826    
void
on_recv (PINGER *self, SV *cb)
	CODE:
{
        /* drop the previous callback, then store a private copy of the new one */
        SvREFCNT_dec (self->recvcb);
        self->recvcb = newSVsv (cb);
}
832    
void
add_range (PINGER *self, SV *lo_, SV *hi_, NV interval = 0)
	CODE:
{
        STRLEN lo_len, hi_len;
        char *lo = SvPVbyte (lo_, lo_len);
        char *hi = SvPVbyte (hi_, hi_len);
        RANGE *range;
        NOT_RUNNING;

        /* both ends must be binary addresses of the same family */
        if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
          croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");

        if (lo_len ==  4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
        if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");

        if (memcmp (lo, hi, lo_len) > 0)
          croak ("AnyEvent::FastPing::add_range called with lo > hi");

        range = calloc (1, sizeof (RANGE));

        /* report allocation failure instead of dereferencing a null pointer */
        if (!range)
          croak ("AnyEvent::FastPing::add_range unable to allocate memory for the range");

        range->next     = 0;
        range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
        range->addrlen  = lo_len;

        /* addresses are stored right-aligned inside the 16 byte buffers */
        memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
        memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);

        /* the pinger takes ownership of the range */
        pinger_add_range (self, range);
}
863    
void
add_hosts (PINGER *self, SV *addrs, NV interval = 0, UV interleave = 1)
	CODE:
{
        AV *av;
        int i, j, k;
        int cnt;
        int step;
        int addrlen = 0;
        RANGE *range;
        NOT_RUNNING;

        if (!SvROK (addrs) || SvTYPE (SvRV (addrs)) != SVt_PVAV)
          croak ("AnyEvent::FastPing::add_hosts expects an arrayref with binary IPv4 or IPv6 addresses");

        av  = (AV *)SvRV (addrs);
        cnt = av_len (av) + 1;

        /* first pass: validate all addresses and find the longest address length */
        for (i = 0; i < cnt; ++i)
          {
            STRLEN len;
            SV *sv = *av_fetch (av, i, 1);

            /* SvPVbyte stringifies and downgrades, so the length is valid */
            /* even for elements that are not plain strings */
            (void)SvPVbyte (sv, len);

            if (len != 4 && len != 16)
              croak ("AnyEvent::FastPing::add_hosts addresses must be specified as binary IPv4 or IPv6 addresses");

            if ((int)len > addrlen)
              addrlen = len;
          }

        if (!cnt)
          XSRETURN_EMPTY;

        range = calloc (1, sizeof (RANGE) + (size_t)cnt * addrlen);

        /* report allocation failure instead of dereferencing a null pointer */
        if (!range)
          croak ("AnyEvent::FastPing::add_hosts unable to allocate memory for the host list");

        range->next     = 0;
        range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
        range->addrlen  = addrlen;
        range->addrcnt  = cnt;

        if (interleave == 0)
          interleave = cnt <= 256 * 256 ? 256 : (int)sqrtf (cnt);

        /* strides larger than the list produce the same order as a stride of cnt, */
        /* clamping keeps the int loop counters from spinning or overflowing */
        if (interleave > (UV)cnt)
          interleave = cnt;

        step = interleave;

        /* second pass: store the addresses back to front in interleaved order */
        /* (the list is later consumed from the back) */
        k = cnt;
        for (j = 0; j < step; ++j)
          for (i = j; i < cnt; i += step)
            {
              uint8_t *dst = (uint8_t *)(range + 1) + --k * addrlen;
              char *pv;
              STRLEN pvlen;
              SV *sv = *av_fetch (av, i, 1);

              pv = SvPVbyte (sv, pvlen);

              if (pvlen != addrlen)
                {
                  /* IPv4 address in a mixed list: store as ::ffff:a.b.c.d */
                  dst [ 0] = 0x00; dst [ 1] = 0x00; dst [ 2] = 0x00; dst [ 3] = 0x00;
                  dst [ 4] = 0x00; dst [ 5] = 0x00; dst [ 6] = 0x00; dst [ 7] = 0x00;
                  dst [ 8] = 0x00; dst [ 9] = 0x00; dst [10] = 0xff; dst [11] = 0xff;
                  dst [12] = pv [0]; dst [13] = pv [1]; dst [14] = pv [2]; dst [15] = pv [3];
                }
              else
                memcpy (dst, pv, addrlen);
            }

        /* the pinger takes ownership of the range */
        pinger_add_range (self, range);
}
933