ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FPing.xs
Revision: 1.1
Committed: Fri May 4 07:02:19 2007 UTC (17 years ago) by root
Branch: MAIN
Log Message:
initial check-in

File Contents

# User Rev Content
1 root 1.1 #define _POSIX_C_SOURCE 199309
2     #define _GNU_SOURCE 1
3    
4     #define IPV6 1
5    
6     #include "EXTERN.h"
7     #include "perl.h"
8     #include "XSUB.h"
9    
10     #include <pthread.h>
11    
12     #include <math.h>
13     #include <stdio.h>
14     #include <stdlib.h>
15     #include <string.h>
16    
17     #include <time.h>
18     #include <poll.h>
19     #include <unistd.h>
20     #include <inttypes.h>
21     #include <fcntl.h>
22     #include <errno.h>
23    
24     #include <sys/types.h>
25     #include <sys/time.h>
26     #include <sys/socket.h>
27    
28     #include <netinet/in.h>
29     #include <arpa/inet.h>
30    
31     #define ICMP4_ECHO 8
32     #define ICMP6_ECHO 128
33    
34     #define DRAIN_INTERVAL .000001 // how long to wait when sendto returns ENOBUFS, in seconds
35     #define MIN_INTERVAL .000001 // minimum packet send interval, in seconds
36    
37     #define HDR_SIZE_IP4 20
38     #define HDR_SIZE_IP6 48
39    
40     //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pipe buffer size divides exactly by pointer sizes
41    
// 16-byte address container; IPv4 addresses are kept in the last four bytes
typedef uint8_t addr_t[16];

// timestamps are seconds (with a fractional part) stored as a double
typedef double tstamp;

// return the current wall-clock time as reported by gettimeofday
tstamp
NOW ()
{
  struct timeval current;

  gettimeofday (&current, NULL);

  return current.tv_sec + current.tv_usec * 1e-6;
}
53    
54     typedef struct {
55     int family;
56     addr_t lo, hi;
57     double interval;
58     tstamp next;
59     } RANGE;
60    
61     typedef struct {
62     int send_fd;
63     SV *id;
64     double interval;
65     int nranges;
66     RANGE *ranges;
67     uint32_t payload;
68     } REQ;
69    
// fixed 20-byte part of an IPv4 header as it is overlaid on received packets
typedef struct {
  uint8_t  version_ihl;  // low nibble: header length in 32-bit words
  uint8_t  tos;
  uint16_t tot_len;      // network byte order
  uint16_t id;
  uint16_t flags;
  uint8_t  ttl;
  uint8_t  protocol;     // compared against IPPROTO_ICMP
  uint16_t cksum;
  uint32_t src;
  uint32_t dst;
} IP4HDR;
85    
// 40-byte IPv6 header layout as it is overlaid on received packets
typedef struct {
  uint8_t  version;
  uint8_t  x1;
  uint8_t  x2;
  uint8_t  x3;
  uint16_t payload_len;  // network byte order
  uint8_t  nxt_hdr;      // compared against IPPROTO_ICMPV6
  uint8_t  hop_limit;
  uint8_t  src[16];
  uint8_t  dst[16];
} IP6HDR;
97    
98     #define MAGIC 0xca4c
99    
100     typedef struct {
101     uint8_t type, code;
102     uint16_t cksum;
103     uint16_t id, seq;
104     tstamp stamp; // be careful when accessing this
105     uint32_t payload;
106     } PKT;
107    
// handle of the pinger thread started by boot
pthread_t pthrid;

// pipe carrying REQ pointers to the worker thread
int thr_send[2];
// pipe carrying finished REQ pointers back from the worker thread
int thr_recv[2];

// raw ICMP sockets, one per address family
int icmp4_fd;
int icmp6_fd;
113    
/*
 * Compute the 16-bit one's-complement checksum over len bytes at data.
 *
 * The buffer is summed as native-endian 16-bit words, the carries are
 * folded back into the low 16 bits and the complement of the result is
 * returned. len must be even (enforced with assert).
 *
 * The words are loaded with memcpy, so data needs no particular alignment,
 * and the sum is accumulated in 64 bits so that large buffers cannot
 * overflow the accumulator.
 */
