ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FastPing.xs
Revision: 1.4
Committed: Sun Mar 29 20:07:03 2009 UTC (15 years, 1 month ago) by root
Branch: MAIN
Changes since 1.3: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.2 #define _POSIX_C_SOURCE 200112
2     #define _XOPEN_SOURCE 600
3     #define _LARGEFILE64_SOURCE 1
4 root 1.1
5 root 1.3 #ifdef __APPLE__
6 root 1.2 # define IPV6 0
7     #else
8     # define IPV6 1 // if you get compilation problems try to disable IPv6
9     #endif
10 root 1.1
11     #include "EXTERN.h"
12     #include "perl.h"
13     #include "XSUB.h"
14    
15     #include <pthread.h>
16    
17     #include <math.h>
18     #include <stdio.h>
19     #include <stdlib.h>
20     #include <string.h>
21    
22     #include <time.h>
23     #include <poll.h>
24     #include <unistd.h>
25     #include <inttypes.h>
26     #include <fcntl.h>
27     #include <errno.h>
28    
29     #include <sys/types.h>
30     #include <sys/time.h>
31     #include <sys/socket.h>
32    
33     #include <netinet/in.h>
34     #include <arpa/inet.h>
35    
36     #ifdef __linux
37     # include <linux/icmp.h>
38     #endif
39     #if IPV6
40     # include <netinet/icmp6.h>
41     #endif
42    
43     #define ICMP4_ECHO 8
44     #define ICMP4_ECHO_REPLY 0
45     #define ICMP6_ECHO 128
46     #define ICMP6_ECHO_REPLY 129
47    
48     #define DRAIN_INTERVAL .000001 // how long to wait when sendto returns ENOBUFS, in seconds
49     #define MIN_INTERVAL .000001 // minimum packet send interval, in seconds
50    
51     #define HDR_SIZE_IP4 20
52     #define HDR_SIZE_IP6 48
53    
54 root 1.4 //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pipe buffer size is an exact multiple of the pointer size
55 root 1.1
/* 128-bit address buffer; IPv4 addresses are stored in the last four bytes. */
typedef uint8_t addr_t[16];

/* Point in time (or a duration), expressed in seconds as a double. */
typedef double tstamp;

/*
 * Return the current wall-clock time in seconds, as reported by
 * gettimeofday (), with the microseconds in the fractional part.
 */
tstamp
NOW ()
{
  struct timeval current;

  gettimeofday (&current, 0);

  return current.tv_sec + current.tv_usec * 0.000001;
}
67    
68     typedef struct {
69     int family;
70     addr_t lo, hi;
71     double interval;
72     tstamp next;
73     } RANGE;
74    
75     typedef struct {
76     SV *id;
77     double interval;
78     int nranges;
79     RANGE *ranges;
80     uint32_t payload;
81     } REQ;
82    
/* Fixed 20-byte part of an IPv4 header as it appears on the wire. */
typedef struct {
  uint8_t  version_ihl; /* low nibble: header length in 32-bit words */
  uint8_t  tos;
  uint16_t tot_len;     /* total packet length, network byte order */

  uint16_t id;
  uint16_t flags;

  uint8_t  ttl;
  uint8_t  protocol;    /* IPPROTO_ICMP for the packets we care about */
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;
98    
/* Fixed 40-byte IPv6 header as it appears on the wire. */
typedef struct {
  uint8_t  version;
  uint8_t  x1;          /* x1..x3: remaining bytes of the first 32-bit word */
  uint8_t  x2;
  uint8_t  x3;

  uint16_t payload_len;
  uint8_t  nxt_hdr;
  uint8_t  hop_limit;

  uint8_t  src[16];
  uint8_t  dst[16];
} IP6HDR;
110    
111     #define MAGIC 0xca4c
112    
113     static uint16_t magic;
114    
115     typedef struct {
116     uint8_t type, code;
117     uint16_t cksum;
118     uint16_t id, seq;
119     uint32_t payload;
120     tstamp stamp; // be careful when accessing this
121     } PKT;
122    
static pthread_t pthrid;  // the pinger thread
static int thr_send[2];   // pipe: requests sent to the worker
static int thr_recv[2];   // pipe: finished requests received from the worker

// Raw ICMP sockets. They start out as -1 ("not available") so that a build
// without IPv6, which never opens icmp6_fd, does not report fd 0 as a socket.
static int icmp4_fd = -1;
static int icmp6_fd = -1;
128    
129     static AV *cbs;
130    
/*
 * Compute the Internet checksum (one's complement of the one's complement
 * sum of all 16-bit words) over len bytes starting at data.
 *
 * The words are read in memory order, so the result can be stored into the
 * packet as-is. An odd trailing byte is treated as if followed by a zero
 * byte. The buffer may have any alignment.
 */
static uint16_t
icmp_cksum (void *data, unsigned int len)
{
  const unsigned char *p = data;
  uint64_t sum = 0; /* wide enough for any unsigned int length */
  uint16_t word;

  for (; len >= 2; p += 2, len -= 2)
    {
      /* memcpy avoids alignment and aliasing problems */
      memcpy (&word, p, sizeof (word));
      sum += word;
    }

  if (len)
    {
      word = 0;
      memcpy (&word, p, 1);
      sum += word;
    }

  /* fold the carries back in until the sum fits into 16 bits */
  while (sum >> 16)
    sum = (sum >> 16) + (sum & 0xffff);

  return (uint16_t)~sum;
}
147    
148     static void
149     inc_addr (addr_t *addr)
150     {
151     int len = sizeof (addr_t) - 1;
152    
153     do
154     {
155     if ((*addr)[len] != 0xff)
156     {
157     ++(*addr)[len];
158     break;
159     }
160    
161     (*addr)[len] = 0;
162     }
163     while (len--);
164     }
165    
166     static void *
167     ping_proc (void *unused)
168     {
169     PKT pkt;
170     struct sockaddr_in sa4;
171     #if IPV6
172     struct sockaddr_in6 sa6;
173     #endif
174    
175     memset (&pkt, 0, sizeof (pkt));
176    
177     memset (&sa4, 0, sizeof (sa4));
178     sa4.sin_family = AF_INET;
179     sa4.sin_port = 0;
180     #if IPV6
181     memset (&sa6, 0, sizeof (sa6));
182     sa6.sin6_family = AF_INET6;
183     sa6.sin6_port = 0;
184     #endif
185    
186     for (;;)
187     {
188     REQ *req;
189     int len = read (thr_send [0], &req, sizeof (req));
190    
191     if (!len)
192     pthread_exit (0);
193     else if (len != sizeof (req))
194     {
195     perror ("AnyEvent::FastPing: short reead or read error");
196     pthread_exit ((void *)-1);
197     }
198    
199     //TODO: bind to source address
200    
201     pkt.code = 0;
202     pkt.id = (uint16_t)magic;
203     pkt.seq = (uint16_t)~magic;
204     pkt.payload = req->payload;
205    
206     tstamp now = NOW ();
207     tstamp next = now;
208    
209     {
210     int r;
211     for (r = req->nranges; r--; )
212     inc_addr (&req->ranges [r].hi);
213     }
214    
215     while (req->nranges)
216     {
217     RANGE *range = req->ranges;
218    
219     if (!memcmp (&range->lo, &range->hi, sizeof (addr_t)))
220     req->ranges [0] = req->ranges [--req->nranges];
221     else
222     {
223     // ranges [0] is always the next range to ping
224     tstamp wait = range->next - now;
225    
226     // compare with the global frequency limit
227     {
228     tstamp diff = next - now;
229    
230     if (wait < diff)
231     wait = diff;
232     else if (range)
233     next = range->next;
234     }
235    
236     if (wait > 0.)
237     {
238     struct timespec ts;
239    
240     ts.tv_sec = wait;
241     ts.tv_nsec = (wait - ts.tv_sec) * 1000000000.;
242    
243     nanosleep (&ts, 0);
244     }
245    
246     now = NOW ();
247    
248     pkt.stamp = now;
249     pkt.cksum = 0;
250    
251     if (range->family == AF_INET)
252     {
253     pkt.type = ICMP4_ECHO;
254     pkt.cksum = icmp_cksum (&pkt, sizeof (pkt));
255    
256     memcpy (&sa4.sin_addr,
257     sizeof (addr_t) - sizeof (sa4.sin_addr) + (char *)&range->lo,
258     sizeof (sa4.sin_addr));
259    
260     if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0)
261     errno = 0;
262     }
263     else
264     {
265     #if IPV6
266     pkt.type = ICMP6_ECHO;
267    
268     memcpy (&sa6.sin6_addr,
269     sizeof (addr_t) - sizeof (sa6.sin6_addr) + (char *)&range->lo,
270     sizeof (sa6.sin6_addr));
271    
272     if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0)
273     errno = 0;
274     #endif
275     }
276    
277     if (errno == ENOBUFS)
278     {
279     struct timespec ts;
280    
281     ts.tv_sec = 0;
282     ts.tv_nsec = DRAIN_INTERVAL * 1000000000;
283    
284     nanosleep (&ts, 0);
285     }
286     else
287     {
288     inc_addr (&range->lo);
289    
290     range->next = next;
291     range->next += range->interval;
292     }
293    
294     next += req->interval;
295     }
296    
297     // make a downheap operation
298     int k = 0;
299     int n = 0;
300     for (;;)
301     {
302     ++n;
303     int j = k * 2 + 1;
304    
305     if (j >= req->nranges)
306     break;
307     else if (j < req->nranges - 1)
308     if (req->ranges [j].next > req->ranges [j + 1].next)
309     ++j;
310    
311     if (req->ranges [j].next >= req->ranges [k].next)
312     break;
313    
314     RANGE temp = req->ranges [k];
315     req->ranges [k] = req->ranges [j];
316     req->ranges [j] = temp;
317    
318     k = j;
319     }
320     }
321    
322     write (thr_recv [1], &req, sizeof (req));
323     }
324    
325     return 0;
326     }
327    
328     static void
329     feed_reply (AV *res_av)
330     {
331     if (av_len (res_av) < 0)
332     return;
333    
334     dSP;
335     SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
336     int i;
337    
338     ENTER;
339     SAVETMPS;
340    
341     for (i = av_len (cbs) + 1; i--; )
342     {
343     SV *cb = *av_fetch (cbs, i, 1);
344    
345     PUSHMARK (SP);
346     XPUSHs (res);
347     PUTBACK;
348     call_sv (cb, G_DISCARD | G_VOID);
349     }
350    
351     FREETMPS;
352     LEAVE;
353     }
354    
355     static void
356     boot ()
357     {
358     sigset_t fullsigset, oldsigset;
359     pthread_attr_t attr;
360    
361     if (pipe (thr_send) < 0)
362     croak ("AnyEvent::FastPing: unable to create send pipe");
363    
364     if (pipe (thr_recv) < 0)
365     croak ("AnyEvent::FastPing: unable to create receive pipe");
366    
367     icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
368 root 1.2 fcntl (icmp4_fd, F_SETFL, O_NONBLOCK);
369 root 1.1 #ifdef ICMP_FILTER
370     {
371     struct icmp_filter oval;
372     oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
373     setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
374     }
375     #endif
376    
377     #if IPV6
378     icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
379 root 1.2 fcntl (icmp6_fd, F_SETFL, O_NONBLOCK);
380 root 1.1 # ifdef ICMP6_FILTER
381     {
382     struct icmp6_filter oval;
383     ICMP6_FILTER_SETBLOCKALL (&oval);
384     ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
385     setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
386     }
387     # endif
388     #endif
389    
390     pthread_attr_init (&attr);
391     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
392     #ifdef PTHREAD_SCOPE_PROCESS
393     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
394     #endif
395    
396     sigfillset (&fullsigset);
397    
398     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
399    
400     if (pthread_create (&pthrid, &attr, ping_proc, 0))
401     croak ("AnyEvent::FastPing: unable to create pinger thread");
402    
403     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
404    
405     sv_setiv (get_sv ("AnyEvent::FastPing::THR_REQ_FD", 1), thr_send [1]);
406     sv_setiv (get_sv ("AnyEvent::FastPing::THR_RES_FD", 1), thr_recv [0]);
407    
408     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
409     sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
410     }
411    
412     MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing
413    
BOOT:
{
  /* module load: set up the globals, start the worker, export the constants */
  HV *pkg = gv_stashpv ("AnyEvent::FastPing", 1);
  int have_ipv4, have_ipv6;

  magic = getpid () ^ MAGIC;
  cbs   = get_av ("AnyEvent::FastPing::CB", 1);

  boot ();

  /* a descriptor below zero means the socket is not available */
  have_ipv4 = icmp4_fd >= 0;
  have_ipv6 = icmp6_fd >= 0;

  newCONSTSUB (pkg, "ipv4_supported", newSViv (have_ipv4));
  newCONSTSUB (pkg, "ipv6_supported", newSViv (have_ipv6));

  /* packet sizes: IP header size constant plus our echo packet */
  newCONSTSUB (pkg, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
  newCONSTSUB (pkg, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
}
429    
430     PROTOTYPES: DISABLE
431    
# Build a ping request from an arrayref of [lo, hi, interval] triples.
# lo and hi are either 4- or 16-octet strings or (IPv4 only) integers.
# Returns a string holding the raw REQ pointer; the request is freed by
# _read_res. All validation happens before anything is malloc'ed, so a
# croak cannot leak the request.
SV *
_req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id)
        CODE:
{
  if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV)
    croak ("address ranges must be given as arrayref with lo, hi pairs");

  AV *rav = (AV *)SvRV (ranges);
  int nranges = av_len (rav) + 1;
  int i;

  if (interval < MIN_INTERVAL)
    interval = MIN_INTERVAL;

  /* scratch array owned by a mortal SV, so perl releases it even on croak */
  SV *scratch = sv_2mortal (newSV (nranges * sizeof (RANGE) + 1));
  RANGE *parsed = (RANGE *)SvPVX (scratch);

  for (i = nranges; i--; )
    {
      SV *sv = *av_fetch (rav, i, 1);

      if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV)
        croak ("address range must be given as arrayref with lo, hi, interval arrayrefs");

      AV *av = (AV *)SvRV (sv);
      RANGE *r = parsed + i;

      SV *lo = *av_fetch (av, 0, 1);
      SV *hi = *av_fetch (av, 1, 1);

      sv_utf8_downgrade (lo, 0);
      sv_utf8_downgrade (hi, 0);

      memset (&r->lo, 0, sizeof (addr_t));
      memset (&r->hi, 0, sizeof (addr_t));

      if (SvPOKp (lo) && SvPOKp (hi))
        {
          if (SvCUR (lo) != SvCUR (hi))
            croak ("addresses in range must be of the same size (either 4 or 16 bytes)");

          if (SvCUR (lo) == 4)
            {
              /* IPv4 addresses live in the last four bytes of addr_t */
              r->family = AF_INET;
              memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, SvPVX (lo), 4);
              memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, SvPVX (hi), 4);
            }
          else if (SvCUR (lo) == 16)
            {
#if IPV6
              r->family = AF_INET6;
              memcpy (&r->lo, SvPVX (lo), sizeof (addr_t));
              memcpy (&r->hi, SvPVX (hi), sizeof (addr_t));
#else
              croak ("IPv6 not supported in this configuration");
#endif
            }
          else
            croak ("addresses in range must be either 4 (IPv4) or 16 (IPV6) bytes in length");
        }
      else if (SvIOK (lo) && SvIOK (hi))
        {
          r->family = AF_INET;

          uint32_t addr;
          addr = htonl (SvUV (lo)); memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, &addr, 4);
          addr = htonl (SvUV (hi)); memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, &addr, 4);
        }
      else
        croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets");

      /* the worker walks from lo up to hi, a reversed range would run practically forever */
      if (memcmp (&r->lo, &r->hi, sizeof (addr_t)) > 0)
        croak ("AnyEvent::FastPing: range start address must not be greater than its end address");

      if (r->family == AF_INET)
        {
          if (icmp4_fd < 0)
            croak ("AnyEvent::FastPing: IPv4 ping support not available on this system");
        }
      else
        {
          if (icmp6_fd < 0)
            croak ("AnyEvent::FastPing: IPv6 ping support not available on this system");
        }

      r->interval = SvNV (*av_fetch (av, 2, 1));

      /* a range can never be pinged faster than the request as a whole */
      if (r->interval < interval)
        r->interval = interval;

      r->next = 0.;
    }

  /* everything is valid: now build the malloc'ed request owned by the worker */
  SV *id_copy = newSVsv (id);
  REQ *req = malloc (sizeof (REQ));
  RANGE *req_ranges = malloc (nranges ? nranges * sizeof (RANGE) : 1);

  if (!req || !req_ranges)
    {
      free (req_ranges);
      free (req);
      SvREFCNT_dec (id_copy);
      croak ("AnyEvent::FastPing: out of memory");
    }

  memcpy (req_ranges, parsed, nranges * sizeof (RANGE));

  req->id       = id_copy;
  req->interval = interval;
  req->payload  = payload;
  req->nranges  = nranges;
  req->ranges   = req_ranges;

  RETVAL = newSVpvn ((char *)&req, sizeof (req));
}
        OUTPUT:
        RETVAL
531    
# Read one finished request from the worker pipe, free it and return the id
# that was given to _req_icmp_ping. Returns undef if no complete request
# pointer could be read.
SV *
_read_res ()
        CODE:
{
  REQ *req;

  if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req))
    RETVAL = &PL_sv_undef;
  else
    {
      /* only touch req when the read actually filled it in */
      RETVAL = req->id;
      free (req->ranges);
      free (req);
    }
}
        OUTPUT:
        RETVAL
547    
# Drain the IPv4 ICMP socket and pass all echo replies that belong to us to
# the callbacks, as one arrayref of [address, round-trip time, payload]
# entries. Without registered callbacks the packets are read and dropped.
void
_recv_icmp4 (...)
        CODE:
{
  char buf [512];
  struct sockaddr_in sa;
  AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
  tstamp now = NOW ();

  for (;;)
    {
      /* value-result argument, must be reset before every call */
      socklen_t sl = sizeof (sa);
      int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);

      /* stop on error, which includes "no more packets queued" */
      if (len < 0)
        break;

      /* a short packet is skipped, it must not end the draining */
      if (!res_av || len <= HDR_SIZE_IP4)
        continue;

      /* buf has no particular alignment, so copy the headers out of it */
      IP4HDR iphdr;
      memcpy (&iphdr, buf, sizeof (iphdr));

      int hdrlen = (iphdr.version_ihl & 15) * 4;
      int totlen = ntohs (iphdr.tot_len);

      // packet corrupt?
      if (totlen > len
          || iphdr.protocol != IPPROTO_ICMP
          || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
        continue;

      /* hdrlen + sizeof (PKT) is at most 60 + sizeof (PKT), well inside buf */
      PKT pkt;
      memcpy (&pkt, buf + hdrlen, sizeof (pkt));

      if (pkt.type != ICMP4_ECHO_REPLY
          || pkt.id  != (uint16_t) magic
          || pkt.seq != (uint16_t)~magic
          || !isnormal (pkt.stamp))
        continue;

      AV *av = newAV ();
      av_push (av, newSVpvn ((char *)&sa.sin_addr, 4));
      av_push (av, newSVnv (now - pkt.stamp));
      av_push (av, newSVuv (pkt.payload));

      av_push (res_av, newRV_noinc ((SV *)av));
    }

  if (res_av)
    feed_reply (res_av);
}
596    
# Drain the IPv6 ICMP socket and pass all echo replies that belong to us to
# the callbacks, as one arrayref of [address, round-trip time, payload]
# entries. Without registered callbacks the packets are read and dropped.
void
_recv_icmp6 (...)
        CODE:
{
  struct sockaddr_in6 sa;
  AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
  PKT pkt;
  tstamp now = NOW ();

  for (;;)
    {
      /* value-result argument, must be reset before every call */
      socklen_t sl = sizeof (sa);
      int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);

      /* stop on error, which includes "no more packets queued" */
      if (len < 0)
        break;

      /* packets of any other size cannot be ours, but must not end the draining */
      if (len != sizeof (PKT))
        continue;

      if (!res_av
          || pkt.type != ICMP6_ECHO_REPLY
          || pkt.id   != (uint16_t) magic
          || pkt.seq  != (uint16_t)~magic
          || !isnormal (pkt.stamp))
        continue;

      AV *av = newAV ();
      av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16));
      av_push (av, newSVnv (now - pkt.stamp));
      av_push (av, newSVuv (pkt.payload));

      av_push (res_av, newRV_noinc ((SV *)av));
    }

  if (res_av)
    feed_reply (res_av);
}
632