ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.11
Committed: Sat May 17 19:05:51 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.10: +151 -138 lines
Log Message:
implement tcp_connect

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::Util;
8    
9 root 1.4 inet_aton $name, $cb->($ipn || undef);
10 root 1.1
11     =head1 DESCRIPTION
12    
13     This module implements various utility functions, mostly replacing
14     well-known functions by event-ised counterparts.
15    
16     =over 4
17    
18     =cut
19    
20     package AnyEvent::Util;
21    
22     use strict;
23    
24     no warnings "uninitialized";
25    
26 root 1.11 use Errno;
27 root 1.1 use Socket ();
28 elmex 1.10 use IO::Socket::INET ();
29 root 1.1
30     use AnyEvent;
31    
32     use base 'Exporter';
33    
34     #our @EXPORT = qw(gethostbyname gethostbyaddr);
35 root 1.4 our @EXPORT_OK = qw(inet_aton);
36 root 1.1
37     our $VERSION = '1.0';
38    
our $MAXPARALLEL = 16; # max. number of parallel jobs

our $running; # number of job slots currently in use
our @queue;   # pending jobs, each stored as [$cb, $sub, @args]

sub _schedule;

# Start the next queued job if there is one and a slot is free.
#
# A job is [$cb, $sub, @args]: $sub->(@args) is run and its result list
# is handed to $cb->(...).  When POSIX can be loaded the job runs in a
# forked child and the results travel back through a pipe; otherwise the
# job is run synchronously in this process.
sub _schedule {
   return unless @queue;
   return if $running >= $MAXPARALLEL;

   ++$running;
   my ($cb, $sub, @args) = @{shift @queue};

   if (eval { local $SIG{__DIE__}; require POSIX }) {
      # forking open: the child's STDOUT is connected to $fh in the parent
      my $pid = open my $fh, "-|";

      if (!defined $pid) {
         # no job was started, so give the slot back before bailing out
         --$running;
         die "fork: $!";
      } elsif (!$pid) {
         # child: hex-encode every result so "\0" can act as separator
         syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args);
         POSIX::_exit (0);
      }

      my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
         # free the slot and start the next job before delivering results
         --$running;
         _schedule;
         undef $w;

         # NOTE(review): a single read of at most 16384 bytes - assumes the
         # encoded result always fits; confirm for jobs with large results
         my $buf;
         sysread $fh, $buf, 16384, length $buf;
         $cb->(map { pack "H*", $_ } split /\0/, $buf);
      });
   } else {
      # synchronous fallback: run the job right here.  The slot must be
      # released again, otherwise the queue stalls for good once
      # $MAXPARALLEL jobs have been processed.
      my @result = $sub->(@args);
      --$running;
      $cb->(@result);
      _schedule;
   }
}
75    
# Queue a job ($cb, $sub, @args) and try to start it immediately.
sub _do_asy {
   my @job = @_;

   push @queue, \@job;
   _schedule;
}
80    
# Returns true if the argument is a textual IPv4 address made of exactly
# four decimal octets (0..255) separated by dots, and nothing else.
#
# The match is anchored with \A and \z: a plain "$" would also match in
# front of a trailing newline and thus accept "1.2.3.4\n" as an address.
sub dotted_quad($) {
   my $octet = qr/(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)/;

   $_[0] =~ /\A $octet (?: \. $octet ){3} \z/x
}
87    
# Cached detection result: undef = not probed yet, 2 = available,
# 1 = not available (kept true so the probe is run only once).
my $has_ev_adns;