uint16_t
icmp_cksum (void *data, unsigned int len)
{
  const unsigned char *p = data;
  uint64_t sum = 0;

  assert (~len & 1);

  for (; len; p += 2, len -= 2)
    {
      uint16_t word;

      memcpy (&word, p, sizeof (word));
      sum += word;
    }

  /* fold all carries back in until only 16 bits remain */
  while (sum >> 16)
    sum = (sum >> 16) + (sum & 0xffff);

  return (uint16_t)~sum;
}
130    
131     static void
132     inc_addr (addr_t *addr)
133     {
134     int len = sizeof (addr_t) - 1;
135    
136     do
137     {
138     if ((*addr)[len] != 0xff)
139     {
140     ++(*addr)[len];
141     break;
142     }
143    
144     (*addr)[len] = 0;
145     }
146     while (len--);
147     }
148    
149     static void *
150     ping_proc (void *unused)
151     {
152     PKT pkt;
153     struct sockaddr_in sa4;
154     #if IPV6
155     struct sockaddr_in6 sa6;
156     #endif
157    
158     memset (&pkt, 0, sizeof (pkt));
159    
160     memset (&sa4, 0, sizeof (sa4));
161     sa4.sin_family = AF_INET;
162     sa4.sin_port = 0; // unused
163     #if IPV6
164     memset (&sa6, 0, sizeof (sa6));
165     sa6.sin6_family = AF_INET6;
166     sa6.sin6_port = 0; // unused
167     #endif
168    
169     for (;;)
170     {
171     REQ *req;
172     int len = read (thr_send [0], &req, sizeof (req));
173    
174     if (!len)
175     pthread_exit (0);
176     else if (len != sizeof (req))
177     {
178     perror ("Net::FPing: short reead or read error");
179     pthread_exit ((void *)-1);
180     }
181    
182     //TODO: bind to source address
183    
184     pkt.code = 0;
185     pkt.id = (uint16_t)MAGIC;
186     pkt.seq = (uint16_t)~MAGIC;
187     pkt.payload = req->payload;
188    
189     tstamp next = NOW ();
190    
191     while (req->nranges)
192     {
193     RANGE *range = req->ranges;
194    
195     if (!memcmp (&range->lo, &range->hi, sizeof (addr_t)))
196     req->ranges [0] = req->ranges [--req->nranges];
197     else
198     {
199     tstamp now = NOW ();
200    
201     // ranges [0] is always the next range to ping
202     tstamp wait = range->next - now;
203    
204     // compare with the global frequency limit
205     {
206     tstamp diff = next - now;
207    
208     if (wait < diff)
209     wait = diff;
210     else if (range)
211     next = range->next;
212     }
213    
214     if (wait > 0.)
215     {
216     struct timespec ts;
217    
218     ts.tv_sec = wait;
219     ts.tv_nsec = (wait - ts.tv_sec) * 1000000000.;
220    
221     nanosleep (&ts, 0);
222     }
223    
224     pkt.stamp = now;
225     pkt.cksum = 0;
226    
227     if (range->family == AF_INET)
228     {
229     pkt.type = ICMP4_ECHO;
230     pkt.cksum = icmp_cksum (&pkt, sizeof (pkt));
231    
232     memcpy (&sa4.sin_addr,
233     sizeof (addr_t) - sizeof (sa4.sin_addr) + (char *)&range->lo,
234     sizeof (sa4.sin_addr));
235    
236     if (sendto (req->send_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0)
237     errno = 0;
238     }
239     else
240     {
241     #if IPV6
242     pkt.type = ICMP6_ECHO;
243    
244     memcpy (&sa6.sin6_addr,
245     sizeof (addr_t) - sizeof (sa6.sin6_addr) + (char *)&range->lo,
246     sizeof (sa6.sin6_addr));
247    
248     if (sendto (req->send_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0)
249     errno = 0;
250     #endif
251     }
252    
253     if (errno == ENOBUFS)
254     {
255     struct timespec ts;
256    
257     ts.tv_sec = 0;
258     ts.tv_nsec = DRAIN_INTERVAL * 1000000000;
259    
260     nanosleep (&ts, 0);
261     }
262     else
263     {
264     inc_addr (&range->lo);
265    
266     range->next = next;
267     range->next += range->interval;
268     }
269    
270     next += req->interval;
271     }
272    
273     // make a downheap operation
274     int k = 0;
275     int n = 0;
276     for (;;)
277     {
278     ++n;
279     int j = k * 2 + 1;
280    
281     if (j >= req->nranges)
282     break;
283     else if (j < req->nranges - 1)
284     if (req->ranges [j].next > req->ranges [j + 1].next)
285     ++j;
286    
287     if (req->ranges [j].next >= req->ranges [k].next)
288     break;
289    
290     RANGE temp = req->ranges [k];
291     req->ranges [k] = req->ranges [j];
292     req->ranges [j] = temp;
293    
294     k = j;
295     }
296     }
297    
298     write (thr_recv [1], &req, sizeof (req));
299     }
300    
301     return 0;
302     }
303    
304     static void
305     boot ()
306     {
307     sigset_t fullsigset, oldsigset;
308     pthread_attr_t attr;
309    
310     if (pipe (thr_send) < 0)
311     croak ("Net::FPing: unable to create send pipe");
312    
313     if (pipe (thr_recv) < 0)
314     croak ("Net::FPing: unable to create receive pipe");
315    
316     icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
317     #if IPV6
318     icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
319     #endif
320    
321     pthread_attr_init (&attr);
322     pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
323     #ifdef PTHREAD_SCOPE_PROCESS
324     pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
325     #endif
326    
327     sigfillset (&fullsigset);
328    
329     pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
330    
331     if (pthread_create (&pthrid, &attr, ping_proc, 0))
332     croak ("Net::FPing: unable to create pinger thread");
333    
334     pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
335    
336     sv_setiv (get_sv ("Net::FPing::THR_REQ_FD", 1), thr_send [1]);
337     sv_setiv (get_sv ("Net::FPing::THR_RES_FD", 1), thr_recv [0]);
338    
339     sv_setiv (get_sv ("Net::FPing::ICMP4_FD", 1), icmp4_fd);
340     sv_setiv (get_sv ("Net::FPing::ICMP6_FD", 1), icmp6_fd);
341     }
342    
343     MODULE = Net::FPing PACKAGE = Net::FPing
344    
345     BOOT:
346     boot ();
347    
348     PROTOTYPES: DISABLE
349    
350     SV *
351     _req_ranges4 (SV *ranges, NV interval, U32 payload, SV *id)
352     CODE:
353     {
354     if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV)
355     croak ("address ranges must be given as arrayref with lo, hi pairs");
356    
357     AV *rav = (AV *)SvRV (ranges);
358     int nranges = av_len (rav) + 1;
359    
360     REQ *req = malloc (sizeof (REQ));
361     int i;
362    
363     if (interval < MIN_INTERVAL)
364     interval = MIN_INTERVAL;
365    
366     req->send_fd = icmp4_fd;
367     req->id = newSVsv (id);
368     req->interval = interval;
369     req->payload = payload;
370     req->nranges = nranges;
371     req->ranges = (RANGE *)malloc (nranges * sizeof (RANGE));
372    
373     while (nranges--)
374     {
375     SV *sv = *av_fetch (rav, nranges, 1);
376    
377     if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV)
378     croak ("address range must be given as arrayref with lo, hi, interval arrayrefs");
379    
380     AV *av = (AV *)SvRV (sv);
381     RANGE *r = req->ranges + nranges;
382    
383     SV *lo = *av_fetch (av, 0, 1);
384     SV *hi = *av_fetch (av, 1, 1);
385    
386     sv_utf8_downgrade (lo, 0);
387     sv_utf8_downgrade (hi, 0);
388    
389     memset (&r->lo, 0, sizeof (addr_t));
390     memset (&r->hi, 0, sizeof (addr_t));
391    
392     if (SvPOKp (lo) && SvPOKp (hi))
393     {
394     if (SvCUR (lo) != SvCUR (hi))
395     croak ("addresses in range must be of the same size (either 4 or 16 bytes)");
396    
397     if (SvCUR (lo) == 4)
398     {
399     r->family = AF_INET;
400     memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, SvPVX (lo), 4);
401     memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, SvPVX (hi), 4);
402     }
403     else if (SvCUR (lo) == 16)
404     {
405     #if IPV6
406     r->family = AF_INET6;
407     memcpy (&r->lo, SvPVX (lo), sizeof (addr_t));
408     memcpy (&r->hi, SvPVX (hi), sizeof (addr_t));
409     #else
410     croak ("IPv6 not supported in this configuration");
411     #endif
412     }
413     else
414     croak ("addresses in range must be either 4 (IPv4) or 16 (IPV6) bytes in length");
415     }
416     else if (SvIOK (lo) && SvIOK (hi))
417     {
418     r->family = AF_INET;
419    
420     uint32_t addr;
421     addr = htonl (SvUV (lo)); memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, &addr, 4);
422     addr = htonl (SvUV (hi)); memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, &addr, 4);
423     }
424     else
425     croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets");
426    
427     if (r->family = AF_INET)
428     {
429     if (icmp4_fd < 0)
430     croak ("Net::FPing: IPv4 ping support not available on this system");
431     }
432     else
433     {
434     if (icmp6_fd < 0)
435     croak ("Net::FPing: IPv6 ping support not available on this system");
436     }
437    
438     r->interval = SvNV (*av_fetch (av, 2, 1));
439    
440     if (r->interval < req->interval)
441     r->interval = req->interval;
442    
443     r->next = 0.;
444     }
445    
446     RETVAL = newSVpvn ((char *)&req, sizeof (req));
447     }
448     OUTPUT:
449     RETVAL
450    
451     SV *
452     _read_res ()
453     CODE:
454     {
455     REQ *req;
456    
457     if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req))
458     RETVAL = &PL_sv_undef;
459    
460     RETVAL = req->id;
461     free (req->ranges);
462     free (req);
463     }
464     OUTPUT:
465     RETVAL
466    
467     void
468     _recv_icmp4 (...)
469     CODE:
470     {
471     char buf [512];
472     struct sockaddr_in sa;
473     socklen_t sl = sizeof (sa);
474     AV *res_av = newAV ();
475     SV *res_rv = sv_2mortal (newRV_noinc ((SV *)res_av));
476    
477     for (;;)
478     {
479     int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_DONTWAIT | MSG_TRUNC, &sa, &sl);
480    
481     if (len <= HDR_SIZE_IP4)
482     break;
483    
484     IP4HDR *iphdr = (IP4HDR *)buf;
485    
486     int hdrlen = (iphdr->version_ihl & 15) * 4;
487     int totlen = ntohs (iphdr->tot_len);
488    
489     // packet corrupt?
490     if (totlen > len
491     || iphdr->protocol != IPPROTO_ICMP
492     || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
493     continue;
494    
495     PKT *pkt = (PKT *)(buf + hdrlen);
496    
497     if (pkt->id != (uint16_t)MAGIC
498     || pkt->seq != (uint16_t)~MAGIC
499     || pkt->type != ICMP4_ECHO
500     || !isnormal (pkt->stamp))
501     continue;
502    
503     // just drain for now
504     //av_push
505     }
506    
507     // feed (res_av);
508     }
509    
510     void
511     _recv_icmp6 (...)
512     CODE:
513     {
514     char buf [512];
515     struct sockaddr_in sa;
516     socklen_t sl = sizeof (sa);
517     AV *res_av = (AV *)sv_2mortal ((SV *)newAV ());
518    
519     for (;;)
520     {
521     int len = recvfrom (icmp6_fd, buf, sizeof (buf), MSG_DONTWAIT | MSG_TRUNC, &sa, &sl);
522    
523     if (len <= HDR_SIZE_IP6)
524     break;
525    
526     IP6HDR *iphdr = (IP6HDR *)buf;
527    
528     int datalen = ntohs (iphdr->payload_len);
529    
530     // packet corrupt?
531     if (HDR_SIZE_IP6 + datalen > len
532     || iphdr->nxt_hdr != IPPROTO_ICMPV6
533     || HDR_SIZE_IP6 + sizeof (PKT) != datalen)
534     continue;
535    
536     PKT *pkt = (PKT *)(buf + HDR_SIZE_IP6);
537    
538     if (pkt->id != (uint16_t)MAGIC
539     || pkt->seq != (uint16_t)~MAGIC
540     || pkt->type != ICMP6_ECHO
541     || !isnormal (pkt->stamp))
542     continue;
543    
544     //fprintf (stderr, "ip6 echo received\n");
545     // just drain for now
546     //av_push
547     }
548    
549     // feed (res_av);
550     }
551