ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.16 by root, Sat May 17 21:34:15 2008 UTC vs.
Revision 1.41 by root, Tue Jun 3 09:02:46 2008 UTC

2 2
3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8 8
9=head1 DESCRIPTION 9=head1 DESCRIPTION
10 10
11This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
12well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
18 18
19=cut 19=cut
20 20
21package AnyEvent::Util; 21package AnyEvent::Util;
22 22
23no warnings;
23use strict; 24use strict;
24 25
25no warnings "uninitialized"; 26use Carp ();
26
27use Errno; 27use Errno ();
28use Socket (); 28use Socket ();
29use IO::Socket::INET ();
30 29
31use AnyEvent; 30use AnyEvent ();
32 31
33use base 'Exporter'; 32use base 'Exporter';
34 33
35our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect); 34our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36 36
37our $VERSION = '1.0'; 37our $VERSION = 4.12;
38 38
39our $MAXPARALLEL = 16; # max. number of parallel jobs 39BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42}
40 43
41our $running; 44BEGIN {
42our @queue; 45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47}
43 48
49BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65}
66
67BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAEINVAL() { 10022 }";
71 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 eval "sub WSAEINPROGRESS() { 10036 }";
74 } else {
75 # these should never match any errno value
76 eval "sub WSAEINVAL() { -1e99 }";
77 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 eval "sub WSAEINPROGRESS() { -1e99 }";
80 }
81}
82
83=item ($r, $w) = portable_pipe
84
85Calling C<pipe> in Perl is portable - except it doesn't really work on
86sucky windows platforms (at least not with most perls - cygwin's perl
87notably works fine).
88
89On that platform, you actually get two file handles you cannot use select
90on.
91
92This function gives you a pipe that actually works even on the broken
93Windows platform (by creating a pair of TCP sockets, so do not expect any
94speed from that).
95
96Returns the empty list on any errors.
97
98=cut
99
100sub portable_pipe() {
101 my ($r, $w);
102
103 if (AnyEvent::WIN32) {
104 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105 or return;
106 } else {
107 pipe $r, $w
108 or return;
109 }
110
111 ($r, $w)
112}
113
114=item fork_call $coderef, @args, $cb->(@res)
115
116Executes the given code reference asynchronously, by forking. Everything
117the C<$coderef> returns will transferred to the calling process (by
118serialising and deserialising via L<Storable>).
119
120If there are any errors, then the C<$cb> will be called without any
121arguments. In that case, either C<$@> contains the exception, or C<$!>
122contains an error number. In all other cases, C<$@> will be C<undef>ined.
123
124The C<$coderef> must not ever call an event-polling function or use
125event-based programming.
126
127Note that forking can be expensive in large programs (RSS 200MB+). On
128windows, it is abysmally slow, do not expect more than 5..20 forks/s on
129that sucky platform (note this uses perl's pseudo-threads, so avoid those
130like the plague).
131
132=item $AnyEvent::Util::MAX_FORKS [default: 10]
133
134The maximum number of child processes that C<fork_call> will fork in
135parallel. Any additional requests will be queued until a slot becomes free
136again.
137
138The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
139this value.
140
141=cut
142
143our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
144$MAX_FORKS = 10 if $MAX_FORKS <= 0;
145
146my $forks;
147my @fork_queue;
148
44sub _schedule; 149sub _fork_schedule;
45sub _schedule { 150sub _fork_schedule {
46 return unless @queue; 151 while () {
47 return if $running >= $MAXPARALLEL; 152 return if $forks >= $MAX_FORKS;
48 153
49 ++$running; 154 my $job = shift @fork_queue
50 my ($cb, $sub, @args) = @{shift @queue}; 155 or return;
51 156
52 if (eval { local $SIG{__DIE__}; require POSIX }) { 157 ++$forks;
53 my $pid = open my $fh, "-|";
54 158
159 my $coderef = shift @$job;
160 my $cb = pop @$job;
161
162 # gimme a break...
163 my ($r, $w) = portable_pipe
164 or ($forks and last) # allow failures when we have at least one job
165 or die "fork_call: $!";
166
167 my $pid = fork;
168
169 if ($pid != 0) {
170 # parent
171 close $w;
172
173 my $buf;
174
175 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
176 my $len = sysread $r, $buf, 65536, length $buf;
177
178 if ($len <= 0) {
179 undef $ww;
180 close $r;
181 --$forks;
182 _fork_schedule;
183
184 my $result = eval { Storable::thaw ($buf) };
185 $result = [$@] unless $result;
186 $@ = shift @$result;
187
188 $cb->(@$result);
189
190 # clean up the pid
191 waitpid $pid, 0;
192 }
193 });
194
55 if (!defined $pid) { 195 } elsif (defined $pid) {
56 die "fork: $!"; 196 # child
57 } elsif (!$pid) { 197 close $r;
58 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); 198
199 my $result = eval {
200 local $SIG{__DIE__};
201
202 Storable::freeze ([undef, $coderef->(@$job)])
203 };
204
205 $result = Storable::freeze (["$@"])
206 if $@;
207
208 # windows forces us to these contortions
209 my $ofs;
210
211 while () {
212 my $len = (length $result) - $ofs
213 or last;
214
215 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
216
217 last if $len <= 0;
218
219 $ofs += $len;
220 }
221
222 close $w;
223
224 if (AnyEvent::WIN32) {
225 kill 9, $$; # yeah, windows for the win
226 } else {
227 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
59 POSIX::_exit (0); 228 POSIX::_exit (0);
229 }
230 exit 1;
231
232 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
233 # we ignore some errors as long as we can run at least one job
234 # maybe we should wait a few seconds and retry instead
235 die "fork_call: $!";
60 } 236 }
61
62 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
63 --$running;
64 _schedule;
65 undef $w;
66
67 my $buf;
68 sysread $fh, $buf, 16384, length $buf;
69 $cb->(map { pack "H*", $_ } split /\0/, $buf);
70 });
71 } else {
72 $cb->($sub->(@args));
73 } 237 }
74} 238}
75 239
76sub _do_asy { 240sub fork_call {
241 require Storable;
242
77 push @queue, [@_]; 243 push @fork_queue, [@_];
78 _schedule; 244 _fork_schedule;
79} 245}
80 246
247# to be removed
81sub dotted_quad($) { 248sub dotted_quad($) {
82 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 249 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
83 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 250 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
84 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 251 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
85 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 252 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
86} 253}
87 254
88my $has_ev_adns; 255# just a forwarder
89
90sub has_ev_adns {
91 ($has_ev_adns ||= do {
92 my $model = AnyEvent::detect;
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false
96}
97
98=item inet_aton $name_or_address, $cb->($binary_address_or_undef)
99
100Works almost exactly like its Socket counterpart, except that it uses a
101callback. Also, if a host has only an IPv6 address, this might be passed
102to the callback instead (use the length to detetc this - 4 for IPv4, 16
103for IPv6).
104
105This function uses various shortcuts and will fall back to either
106L<EV::ADNS> or your systems C<inet_aton>.
107
108=cut
109
110sub inet_aton { 256sub inet_aton {
111 my ($name, $cb) = @_; 257 require AnyEvent::Socket;
112 258 *inet_aton = \&AnyEvent::Socket::inet_aton;
113 if (&dotted_quad) { 259 goto &inet_aton
114 $cb->(Socket::inet_aton $name);
115 } elsif ($name eq "localhost") { # rfc2606 et al.
116 $cb->(v127.0.0.1);
117 } elsif (&has_ev_adns) {
118 # work around some idiotic ands rfc readings
119 # rather hackish support for AAAA records (should
120 # wait for adns_getaddrinfo...)
121
122 my $loop = 10; # follow cname chains up to this length
123 my $qt;
124 my $acb; $acb = sub {
125 my ($status, undef, @a) = @_;
126
127 if ($status == &EV::ADNS::s_ok) {
128 if ($qt eq "a") {
129 return $cb->(Socket::inet_aton $a[0]);
130 } elsif ($qt eq "aaaa") {
131 return $cb->($a[0]);
132 } elsif ($qt eq "cname") {
133 $name = $a[0];
134 $qt = "a";
135 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
136 }
137 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
138 # follow cname chains
139 if ($loop--) {
140 $qt = "cname";
141 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
142 }
143 } elsif ($status == &EV::ADNS::s_nodata) {
144 if ($qt eq "a") {
145 # ask for raw AAAA (might not be a good method, but adns is too broken...)
146 $qt = "aaaa";
147 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
148 }
149 }
150
151 $cb->(undef);
152 };
153
154 $qt = "a";
155 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
156 } else {
157 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_;
158 }
159} 260}
160 261
161=item fh_nonblocking $fh, $nonblocking 262=item fh_nonblocking $fh, $nonblocking
162 263
163Sets the blocking state of the given filehandle (true == nonblocking, 264Sets the blocking state of the given filehandle (true == nonblocking,
169sub fh_nonblocking($$) { 270sub fh_nonblocking($$) {
170 my ($fh, $nb) = @_; 271 my ($fh, $nb) = @_;
171 272
172 require Fcntl; 273 require Fcntl;
173 274
174 if ($^O eq "MSWin32") { 275 if (AnyEvent::WIN32) {
175 $nb = (! ! $nb) + 0; 276 $nb = (! ! $nb) + 0;
176 ioctl $fh, 0x8004667e, \$nb; # FIONBIO 277 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
177 } else { 278 } else {
178 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 279 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
179 } 280 }
185code block. 286code block.
186 287
187This is often handy in continuation-passing style code to clean up some 288This is often handy in continuation-passing style code to clean up some
188resource regardless of where you break out of a process. 289resource regardless of where you break out of a process.
189 290
291You can call one method on the returned object:
292
293=item $guard->cancel
294
295This simply causes the code block not to be invoked: it "cancels" the
296guard.
297
190=cut 298=cut
191 299
192sub AnyEvent::Util::Guard::DESTROY { 300sub AnyEvent::Util::Guard::DESTROY {
193 ${$_[0]}->(); 301 ${$_[0]}->();
194} 302}
195 303
304sub AnyEvent::Util::Guard::cancel($) {
305 ${$_[0]} = sub { };
306}
307
196sub guard(&) { 308sub guard(&) {
197 bless \(my $cb = shift), AnyEvent::Util::Guard:: 309 bless \(my $cb = shift), AnyEvent::Util::Guard::
198} 310}
199 311
200=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
201
202This function is experimental.
203
204This is a convenience function that creates a tcp socket and makes a 100%
205non-blocking connect to the given C<$host> (which can be a hostname or a
206textual IP address) and C<$port>.
207
208Unless called in void context, it returns a guard object that will
209automatically abort connecting when it gets destroyed (it does not do
210anything to the socket after the conenct was successful).
211
212If the connect is successful, then the C<$connect_cb> will be invoked with
213the socket filehandle (in non-blocking mode) as first and the peer host
214(as a textual IP address) and peer port as second and third arguments,
215respectively.
216
217If the connect is unsuccessful, then the C<$connect_cb> will be invoked
218without any arguments and C<$!> will be set appropriately (with C<ENXIO>
219indicating a dns resolution failure).
220
221The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
222be used as a normal perl file handle as well.
223
224Sometimes you need to "prepare" the socket before connecting, for example,
225to C<bind> it to some port, or you want a specific connect timeout that
226is lower than your kernel's default timeout. In this case you can specify
227a second callback, C<$prepare_cb>. It will be called with the file handle
228in not-yet-connected state as only argument and must return the connection
229timeout value (or C<0>, C<undef> or the empty list to indicate the default
230timeout is to be used).
231
232Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
233socket (although only IPv4 is currently supported by this module).
234
235Simple Example: connect to localhost on port 22.
236
237 AnyEvent::Util::tcp_connect localhost => 22, sub {
238 my $fh = shift
239 or die "unable to connect: $!";
240 # do something
241 };
242
243Complex Example: connect to www.google.com on port 80 and make a simple
244GET request without much error handling. Also limit the connection timeout
245to 15 seconds.
246
247 AnyEvent::Util::tcp_connect "www.google.com", 80,
248 sub {
249 my ($fh) = @_
250 or die "unable to connect: $!";
251
252 my $handle; # avoid direct assignment so on_eof has it in scope.
253 $handle = new AnyEvent::Handle
254 fh => $fh,
255 on_eof => sub {
256 undef $handle; # keep it alive till eof
257 warn "done.\n";
258 };
259
260 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
261
262 $handle->push_read_line ("\015\012\015\012", sub {
263 my ($handle, $line) = @_;
264
265 # print response header
266 print "HEADER\n$line\n\nBODY\n";
267
268 $handle->on_read (sub {
269 # print response body
270 print $_[0]->rbuf;
271 $_[0]->rbuf = "";
272 });
273 });
274 }, sub {
275 my ($fh) = @_;
276 # could call $fh->bind etc. here
277
278 15
279 };
280
281=cut
282
283sub tcp_connect($$$;$) {
284 my ($host, $port, $connect, $prepare) = @_;
285
286 # see http://cr.yp.to/docs/connect.html for some background
287
288 my %state = ( fh => undef );
289
290 # name resolution
291 inet_aton $host, sub {
292 return unless exists $state{fh};
293
294 my $ipn = shift;
295
296 4 == length $ipn
297 or do {
298 %state = ();
299 $! = &Errno::ENXIO;
300 return $connect->();
301 };
302
303 # socket creation
304 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
305 or do {
306 %state = ();
307 return $connect->();
308 };
309
310 fh_nonblocking $state{fh}, 1;
311
312 # prepare and optional timeout
313 if ($prepare) {
314 my $timeout = $prepare->($state{fh});
315
316 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
317 %state = ();
318 $! = &Errno::ETIMEDOUT;
319 $connect->();
320 }) if $timeout;
321 }
322
323 # called when the connect was successful, which,
324 # in theory, could be the case immediately (but never is in practise)
325 my $connected = sub {
326 my $fh = delete $state{fh};
327 %state = ();
328
329 # we are connected, or maybe there was an error
330 if (my $sin = getpeername $fh) {
331 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
332 $connect->($fh, (Socket::inet_ntoa $host), $port);
333 } else {
334 # dummy read to fetch real error code
335 sysread $fh, my $buf, 1;
336 $connect->();
337 }
338 };
339
340 # now connect
341 if (connect $state{fh}, Socket::pack_sockaddr_in $port, $ipn) {
342 $connected->();
343 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
344 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
345 } else {
346 %state = ();
347 $connect->();
348 }
349 };
350
351 defined wantarray
352 ? guard { %state = () } # break any circular dependencies and unregister watchers
353 : ()
354}
355
356=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
357
358This function is experimental.
359
360Create and bind a tcp socket to the given host (any IPv4 host if undef,
361otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
362port if given as zero or undef), set the SO_REUSEADDR flag and call
363C<listen>.
364
365For each new connection that could be C<accept>ed, call the C<$accept_cb>
366with the filehandle (in non-blocking mode) as first and the peer host and
367port as second and third arguments (see C<tcp_connect> for details).
368
369Croaks on any errors.
370
371If called in non-void context, then this function returns a guard object
372whose lifetime it tied to the tcp server: If the object gets destroyed,
373the server will be stopped (but existing accepted connections will
374continue).
375
376If you need more control over the listening socket, you can provide a
377C<$prepare_cb>, which is called just before the C<listen ()> call, with
378the listen file handle as first argument.
379
380It should return the length of the listen queue (or C<0> for the default).
381
382Example: bind on tcp port 8888 on the local machine and tell each client
383to go away.
384
385 AnyEvent::Util::tcp_server undef, 8888, sub {
386 my ($fh, $host, $port) = @_;
387
388 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
389 };
390
391=cut
392
393sub tcp_server($$$;$) {
394 my ($host, $port, $accept, $prepare) = @_;
395
396 my %state;
397
398 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
399 or Carp::croak "socket: $!";
400
401 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
402 or Carp::croak "so_reuseaddr: $!";
403
404 bind $state{fh}, Socket::pack_sockaddr_in $port, Socket::inet_aton ($host || "0.0.0.0")
405 or Carp::croak "bind: $!";
406
407 fh_nonblocking $state{fh}, 1;
408
409 my $len = ($prepare && $prepare->($state{fh})) || 128;
410
411 listen $state{fh}, $len
412 or Carp::croak "listen: $!";
413
414 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
415 # this closure keeps $state alive
416 while (my $peer = accept my $fh, $state{fh}) {
417 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
418 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
419 $accept->($fh, (Socket::inet_ntoa $host), $port);
420 }
421 });
422
423 defined wantarray
424 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
425 : ()
426}
427
4281; 3121;
429 313
430=back 314=back
431 315
432=head1 AUTHOR 316=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines