ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FastPing.xs
Revision: 1.9
Committed: Sun Jan 30 02:03:35 2011 UTC (13 years, 3 months ago) by root
Branch: MAIN
Changes since 1.8: +85 -44 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
/*
 * ENABLE_IPV6 selects whether the IPv6 (ICMPv6) code paths are compiled in.
 * It defaults to 1 on the platforms listed below and to 0 everywhere else.
 * If you get compilation problems, try to disable IPv6: the default can be
 * overridden from the compiler command line (e.g. -DENABLE_IPV6=0) without
 * editing this file.
 */
#ifndef ENABLE_IPV6
# if defined(__linux) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) || defined(__CYGWIN__)
#  define ENABLE_IPV6 1
# else
#  define ENABLE_IPV6 0
# endif
#endif
6 root 1.1
7     #include "EXTERN.h"
8     #include "perl.h"
9     #include "XSUB.h"
10    
11     #include <pthread.h>
12    
13     #include <stdio.h>
14     #include <stdlib.h>
15     #include <string.h>
16    
17     #include <time.h>
18     #include <poll.h>
19     #include <unistd.h>
20     #include <inttypes.h>
21     #include <fcntl.h>
22     #include <errno.h>
23    
24     #include <sys/types.h>
25     #include <sys/time.h>
26     #include <sys/socket.h>
27    
28     #include <netinet/in.h>
29     #include <arpa/inet.h>
30    
31     #ifdef __linux
32     # include <linux/icmp.h>
33     #endif
34 root 1.8 #if ENABLE_IPV6 && !defined (__CYGWIN__)
35 root 1.1 # include <netinet/icmp6.h>
36     #endif
37    
/* ICMP message type codes used for echo requests and their replies */
#define ICMP4_ECHO         8
#define ICMP4_ECHO_REPLY   0
#define ICMP6_ECHO       128
#define ICMP6_ECHO_REPLY 129

/* timing limits, both in seconds */
#define DRAIN_INTERVAL 1e-6 // how long to wait when sendto returns ENOBUFS
#define MIN_INTERVAL   1e-6 // minimum packet send interval

/* per-family IP header sizes, added to sizeof (PKT) for the *_pktsize constants */
/* NOTE(review): the fixed IPv6 header is 40 bytes; confirm that 48 is intentional */
#define HDR_SIZE_IP4 20
#define HDR_SIZE_IP6 48
48    
49 root 1.4 //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pipe buffer size is an exact multiple of the pointer size
50 root 1.1
/*
 * Family-independent address storage: 16 octets, large enough for an IPv6
 * address. IPv4 addresses are kept in the last four octets, the rest zero.
 */
typedef uint8_t addr_tt[16];
52 root 1.1
53 root 1.9 /*****************************************************************************/
54    
/* timestamps and durations are expressed in (fractional) seconds */
typedef double tstamp;

/* Return the current time as reported by gettimeofday, in seconds. */
static tstamp
NOW (void)
{
  struct timeval current;
  tstamp seconds, micros;

  gettimeofday (&current, 0);

  seconds = current.tv_sec;
  micros  = current.tv_usec;

  return seconds + micros * 1e-6;
}
66    
67 root 1.9 static void
68     ssleep (tstamp wait)
69     {
70     #if defined (__SVR4) && defined (__sun)
71     struct timeval tv;
72    
73     tv.tv_sec = wait;
74     tv.tv_usec = (wait - tv.tv_sec) * 1e6;
75    
76     select (0, 0, 0, 0, &tv);
77     #elif defined(_WIN32)
78     Sleep ((unsigned long)(delay * 1e3));
79     #else
80     struct timespec ts;
81    
82     ts.tv_sec = wait;
83     ts.tv_nsec = (wait - ts.tv_sec) * 1e9;
84    
85     nanosleep (&ts, 0);
86     #endif
87     }
88    
89     /*****************************************************************************/
90    
91 root 1.8 typedef struct
92     {
93 root 1.1 int family;
94 root 1.8 addr_tt lo, hi;
95 root 1.1 double interval;
96     tstamp next;
97     } RANGE;
98    
99 root 1.8 typedef struct
100     {
101 root 1.1 SV *id;
102     double interval;
103     int nranges;
104     RANGE *ranges;
105     uint32_t payload;
106     } REQ;
107    
/* Layout of a 20 octet IPv4 header without options. */
typedef struct
{
  uint8_t  version_ihl; /* the low four bits hold the header length in 32 bit words */
  uint8_t  tos;
  uint16_t tot_len;     /* total packet length, network byte order */

  uint16_t id;
  uint16_t flags;

  uint8_t  ttl;
  uint8_t  protocol;    /* compared against IPPROTO_ICMP on receive */
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;
124    
/* Layout of a 40 octet IPv6 header. */
typedef struct
{
  uint8_t  version;
  uint8_t  x1; /* x1..x3: presumably the rest of traffic class/flow label - TODO confirm */
  uint8_t  x2;
  uint8_t  x3;

  uint16_t payload_len;
  uint8_t  nxt_hdr;
  uint8_t  hop_limit;

  uint8_t  src[16];
  uint8_t  dst[16];
} IP6HDR;
137    
138 root 1.9 /*****************************************************************************/
139    
/* constant that is xor'ed with the process id at boot time to form "magic" */
#define MAGIC 0xca4c

/* per-process tag stored in the id/seq fields of every packet we send */
static uint16_t magic = 0;
143    
/*
 * The ICMP echo packet as sent on the wire (without IP header).
 * Its size must be a multiple of four, which is verified at boot time.
 */
typedef struct
{
  uint8_t  type;     /* ICMP4_ECHO/ICMP6_ECHO or the matching reply type */
  uint8_t  code;
  uint16_t cksum;
  uint16_t id;       /* set to magic */
  uint16_t seq;      /* set to ~magic */
  uint32_t payload;  /* caller-supplied value, reported back with the reply */
  uint32_t stamp_hi; /* send timestamp, see ts_to_pkt */
  uint32_t stamp_lo;
} PKT;
153    
154 root 1.9 static int
155     pkt_is_valid (PKT *pkt)
156     {
157     return pkt->id == (uint16_t) magic
158     && pkt->seq == (uint16_t)~magic;
159     }
160    
161     static void
162     ts_to_pkt (PKT *pkt, tstamp ts)
163     {
164     /* move 12 bits of seconds into the 32 bit fractional part */
165     /* leaving 20 bits subsecond resolution and 44 bits of integers */
166     /* (of which 32 are typically usable) */
167     ts *= 1. / 4096.;
168    
169     pkt->stamp_hi = ts;
170     pkt->stamp_lo = (ts - pkt->stamp_hi) * 4294967296.;
171     }
172    
173     static tstamp
174     pkt_to_ts (PKT *pkt)
175     {
176     return pkt->stamp_hi * 4096.
177     + pkt->stamp_lo * (4096. / 4294967296.);
178     }
179    
180     /*****************************************************************************/
181    
182 root 1.1 static pthread_t pthrid;
183     static int thr_send[2]; // send to worker
184     static int thr_recv[2]; // receive from worker
185    
186     static int icmp4_fd, icmp6_fd;
187    
188     static AV *cbs;
189    
/*
 * Compute the 16 bit one's complement checksum over LEN octets at DATA.
 *
 * The data is summed as native-endian 16 bit words; a trailing odd octet is
 * treated as if it was followed by a zero octet. Any length is accepted
 * (the old code ran past the buffer unless LEN was a multiple of four), and
 * the words are fetched with memcpy, so DATA needs no particular alignment.
 *
 * Returns the complement of the folded sum, ready to be stored in the
 * checksum field of the packet.
 */
static uint16_t
icmp_cksum (void *data, unsigned int len)
{
  const unsigned char *p = (const unsigned char *)data;
  uint64_t sum = 0; /* wide enough that it cannot overflow for any LEN */

  for (; len >= 2; p += 2, len -= 2)
    {
      uint16_t word;

      memcpy (&word, p, sizeof (word));
      sum += word;
    }

  if (len)
    {
      uint16_t word = 0;

      memcpy (&word, p, 1); /* pad the odd octet with a zero octet */
      sum += word;
    }

  /* fold the carries back into the low 16 bits */
  while (sum >> 16)
    sum = (sum & 0xffff) + (sum >> 16);

  return (uint16_t)~sum;
}
204    
205     static void
206 root 1.8 inc_addr (addr_tt *addr)
207 root 1.1 {
208 root 1.8 int len = sizeof (addr_tt) - 1;
209 root 1.1
210     do
211     {
212     if ((*addr)[len] != 0xff)
213     {
214     ++(*addr)[len];
215     break;
216     }
217    
218     (*addr)[len] = 0;
219     }
220     while (len--);
221     }
222    
223     static void *
224     ping_proc (void *unused)
225     {
226     PKT pkt;
227     struct sockaddr_in sa4;
228 root 1.5 #if ENABLE_IPV6
229 root 1.1 struct sockaddr_in6 sa6;
230     #endif
231    
232     memset (&pkt, 0, sizeof (pkt));
233    
234     memset (&sa4, 0, sizeof (sa4));
235     sa4.sin_family = AF_INET;
236     sa4.sin_port = 0;
237 root 1.5 #if ENABLE_IPV6
238 root 1.1 memset (&sa6, 0, sizeof (sa6));
239     sa6.sin6_family = AF_INET6;
240     sa6.sin6_port = 0;
241     #endif
242    
243     for (;;)
244     {
245     REQ *req;
246     int len = read (thr_send [0], &req, sizeof (req));
247    
248 root 1.6 tstamp now = NOW ();
249     tstamp next = now;
250    
251 root 1.1 if (!len)
252     pthread_exit (0);
253     else if (len != sizeof (req))
254     {
255 root 1.7 perror ("AnyEvent::FastPing: short read or read error");
256 root 1.1 pthread_exit ((void *)-1);
257     }
258    
259     //TODO: bind to source address
260    
261     pkt.code = 0;
262     pkt.id = (uint16_t)magic;
263     pkt.seq = (uint16_t)~magic;
264     pkt.payload = req->payload;
265    
266     {
267     int r;
268     for (r = req->nranges; r--; )
269     inc_addr (&req->ranges [r].hi);
270     }
271    
272     while (req->nranges)
273     {
274     RANGE *range = req->ranges;
275 root 1.6 int n, k;
276 root 1.1
277 root 1.8 if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt)))
278 root 1.1 req->ranges [0] = req->ranges [--req->nranges];
279     else
280     {
281     // ranges [0] is always the next range to ping
282     tstamp wait = range->next - now;
283    
284     // compare with the global frequency limit
285     {
286     tstamp diff = next - now;
287    
288     if (wait < diff)
289     wait = diff;
290     else if (range)
291     next = range->next;
292     }
293    
294     if (wait > 0.)
295 root 1.9 ssleep (wait);
296 root 1.1
297     now = NOW ();
298    
299 root 1.9 ts_to_pkt (&pkt, now);
300 root 1.1 pkt.cksum = 0;
301    
302     if (range->family == AF_INET)
303     {
304     pkt.type = ICMP4_ECHO;
305     pkt.cksum = icmp_cksum (&pkt, sizeof (pkt));
306    
307     memcpy (&sa4.sin_addr,
308 root 1.8 sizeof (addr_tt) - sizeof (sa4.sin_addr) + (char *)&range->lo,
309 root 1.1 sizeof (sa4.sin_addr));
310    
311     if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0)
312     errno = 0;
313     }
314     else
315     {
316 root 1.5 #if ENABLE_IPV6
317 root 1.1 pkt.type = ICMP6_ECHO;
318    
319     memcpy (&sa6.sin6_addr,
320 root 1.8 sizeof (addr_tt) - sizeof (sa6.sin6_addr) + (char *)&range->lo,
321 root 1.1 sizeof (sa6.sin6_addr));
322    
323     if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0)
324     errno = 0;
325     #endif
326     }
327    
328     if (errno == ENOBUFS)
329 root 1.9 ssleep (DRAIN_INTERVAL);
330 root 1.1 else
331     {
332     inc_addr (&range->lo);
333    
334     range->next = next;
335     range->next += range->interval;
336     }
337    
338     next += req->interval;
339     }
340    
341     // make a downheap operation
342 root 1.6 for (n = k = 0; ; )
343 root 1.1 {
344 root 1.6 int j = k * 2 + 1;
345    
346 root 1.1 ++n;
347    
348     if (j >= req->nranges)
349     break;
350     else if (j < req->nranges - 1)
351     if (req->ranges [j].next > req->ranges [j + 1].next)
352     ++j;
353    
354     if (req->ranges [j].next >= req->ranges [k].next)
355     break;
356    
357 root 1.6 {
358     RANGE temp = req->ranges [k];
359     req->ranges [k] = req->ranges [j];
360     req->ranges [j] = temp;
361     }
362 root 1.1
363     k = j;
364     }
365     }
366    
367     write (thr_recv [1], &req, sizeof (req));
368     }
369    
370     return 0;
371     }
372    
373     static void
374     feed_reply (AV *res_av)
375     {
376     dSP;
377     SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
378     int i;
379    
380 root 1.6 if (av_len (res_av) < 0)
381     return;
382    
383 root 1.1 ENTER;
384     SAVETMPS;
385    
386     for (i = av_len (cbs) + 1; i--; )
387     {
388     SV *cb = *av_fetch (cbs, i, 1);
389    
390     PUSHMARK (SP);
391     XPUSHs (res);
392     PUTBACK;
393     call_sv (cb, G_DISCARD | G_VOID);
394     }
395    
396     FREETMPS;
397     LEAVE;
398     }
399    
400     static void
401     boot ()
402     {
403     sigset_t fullsigset, oldsigset;
404     pthread_attr_t attr;
405    
406     if (pipe (thr_send) < 0)
407     croak ("AnyEvent::FastPing: unable to create send pipe");
408    
409     if (pipe (thr_recv) < 0)
410     croak ("AnyEvent::FastPing: unable to create receive pipe");
411    
412     icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
413 root 1.2 fcntl (icmp4_fd, F_SETFL, O_NONBLOCK);
414 root 1.1 #ifdef ICMP_FILTER
415     {
416     struct icmp_filter oval;
417     oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
418     setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
419     }
420     #endif
421    
422 root 1.5 #if ENABLE_IPV6
423 root 1.1 icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
424 root 1.2 fcntl (icmp6_fd, F_SETFL, O_NONBLOCK);
425 root 1.1 # ifdef ICMP6_FILTER
426     {
427     struct icmp6_filter oval;
428     ICMP6_FILTER_SETBLOCKALL (&oval);
429     ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
430     setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
431     }
432     # endif
433     #endif
434    
435     pthread_attr_init (&attr);
436     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
437     #ifdef PTHREAD_SCOPE_PROCESS
438     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
439     #endif
440    
441     sigfillset (&fullsigset);
442    
443     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
444    
445     if (pthread_create (&pthrid, &attr, ping_proc, 0))
446     croak ("AnyEvent::FastPing: unable to create pinger thread");
447    
448     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
449    
450     sv_setiv (get_sv ("AnyEvent::FastPing::THR_REQ_FD", 1), thr_send [1]);
451     sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_recv [0]);
452    
453     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
454     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
455     }
456    
457     MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing
458    
BOOT:
{
	/* module bootstrap: set up the globals, start the pinger thread */
	/* and export a few compile time/run time constants to perl */
	HV *stash = gv_stashpv ("AnyEvent::FastPing", 1);

	cbs   = get_av ("AnyEvent::FastPing::CB", 1);
	/* per-process tag used to recognise our own echo replies */
	magic = getpid () ^ MAGIC;

	/* the size check must pass before any packet is built */
	if ((sizeof (PKT) % 4) != 0)
	  croak ("size of PKT structure is not a multiple of 4");

	boot ();

	/* a protocol is supported when its raw socket could be opened */
	newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
	newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));

	/* on-the-wire packet sizes including the IP header */
	newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
	newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
}
477    
478     PROTOTYPES: DISABLE
479    
SV *
_req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id)
	CODE:
{
	/*
	 * Build a ping request from RANGES, an arrayref of [lo, hi, interval]
	 * arrayrefs, and return a string containing the raw REQ pointer.
	 * lo/hi are either both 4 or both 16 octet strings, or both integers
	 * (IPv4 in host byte order). INTERVAL is raised to MIN_INTERVAL and
	 * every range interval is raised to INTERVAL.
	 *
	 * The request is allocated with malloc and released by _read_res.
	 * All input errors are collected in "err" and reported after the
	 * request has been freed again, so a rejected request does not leak.
	 * NOTE(review): exceptions raised inside perl API calls (such as
	 * sv_utf8_downgrade or SvNV) still bypass that cleanup.
	 */
	AV *rav;
	int nranges;
	REQ *req;
	const char *err = 0;

	if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV)
	  croak ("address ranges must be given as arrayref with lo, hi pairs");

	rav     = (AV *)SvRV (ranges);
	nranges = av_len (rav) + 1;

	if (interval < MIN_INTERVAL)
	  interval = MIN_INTERVAL;

	req = malloc (sizeof (*req));

	if (!req)
	  croak ("AnyEvent::FastPing: out of memory");

	/* never request zero bytes, so a null result always means failure */
	req->ranges = malloc ((size_t)(nranges ? nranges : 1) * sizeof (RANGE));

	if (!req->ranges)
	  {
	    free (req);
	    croak ("AnyEvent::FastPing: out of memory");
	  }

	req->id       = 0; /* filled in once the request is known to be valid */
	req->interval = interval;
	req->payload  = payload;
	req->nranges  = nranges;

	while (nranges--)
	  {
	    SV *sv = *av_fetch (rav, nranges, 1);
	    SV *lo, *hi;
	    AV *av;
	    RANGE *r;

	    if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV)
	      {
	        err = "address range must be given as arrayref with lo, hi, interval arrayrefs";
	        break;
	      }

	    av = (AV *)SvRV (sv);
	    r  = req->ranges + nranges;

	    lo = *av_fetch (av, 0, 1);
	    hi = *av_fetch (av, 1, 1);

	    sv_utf8_downgrade (lo, 0);
	    sv_utf8_downgrade (hi, 0);

	    memset (&r->lo, 0, sizeof (addr_tt));
	    memset (&r->hi, 0, sizeof (addr_tt));

	    if (SvPOKp (lo) && SvPOKp (hi))
	      {
	        if (SvCUR (lo) != SvCUR (hi))
	          {
	            err = "all addresses in range must be of the same size (either 4 or 16 bytes)";
	            break;
	          }

	        if (SvCUR (lo) == 4)
	          {
	            /* IPv4 addresses are stored in the last four octets */
	            r->family = AF_INET;
	            memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, SvPVX (lo), 4);
	            memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, SvPVX (hi), 4);
	          }
	        else if (SvCUR (lo) == 16)
	          {
#if ENABLE_IPV6
	            r->family = AF_INET6;
	            memcpy (&r->lo, SvPVX (lo), sizeof (addr_tt));
	            memcpy (&r->hi, SvPVX (hi), sizeof (addr_tt));
#else
	            err = "IPv6 not supported in this configuration";
	            break;
#endif
	          }
	        else
	          {
	            err = "addresses in range must be either 4 (IPv4) or 16 (IPv6) bytes in length";
	            break;
	          }
	      }
	    else if (SvIOK (lo) && SvIOK (hi))
	      {
	        uint32_t addr;

	        r->family = AF_INET;

	        addr = htonl (SvUV (lo)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->lo, &addr, 4);
	        addr = htonl (SvUV (hi)); memcpy (sizeof (addr_tt) - 4 + (char *)&r->hi, &addr, 4);
	      }
	    else
	      {
	        err = "addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets";
	        break;
	      }

	    if (r->family == AF_INET)
	      {
	        if (icmp4_fd < 0)
	          {
	            err = "AnyEvent::FastPing: IPv4 ping support not available on this system";
	            break;
	          }
	      }
	    else
	      {
	        if (icmp6_fd < 0)
	          {
	            err = "AnyEvent::FastPing: IPv6 ping support not available on this system";
	            break;
	          }
	      }

	    r->interval = SvNV (*av_fetch (av, 2, 1));

	    if (r->interval < req->interval)
	      r->interval = req->interval;

	    r->next = 0.;
	  }

	if (err)
	  {
	    free (req->ranges);
	    free (req);
	    croak ("%s", err);
	  }

	req->id = newSVsv (id);

	RETVAL = newSVpvn ((char *)&req, sizeof (req));
}
	OUTPUT:
	RETVAL
586    
SV *
_read_res ()
	CODE:
{
	/*
	 * Fetch one finished request from the pinger thread: returns the id
	 * SV stored in the request and frees the request, or returns undef
	 * when no complete REQ pointer could be read from the result pipe.
	 */
	REQ *req;

	/* req is only valid after a complete read, so bail out right here */
	/* (this used to fall through and dereference the unset pointer) */
	if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req))
	  XSRETURN_UNDEF;

	RETVAL = req->id;
	free (req->ranges);
	free (req);
}
	OUTPUT:
	RETVAL
602    
void
_recv_icmp4 (...)
	CODE:
{
	/*
	 * Drain the IPv4 raw socket and hand all echo replies that carry our
	 * tag to the callbacks in one batch. Each reply is reported as
	 * [source address (4 octets), round trip time in seconds, payload].
	 * Returns undef without reading anything when no callback is set.
	 */
	char buf [512];
	struct sockaddr_in sa;
	socklen_t sl;
	AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
	tstamp now = NOW ();

	if (!res_av)
	  XSRETURN_UNDEF;

	for (;;)
	  {
	    IP4HDR iphdr;
	    PKT pkt;
	    int len, hdrlen, totlen;

	    /* value-result argument, has to be reset before every call */
	    sl  = sizeof (sa);
	    len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);

	    /* error (including "nothing left to read") or runt packet */
	    if (len <= HDR_SIZE_IP4)
	      break;

	    /* copy the headers out of the byte buffer, as it is not */
	    /* guaranteed to be suitably aligned for direct access */
	    memcpy (&iphdr, buf, sizeof (iphdr));

	    hdrlen = (iphdr.version_ihl & 15) * 4;
	    totlen = ntohs (iphdr.tot_len);

	    // packet corrupt?
	    if (totlen > len
	        || iphdr.protocol != IPPROTO_ICMP
	        || hdrlen < HDR_SIZE_IP4 || hdrlen + (int)sizeof (PKT) != totlen)
	      continue;

	    /* hdrlen is at most 60, so the packet lies fully inside buf */
	    memcpy (&pkt, buf + hdrlen, sizeof (pkt));

	    if (pkt.type != ICMP4_ECHO_REPLY || !pkt_is_valid (&pkt))
	      continue;

	    {
	      AV *av = newAV ();
	      av_push (av, newSVpvn ((char *)&sa.sin_addr, 4));
	      av_push (av, newSVnv (now - pkt_to_ts (&pkt)));
	      av_push (av, newSVuv (pkt.payload));

	      av_push (res_av, newRV_noinc ((SV *)av));
	    }
	  }

	feed_reply (res_av);
}
652    
void
_recv_icmp6 (...)
	CODE:
{
	/*
	 * Drain the IPv6 raw socket and hand all echo replies that carry our
	 * tag to the callbacks in one batch. Each reply is reported as
	 * [source address (16 octets), round trip time in seconds, payload].
	 * Returns undef without reading anything when no callback is set.
	 */
	struct sockaddr_in6 sa;
	socklen_t sl;
	AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
	PKT pkt;
	tstamp now = NOW ();

	if (!res_av)
	  XSRETURN_UNDEF;

	for (;;)
	  {
	    int len;

	    /* value-result argument, has to be reset before every call */
	    sl  = sizeof (sa);
	    len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);

	    /* error, including "nothing left to read" on the non-blocking socket */
	    if (len <= 0)
	      break;

	    /* skip datagrams of the wrong size instead of ending the drain */
	    /* loop, which is what the IPv4 receiver does with them as well */
	    if (len != sizeof (PKT))
	      continue;

	    if (pkt.type != ICMP6_ECHO_REPLY || !pkt_is_valid (&pkt))
	      continue;

	    {
	      AV *av = newAV ();
	      av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16));
	      av_push (av, newSVnv (now - pkt_to_ts (&pkt)));
	      av_push (av, newSVuv (pkt.payload));

	      av_push (res_av, newRV_noinc ((SV *)av));
	    }
	  }

	feed_reply (res_av);
}
688