ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.14 by root, Sat May 17 20:42:18 2008 UTC vs.
Revision 1.41 by root, Tue Jun 3 09:02:46 2008 UTC

2 2
3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8 8
9=head1 DESCRIPTION 9=head1 DESCRIPTION
10 10
11This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
12well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
18 18
19=cut 19=cut
20 20
21package AnyEvent::Util; 21package AnyEvent::Util;
22 22
23no warnings;
23use strict; 24use strict;
24 25
25no warnings "uninitialized"; 26use Carp ();
26
27use Errno; 27use Errno ();
28use Socket (); 28use Socket ();
29use IO::Socket::INET ();
30 29
31use AnyEvent; 30use AnyEvent ();
32 31
33use base 'Exporter'; 32use base 'Exporter';
34 33
35our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect); 34our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36 36
37our $VERSION = '1.0'; 37our $VERSION = 4.12;
38 38
39our $MAXPARALLEL = 16; # max. number of parallel jobs 39BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42}
40 43
41our $running; 44BEGIN {
42our @queue; 45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47}
43 48
49BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65}
66
67BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAEINVAL() { 10022 }";
71 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 eval "sub WSAEINPROGRESS() { 10036 }";
74 } else {
75 # these should never match any errno value
76 eval "sub WSAEINVAL() { -1e99 }";
77 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 eval "sub WSAEINPROGRESS() { -1e99 }";
80 }
81}
82
83=item ($r, $w) = portable_pipe
84
85Calling C<pipe> in Perl is portable - except it doesn't really work on
86sucky windows platforms (at least not with most perls - cygwin's perl
87notably works fine).
88
89On that platform, you actually get two file handles you cannot use select
90on.
91
92This function gives you a pipe that actually works even on the broken
93Windows platform (by creating a pair of TCP sockets, so do not expect any
94speed from that).
95
96Returns the empty list on any errors.
97
98=cut
99
100sub portable_pipe() {
101 my ($r, $w);
102
103 if (AnyEvent::WIN32) {
104 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105 or return;
106 } else {
107 pipe $r, $w
108 or return;
109 }
110
111 ($r, $w)
112}
113
114=item fork_call $coderef, @args, $cb->(@res)
115
116Executes the given code reference asynchronously, by forking. Everything
117the C<$coderef> returns will transferred to the calling process (by
118serialising and deserialising via L<Storable>).
119
120If there are any errors, then the C<$cb> will be called without any
121arguments. In that case, either C<$@> contains the exception, or C<$!>
122contains an error number. In all other cases, C<$@> will be C<undef>ined.
123
124The C<$coderef> must not ever call an event-polling function or use
125event-based programming.
126
127Note that forking can be expensive in large programs (RSS 200MB+). On
128windows, it is abysmally slow, do not expect more than 5..20 forks/s on
129that sucky platform (note this uses perl's pseudo-threads, so avoid those
130like the plague).
131
132=item $AnyEvent::Util::MAX_FORKS [default: 10]
133
134The maximum number of child processes that C<fork_call> will fork in
135parallel. Any additional requests will be queued until a slot becomes free
136again.
137
138The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
139this value.
140
141=cut
142
143our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
144$MAX_FORKS = 10 if $MAX_FORKS <= 0;
145
146my $forks;
147my @fork_queue;
148
44sub _schedule; 149sub _fork_schedule;
45sub _schedule { 150sub _fork_schedule {
46 return unless @queue; 151 while () {
47 return if $running >= $MAXPARALLEL; 152 return if $forks >= $MAX_FORKS;
48 153
49 ++$running; 154 my $job = shift @fork_queue
50 my ($cb, $sub, @args) = @{shift @queue}; 155 or return;
51 156
52 if (eval { local $SIG{__DIE__}; require POSIX }) { 157 ++$forks;
53 my $pid = open my $fh, "-|";
54 158
159 my $coderef = shift @$job;
160 my $cb = pop @$job;
161
162 # gimme a break...
163 my ($r, $w) = portable_pipe
164 or ($forks and last) # allow failures when we have at least one job
165 or die "fork_call: $!";
166
167 my $pid = fork;
168
169 if ($pid != 0) {
170 # parent
171 close $w;
172
173 my $buf;
174
175 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
176 my $len = sysread $r, $buf, 65536, length $buf;
177
178 if ($len <= 0) {
179 undef $ww;
180 close $r;
181 --$forks;
182 _fork_schedule;
183
184 my $result = eval { Storable::thaw ($buf) };
185 $result = [$@] unless $result;
186 $@ = shift @$result;
187
188 $cb->(@$result);
189
190 # clean up the pid
191 waitpid $pid, 0;
192 }
193 });
194
55 if (!defined $pid) { 195 } elsif (defined $pid) {
56 die "fork: $!"; 196 # child
57 } elsif (!$pid) { 197 close $r;
58 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); 198
199 my $result = eval {
200 local $SIG{__DIE__};
201
202 Storable::freeze ([undef, $coderef->(@$job)])
203 };
204
205 $result = Storable::freeze (["$@"])
206 if $@;
207
208 # windows forces us to these contortions
209 my $ofs;
210
211 while () {
212 my $len = (length $result) - $ofs
213 or last;
214
215 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
216
217 last if $len <= 0;
218
219 $ofs += $len;
220 }
221
222 close $w;
223
224 if (AnyEvent::WIN32) {
225 kill 9, $$; # yeah, windows for the win
226 } else {
227 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
59 POSIX::_exit (0); 228 POSIX::_exit (0);
229 }
230 exit 1;
231
232 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
233 # we ignore some errors as long as we can run at least one job
234 # maybe we should wait a few seconds and retry instead
235 die "fork_call: $!";
60 } 236 }
61
62 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
63 --$running;
64 _schedule;
65 undef $w;
66
67 my $buf;
68 sysread $fh, $buf, 16384, length $buf;
69 $cb->(map { pack "H*", $_ } split /\0/, $buf);
70 });
71 } else {
72 $cb->($sub->(@args));
73 } 237 }
74} 238}
75 239
76sub _do_asy { 240sub fork_call {
241 require Storable;
242
77 push @queue, [@_]; 243 push @fork_queue, [@_];
78 _schedule; 244 _fork_schedule;
79} 245}
80 246
247# to be removed
81sub dotted_quad($) { 248sub dotted_quad($) {
82 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 249 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
83 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 250 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
84 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 251 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
85 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 252 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
86} 253}
87 254
88my $has_ev_adns; 255# just a forwarder
89
90sub has_ev_adns {
91 ($has_ev_adns ||= do {
92 my $model = AnyEvent::detect;
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false
96}
97
98=item inet_aton $name_or_address, $cb->($binary_address_or_undef)
99
100Works almost exactly like its Socket counterpart, except that it uses a
101callback. Also, if a host has only an IPv6 address, this might be passed
102to the callback instead (use the length to detetc this - 4 for IPv4, 16
103for IPv6).
104
105This function uses various shortcuts and will fall back to either
106L<EV::ADNS> or your systems C<inet_aton>.
107
108=cut
109
110sub inet_aton { 256sub inet_aton {
111 my ($name, $cb) = @_; 257 require AnyEvent::Socket;
112 258 *inet_aton = \&AnyEvent::Socket::inet_aton;
113 if (&dotted_quad) { 259 goto &inet_aton
114 $cb->(Socket::inet_aton $name);
115 } elsif ($name eq "localhost") { # rfc2606 et al.
116 $cb->(v127.0.0.1);
117 } elsif (&has_ev_adns) {
118 # work around some idiotic ands rfc readings
119 # rather hackish support for AAAA records (should
120 # wait for adns_getaddrinfo...)
121
122 my $loop = 10; # follow cname chains up to this length
123 my $qt;
124 my $acb; $acb = sub {
125 my ($status, undef, @a) = @_;
126
127 if ($status == &EV::ADNS::s_ok) {
128 if ($qt eq "a") {
129 return $cb->(Socket::inet_aton $a[0]);
130 } elsif ($qt eq "aaaa") {
131 return $cb->($a[0]);
132 } elsif ($qt eq "cname") {
133 $name = $a[0];
134 $qt = "a";
135 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
136 }
137 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
138 # follow cname chains
139 if ($loop--) {
140 $qt = "cname";
141 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
142 }
143 } elsif ($status == &EV::ADNS::s_nodata) {
144 if ($qt eq "a") {
145 # ask for raw AAAA (might not be a good method, but adns is too broken...)
146 $qt = "aaaa";
147 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
148 }
149 }
150
151 $cb->(undef);
152 };
153
154 $qt = "a";
155 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
156 } else {
157 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_;
158 }
159} 260}
160 261
161=item fh_nonblocking $fh, $nonblocking 262=item fh_nonblocking $fh, $nonblocking
162 263
163Sets the blocking state of the given filehandle (true == nonblocking, 264Sets the blocking state of the given filehandle (true == nonblocking,
169sub fh_nonblocking($$) { 270sub fh_nonblocking($$) {
170 my ($fh, $nb) = @_; 271 my ($fh, $nb) = @_;
171 272
172 require Fcntl; 273 require Fcntl;
173 274
174 if ($^O eq "MSWin32") { 275 if (AnyEvent::WIN32) {
175 $nb = (! ! $nb) + 0; 276 $nb = (! ! $nb) + 0;
176 ioctl $fh, 0x8004667e, \$nb; # FIONBIO 277 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
177 } else { 278 } else {
178 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 279 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
179 } 280 }
185code block. 286code block.
186 287
187This is often handy in continuation-passing style code to clean up some 288This is often handy in continuation-passing style code to clean up some
188resource regardless of where you break out of a process. 289resource regardless of where you break out of a process.
189 290
291You can call one method on the returned object:
292
293=item $guard->cancel
294
295This simply causes the code block not to be invoked: it "cancels" the
296guard.
297
190=cut 298=cut
191 299
192sub AnyEvent::Util::Guard::DESTROY { 300sub AnyEvent::Util::Guard::DESTROY {
193 ${$_[0]}->(); 301 ${$_[0]}->();
194} 302}
195 303
304sub AnyEvent::Util::Guard::cancel($) {
305 ${$_[0]} = sub { };
306}
307
196sub guard(&) { 308sub guard(&) {
197 bless \(my $cb = shift), AnyEvent::Util::Guard:: 309 bless \(my $cb = shift), AnyEvent::Util::Guard::
198} 310}
199 311
200=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
201
202This is a convenience function that creates a tcp socket and makes a 100%
203non-blocking connect to the given C<$host> (which can be a hostname or a
204textual IP address) and C<$port>.
205
206Unless called in void context, it returns a guard object that will
207automatically abort connecting when it gets destroyed (it does not do
208anything to the socket after the conenct was successful).
209
210If the connect is successful, then the C<$connect_cb> will be invoked with
211the socket filehandle (in non-blocking mode) as first and the peer host
212(as a textual IP address) and peer port as second and third arguments,
213respectively.
214
215If the connect is unsuccessful, then the C<$connect_cb> will be invoked
216without any arguments and C<$!> will be set appropriately (with C<ENXIO>
217indicating a dns resolution failure).
218
219The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
220be used as a normal perl file handle as well.
221
222Sometimes you need to "prepare" the socket before connecting, for example,
223to C<bind> it to some port, or you want a specific connect timeout that
224is lower than your kernel's default timeout. In this case you can specify
225a second callback, C<$prepare_cb>. It will be called with the file handle
226in not-yet-connected state as only argument and must return the connection
227timeout value (or C<0>, C<undef> or the empty list to indicate the default
228timeout is to be used).
229
230Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
231socket (although only IPv4 is currently supported by this module).
232
233Simple Example: connect to localhost on port 22.
234
235 AnyEvent::Util::tcp_connect localhost => 22, sub {
236 my $fh = shift
237 or die "unable to connect: $!";
238 # do something
239 };
240
241Complex Example: connect to www.google.com on port 80 and make a simple
242GET request without much error handling. Also limit the connection timeout
243to 15 seconds.
244
245 AnyEvent::Util::tcp_connect "www.google.com", 80,
246 sub {
247 my ($fh) = @_
248 or die "unable to connect: $!";
249
250 my $handle; # avoid direct assignment so on_eof has it in scope.
251 $handle = new AnyEvent::Handle
252 fh => $fh,
253 on_eof => sub {
254 undef $handle; # keep it alive till eof
255 warn "done.\n";
256 };
257
258 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
259
260 $handle->push_read_line ("\015\012\015\012", sub {
261 my ($handle, $line) = @_;
262
263 # print response header
264 print "HEADER\n$line\n\nBODY\n";
265
266 $handle->on_read (sub {
267 # print response body
268 print $_[0]->rbuf;
269 $_[0]->rbuf = "";
270 });
271 });
272 }, sub {
273 my ($fh) = @_;
274 # could call $fh->bind etc. here
275
276 15
277 };
278
279=cut
280
281sub tcp_connect($$$;$) {
282 my ($host, $port, $connect, $prepare) = @_;
283
284 # see http://cr.yp.to/docs/connect.html for some background
285
286 my %state = ( fh => undef );
287
288 # name resolution
289 inet_aton $host, sub {
290 return unless exists $state{fh};
291
292 my $ipn = shift
293 or do {
294 %state = ();
295 $! = &Errno::ENXIO;
296 return $connect->();
297 };
298
299 # socket creation
300 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
301 or do {
302 %state = ();
303 return $connect->();
304 };
305
306 fh_nonblocking $state{fh}, 1;
307
308 # prepare and optional timeout
309 if ($prepare) {
310 my $timeout = $prepare->($state{fh});
311
312 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
313 %state = ();
314 $! = &Errno::ETIMEDOUT;
315 $connect->();
316 }) if $timeout;
317 }
318
319 # called when the connect was successful, which,
320 # in theory, could be the case immediately (but never is in practise)
321 my $connected = sub {
322 my $fh = delete $state{fh};
323 %state = ();
324
325 # we are connected, or maybe there was an error
326 if (my $sin = getpeername $fh) {
327 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
328 $connect->($fh, (Socket::inet_ntoa $host), $port);
329 } else {
330 # dummy read to fetch real error code
331 sysread $fh, my $buf, 1;
332 $connect->();
333 }
334 };
335
336 # now connect
337 if (connect $state{fh}, Socket::pack_sockaddr_in $port, $ipn) {
338 $connected->();
339 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
340 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
341 } else {
342 %state = ();
343 $connect->();
344 }
345 };
346
347 defined wantarray
348 ? guard { %state = () } # break any circular dependencies and unregister watchers
349 : ()
350}
351
352=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
353
354Create and bind a tcp socket to the given host (any IPv4 host if undef,
355otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
356port if given as zero or undef), set the SO_REUSEADDR flag and call
357C<listen>.
358
359For each new connection that could be C<accept>ed, call the C<$accept_cb>
360with the filehandle (in non-blocking mode) as first and the peer host and
361port as second and third arguments (see C<tcp_connect> for details).
362
363Croaks on any errors.
364
365If called in non-void context, then this function returns a guard object
366whose lifetime it tied to the tcp server: If the object gets destroyed,
367the server will be stopped (but existing accepted connections will
368continue).
369
370If you need more control over the listening socket, you can provide a
371C<$prepare_cb>, which is called just before the C<listen ()> call, with
372the listen file handle as first argument.
373
374It should return the length of the listen queue (or C<0> for the default).
375
376Example: bind on tcp port 8888 on the local machine and tell each client
377to go away.
378
379 AnyEvent::Util::tcp_server undef, 8888, sub {
380 my ($fh, $host, $port) = @_;
381
382 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
383 };
384
385=cut
386
387sub tcp_server($$$;$) {
388 my ($host, $port, $accept, $prepare) = @_;
389
390 my %state;
391
392 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
393 or Carp::croak "socket: $!";
394
395 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
396 or Carp::croak "so_reuseaddr: $!";
397
398 bind $state{fh}, Socket::pack_sockaddr_in $port, Socket::inet_aton ($host || "0.0.0.0")
399 or Carp::croak "bind: $!";
400
401 fh_nonblocking $state{fh}, 1;
402
403 my $len = ($prepare && $prepare->($state{fh})) || 128;
404
405 listen $state{fh}, $len
406 or Carp::croak "listen: $!";
407
408 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
409 # this closure keeps $state alive
410 while (my $peer = accept my $fh, $state{fh}) {
411 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
412 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
413 $accept->($fh, (Socket::inet_ntoa $host), $port);
414 }
415 });
416
417 defined wantarray
418 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
419 : ()
420}
421
4221; 3121;
423 313
424=back 314=back
425 315
426=head1 AUTHOR 316=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines