ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FPing.xs
Revision: 1.3
Committed: Fri May 4 13:50:02 2007 UTC (17 years ago) by root
Branch: MAIN
Changes since 1.2: +67 -42 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define _POSIX_C_SOURCE 199309
2 #define _GNU_SOURCE 1
3
4 #define IPV6 1 // if you get compilation problems try to disable IPv6
5
6 #include "EXTERN.h"
7 #include "perl.h"
8 #include "XSUB.h"
9
10 #include <pthread.h>
11
12 #include <math.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16
17 #include <time.h>
18 #include <poll.h>
19 #include <unistd.h>
20 #include <inttypes.h>
21 #include <fcntl.h>
22 #include <errno.h>
23
24 #include <sys/types.h>
25 #include <sys/time.h>
26 #include <sys/socket.h>
27
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30
31 #define ICMP4_ECHO 8
32 #define ICMP6_ECHO 128
33
34 #define DRAIN_INTERVAL .000001 // how long to wait when sendto returns ENOBUFS, in seconds
35 #define MIN_INTERVAL .000001 // minimum packet send interval, in seconds
36
37 #define HDR_SIZE_IP4 20
38 #define HDR_SIZE_IP6 48
39
40 //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pip biffersize divides exactly by pointer sizes
41
42 typedef uint8_t addr_t[16];
43
44 typedef double tstamp;
45
46 tstamp
47 NOW ()
48 {
49 struct timeval tv;
50 gettimeofday (&tv, 0);
51 return tv.tv_sec + tv.tv_usec * 0.000001;
52 }
53
54 typedef struct {
55 int family;
56 addr_t lo, hi;
57 double interval;
58 tstamp next;
59 } RANGE;
60
61 typedef struct {
62 SV *id;
63 double interval;
64 int nranges;
65 RANGE *ranges;
66 uint32_t payload;
67 } REQ;
68
69 typedef struct {
70 uint8_t version_ihl;
71 uint8_t tos;
72 uint16_t tot_len;
73
74 uint16_t id;
75 uint16_t flags;
76
77 uint8_t ttl;
78 uint8_t protocol;
79 uint16_t cksum;
80
81 uint32_t src;
82 uint32_t dst;
83 } IP4HDR;
84
85 typedef struct {
86 uint8_t version;
87 uint8_t x1, x2, x3;
88
89 uint16_t payload_len;
90 uint8_t nxt_hdr;
91 uint8_t hop_limit;
92
93 uint8_t src[16];
94 uint8_t dst[16];
95 } IP6HDR;
96
97 #define MAGIC 0xca4c
98
99 typedef struct {
100 uint8_t type, code;
101 uint16_t cksum;
102 uint16_t id, seq;
103 uint32_t payload;
104 tstamp stamp; // be careful when accessing this
105 } PKT;
106
107 static pthread_t pthrid;
108 static int thr_send[2]; // send to worker
109 static int thr_recv[2]; // receive from worker
110
111 static int icmp4_fd, icmp6_fd;
112
113 static AV *cbs;
114
115 static uint16_t
116 icmp_cksum (void *data, unsigned int len)
117 {
118 register int sum = 0;
119 uint16_t *wp;
120
121 assert (~len & 1);
122
123 for (wp = (uint16_t *)data; len; wp++, len -= 2)
124 sum += *wp;
125
126 sum = (sum >> 16) + (sum & 0xffff); /* add high 16 to low 16 */
127 sum += sum >> 16; /* add carry */
128
129 return ~sum;
130 }
131
132 static void
133 inc_addr (addr_t *addr)
134 {
135 int len = sizeof (addr_t) - 1;
136
137 do
138 {
139 if ((*addr)[len] != 0xff)
140 {
141 ++(*addr)[len];
142 break;
143 }
144
145 (*addr)[len] = 0;
146 }
147 while (len--);
148 }
149
150 static void *
151 ping_proc (void *unused)
152 {
153 PKT pkt;
154 struct sockaddr_in sa4;
155 #if IPV6
156 struct sockaddr_in6 sa6;
157 #endif
158
159 memset (&pkt, 0, sizeof (pkt));
160
161 memset (&sa4, 0, sizeof (sa4));
162 sa4.sin_family = AF_INET;
163 sa4.sin_port = 0;
164 #if IPV6
165 memset (&sa6, 0, sizeof (sa6));
166 sa6.sin6_family = AF_INET6;
167 sa6.sin6_port = 0;
168 #endif
169
170 for (;;)
171 {
172 REQ *req;
173 int len = read (thr_send [0], &req, sizeof (req));
174
175 if (!len)
176 pthread_exit (0);
177 else if (len != sizeof (req))
178 {
179 perror ("Net::FPing: short reead or read error");
180 pthread_exit ((void *)-1);
181 }
182
183 //TODO: bind to source address
184
185 pkt.code = 0;
186 pkt.id = (uint16_t)MAGIC;
187 pkt.seq = (uint16_t)~MAGIC;
188 pkt.payload = req->payload;
189
190 tstamp next = NOW ();
191
192 while (req->nranges)
193 {
194 RANGE *range = req->ranges;
195
196 if (!memcmp (&range->lo, &range->hi, sizeof (addr_t)))
197 req->ranges [0] = req->ranges [--req->nranges];
198 else
199 {
200 tstamp now = NOW ();
201
202 // ranges [0] is always the next range to ping
203 tstamp wait = range->next - now;
204
205 // compare with the global frequency limit
206 {
207 tstamp diff = next - now;
208
209 if (wait < diff)
210 wait = diff;
211 else if (range)
212 next = range->next;
213 }
214
215 if (wait > 0.)
216 {
217 struct timespec ts;
218
219 ts.tv_sec = wait;
220 ts.tv_nsec = (wait - ts.tv_sec) * 1000000000.;
221
222 nanosleep (&ts, 0);
223 }
224
225 pkt.stamp = now;
226 pkt.cksum = 0;
227
228 if (range->family == AF_INET)
229 {
230 pkt.type = ICMP4_ECHO;
231 pkt.cksum = icmp_cksum (&pkt, sizeof (pkt));
232
233 memcpy (&sa4.sin_addr,
234 sizeof (addr_t) - sizeof (sa4.sin_addr) + (char *)&range->lo,
235 sizeof (sa4.sin_addr));
236
237 if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0)
238 errno = 0;
239 }
240 else
241 {
242 #if IPV6
243 pkt.type = ICMP6_ECHO;
244
245 memcpy (&sa6.sin6_addr,
246 sizeof (addr_t) - sizeof (sa6.sin6_addr) + (char *)&range->lo,
247 sizeof (sa6.sin6_addr));
248
249 if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0)
250 errno = 0;
251 #endif
252 }
253
254 if (errno == ENOBUFS)
255 {
256 struct timespec ts;
257
258 ts.tv_sec = 0;
259 ts.tv_nsec = DRAIN_INTERVAL * 1000000000;
260
261 nanosleep (&ts, 0);
262 }
263 else
264 {
265 inc_addr (&range->lo);
266
267 range->next = next;
268 range->next += range->interval;
269 }
270
271 next += req->interval;
272 }
273
274 // make a downheap operation
275 int k = 0;
276 int n = 0;
277 for (;;)
278 {
279 ++n;
280 int j = k * 2 + 1;
281
282 if (j >= req->nranges)
283 break;
284 else if (j < req->nranges - 1)
285 if (req->ranges [j].next > req->ranges [j + 1].next)
286 ++j;
287
288 if (req->ranges [j].next >= req->ranges [k].next)
289 break;
290
291 RANGE temp = req->ranges [k];
292 req->ranges [k] = req->ranges [j];
293 req->ranges [j] = temp;
294
295 k = j;
296 }
297 }
298
299 write (thr_recv [1], &req, sizeof (req));
300 }
301
302 return 0;
303 }
304
305 static void
306 feed_reply (AV *res_av)
307 {
308 if (av_len (res_av) < 0)
309 return;
310
311 dSP;
312 SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
313 int i;
314
315 ENTER;
316 SAVETMPS;
317
318 for (i = av_len (cbs) + 1; i--; )
319 {
320 SV *cb = *av_fetch (cbs, i, 1);
321
322 PUSHMARK (SP);
323 XPUSHs (res);
324 PUTBACK;
325 call_sv (cb, G_DISCARD | G_VOID);
326 }
327
328 FREETMPS;
329 LEAVE;
330 }
331
332 static void
333 boot ()
334 {
335 sigset_t fullsigset, oldsigset;
336 pthread_attr_t attr;
337
338 if (pipe (thr_send) < 0)
339 croak ("Net::FPing: unable to create send pipe");
340
341 if (pipe (thr_recv) < 0)
342 croak ("Net::FPing: unable to create receive pipe");
343
344 icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
345 #if IPV6
346 icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
347 #endif
348
349 #ifdef ICMP_FILTER
350 {
351 icmp_filter oval;
352 oval.data = 0xffffffff & ~(1 << ICMP4_ECHO);
353 setsockopt (icmpv4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
354 }
355 #endif
356
357 pthread_attr_init (&attr);
358 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
359 #ifdef PTHREAD_SCOPE_PROCESS
360 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
361 #endif
362
363 sigfillset (&fullsigset);
364
365 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
366
367 if (pthread_create (&pthrid, &attr, ping_proc, 0))
368 croak ("Net::FPing: unable to create pinger thread");
369
370 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
371
372 sv_setiv (get_sv ("Net::FPing::THR_REQ_FD", 1), thr_send [1]);
373 sv_setiv (get_sv ("Net::FPing::THR_RES_FD", 1), thr_recv [0]);
374
375 sv_setiv (get_sv ("Net::FPing::ICMP4_FD", 1), icmp4_fd);
376 sv_setiv (get_sv ("Net::FPing::ICMP6_FD", 1), icmp6_fd);
377 }
378
379 MODULE = Net::FPing PACKAGE = Net::FPing
380
381 BOOT:
382 {
383 HV *stash = gv_stashpv ("Net::FPing", 1);
384
385 cbs = get_av ("Net::FPing::CB", 1);
386
387 boot ();
388
389 newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
390 newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));
391
392 newCONSTSUB (stash, "ipv4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
393 newCONSTSUB (stash, "ipv6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
394 }
395
396 PROTOTYPES: DISABLE
397
398 SV *
399 _req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id)
400 CODE:
401 {
402 if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV)
403 croak ("address ranges must be given as arrayref with lo, hi pairs");
404
405 AV *rav = (AV *)SvRV (ranges);
406 int nranges = av_len (rav) + 1;
407
408 REQ *req = malloc (sizeof (REQ));
409 int i;
410
411 if (interval < MIN_INTERVAL)
412 interval = MIN_INTERVAL;
413
414 req->id = newSVsv (id);
415 req->interval = interval;
416 req->payload = payload;
417 req->nranges = nranges;
418 req->ranges = (RANGE *)malloc (nranges * sizeof (RANGE));
419
420 while (nranges--)
421 {
422 SV *sv = *av_fetch (rav, nranges, 1);
423
424 if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV)
425 croak ("address range must be given as arrayref with lo, hi, interval arrayrefs");
426
427 AV *av = (AV *)SvRV (sv);
428 RANGE *r = req->ranges + nranges;
429
430 SV *lo = *av_fetch (av, 0, 1);
431 SV *hi = *av_fetch (av, 1, 1);
432
433 sv_utf8_downgrade (lo, 0);
434 sv_utf8_downgrade (hi, 0);
435
436 memset (&r->lo, 0, sizeof (addr_t));
437 memset (&r->hi, 0, sizeof (addr_t));
438
439 if (SvPOKp (lo) && SvPOKp (hi))
440 {
441 if (SvCUR (lo) != SvCUR (hi))
442 croak ("addresses in range must be of the same size (either 4 or 16 bytes)");
443
444 if (SvCUR (lo) == 4)
445 {
446 r->family = AF_INET;
447 memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, SvPVX (lo), 4);
448 memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, SvPVX (hi), 4);
449 }
450 else if (SvCUR (lo) == 16)
451 {
452 #if IPV6
453 r->family = AF_INET6;
454 memcpy (&r->lo, SvPVX (lo), sizeof (addr_t));
455 memcpy (&r->hi, SvPVX (hi), sizeof (addr_t));
456 #else
457 croak ("IPv6 not supported in this configuration");
458 #endif
459 }
460 else
461 croak ("addresses in range must be either 4 (IPv4) or 16 (IPV6) bytes in length");
462 }
463 else if (SvIOK (lo) && SvIOK (hi))
464 {
465 r->family = AF_INET;
466
467 uint32_t addr;
468 addr = htonl (SvUV (lo)); memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, &addr, 4);
469 addr = htonl (SvUV (hi)); memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, &addr, 4);
470 }
471 else
472 croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets");
473
474 if (r->family == AF_INET)
475 {
476 if (icmp4_fd < 0)
477 croak ("Net::FPing: IPv4 ping support not available on this system");
478 }
479 else
480 {
481 if (icmp6_fd < 0)
482 croak ("Net::FPing: IPv6 ping support not available on this system");
483 }
484
485 r->interval = SvNV (*av_fetch (av, 2, 1));
486
487 if (r->interval < req->interval)
488 r->interval = req->interval;
489
490 r->next = 0.;
491 }
492
493 RETVAL = newSVpvn ((char *)&req, sizeof (req));
494 }
495 OUTPUT:
496 RETVAL
497
498 SV *
499 _read_res ()
500 CODE:
501 {
502 REQ *req;
503
504 if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req))
505 RETVAL = &PL_sv_undef;
506
507 RETVAL = req->id;
508 free (req->ranges);
509 free (req);
510 }
511 OUTPUT:
512 RETVAL
513
514 void
515 _recv_icmp4 (...)
516 CODE:
517 {
518 char buf [512];
519 struct sockaddr_in sa;
520 socklen_t sl = sizeof (sa);
521 AV *res_av = newAV ();
522 SV *res_rv = sv_2mortal (newRV_noinc ((SV *)res_av));
523 tstamp now = NOW ();
524
525 for (;;)
526 {
527 int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_DONTWAIT | MSG_TRUNC, &sa, &sl);
528
529 if (len <= HDR_SIZE_IP4)
530 break;
531
532 IP4HDR *iphdr = (IP4HDR *)buf;
533
534 int hdrlen = (iphdr->version_ihl & 15) * 4;
535 int totlen = ntohs (iphdr->tot_len);
536
537 // packet corrupt?
538 if (totlen > len
539 || iphdr->protocol != IPPROTO_ICMP
540 || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
541 continue;
542
543 PKT *pkt = (PKT *)(buf + hdrlen);
544
545 if (pkt->id != (uint16_t)MAGIC
546 || pkt->seq != (uint16_t)~MAGIC
547 || pkt->type != ICMP4_ECHO
548 || !isnormal (pkt->stamp))
549 continue;
550
551 AV *av = newAV ();
552 av_push (av, newSVpvn ((char *)&sa.sin_addr, 4));
553 av_push (av, newSVnv (now - pkt->stamp));
554 av_push (av, newSVuv (pkt->payload));
555
556 av_push (res_av, newRV_noinc ((SV *)av));
557 }
558
559 feed_reply (res_av);
560 }
561
562 void
563 _recv_icmp6 (...)
564 CODE:
565 {
566 struct sockaddr_in6 sa;
567 socklen_t sl = sizeof (sa);
568 AV *res_av = (AV *)sv_2mortal ((SV *)newAV ());
569 PKT pkt;
570 tstamp now = NOW ();
571
572 for (;;)
573 {
574 int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_DONTWAIT | MSG_TRUNC, &sa, &sl);
575
576 if (len != sizeof (PKT))
577 break;
578
579 if (pkt.type != ICMP6_ECHO
580 || pkt.id != (uint16_t)MAGIC
581 || pkt.seq != (uint16_t)~MAGIC
582 || !isnormal (pkt.stamp))
583 continue;
584
585 AV *av = newAV ();
586 av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16));
587 av_push (av, newSVnv (now - pkt.stamp));
588 av_push (av, newSVuv (pkt.payload));
589
590 av_push (res_av, newRV_noinc ((SV *)av));
591 }
592
593 feed_reply (res_av);
594 }
595