ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FastPing.xs
(Generate patch)

Comparing AnyEvent-FastPing/FastPing.xs (file contents):
Revision 1.10 by root, Mon Jan 31 05:35:48 2011 UTC vs.
Revision 1.12 by root, Wed Feb 2 19:26:45 2011 UTC

44#define MIN_INTERVAL 1e-6 // minimum packet send interval, in seconds 44#define MIN_INTERVAL 1e-6 // minimum packet send interval, in seconds
45 45
46#define HDR_SIZE_IP4 20 46#define HDR_SIZE_IP4 20
47#define HDR_SIZE_IP6 48 47#define HDR_SIZE_IP6 48
48 48
49//TODO: xread/xwrite for atomicity? we currently rely on the fact that the pip buffersize divides exactly by pointer sizes 49static int thr_res[2]; // worker thread finished status
50
51typedef uint8_t addr_tt[16];
52
53static int thr_res[2]; // receive from worker(s)
54static int icmp4_fd = -1, icmp6_fd = -1; 50static int icmp4_fd = -1;
51static int icmp6_fd = -1;
55 52
56/*****************************************************************************/ 53/*****************************************************************************/
57 54
58typedef double tstamp; 55typedef double tstamp;
59 56
106 103
107 uint32_t src; 104 uint32_t src;
108 uint32_t dst; 105 uint32_t dst;
109} IP4HDR; 106} IP4HDR;
110 107
111#if 0 108/*****************************************************************************/
109
110typedef uint8_t addr_tt[16];
111
112typedef struct 112typedef struct
113{ 113{
114 uint8_t version;
115 uint8_t x1, x2, x3;
116
117 uint16_t payload_len;
118 uint8_t nxt_hdr;
119 uint8_t hop_limit;
120
121 uint8_t src[16];
122 uint8_t dst[16];
123} IP6HDR;
124#endif
125
126/*****************************************************************************/
127
128typedef struct
129{
130 tstamp next; 114 tstamp next;
131 double interval; 115 tstamp interval;
132 int addrlen; 116 int addrlen;
133 addr_tt lo, hi; 117
134 void *items; 118 addr_tt lo, hi; /* only if !addrcnt */
119
120 int addrcnt;
121 /* addrcnt addresses follow */
135} RANGE; 122} RANGE;
136 123
137typedef struct 124typedef struct
138{ 125{
139 RANGE **ranges; 126 RANGE **ranges;
140 int rangecnt, rangemax; 127 int rangecnt, rangemax;
141 128
129 tstamp next;
142 tstamp interval; 130 tstamp interval;
131
143 tstamp maxrtt; 132 tstamp maxrtt;
133
144 uint16_t magic1; 134 uint16_t magic1;
145 uint16_t magic2; 135 uint16_t magic2;
146 uint16_t magic3; 136 uint16_t magic3;
147 137
148 int id; 138 int id;
166 156
167typedef struct 157typedef struct
168{ 158{
169 uint8_t type, code; 159 uint8_t type, code;
170 uint16_t cksum; 160 uint16_t cksum;
161
171 uint16_t id, seq; 162 uint16_t id, seq;
163
172 uint16_t pinger; 164 uint16_t pinger;
173 uint16_t magic; 165 uint16_t magic;
166
174 uint32_t stamp_hi; 167 uint32_t stamp_hi;
175 uint32_t stamp_lo; 168 uint32_t stamp_lo;
176} PKT; 169} PKT;
177 170
178static int 171static int
228range_free (RANGE *self) 221range_free (RANGE *self)
229{ 222{
230 free (self); 223 free (self);
231} 224}
232 225
233static void
234inc_addr (addr_tt *addr)
235{
236 int len = sizeof (addr_tt) - 1;
237
238 while (!++(*addr)[len])
239 --len;
240}
241
242/*****************************************************************************/
243
244/* like sendto, but retries on failure */ 226/* like sendto, but retries on failure */
245static void 227static void
246xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen) 228xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen)
247{ 229{
248 tstamp wait = 0; 230 tstamp wait = DRAIN_INTERVAL / 2.;
249 231
250 while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS) 232 while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS)
251 ssleep (wait += DRAIN_INTERVAL); 233 ssleep (wait *= 2.);
252} 234}
253 235
236// ping current address, return true and increment if more to ping
254static void 237static int
255send_ping (PKT *pkt, addr_tt *addr, int addr_len) 238range_send_ping (RANGE *self, PKT *pkt)
256{ 239{
240 // send ping
241 uint8_t *addr;
242
243 if (self->addrcnt)
244 addr = (self->addrcnt - 1) * self->addrlen + (uint8_t *)(self + 1);
245 else
246 addr = sizeof (addr_tt) - self->addrlen + self->lo;
247
257 pkt->cksum = 0; 248 pkt->cksum = 0;
258 249
259 if (addr_len == 4) 250 if (self->addrlen == 4)
260 { 251 {
261 struct sockaddr_in sa; 252 struct sockaddr_in sa;
262 253
263 pkt->type = ICMP4_ECHO; 254 pkt->type = ICMP4_ECHO;
264 pkt_cksum (pkt); 255 pkt_cksum (pkt);
265 256
266 sa.sin_family = AF_INET; 257 sa.sin_family = AF_INET;
267 sa.sin_port = 0; 258 sa.sin_port = 0;
268 259
269 memcpy (&sa.sin_addr, 260 memcpy (&sa.sin_addr, addr, sizeof (sa.sin_addr));
270 sizeof (addr_tt) - sizeof (sa.sin_addr) + (char *)addr,
271 sizeof (sa.sin_addr));
272 261
273 xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa)); 262 xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
274 } 263 }
275 else 264 else
276 { 265 {
282 sa.sin6_family = AF_INET6; 271 sa.sin6_family = AF_INET6;
283 sa.sin6_port = 0; 272 sa.sin6_port = 0;
284 sa.sin6_flowinfo = 0; 273 sa.sin6_flowinfo = 0;
285 sa.sin6_scope_id = 0; 274 sa.sin6_scope_id = 0;
286 275
287 memcpy (&sa.sin6_addr, 276 memcpy (&sa.sin6_addr, addr, sizeof (sa.sin6_addr));
288 sizeof (addr_tt) - sizeof (sa.sin6_addr) + (char *)addr,
289 sizeof (sa.sin6_addr));
290 277
291 xsendto (icmp6_fd, &pkt, sizeof (pkt), 0, &sa, sizeof (sa)); 278 xsendto (icmp6_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
292#endif 279#endif
293 } 280 }
281
282 // see if we have any more addresses
283 if (self->addrcnt)
284 {
285 if (!--self->addrcnt)
286 return 0;
287 }
288 else
289 {
290 if (!memcmp (&self->lo, &self->hi, sizeof (addr_tt)))
291 return 0;
292
293 // increment self->lo
294 {
295 int len = sizeof (addr_tt) - 1;
296
297 while (!++self->lo [len])
298 --len;
299 }
300 }
301
302 return 1;
294} 303}
304
305/*****************************************************************************/
295 306
296static void 307static void
297downheap (PINGER *self) 308downheap (PINGER *self)
298{ 309{
299 RANGE *elem = self->ranges [0]; /* always exists */ 310 RANGE *elem = self->ranges [0]; /* always exists */
349 PINGER *self = (PINGER *)self_; 360 PINGER *self = (PINGER *)self_;
350 PKT pkt; 361 PKT pkt;
351 362
352 memset (&pkt, 0, sizeof (pkt)); 363 memset (&pkt, 0, sizeof (pkt));
353 364
354 tstamp now = NOW (); 365 tstamp now = NOW ();
355 tstamp next = now;
356 366
357 pkt.code = 0; 367 pkt.code = 0;
358 pkt.id = self->magic1; 368 pkt.id = self->magic1;
359 pkt.seq = self->magic2; 369 pkt.seq = self->magic2;
360 pkt.magic = self->magic3; 370 pkt.magic = self->magic3;
361 pkt.pinger = self->id; 371 pkt.pinger = self->id;
372
373 if (self->next < now)
374 self->next = now;
362 375
363 while (self->rangecnt) 376 while (self->rangecnt)
364 { 377 {
365 RANGE *range = self->ranges [0]; 378 RANGE *range = self->ranges [0];
366 int n, k;
367 379
368 // ranges [0] is always the next range to ping 380 // ranges [0] is always the next range to ping
369 tstamp wait = range->next - now; 381 tstamp wait = range->next - now;
370 382
371 // compare with the global frequency limit 383 // compare with the global frequency limit
372 { 384 {
373 tstamp diff = next - now; 385 tstamp diff = self->next - now;
374 386
375 if (wait < diff) 387 if (wait < diff)
376 wait = diff; 388 wait = diff; // global rate limit overrides
377 else if (range) 389 else
378 next = range->next; 390 self->next = range->next; // fast forward
379 } 391 }
380 392
381 if (wait > 0.) 393 if (wait > 0.)
382 ssleep (wait); 394 ssleep (wait);
383 395
384 now = NOW (); 396 now = NOW ();
385 397
386 ts_to_pkt (&pkt, now); 398 ts_to_pkt (&pkt, now);
387 399
388 send_ping (&pkt, &range->lo, range->addrlen); 400 if (!range_send_ping (range, &pkt))
389
390 if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt)))
391 { 401 {
392 self->ranges [0] = self->ranges [--self->rangecnt]; 402 self->ranges [0] = self->ranges [--self->rangecnt];
393 range_free (range); 403 range_free (range);
394 } 404 }
395 else 405 else
396 {
397 inc_addr (&range->lo);
398
399 range->next = next;
400 range->next += range->interval; 406 range->next = self->next + range->interval;
401 407
402 downheap (self); 408 downheap (self);
403 }
404 409
405 next += self->interval; 410 self->next += self->interval;
411 now = NOW ();
406 } 412 }
407 413
408 ssleep (self->maxrtt); 414 ssleep (self->maxrtt);
409 415
410 { 416 {
478 } 484 }
479 485
480 pingers [self->id] = self; 486 pingers [self->id] = self;
481 487
482 self->recvcb = &PL_sv_undef; 488 self->recvcb = &PL_sv_undef;
489 self->next = 0.;
483 self->interval = MIN_INTERVAL; 490 self->interval = MIN_INTERVAL;
484 self->maxrtt = 0.5; 491 self->maxrtt = 0.5;
485 self->rangemax = 16; 492 self->rangemax = 16;
486 self->ranges = malloc (sizeof (self->ranges [0]) * self->rangemax); 493 self->ranges = malloc (sizeof (self->ranges [0]) * self->rangemax);
487} 494}
503 range_free (self->ranges [--self->rangecnt]); 510 range_free (self->ranges [--self->rangecnt]);
504 511
505 free (self->ranges); 512 free (self->ranges);
506} 513}
507 514
515static void
516pinger_add_range (PINGER *self, RANGE *range)
517{
518 if (self->rangecnt == self->rangemax)
519 self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1));
520
521 self->ranges [self->rangecnt] = range;
522 upheap (self, self->rangecnt);
523 ++self->rangecnt;
524}
525
508/*****************************************************************************/ 526/*****************************************************************************/
509 527
510static void 528static void
511recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt) 529recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt)
512{ 530{
542 return; 560 return;
543 561
544 ENTER; 562 ENTER;
545 SAVETMPS; 563 SAVETMPS;
546 564
547 while (firstrecv >= 0) 565 do
548 { 566 {
549 dSP; 567 dSP;
550 PINGER *self = pingers [firstrecv]; 568 PINGER *self = pingers [firstrecv];
551 firstrecv = self->nextrecv; 569 firstrecv = self->nextrecv;
552 570
556 XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq))); 574 XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq)));
557 self->recvq = 0; 575 self->recvq = 0;
558 PUTBACK; 576 PUTBACK;
559 call_sv (self->recvcb, G_DISCARD | G_VOID); 577 call_sv (self->recvcb, G_DISCARD | G_VOID);
560 } 578 }
579 while (firstrecv >= 0);
561 580
562 FREETMPS; 581 FREETMPS;
563 LEAVE; 582 LEAVE;
564} 583}
565 584
627 646
628 sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd); 647 sv_setiv (get_sv ("AnyEvent::FastPing::ICMP4_FD", 1), icmp4_fd);
629 sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd); 648 sv_setiv (get_sv ("AnyEvent::FastPing::ICMP6_FD", 1), icmp6_fd);
630} 649}
631 650
651#define NOT_RUNNING \
652 if (self->running) \
653 croak ("AnyEvent::FastPing object has been started - you have to sotp t first before calling this method, caught");
654
632MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing PREFIX = pinger_ 655MODULE = AnyEvent::FastPing PACKAGE = AnyEvent::FastPing PREFIX = pinger_
633 656
634BOOT: 657BOOT:
635{ 658{
636 HV *stash = gv_stashpv ("AnyEvent::FastPing", 1); 659 HV *stash = gv_stashpv ("AnyEvent::FastPing", 1);
648} 671}
649 672
650PROTOTYPES: DISABLE 673PROTOTYPES: DISABLE
651 674
652void 675void
653_new (SV *klass, UV magic1, UV magic2, UV magic3)
654 PPCODE:
655{
656 SV *pv = NEWSV (0, sizeof (PINGER));
657 PINGER *self = (PINGER *)SvPVX (pv);
658 SvPOK_only (pv);
659 XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPV_nolen (klass), 1))));
660 pinger_init (self);
661 self->magic1 = magic1;
662 self->magic2 = magic2;
663 self->magic3 = magic3;
664}
665
666void
667_free (PINGER *self)
668 CODE:
669 pinger_free (self);
670
671IV
672id (PINGER *self, ...)
673 CODE:
674 RETVAL = self->id;
675 OUTPUT:
676 RETVAL
677
678void pinger_start (PINGER *self)
679
680void pinger_stop (PINGER *self)
681
682void _stop_id (UV id)
683 CODE:
684 if (id < pingercnt && pingers [id])
685 pinger_stop (pingers [id]);
686
687void interval (PINGER *self, NV interval)
688 CODE:
689 self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
690
691void add_range (PINGER *self, SV *lo_, SV *hi_, NV interval)
692 CODE:
693{
694 STRLEN lo_len, hi_len;
695 char *lo = SvPVbyte (lo_, lo_len);
696 char *hi = SvPVbyte (hi_, hi_len);
697 RANGE *range;
698
699 if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
700 croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");
701
702 if (lo_len == 4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
703 if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");
704
705 range = calloc (1, sizeof (RANGE));
706
707 range->next = 0;
708 range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
709 range->addrlen = lo_len;
710
711 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
712 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);
713
714 if (self->rangecnt == self->rangemax)
715 self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1));
716
717 self->ranges [self->rangecnt] = range;
718 upheap (self, self->rangecnt);
719 ++self->rangecnt;
720}
721
722void
723on_recv (PINGER *self, SV *cb)
724 CODE:
725 SvREFCNT_dec (self->recvcb);
726 self->recvcb = newSVsv (cb);
727
728void
729max_rtt (PINGER *self, NV maxrtt)
730 CODE:
731 self->maxrtt = maxrtt;
732
733void
734_recv_icmp4 (...) 676_recv_icmp4 (...)
735 CODE: 677 CODE:
736{ 678{
737 char buf [512]; 679 char buf [512];
738 struct sockaddr_in sa; 680 struct sockaddr_in sa;
681 int maxrecv;
739 682
740 for (;;) 683 for (maxrecv = 256+1; --maxrecv; )
741 { 684 {
742 PINGER *pinger; 685 PINGER *pinger;
743 IP4HDR *iphdr = (IP4HDR *)buf; 686 IP4HDR *iphdr = (IP4HDR *)buf;
744 socklen_t sl = sizeof (sa); 687 socklen_t sl = sizeof (sa);
745 int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl); 688 int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
750 break; 693 break;
751 694
752 hdrlen = (iphdr->version_ihl & 15) * 4; 695 hdrlen = (iphdr->version_ihl & 15) * 4;
753 totlen = ntohs (iphdr->tot_len); 696 totlen = ntohs (iphdr->tot_len);
754 697
755 // packet corrupt?
756 if (totlen > len 698 if (totlen > len
757 || iphdr->protocol != IPPROTO_ICMP 699 || iphdr->protocol != IPPROTO_ICMP
758 || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen) 700 || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
759 continue; 701 continue;
760 702
780_recv_icmp6 (...) 722_recv_icmp6 (...)
781 CODE: 723 CODE:
782{ 724{
783 struct sockaddr_in6 sa; 725 struct sockaddr_in6 sa;
784 PKT pkt; 726 PKT pkt;
727 int maxrecv;
785 728
786 for (;;) 729 for (maxrecv = 256+1; --maxrecv; )
787 { 730 {
788 PINGER *pinger; 731 PINGER *pinger;
789 socklen_t sl = sizeof (sa); 732 socklen_t sl = sizeof (sa);
790 int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl); 733 int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
791 734
806 } 749 }
807 750
808 recv_flush (); 751 recv_flush ();
809} 752}
810 753
754void
755_new (SV *klass, UV magic1, UV magic2, UV magic3)
756 PPCODE:
757{
758 SV *pv = NEWSV (0, sizeof (PINGER));
759 PINGER *self = (PINGER *)SvPVX (pv);
760
761 SvPOK_only (pv);
762 XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPVutf8_nolen (klass), 1))));
763 pinger_init (self);
764 self->magic1 = magic1;
765 self->magic2 = magic2;
766 self->magic3 = magic3;
767}
768
769void
770_free (PINGER *self)
771 CODE:
772 pinger_free (self);
773
774IV
775id (PINGER *self, ...)
776 CODE:
777 RETVAL = self->id;
778 OUTPUT:
779 RETVAL
780
781void pinger_start (PINGER *self)
782
783void pinger_stop (PINGER *self)
784
785void
786_stop_id (UV id)
787 CODE:
788 if (id < pingercnt && pingers [id])
789 pinger_stop (pingers [id]);
790
791void
792interval (PINGER *self, NV interval)
793 CODE:
794 NOT_RUNNING;
795 self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
796
797void
798max_rtt (PINGER *self, NV maxrtt)
799 CODE:
800 NOT_RUNNING;
801 self->maxrtt = maxrtt;
802
803void
804on_recv (PINGER *self, SV *cb)
805 CODE:
806 SvREFCNT_dec (self->recvcb);
807 self->recvcb = newSVsv (cb);
808
809void
810add_range (PINGER *self, SV *lo_, SV *hi_, NV interval = 0)
811 CODE:
812{
813 STRLEN lo_len, hi_len;
814 char *lo = SvPVbyte (lo_, lo_len);
815 char *hi = SvPVbyte (hi_, hi_len);
816 RANGE *range;
817 NOT_RUNNING;
818
819 if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
820 croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");
821
822 if (lo_len == 4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
823 if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");
824
825 if (memcmp (lo, hi, lo_len) > 0)
826 croak ("AnyEvent::FastPing::add_range called with lo > hi");
827
828 range = calloc (1, sizeof (RANGE));
829
830 range->next = 0;
831 range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
832 range->addrlen = lo_len;
833
834 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
835 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);
836
837 pinger_add_range (self, range);
838}
839
840void
841add_hosts (PINGER *self, SV *addrs, NV interval = 0, UV interleave = 1)
842 CODE:
843{
844 AV *av;
845 int i, j, k;
846 int cnt;
847 int addrlen;
848 RANGE *range;
849 NOT_RUNNING;
850
851 if (!SvROK (addrs) || SvTYPE (SvRV (addrs)) != SVt_PVAV)
852 croak ("AnyEvent::FastPing::add_hosts expects an arrayref with binary IPv4 or IPv6 addresses");
853
854 av = (AV *)SvRV (addrs);
855 cnt = av_len (av) + 1;
856
857 if (!cnt)
858 XSRETURN_EMPTY;
859
860 addrlen = SvCUR (*av_fetch (av, 0, 1));
861
862 if (addrlen != 4 && addrlen != 16)
863 croak ("AnyEvent::FastPing::add_hosts addresses must be specified as binary IPv4 or IPv6 addresses");
864
865 for (i = cnt; --i; )
866 {
867 SV *sv = *av_fetch (av, i, 1);
868
869 if (!sv_utf8_downgrade (sv, 1) || addrlen != SvCUR (sv))
870 croak ("AnyEvent::FastPing::add_hosts addresses must all have the same size");
871 }
872
873 range = calloc (1, sizeof (RANGE) + cnt * addrlen);
874
875 range->next = 0;
876 range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
877 range->addrlen = addrlen;
878 range->addrcnt = cnt;
879
880 if (interleave == 0)
881 interleave = cnt <= 256 * 256 ? 256 : (int)sqrtf (cnt);
882
883 k = cnt;
884 for (j = 0; j < interleave; ++j)
885 for (i = j; i < cnt; i += interleave)
886 memcpy ((uint8_t *)(range + 1) + --k * addrlen,
887 SvPVbyte_nolen (*av_fetch (av, i, 1)),
888 addrlen);
889
890 pinger_add_range (self, range);
891}
892

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines