ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.16 by root, Sat May 17 21:34:15 2008 UTC vs.
Revision 1.27 by root, Sun May 25 01:10:54 2008 UTC

18 18
19=cut 19=cut
20 20
21package AnyEvent::Util; 21package AnyEvent::Util;
22 22
23no warnings;
23use strict; 24use strict;
24 25
25no warnings "uninitialized"; 26use Carp ();
27use Errno ();
28use Socket ();
26 29
27use Errno;
28use Socket ();
29use IO::Socket::INET ();
30
31use AnyEvent; 30use AnyEvent ();
32 31
33use base 'Exporter'; 32use base 'Exporter';
34 33
35our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect); 34BEGIN {
35 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
36}
37
38BEGIN {
39 my $af_inet6 = eval { &Socket::AF_INET6 };
40 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
41
42 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
43}
44
45BEGIN {
46 # broken windows perls use undocumented error codes...
47 if ($^O =~ /mswin32/i) {
48 eval "sub WSAEAGAIN() { 10035 }";
49 } else {
50 eval "sub WSAEAGAIN() { -1e99 }"; # should never match any errno value
51 }
52}
53
54our @EXPORT = qw(fh_nonblocking guard);
55our @EXPORT_OK = qw(AF_INET6 WSAEAGAIN);
36 56
37our $VERSION = '1.0'; 57our $VERSION = '1.0';
38 58
39our $MAXPARALLEL = 16; # max. number of parallel jobs 59our $MAXPARALLEL = 16; # max. number of parallel jobs
40 60
76sub _do_asy { 96sub _do_asy {
77 push @queue, [@_]; 97 push @queue, [@_];
78 _schedule; 98 _schedule;
79} 99}
80 100
101# to be removed
81sub dotted_quad($) { 102sub dotted_quad($) {
82 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 103 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
83 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 104 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
84 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 105 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
85 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 106 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
86} 107}
87 108
88my $has_ev_adns; 109# just a forwarder
89
90sub has_ev_adns {
91 ($has_ev_adns ||= do {
92 my $model = AnyEvent::detect;
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false
96}
97
98=item inet_aton $name_or_address, $cb->($binary_address_or_undef)
99
100Works almost exactly like its Socket counterpart, except that it uses a
101callback. Also, if a host has only an IPv6 address, this might be passed
102to the callback instead (use the length to detetc this - 4 for IPv4, 16
103for IPv6).
104
105This function uses various shortcuts and will fall back to either
106L<EV::ADNS> or your systems C<inet_aton>.
107
108=cut
109
110sub inet_aton { 110sub inet_aton {
111 my ($name, $cb) = @_; 111 require AnyEvent::Socket;
112 112 *inet_aton = \&AnyEvent::Socket::inet_aton;
113 if (&dotted_quad) { 113 goto &inet_aton
114 $cb->(Socket::inet_aton $name);
115 } elsif ($name eq "localhost") { # rfc2606 et al.
116 $cb->(v127.0.0.1);
117 } elsif (&has_ev_adns) {
118 # work around some idiotic ands rfc readings
119 # rather hackish support for AAAA records (should
120 # wait for adns_getaddrinfo...)
121
122 my $loop = 10; # follow cname chains up to this length
123 my $qt;
124 my $acb; $acb = sub {
125 my ($status, undef, @a) = @_;
126
127 if ($status == &EV::ADNS::s_ok) {
128 if ($qt eq "a") {
129 return $cb->(Socket::inet_aton $a[0]);
130 } elsif ($qt eq "aaaa") {
131 return $cb->($a[0]);
132 } elsif ($qt eq "cname") {
133 $name = $a[0];
134 $qt = "a";
135 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
136 }
137 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
138 # follow cname chains
139 if ($loop--) {
140 $qt = "cname";
141 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
142 }
143 } elsif ($status == &EV::ADNS::s_nodata) {
144 if ($qt eq "a") {
145 # ask for raw AAAA (might not be a good method, but adns is too broken...)
146 $qt = "aaaa";
147 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
148 }
149 }
150
151 $cb->(undef);
152 };
153
154 $qt = "a";
155 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
156 } else {
157 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_;
158 }
159} 114}
160 115
161=item fh_nonblocking $fh, $nonblocking 116=item fh_nonblocking $fh, $nonblocking
162 117
163Sets the blocking state of the given filehandle (true == nonblocking, 118Sets the blocking state of the given filehandle (true == nonblocking,
185code block. 140code block.
186 141
187This is often handy in continuation-passing style code to clean up some 142This is often handy in continuation-passing style code to clean up some
188resource regardless of where you break out of a process. 143resource regardless of where you break out of a process.
189 144
145You can call one method on the returned object:
146
147=item $guard->cancel
148
149This simply causes the code block not to be invoked: it "cancels" the
150guard.
151
190=cut 152=cut
191 153
192sub AnyEvent::Util::Guard::DESTROY { 154sub AnyEvent::Util::Guard::DESTROY {
193 ${$_[0]}->(); 155 ${$_[0]}->();
194} 156}
195 157
158sub AnyEvent::Util::Guard::cancel($) {
159 ${$_[0]} = sub { };
160}
161
196sub guard(&) { 162sub guard(&) {
197 bless \(my $cb = shift), AnyEvent::Util::Guard:: 163 bless \(my $cb = shift), AnyEvent::Util::Guard::
198}
199
200=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
201
202This function is experimental.
203
204This is a convenience function that creates a tcp socket and makes a 100%
205non-blocking connect to the given C<$host> (which can be a hostname or a
206textual IP address) and C<$port>.
207
208Unless called in void context, it returns a guard object that will
209automatically abort connecting when it gets destroyed (it does not do
210anything to the socket after the conenct was successful).
211
212If the connect is successful, then the C<$connect_cb> will be invoked with
213the socket filehandle (in non-blocking mode) as first and the peer host
214(as a textual IP address) and peer port as second and third arguments,
215respectively.
216
217If the connect is unsuccessful, then the C<$connect_cb> will be invoked
218without any arguments and C<$!> will be set appropriately (with C<ENXIO>
219indicating a dns resolution failure).
220
221The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
222be used as a normal perl file handle as well.
223
224Sometimes you need to "prepare" the socket before connecting, for example,
225to C<bind> it to some port, or you want a specific connect timeout that
226is lower than your kernel's default timeout. In this case you can specify
227a second callback, C<$prepare_cb>. It will be called with the file handle
228in not-yet-connected state as only argument and must return the connection
229timeout value (or C<0>, C<undef> or the empty list to indicate the default
230timeout is to be used).
231
232Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
233socket (although only IPv4 is currently supported by this module).
234
235Simple Example: connect to localhost on port 22.
236
237 AnyEvent::Util::tcp_connect localhost => 22, sub {
238 my $fh = shift
239 or die "unable to connect: $!";
240 # do something
241 };
242
243Complex Example: connect to www.google.com on port 80 and make a simple
244GET request without much error handling. Also limit the connection timeout
245to 15 seconds.
246
247 AnyEvent::Util::tcp_connect "www.google.com", 80,
248 sub {
249 my ($fh) = @_
250 or die "unable to connect: $!";
251
252 my $handle; # avoid direct assignment so on_eof has it in scope.
253 $handle = new AnyEvent::Handle
254 fh => $fh,
255 on_eof => sub {
256 undef $handle; # keep it alive till eof
257 warn "done.\n";
258 };
259
260 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
261
262 $handle->push_read_line ("\015\012\015\012", sub {
263 my ($handle, $line) = @_;
264
265 # print response header
266 print "HEADER\n$line\n\nBODY\n";
267
268 $handle->on_read (sub {
269 # print response body
270 print $_[0]->rbuf;
271 $_[0]->rbuf = "";
272 });
273 });
274 }, sub {
275 my ($fh) = @_;
276 # could call $fh->bind etc. here
277
278 15
279 };
280
281=cut
282
283sub tcp_connect($$$;$) {
284 my ($host, $port, $connect, $prepare) = @_;
285
286 # see http://cr.yp.to/docs/connect.html for some background
287
288 my %state = ( fh => undef );
289
290 # name resolution
291 inet_aton $host, sub {
292 return unless exists $state{fh};
293
294 my $ipn = shift;
295
296 4 == length $ipn
297 or do {
298 %state = ();
299 $! = &Errno::ENXIO;
300 return $connect->();
301 };
302
303 # socket creation
304 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
305 or do {
306 %state = ();
307 return $connect->();
308 };
309
310 fh_nonblocking $state{fh}, 1;
311
312 # prepare and optional timeout
313 if ($prepare) {
314 my $timeout = $prepare->($state{fh});
315
316 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
317 %state = ();
318 $! = &Errno::ETIMEDOUT;
319 $connect->();
320 }) if $timeout;
321 }
322
323 # called when the connect was successful, which,
324 # in theory, could be the case immediately (but never is in practise)
325 my $connected = sub {
326 my $fh = delete $state{fh};
327 %state = ();
328
329 # we are connected, or maybe there was an error
330 if (my $sin = getpeername $fh) {
331 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
332 $connect->($fh, (Socket::inet_ntoa $host), $port);
333 } else {
334 # dummy read to fetch real error code
335 sysread $fh, my $buf, 1;
336 $connect->();
337 }
338 };
339
340 # now connect
341 if (connect $state{fh}, Socket::pack_sockaddr_in $port, $ipn) {
342 $connected->();
343 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
344 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
345 } else {
346 %state = ();
347 $connect->();
348 }
349 };
350
351 defined wantarray
352 ? guard { %state = () } # break any circular dependencies and unregister watchers
353 : ()
354}
355
356=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
357
358This function is experimental.
359
360Create and bind a tcp socket to the given host (any IPv4 host if undef,
361otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
362port if given as zero or undef), set the SO_REUSEADDR flag and call
363C<listen>.
364
365For each new connection that could be C<accept>ed, call the C<$accept_cb>
366with the filehandle (in non-blocking mode) as first and the peer host and
367port as second and third arguments (see C<tcp_connect> for details).
368
369Croaks on any errors.
370
371If called in non-void context, then this function returns a guard object
372whose lifetime it tied to the tcp server: If the object gets destroyed,
373the server will be stopped (but existing accepted connections will
374continue).
375
376If you need more control over the listening socket, you can provide a
377C<$prepare_cb>, which is called just before the C<listen ()> call, with
378the listen file handle as first argument.
379
380It should return the length of the listen queue (or C<0> for the default).
381
382Example: bind on tcp port 8888 on the local machine and tell each client
383to go away.
384
385 AnyEvent::Util::tcp_server undef, 8888, sub {
386 my ($fh, $host, $port) = @_;
387
388 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
389 };
390
391=cut
392
393sub tcp_server($$$;$) {
394 my ($host, $port, $accept, $prepare) = @_;
395
396 my %state;
397
398 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
399 or Carp::croak "socket: $!";
400
401 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
402 or Carp::croak "so_reuseaddr: $!";
403
404 bind $state{fh}, Socket::pack_sockaddr_in $port, Socket::inet_aton ($host || "0.0.0.0")
405 or Carp::croak "bind: $!";
406
407 fh_nonblocking $state{fh}, 1;
408
409 my $len = ($prepare && $prepare->($state{fh})) || 128;
410
411 listen $state{fh}, $len
412 or Carp::croak "listen: $!";
413
414 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
415 # this closure keeps $state alive
416 while (my $peer = accept my $fh, $state{fh}) {
417 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
418 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
419 $accept->($fh, (Socket::inet_ntoa $host), $port);
420 }
421 });
422
423 defined wantarray
424 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
425 : ()
426} 164}
427 165
4281; 1661;
429 167
430=back 168=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines