ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.20
Committed: Tue May 20 15:17:46 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.19: +2 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing
12 well-known functions by event-ised counterparts.
13
14 All functions documented without C<AnyEvent::Util::> prefix are exported
15 by default.
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::Util;
22
23 use strict;
24
25 no warnings "uninitialized";
26
27 use Carp ();
28 use Errno ();
29 use Socket ();
30 use IO::Socket::INET ();
31
32 use AnyEvent;
33
34 use base 'Exporter';
35
36 BEGIN {
37 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
38 }
39
40 our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect);
41
42 our $VERSION = '1.0';
43
44 our $MAXPARALLEL = 16; # max. number of parallel jobs
45
46 our $running;
47 our @queue;
48
49 sub _schedule;
50 sub _schedule {
51 return unless @queue;
52 return if $running >= $MAXPARALLEL;
53
54 ++$running;
55 my ($cb, $sub, @args) = @{shift @queue};
56
57 if (eval { local $SIG{__DIE__}; require POSIX }) {
58 my $pid = open my $fh, "-|";
59
60 if (!defined $pid) {
61 die "fork: $!";
62 } elsif (!$pid) {
63 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args);
64 POSIX::_exit (0);
65 }
66
67 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
68 --$running;
69 _schedule;
70 undef $w;
71
72 my $buf;
73 sysread $fh, $buf, 16384, length $buf;
74 $cb->(map { pack "H*", $_ } split /\0/, $buf);
75 });
76 } else {
77 $cb->($sub->(@args));
78 }
79 }
80
81 sub _do_asy {
82 push @queue, [@_];
83 _schedule;
84 }
85
86 sub dotted_quad($) {
87 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
88 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
89 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
90 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
91 }
92
93 my $has_ev_adns;
94
95 sub has_ev_adns {
96 ($has_ev_adns ||= do {
97 my $model = AnyEvent::detect;
98 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
99 ? 2 : 1 # so that || always detects as true
100 }) - 1 # 2 => true, 1 => false
101 }
102
103 =item inet_aton $name_or_address, $cb->(@addresses)
104
105 Works similarly to its Socket counterpart, except that it uses a
106 callback. Also, if a host has only an IPv6 address, this might be passed
107 to the callback instead (use the length to detect this - 4 for IPv4, 16
108 for IPv6).
109
110 This function uses various shortcuts and will fall back to either
111 L<EV::ADNS> or your systems C<inet_aton>.
112
113 Unlike the L<Socket> function, you can get multiple IP addresses as result
114 (currently only when EV::ADNS is being used).
115
116 =cut
117
118 sub inet_aton {
119 my ($name, $cb) = @_;
120
121 if (&dotted_quad) {
122 $cb->(socket_inet_aton $name);
123 } elsif ($name eq "localhost") { # rfc2606 et al.
124 $cb->(v127.0.0.1);
125 } elsif (&has_ev_adns) {
126 # work around some idiotic ands rfc readings
127 # rather hackish support for AAAA records (should
128 # wait for adns_getaddrinfo...)
129
130 my $loop = 10; # follow cname chains up to this length
131 my $qt;
132 my $acb; $acb = sub {
133 my ($status, undef, @a) = @_;
134
135 if ($status == &EV::ADNS::s_ok) {
136 if ($qt eq "a") {
137 return $cb->(map +(socket_inet_aton $_), @a);
138 } elsif ($qt eq "aaaa") {
139 return $cb->(@a);
140 } elsif ($qt eq "cname") {
141 $name = $a[0]; # there can only be one :)
142 $qt = "a";
143 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
144 }
145 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
146 # follow cname chains
147 if ($loop--) {
148 $qt = "cname";
149 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
150 }
151 } elsif ($status == &EV::ADNS::s_nodata) {
152 if ($qt eq "a") {
153 # ask for raw AAAA (might not be a good method, but adns is too broken...)
154 $qt = "aaaa";
155 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
156 }
157 }
158
159 $cb->();
160 };
161
162 $qt = "a";
163 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
164 } else {
165 _do_asy $cb, sub {
166 my $ipn = socket_inet_aton $_[0];
167 $ipn ? ($ipn) : ()
168 }, @_;
169 }
170 }
171
172 =item fh_nonblocking $fh, $nonblocking
173
174 Sets the blocking state of the given filehandle (true == nonblocking,
175 false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
176 broken (i.e. windows) platforms.
177
178 =cut
179
180 sub fh_nonblocking($$) {
181 my ($fh, $nb) = @_;
182
183 require Fcntl;
184
185 if ($^O eq "MSWin32") {
186 $nb = (! ! $nb) + 0;
187 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
188 } else {
189 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
190 }
191 }
192
193 =item $guard = guard { CODE }
194
195 This function creates a special object that, when called, will execute the
196 code block.
197
198 This is often handy in continuation-passing style code to clean up some
199 resource regardless of where you break out of a process.
200
201 =cut
202
203 sub AnyEvent::Util::Guard::DESTROY {
204 ${$_[0]}->();
205 }
206
207 sub guard(&) {
208 bless \(my $cb = shift), AnyEvent::Util::Guard::
209 }
210
211 sub _tcp_port($) {
212 $_[0] =~ /^\d*$/ and return $1*1;
213
214 (getservbyname $_[0], "tcp")[2]
215 or Carp::croak "$_[0]: service unknown"
216 }
217
218 =item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
219
220 This function is experimental.
221
222 This is a convenience function that creates a tcp socket and makes a 100%
223 non-blocking connect to the given C<$host> (which can be a hostname or a
224 textual IP address) and C<$port> (which can be a numeric port number or a
225 service name).
226
227 Unless called in void context, it returns a guard object that will
228 automatically abort connecting when it gets destroyed (it does not do
229 anything to the socket after the connect was successful).
230
231 If the connect is successful, then the C<$connect_cb> will be invoked with
232 the socket filehandle (in non-blocking mode) as first and the peer host
233 (as a textual IP address) and peer port as second and third arguments,
234 respectively.
235
236 If the connect is unsuccessful, then the C<$connect_cb> will be invoked
237 without any arguments and C<$!> will be set appropriately (with C<ENXIO>
238 indicating a dns resolution failure).
239
240 The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
241 be used as a normal perl file handle as well.
242
243 Sometimes you need to "prepare" the socket before connecting, for example,
244 to C<bind> it to some port, or you want a specific connect timeout that
245 is lower than your kernel's default timeout. In this case you can specify
246 a second callback, C<$prepare_cb>. It will be called with the file handle
247 in not-yet-connected state as only argument and must return the connection
248 timeout value (or C<0>, C<undef> or the empty list to indicate the default
249 timeout is to be used).
250
251 Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
252 socket (although only IPv4 is currently supported by this module).
253
254 Simple Example: connect to localhost on port 22.
255
256 AnyEvent::Util::tcp_connect localhost => 22, sub {
257 my $fh = shift
258 or die "unable to connect: $!";
259 # do something
260 };
261
262 Complex Example: connect to www.google.com on port 80 and make a simple
263 GET request without much error handling. Also limit the connection timeout
264 to 15 seconds.
265
266 AnyEvent::Util::tcp_connect "www.google.com", "http",
267 sub {
268 my ($fh) = @_
269 or die "unable to connect: $!";
270
271 my $handle; # avoid direct assignment so on_eof has it in scope.
272 $handle = new AnyEvent::Handle
273 fh => $fh,
274 on_eof => sub {
275 undef $handle; # keep it alive till eof
276 warn "done.\n";
277 };
278
279 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
280
281 $handle->push_read_line ("\015\012\015\012", sub {
282 my ($handle, $line) = @_;
283
284 # print response header
285 print "HEADER\n$line\n\nBODY\n";
286
287 $handle->on_read (sub {
288 # print response body
289 print $_[0]->rbuf;
290 $_[0]->rbuf = "";
291 });
292 });
293 }, sub {
294 my ($fh) = @_;
295 # could call $fh->bind etc. here
296
297 15
298 };
299
300 =cut
301
302 sub tcp_connect($$$;$) {
303 my ($host, $port, $connect, $prepare) = @_;
304
305 # see http://cr.yp.to/docs/connect.html for some background
306
307 my %state = ( fh => undef );
308
309 # name resolution
310 inet_aton $host, sub {
311 return unless exists $state{fh};
312
313 my $ipn = shift;
314
315 4 == length $ipn
316 or do {
317 %state = ();
318 $! = &Errno::ENXIO;
319 return $connect->();
320 };
321
322 # socket creation
323 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
324 or do {
325 %state = ();
326 return $connect->();
327 };
328
329 fh_nonblocking $state{fh}, 1;
330
331 # prepare and optional timeout
332 if ($prepare) {
333 my $timeout = $prepare->($state{fh});
334
335 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
336 %state = ();
337 $! = &Errno::ETIMEDOUT;
338 $connect->();
339 }) if $timeout;
340 }
341
342 # called when the connect was successful, which,
343 # in theory, could be the case immediately (but never is in practise)
344 my $connected = sub {
345 my $fh = delete $state{fh};
346 %state = ();
347
348 # we are connected, or maybe there was an error
349 if (my $sin = getpeername $fh) {
350 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
351 $connect->($fh, (Socket::inet_ntoa $host), $port);
352 } else {
353 # dummy read to fetch real error code
354 sysread $fh, my $buf, 1 if $! == &Errno::ENOTCONN;
355 $connect->();
356 }
357 };
358
359 # now connect
360 if (connect $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, $ipn) {
361 $connected->();
362 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
363 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
364 } else {
365 %state = ();
366 $connect->();
367 }
368 };
369
370 defined wantarray
371 ? guard { %state = () } # break any circular dependencies and unregister watchers
372 : ()
373 }
374
375 =item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
376
377 This function is experimental.
378
379 Create and bind a tcp socket to the given host (any IPv4 host if undef,
380 otherwise it must be an IPv4 or IPv6 address) and port (service name or
381 numeric port number, or an ephemeral port if given as zero or undef), set
382 the SO_REUSEADDR flag and call C<listen>.
383
384 For each new connection that could be C<accept>ed, call the C<$accept_cb>
385 with the filehandle (in non-blocking mode) as first and the peer host and
386 port as second and third arguments (see C<tcp_connect> for details).
387
388 Croaks on any errors.
389
390 If called in non-void context, then this function returns a guard object
391 whose lifetime it tied to the tcp server: If the object gets destroyed,
392 the server will be stopped (but existing accepted connections will
393 continue).
394
395 If you need more control over the listening socket, you can provide a
396 C<$prepare_cb>, which is called just before the C<listen ()> call, with
397 the listen file handle as first argument.
398
399 It should return the length of the listen queue (or C<0> for the default).
400
401 Example: bind on tcp port 8888 on the local machine and tell each client
402 to go away.
403
404 AnyEvent::Util::tcp_server undef, 8888, sub {
405 my ($fh, $host, $port) = @_;
406
407 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
408 };
409
410 =cut
411
412 sub tcp_server($$$;$) {
413 my ($host, $port, $accept, $prepare) = @_;
414
415 my %state;
416
417 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
418 or Carp::croak "socket: $!";
419
420 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
421 or Carp::croak "so_reuseaddr: $!";
422
423 bind $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, socket_inet_aton ($host || "0.0.0.0")
424 or Carp::croak "bind: $!";
425
426 fh_nonblocking $state{fh}, 1;
427
428 my $len = ($prepare && $prepare->($state{fh})) || 128;
429
430 listen $state{fh}, $len
431 or Carp::croak "listen: $!";
432
433 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
434 # this closure keeps $state alive
435 while (my $peer = accept my $fh, $state{fh}) {
436 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
437 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
438 $accept->($fh, (Socket::inet_ntoa $host), $port);
439 }
440 });
441
442 defined wantarray
443 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
444 : ()
445 }
446
447 1;
448
449 =back
450
451 =head1 AUTHOR
452
453 Marc Lehmann <schmorp@schmorp.de>
454 http://home.schmorp.de/
455
456 =cut
457