ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.19 by root, Tue May 20 15:13:52 2008 UTC vs.
Revision 1.27 by root, Sun May 25 01:10:54 2008 UTC

18 18
19=cut 19=cut
20 20
21package AnyEvent::Util; 21package AnyEvent::Util;
22 22
23no warnings;
23use strict; 24use strict;
24
25no warnings "uninitialized";
26 25
27use Carp (); 26use Carp ();
28use Errno (); 27use Errno ();
29use Socket (); 28use Socket ();
30use IO::Socket::INET ();
31 29
32use AnyEvent; 30use AnyEvent ();
33 31
34use base 'Exporter'; 32use base 'Exporter';
35 33
36BEGIN { 34BEGIN {
37 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it 35 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
38} 36}
39 37
40our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect); 38BEGIN {
39 my $af_inet6 = eval { &Socket::AF_INET6 };
40 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
41
42 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
43}
44
45BEGIN {
46 # broken windows perls use undocumented error codes...
47 if ($^O =~ /mswin32/i) {
48 eval "sub WSAEAGAIN() { 10035 }";
49 } else {
50 eval "sub WSAEAGAIN() { -1e99 }"; # should never match any errno value
51 }
52}
53
54our @EXPORT = qw(fh_nonblocking guard);
55our @EXPORT_OK = qw(AF_INET6 WSAEAGAIN);
41 56
42our $VERSION = '1.0'; 57our $VERSION = '1.0';
43 58
44our $MAXPARALLEL = 16; # max. number of parallel jobs 59our $MAXPARALLEL = 16; # max. number of parallel jobs
45 60
81sub _do_asy { 96sub _do_asy {
82 push @queue, [@_]; 97 push @queue, [@_];
83 _schedule; 98 _schedule;
84} 99}
85 100
101# to be removed
86sub dotted_quad($) { 102sub dotted_quad($) {
87 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 103 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
88 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 104 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
89 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 105 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
90 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 106 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
91} 107}
92 108
93my $has_ev_adns; 109# just a forwarder
94
95sub has_ev_adns {
96 ($has_ev_adns ||= do {
97 my $model = AnyEvent::detect;
98 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
99 ? 2 : 1 # so that || always detects as true
100 }) - 1 # 2 => true, 1 => false
101}
102
103=item inet_aton $name_or_address, $cb->(@addresses)
104
105Works similarly to its Socket counterpart, except that it uses a
106callback. Also, if a host has only an IPv6 address, this might be passed
107to the callback instead (use the length to detect this - 4 for IPv4, 16
108for IPv6).
109
110This function uses various shortcuts and will fall back to either
111L<EV::ADNS> or your systems C<inet_aton>.
112
113Unlike the L<Socket> function, you can get multiple IP addresses as result
114(currently only when EV::ADNS is being used).
115
116=cut
117
118sub inet_aton { 110sub inet_aton {
119 my ($name, $cb) = @_; 111 require AnyEvent::Socket;
120 112 *inet_aton = \&AnyEvent::Socket::inet_aton;
121 if (&dotted_quad) { 113 goto &inet_aton
122 $cb->(socket_inet_aton $name);
123 } elsif ($name eq "localhost") { # rfc2606 et al.
124 $cb->(v127.0.0.1);
125 } elsif (&has_ev_adns) {
126 # work around some idiotic ands rfc readings
127 # rather hackish support for AAAA records (should
128 # wait for adns_getaddrinfo...)
129
130 my $loop = 10; # follow cname chains up to this length
131 my $qt;
132 my $acb; $acb = sub {
133 my ($status, undef, @a) = @_;
134
135 if ($status == &EV::ADNS::s_ok) {
136 if ($qt eq "a") {
137 return $cb->(map +(socket_inet_aton $_), @a);
138 } elsif ($qt eq "aaaa") {
139 return $cb->(@a);
140 } elsif ($qt eq "cname") {
141 $name = $a[0]; # there can only be one :)
142 $qt = "a";
143 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
144 }
145 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
146 # follow cname chains
147 if ($loop--) {
148 $qt = "cname";
149 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
150 }
151 } elsif ($status == &EV::ADNS::s_nodata) {
152 if ($qt eq "a") {
153 # ask for raw AAAA (might not be a good method, but adns is too broken...)
154 $qt = "aaaa";
155 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
156 }
157 }
158
159 $cb->();
160 };
161
162 $qt = "a";
163 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
164 } else {
165 _do_asy $cb, sub {
166 my $ipn = socket_inet_aton $_[0];
167 $ipn ? ($ipn) : ()
168 }, @_;
169 }
170} 114}
171 115
172=item fh_nonblocking $fh, $nonblocking 116=item fh_nonblocking $fh, $nonblocking
173 117
174Sets the blocking state of the given filehandle (true == nonblocking, 118Sets the blocking state of the given filehandle (true == nonblocking,
196code block. 140code block.
197 141
198This is often handy in continuation-passing style code to clean up some 142This is often handy in continuation-passing style code to clean up some
199resource regardless of where you break out of a process. 143resource regardless of where you break out of a process.
200 144
145You can call one method on the returned object:
146
147=item $guard->cancel
148
149This simply causes the code block not to be invoked: it "cancels" the
150guard.
151
201=cut 152=cut
202 153
203sub AnyEvent::Util::Guard::DESTROY { 154sub AnyEvent::Util::Guard::DESTROY {
204 ${$_[0]}->(); 155 ${$_[0]}->();
205} 156}
206 157
158sub AnyEvent::Util::Guard::cancel($) {
159 ${$_[0]} = sub { };
160}
161
207sub guard(&) { 162sub guard(&) {
208 bless \(my $cb = shift), AnyEvent::Util::Guard:: 163 bless \(my $cb = shift), AnyEvent::Util::Guard::
209}
210
211sub _tcp_port($) {
212 $_[0] =~ /^\d*$/ and return $1*1;
213
214 (getservbyname $_[0], "tcp")[2]
215 or Carp::croak "$_[0]: service unknown"
216}
217
218=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
219
220This function is experimental.
221
222This is a convenience function that creates a tcp socket and makes a 100%
223non-blocking connect to the given C<$host> (which can be a hostname or a
224textual IP address) and C<$port> (which can be a numeric port number or a
225service name).
226
227Unless called in void context, it returns a guard object that will
228automatically abort connecting when it gets destroyed (it does not do
229anything to the socket after the connect was successful).
230
231If the connect is successful, then the C<$connect_cb> will be invoked with
232the socket filehandle (in non-blocking mode) as first and the peer host
233(as a textual IP address) and peer port as second and third arguments,
234respectively.
235
236If the connect is unsuccessful, then the C<$connect_cb> will be invoked
237without any arguments and C<$!> will be set appropriately (with C<ENXIO>
238indicating a dns resolution failure).
239
240The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
241be used as a normal perl file handle as well.
242
243Sometimes you need to "prepare" the socket before connecting, for example,
244to C<bind> it to some port, or you want a specific connect timeout that
245is lower than your kernel's default timeout. In this case you can specify
246a second callback, C<$prepare_cb>. It will be called with the file handle
247in not-yet-connected state as only argument and must return the connection
248timeout value (or C<0>, C<undef> or the empty list to indicate the default
249timeout is to be used).
250
251Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
252socket (although only IPv4 is currently supported by this module).
253
254Simple Example: connect to localhost on port 22.
255
256 AnyEvent::Util::tcp_connect localhost => 22, sub {
257 my $fh = shift
258 or die "unable to connect: $!";
259 # do something
260 };
261
262Complex Example: connect to www.google.com on port 80 and make a simple
263GET request without much error handling. Also limit the connection timeout
264to 15 seconds.
265
266 AnyEvent::Util::tcp_connect "www.google.com", "http",
267 sub {
268 my ($fh) = @_
269 or die "unable to connect: $!";
270
271 my $handle; # avoid direct assignment so on_eof has it in scope.
272 $handle = new AnyEvent::Handle
273 fh => $fh,
274 on_eof => sub {
275 undef $handle; # keep it alive till eof
276 warn "done.\n";
277 };
278
279 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
280
281 $handle->push_read_line ("\015\012\015\012", sub {
282 my ($handle, $line) = @_;
283
284 # print response header
285 print "HEADER\n$line\n\nBODY\n";
286
287 $handle->on_read (sub {
288 # print response body
289 print $_[0]->rbuf;
290 $_[0]->rbuf = "";
291 });
292 });
293 }, sub {
294 my ($fh) = @_;
295 # could call $fh->bind etc. here
296
297 15
298 };
299
300=cut
301
302sub tcp_connect($$$;$) {
303 my ($host, $port, $connect, $prepare) = @_;
304
305 # see http://cr.yp.to/docs/connect.html for some background
306
307 my %state = ( fh => undef );
308
309 # name resolution
310 inet_aton $host, sub {
311 return unless exists $state{fh};
312
313 my $ipn = shift;
314
315 4 == length $ipn
316 or do {
317 %state = ();
318 $! = &Errno::ENXIO;
319 return $connect->();
320 };
321
322 # socket creation
323 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
324 or do {
325 %state = ();
326 return $connect->();
327 };
328
329 fh_nonblocking $state{fh}, 1;
330
331 # prepare and optional timeout
332 if ($prepare) {
333 my $timeout = $prepare->($state{fh});
334
335 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
336 %state = ();
337 $! = &Errno::ETIMEDOUT;
338 $connect->();
339 }) if $timeout;
340 }
341
342 # called when the connect was successful, which,
343 # in theory, could be the case immediately (but never is in practise)
344 my $connected = sub {
345 my $fh = delete $state{fh};
346 %state = ();
347
348 # we are connected, or maybe there was an error
349 if (my $sin = getpeername $fh) {
350 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
351 $connect->($fh, (Socket::inet_ntoa $host), $port);
352 } else {
353 # dummy read to fetch real error code
354 sysread $fh, my $buf, 1 if $! == &Errno::ENOTCONN;
355 $connect->();
356 }
357 };
358
359 # now connect
360 if (connect $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, $ipn) {
361 $connected->();
362 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
363 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
364 } else {
365 %state = ();
366 $connect->();
367 }
368 };
369
370 defined wantarray
371 ? guard { %state = () } # break any circular dependencies and unregister watchers
372 : ()
373}
374
375=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
376
377This function is experimental.
378
379Create and bind a tcp socket to the given host (any IPv4 host if undef,
380otherwise it must be an IPv4 or IPv6 address) and port (service name or
381numeric port number, or an ephemeral port if given as zero or undef, so
382you cnanot bind to tcp port zero), set the SO_REUSEADDR flag and call
383C<listen>.
384
385For each new connection that could be C<accept>ed, call the C<$accept_cb>
386with the filehandle (in non-blocking mode) as first and the peer host and
387port as second and third arguments (see C<tcp_connect> for details).
388
389Croaks on any errors.
390
391If called in non-void context, then this function returns a guard object
392whose lifetime it tied to the tcp server: If the object gets destroyed,
393the server will be stopped (but existing accepted connections will
394continue).
395
396If you need more control over the listening socket, you can provide a
397C<$prepare_cb>, which is called just before the C<listen ()> call, with
398the listen file handle as first argument.
399
400It should return the length of the listen queue (or C<0> for the default).
401
402Example: bind on tcp port 8888 on the local machine and tell each client
403to go away.
404
405 AnyEvent::Util::tcp_server undef, 8888, sub {
406 my ($fh, $host, $port) = @_;
407
408 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
409 };
410
411=cut
412
413sub tcp_server($$$;$) {
414 my ($host, $port, $accept, $prepare) = @_;
415
416 my %state;
417
418 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
419 or Carp::croak "socket: $!";
420
421 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
422 or Carp::croak "so_reuseaddr: $!";
423
424 bind $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, socket_inet_aton ($host || "0.0.0.0")
425 or Carp::croak "bind: $!";
426
427 fh_nonblocking $state{fh}, 1;
428
429 my $len = ($prepare && $prepare->($state{fh})) || 128;
430
431 listen $state{fh}, $len
432 or Carp::croak "listen: $!";
433
434 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
435 # this closure keeps $state alive
436 while (my $peer = accept my $fh, $state{fh}) {
437 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
438 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
439 $accept->($fh, (Socket::inet_ntoa $host), $port);
440 }
441 });
442
443 defined wantarray
444 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
445 : ()
446} 164}
447 165
4481; 1661;
449 167
450=back 168=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines