ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FPing.xs
Revision: 1.8
Committed: Sun Apr 27 15:43:51 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.7: +0 -0 lines
State: FILE REMOVED
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #define _POSIX_C_SOURCE 199309
2     #define _GNU_SOURCE 1
3    
4 root 1.2 #define IPV6 1 // if you get compilation problems try to disable IPv6
5 root 1.1
6     #include "EXTERN.h"
7     #include "perl.h"
8     #include "XSUB.h"
9    
10     #include <pthread.h>
11    
12     #include <math.h>
13     #include <stdio.h>
14     #include <stdlib.h>
15     #include <string.h>
16    
17     #include <time.h>
18     #include <poll.h>
19     #include <unistd.h>
20     #include <inttypes.h>
21     #include <fcntl.h>
22     #include <errno.h>
23    
24     #include <sys/types.h>
25     #include <sys/time.h>
26     #include <sys/socket.h>
27    
28     #include <netinet/in.h>
29     #include <arpa/inet.h>
30    
31 root 1.4 #ifdef __linux
32     # include <linux/icmp.h>
33     #endif
34 root 1.5 #if IPV6
35     # include <netinet/icmp6.h>
36     #endif
37 root 1.4
/* ICMP message type numbers for echo requests and the replies to them. */
#define ICMP4_ECHO        8
#define ICMP4_ECHO_REPLY  0
#define ICMP6_ECHO        128
#define ICMP6_ECHO_REPLY  129

/* how long to wait when sendto returns ENOBUFS, in seconds */
#define DRAIN_INTERVAL .000001
/* minimum packet send interval, in seconds */
#define MIN_INTERVAL .000001

/* Header sizes added to sizeof (PKT) for the exported icmp4_pktsize and
   icmp6_pktsize constants; HDR_SIZE_IP4 is also the minimum IPv4 header
   length accepted when parsing replies. */
#define HDR_SIZE_IP4 20
/* NOTE(review): the fixed IPv6 header is 40 bytes - confirm 48 is intended. */
#define HDR_SIZE_IP6 48
48    
49     //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pipe buffer size divides exactly by pointer sizes
50    
/* Address storage shared by both families: 16 octets.  IPv4 addresses
   are kept in the last four octets, IPv6 addresses use all sixteen. */
typedef uint8_t addr_t[16];

/* Points in time and durations, in seconds. */
typedef double tstamp;

/* Returns the current gettimeofday () time as seconds, with the
   microsecond part as the fraction. */
tstamp
NOW ()
{
  struct timeval current;

  gettimeofday (&current, 0);

  tstamp fraction = current.tv_usec * 0.000001;

  return current.tv_sec + fraction;
}
62    
63     typedef struct {
64     int family;
65     addr_t lo, hi;
66     double interval;
67     tstamp next;
68     } RANGE;
69    
70     typedef struct {
71     SV *id;
72     double interval;
73     int nranges;
74     RANGE *ranges;
75     uint32_t payload;
76     } REQ;
77    
/* Overlay for the header in front of a received IPv4 raw-socket packet.
   The receive path in this file reads only version_ihl, tot_len and
   protocol. */
typedef struct {
  uint8_t  version_ihl; // low four bits: header length in 32-bit words
  uint8_t  tos;
  uint16_t tot_len;     // converted with ntohs () before use

  uint16_t id;
  uint16_t flags;

  uint8_t  ttl;
  uint8_t  protocol;    // compared against IPPROTO_ICMP
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;
93    
/* IPv6 header overlay.  NOTE(review): nothing in the visible part of this
   file reads it; the ICMPv6 receive path reads into a PKT directly. */
typedef struct {
  uint8_t  version;
  uint8_t  x1;
  uint8_t  x2;
  uint8_t  x3;

  uint16_t payload_len;
  uint8_t  nxt_hdr;
  uint8_t  hop_limit;

  uint8_t  src[16];
  uint8_t  dst[16];
} IP6HDR;
105    
/* Constant that is XORed with the process id at boot time to form magic. */
#define MAGIC 0xca4c

/* Per-process tag: sent as-is in the id field and bit-complemented in the
   seq field of every packet, and required to match on received replies. */
static uint16_t magic;
109    
110 root 1.1 typedef struct {
111     uint8_t type, code;
112     uint16_t cksum;
113     uint16_t id, seq;
114 root 1.2 uint32_t payload;
115 root 1.1 tstamp stamp; // be careful when accessing this
116     } PKT;
117    
118 root 1.3 static pthread_t pthrid;
119     static int thr_send[2]; // send to worker
120     static int thr_recv[2]; // receive from worker
121 root 1.1
122 root 1.3 static int icmp4_fd, icmp6_fd;
123    
124     static AV *cbs;
125 root 1.1
/*
 * Computes the 16-bit one's-complement checksum over len bytes at data:
 * all 16-bit words (in memory order) are summed, the carries are folded
 * back into the low 16 bits, and the complement of that is returned.
 *
 * len must be even (asserted).  The returned value can be stored directly
 * into the packet's checksum field without byte swapping.
 *
 * The sum is kept in an unsigned 64-bit accumulator so that large buffers
 * cannot overflow it, and words are fetched with memcpy so that data does
 * not have to be aligned for uint16_t access.
 */
static uint16_t
icmp_cksum (void *data, unsigned int len)
{
  const unsigned char *bytes = data;
  uint64_t sum = 0;

  assert (~len & 1);

  for (; len; bytes += 2, len -= 2)
    {
      uint16_t word;

      memcpy (&word, bytes, sizeof (word));
      sum += word;
    }

  /* fold the carries back in until nothing is left above bit 15 */
  while (sum >> 16)
    sum = (sum >> 16) + (sum & 0xffff);

  return (uint16_t)~sum;
}
142    
143     static void
144     inc_addr (addr_t *addr)
145     {
146     int len = sizeof (addr_t) - 1;
147    
148     do
149     {
150     if ((*addr)[len] != 0xff)
151     {
152     ++(*addr)[len];
153     break;
154     }
155    
156     (*addr)[len] = 0;
157     }
158     while (len--);
159     }
160    
161     static void *
162     ping_proc (void *unused)
163     {
164     PKT pkt;
165     struct sockaddr_in sa4;
166     #if IPV6
167     struct sockaddr_in6 sa6;
168     #endif
169    
170     memset (&pkt, 0, sizeof (pkt));
171    
172     memset (&sa4, 0, sizeof (sa4));
173     sa4.sin_family = AF_INET;
174 root 1.3 sa4.sin_port = 0;
175 root 1.1 #if IPV6
176     memset (&sa6, 0, sizeof (sa6));
177     sa6.sin6_family = AF_INET6;
178 root 1.3 sa6.sin6_port = 0;
179 root 1.1 #endif
180    
181     for (;;)
182     {
183     REQ *req;
184     int len = read (thr_send [0], &req, sizeof (req));
185    
186     if (!len)
187     pthread_exit (0);
188     else if (len != sizeof (req))
189     {
190     perror ("Net::FPing: short reead or read error");
191     pthread_exit ((void *)-1);
192     }
193    
194     //TODO: bind to source address
195    
196     pkt.code = 0;
197 root 1.7 pkt.id = (uint16_t)magic;
198     pkt.seq = (uint16_t)~magic;
199 root 1.1 pkt.payload = req->payload;
200    
201 root 1.4 tstamp now = NOW ();
202     tstamp next = now;
203 root 1.1
204 root 1.5 {
205     int r;
206     for (r = req->nranges; r--; )
207     inc_addr (&req->ranges [r].hi);
208     }
209    
210 root 1.1 while (req->nranges)
211     {
212     RANGE *range = req->ranges;
213    
214     if (!memcmp (&range->lo, &range->hi, sizeof (addr_t)))
215     req->ranges [0] = req->ranges [--req->nranges];
216     else
217     {
218     // ranges [0] is always the next range to ping
219     tstamp wait = range->next - now;
220    
221     // compare with the global frequency limit
222     {
223     tstamp diff = next - now;
224    
225     if (wait < diff)
226     wait = diff;
227     else if (range)
228     next = range->next;
229     }
230    
231     if (wait > 0.)
232     {
233     struct timespec ts;
234    
235     ts.tv_sec = wait;
236     ts.tv_nsec = (wait - ts.tv_sec) * 1000000000.;
237    
238     nanosleep (&ts, 0);
239     }
240    
241 root 1.4 now = NOW ();
242    
243 root 1.1 pkt.stamp = now;
244     pkt.cksum = 0;
245    
246     if (range->family == AF_INET)
247     {
248     pkt.type = ICMP4_ECHO;
249 root 1.3 pkt.cksum = icmp_cksum (&pkt, sizeof (pkt));
250 root 1.1
251     memcpy (&sa4.sin_addr,
252     sizeof (addr_t) - sizeof (sa4.sin_addr) + (char *)&range->lo,
253     sizeof (sa4.sin_addr));
254    
255 root 1.3 if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0)
256 root 1.1 errno = 0;
257     }
258     else
259     {
260     #if IPV6
261     pkt.type = ICMP6_ECHO;
262    
263     memcpy (&sa6.sin6_addr,
264     sizeof (addr_t) - sizeof (sa6.sin6_addr) + (char *)&range->lo,
265     sizeof (sa6.sin6_addr));
266    
267 root 1.3 if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0)
268 root 1.1 errno = 0;
269     #endif
270     }
271    
272     if (errno == ENOBUFS)
273     {
274     struct timespec ts;
275    
276     ts.tv_sec = 0;
277     ts.tv_nsec = DRAIN_INTERVAL * 1000000000;
278    
279     nanosleep (&ts, 0);
280     }
281     else
282     {
283     inc_addr (&range->lo);
284    
285     range->next = next;
286     range->next += range->interval;
287     }
288    
289     next += req->interval;
290     }
291    
292     // make a downheap operation
293     int k = 0;
294     int n = 0;
295     for (;;)
296     {
297     ++n;
298     int j = k * 2 + 1;
299    
300     if (j >= req->nranges)
301     break;
302     else if (j < req->nranges - 1)
303     if (req->ranges [j].next > req->ranges [j + 1].next)
304     ++j;
305    
306     if (req->ranges [j].next >= req->ranges [k].next)
307     break;
308    
309     RANGE temp = req->ranges [k];
310     req->ranges [k] = req->ranges [j];
311     req->ranges [j] = temp;
312    
313     k = j;
314     }
315     }
316    
317     write (thr_recv [1], &req, sizeof (req));
318     }
319    
320     return 0;
321     }
322    
323     static void
324 root 1.3 feed_reply (AV *res_av)
325     {
326     if (av_len (res_av) < 0)
327     return;
328    
329     dSP;
330     SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
331     int i;
332    
333     ENTER;
334     SAVETMPS;
335    
336     for (i = av_len (cbs) + 1; i--; )
337     {
338     SV *cb = *av_fetch (cbs, i, 1);
339    
340     PUSHMARK (SP);
341     XPUSHs (res);
342     PUTBACK;
343     call_sv (cb, G_DISCARD | G_VOID);
344     }
345    
346     FREETMPS;
347     LEAVE;
348     }
349    
350     static void
351 root 1.1 boot ()
352     {
353     sigset_t fullsigset, oldsigset;
354     pthread_attr_t attr;
355    
356     if (pipe (thr_send) < 0)
357     croak ("Net::FPing: unable to create send pipe");
358    
359     if (pipe (thr_recv) < 0)
360     croak ("Net::FPing: unable to create receive pipe");
361    
362     icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
363 root 1.2 #ifdef ICMP_FILTER
364     {
365 root 1.4 struct icmp_filter oval;
366     oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
367     setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
368 root 1.2 }
369     #endif
370    
371 root 1.5 #if IPV6
372     icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
373     # ifdef ICMP6_FILTER
374     {
375     struct icmp6_filter oval;
376     ICMP6_FILTER_SETBLOCKALL (&oval);
377     ICMP6_FILTER_SETPASS (ICMP6_ECHO_REPLY, &oval);
378     setsockopt (icmp6_fd, IPPROTO_ICMPV6, ICMP6_FILTER, &oval, sizeof oval);
379     }
380     # endif
381     #endif
382    
383 root 1.1 pthread_attr_init (&attr);
384     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
385     #ifdef PTHREAD_SCOPE_PROCESS
386     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
387     #endif
388    
389     sigfillset (&fullsigset);
390    
391     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
392    
393     if (pthread_create (&pthrid, &attr, ping_proc, 0))
394     croak ("Net::FPing: unable to create pinger thread");
395    
396     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
397    
398     sv_setiv (get_sv ("Net::FPing::THR_REQ_FD", 1), thr_send [1]);
399     sv_setiv (get_sv ("Net::FPing::THR_RES_FD", 1), thr_recv [0]);
400    
401     sv_setiv (get_sv ("Net::FPing::ICMP4_FD", 1), icmp4_fd);
402     sv_setiv (get_sv ("Net::FPing::ICMP6_FD", 1), icmp6_fd);
403     }
404    
MODULE = Net::FPing PACKAGE = Net::FPing

BOOT:
{
        /* Bind the callback array, derive the per-process magic tag, run
           the C-level initialisation, then export constants that describe
           what is available and how large the packets are on the wire. */
        HV *pkg = gv_stashpv ("Net::FPing", 1);

        cbs = get_av ("Net::FPing::CB", 1);
        magic = getpid () ^ MAGIC;

        boot ();

        /* a negative socket descriptor means the family is unavailable */
        newCONSTSUB (pkg, "ipv4_supported", newSViv (icmp4_fd >= 0));
        newCONSTSUB (pkg, "ipv6_supported", newSViv (icmp6_fd >= 0));

        newCONSTSUB (pkg, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
        newCONSTSUB (pkg, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
}

PROTOTYPES: DISABLE
424    
SV *
_req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id)
        CODE:
{
        /*
         * Builds a ping request and returns a string whose bytes are the
         * REQ pointer, ready to be written to the worker's request pipe.
         *
         * ranges   - arrayref of [lo, hi, interval] arrayrefs; lo and hi are
         *            either 4-octet (IPv4) or 16-octet (IPv6) strings, or
         *            both integers (IPv4, host byte order)
         * interval - global spacing between packets, raised to MIN_INTERVAL
         * payload  - value copied into every packet
         * id       - copied into the request and returned by _read_res
         *
         * Croaks on malformed input or when the address family of a range is
         * not available.  All ranges are parsed into a scratch buffer owned
         * by a mortal SV first, so that nothing is leaked when a croak
         * unwinds out of here; the malloc'ed request is only built once
         * every range has been validated.
         */
        if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV)
          croak ("address ranges must be given as arrayref with lo, hi pairs");

        AV *rav = (AV *)SvRV (ranges);
        int nranges = av_len (rav) + 1;
        int idx;

        if (interval < MIN_INTERVAL)
          interval = MIN_INTERVAL;

        /* + 1 so that a buffer is allocated even for an empty range list */
        SV *scratch = sv_2mortal (newSV (nranges * sizeof (RANGE) + 1));
        RANGE *parsed = (RANGE *)SvPVX (scratch);

        for (idx = nranges; idx--; )
          {
            SV *sv = *av_fetch (rav, idx, 1);

            if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV)
              croak ("address range must be given as arrayref with lo, hi, interval arrayrefs");

            AV *av = (AV *)SvRV (sv);
            RANGE *r = parsed + idx;

            SV *lo = *av_fetch (av, 0, 1);
            SV *hi = *av_fetch (av, 1, 1);

            sv_utf8_downgrade (lo, 0);
            sv_utf8_downgrade (hi, 0);

            memset (&r->lo, 0, sizeof (addr_t));
            memset (&r->hi, 0, sizeof (addr_t));

            if (SvPOKp (lo) && SvPOKp (hi))
              {
                if (SvCUR (lo) != SvCUR (hi))
                  croak ("addresses in range must be of the same size (either 4 or 16 bytes)");

                if (SvCUR (lo) == 4)
                  {
                    /* IPv4 addresses go into the last four octets */
                    r->family = AF_INET;
                    memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, SvPVX (lo), 4);
                    memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, SvPVX (hi), 4);
                  }
                else if (SvCUR (lo) == 16)
                  {
#if IPV6
                    r->family = AF_INET6;
                    memcpy (&r->lo, SvPVX (lo), sizeof (addr_t));
                    memcpy (&r->hi, SvPVX (hi), sizeof (addr_t));
#else
                    croak ("IPv6 not supported in this configuration");
#endif
                  }
                else
                  croak ("addresses in range must be either 4 (IPv4) or 16 (IPV6) bytes in length");
              }
            else if (SvIOK (lo) && SvIOK (hi))
              {
                r->family = AF_INET;

                uint32_t addr;
                addr = htonl (SvUV (lo)); memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, &addr, 4);
                addr = htonl (SvUV (hi)); memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, &addr, 4);
              }
            else
              croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets");

            if (r->family == AF_INET)
              {
                if (icmp4_fd < 0)
                  croak ("Net::FPing: IPv4 ping support not available on this system");
              }
            else
              {
                if (icmp6_fd < 0)
                  croak ("Net::FPing: IPv6 ping support not available on this system");
              }

            r->interval = SvNV (*av_fetch (av, 2, 1));

            /* a range can never be pinged faster than the global limit */
            if (r->interval < interval)
              r->interval = interval;

            r->next = 0.;
          }

        /* everything is valid: build the request the worker will own */
        REQ *req = malloc (sizeof (REQ));
        RANGE *owned = nranges ? malloc (nranges * sizeof (RANGE)) : 0;

        if (!req || (nranges && !owned))
          {
            free (owned);
            free (req);
            croak ("Net::FPing: out of memory while building ping request");
          }

        if (nranges)
          memcpy (owned, parsed, nranges * sizeof (RANGE));

        req->id = newSVsv (id);
        req->interval = interval;
        req->payload = payload;
        req->nranges = nranges;
        req->ranges = owned;

        RETVAL = newSVpvn ((char *)&req, sizeof (req));
}
        OUTPUT:
        RETVAL
524    
SV *
_read_res ()
        CODE:
{
        /*
         * Reads one finished REQ pointer from the worker's result pipe,
         * frees the request and returns the id SV that was stored in it.
         * Returns undef when no complete pointer could be read; in that
         * case nothing is dereferenced or freed.
         */
        REQ *req;

        if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req))
          RETVAL = &PL_sv_undef;
        else
          {
            RETVAL = req->id;
            free (req->ranges);
            free (req);
          }
}
        OUTPUT:
        RETVAL
540    
void
_recv_icmp4 (...)
        CODE:
{
        /*
         * Drains the IPv4 raw socket without blocking.  Every echo reply
         * that carries our magic id/seq pair and a sane timestamp is turned
         * into [source address (4 octets), round trip time, payload]; the
         * collected list is then passed to the callbacks via feed_reply.
         * When no callbacks are registered the packets are read and dropped.
         *
         * The loop ends when recvfrom reports an error, which includes
         * "nothing more to read"; unusable packets are skipped.
         */
        char buf [512];
        struct sockaddr_in sa;
        AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
        tstamp now = NOW ();

        for (;;)
          {
            /* value-result argument, so set it afresh for every call */
            socklen_t sl = sizeof (sa);
            int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_DONTWAIT | MSG_TRUNC,
                                (struct sockaddr *)&sa, &sl);

            if (len < 0)
              break;

            /* too short to even hold an IP header */
            if (len <= HDR_SIZE_IP4)
              continue;

            /* buf has no particular alignment, so copy instead of casting */
            IP4HDR iphdr;
            memcpy (&iphdr, buf, sizeof (iphdr));

            int hdrlen = (iphdr.version_ihl & 15) * 4;
            int totlen = ntohs (iphdr.tot_len);

            /* packet corrupt, not ours, or nobody is interested? */
            if (!res_av
                || totlen > len
                || iphdr.protocol != IPPROTO_ICMP
                || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != (size_t)totlen)
              continue;

            /* hdrlen is at most 60, so the packet lies entirely within buf */
            PKT pkt;
            memcpy (&pkt, buf + hdrlen, sizeof (pkt));

            if (pkt.type != ICMP4_ECHO_REPLY
                || pkt.id != (uint16_t) magic
                || pkt.seq != (uint16_t)~magic
                || !isnormal (pkt.stamp))
              continue;

            AV *av = newAV ();
            av_push (av, newSVpvn ((char *)&sa.sin_addr, 4));
            av_push (av, newSVnv (now - pkt.stamp));
            av_push (av, newSVuv (pkt.payload));

            av_push (res_av, newRV_noinc ((SV *)av));
          }

        if (res_av)
          feed_reply (res_av);
}
589    
void
_recv_icmp6 (...)
        CODE:
{
        /*
         * Drains the IPv6 raw socket without blocking.  Every echo reply of
         * exactly sizeof (PKT) bytes that carries our magic id/seq pair and
         * a sane timestamp is turned into [source address (16 octets),
         * round trip time, payload]; the collected list is then passed to
         * the callbacks via feed_reply.  When no callbacks are registered
         * the packets are read and dropped.
         *
         * The loop ends when recvfrom reports an error, which includes
         * "nothing more to read"; packets of any other size are skipped so
         * that they cannot stop the draining early.
         */
        struct sockaddr_in6 sa;
        AV *res_av = av_len (cbs) < 0 ? 0 : (AV *)sv_2mortal ((SV *)newAV ());
        PKT pkt;
        tstamp now = NOW ();

        for (;;)
          {
            /* value-result argument, so set it afresh for every call */
            socklen_t sl = sizeof (sa);
            int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_DONTWAIT | MSG_TRUNC,
                                (struct sockaddr *)&sa, &sl);

            if (len < 0)
              break;

            /* MSG_TRUNC makes len the real packet size, so this also rejects oversized packets */
            if (len != sizeof (PKT))
              continue;

            if (!res_av
                || pkt.type != ICMP6_ECHO_REPLY
                || pkt.id != (uint16_t) magic
                || pkt.seq != (uint16_t)~magic
                || !isnormal (pkt.stamp))
              continue;

            AV *av = newAV ();
            av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16));
            av_push (av, newSVnv (now - pkt.stamp));
            av_push (av, newSVuv (pkt.payload));

            av_push (res_av, newRV_noinc ((SV *)av));
          }

        if (res_av)
          feed_reply (res_av);
}
625