ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.15
Committed: Sat May 17 21:01:46 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.14: +3 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing
12     well-known functions by event-ised counterparts.
13    
14 root 1.14 All functions documented without C<AnyEvent::Util::> prefix are exported
15     by default.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package AnyEvent::Util;
22    
23     use strict;
24    
25     no warnings "uninitialized";
26    
27 root 1.11 use Errno;
28 root 1.1 use Socket ();
29 elmex 1.10 use IO::Socket::INET ();
30 root 1.1
31     use AnyEvent;
32    
33     use base 'Exporter';
34    
# Functions exported into the caller's namespace by default.
our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect);

our $VERSION = '1.0';

# Upper bound on the number of helper jobs (see _do_asy/_schedule) that
# may be in flight at the same time.
our $MAXPARALLEL = 16; # max. number of parallel jobs

our $running; # number of jobs currently in flight
our @queue;   # jobs waiting to be started, each one is [$cb, $sub, @args]
43    
sub _schedule;

# Start the next queued job, unless the queue is empty or $MAXPARALLEL
# jobs are already in flight.
#
# A queue entry has the form [$cb, $sub, @args]. If POSIX can be loaded,
# $sub->(@args) is run in a forked child and its result list is sent back
# through a pipe (hex-encoded, joined with NUL bytes); $cb is invoked with
# the decoded list once the child has closed its end of the pipe. Without
# POSIX the job is simply run synchronously.
#
# Dies with "fork: ..." if the child process cannot be created.
sub _schedule {
   return unless @queue;
   return if $running >= $MAXPARALLEL;

   ++$running;
   my ($cb, $sub, @args) = @{shift @queue};

   if (eval { local $SIG{__DIE__}; require POSIX }) {
      my $pid = open my $fh, "-|";

      if (!defined $pid) {
         my $error = "fork: $!";
         --$running; # the job never started, so do not count it as running
         die $error;
      } elsif (!$pid) {
         # child: STDOUT is the write end of the pipe to the parent
         syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args);
         POSIX::_exit (0);
      }

      my $buf = "";
      my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
         # the reply can arrive in several pieces, so keep appending
         # until sysread reports end-of-file (or a hard error)
         my $len = sysread $fh, $buf, 16384, length $buf;
         return if $len;
         return if !defined $len && ($!{EINTR} || $!{EAGAIN});

         undef $w;
         --$running;
         _schedule;

         $cb->(map { pack "H*", $_ } split /\0/, $buf);
      });
   } else {
      # no POSIX::_exit available: run the job in-process
      $cb->($sub->(@args));

      # release the slot again, otherwise the queue would stall for good
      # after $MAXPARALLEL synchronous jobs
      --$running;
      _schedule;
   }
}
75    
# Queue a job for execution by _schedule and try to start it right away.
#
# Arguments are ($cb, $sub, @args): $sub->(@args) produces the result list
# that is eventually handed to $cb.
sub _do_asy {
   my @job = @_;

   push @queue, \@job;
   _schedule ();
}
80    
# Returns true if the argument is an IPv4 address in dotted-quad notation,
# i.e. exactly four decimal octets (0..255) separated by dots.
#
# The match is anchored with \A and \z, so nothing may precede or follow
# the address - not even a trailing newline, which "$" would tolerate.
sub dotted_quad($) {
   $_[0] =~ /\A      (?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
             (?: \.  (?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) ){3}
             \z/x
}
87    
# Cached probe result: undef = not probed yet, 1 = EV::ADNS is not usable,
# 2 = EV::ADNS is usable. Both cached values are true, so the probe runs
# only once.
my $has_ev_adns;

# Returns true if the detected event model is AnyEvent::Impl::EV and
# EV::ADNS could be loaded, false otherwise.
sub has_ev_adns {
   unless ($has_ev_adns) {
      my $usable = AnyEvent::detect () eq "AnyEvent::Impl::EV"
                   && eval { local $SIG{__DIE__}; require EV::ADNS };

      $has_ev_adns = $usable ? 2 : 1;
   }

   $has_ev_adns - 1 # 2 => true, 1 => false
}
97    
98 root 1.14 =item inet_aton $name_or_address, $cb->($binary_address_or_undef)
99 root 1.1
100     Works almost exactly like its Socket counterpart, except that it uses a
101 root 1.14 callback. Also, if a host has only an IPv6 address, this might be passed
102     to the callback instead (use the length to detect this - 4 for IPv4, 16
103     for IPv6).
104    
105     This function uses various shortcuts and will fall back to either
106     L<EV::ADNS> or your system's C<inet_aton>.
107 root 1.1
108     =cut
109    
# Resolve $name and pass the packed binary address (or undef on failure)
# to $cb.
#
# Resolution strategy, in order:
#   1. dotted-quad strings are converted directly,
#   2. "localhost" is hard-wired to 127.0.0.1,
#   3. if EV::ADNS is usable, an asynchronous A lookup is made, following
#      CNAME chains and falling back to a raw AAAA query when there is no
#      A record,
#   4. otherwise Socket::inet_aton is run as a background job via _do_asy.
sub inet_aton {
   my ($name, $cb) = @_;

   if (dotted_quad $name) {
      $cb->(Socket::inet_aton ($name));
   } elsif ($name eq "localhost") { # rfc2606 et al.
      $cb->(v127.0.0.1);
   } elsif (has_ev_adns ()) {
      # work around some idiotic adns rfc readings
      # rather hackish support for AAAA records (should
      # wait for adns_getaddrinfo...)

      my $loop = 10; # follow cname chains up to this length
      my $qt;        # type of the query currently in flight: a, aaaa or cname
      my $acb; $acb = sub {
         my ($status, undef, @a) = @_;

         my $ipn; # stays undef unless a usable answer arrived

         if ($status == EV::ADNS::s_ok ()) {
            if ($qt eq "a") {
               $ipn = Socket::inet_aton ($a[0]);
            } elsif ($qt eq "aaaa") {
               $ipn = $a[0];
            } elsif ($qt eq "cname") {
               # retry the A lookup with the canonical name
               $name = $a[0];
               $qt   = "a";
               return EV::ADNS::submit ($name, EV::ADNS::r_a (), 0, $acb);
            }
         } elsif ($status == EV::ADNS::s_prohibitedcname ()) {
            # follow cname chains
            if ($loop--) {
               $qt = "cname";
               return EV::ADNS::submit ($name, EV::ADNS::r_cname (), 0, $acb);
            }
         } elsif ($status == EV::ADNS::s_nodata ()) {
            if ($qt eq "a") {
               # ask for raw AAAA (might not be a good method, but adns is too broken...)
               $qt = "aaaa";
               return EV::ADNS::submit ($name, EV::ADNS::r_unknown () | 28, 0, $acb);
            }
         }

         # no further query will be submitted: drop the closure's reference
         # to itself, otherwise the cycle would keep it alive forever
         undef $acb;

         $cb->($ipn);
      };

      $qt = "a";
      EV::ADNS::submit ($name, EV::ADNS::r_a (), 0, $acb);
   } else {
      _do_asy ($cb, sub { Socket::inet_aton ($_[0]) }, @_);
   }
}
160    
161 root 1.14 =item fh_nonblocking $fh, $nonblocking
162 root 1.6
163     Sets the blocking state of the given filehandle (true == nonblocking,
164     false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
165     broken (i.e. windows) platforms.
166    
167     =cut
168    
# Switch $fh to non-blocking mode if $nb is true, to blocking mode
# otherwise.
#
# On MSWin32 this issues the FIONBIO ioctl with a packed 32 bit flag as
# argument buffer. Everywhere else the current file status flags are
# fetched first and only O_NONBLOCK is changed, so that other flags such
# as O_APPEND survive the call.
#
# Returns the result of the final ioctl/fcntl call (false on failure, with
# $! set); if the flags cannot be read, undef is returned.
sub fh_nonblocking($$) {
   my ($fh, $nb) = @_;

   if ($^O eq "MSWin32") {
      my $arg = pack "L", $nb ? 1 : 0;
      ioctl $fh, 0x8004667e, $arg; # FIONBIO
   } else {
      require Fcntl;

      my $flags = fcntl $fh, Fcntl::F_GETFL (), 0;
      return $flags unless defined $flags;

      $flags += 0; # fcntl reports zero as the string "0 but true"
      $flags  = $nb
              ? $flags |  Fcntl::O_NONBLOCK ()
              : $flags & ~Fcntl::O_NONBLOCK ();

      fcntl $fh, Fcntl::F_SETFL (), $flags;
   }
}
181    
182 root 1.14 =item $guard = guard { CODE }
183 elmex 1.8
184 root 1.11 This function creates a special object that, when destroyed, will execute the
185     code block.
186 elmex 1.8
187 root 1.11 This is often handy in continuation-passing style code to clean up some
188     resource regardless of where you break out of a process.
189 elmex 1.8
190     =cut
191    
# A guard object is a blessed reference to a scalar holding the code
# reference; destroying the object invokes that code without arguments.
sub AnyEvent::Util::Guard::DESTROY {
   my ($self) = @_;

   $$self->();
}

# Wrap the given code block in an AnyEvent::Util::Guard object, which runs
# the block as soon as the object is destroyed.
sub guard(&) {
   my $code = $_[0];

   bless \$code, "AnyEvent::Util::Guard"
}
199    
200 root 1.11 =item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
201 elmex 1.10
202 root 1.11 This is a convenience function that creates a tcp socket and makes a 100%
203     non-blocking connect to the given C<$host> (which can be a hostname or a
204     textual IP address) and C<$port>.
205    
206     Unless called in void context, it returns a guard object that will
207     automatically abort connecting when it gets destroyed (it does not do
208     anything to the socket after the connect was successful).
209    
210     If the connect is successful, then the C<$connect_cb> will be invoked with
211     the socket filehandle (in non-blocking mode) as first and the peer host
212     (as a textual IP address) and peer port as second and third arguments,
213     respectively.
214    
215     If the connect is unsuccessful, then the C<$connect_cb> will be invoked
216     without any arguments and C<$!> will be set appropriately (with C<ENXIO>
217     indicating a dns resolution failure).
218    
219     The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
220     be used as a normal perl file handle as well.
221    
222     Sometimes you need to "prepare" the socket before connecting, for example,
223     to C<bind> it to some port, or you want a specific connect timeout that
224     is lower than your kernel's default timeout. In this case you can specify
225     a second callback, C<$prepare_cb>. It will be called with the file handle
226     in not-yet-connected state as only argument and must return the connection
227     timeout value (or C<0>, C<undef> or the empty list to indicate the default
228     timeout is to be used).
229    
230     Note that the socket could be either an IPv4 TCP socket or an IPv6 TCP
231     socket (although only IPv4 is currently supported by this module).
232    
233     Simple Example: connect to localhost on port 22.
234    
235     AnyEvent::Util::tcp_connect localhost => 22, sub {
236     my $fh = shift
237     or die "unable to connect: $!";
238     # do something
239     };
240    
241     Complex Example: connect to www.google.com on port 80 and make a simple
242     GET request without much error handling. Also limit the connection timeout
243     to 15 seconds.
244    
245     AnyEvent::Util::tcp_connect "www.google.com", 80,
246     sub {
247     my ($fh) = @_
248     or die "unable to connect: $!";
249    
250     my $handle; # avoid direct assignment so on_eof has it in scope.
251     $handle = new AnyEvent::Handle
252     fh => $fh,
253     on_eof => sub {
254     undef $handle; # keep it alive till eof
255     warn "done.\n";
256     };
257    
258     $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
259    
260     $handle->push_read_line ("\015\012\015\012", sub {
261     my ($handle, $line) = @_;
262    
263     # print response header
264     print "HEADER\n$line\n\nBODY\n";
265    
266     $handle->on_read (sub {
267     # print response body
268     print $_[0]->rbuf;
269     $_[0]->rbuf = "";
270     });
271     });
272     }, sub {
273     my ($fh) = @_;
274     # could call $fh->bind etc. here
275 elmex 1.10
276 root 1.11 15
277     };
278 elmex 1.10
279 root 1.11 =cut
280 elmex 1.10
# Resolve $host, create a non-blocking IPv4 TCP socket and connect it to
# $host:$port.
#
# On success $connect is called with (filehandle, peer address as text,
# peer port); on any failure it is called without arguments and $! tells
# what went wrong (ENXIO: no IPv4 address, ETIMEDOUT: the timeout requested
# by $prepare expired). $prepare, if given, receives the unconnected socket
# and returns a connect timeout (a false value means no timeout).
#
# In non-void context a guard object is returned; destroying it empties
# %state and thereby cancels a connection attempt still in progress.
sub tcp_connect($$$;$) {
   my ($host, $port, $connect, $prepare) = @_;

   # see http://cr.yp.to/docs/connect.html for some background

   # everything belonging to this attempt lives in here. the mere
   # existence of the "fh" key means "not cancelled yet".
   my %state = (fh => undef);

   # name resolution
   inet_aton ($host, sub {
      # cancelled while the lookup was still running?
      exists $state{fh}
         or return;

      my ($ipn) = @_;

      # anything but a 4 octet address counts as resolution failure
      unless (4 == length $ipn) {
         %state = ();
         $! = Errno::ENXIO ();
         return $connect->();
      }

      # socket creation
      unless (socket $state{fh}, Socket::AF_INET (), Socket::SOCK_STREAM (), 0) {
         %state = ();
         return $connect->();
      }

      fh_nonblocking ($state{fh}, 1);

      # prepare and optional timeout
      if ($prepare) {
         my $timeout = $prepare->($state{fh});

         if ($timeout) {
            $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
               %state = ();
               $! = Errno::ETIMEDOUT ();
               $connect->();
            });
         }
      }

      # called when the connect was successful, which,
      # in theory, could be the case immediately (but never is in practice)
      my $connected = sub {
         # take the handle out first, then drop timer and watcher
         my $fh = delete $state{fh};
         %state = ();

         # we are connected, or maybe there was an error
         my $sin = getpeername $fh;

         unless ($sin) {
            # dummy read to fetch real error code
            sysread $fh, my $buf, 1;
            return $connect->();
         }

         my ($port, $host) = Socket::unpack_sockaddr_in ($sin);
         $connect->($fh, Socket::inet_ntoa ($host), $port);
      };

      # now connect
      if (connect $state{fh}, Socket::pack_sockaddr_in ($port, $ipn)) {
         $connected->();
      } elsif ($! == Errno::EINPROGRESS () || $! == Errno::EWOULDBLOCK ()) { # EINPROGRESS is POSIX
         $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
      } else {
         %state = ();
         $connect->();
      }
   });

   # nobody wants a guard in void context
   return unless defined wantarray;

   guard (sub { %state = () }) # break any circular dependencies and unregister watchers
}
353    
354 root 1.12 =item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
355 root 1.11
356 root 1.12 Create and bind a tcp socket to the given host (any IPv4 host if undef,
357     otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
358     port if given as zero or undef), set the SO_REUSEADDR flag and call
359     C<listen>.
360 elmex 1.9
361 root 1.12 For each new connection that could be C<accept>ed, call the C<$accept_cb>
362     with the filehandle (in non-blocking mode) as first and the peer host and
363     port as second and third arguments (see C<tcp_connect> for details).
364 elmex 1.9
365 root 1.12 Croaks on any errors.
366 elmex 1.9
367 root 1.12 If called in non-void context, then this function returns a guard object
368     whose lifetime is tied to the tcp server: If the object gets destroyed,
369     the server will be stopped (but existing accepted connections will
370     continue).
371 elmex 1.9
372 root 1.12 If you need more control over the listening socket, you can provide a
373     C<$prepare_cb>, which is called just before the C<listen ()> call, with
374     the listen file handle as first argument.
375 elmex 1.9
376 root 1.12 It should return the length of the listen queue (or C<0> for the default).
377 elmex 1.9
378 root 1.12 Example: bind on tcp port 8888 on the local machine and tell each client
379     to go away.
380 elmex 1.9
381 root 1.12 AnyEvent::Util::tcp_server undef, 8888, sub {
382     my ($fh, $host, $port) = @_;
383 elmex 1.9
384 root 1.12 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
385     };
386 elmex 1.9
387     =cut
388    
# Create an IPv4 TCP socket, enable SO_REUSEADDR, bind it to $host:$port
# ($host defaults to 0.0.0.0 when false), put it into non-blocking mode and
# start listening.
#
# $prepare, if given, is called with the bound socket just before listen;
# its return value is used as listen queue length (a false value selects
# the default of 128).
#
# For every accepted connection $accept is called with (filehandle in
# non-blocking mode, peer address as text, peer port).
#
# Croaks if the host cannot be turned into an IPv4 address or if any of
# socket, setsockopt, bind or listen fails.
#
# In non-void context a guard object is returned; destroying it empties
# %state, which releases the listening socket and its watcher.
sub tcp_server($$$;$) {
   my ($host, $port, $accept, $prepare) = @_;

   # loaded explicitly, as nothing in this file loads Carp itself
   require Carp;

   my %state;

   socket $state{fh}, Socket::AF_INET (), Socket::SOCK_STREAM (), 0
      or Carp::croak ("socket: $!");

   setsockopt $state{fh}, Socket::SOL_SOCKET (), Socket::SO_REUSEADDR (), 1
      or Carp::croak ("so_reuseaddr: $!");

   # check the address before packing it, so that a bad host yields a
   # meaningful error instead of a complaint from pack_sockaddr_in
   my $ipn = Socket::inet_aton ($host || "0.0.0.0");

   defined $ipn && 4 == length $ipn
      or Carp::croak ("bind: unable to resolve host '$host' to an IPv4 address");

   bind $state{fh}, Socket::pack_sockaddr_in ($port, $ipn)
      or Carp::croak ("bind: $!");

   fh_nonblocking ($state{fh}, 1);

   my $len = ($prepare && $prepare->($state{fh})) || 128;

   listen $state{fh}, $len
      or Carp::croak ("listen: $!");

   $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
      # this closure keeps $state alive
      while (my $peer = accept my $fh, $state{fh}) {
         fh_nonblocking ($fh, 1); # POSIX requires inheritance, the outside world does not
         my ($port, $host) = Socket::unpack_sockaddr_in ($peer);
         $accept->($fh, Socket::inet_ntoa ($host), $port);
      }
   });

   defined wantarray
      ? guard (sub { %state = () }) # clear fh and watcher, which breaks the circular dependency
      : ()
}
423    
424 root 1.1 1;
425    
426     =back
427    
428     =head1 AUTHOR
429    
430     Marc Lehmann <schmorp@schmorp.de>
431     http://home.schmorp.de/
432    
433     =cut
434