ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.21
Committed: Wed May 21 14:34:03 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.20: +2 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing
12 well-known functions by event-ised counterparts.
13
14 All functions documented without C<AnyEvent::Util::> prefix are exported
15 by default.
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::Util;
22
23 no warnings;
24 use strict;
25
26 use Carp ();
27 use Errno ();
28 use Socket ();
29 use IO::Socket::INET ();
30
31 use AnyEvent;
32
33 use base 'Exporter';
34
35 BEGIN {
36 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
37 }
38
39 our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect);
40
41 our $VERSION = '1.0';
42
43 our $MAXPARALLEL = 16; # max. number of parallel jobs
44
45 our $running;
46 our @queue;
47
48 sub _schedule;
49 sub _schedule {
50 return unless @queue;
51 return if $running >= $MAXPARALLEL;
52
53 ++$running;
54 my ($cb, $sub, @args) = @{shift @queue};
55
56 if (eval { local $SIG{__DIE__}; require POSIX }) {
57 my $pid = open my $fh, "-|";
58
59 if (!defined $pid) {
60 die "fork: $!";
61 } elsif (!$pid) {
62 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args);
63 POSIX::_exit (0);
64 }
65
66 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
67 --$running;
68 _schedule;
69 undef $w;
70
71 my $buf;
72 sysread $fh, $buf, 16384, length $buf;
73 $cb->(map { pack "H*", $_ } split /\0/, $buf);
74 });
75 } else {
76 $cb->($sub->(@args));
77 }
78 }
79
80 sub _do_asy {
81 push @queue, [@_];
82 _schedule;
83 }
84
85 sub dotted_quad($) {
86 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
87 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
88 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
89 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
90 }
91
92 my $has_ev_adns;
93
94 sub has_ev_adns {
95 ($has_ev_adns ||= do {
96 my $model = AnyEvent::detect;
97 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
98 ? 2 : 1 # so that || always detects as true
99 }) - 1 # 2 => true, 1 => false
100 }
101
102 =item inet_aton $name_or_address, $cb->(@addresses)
103
104 Works similarly to its Socket counterpart, except that it uses a
105 callback. Also, if a host has only an IPv6 address, this might be passed
106 to the callback instead (use the length to detect this - 4 for IPv4, 16
107 for IPv6).
108
109 This function uses various shortcuts and will fall back to either
110 L<EV::ADNS> or your systems C<inet_aton>.
111
112 Unlike the L<Socket> function, you can get multiple IP addresses as result
113 (currently only when EV::ADNS is being used).
114
115 =cut
116
117 sub inet_aton {
118 my ($name, $cb) = @_;
119
120 if (&dotted_quad) {
121 $cb->(socket_inet_aton $name);
122 } elsif ($name eq "localhost") { # rfc2606 et al.
123 $cb->(v127.0.0.1);
124 } elsif (&has_ev_adns) {
125 # work around some idiotic ands rfc readings
126 # rather hackish support for AAAA records (should
127 # wait for adns_getaddrinfo...)
128
129 my $loop = 10; # follow cname chains up to this length
130 my $qt;
131 my $acb; $acb = sub {
132 my ($status, undef, @a) = @_;
133
134 if ($status == &EV::ADNS::s_ok) {
135 if ($qt eq "a") {
136 return $cb->(map +(socket_inet_aton $_), @a);
137 } elsif ($qt eq "aaaa") {
138 return $cb->(@a);
139 } elsif ($qt eq "cname") {
140 $name = $a[0]; # there can only be one :)
141 $qt = "a";
142 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
143 }
144 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
145 # follow cname chains
146 if ($loop--) {
147 $qt = "cname";
148 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
149 }
150 } elsif ($status == &EV::ADNS::s_nodata) {
151 if ($qt eq "a") {
152 # ask for raw AAAA (might not be a good method, but adns is too broken...)
153 $qt = "aaaa";
154 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
155 }
156 }
157
158 $cb->();
159 };
160
161 $qt = "a";
162 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
163 } else {
164 _do_asy $cb, sub {
165 my $ipn = socket_inet_aton $_[0];
166 $ipn ? ($ipn) : ()
167 }, @_;
168 }
169 }
170
171 =item fh_nonblocking $fh, $nonblocking
172
173 Sets the blocking state of the given filehandle (true == nonblocking,
174 false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
175 broken (i.e. windows) platforms.
176
177 =cut
178
179 sub fh_nonblocking($$) {
180 my ($fh, $nb) = @_;
181
182 require Fcntl;
183
184 if ($^O eq "MSWin32") {
185 $nb = (! ! $nb) + 0;
186 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
187 } else {
188 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
189 }
190 }
191
192 =item $guard = guard { CODE }
193
194 This function creates a special object that, when called, will execute the
195 code block.
196
197 This is often handy in continuation-passing style code to clean up some
198 resource regardless of where you break out of a process.
199
200 =cut
201
202 sub AnyEvent::Util::Guard::DESTROY {
203 ${$_[0]}->();
204 }
205
206 sub guard(&) {
207 bless \(my $cb = shift), AnyEvent::Util::Guard::
208 }
209
210 sub _tcp_port($) {
211 $_[0] =~ /^(\d*)$/ and return $1*1;
212
213 (getservbyname $_[0], "tcp")[2]
214 or Carp::croak "$_[0]: service unknown"
215 }
216
217 =item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
218
219 This function is experimental.
220
221 This is a convenience function that creates a tcp socket and makes a 100%
222 non-blocking connect to the given C<$host> (which can be a hostname or a
223 textual IP address) and C<$port> (which can be a numeric port number or a
224 service name).
225
226 Unless called in void context, it returns a guard object that will
227 automatically abort connecting when it gets destroyed (it does not do
228 anything to the socket after the connect was successful).
229
230 If the connect is successful, then the C<$connect_cb> will be invoked with
231 the socket filehandle (in non-blocking mode) as first and the peer host
232 (as a textual IP address) and peer port as second and third arguments,
233 respectively.
234
235 If the connect is unsuccessful, then the C<$connect_cb> will be invoked
236 without any arguments and C<$!> will be set appropriately (with C<ENXIO>
237 indicating a dns resolution failure).
238
239 The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
240 be used as a normal perl file handle as well.
241
242 Sometimes you need to "prepare" the socket before connecting, for example,
243 to C<bind> it to some port, or you want a specific connect timeout that
244 is lower than your kernel's default timeout. In this case you can specify
245 a second callback, C<$prepare_cb>. It will be called with the file handle
246 in not-yet-connected state as only argument and must return the connection
247 timeout value (or C<0>, C<undef> or the empty list to indicate the default
248 timeout is to be used).
249
250 Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
251 socket (although only IPv4 is currently supported by this module).
252
253 Simple Example: connect to localhost on port 22.
254
255 AnyEvent::Util::tcp_connect localhost => 22, sub {
256 my $fh = shift
257 or die "unable to connect: $!";
258 # do something
259 };
260
261 Complex Example: connect to www.google.com on port 80 and make a simple
262 GET request without much error handling. Also limit the connection timeout
263 to 15 seconds.
264
265 AnyEvent::Util::tcp_connect "www.google.com", "http",
266 sub {
267 my ($fh) = @_
268 or die "unable to connect: $!";
269
270 my $handle; # avoid direct assignment so on_eof has it in scope.
271 $handle = new AnyEvent::Handle
272 fh => $fh,
273 on_eof => sub {
274 undef $handle; # keep it alive till eof
275 warn "done.\n";
276 };
277
278 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
279
280 $handle->push_read_line ("\015\012\015\012", sub {
281 my ($handle, $line) = @_;
282
283 # print response header
284 print "HEADER\n$line\n\nBODY\n";
285
286 $handle->on_read (sub {
287 # print response body
288 print $_[0]->rbuf;
289 $_[0]->rbuf = "";
290 });
291 });
292 }, sub {
293 my ($fh) = @_;
294 # could call $fh->bind etc. here
295
296 15
297 };
298
299 =cut
300
301 sub tcp_connect($$$;$) {
302 my ($host, $port, $connect, $prepare) = @_;
303
304 # see http://cr.yp.to/docs/connect.html for some background
305
306 my %state = ( fh => undef );
307
308 # name resolution
309 inet_aton $host, sub {
310 return unless exists $state{fh};
311
312 my $ipn = shift;
313
314 4 == length $ipn
315 or do {
316 %state = ();
317 $! = &Errno::ENXIO;
318 return $connect->();
319 };
320
321 # socket creation
322 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
323 or do {
324 %state = ();
325 return $connect->();
326 };
327
328 fh_nonblocking $state{fh}, 1;
329
330 # prepare and optional timeout
331 if ($prepare) {
332 my $timeout = $prepare->($state{fh});
333
334 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
335 %state = ();
336 $! = &Errno::ETIMEDOUT;
337 $connect->();
338 }) if $timeout;
339 }
340
341 # called when the connect was successful, which,
342 # in theory, could be the case immediately (but never is in practise)
343 my $connected = sub {
344 my $fh = delete $state{fh};
345 %state = ();
346
347 # we are connected, or maybe there was an error
348 if (my $sin = getpeername $fh) {
349 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
350 $connect->($fh, (Socket::inet_ntoa $host), $port);
351 } else {
352 # dummy read to fetch real error code
353 sysread $fh, my $buf, 1 if $! == &Errno::ENOTCONN;
354 $connect->();
355 }
356 };
357
358 # now connect
359 if (connect $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, $ipn) {
360 $connected->();
361 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
362 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
363 } else {
364 %state = ();
365 $connect->();
366 }
367 };
368
369 defined wantarray
370 ? guard { %state = () } # break any circular dependencies and unregister watchers
371 : ()
372 }
373
374 =item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
375
376 This function is experimental.
377
378 Create and bind a tcp socket to the given host (any IPv4 host if undef,
379 otherwise it must be an IPv4 or IPv6 address) and port (service name or
380 numeric port number, or an ephemeral port if given as zero or undef), set
381 the SO_REUSEADDR flag and call C<listen>.
382
383 For each new connection that could be C<accept>ed, call the C<$accept_cb>
384 with the filehandle (in non-blocking mode) as first and the peer host and
385 port as second and third arguments (see C<tcp_connect> for details).
386
387 Croaks on any errors.
388
389 If called in non-void context, then this function returns a guard object
390 whose lifetime it tied to the tcp server: If the object gets destroyed,
391 the server will be stopped (but existing accepted connections will
392 continue).
393
394 If you need more control over the listening socket, you can provide a
395 C<$prepare_cb>, which is called just before the C<listen ()> call, with
396 the listen file handle as first argument.
397
398 It should return the length of the listen queue (or C<0> for the default).
399
400 Example: bind on tcp port 8888 on the local machine and tell each client
401 to go away.
402
403 AnyEvent::Util::tcp_server undef, 8888, sub {
404 my ($fh, $host, $port) = @_;
405
406 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
407 };
408
409 =cut
410
411 sub tcp_server($$$;$) {
412 my ($host, $port, $accept, $prepare) = @_;
413
414 my %state;
415
416 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
417 or Carp::croak "socket: $!";
418
419 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
420 or Carp::croak "so_reuseaddr: $!";
421
422 bind $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, socket_inet_aton ($host || "0.0.0.0")
423 or Carp::croak "bind: $!";
424
425 fh_nonblocking $state{fh}, 1;
426
427 my $len = ($prepare && $prepare->($state{fh})) || 128;
428
429 listen $state{fh}, $len
430 or Carp::croak "listen: $!";
431
432 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
433 # this closure keeps $state alive
434 while (my $peer = accept my $fh, $state{fh}) {
435 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
436 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
437 $accept->($fh, (Socket::inet_ntoa $host), $port);
438 }
439 });
440
441 defined wantarray
442 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
443 : ()
444 }
445
446 1;
447
448 =back
449
450 =head1 AUTHOR
451
452 Marc Lehmann <schmorp@schmorp.de>
453 http://home.schmorp.de/
454
455 =cut
456