ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.19 by root, Tue May 20 15:13:52 2008 UTC vs.
Revision 1.38 by root, Tue May 27 06:26:20 2008 UTC

18 18
19=cut 19=cut
20 20
21package AnyEvent::Util; 21package AnyEvent::Util;
22 22
23no warnings;
23use strict; 24use strict;
24
25no warnings "uninitialized";
26 25
27use Carp (); 26use Carp ();
28use Errno (); 27use Errno ();
29use Socket (); 28use Socket ();
30use IO::Socket::INET ();
31 29
32use AnyEvent; 30use AnyEvent ();
33 31
34use base 'Exporter'; 32use base 'Exporter';
35 33
34our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36
37our $VERSION = '1.0';
38
36BEGIN { 39BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42}
43
44BEGIN {
45 # TODO remove this once not used anymore
37 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it 46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
38} 47}
39 48
40our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect); 49BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
41 51
42our $VERSION = '1.0'; 52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
43 58
44our $MAXPARALLEL = 16; # max. number of parallel jobs 59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
45 61
46our $running; 62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
47our @queue;
48 63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65}
66
67BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAEINVAL() { 10022 }";
71 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 eval "sub WSAEINPROGRESS() { 10036 }";
74 } else {
75 # these should never match any errno value
76 eval "sub WSAEINVAL() { -1e99 }";
77 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 eval "sub WSAEINPROGRESS() { -1e99 }";
80 }
81}
82
83=item ($r, $w) = portable_pipe
84
85Calling C<pipe> in Perl is portable - except it doesn't really work on
86sucky windows platforms (at least not with most perls - cygwin's perl
87notably works fine).
88
89On that platform, you actually get two file handles you cannot use select
90on.
91
92This function gives you a pipe that actually works even on the broken
93Windows platform (by creating a pair of TCP sockets, so do not expect any
94speed from that).
95
96Returns the empty list on any errors.
97
98=cut
99
100sub portable_pipe() {
101 my ($r, $w);
102
103 if (AnyEvent::WIN32) {
104 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105 or return;
106 } else {
107 pipe $r, $w
108 or return;
109 }
110
111 ($r, $w)
112}
113
114=item fork_call $coderef, @args, $cb->(@res)
115
116Executes the given code reference asynchronously, by forking. Everything
117the C<$coderef> returns will transferred to the calling process (by
118serialising and deserialising via L<Storable>).
119
120If there are any errors, then the C<$cb> will be called without any
121arguments. In that case, either C<$@> contains the exception, or C<$!>
122contains an error number. In all other cases, C<$@> will be C<undef>ined.
123
124The C<$coderef> must not ever call an event-polling function or use
125event-based programming.
126
127Note that forking can be expensive in large programs (RSS 200MB+). On
128windows, it is abysmally slow, do not expect more than 5..20 forks/s on
129that sucky platform (note this uses perl's pseudo-threads, so avoid those
130like the plague).
131
132=item $AnyEvent::Util::MAX_FORKS [default: 10]
133
134The maximum number of child processes that C<fork_call> will fork in
135parallel. Any additional requests will be queued until a slot becomes free
136again.
137
138The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
139this value.
140
141=cut
142
143our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
144$MAX_FORKS = 10 if $MAX_FORKS <= 0;
145
146my $forks;
147my @fork_queue;
148
49sub _schedule; 149sub _fork_schedule;
50sub _schedule { 150sub _fork_schedule {
51 return unless @queue; 151 while () {
52 return if $running >= $MAXPARALLEL; 152 return if $forks >= $MAX_FORKS;
53 153
54 ++$running; 154 my $job = shift @fork_queue
55 my ($cb, $sub, @args) = @{shift @queue}; 155 or return;
56 156
57 if (eval { local $SIG{__DIE__}; require POSIX }) { 157 ++$forks;
58 my $pid = open my $fh, "-|";
59 158
159 my $coderef = shift @$job;
160 my $cb = pop @$job;
161
162 # gimme a break...
163 my ($r, $w) = portable_pipe
164 or ($forks and last) # allow failures when we have at least one job
165 or die "fork_call: $!";
166
167 my $pid = fork;
168
169 if ($pid != 0) {
170 # parent
171 close $w;
172
173 my $buf;
174
175 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
176 my $len = sysread $r, $buf, 65536, length $buf;
177
178 if ($len <= 0) {
179 undef $ww;
180 close $r;
181 --$forks;
182 _fork_schedule;
183
184 my $result = eval { Storable::thaw ($buf) };
185 $result = [$@] unless $result;
186 $@ = shift @$result;
187
188 $cb->(@$result);
189
190 # clean up the pid
191 waitpid $pid, 0;
192 }
193 });
194
60 if (!defined $pid) { 195 } elsif (defined $pid) {
61 die "fork: $!"; 196 # child
62 } elsif (!$pid) { 197 close $r;
63 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); 198
199 my $result = eval {
200 local $SIG{__DIE__};
201
202 Storable::freeze ([undef, $coderef->(@$job)])
203 };
204
205 $result = Storable::freeze (["$@"])
206 if $@;
207
208 # windows forces us to these contortions
209 my $ofs;
210
211 while () {
212 my $len = (length $result) - $ofs
213 or last;
214
215 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
216
217 last if $len <= 0;
218
219 $ofs += $len;
220 }
221
222 close $w;
223
224 if (AnyEvent::WIN32) {
225 kill 9, $$; # yeah, windows for the win
226 } else {
227 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
64 POSIX::_exit (0); 228 POSIX::_exit (0);
229 }
230 exit 1;
231
232 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
233 # we ignore some errors as long as we can run at least one job
234 # maybe we should wait a few seconds and retry instead
235 die "fork_call: $!";
65 } 236 }
66
67 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
68 --$running;
69 _schedule;
70 undef $w;
71
72 my $buf;
73 sysread $fh, $buf, 16384, length $buf;
74 $cb->(map { pack "H*", $_ } split /\0/, $buf);
75 });
76 } else {
77 $cb->($sub->(@args));
78 } 237 }
79} 238}
80 239
81sub _do_asy { 240sub fork_call {
241 require Storable;
242
82 push @queue, [@_]; 243 push @fork_queue, [@_];
83 _schedule; 244 _fork_schedule;
84} 245}
85 246
247# to be removed
86sub dotted_quad($) { 248sub dotted_quad($) {
87 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 249 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
88 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 250 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
89 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 251 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
90 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 252 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
91} 253}
92 254
93my $has_ev_adns; 255# just a forwarder
94
95sub has_ev_adns {
96 ($has_ev_adns ||= do {
97 my $model = AnyEvent::detect;
98 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
99 ? 2 : 1 # so that || always detects as true
100 }) - 1 # 2 => true, 1 => false
101}
102
103=item inet_aton $name_or_address, $cb->(@addresses)
104
105Works similarly to its Socket counterpart, except that it uses a
106callback. Also, if a host has only an IPv6 address, this might be passed
107to the callback instead (use the length to detect this - 4 for IPv4, 16
108for IPv6).
109
110This function uses various shortcuts and will fall back to either
111L<EV::ADNS> or your systems C<inet_aton>.
112
113Unlike the L<Socket> function, you can get multiple IP addresses as result
114(currently only when EV::ADNS is being used).
115
116=cut
117
118sub inet_aton { 256sub inet_aton {
119 my ($name, $cb) = @_; 257 require AnyEvent::Socket;
120 258 *inet_aton = \&AnyEvent::Socket::inet_aton;
121 if (&dotted_quad) { 259 goto &inet_aton
122 $cb->(socket_inet_aton $name);
123 } elsif ($name eq "localhost") { # rfc2606 et al.
124 $cb->(v127.0.0.1);
125 } elsif (&has_ev_adns) {
126 # work around some idiotic ands rfc readings
127 # rather hackish support for AAAA records (should
128 # wait for adns_getaddrinfo...)
129
130 my $loop = 10; # follow cname chains up to this length
131 my $qt;
132 my $acb; $acb = sub {
133 my ($status, undef, @a) = @_;
134
135 if ($status == &EV::ADNS::s_ok) {
136 if ($qt eq "a") {
137 return $cb->(map +(socket_inet_aton $_), @a);
138 } elsif ($qt eq "aaaa") {
139 return $cb->(@a);
140 } elsif ($qt eq "cname") {
141 $name = $a[0]; # there can only be one :)
142 $qt = "a";
143 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
144 }
145 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
146 # follow cname chains
147 if ($loop--) {
148 $qt = "cname";
149 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
150 }
151 } elsif ($status == &EV::ADNS::s_nodata) {
152 if ($qt eq "a") {
153 # ask for raw AAAA (might not be a good method, but adns is too broken...)
154 $qt = "aaaa";
155 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
156 }
157 }
158
159 $cb->();
160 };
161
162 $qt = "a";
163 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
164 } else {
165 _do_asy $cb, sub {
166 my $ipn = socket_inet_aton $_[0];
167 $ipn ? ($ipn) : ()
168 }, @_;
169 }
170} 260}
171 261
172=item fh_nonblocking $fh, $nonblocking 262=item fh_nonblocking $fh, $nonblocking
173 263
174Sets the blocking state of the given filehandle (true == nonblocking, 264Sets the blocking state of the given filehandle (true == nonblocking,
180sub fh_nonblocking($$) { 270sub fh_nonblocking($$) {
181 my ($fh, $nb) = @_; 271 my ($fh, $nb) = @_;
182 272
183 require Fcntl; 273 require Fcntl;
184 274
185 if ($^O eq "MSWin32") { 275 if (AnyEvent::WIN32) {
186 $nb = (! ! $nb) + 0; 276 $nb = (! ! $nb) + 0;
187 ioctl $fh, 0x8004667e, \$nb; # FIONBIO 277 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
188 } else { 278 } else {
189 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 279 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
190 } 280 }
196code block. 286code block.
197 287
198This is often handy in continuation-passing style code to clean up some 288This is often handy in continuation-passing style code to clean up some
199resource regardless of where you break out of a process. 289resource regardless of where you break out of a process.
200 290
291You can call one method on the returned object:
292
293=item $guard->cancel
294
295This simply causes the code block not to be invoked: it "cancels" the
296guard.
297
201=cut 298=cut
202 299
203sub AnyEvent::Util::Guard::DESTROY { 300sub AnyEvent::Util::Guard::DESTROY {
204 ${$_[0]}->(); 301 ${$_[0]}->();
205} 302}
206 303
304sub AnyEvent::Util::Guard::cancel($) {
305 ${$_[0]} = sub { };
306}
307
207sub guard(&) { 308sub guard(&) {
208 bless \(my $cb = shift), AnyEvent::Util::Guard:: 309 bless \(my $cb = shift), AnyEvent::Util::Guard::
209} 310}
210 311
211sub _tcp_port($) {
212 $_[0] =~ /^\d*$/ and return $1*1;
213
214 (getservbyname $_[0], "tcp")[2]
215 or Carp::croak "$_[0]: service unknown"
216}
217
218=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
219
220This function is experimental.
221
222This is a convenience function that creates a tcp socket and makes a 100%
223non-blocking connect to the given C<$host> (which can be a hostname or a
224textual IP address) and C<$port> (which can be a numeric port number or a
225service name).
226
227Unless called in void context, it returns a guard object that will
228automatically abort connecting when it gets destroyed (it does not do
229anything to the socket after the connect was successful).
230
231If the connect is successful, then the C<$connect_cb> will be invoked with
232the socket filehandle (in non-blocking mode) as first and the peer host
233(as a textual IP address) and peer port as second and third arguments,
234respectively.
235
236If the connect is unsuccessful, then the C<$connect_cb> will be invoked
237without any arguments and C<$!> will be set appropriately (with C<ENXIO>
238indicating a dns resolution failure).
239
240The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
241be used as a normal perl file handle as well.
242
243Sometimes you need to "prepare" the socket before connecting, for example,
244to C<bind> it to some port, or you want a specific connect timeout that
245is lower than your kernel's default timeout. In this case you can specify
246a second callback, C<$prepare_cb>. It will be called with the file handle
247in not-yet-connected state as only argument and must return the connection
248timeout value (or C<0>, C<undef> or the empty list to indicate the default
249timeout is to be used).
250
251Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
252socket (although only IPv4 is currently supported by this module).
253
254Simple Example: connect to localhost on port 22.
255
256 AnyEvent::Util::tcp_connect localhost => 22, sub {
257 my $fh = shift
258 or die "unable to connect: $!";
259 # do something
260 };
261
262Complex Example: connect to www.google.com on port 80 and make a simple
263GET request without much error handling. Also limit the connection timeout
264to 15 seconds.
265
266 AnyEvent::Util::tcp_connect "www.google.com", "http",
267 sub {
268 my ($fh) = @_
269 or die "unable to connect: $!";
270
271 my $handle; # avoid direct assignment so on_eof has it in scope.
272 $handle = new AnyEvent::Handle
273 fh => $fh,
274 on_eof => sub {
275 undef $handle; # keep it alive till eof
276 warn "done.\n";
277 };
278
279 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
280
281 $handle->push_read_line ("\015\012\015\012", sub {
282 my ($handle, $line) = @_;
283
284 # print response header
285 print "HEADER\n$line\n\nBODY\n";
286
287 $handle->on_read (sub {
288 # print response body
289 print $_[0]->rbuf;
290 $_[0]->rbuf = "";
291 });
292 });
293 }, sub {
294 my ($fh) = @_;
295 # could call $fh->bind etc. here
296
297 15
298 };
299
300=cut
301
302sub tcp_connect($$$;$) {
303 my ($host, $port, $connect, $prepare) = @_;
304
305 # see http://cr.yp.to/docs/connect.html for some background
306
307 my %state = ( fh => undef );
308
309 # name resolution
310 inet_aton $host, sub {
311 return unless exists $state{fh};
312
313 my $ipn = shift;
314
315 4 == length $ipn
316 or do {
317 %state = ();
318 $! = &Errno::ENXIO;
319 return $connect->();
320 };
321
322 # socket creation
323 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
324 or do {
325 %state = ();
326 return $connect->();
327 };
328
329 fh_nonblocking $state{fh}, 1;
330
331 # prepare and optional timeout
332 if ($prepare) {
333 my $timeout = $prepare->($state{fh});
334
335 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
336 %state = ();
337 $! = &Errno::ETIMEDOUT;
338 $connect->();
339 }) if $timeout;
340 }
341
342 # called when the connect was successful, which,
343 # in theory, could be the case immediately (but never is in practise)
344 my $connected = sub {
345 my $fh = delete $state{fh};
346 %state = ();
347
348 # we are connected, or maybe there was an error
349 if (my $sin = getpeername $fh) {
350 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
351 $connect->($fh, (Socket::inet_ntoa $host), $port);
352 } else {
353 # dummy read to fetch real error code
354 sysread $fh, my $buf, 1 if $! == &Errno::ENOTCONN;
355 $connect->();
356 }
357 };
358
359 # now connect
360 if (connect $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, $ipn) {
361 $connected->();
362 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
363 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
364 } else {
365 %state = ();
366 $connect->();
367 }
368 };
369
370 defined wantarray
371 ? guard { %state = () } # break any circular dependencies and unregister watchers
372 : ()
373}
374
375=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
376
377This function is experimental.
378
379Create and bind a tcp socket to the given host (any IPv4 host if undef,
380otherwise it must be an IPv4 or IPv6 address) and port (service name or
381numeric port number, or an ephemeral port if given as zero or undef, so
382you cnanot bind to tcp port zero), set the SO_REUSEADDR flag and call
383C<listen>.
384
385For each new connection that could be C<accept>ed, call the C<$accept_cb>
386with the filehandle (in non-blocking mode) as first and the peer host and
387port as second and third arguments (see C<tcp_connect> for details).
388
389Croaks on any errors.
390
391If called in non-void context, then this function returns a guard object
392whose lifetime it tied to the tcp server: If the object gets destroyed,
393the server will be stopped (but existing accepted connections will
394continue).
395
396If you need more control over the listening socket, you can provide a
397C<$prepare_cb>, which is called just before the C<listen ()> call, with
398the listen file handle as first argument.
399
400It should return the length of the listen queue (or C<0> for the default).
401
402Example: bind on tcp port 8888 on the local machine and tell each client
403to go away.
404
405 AnyEvent::Util::tcp_server undef, 8888, sub {
406 my ($fh, $host, $port) = @_;
407
408 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
409 };
410
411=cut
412
413sub tcp_server($$$;$) {
414 my ($host, $port, $accept, $prepare) = @_;
415
416 my %state;
417
418 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
419 or Carp::croak "socket: $!";
420
421 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
422 or Carp::croak "so_reuseaddr: $!";
423
424 bind $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, socket_inet_aton ($host || "0.0.0.0")
425 or Carp::croak "bind: $!";
426
427 fh_nonblocking $state{fh}, 1;
428
429 my $len = ($prepare && $prepare->($state{fh})) || 128;
430
431 listen $state{fh}, $len
432 or Carp::croak "listen: $!";
433
434 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
435 # this closure keeps $state alive
436 while (my $peer = accept my $fh, $state{fh}) {
437 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
438 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
439 $accept->($fh, (Socket::inet_ntoa $host), $port);
440 }
441 });
442
443 defined wantarray
444 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
445 : ()
446}
447
4481; 3121;
449 313
450=back 314=back
451 315
452=head1 AUTHOR 316=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines