ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FastPing.xs
Revision: 1.15
Committed: Sat Nov 12 01:20:46 2016 UTC (7 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-2_1, HEAD
Changes since 1.14: +34 -20 lines
Log Message:
2.1

File Contents

# User Rev Content
1 root 1.8 #if defined(__linux) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__CYGWIN__)
2 root 1.5 # define ENABLE_IPV6 1 // if you get compilation problems try to disable IPv6
3 root 1.2 #else
4 root 1.5 # define ENABLE_IPV6 0
5 root 1.2 #endif
6 root 1.1
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11     #include <pthread.h>
12    
13     #include <stdio.h>
14     #include <stdlib.h>
15     #include <string.h>
16    
17     #include <time.h>
18     #include <poll.h>
19     #include <unistd.h>
20     #include <inttypes.h>
21     #include <fcntl.h>
22     #include <errno.h>
23 root 1.13 #include <limits.h>
24 root 1.1
25     #include <sys/types.h>
26     #include <sys/time.h>
27     #include <sys/socket.h>
28    
29     #include <netinet/in.h>
30     #include <arpa/inet.h>
31    
32     #ifdef __linux
33     # include <linux/icmp.h>
34     #endif
35 root 1.8 #if ENABLE_IPV6 && !defined (__CYGWIN__)
36 root 1.1 # include <netinet/icmp6.h>
37     #endif
38    
39     #define ICMP4_ECHO 8
40     #define ICMP4_ECHO_REPLY 0
41     #define ICMP6_ECHO 128
42     #define ICMP6_ECHO_REPLY 129
43    
44 root 1.9 #define DRAIN_INTERVAL 1e-6 // how long to wait when sendto returns ENOBUFS, in seconds
45     #define MIN_INTERVAL 1e-6 // minimum packet send interval, in seconds
46 root 1.1
47     #define HDR_SIZE_IP4 20
48     #define HDR_SIZE_IP6 48
49    
50 root 1.11 static int thr_res[2]; // worker thread finished status
51     static int icmp4_fd = -1;
52     static int icmp6_fd = -1;
53 root 1.10
54 root 1.9 /*****************************************************************************/
55    
56 root 1.1 typedef double tstamp;
57    
58 root 1.5 static tstamp
59     NOW (void)
60 root 1.1 {
61     struct timeval tv;
62 root 1.9
63 root 1.1 gettimeofday (&tv, 0);
64 root 1.9
65     return tv.tv_sec + tv.tv_usec * 1e-6;
66 root 1.1 }
67    
68 root 1.9 static void
69     ssleep (tstamp wait)
70     {
71     #if defined (__SVR4) && defined (__sun)
72     struct timeval tv;
73    
74     tv.tv_sec = wait;
75     tv.tv_usec = (wait - tv.tv_sec) * 1e6;
76    
77     select (0, 0, 0, 0, &tv);
78     #elif defined(_WIN32)
79     Sleep ((unsigned long)(delay * 1e3));
80     #else
81     struct timespec ts;
82    
83     ts.tv_sec = wait;
84     ts.tv_nsec = (wait - ts.tv_sec) * 1e9;
85    
86     nanosleep (&ts, 0);
87     #endif
88     }
89    
90     /*****************************************************************************/
91    
92 root 1.8 typedef struct
93     {
94 root 1.1 uint8_t version_ihl;
95     uint8_t tos;
96     uint16_t tot_len;
97    
98     uint16_t id;
99     uint16_t flags;
100    
101     uint8_t ttl;
102     uint8_t protocol;
103     uint16_t cksum;
104    
105     uint32_t src;
106     uint32_t dst;
107     } IP4HDR;
108    
109 root 1.11 /*****************************************************************************/
110 root 1.1
111 root 1.11 typedef uint8_t addr_tt[16];
112 root 1.9
113 root 1.10 typedef struct
114     {
115     tstamp next;
116 root 1.12 tstamp interval;
117 root 1.10 int addrlen;
118 root 1.11
119     addr_tt lo, hi; /* only if !addrcnt */
120    
121     int addrcnt;
122     /* addrcnt addresses follow */
123 root 1.10 } RANGE;
124    
125     typedef struct
126     {
127     RANGE **ranges;
128     int rangecnt, rangemax;
129 root 1.1
130 root 1.12 tstamp next;
131 root 1.10 tstamp interval;
132 root 1.12
133 root 1.10 tstamp maxrtt;
134 root 1.12
135 root 1.10 uint16_t magic1;
136     uint16_t magic2;
137     uint16_t magic3;
138    
139     int id;
140    
141     AV *recvq; /* receive queue */
142     int nextrecv;
143     SV *recvcb;
144    
145     pthread_t thrid;
146     int running;
147     } PINGER;
148    
149     static PINGER **pingers;
150     static int *pingerfree; /* freelist next */
151     static int pingercnt;
152     static int pingermax;
153     static int firstfree = -1;
154     static int firstrecv = -1;
155    
156     /*****************************************************************************/
157 root 1.1
158 root 1.8 typedef struct
159     {
160 root 1.1 uint8_t type, code;
161     uint16_t cksum;
162 root 1.11
163 root 1.1 uint16_t id, seq;
164 root 1.11
165 root 1.10 uint16_t pinger;
166     uint16_t magic;
167 root 1.11
168 root 1.9 uint32_t stamp_hi;
169     uint32_t stamp_lo;
170 root 1.1 } PKT;
171    
172 root 1.9 static int
173 root 1.10 pkt_is_valid_for (PKT *pkt, PINGER *pinger)
174 root 1.9 {
175 root 1.10 return pkt->id == pinger->magic1
176     && pkt->seq == pinger->magic2
177     && pkt->magic == pinger->magic3;
178 root 1.9 }
179    
180     static void
181     ts_to_pkt (PKT *pkt, tstamp ts)
182     {
183     /* move 12 bits of seconds into the 32 bit fractional part */
184     /* leaving 20 bits subsecond resolution and 44 bits of integers */
185     /* (of which 32 are typically usable) */
186     ts *= 1. / 4096.;
187    
188     pkt->stamp_hi = ts;
189     pkt->stamp_lo = (ts - pkt->stamp_hi) * 4294967296.;
190     }
191    
192     static tstamp
193     pkt_to_ts (PKT *pkt)
194     {
195     return pkt->stamp_hi * 4096.
196     + pkt->stamp_lo * (4096. / 4294967296.);
197     }
198    
199 root 1.10 static void
200     pkt_cksum (PKT *pkt)
201     {
202 root 1.14 uint_fast32_t sum = 0;
203 root 1.10 uint32_t *wp = (uint32_t *)pkt;
204     int len = sizeof (*pkt) / 4;
205    
206     do
207     {
208     uint_fast32_t w = *(volatile uint32_t *)wp++;
209     sum += (w & 0xffff) + (w >> 16);
210     }
211 root 1.14 while (--len);
212 root 1.10
213     sum = (sum >> 16) + (sum & 0xffff); /* add high 16 to low 16 */
214     sum += sum >> 16; /* add carry */
215    
216     pkt->cksum = ~sum;
217     }
218    
219 root 1.9 /*****************************************************************************/
220    
221 root 1.10 static void
222     range_free (RANGE *self)
223     {
224     free (self);
225     }
226    
227     /* like sendto, but retries on failure */
228     static void
229     xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen)
230 root 1.1 {
231 root 1.11 tstamp wait = DRAIN_INTERVAL / 2.;
232 root 1.10
233     while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS)
234 root 1.11 ssleep (wait *= 2.);
235 root 1.10 }
236    
237 root 1.11 // ping current address, return true and increment if more to ping
238     static int
239     range_send_ping (RANGE *self, PKT *pkt)
240 root 1.10 {
241 root 1.11 // send ping
242     uint8_t *addr;
243 root 1.13 int addrlen;
244 root 1.11
245     if (self->addrcnt)
246     addr = (self->addrcnt - 1) * self->addrlen + (uint8_t *)(self + 1);
247     else
248     addr = sizeof (addr_tt) - self->addrlen + self->lo;
249    
250 root 1.13 addrlen = self->addrlen;
251    
252     /* convert ipv4 mapped addresses - this only works for host lists */
253     /* this tries to match 0000:0000:0000:0000:0000:ffff:a.b.c.d */
254     /* efficiently but also with few insns */
255     if (addrlen == 16 && !addr [0] && icmp4_fd >= 0
256     && !( addr [ 1]
257     | addr [ 2] | addr [ 3]
258     | addr [ 4] | addr [ 5]
259     | addr [ 6] | addr [ 7]
260     | addr [ 8] | addr [ 9]
261     | (255-addr [10]) | (255-addr [11])))
262     {
263     addr += 12;
264     addrlen -= 12;
265     }
266    
267 root 1.10 pkt->cksum = 0;
268    
269 root 1.13 if (addrlen == 4)
270 root 1.10 {
271     struct sockaddr_in sa;
272    
273 root 1.14 pkt->type = ICMP4_ECHO;
274 root 1.10 pkt_cksum (pkt);
275    
276     sa.sin_family = AF_INET;
277     sa.sin_port = 0;
278    
279 root 1.11 memcpy (&sa.sin_addr, addr, sizeof (sa.sin_addr));
280 root 1.10
281     xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
282     }
283     else
284     {
285     #if ENABLE_IPV6
286     struct sockaddr_in6 sa;
287    
288     pkt->type = ICMP6_ECHO;
289 root 1.1
290 root 1.10 sa.sin6_family = AF_INET6;
291     sa.sin6_port = 0;
292     sa.sin6_flowinfo = 0;
293     sa.sin6_scope_id = 0;
294 root 1.1
295 root 1.11 memcpy (&sa.sin6_addr, addr, sizeof (sa.sin6_addr));
296 root 1.10
297 root 1.11 xsendto (icmp6_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
298 root 1.10 #endif
299     }
300 root 1.11
301     // see if we have any more addresses
302     if (self->addrcnt)
303     {
304     if (!--self->addrcnt)
305     return 0;
306     }
307     else
308     {
309     if (!memcmp (&self->lo, &self->hi, sizeof (addr_tt)))
310     return 0;
311    
312     // increment self->lo
313     {
314     int len = sizeof (addr_tt) - 1;
315    
316     while (!++self->lo [len])
317     --len;
318     }
319     }
320    
321     return 1;
322 root 1.10 }
323    
324 root 1.11 /*****************************************************************************/
325    
326 root 1.10 static void
327     downheap (PINGER *self)
328     {
329     RANGE *elem = self->ranges [0]; /* always exists */
330     int Nm1 = self->rangecnt - 1;
331     int j;
332     int k;
333    
334     for (k = 0; ; )
335     {
336     int j = k * 2 + 1;
337    
338     if (j > Nm1)
339     break;
340    
341     if (j < Nm1
342     && self->ranges [j]->next > self->ranges [j + 1]->next)
343     ++j;
344    
345     if (self->ranges [j]->next >= elem->next)
346     break;
347    
348     self->ranges [k] = self->ranges [j];
349    
350     k = j;
351     }
352 root 1.1
353 root 1.10 self->ranges [k] = elem;
354 root 1.1 }
355    
356     static void
357 root 1.10 upheap (PINGER *self, int k)
358 root 1.1 {
359 root 1.10 RANGE *elem = self->ranges [k];
360 root 1.1
361 root 1.10 while (k)
362 root 1.1 {
363 root 1.10 int j = (k - 1) >> 1;
364    
365     if (self->ranges [j]->next <= elem->next)
366     break;
367    
368     self->ranges [k] = self->ranges [j];
369 root 1.1
370 root 1.10 k = j;
371 root 1.1 }
372 root 1.10
373     self->ranges [k] = elem;
374 root 1.1 }
375    
376     static void *
377 root 1.10 ping_proc (void *self_)
378 root 1.1 {
379 root 1.10 PINGER *self = (PINGER *)self_;
380 root 1.1 PKT pkt;
381    
382     memset (&pkt, 0, sizeof (pkt));
383    
384 root 1.12 tstamp now = NOW ();
385 root 1.10
386 root 1.12 pkt.code = 0;
387     pkt.id = self->magic1;
388     pkt.seq = self->magic2;
389     pkt.magic = self->magic3;
390     pkt.pinger = self->id;
391    
392     if (self->next < now)
393     self->next = now;
394 root 1.1
395 root 1.10 while (self->rangecnt)
396 root 1.1 {
397 root 1.10 RANGE *range = self->ranges [0];
398 root 1.1
399 root 1.10 // ranges [0] is always the next range to ping
400     tstamp wait = range->next - now;
401 root 1.6
402 root 1.10 // compare with the global frequency limit
403     {
404 root 1.12 tstamp diff = self->next - now;
405 root 1.10
406     if (wait < diff)
407 root 1.12 wait = diff; // global rate limit overrides
408     else
409     self->next = range->next; // fast forward
410 root 1.10 }
411    
412     if (wait > 0.)
413     ssleep (wait);
414    
415     now = NOW ();
416    
417     ts_to_pkt (&pkt, now);
418    
419 root 1.11 if (!range_send_ping (range, &pkt))
420 root 1.1 {
421 root 1.10 self->ranges [0] = self->ranges [--self->rangecnt];
422     range_free (range);
423     }
424     else
425 root 1.12 range->next = self->next + range->interval;
426 root 1.10
427 root 1.11 downheap (self);
428 root 1.1
429 root 1.12 self->next += self->interval;
430     now = NOW ();
431 root 1.10 }
432    
433     ssleep (self->maxrtt);
434    
435     {
436     uint16_t id = self->id;
437 root 1.1
438 root 1.10 write (thr_res [1], &id, sizeof (id));
439     }
440    
441     return 0;
442     }
443    
444     /*****************************************************************************/
445    
446 root 1.13 /* NetBSD, Solaris... */
447     #ifndef PTHREAD_STACK_MIN
448     # define PTHREAD_STACK_MIN 0
449     #endif
450    
451 root 1.10 static void
452     pinger_start (PINGER *self)
453     {
454     sigset_t fullsigset, oldsigset;
455     pthread_attr_t attr;
456    
457     if (self->running)
458     return;
459    
460     sigfillset (&fullsigset);
461 root 1.1
462 root 1.10 pthread_attr_init (&attr);
463     pthread_attr_setstacksize (&attr, PTHREAD_STACK_MIN < sizeof (long) * 2048 ? sizeof (long) * 2048 : PTHREAD_STACK_MIN);
464    
465     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
466    
467     if (pthread_create (&self->thrid, &attr, ping_proc, (void *)self))
468     croak ("AnyEvent::FastPing: unable to create pinger thread");
469    
470     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
471    
472     self->running = 1;
473     }
474    
475     static void
476     pinger_stop (PINGER *self)
477     {
478     if (!self->running)
479     return;
480    
481     self->running = 0;
482     pthread_cancel (self->thrid);
483     pthread_join (self->thrid, 0);
484     }
485    
486     static void
487     pinger_init (PINGER *self)
488     {
489     memset (self, 0, sizeof (PINGER));
490 root 1.1
491 root 1.10 if (firstfree >= 0)
492     {
493     self->id = firstfree;
494     firstfree = pingerfree [firstfree];
495     }
496     else if (pingercnt == 0xffff)
497     croak ("unable to create more than 65536 AnyEvent::FastPing objects");
498     else
499     {
500     if (pingercnt == pingermax)
501 root 1.1 {
502 root 1.10 pingermax = pingermax * 2 + 16;
503     pingers = realloc (pingers , sizeof (pingers [0]) * pingermax);
504     pingerfree = realloc (pingerfree, sizeof (pingerfree [0]) * pingermax);
505     }
506    
507     self->id = pingercnt++;
508     }
509    
510     pingers [self->id] = self;
511    
512     self->recvcb = &PL_sv_undef;
513 root 1.12 self->next = 0.;
514 root 1.10 self->interval = MIN_INTERVAL;
515     self->maxrtt = 0.5;
516     self->rangemax = 16;
517     self->ranges = malloc (sizeof (self->ranges [0]) * self->rangemax);
518     }
519    
520     static void
521     pinger_free (PINGER *self)
522     {
523     pinger_stop (self);
524    
525     pingers [self->id] = 0;
526    
527     SvREFCNT_dec (self->recvq);
528     SvREFCNT_dec (self->recvcb);
529    
530     pingerfree [self->id] = firstfree;
531     firstfree = self->id;
532    
533     while (self->rangecnt)
534     range_free (self->ranges [--self->rangecnt]);
535    
536     free (self->ranges);
537     }
538    
539 root 1.11 static void
540     pinger_add_range (PINGER *self, RANGE *range)
541     {
542     if (self->rangecnt == self->rangemax)
543     self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1));
544    
545     self->ranges [self->rangecnt] = range;
546     upheap (self, self->rangecnt);
547     ++self->rangecnt;
548     }
549    
550 root 1.10 /*****************************************************************************/
551    
552     static void
553     recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt)
554     {
555     if (!self->recvq)
556     {
557     /* first seen this round */
558     if (!SvOK (self->recvcb))
559     return;
560    
561     self->recvq = newAV ();
562    
563     self->nextrecv = firstrecv;
564     firstrecv = self->id;
565     }
566 root 1.1
567 root 1.10 {
568     AV *pkt = newAV ();
569    
570     av_extend (pkt, 2-1);
571    
572     AvARRAY (pkt)[0] = newSVpvn (addr, addrlen);
573     AvARRAY (pkt)[1] = newSVnv (rtt);
574     AvFILLp (pkt) = 2-1;
575 root 1.1
576 root 1.10 av_push (self->recvq, newRV_noinc ((SV *)pkt));
577     }
578     }
579 root 1.1
580 root 1.10 static void
581     recv_flush (void)
582     {
583     if (firstrecv < 0)
584     return;
585    
586     ENTER;
587     SAVETMPS;
588 root 1.1
589 root 1.11 do
590 root 1.10 {
591     dSP;
592     PINGER *self = pingers [firstrecv];
593     firstrecv = self->nextrecv;
594 root 1.1
595 root 1.10 self->nextrecv = -1;
596 root 1.1
597 root 1.10 PUSHMARK (SP);
598     XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq)));
599     self->recvq = 0;
600     PUTBACK;
601     call_sv (self->recvcb, G_DISCARD | G_VOID);
602 root 1.1 }
603 root 1.11 while (firstrecv >= 0);
604 root 1.1
605 root 1.10 FREETMPS;
606     LEAVE;
607 root 1.1 }
608    
609 root 1.10 /*****************************************************************************/
610    
611     #if 0
612 root 1.1 static void
613     feed_reply (AV *res_av)
614     {
615     dSP;
616     SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
617     int i;
618    
619 root 1.6 if (av_len (res_av) < 0)
620     return;
621    
622 root 1.1 ENTER;
623     SAVETMPS;
624    
625     for (i = av_len (cbs) + 1; i--; )
626     {
627     SV *cb = *av_fetch (cbs, i, 1);
628    
629     PUSHMARK (SP);
630     XPUSHs (res);
631     PUTBACK;
632     call_sv (cb, G_DISCARD | G_VOID);
633     }
634    
635     FREETMPS;
636     LEAVE;
637     }
638 root 1.10 #endif
639 root 1.1
640     static void
641 root 1.15 boot_protocols (void)
642 root 1.1 {
643     icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
644 root 1.2 fcntl (icmp4_fd, F_SETFL, O_NONBLOCK);
645 root 1.1 #ifdef ICMP_FILTER
646     {
647     struct icmp_filter oval;
648     oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
649     setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
650     }
651     #endif
652    
653 root 1.5 #if ENABLE_IPV6
654 root 1.1 icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
655 root 1.2 fcntl (icmp6_fd, F_SETFL, O_NONBLOCK);
656 root 1.1 # ifdef ICMP6_FILTER
657     {
658     struct icmp6_filter oval;
659     ICMP6_FILTER_SETBLOCKALL (&oval);
660     ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
661     setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
662     }
663     # endif
664     #endif
665 root 1.15 }
666    
667     static void
668     boot (void)
669     {
670     if (pipe (thr_res) < 0)
671     croak ("AnyEvent::FastPing: unable to create receive pipe");
672    
673     sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_res [0]);
674    
675     boot_protocols ();
676 root 1.1
677     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
678     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
679     }
680    
681 root 1.12 #define NOT_RUNNING \
682     if (self->running) \
683 root 1.15 croak ("AnyEvent::FastPing object has been started - you have to stop it first before calling this method, caught");
684 root 1.12
685 root 1.10 MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing PREFIX = pinger_
686 root 1.1
687 root 1.15 PROTOTYPES: DISABLE
688    
689 root 1.1 BOOT:
690     {
691 root 1.15 HV *stash = gv_stashpv ("AnyEvent::FastPing", 1);
692    
693     if (sizeof (PKT) & 3)
694     croak ("size of PKT structure is not a multiple of 4");
695    
696     newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
697     newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
698    
699     boot_protocols ();
700    
701     newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
702     newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));
703    
704     close (icmp4_fd);
705     close (icmp6_fd);
706 root 1.1 }
707    
708 root 1.15 void
709     _boot ()
710     CODE:
711     boot ();
712 root 1.1
713 root 1.10 void
714 root 1.11 _recv_icmp4 (...)
715     CODE:
716     {
717     char buf [512];
718     struct sockaddr_in sa;
719     int maxrecv;
720    
721     for (maxrecv = 256+1; --maxrecv; )
722     {
723     PINGER *pinger;
724     IP4HDR *iphdr = (IP4HDR *)buf;
725     socklen_t sl = sizeof (sa);
726     int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
727     int hdrlen, totlen;
728     PKT *pkt;
729    
730     if (len <= HDR_SIZE_IP4)
731     break;
732    
733     hdrlen = (iphdr->version_ihl & 15) * 4;
734     totlen = ntohs (iphdr->tot_len);
735    
736     if (totlen > len
737     || iphdr->protocol != IPPROTO_ICMP
738     || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
739     continue;
740    
741     pkt = (PKT *)(buf + hdrlen);
742    
743     if (pkt->type != ICMP4_ECHO_REPLY
744     || pkt->pinger >= pingercnt
745     || !pingers [pkt->pinger])
746     continue;
747    
748     pinger = pingers [pkt->pinger];
749    
750     if (!pkt_is_valid_for (pkt, pinger))
751     continue;
752    
753     recv_feed (pinger, &sa.sin_addr, 4, NOW () - pkt_to_ts (pkt));
754     }
755    
756     recv_flush ();
757     }
758    
759     void
760     _recv_icmp6 (...)
761     CODE:
762     {
763     struct sockaddr_in6 sa;
764     PKT pkt;
765     int maxrecv;
766    
767     for (maxrecv = 256+1; --maxrecv; )
768     {
769     PINGER *pinger;
770     socklen_t sl = sizeof (sa);
771     int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
772    
773     if (len != sizeof (PKT))
774     break;
775    
776     if (pkt.type != ICMP6_ECHO_REPLY
777     || pkt.pinger >= pingercnt
778     || !pingers [pkt.pinger])
779     continue;
780    
781     pinger = pingers [pkt.pinger];
782    
783     if (!pkt_is_valid_for (&pkt, pinger))
784     continue;
785    
786     recv_feed (pinger, &sa.sin6_addr, 16, NOW () - pkt_to_ts (&pkt));
787     }
788    
789     recv_flush ();
790     }
791    
792     void
793 root 1.10 _new (SV *klass, UV magic1, UV magic2, UV magic3)
794     PPCODE:
795     {
796     SV *pv = NEWSV (0, sizeof (PINGER));
797     PINGER *self = (PINGER *)SvPVX (pv);
798 root 1.12
799 root 1.10 SvPOK_only (pv);
800 root 1.11 XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPVutf8_nolen (klass), 1))));
801 root 1.10 pinger_init (self);
802     self->magic1 = magic1;
803     self->magic2 = magic2;
804     self->magic3 = magic3;
805     }
806    
807     void
808     _free (PINGER *self)
809     CODE:
810     pinger_free (self);
811    
812     IV
813     id (PINGER *self, ...)
814 root 1.1 CODE:
815 root 1.10 RETVAL = self->id;
816     OUTPUT:
817     RETVAL
818    
819     void pinger_start (PINGER *self)
820    
821     void pinger_stop (PINGER *self)
822 root 1.6
823 root 1.11 void
824     _stop_id (UV id)
825 root 1.10 CODE:
826     if (id < pingercnt && pingers [id])
827     pinger_stop (pingers [id]);
828 root 1.1
829 root 1.11 void
830     interval (PINGER *self, NV interval)
831 root 1.10 CODE:
832 root 1.12 NOT_RUNNING;
833 root 1.10 self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
834 root 1.1
835 root 1.11 void
836     max_rtt (PINGER *self, NV maxrtt)
837     CODE:
838 root 1.12 NOT_RUNNING;
839 root 1.11 self->maxrtt = maxrtt;
840    
841     void
842 root 1.12 on_recv (PINGER *self, SV *cb)
843     CODE:
844     SvREFCNT_dec (self->recvcb);
845     self->recvcb = newSVsv (cb);
846    
847     void
848     add_range (PINGER *self, SV *lo_, SV *hi_, NV interval = 0)
849 root 1.10 CODE:
850     {
851     STRLEN lo_len, hi_len;
852     char *lo = SvPVbyte (lo_, lo_len);
853     char *hi = SvPVbyte (hi_, hi_len);
854     RANGE *range;
855 root 1.12 NOT_RUNNING;
856 root 1.1
857 root 1.10 if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
858     croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");
859 root 1.1
860 root 1.10 if (lo_len == 4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
861     if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");
862 root 1.1
863 root 1.11 if (memcmp (lo, hi, lo_len) > 0)
864     croak ("AnyEvent::FastPing::add_range called with lo > hi");
865    
866 root 1.10 range = calloc (1, sizeof (RANGE));
867 root 1.1
868 root 1.10 range->next = 0;
869     range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
870     range->addrlen = lo_len;
871 root 1.1
872 root 1.10 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
873     memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);
874 root 1.1
875 root 1.11 pinger_add_range (self, range);
876 root 1.1 }
877    
878 root 1.10 void
879 root 1.12 add_hosts (PINGER *self, SV *addrs, NV interval = 0, UV interleave = 1)
880 root 1.1 CODE:
881 root 1.11 {
882     AV *av;
883 root 1.12 int i, j, k;
884 root 1.11 int cnt;
885 root 1.13 int addrlen = 0;
886 root 1.11 RANGE *range;
887 root 1.12 NOT_RUNNING;
888 root 1.1
889 root 1.11 if (!SvROK (addrs) || SvTYPE (SvRV (addrs)) != SVt_PVAV)
890 root 1.12 croak ("AnyEvent::FastPing::add_hosts expects an arrayref with binary IPv4 or IPv6 addresses");
891 root 1.9
892 root 1.11 av = (AV *)SvRV (addrs);
893     cnt = av_len (av) + 1;
894 root 1.1
895 root 1.13 for (i = 0; i < cnt; ++i)
896     {
897     SV *sv = *av_fetch (av, i, 1);
898     sv_utf8_downgrade (sv, 0);
899 root 1.1
900 root 1.13 j = SvCUR (sv);
901 root 1.1
902 root 1.13 if (j != 4 && j != 16)
903     croak ("AnyEvent::FastPing::add_hosts addresses must be specified as binary IPv4 or IPv6 addresses");
904 root 1.1
905 root 1.13 if (j > addrlen)
906     addrlen = j;
907     }
908 root 1.1
909 root 1.13 if (!cnt)
910     XSRETURN_EMPTY;
911 root 1.1
912 root 1.11 range = calloc (1, sizeof (RANGE) + cnt * addrlen);
913 root 1.10
914 root 1.11 range->next = 0;
915     range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
916     range->addrlen = addrlen;
917     range->addrcnt = cnt;
918 root 1.1
919 root 1.12 if (interleave == 0)
920     interleave = cnt <= 256 * 256 ? 256 : (int)sqrtf (cnt);
921    
922     k = cnt;
923     for (j = 0; j < interleave; ++j)
924     for (i = j; i < cnt; i += interleave)
925 root 1.13 {
926     uint8_t *dst = (uint8_t *)(range + 1) + --k * addrlen;
927     char *pv;
928     STRLEN pvlen;
929     SV *sv = *av_fetch (av, i, 1);
930     sv_utf8_downgrade (sv, 0);
931    
932     pv = SvPVbyte (sv, pvlen);
933    
934     if (pvlen != addrlen)
935     {
936     dst [ 0] = 0x00; dst [ 1] = 0x00; dst [ 2] = 0x00; dst [ 3] = 0x00;
937     dst [ 4] = 0x00; dst [ 5] = 0x00; dst [ 6] = 0x00; dst [ 7] = 0x00;
938     dst [ 8] = 0x00; dst [ 9] = 0x00; dst [10] = 0xff; dst [11] = 0xff;
939     dst [12] = pv [0]; dst [13] = pv [1]; dst [14] = pv [2]; dst [15] = pv [3];
940     }
941     else
942     memcpy (dst, pv, addrlen);
943     }
944 root 1.1
945 root 1.11 pinger_add_range (self, range);
946 root 1.1 }
947