ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FastPing.xs
Revision: 1.8
Committed: Sat Jan 29 23:36:49 2011 UTC (13 years, 3 months ago) by root
Branch: MAIN
CVS Tags: rel-1_15, rel-1_14
Changes since 1.7: +27 -22 lines
Log Message:
1.14

File Contents

# User Rev Content
// IPv6 support is compiled in on the platforms listed below;
// if you get compilation problems, try to disable IPv6
#if defined (__linux)         \
    || defined (__FreeBSD__)  \
    || defined (__OpenBSD__)  \
    || defined (__NetBSD__)   \
    || defined (__CYGWIN__)
# define ENABLE_IPV6 1
#else
# define ENABLE_IPV6 0
#endif
6 root 1.1
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11     #include <pthread.h>
12    
13     #include <math.h>
14     #include <stdio.h>
15     #include <stdlib.h>
16     #include <string.h>
17    
18     #include <time.h>
19     #include <poll.h>
20     #include <unistd.h>
21     #include <inttypes.h>
22     #include <fcntl.h>
23     #include <errno.h>
24    
25     #include <sys/types.h>
26     #include <sys/time.h>
27     #include <sys/socket.h>
28    
29     #include <netinet/in.h>
30     #include <arpa/inet.h>
31    
32     #ifdef __linux
33     # include <linux/icmp.h>
34     #endif
35 root 1.8 #if ENABLE_IPV6 && !defined (__CYGWIN__)
36 root 1.1 # include <netinet/icmp6.h>
37     #endif
38    
// ICMP message types: the echo request we send and the reply we expect back
#define ICMP4_ECHO       8
#define ICMP4_ECHO_REPLY 0
#define ICMP6_ECHO       128
#define ICMP6_ECHO_REPLY 129

// all intervals are in seconds
#define DRAIN_INTERVAL .000001 // how long to wait when sendto returns ENOBUFS
#define MIN_INTERVAL   .000001 // minimum packet send interval

// header sizes added to sizeof (PKT) for the icmp4_pktsize/icmp6_pktsize constants
#define HDR_SIZE_IP4 20
#define HDR_SIZE_IP6 48
49    
50 root 1.4 //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pipe buffer size is an exact multiple of the pointer size
51 root 1.1
// an address in network byte order; IPv6 addresses fill all 16 octets,
// IPv4 addresses occupy the last 4 and leave the rest zero
typedef uint8_t addr_tt[16];
53 root 1.1
// a point in time or a duration, in (fractional) seconds
typedef double tstamp;

/*
 * Returns the current time as reported by gettimeofday, converted to
 * fractional seconds.
 */
