ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-FastPing/FastPing.xs
(Generate patch)

Comparing AnyEvent-FastPing/FastPing.xs (file contents):
Revision 1.10 by root, Mon Jan 31 05:35:48 2011 UTC vs.
Revision 1.11 by root, Tue Feb 1 04:06:24 2011 UTC

44#define MIN_INTERVAL 1e-6 // minimum packet send interval, in seconds 44#define MIN_INTERVAL 1e-6 // minimum packet send interval, in seconds
45 45
46#define HDR_SIZE_IP4 20 46#define HDR_SIZE_IP4 20
47#define HDR_SIZE_IP6 48 47#define HDR_SIZE_IP6 48
48 48
49//TODO: xread/xwrite for atomicity? we currently rely on the fact that the pip buffersize divides exactly by pointer sizes 49static int thr_res[2]; // worker thread finished status
50
51typedef uint8_t addr_tt[16];
52
53static int thr_res[2]; // receive from worker(s)
54static int icmp4_fd = -1, icmp6_fd = -1; 50static int icmp4_fd = -1;
51static int icmp6_fd = -1;
55 52
56/*****************************************************************************/ 53/*****************************************************************************/
57 54
58typedef double tstamp; 55typedef double tstamp;
59 56
106 103
107 uint32_t src; 104 uint32_t src;
108 uint32_t dst; 105 uint32_t dst;
109} IP4HDR; 106} IP4HDR;
110 107
111#if 0
112typedef struct
113{
114 uint8_t version;
115 uint8_t x1, x2, x3;
116
117 uint16_t payload_len;
118 uint8_t nxt_hdr;
119 uint8_t hop_limit;
120
121 uint8_t src[16];
122 uint8_t dst[16];
123} IP6HDR;
124#endif
125
126/*****************************************************************************/ 108/*****************************************************************************/
109
110typedef uint8_t addr_tt[16];
127 111
128typedef struct 112typedef struct
129{ 113{
130 tstamp next; 114 tstamp next;
131 double interval; 115 double interval;
132 int addrlen; 116 int addrlen;
133 addr_tt lo, hi; 117
134 void *items; 118 addr_tt lo, hi; /* only if !addrcnt */
119
120 int addrcnt;
121 /* addrcnt addresses follow */
135} RANGE; 122} RANGE;
136 123
137typedef struct 124typedef struct
138{ 125{
139 RANGE **ranges; 126 RANGE **ranges;
166 153
167typedef struct 154typedef struct
168{ 155{
169 uint8_t type, code; 156 uint8_t type, code;
170 uint16_t cksum; 157 uint16_t cksum;
158
171 uint16_t id, seq; 159 uint16_t id, seq;
160
172 uint16_t pinger; 161 uint16_t pinger;
173 uint16_t magic; 162 uint16_t magic;
163
174 uint32_t stamp_hi; 164 uint32_t stamp_hi;
175 uint32_t stamp_lo; 165 uint32_t stamp_lo;
176} PKT; 166} PKT;
177 167
178static int 168static int
228range_free (RANGE *self) 218range_free (RANGE *self)
229{ 219{
230 free (self); 220 free (self);
231} 221}
232 222
233static void
234inc_addr (addr_tt *addr)
235{
236 int len = sizeof (addr_tt) - 1;
237
238 while (!++(*addr)[len])
239 --len;
240}
241
242/*****************************************************************************/
243
244/* like sendto, but retries on failure */ 223/* like sendto, but retries on failure */
245static void 224static void
246xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen) 225xsendto (int fd, void *buf, size_t len, int flags, void *sa, int salen)
247{ 226{
248 tstamp wait = 0; 227 tstamp wait = DRAIN_INTERVAL / 2.;
249 228
250 while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS) 229 while (sendto (fd, buf, len, flags, sa, salen) < 0 && errno == ENOBUFS)
251 ssleep (wait += DRAIN_INTERVAL); 230 ssleep (wait *= 2.);
252} 231}
253 232
233// ping current address, return true and increment if more to ping
254static void 234static int
255send_ping (PKT *pkt, addr_tt *addr, int addr_len) 235range_send_ping (RANGE *self, PKT *pkt)
256{ 236{
237 // send ping
238 uint8_t *addr;
239
240 if (self->addrcnt)
241 addr = (self->addrcnt - 1) * self->addrlen + (uint8_t *)(self + 1);
242 else
243 addr = sizeof (addr_tt) - self->addrlen + self->lo;
244
257 pkt->cksum = 0; 245 pkt->cksum = 0;
258 246
259 if (addr_len == 4) 247 if (self->addrlen == 4)
260 { 248 {
261 struct sockaddr_in sa; 249 struct sockaddr_in sa;
262 250
263 pkt->type = ICMP4_ECHO; 251 pkt->type = ICMP4_ECHO;
264 pkt_cksum (pkt); 252 pkt_cksum (pkt);
265 253
266 sa.sin_family = AF_INET; 254 sa.sin_family = AF_INET;
267 sa.sin_port = 0; 255 sa.sin_port = 0;
268 256
269 memcpy (&sa.sin_addr, 257 memcpy (&sa.sin_addr, addr, sizeof (sa.sin_addr));
270 sizeof (addr_tt) - sizeof (sa.sin_addr) + (char *)addr,
271 sizeof (sa.sin_addr));
272 258
273 xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa)); 259 xsendto (icmp4_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
274 } 260 }
275 else 261 else
276 { 262 {
282 sa.sin6_family = AF_INET6; 268 sa.sin6_family = AF_INET6;
283 sa.sin6_port = 0; 269 sa.sin6_port = 0;
284 sa.sin6_flowinfo = 0; 270 sa.sin6_flowinfo = 0;
285 sa.sin6_scope_id = 0; 271 sa.sin6_scope_id = 0;
286 272
287 memcpy (&sa.sin6_addr, 273 memcpy (&sa.sin6_addr, addr, sizeof (sa.sin6_addr));
288 sizeof (addr_tt) - sizeof (sa.sin6_addr) + (char *)addr,
289 sizeof (sa.sin6_addr));
290 274
291 xsendto (icmp6_fd, &pkt, sizeof (pkt), 0, &sa, sizeof (sa)); 275 xsendto (icmp6_fd, pkt, sizeof (*pkt), 0, &sa, sizeof (sa));
292#endif 276#endif
293 } 277 }
278
279 // see if we have any more addresses
280 if (self->addrcnt)
281 {
282 if (!--self->addrcnt)
283 return 0;
284 }
285 else
286 {
287 if (!memcmp (&self->lo, &self->hi, sizeof (addr_tt)))
288 return 0;
289
290 // increment self->lo
291 {
292 int len = sizeof (addr_tt) - 1;
293
294 while (!++self->lo [len])
295 --len;
296 }
297 }
298
299 return 1;
294} 300}
301
302/*****************************************************************************/
295 303
296static void 304static void
297downheap (PINGER *self) 305downheap (PINGER *self)
298{ 306{
299 RANGE *elem = self->ranges [0]; /* always exists */ 307 RANGE *elem = self->ranges [0]; /* always exists */
383 391
384 now = NOW (); 392 now = NOW ();
385 393
386 ts_to_pkt (&pkt, now); 394 ts_to_pkt (&pkt, now);
387 395
388 send_ping (&pkt, &range->lo, range->addrlen); 396 if (!range_send_ping (range, &pkt))
389
390 if (!memcmp (&range->lo, &range->hi, sizeof (addr_tt)))
391 { 397 {
392 self->ranges [0] = self->ranges [--self->rangecnt]; 398 self->ranges [0] = self->ranges [--self->rangecnt];
393 range_free (range); 399 range_free (range);
394 } 400 }
395 else 401 else
396 { 402 {
397 inc_addr (&range->lo);
398
399 range->next = next; 403 range->next = next;
400 range->next += range->interval; 404 range->next += range->interval;
401
402 downheap (self);
403 } 405 }
406
407 downheap (self);
404 408
405 next += self->interval; 409 next += self->interval;
406 } 410 }
407 411
408 ssleep (self->maxrtt); 412 ssleep (self->maxrtt);
503 range_free (self->ranges [--self->rangecnt]); 507 range_free (self->ranges [--self->rangecnt]);
504 508
505 free (self->ranges); 509 free (self->ranges);
506} 510}
507 511
512static void
513pinger_add_range (PINGER *self, RANGE *range)
514{
515 if (self->rangecnt == self->rangemax)
516 self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1));
517
518 self->ranges [self->rangecnt] = range;
519 upheap (self, self->rangecnt);
520 ++self->rangecnt;
521}
522
508/*****************************************************************************/ 523/*****************************************************************************/
509 524
510static void 525static void
511recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt) 526recv_feed (PINGER *self, void *addr, int addrlen, tstamp rtt)
512{ 527{
542 return; 557 return;
543 558
544 ENTER; 559 ENTER;
545 SAVETMPS; 560 SAVETMPS;
546 561
547 while (firstrecv >= 0) 562 do
548 { 563 {
549 dSP; 564 dSP;
550 PINGER *self = pingers [firstrecv]; 565 PINGER *self = pingers [firstrecv];
551 firstrecv = self->nextrecv; 566 firstrecv = self->nextrecv;
552 567
556 XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq))); 571 XPUSHs (sv_2mortal (newRV_noinc ((SV *)self->recvq)));
557 self->recvq = 0; 572 self->recvq = 0;
558 PUTBACK; 573 PUTBACK;
559 call_sv (self->recvcb, G_DISCARD | G_VOID); 574 call_sv (self->recvcb, G_DISCARD | G_VOID);
560 } 575 }
576 while (firstrecv >= 0);
561 577
562 FREETMPS; 578 FREETMPS;
563 LEAVE; 579 LEAVE;
564} 580}
565 581
648} 664}
649 665
650PROTOTYPES: DISABLE 666PROTOTYPES: DISABLE
651 667
652void 668void
653_new (SV *klass, UV magic1, UV magic2, UV magic3)
654 PPCODE:
655{
656 SV *pv = NEWSV (0, sizeof (PINGER));
657 PINGER *self = (PINGER *)SvPVX (pv);
658 SvPOK_only (pv);
659 XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPV_nolen (klass), 1))));
660 pinger_init (self);
661 self->magic1 = magic1;
662 self->magic2 = magic2;
663 self->magic3 = magic3;
664}
665
666void
667_free (PINGER *self)
668 CODE:
669 pinger_free (self);
670
671IV
672id (PINGER *self, ...)
673 CODE:
674 RETVAL = self->id;
675 OUTPUT:
676 RETVAL
677
678void pinger_start (PINGER *self)
679
680void pinger_stop (PINGER *self)
681
682void _stop_id (UV id)
683 CODE:
684 if (id < pingercnt && pingers [id])
685 pinger_stop (pingers [id]);
686
687void interval (PINGER *self, NV interval)
688 CODE:
689 self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
690
691void add_range (PINGER *self, SV *lo_, SV *hi_, NV interval)
692 CODE:
693{
694 STRLEN lo_len, hi_len;
695 char *lo = SvPVbyte (lo_, lo_len);
696 char *hi = SvPVbyte (hi_, hi_len);
697 RANGE *range;
698
699 if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
700 croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");
701
702 if (lo_len == 4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
703 if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");
704
705 range = calloc (1, sizeof (RANGE));
706
707 range->next = 0;
708 range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
709 range->addrlen = lo_len;
710
711 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
712 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);
713
714 if (self->rangecnt == self->rangemax)
715 self->ranges = realloc (self->ranges, sizeof (self->ranges [0]) * (self->rangemax <<= 1));
716
717 self->ranges [self->rangecnt] = range;
718 upheap (self, self->rangecnt);
719 ++self->rangecnt;
720}
721
722void
723on_recv (PINGER *self, SV *cb)
724 CODE:
725 SvREFCNT_dec (self->recvcb);
726 self->recvcb = newSVsv (cb);
727
728void
729max_rtt (PINGER *self, NV maxrtt)
730 CODE:
731 self->maxrtt = maxrtt;
732
733void
734_recv_icmp4 (...) 669_recv_icmp4 (...)
735 CODE: 670 CODE:
736{ 671{
737 char buf [512]; 672 char buf [512];
738 struct sockaddr_in sa; 673 struct sockaddr_in sa;
674 int maxrecv;
739 675
740 for (;;) 676 for (maxrecv = 256+1; --maxrecv; )
741 { 677 {
742 PINGER *pinger; 678 PINGER *pinger;
743 IP4HDR *iphdr = (IP4HDR *)buf; 679 IP4HDR *iphdr = (IP4HDR *)buf;
744 socklen_t sl = sizeof (sa); 680 socklen_t sl = sizeof (sa);
745 int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl); 681 int len = recvfrom (icmp4_fd, buf, sizeof (buf), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
750 break; 686 break;
751 687
752 hdrlen = (iphdr->version_ihl & 15) * 4; 688 hdrlen = (iphdr->version_ihl & 15) * 4;
753 totlen = ntohs (iphdr->tot_len); 689 totlen = ntohs (iphdr->tot_len);
754 690
755 // packet corrupt?
756 if (totlen > len 691 if (totlen > len
757 || iphdr->protocol != IPPROTO_ICMP 692 || iphdr->protocol != IPPROTO_ICMP
758 || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen) 693 || hdrlen < HDR_SIZE_IP4 || hdrlen + sizeof (PKT) != totlen)
759 continue; 694 continue;
760 695
780_recv_icmp6 (...) 715_recv_icmp6 (...)
781 CODE: 716 CODE:
782{ 717{
783 struct sockaddr_in6 sa; 718 struct sockaddr_in6 sa;
784 PKT pkt; 719 PKT pkt;
720 int maxrecv;
785 721
786 for (;;) 722 for (maxrecv = 256+1; --maxrecv; )
787 { 723 {
788 PINGER *pinger; 724 PINGER *pinger;
789 socklen_t sl = sizeof (sa); 725 socklen_t sl = sizeof (sa);
790 int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl); 726 int len = recvfrom (icmp6_fd, &pkt, sizeof (pkt), MSG_TRUNC, (struct sockaddr *)&sa, &sl);
791 727
806 } 742 }
807 743
808 recv_flush (); 744 recv_flush ();
809} 745}
810 746
747void
748_new (SV *klass, UV magic1, UV magic2, UV magic3)
749 PPCODE:
750{
751 SV *pv = NEWSV (0, sizeof (PINGER));
752 PINGER *self = (PINGER *)SvPVX (pv);
753 SvPOK_only (pv);
754 XPUSHs (sv_2mortal (sv_bless (newRV_noinc (pv), gv_stashpv (SvPVutf8_nolen (klass), 1))));
755 pinger_init (self);
756 self->magic1 = magic1;
757 self->magic2 = magic2;
758 self->magic3 = magic3;
759}
760
761void
762_free (PINGER *self)
763 CODE:
764 pinger_free (self);
765
766IV
767id (PINGER *self, ...)
768 CODE:
769 RETVAL = self->id;
770 OUTPUT:
771 RETVAL
772
773void pinger_start (PINGER *self)
774
775void pinger_stop (PINGER *self)
776
777void
778_stop_id (UV id)
779 CODE:
780 if (id < pingercnt && pingers [id])
781 pinger_stop (pingers [id]);
782
783void
784interval (PINGER *self, NV interval)
785 CODE:
786 self->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
787
788void
789max_rtt (PINGER *self, NV maxrtt)
790 CODE:
791 self->maxrtt = maxrtt;
792
793void
794add_range (PINGER *self, SV *lo_, SV *hi_, NV interval)
795 CODE:
796{
797 STRLEN lo_len, hi_len;
798 char *lo = SvPVbyte (lo_, lo_len);
799 char *hi = SvPVbyte (hi_, hi_len);
800 RANGE *range;
801
802 if (lo_len != hi_len || (lo_len != 4 && lo_len != 16))
803 croak ("AnyEvent::FastPing::add_range address range must be specified as two binary IPv4 or IPv6 addresses");
804
805 if (lo_len == 4 && icmp4_fd < 0) croak ("IPv4 support unavailable");
806 if (lo_len == 16 && icmp6_fd < 0) croak ("IPv6 support unavailable");
807
808 if (memcmp (lo, hi, lo_len) > 0)
809 croak ("AnyEvent::FastPing::add_range called with lo > hi");
810
811 range = calloc (1, sizeof (RANGE));
812
813 range->next = 0;
814 range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
815 range->addrlen = lo_len;
816
817 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->lo, lo, lo_len);
818 memcpy (sizeof (addr_tt) - lo_len + (char *)&range->hi, hi, lo_len);
819
820 pinger_add_range (self, range);
821}
822
823void
824add_hosts (PINGER *self, SV *addrs, NV interval)
825 CODE:
826{
827 AV *av;
828 int i;
829 int cnt;
830 int addrlen;
831 RANGE *range;
832
833 if (!SvROK (addrs) || SvTYPE (SvRV (addrs)) != SVt_PVAV)
834 croak ("AnyEvent::FastPing::add_hosst expects an arrayref with binary IPv4 or IPv6 addresses");
835
836 av = (AV *)SvRV (addrs);
837 cnt = av_len (av) + 1;
838
839 if (!cnt)
840 XSRETURN_EMPTY;
841
842 addrlen = SvCUR (*av_fetch (av, 0, 1));
843
844 if (addrlen != 4 && addrlen != 16)
845 croak ("AnyEvent::FastPing::add_hosts addresses must be specified as binary IPv4 or IPv6 addresses");
846
847 for (i = cnt; --i; )
848 {
849 SV *sv = *av_fetch (av, i, 1);
850
851 if (!sv_utf8_downgrade (sv, 1) || addrlen != SvCUR (sv))
852 croak ("AnyEvent::FastPing::add_hosts addresses must all have the same size");
853 }
854
855 range = calloc (1, sizeof (RANGE) + cnt * addrlen);
856
857 range->next = 0;
858 range->interval = interval > MIN_INTERVAL ? interval : MIN_INTERVAL;
859 range->addrlen = addrlen;
860 range->addrcnt = cnt;
861
862 for (i = cnt; i--; )
863 memcpy ((uint8_t *)(range + 1) + (cnt - 1 - i) * addrlen,
864 SvPVbyte_nolen (*av_fetch (av, i, 1)),
865 addrlen);
866
867 pinger_add_range (self, range);
868}
869
870void
871on_recv (PINGER *self, SV *cb)
872 CODE:
873 SvREFCNT_dec (self->recvcb);
874 self->recvcb = newSVsv (cb);
875

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines