ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FPing.xs
Revision: 1.4
Committed: Fri May 4 15:16:14 2007 UTC (17 years ago) by root
Branch: MAIN
Changes since 1.3: +21 -14 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #define _POSIX_C_SOURCE 199309
2 #define _GNU_SOURCE 1
3
4 #define IPV6 1 // if you get compilation problems try to disable IPv6
5
6 #include "EXTERN.h"
7 #include "perl.h"
8 #include "XSUB.h"
9
10 #include <pthread.h>
11
12 #include <math.h>
13 #include <stdio.h>
14 #include <stdlib.h>
15 #include <string.h>
16
17 #include <time.h>
18 #include <poll.h>
19 #include <unistd.h>
20 #include <inttypes.h>
21 #include <fcntl.h>
22 #include <errno.h>
23
24 #include <sys/types.h>
25 #include <sys/time.h>
26 #include <sys/socket.h>
27
28 #include <netinet/in.h>
29 #include <arpa/inet.h>
30
31 #ifdef __linux
32 # include <linux/icmp.h>
33 #endif
34
35 #define ICMP4_ECHO 8
36 #define ICMP4_ECHO_REPLY 0
37 #define ICMP6_ECHO 128
38 #define ICMP6_ECHO_REPLY 129
39
40 #define DRAIN_INTERVAL .000001 // how long to wait when sendto returns ENOBUFS, in seconds
41 #define MIN_INTERVAL .000001 // minimum packet send interval, in seconds
42
43 #define HDR_SIZE_IP4 20
44 #define HDR_SIZE_IP6 48
45
46 //TODO: xread/xwrite for atomicity? we currently rely on the fact that the pipe buffer size is an exact multiple of the pointer size
47
// raw address storage, large enough for an IPv6 address; the rest of
// this file stores IPv4 addresses in the last 4 bytes (offset
// sizeof (addr_t) - 4) with the leading bytes zeroed
typedef uint8_t addr_t[16];

// timestamps and durations, in (fractional) seconds
typedef double tstamp;

// returns the current time as reported by gettimeofday, as seconds
// plus fractional microseconds
tstamp
NOW (void)
{
  struct timeval tv;

  gettimeofday (&tv, 0);

  return tv.tv_sec + tv.tv_usec * 0.000001;
}
59
60 typedef struct {
61 int family;
62 addr_t lo, hi;
63 double interval;
64 tstamp next;
65 } RANGE;
66
67 typedef struct {
68 SV *id;
69 double interval;
70 int nranges;
71 RANGE *ranges;
72 uint32_t payload;
73 } REQ;
74
// layout of the fixed 20 byte part of an IPv4 header, as found at the
// start of packets read from the raw ICMP socket
typedef struct {
  uint8_t  version_ihl;  // low 4 bits: header length in 32 bit words
  uint8_t  tos;
  uint16_t tot_len;      // network byte order

  uint16_t id;
  uint16_t flags;

  uint8_t  ttl;
  uint8_t  protocol;     // compared against IPPROTO_ICMP on receive
  uint16_t cksum;

  uint32_t src;
  uint32_t dst;
} IP4HDR;
90
// layout of a 40 byte IPv6 header
typedef struct {
  uint8_t  version;
  uint8_t  x1;
  uint8_t  x2;
  uint8_t  x3;

  uint16_t payload_len;
  uint8_t  nxt_hdr;
  uint8_t  hop_limit;

  uint8_t  src[16];
  uint8_t  dst[16];
} IP6HDR;
102
103 #define MAGIC 0xca4c
104
105 typedef struct {
106 uint8_t type, code;
107 uint16_t cksum;
108 uint16_t id, seq;
109 uint32_t payload;
110 tstamp stamp; // be careful when accessing this
111 } PKT;
112
113 static pthread_t pthrid;
114 static int thr_send[2]; // send to worker
115 static int thr_recv[2]; // receive from worker
116
117 static int icmp4_fd, icmp6_fd;
118
119 static AV *cbs;
120
// computes the internet checksum (one's complement of the one's
// complement sum of all 16 bit words) over len bytes at data.
//
// the words are summed in memory order, so the returned value must be
// stored into the packet as-is, without byte swapping. data may have
// any alignment. an odd trailing byte is treated as if followed by a
// zero byte.
static uint16_t
icmp_cksum (void *data, unsigned int len)
{
  const uint8_t *p = data;
  uint64_t sum = 0; // wide enough for any unsigned int length
  uint16_t word;

  for (; len >= 2; p += 2, len -= 2)
    {
      // memcpy instead of a pointer cast: no alignment or aliasing assumptions
      memcpy (&word, p, sizeof (word));
      sum += word;
    }

  if (len)
    {
      word = 0;
      memcpy (&word, p, 1);
      sum += word;
    }

  // fold all carries back into the low 16 bits
  while (sum >> 16)
    sum = (sum >> 16) + (sum & 0xffff);

  return (uint16_t)~sum;
}
137
138 static void
139 inc_addr (addr_t *addr)
140 {
141 int len = sizeof (addr_t) - 1;
142
143 do
144 {
145 if ((*addr)[len] != 0xff)
146 {
147 ++(*addr)[len];
148 break;
149 }
150
151 (*addr)[len] = 0;
152 }
153 while (len--);
154 }
155
156 static void *
157 ping_proc (void *unused)
158 {
159 PKT pkt;
160 struct sockaddr_in sa4;
161 #if IPV6
162 struct sockaddr_in6 sa6;
163 #endif
164
165 memset (&pkt, 0, sizeof (pkt));
166
167 memset (&sa4, 0, sizeof (sa4));
168 sa4.sin_family = AF_INET;
169 sa4.sin_port = 0;
170 #if IPV6
171 memset (&sa6, 0, sizeof (sa6));
172 sa6.sin6_family = AF_INET6;
173 sa6.sin6_port = 0;
174 #endif
175
176 for (;;)
177 {
178 REQ *req;
179 int len = read (thr_send [0], &req, sizeof (req));
180
181 if (!len)
182 pthread_exit (0);
183 else if (len != sizeof (req))
184 {
185 perror ("Net::FPing: short reead or read error");
186 pthread_exit ((void *)-1);
187 }
188
189 //TODO: bind to source address
190
191 pkt.code = 0;
192 pkt.id = (uint16_t)MAGIC;
193 pkt.seq = (uint16_t)~MAGIC;
194 pkt.payload = req->payload;
195
196 tstamp now = NOW ();
197 tstamp next = now;
198
199 while (req->nranges)
200 {
201 RANGE *range = req->ranges;
202
203 if (!memcmp (&range->lo, &range->hi, sizeof (addr_t)))
204 req->ranges [0] = req->ranges [--req->nranges];
205 else
206 {
207 // ranges [0] is always the next range to ping
208 tstamp wait = range->next - now;
209
210 // compare with the global frequency limit
211 {
212 tstamp diff = next - now;
213
214 if (wait < diff)
215 wait = diff;
216 else if (range)
217 next = range->next;
218 }
219
220 if (wait > 0.)
221 {
222 struct timespec ts;
223
224 ts.tv_sec = wait;
225 ts.tv_nsec = (wait - ts.tv_sec) * 1000000000.;
226
227 nanosleep (&ts, 0);
228 }
229
230 now = NOW ();
231
232 pkt.stamp = now;
233 pkt.cksum = 0;
234
235 if (range->family == AF_INET)
236 {
237 pkt.type = ICMP4_ECHO;
238 pkt.cksum = icmp_cksum (&pkt, sizeof (pkt));
239
240 memcpy (&sa4.sin_addr,
241 sizeof (addr_t) - sizeof (sa4.sin_addr) + (char *)&range->lo,
242 sizeof (sa4.sin_addr));
243
244 if (sendto (icmp4_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa4, sizeof (sa4)) > 0)
245 errno = 0;
246 }
247 else
248 {
249 #if IPV6
250 pkt.type = ICMP6_ECHO;
251
252 memcpy (&sa6.sin6_addr,
253 sizeof (addr_t) - sizeof (sa6.sin6_addr) + (char *)&range->lo,
254 sizeof (sa6.sin6_addr));
255
256 if (sendto (icmp6_fd, &pkt, sizeof (pkt), 0, (struct sockaddr *)&sa6, sizeof (sa6)) > 0)
257 errno = 0;
258 #endif
259 }
260
261 if (errno == ENOBUFS)
262 {
263 struct timespec ts;
264
265 ts.tv_sec = 0;
266 ts.tv_nsec = DRAIN_INTERVAL * 1000000000;
267
268 nanosleep (&ts, 0);
269 }
270 else
271 {
272 inc_addr (&range->lo);
273
274 range->next = next;
275 range->next += range->interval;
276 }
277
278 next += req->interval;
279 }
280
281 // make a downheap operation
282 int k = 0;
283 int n = 0;
284 for (;;)
285 {
286 ++n;
287 int j = k * 2 + 1;
288
289 if (j >= req->nranges)
290 break;
291 else if (j < req->nranges - 1)
292 if (req->ranges [j].next > req->ranges [j + 1].next)
293 ++j;
294
295 if (req->ranges [j].next >= req->ranges [k].next)
296 break;
297
298 RANGE temp = req->ranges [k];
299 req->ranges [k] = req->ranges [j];
300 req->ranges [j] = temp;
301
302 k = j;
303 }
304 }
305
306 write (thr_recv [1], &req, sizeof (req));
307 }
308
309 return 0;
310 }
311
312 static void
313 feed_reply (AV *res_av)
314 {
315 if (av_len (res_av) < 0)
316 return;
317
318 dSP;
319 SV *res = sv_2mortal (newRV_inc ((SV *)res_av));
320 int i;
321
322 ENTER;
323 SAVETMPS;
324
325 for (i = av_len (cbs) + 1; i--; )
326 {
327 SV *cb = *av_fetch (cbs, i, 1);
328
329 PUSHMARK (SP);
330 XPUSHs (res);
331 PUTBACK;
332 call_sv (cb, G_DISCARD | G_VOID);
333 }
334
335 FREETMPS;
336 LEAVE;
337 }
338
339 static void
340 boot ()
341 {
342 sigset_t fullsigset, oldsigset;
343 pthread_attr_t attr;
344
345 if (pipe (thr_send) < 0)
346 croak ("Net::FPing: unable to create send pipe");
347
348 if (pipe (thr_recv) < 0)
349 croak ("Net::FPing: unable to create receive pipe");
350
351 icmp4_fd = socket (AF_INET, SOCK_RAW, IPPROTO_ICMP);
352 #if IPV6
353 icmp6_fd = socket (AF_INET6, SOCK_RAW, IPPROTO_ICMPV6);
354 #endif
355
356 #ifdef ICMP_FILTER
357 {
358 struct icmp_filter oval;
359 oval.data = 0xffffffff & ~(1 << ICMP4_ECHO_REPLY);
360 setsockopt (icmp4_fd, SOL_RAW, ICMP_FILTER, &oval, sizeof oval);
361 }
362 #endif
363
364 pthread_attr_init (&attr);
365 pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
366 #ifdef PTHREAD_SCOPE_PROCESS
367 pthread_attr_setscope (&attr, PTHREAD_SCOPE_PROCESS);
368 #endif
369
370 sigfillset (&fullsigset);
371
372 pthread_sigmask (SIG_SETMASK, &fullsigset, &oldsigset);
373
374 if (pthread_create (&pthrid, &attr, ping_proc, 0))
375 croak ("Net::FPing: unable to create pinger thread");
376
377 pthread_sigmask (SIG_SETMASK, &oldsigset, 0);
378
379 sv_setiv (get_sv ("Net::FPing::THR_REQ_FD", 1), thr_send [1]);
380 sv_setiv (get_sv ("Net::FPing::THR_RES_FD", 1), thr_recv [0]);
381
382 sv_setiv (get_sv ("Net::FPing::ICMP4_FD", 1), icmp4_fd);
383 sv_setiv (get_sv ("Net::FPing::ICMP6_FD", 1), icmp6_fd);
384 }
385
386 MODULE = Net::FPing PACKAGE = Net::FPing
387
BOOT:
{
        /* runs once when the module is loaded: sets up the worker thread
           and sockets, then exports a few constants into Net::FPing */
        HV *stash = gv_stashpv ("Net::FPing", 1);

        cbs = get_av ("Net::FPing::CB", 1);

        boot ();

        /* a protocol counts as supported when its raw socket could be opened */
        newCONSTSUB (stash, "ipv4_supported", newSViv (icmp4_fd >= 0));
        newCONSTSUB (stash, "ipv6_supported", newSViv (icmp6_fd >= 0));

        /* header size plus the size of our ICMP message */
        newCONSTSUB (stash, "icmp4_pktsize", newSViv (HDR_SIZE_IP4 + sizeof (PKT)));
        newCONSTSUB (stash, "icmp6_pktsize", newSViv (HDR_SIZE_IP6 + sizeof (PKT)));
}
402
403 PROTOTYPES: DISABLE
404
SV *
_req_icmp_ping (SV *ranges, NV interval, U32 payload, SV *id)
        CODE:
{
        /* builds a REQ from an arrayref of [lo, hi, interval] arrayrefs and
           returns the REQ pointer packed into a string.

           lo and hi are either both 4 or both 16 byte strings, or both
           integers (IPv4 addresses in host byte order). interval is raised
           to MIN_INTERVAL, range intervals are raised to interval.

           all ranges are parsed into perl-managed scratch memory first, so
           that none of the croak calls below can leak malloc'ed memory. */
        if (!SvROK (ranges) || SvTYPE (SvRV (ranges)) != SVt_PVAV)
          croak ("address ranges must be given as arrayref with lo, hi pairs");

        AV *rav = (AV *)SvRV (ranges);
        int nranges = av_len (rav) + 1;
        int i;

        if (interval < MIN_INTERVAL)
          interval = MIN_INTERVAL;

        /* mortal buffer, freed by perl even when we croak (+1 keeps it non-empty) */
        SV *scratch = sv_2mortal (newSV (nranges * sizeof (RANGE) + 1));
        RANGE *parsed = (RANGE *)SvPVX (scratch);

        for (i = nranges; i--; )
          {
            SV *sv = *av_fetch (rav, i, 1);

            if (!SvROK (sv) || SvTYPE (SvRV (sv)) != SVt_PVAV)
              croak ("address range must be given as arrayref with lo, hi, interval arrayrefs");

            AV *av = (AV *)SvRV (sv);
            RANGE *r = parsed + i;

            SV *lo = *av_fetch (av, 0, 1);
            SV *hi = *av_fetch (av, 1, 1);

            sv_utf8_downgrade (lo, 0);
            sv_utf8_downgrade (hi, 0);

            memset (&r->lo, 0, sizeof (addr_t));
            memset (&r->hi, 0, sizeof (addr_t));

            if (SvPOKp (lo) && SvPOKp (hi))
              {
                if (SvCUR (lo) != SvCUR (hi))
                  croak ("addresses in range must be of the same size (either 4 or 16 bytes)");

                if (SvCUR (lo) == 4)
                  {
                    /* IPv4 addresses go into the last 4 bytes */
                    r->family = AF_INET;
                    memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, SvPVX (lo), 4);
                    memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, SvPVX (hi), 4);
                  }
                else if (SvCUR (lo) == 16)
                  {
#if IPV6
                    r->family = AF_INET6;
                    memcpy (&r->lo, SvPVX (lo), sizeof (addr_t));
                    memcpy (&r->hi, SvPVX (hi), sizeof (addr_t));
#else
                    croak ("IPv6 not supported in this configuration");
#endif
                  }
                else
                  croak ("addresses in range must be either 4 (IPv4) or 16 (IPV6) bytes in length");
              }
            else if (SvIOK (lo) && SvIOK (hi))
              {
                r->family = AF_INET;

                uint32_t addr;
                addr = htonl (SvUV (lo)); memcpy (sizeof (addr_t) - 4 + (char *)&r->lo, &addr, 4);
                addr = htonl (SvUV (hi)); memcpy (sizeof (addr_t) - 4 + (char *)&r->hi, &addr, 4);
              }
            else
              croak ("addresses in range must be strings with either 4 (IPv4) or 16 (IPv6) octets");

            if (r->family == AF_INET)
              {
                if (icmp4_fd < 0)
                  croak ("Net::FPing: IPv4 ping support not available on this system");
              }
            else
              {
                if (icmp6_fd < 0)
                  croak ("Net::FPing: IPv6 ping support not available on this system");
              }

            r->interval = SvNV (*av_fetch (av, 2, 1));

            if (r->interval < interval)
              r->interval = interval;

            r->next = 0.;
          }

        /* everything validated: build the malloc'ed request, which is
           later released with free () by _read_res */
        REQ *req = malloc (sizeof (REQ));
        RANGE *req_ranges = malloc (nranges ? nranges * sizeof (RANGE) : 1);

        if (!req || !req_ranges)
          {
            free (req_ranges);
            free (req);
            croak ("Net::FPing: out of memory");
          }

        memcpy (req_ranges, parsed, nranges * sizeof (RANGE));

        req->id       = newSVsv (id);
        req->interval = interval;
        req->payload  = payload;
        req->nranges  = nranges;
        req->ranges   = req_ranges;

        RETVAL = newSVpvn ((char *)&req, sizeof (req));
}
        OUTPUT:
        RETVAL
504
SV *
_read_res ()
        CODE:
{
        /* reads one finished REQ pointer from the result pipe, frees the
           request and returns its id. returns undef when no complete
           pointer could be read. */
        REQ *req;

        if (read (thr_recv [0], &req, sizeof (req)) != sizeof (req))
          RETVAL = &PL_sv_undef;
        else
          {
            /* only touch req when the read actually filled it in */
            RETVAL = req->id;
            free (req->ranges);
            free (req);
          }
}
        OUTPUT:
        RETVAL
520
void
_recv_icmp4 (...)
        CODE:
{
        /* drains the IPv4 raw socket without blocking, collects
           [address, round trip time, payload] for every echo reply that
           carries our magic values, and passes the list to feed_reply. */
        char buf [512];
        struct sockaddr_in sa;
        AV *res_av = newAV ();
        /* the mortal reference owns res_av and releases it when we are done */
        SV *res_rv = sv_2mortal (newRV_noinc ((SV *)res_av));
        tstamp now = NOW ();

        for (;;)
          {
            /* value-result argument, must be reset before every call */
            socklen_t sl = sizeof (sa);
            int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_DONTWAIT | MSG_TRUNC, (struct sockaddr *)&sa, &sl);

            if (len <= HDR_SIZE_IP4)
              break;

            /* buf has no particular alignment, so copy the headers out
               instead of accessing them through casted pointers */
            IP4HDR iphdr;
            memcpy (&iphdr, buf, sizeof (iphdr));

            int hdrlen = (iphdr.version_ihl & 15) * 4;
            int totlen = ntohs (iphdr.tot_len);

            /* packet corrupt? (the size check also keeps the copy below inside buf) */
            if (totlen > len
                || iphdr.protocol != IPPROTO_ICMP
                || hdrlen < HDR_SIZE_IP4 || hdrlen + (int)sizeof (PKT) != totlen)
              continue;

            PKT pkt;
            memcpy (&pkt, buf + hdrlen, sizeof (pkt));

            if (pkt.type != ICMP4_ECHO_REPLY
                || pkt.id != (uint16_t) MAGIC
                || pkt.seq != (uint16_t)~MAGIC
                || !isnormal (pkt.stamp))
              continue;

            AV *av = newAV ();
            av_push (av, newSVpvn ((char *)&sa.sin_addr, 4));
            av_push (av, newSVnv (now - pkt.stamp));
            av_push (av, newSVuv (pkt.payload));

            av_push (res_av, newRV_noinc ((SV *)av));
          }

        feed_reply (res_av);
}
568
void
_recv_icmp6 (...)
        CODE:
{
        /* drains the IPv6 raw socket without blocking, collects
           [address, round trip time, payload] for every echo reply that
           carries our magic values, and passes the list to feed_reply. */
        struct sockaddr_in6 sa;
        AV *res_av = (AV *)sv_2mortal ((SV *)newAV ());
        PKT pkt;
        tstamp now = NOW ();

        for (;;)
          {
            /* value-result argument, must be reset before every call */
            socklen_t sl = sizeof (sa);
            int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_DONTWAIT | MSG_TRUNC, (struct sockaddr *)&sa, &sl);

            /* stop only when nothing more can be read */
            if (len < 0)
              break;

            /* skip foreign packets (wrong size, type or magic) and keep draining */
            if (len != sizeof (PKT)
                || pkt.type != ICMP6_ECHO_REPLY
                || pkt.id != (uint16_t) MAGIC
                || pkt.seq != (uint16_t)~MAGIC
                || !isnormal (pkt.stamp))
              continue;

            AV *av = newAV ();
            av_push (av, newSVpvn ((char *)&sa.sin6_addr, 16));
            av_push (av, newSVnv (now - pkt.stamp));
            av_push (av, newSVuv (pkt.payload));

            av_push (res_av, newRV_noinc ((SV *)av));
          }

        feed_reply (res_av);
}
602