static tstamp
NOW (void)
{
  struct timeval current;
  tstamp seconds, fraction;

  gettimeofday (&current, 0);

  seconds  = current.tv_sec;
  fraction = current.tv_usec * 0.000001;

  return seconds + fraction;
}
63    
64 root 1.8 typedef struct
65     {
66 root 1.1 int family;
67 root 1.8 addr_tt lo, hi;
68 root 1.1 double interval;
69     tstamp next;
70     } RANGE;
71    
72 root 1.8 typedef struct
73     {
74 root 1.1 SV *id;
75     double interval;
76     int nranges;
77     RANGE *ranges;
78     uint32_t payload;
79     } REQ;
80    
// layout of the 20 byte IPv4 header found in front of received ICMPv4 packets
typedef struct
{
  uint8_t  version_ihl; // low four bits: header length in 32 bit words
  uint8_t  tos;
  uint16_t tot_len;     // total packet length, network byte order

  uint16_t id;
  uint16_t flags;

  uint8_t  ttl;
  uint8_t  protocol;    // compared against IPPROTO_ICMP on receive
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;
97    
// layout of an IPv6 header (not referenced elsewhere in the visible source)
typedef struct
{
  uint8_t  version;
  uint8_t  x1;
  uint8_t  x2;
  uint8_t  x3;

  uint16_t payload_len;
  uint8_t  nxt_hdr;
  uint8_t  hop_limit;

  uint8_t  src[16];
  uint8_t  dst[16];
} IP6HDR;
110    
// constant that is combined with the process id at boot time
#define MAGIC 0xca4c

// per-process tag: sent as the packet id (and, complemented, as seq) and
// checked on every received reply
static uint16_t magic;
114    
115 root 1.8 typedef struct
116     {
117 root 1.1 uint8_t type, code;
118     uint16_t cksum;
119     uint16_t id, seq;
120     uint32_t payload;
121     tstamp stamp; // be careful when accessing this
122     } PKT;
123    
124     static pthread_t pthrid;
125     static int thr_send[2]; // send to worker
126     static int thr_recv[2]; // receive from worker
127    
128     static int icmp4_fd, icmp6_fd;
129    
130     static AV *cbs;
131    
/*
 * Computes the 16 bit one's complement checksum over a buffer.
 *
 * data - start of the buffer; it is summed as native 16 bit words but does
 *        not need to be aligned for them
 * len  - size of the buffer in bytes, must be even
 *
 * Returns the complement of the folded sum. Summing a buffer that already
 * contains its own checksum therefore yields 0.
 */
static uint16_t
icmp_cksum (void *data, unsigned int len)
{
  const unsigned char *bytes = data;
  uint32_t sum = 0;

  assert (~len & 1);

  for (; len >= 2; bytes += 2, len -= 2)
    {
      uint16_t word;

      // memcpy instead of a pointer cast: no alignment or aliasing assumptions
      memcpy (&word, bytes, sizeof (word));

      sum += word;
      // add the carry back in right away, so the accumulator cannot overflow
      // no matter how long the buffer is
      sum = (sum >> 16) + (sum & 0xffff);
    }

  return (uint16_t)~sum;
}
148    
149     static void
150 root 1.8 inc_addr (addr_tt *addr)
151 root 1.1 {
152 root 1.8 int len = sizeof (addr_tt) - 1;
153 root 1.1
154     do
155     {
156     if ((*addr)[len] != 0xff)
157     {
158     ++(*addr)[len];
159     break;
160     }
161    
162     (*addr)[len] = 0;
163     }
164     while (len--);
165     }
166    
167     static void *
168     ping_proc (void *unused)
169     {
170     PKT pkt;
171     struct sockaddr_in sa4;
172 root 1.5 #if ENABLE_IPV6
173 root 1.1 struct sockaddr_in6 sa6;
174     #endif
175    
176     memset (&pkt, 0, sizeof (pkt));
177    
178     memset (&sa4, 0, sizeof (sa4));
179     sa4.sin_family = AF_INET;
180     sa4.sin_port = 0;
181 root 1.5 #if ENABLE_IPV6
182 root 1.1 memset (&sa6, 0, sizeof (sa6));
183     sa6.sin6_family = AF_INET6;
184     sa6.sin6_port = 0;
185     #endif
186    
187     for (;;)
188     {
189     REQ *req;
190     int len = read (thr_send [0], &req, sizeof (req));
191    
192 root 1.6 tstamp now = NOW ();
193     tstamp next = now;
194    
195 root 1.1 if (!len)
196     pthread_exit (0);
197     else if (len != sizeof (req))
198     {
199 root 1.7 perror ("AnyEvent::FastPing: short read or read error");
200 root 1.1 pthread_exit ((void *)-1);
201     }
202    
203     //TODO: bind to source address
204    
205     pkt.code = 0;
206     pkt.id = (uint16_t)magic;
207     pkt.seq = (uint16_t)~magic;
208     pkt.payload = req->payload;
209    
210     {
211     int r;
212     for (r = req->nranges; r--; )
213     inc_addr (&req->ranges [r].hi);
214     }
215    
216     while (req->nranges)
217     {
218     RANGE *range = req->ranges;
219 root 1.6 int n, k;
220 root 1.1
221 root 1.8 if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt)))
222 root 1.1 req->ranges [0] = req->ranges [--req->nranges];
223     else
224     {
225     // ranges [0] is always the next range to ping
226     tstamp wait = range->next - now;
227    
228     // compare with the global frequency limit
229     {
230     tstamp diff = next - now;
231    
232     if (wait < diff)
233     wait = diff;
234     else if (range)
235     next = range->next;
236     }
237    
238     if (wait > 0.)
239     {
240     struct timespec ts;
241    
242     ts.tv_sec = wait;
243     ts.tv_nsec = (wait - ts.tv_sec) * 1000000000.;
244    
245     nanosleep (&ts, 0);
246     }
247    
248     now = NOW ();
249    
250     pkt.stamp = now;
251     pkt.cksum = 0;
252    
253     if (range->family == AF_INET)
254     {
255     pkt.type = ICMP4_ECHO;
256     pkt.cksum = icmp_cksum (&pkt, sizeof (pkt));
257    
258     memcpy (&sa4.sin_addr,
259 root 1.8 sizeof (addr_tt) - sizeof (sa4.sin_addr) + (char *)&range->lo,
260 root 1.1 sizeof (sa4.sin_addr));
261    
262     if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0)
263     errno = 0;
264     }
265     else
266     {
267 root 1.5 #if ENABLE_IPV6
268 root 1.1 pkt.type = ICMP6_ECHO;
269    
270     memcpy (&sa6.sin6_addr,
271 root 1.8 sizeof (addr_tt) - sizeof (sa6.sin6_addr) + (char *)&range->lo,
272 root 1.1 sizeof (sa6.sin6_addr));
273    
274     if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0)
275     errno = 0;
276     #endif
277     }
278    
279     if (errno == ENOBUFS)
280     {
281     struct timespec ts;
282    
283     ts.tv_sec = 0;
284     ts.tv_nsec = DRAIN_INTERVAL * 1000000000;
285    
286     nanosleep (&ts, 0);
287     }
288     else
289     {
290     inc_addr (&range->lo);
291    
292     range->next = next;
293     range->next += range->interval;
294     }
295    
296     next += req->interval;
297     }
298    
299     // make a downheap operation
300 root 1.6 for (n = k = 0; ; )
301 root 1.1 {
302 root 1.6 int j = k * 2 + 1;
303    
304 root 1.1 ++n;
305    
306     if (j >= req->nranges)
307     break;
308     else if (j < req->nranges - 1)
309     if (req->ranges [j].next > req->ranges [j + 1].next)
310     ++j;
311    
312     if (req->ranges [j].next >= req->ranges [k].next)
313     break;
314    
315 root 1.6 {
316     RANGE temp = req->ranges [k];
317     req->ranges [k] = req->ranges [j];
318     req->ranges [j] = temp;
319     }
320 root 1.1
321     k = j;
322     }
323     }
324    
325     write (thr_recv [1], &req, sizeof (req));
326     }
327    
328     return 0;
329     }
330    
331     static void
332     feed_reply (AV *res_av)
333     {
334     dSP;
335     SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
336     int i;
337    
338 root 1.6 if (av_len (res_av) < 0)
339     return;
340    
341 root 1.1 ENTER;
342     SAVETMPS;
343    
344     for (i = av_len (cbs) + 1; i--; )
345     {
346     SV *cb = *av_fetch (cbs, i, 1);
347    
348     PUSHMARK (SP);
349     XPUSHs (res);
350     PUTBACK;
351     call_sv (cb, G_DISCARD | G_VOID);
352     }
353    
354     FREETMPS;
355     LEAVE;
356     }
357    
358     static void
359     boot ()
360     {
361     sigset_t fullsigset, oldsigset;
362     pthread_attr_t attr;
363    
364     if (pipe (thr_send) < 0)
365     croak ("AnyEvent::FastPing: unable to create send pipe");
366    
367     if (pipe (thr_recv) < 0)
368     croak ("AnyEvent::FastPing: unable to create receive pipe");
369    
370     icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
371 root 1.2 fcntl (icmp4_fd, F_SETFL, O_NONBLOCK);
372 root 1.1 #ifdef ICMP_FILTER
373     {
374     struct icmp_filter oval;
375     oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
376     setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
377     }
378     #endif
379    
380 root 1.5 #if ENABLE_IPV6
381 root 1.1 icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
382 root 1.2 fcntl (icmp6_fd, F_SETFL, O_NONBLOCK);
383 root 1.1 # ifdef ICMP6_FILTER
384     {
385     struct icmp6_filter oval;
386     ICMP6_FILTER_SETBLOCKALL (&oval);
387     ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
388     setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
389     }
390     # endif
391     #endif
392    
393     pthread_attr_init (&attr);
394     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
395     #ifdef PTHREAD_SCOPE_PROCESS
396     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
397     #endif
398    
399     sigfillset (&fullsigset);
400    
401     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
402    
403     if (pthread_create (&pthrid, &attr, ping_proc, 0))
404     croak ("AnyEvent::FastPing: unable to create pinger thread");
405    
406     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
407    
408     sv_setiv (get_sv ("AnyEvent::FastPing::THR_REQ_FD", 1), thr_send [1]);
409     sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_recv [0]);
410    
411     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
412     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
413     }
414    
415     MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing
416    
417     BOOT:
418     {
419     HV *stash = gv_stashpv ("AnyEvent::FastPing", 1);
420    
421     cbs = get_av ("AnyEvent::FastPing::CB", 1);
422     magic = getpid () ^ MAGIC;
423    
424     boot ();
425    
426     newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
427     newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));
428    
429     newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
430     newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
431     }
432    
433     PROTOTYPES: DISABLE
434    
435     SV *
436     _req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id)
437     CODE:
438     {
439 root 1.6 AV *rav;
440     int nranges, i;
441     REQ *req;
442    
443 root 1.1 if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV)
444     croak ("address ranges must be given as arrayref with lo, hi pairs");
445    
446 root 1.6 rav = (AV *)SvRV (ranges);
447     nranges = av_len (rav) + 1;
448 root 1.1
449 root 1.6 req = malloc (sizeof (REQ));
450 root 1.1
451     if (interval < MIN_INTERVAL)
452     interval = MIN_INTERVAL;
453    
454     req->id = newSVsv (id);
455     req->interval = interval;
456     req->payload = payload;
457     req->nranges = nranges;
458     req->ranges = (RANGE *)malloc (nranges * sizeof (RANGE));
459    
460     while (nranges--)
461     {
462     SV *sv = *av_fetch (rav, nranges, 1);
463 root 1.6 SV *lo, *hi;
464     AV *av;
465     RANGE *r;
466 root 1.1
467     if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV)
468     croak ("address range must be given as arrayref with lo, hi, interval arrayrefs");
469    
470 root 1.6 av = (AV *)SvRV (sv);
471     r = req->ranges + nranges;
472 root 1.1
473 root 1.6 lo = *av_fetch (av, 0, 1);
474     hi = *av_fetch (av, 1, 1);
475 root 1.1
476     sv_utf8_downgrade (lo, 0);
477     sv_utf8_downgrade (hi, 0);
478    
479 root 1.8 memset (&r->lo, 0, sizeof (addr_tt));
480     memset (&r->hi, 0, sizeof (addr_tt));
481 root 1.1
482     if (SvPOKp (lo) && SvPOKp (hi))
483     {
484     if (SvCUR (lo) != SvCUR (hi))
485 root 1.5 croak ("all addresses in range must be of the same size (either 4 or 16 bytes)");
486 root 1.1
487     if (SvCUR (lo) == 4)
488     {
489     r->family = AF_INET;
490 root 1.8 memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, SvPVX (lo), 4);
491     memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, SvPVX (hi), 4);
492 root 1.1 }
493     else if (SvCUR (lo) == 16)
494     {
495 root 1.5 #if ENABLE_IPV6
496 root 1.1 r->family = AF_INET6;
497 root 1.8 memcpy (&r->lo, SvPVX (lo), sizeof (addr_tt));
498     memcpy (&r->hi, SvPVX (hi), sizeof (addr_tt));
499 root 1.1 #else
500     croak ("IPv6 not supported in this configuration");
501     #endif
502     }
503     else
504 root 1.5 croak ("addresses in range must be either 4 (IPv4) or 16 (IPv6) bytes in length");
505 root 1.1 }
506     else if (SvIOK (lo) && SvIOK (hi))
507     {
508 root 1.6 uint32_t addr;
509    
510 root 1.1 r->family = AF_INET;
511    
512 root 1.8 addr = htonl (SvUV (lo)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, &addr, 4);
513     addr = htonl (SvUV (hi)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, &addr, 4);
514 root 1.1 }
515     else
516     croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets");
517    
518     if (r->family == AF_INET)
519     {
520     if (icmp4_fd < 0)
521     croak ("AnyEvent::FastPing: IPv4 ping support not available on this system");
522     }
523     else
524     {
525     if (icmp6_fd < 0)
526     croak ("AnyEvent::FastPing: IPv6 ping support not available on this system");
527     }
528    
529     r->interval = SvNV (*av_fetch (av, 2, 1));
530    
531     if (r->interval < req->interval)
532     r->interval = req->interval;
533    
534     r->next = 0.;
535     }
536    
537     RETVAL = newSVpvn ((char *)&req, sizeof (req));
538     }
539     OUTPUT:
540     RETVAL
541    
542     SV *
543     _read_res ()
544     CODE:
545     {
546     REQ *req;
547    
548     if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req))
549     RETVAL = &PL_sv_undef;
550    
551     RETVAL = req->id;
552     free (req->ranges);
553     free (req);
554     }
555     OUTPUT:
556     RETVAL
557    
558     void
559     _recv_icmp4 (...)
560     CODE:
561     {
562     char buf [512];
563     struct sockaddr_in sa;
564     socklen_t sl = sizeof (sa);
565     AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
566     tstamp now = NOW ();
567    
568     for (;;)
569     {
570 root 1.6 IP4HDR *iphdr = (IP4HDR *)buf;
571 root 1.2 int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
572 root 1.6 int hdrlen, totlen;
573     PKT *pkt;
574 root 1.1
575     if (len <= HDR_SIZE_IP4)
576     break;
577    
578 root 1.6 hdrlen = (iphdr->version_ihl & 15) * 4;
579     totlen = ntohs (iphdr->tot_len);
580 root 1.1
581     // packet corrupt?
582     if (!res_av
583     || totlen > len
584     || iphdr->protocol != IPPROTO_ICMP
585     || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
586     continue;
587    
588 root 1.6 pkt = (PKT *)(buf + hdrlen);
589 root 1.1
590     if (pkt->type != ICMP4_ECHO_REPLY
591     || pkt->id != (uint16_t) magic
592     || pkt->seq != (uint16_t)~magic
593     || !isnormal (pkt->stamp))
594     continue;
595    
596 root 1.6 {
597     AV *av = newAV ();
598     av_push (av, newSVpvn ((char *)&sa.sin_addr, 4));
599     av_push (av, newSVnv (now - pkt->stamp));
600     av_push (av, newSVuv (pkt->payload));
601 root 1.1
602 root 1.6 av_push (res_av, newRV_noinc ((SV *)av));
603     }
604 root 1.1 }
605    
606     if (res_av)
607     feed_reply (res_av);
608     }
609    
610     void
611     _recv_icmp6 (...)
612     CODE:
613     {
614     struct sockaddr_in6 sa;
615     socklen_t sl = sizeof (sa);
616     AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
617     PKT pkt;
618     tstamp now = NOW ();
619    
620     for (;;)
621     {
622 root 1.2 int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
623 root 1.1
624     if (len != sizeof (PKT))
625     break;
626    
627     if (!res_av
628     || pkt.type != ICMP6_ECHO_REPLY
629     || pkt.id != (uint16_t) magic
630     || pkt.seq != (uint16_t)~magic
631     || !isnormal (pkt.stamp))
632     continue;
633    
634 root 1.6 {
635     AV *av = newAV ();
636     av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16));
637     av_push (av, newSVnv (now - pkt.stamp));
638     av_push (av, newSVuv (pkt.payload));
639 root 1.1
640 root 1.6 av_push (res_av, newRV_noinc ((SV *)av));
641     }
642 root 1.1 }
643    
644     if (res_av)
645     feed_reply (res_av);
646     }
647