# Returns 1 when the detected event model is AnyEvent::Impl::EV and
# EV::ADNS can be loaded, 0 otherwise.
sub has_ev_adns {
   unless ($has_ev_adns) {
      my $model = AnyEvent::detect;

      my $usable = $model eq "AnyEvent::Impl::EV"
                   && eval { local $SIG{__DIE__}; require EV::ADNS };

      $has_ev_adns = $usable ? 2 : 1;
   }

   $has_ev_adns - 1
}
97    
98     =item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef)
99    
100     Works almost exactly like its Socket counterpart, except that it uses a
101     callback.
102    
103     =cut
104    
# Resolve $name and pass the packed IPv4 address (or undef) to $cb.
# Cheap cases are answered immediately, everything else goes through
# EV::ADNS when available or through the job queue otherwise.
sub inet_aton {
   my ($name, $cb) = @_;

   # literal address: nothing to look up
   return $cb->(Socket::inet_aton ($name))
      if dotted_quad ($name);

   # rfc2606 et al.
   return $cb->(v127.0.0.1)
      if $name eq "localhost";

   # asynchronous resolver
   return EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub {
      my (undef, undef, @addr) = @_;
      $cb->(@addr ? Socket::inet_aton ($addr[0]) : undef);
   }) if has_ev_adns ();

   # last resort: run the blocking lookup as a queued job
   _do_asy ($cb, sub { Socket::inet_aton ($_[0]) }, @_);
}
121    
122 root 1.6 =item AnyEvent::Util::fh_nonblocking $fh, $nonblocking
123    
124     Sets the blocking state of the given filehandle (true == nonblocking,
125     false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
126     broken (i.e. windows) platforms.
127    
128     =cut
129    
# Switch $fh to non-blocking mode ($nb true) or blocking mode ($nb false).
# Returns the result of the underlying ioctl/fcntl call.
sub fh_nonblocking($$) {
   my ($fh, $nb) = @_;

   require Fcntl;

   if ($^O eq "MSWin32") {
      # ioctl hands a pointer to the argument's string buffer to the
      # system; a reference has no string value, so its numeric value
      # would be passed instead.  Pass a packed unsigned long holding
      # the flag.
      my $arg = pack "L", $nb ? 1 : 0;
      ioctl $fh, 0x8004667e, $arg; # FIONBIO
   } else {
      # note: this replaces all file status flags, not just O_NONBLOCK
      fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
   }
}
142    
# A guard object is a blessed reference to a code reference; run that
# code when the object is destroyed.
sub AnyEvent::Util::Guard::DESTROY {
   my ($guard) = @_;

   $$guard->();
}
146 elmex 1.8
147 root 1.11 =item $guard = AnyEvent::Util::guard { CODE }
148 elmex 1.8
149 root 1.11 This function creates a special object that, when destroyed, will execute
150     the code block.
151 elmex 1.8
152 root 1.11 This is often handy in continuation-passing style code to clean up some
153     resource regardless of where you break out of a process.
154 elmex 1.8
155     =cut
156    
# Wrap the given code block in an AnyEvent::Util::Guard object, which is
# a blessed reference to the code reference.
sub guard(&) {
   my ($code) = @_;

   bless \$code, "AnyEvent::Util::Guard"
}
160    
161 root 1.11 =item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
162 elmex 1.10
163 root 1.11 This is a convenience function that creates a tcp socket and makes a 100%
164     non-blocking connect to the given C<$host> (which can be a hostname or a
165     textual IP address) and C<$port>.
166    
167     Unless called in void context, it returns a guard object that will
168     automatically abort connecting when it gets destroyed (it does not do
169     anything to the socket after the connect was successful).
170    
171     If the connect is successful, then the C<$connect_cb> will be invoked with
172     the socket filehandle (in non-blocking mode) as first and the peer host
173     (as a textual IP address) and peer port as second and third arguments,
174     respectively.
175    
176     If the connect is unsuccessful, then the C<$connect_cb> will be invoked
177     without any arguments and C<$!> will be set appropriately (with C<ENXIO>
178     indicating a dns resolution failure).
179    
180     The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
181     be used as a normal perl file handle as well.
182    
183     Sometimes you need to "prepare" the socket before connecting, for example,
184     to C<bind> it to some port, or you want a specific connect timeout that
185     is lower than your kernel's default timeout. In this case you can specify
186     a second callback, C<$prepare_cb>. It will be called with the file handle
187     in not-yet-connected state as only argument and must return the connection
188     timeout value (or C<0>, C<undef> or the empty list to indicate the default
189     timeout is to be used).
190    
191     Note that the socket could be either an IPv4 TCP socket or an IPv6 TCP
192     socket (although only IPv4 is currently supported by this module).
193    
194     Simple Example: connect to localhost on port 22.
195    
196     AnyEvent::Util::tcp_connect localhost => 22, sub {
197     my $fh = shift
198     or die "unable to connect: $!";
199     # do something
200     };
201    
202     Complex Example: connect to www.google.com on port 80 and make a simple
203     GET request without much error handling. Also limit the connection timeout
204     to 15 seconds.
205    
206     AnyEvent::Util::tcp_connect "www.google.com", 80,
207     sub {
208     my ($fh) = @_
209     or die "unable to connect: $!";
210    
211     my $handle; # avoid direct assignment so on_eof has it in scope.
212     $handle = new AnyEvent::Handle
213     fh => $fh,
214     on_eof => sub {
215     undef $handle; # keep it alive till eof
216     warn "done.\n";
217     };
218    
219     $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
220    
221     $handle->push_read_line ("\015\012\015\012", sub {
222     my ($handle, $line) = @_;
223    
224     # print response header
225     print "HEADER\n$line\n\nBODY\n";
226    
227     $handle->on_read (sub {
228     # print response body
229     print $_[0]->rbuf;
230     $_[0]->rbuf = "";
231     });
232     });
233     }, sub {
234     my ($fh) = @_;
235     # could call $fh->bind etc. here
236 elmex 1.10
237 root 1.11 15
238     };
239 elmex 1.10
240    
241 root 1.11 =cut
242 elmex 1.10
# Non-blocking TCP connect to $host:$port.
#
# $connect is called with ($fh, $peer_host, $peer_port) on success and
# without arguments (with $! set) on failure.  The optional $prepare is
# called with the unconnected socket and may return a timeout in seconds.
# Unless called in void context, a guard is returned whose destruction
# aborts a connect that is still in progress.
sub tcp_connect($$$;$) {
   my ($host, $port, $connect, $prepare) = @_;

   # see http://cr.yp.to/docs/connect.html for some background

   # Everything belonging to the pending attempt lives in this hash:
   # fh (socket), to (timeout timer), ww (write watcher).  Undefining
   # $state releases all of it, which is how completion, timeout and
   # guard destruction end the attempt.
   my $state = {};

   # name resolution
   inet_aton ($host, sub {
      # $state is gone when the guard was destroyed in the meantime
      return unless $state;

      my $ipn = shift
         or do {
            undef $state;
            $! = &Errno::ENXIO;
            return $connect->();
         };

      # socket creation
      socket $state->{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
         or do {
            # keep the error of the failed call across the teardown
            my $errno = $! + 0;
            undef $state;
            $! = $errno;
            return $connect->();
         };

      fh_nonblocking ($state->{fh}, 1);

      # prepare and optional timeout
      if ($prepare) {
         my $timeout = $prepare->($state->{fh});

         $state->{to} = AnyEvent->timer (after => $timeout, cb => sub {
            undef $state;
            $! = &Errno::ETIMEDOUT;
            $connect->();
         }) if $timeout;
      }

      # called when the connect was successful, which,
      # in theory, could be the case immediately (but never is in practice)
      my $connected = sub {
         # take the socket out first, then drop the timer and the watcher
         my $fh = delete $state->{fh};
         undef $state;

         # we are connected, or maybe there was an error
         if (my $sin = getpeername $fh) {
            my ($port, $host) = Socket::unpack_sockaddr_in $sin;
            $connect->($fh, (Socket::inet_ntoa $host), $port);
         } else {
            # dummy read to fetch real error code
            sysread $fh, my $buf, 1;
            $connect->();
         }
      };

      # now connect
      if (connect $state->{fh}, Socket::pack_sockaddr_in $port, $ipn) {
         $connected->();
      } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
         $state->{ww} = AnyEvent->io (fh => $state->{fh}, poll => 'w', cb => $connected);
      } else {
         # Dropping $state implicitly closes the socket, and $! is only
         # meaningful directly after the failing call - so save the
         # connect error and restore it for the callback.
         my $errno = $! + 0;
         undef $state;
         $! = $errno;
         $connect->();
      }
   });

   defined wantarray
      ? guard (sub { undef $state })
      : ()
}
313    
314 root 1.11 =item AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
315    
316     #TODO#
317 elmex 1.9
318     This will listen and accept new connections on the C<$socket> in a non-blocking
319     way. The callback C<$client_cb> will be called when a new client connection
320     was accepted and the callback C<$error_cb> will be called in case of an error.
321     C<$!> will be set to an appropriate error number.
322    
323     The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see
324     above).
325    
326     The first argument to C<$client_cb> will be the socket of the accepted client
327     and the second argument the peer address.
328    
329     The return value is a guard object that you have to keep referenced as long as you
330     want to accept new connections.
331    
332     Here is an example usage:
333    
334     my $sock = IO::Socket::INET->new (
335     Listen => 5
336     ) or die "Couldn't make socket: $!\n";
337    
338     my $watchobj = AnyEvent::Util::listen ($sock, sub {
339     my ($cl_sock, $cl_addr) = @_;
340    
341     my ($port, $addr) = sockaddr_in ($cl_addr);
342     $addr = inet_ntoa ($addr);
343     print "Client connected: $addr:$port\n";
344    
345     # ...
346    
347     }, sub {
348     warn "Error on accept: $!"
349     });
350    
351     =cut
352    
# Put $socket into non-blocking mode and start accepting connections on
# it.  $c_cb is stored as the client callback and $e_cb as the error
# callback of the returned AnyEvent::Util::SocketHandle object.
sub listen {
   my ($socket, $c_cb, $e_cb) = @_;

   fh_nonblocking ($socket, 1);

   my %arg = (
      fh        => $socket,
      client_cb => $c_cb,
      error_cb  => $e_cb,
   );

   my $handle = AnyEvent::Util::SocketHandle->new (%arg);
   $handle->listen;

   return $handle;
}
369    
package AnyEvent::Util::SocketHandle;
use Errno qw/ETIMEDOUT/;
use Socket;
use Scalar::Util qw/weaken/;

# Constructor: the key/value arguments become the object's fields.
# Fields used by the methods below: fh, timeout, client_cb, connect_cb
# and error_cb.
sub new {
   my $this = shift;
   my $class = ref($this) || $this;
   my $self = { @_ };
   bless $self, $class;

   return $self
}

# Drop all watchers and the timeout timer, then invoke error_cb.
# Callers set $! before calling this.
sub error {
   my ($self) = @_;
   delete $self->{con_w};
   delete $self->{list_w};
   delete $self->{tmout};
   $self->{error_cb}->();
}

# Watch fh for readability and accept a connection each time it fires.
# client_cb receives the new socket and the peer address; a failed
# accept is reported through error.
sub listen {
   my ($self) = @_;

   # the watcher is stored inside $self, so its callback must not hold
   # a strong reference back to the object
   weaken $self;

   $self->{list_w} =
      AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
         my ($new_sock, $paddr) = $self->{fh}->accept ();

         unless (defined $new_sock) {
            $self->error;
            return;
         }

         $self->{client_cb}->($new_sock, $paddr);
      });
}

# Arm the timeout timer when a timeout is configured.  When it fires,
# $! is set to ETIMEDOUT, error is called and timed_out is set.
sub start_timeout {
   my ($self) = @_;

   # The timer is stored inside $self and its callback refers to $self.
   # Without weakening, that is a reference cycle: the object would stay
   # alive, and its timeout would still fire, after the owner dropped it.
   weaken $self;

   if (defined $self->{timeout}) {
      $self->{tmout} =
         AnyEvent->timer (after => $self->{timeout}, cb => sub {
            delete $self->{tmout};
            $! = ETIMEDOUT;
            $self->error;
            # error_cb may have released the last reference to the object
            $self->{timed_out} = 1 if $self;
         });
   }
}

# Wait for a non-blocking connect on fh to complete.  connect_cb is
# called with fh on success; on failure $! is set from SO_ERROR and
# error is called.
sub connect {
   my ($self) = @_;

   weaken $self;

   $self->start_timeout;

   $self->{con_w} =
      AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
         delete $self->{con_w};
         delete $self->{tmout};

         if ($! = $self->{fh}->sockopt (SO_ERROR)) {
            $self->error;

         } else {
            $self->{connect_cb}->($self->{fh});
         }
      });
}
444    
445 root 1.1 1;
446    
447     =back
448    
449     =head1 AUTHOR
450    
451     Marc Lehmann <schmorp@schmorp.de>
452     http://home.schmorp.de/
453    
454     =cut
455