ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.19 by root, Tue May 20 15:13:52 2008 UTC vs.
Revision 1.35 by root, Tue May 27 03:13:44 2008 UTC

18 18
19=cut 19=cut
20 20
21package AnyEvent::Util; 21package AnyEvent::Util;
22 22
23no warnings;
23use strict; 24use strict;
24
25no warnings "uninitialized";
26 25
27use Carp (); 26use Carp ();
28use Errno (); 27use Errno ();
29use Socket (); 28use Socket ();
30use IO::Socket::INET ();
31 29
32use AnyEvent; 30use AnyEvent ();
33 31
34use base 'Exporter'; 32use base 'Exporter';
35 33
34our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35our @EXPORT_OK = qw(AF_INET6 WSAWOULDBLOCK WSAEINPROGRESS);
36
37our $VERSION = '1.0';
38
36BEGIN { 39BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42}
43
44BEGIN {
45 # TODO remove this once not used anymore
37 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it 46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
38} 47}
39 48
40our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect); 49BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
41 51
42our $VERSION = '1.0'; 52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
43 58
44our $MAXPARALLEL = 16; # max. number of parallel jobs 59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
45 61
46our $running; 62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
47our @queue;
48 63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65}
66
67BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAWOULDBLOCK() { 10035 }";
71 eval "sub WSAEINPROGRESS() { 10036 }";
72 } else {
73 eval "sub WSAWOULDBLOCK() { -1e99 }"; # should never match any errno value
74 eval "sub WSAEINPROGRESS() { -1e99 }"; # should never match any errno value
75 }
76}
77
78=item ($r, $w) = portable_pipe
79
80Calling C<pipe> in Perl is portable - except it doesn't really work on
81sucky windows platforms (at least not with most perls - cygwin's perl
82notably works fine).
83
84On that platform, you actually get two file handles you cannot use select
85on.
86
87This function gives you a pipe that actually works even on the broken
88Windows platform (by creating a pair of TCP sockets, so do not expect any
89speed from that).
90
91Returns the empty list on any errors.
92
93=cut
94
95sub portable_pipe() {
96 my ($r, $w);
97
98 if (AnyEvent::WIN32) {
99 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
100 or return;
101 } else {
102 pipe $r, $w
103 or return;
104 }
105
106 ($r, $w)
107}
108
109=item fork_call $coderef, @args, $cb->(@res)
110
111Executes the given code reference asynchronously, by forking. Everything
112the C<$coderef> returns will transferred to the calling process (by
113serialising and deserialising via L<Storable>).
114
115If there are any errors, then the C<$cb> will be called without any
116arguments. In that case, either C<$@> contains the exception, or C<$!>
117contains an error number. In all other cases, C<$@> will be C<undef>ined.
118
119The C<$coderef> must not ever call an event-polling function or use
120event-based programming.
121
122Note that forking can be expensive in large programs (RSS 200MB+). On
123windows, it is abysmally slow, do not expect more than 5..20 forks/s on
124that sucky platform (note this uses perl's pseudo-threads, so avoid those
125like the plague).
126
127=item $AnyEvent::Util::MAX_FORKS [default: 10]
128
129The maximum number of child processes that C<fork_call> will fork in
130parallel. Any additional requests will be queued until a slot becomes free
131again.
132
133The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
134this value.
135
136=cut
137
138our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
139$MAX_FORKS = 10 if $MAX_FORKS <= 0;
140
141my $forks;
142my @fork_queue;
143
49sub _schedule; 144sub _fork_schedule;
50sub _schedule { 145sub _fork_schedule {
51 return unless @queue; 146 while () {
52 return if $running >= $MAXPARALLEL; 147 return if $forks >= $MAX_FORKS;
53 148
54 ++$running; 149 my $job = shift @fork_queue
55 my ($cb, $sub, @args) = @{shift @queue}; 150 or return;
56 151
57 if (eval { local $SIG{__DIE__}; require POSIX }) { 152 ++$forks;
58 my $pid = open my $fh, "-|";
59 153
154 my $coderef = shift @$job;
155 my $cb = pop @$job;
156
157 # gimme a break...
158 my ($r, $w) = portable_pipe
159 or ($forks and last) # allow failures when we have at least one job
160 or die "fork_call: $!";
161
162 my $pid = fork;
163
164 if ($pid != 0) {
165 # parent
166 close $w;
167
168 my $buf;
169
170 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
171 my $len = sysread $r, $buf, 65536, length $buf;
172
173 if ($len <= 0) {
174 undef $ww;
175 close $r;
176 --$forks;
177 _fork_schedule;
178
179 my $result = eval { Storable::thaw ($buf) };
180 $result = [$@] unless $result;
181 $@ = shift @$result;
182
183 $cb->(@$result);
184
185 # clean up the pid
186 waitpid $pid, 0;
187 }
188 });
189
60 if (!defined $pid) { 190 } elsif (defined $pid) {
61 die "fork: $!"; 191 # child
62 } elsif (!$pid) { 192 close $r;
63 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); 193
194 my $result = eval {
195 local $SIG{__DIE__};
196
197 Storable::freeze ([undef, $coderef->(@$job)])
198 };
199
200 $result = Storable::freeze (["$@"])
201 if $@;
202
203 # windows forces us to these contortions
204 my $ofs;
205
206 while () {
207 my $len = (length $result) - $ofs
208 or last;
209
210 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
211
212 last if $len <= 0;
213
214 $ofs += $len;
215 }
216
217 close $w;
218
219 if (AnyEvent::WIN32) {
220 kill 9, $$; # yeah, windows for the win
221 } else {
222 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
64 POSIX::_exit (0); 223 POSIX::_exit (0);
224 }
225 exit 1;
226
227 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
228 # we ignore some errors as long as we can run at least one job
229 # maybe we should wait a few seconds and retry instead
230 die "fork_call: $!";
65 } 231 }
66
67 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
68 --$running;
69 _schedule;
70 undef $w;
71
72 my $buf;
73 sysread $fh, $buf, 16384, length $buf;
74 $cb->(map { pack "H*", $_ } split /\0/, $buf);
75 });
76 } else {
77 $cb->($sub->(@args));
78 } 232 }
79} 233}
80 234
81sub _do_asy { 235sub fork_call {
236 require Storable;
237
82 push @queue, [@_]; 238 push @fork_queue, [@_];
83 _schedule; 239 _fork_schedule;
84} 240}
85 241
242# to be removed
86sub dotted_quad($) { 243sub dotted_quad($) {
87 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 244 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
88 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 245 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
89 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 246 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
90 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 247 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
91} 248}
92 249
93my $has_ev_adns; 250# just a forwarder
94
95sub has_ev_adns {
96 ($has_ev_adns ||= do {
97 my $model = AnyEvent::detect;
98 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
99 ? 2 : 1 # so that || always detects as true
100 }) - 1 # 2 => true, 1 => false
101}
102
103=item inet_aton $name_or_address, $cb->(@addresses)
104
105Works similarly to its Socket counterpart, except that it uses a
106callback. Also, if a host has only an IPv6 address, this might be passed
107to the callback instead (use the length to detect this - 4 for IPv4, 16
108for IPv6).
109
110This function uses various shortcuts and will fall back to either
111L<EV::ADNS> or your systems C<inet_aton>.
112
113Unlike the L<Socket> function, you can get multiple IP addresses as result
114(currently only when EV::ADNS is being used).
115
116=cut
117
118sub inet_aton { 251sub inet_aton {
119 my ($name, $cb) = @_; 252 require AnyEvent::Socket;
120 253 *inet_aton = \&AnyEvent::Socket::inet_aton;
121 if (&dotted_quad) { 254 goto &inet_aton
122 $cb->(socket_inet_aton $name);
123 } elsif ($name eq "localhost") { # rfc2606 et al.
124 $cb->(v127.0.0.1);
125 } elsif (&has_ev_adns) {
126 # work around some idiotic ands rfc readings
127 # rather hackish support for AAAA records (should
128 # wait for adns_getaddrinfo...)
129
130 my $loop = 10; # follow cname chains up to this length
131 my $qt;
132 my $acb; $acb = sub {
133 my ($status, undef, @a) = @_;
134
135 if ($status == &EV::ADNS::s_ok) {
136 if ($qt eq "a") {
137 return $cb->(map +(socket_inet_aton $_), @a);
138 } elsif ($qt eq "aaaa") {
139 return $cb->(@a);
140 } elsif ($qt eq "cname") {
141 $name = $a[0]; # there can only be one :)
142 $qt = "a";
143 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
144 }
145 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
146 # follow cname chains
147 if ($loop--) {
148 $qt = "cname";
149 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
150 }
151 } elsif ($status == &EV::ADNS::s_nodata) {
152 if ($qt eq "a") {
153 # ask for raw AAAA (might not be a good method, but adns is too broken...)
154 $qt = "aaaa";
155 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
156 }
157 }
158
159 $cb->();
160 };
161
162 $qt = "a";
163 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
164 } else {
165 _do_asy $cb, sub {
166 my $ipn = socket_inet_aton $_[0];
167 $ipn ? ($ipn) : ()
168 }, @_;
169 }
170} 255}
171 256
172=item fh_nonblocking $fh, $nonblocking 257=item fh_nonblocking $fh, $nonblocking
173 258
174Sets the blocking state of the given filehandle (true == nonblocking, 259Sets the blocking state of the given filehandle (true == nonblocking,
180sub fh_nonblocking($$) { 265sub fh_nonblocking($$) {
181 my ($fh, $nb) = @_; 266 my ($fh, $nb) = @_;
182 267
183 require Fcntl; 268 require Fcntl;
184 269
185 if ($^O eq "MSWin32") { 270 if (AnyEvent::WIN32) {
186 $nb = (! ! $nb) + 0; 271 $nb = (! ! $nb) + 0;
187 ioctl $fh, 0x8004667e, \$nb; # FIONBIO 272 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
188 } else { 273 } else {
189 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 274 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
190 } 275 }
196code block. 281code block.
197 282
198This is often handy in continuation-passing style code to clean up some 283This is often handy in continuation-passing style code to clean up some
199resource regardless of where you break out of a process. 284resource regardless of where you break out of a process.
200 285
286You can call one method on the returned object:
287
288=item $guard->cancel
289
290This simply causes the code block not to be invoked: it "cancels" the
291guard.
292
201=cut 293=cut
202 294
203sub AnyEvent::Util::Guard::DESTROY { 295sub AnyEvent::Util::Guard::DESTROY {
204 ${$_[0]}->(); 296 ${$_[0]}->();
205} 297}
206 298
299sub AnyEvent::Util::Guard::cancel($) {
300 ${$_[0]} = sub { };
301}
302
207sub guard(&) { 303sub guard(&) {
208 bless \(my $cb = shift), AnyEvent::Util::Guard:: 304 bless \(my $cb = shift), AnyEvent::Util::Guard::
209} 305}
210 306
211sub _tcp_port($) {
212 $_[0] =~ /^\d*$/ and return $1*1;
213
214 (getservbyname $_[0], "tcp")[2]
215 or Carp::croak "$_[0]: service unknown"
216}
217
218=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
219
220This function is experimental.
221
222This is a convenience function that creates a tcp socket and makes a 100%
223non-blocking connect to the given C<$host> (which can be a hostname or a
224textual IP address) and C<$port> (which can be a numeric port number or a
225service name).
226
227Unless called in void context, it returns a guard object that will
228automatically abort connecting when it gets destroyed (it does not do
229anything to the socket after the connect was successful).
230
231If the connect is successful, then the C<$connect_cb> will be invoked with
232the socket filehandle (in non-blocking mode) as first and the peer host
233(as a textual IP address) and peer port as second and third arguments,
234respectively.
235
236If the connect is unsuccessful, then the C<$connect_cb> will be invoked
237without any arguments and C<$!> will be set appropriately (with C<ENXIO>
238indicating a dns resolution failure).
239
240The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
241be used as a normal perl file handle as well.
242
243Sometimes you need to "prepare" the socket before connecting, for example,
244to C<bind> it to some port, or you want a specific connect timeout that
245is lower than your kernel's default timeout. In this case you can specify
246a second callback, C<$prepare_cb>. It will be called with the file handle
247in not-yet-connected state as only argument and must return the connection
248timeout value (or C<0>, C<undef> or the empty list to indicate the default
249timeout is to be used).
250
251Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
252socket (although only IPv4 is currently supported by this module).
253
254Simple Example: connect to localhost on port 22.
255
256 AnyEvent::Util::tcp_connect localhost => 22, sub {
257 my $fh = shift
258 or die "unable to connect: $!";
259 # do something
260 };
261
262Complex Example: connect to www.google.com on port 80 and make a simple
263GET request without much error handling. Also limit the connection timeout
264to 15 seconds.
265
266 AnyEvent::Util::tcp_connect "www.google.com", "http",
267 sub {
268 my ($fh) = @_
269 or die "unable to connect: $!";
270
271 my $handle; # avoid direct assignment so on_eof has it in scope.
272 $handle = new AnyEvent::Handle
273 fh => $fh,
274 on_eof => sub {
275 undef $handle; # keep it alive till eof
276 warn "done.\n";
277 };
278
279 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
280
281 $handle->push_read_line ("\015\012\015\012", sub {
282 my ($handle, $line) = @_;
283
284 # print response header
285 print "HEADER\n$line\n\nBODY\n";
286
287 $handle->on_read (sub {
288 # print response body
289 print $_[0]->rbuf;
290 $_[0]->rbuf = "";
291 });
292 });
293 }, sub {
294 my ($fh) = @_;
295 # could call $fh->bind etc. here
296
297 15
298 };
299
300=cut
301
302sub tcp_connect($$$;$) {
303 my ($host, $port, $connect, $prepare) = @_;
304
305 # see http://cr.yp.to/docs/connect.html for some background
306
307 my %state = ( fh => undef );
308
309 # name resolution
310 inet_aton $host, sub {
311 return unless exists $state{fh};
312
313 my $ipn = shift;
314
315 4 == length $ipn
316 or do {
317 %state = ();
318 $! = &Errno::ENXIO;
319 return $connect->();
320 };
321
322 # socket creation
323 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
324 or do {
325 %state = ();
326 return $connect->();
327 };
328
329 fh_nonblocking $state{fh}, 1;
330
331 # prepare and optional timeout
332 if ($prepare) {
333 my $timeout = $prepare->($state{fh});
334
335 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
336 %state = ();
337 $! = &Errno::ETIMEDOUT;
338 $connect->();
339 }) if $timeout;
340 }
341
342 # called when the connect was successful, which,
343 # in theory, could be the case immediately (but never is in practise)
344 my $connected = sub {
345 my $fh = delete $state{fh};
346 %state = ();
347
348 # we are connected, or maybe there was an error
349 if (my $sin = getpeername $fh) {
350 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
351 $connect->($fh, (Socket::inet_ntoa $host), $port);
352 } else {
353 # dummy read to fetch real error code
354 sysread $fh, my $buf, 1 if $! == &Errno::ENOTCONN;
355 $connect->();
356 }
357 };
358
359 # now connect
360 if (connect $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, $ipn) {
361 $connected->();
362 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
363 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
364 } else {
365 %state = ();
366 $connect->();
367 }
368 };
369
370 defined wantarray
371 ? guard { %state = () } # break any circular dependencies and unregister watchers
372 : ()
373}
374
375=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
376
377This function is experimental.
378
379Create and bind a tcp socket to the given host (any IPv4 host if undef,
380otherwise it must be an IPv4 or IPv6 address) and port (service name or
381numeric port number, or an ephemeral port if given as zero or undef, so
382you cnanot bind to tcp port zero), set the SO_REUSEADDR flag and call
383C<listen>.
384
385For each new connection that could be C<accept>ed, call the C<$accept_cb>
386with the filehandle (in non-blocking mode) as first and the peer host and
387port as second and third arguments (see C<tcp_connect> for details).
388
389Croaks on any errors.
390
391If called in non-void context, then this function returns a guard object
392whose lifetime it tied to the tcp server: If the object gets destroyed,
393the server will be stopped (but existing accepted connections will
394continue).
395
396If you need more control over the listening socket, you can provide a
397C<$prepare_cb>, which is called just before the C<listen ()> call, with
398the listen file handle as first argument.
399
400It should return the length of the listen queue (or C<0> for the default).
401
402Example: bind on tcp port 8888 on the local machine and tell each client
403to go away.
404
405 AnyEvent::Util::tcp_server undef, 8888, sub {
406 my ($fh, $host, $port) = @_;
407
408 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
409 };
410
411=cut
412
413sub tcp_server($$$;$) {
414 my ($host, $port, $accept, $prepare) = @_;
415
416 my %state;
417
418 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
419 or Carp::croak "socket: $!";
420
421 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
422 or Carp::croak "so_reuseaddr: $!";
423
424 bind $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, socket_inet_aton ($host || "0.0.0.0")
425 or Carp::croak "bind: $!";
426
427 fh_nonblocking $state{fh}, 1;
428
429 my $len = ($prepare && $prepare->($state{fh})) || 128;
430
431 listen $state{fh}, $len
432 or Carp::croak "listen: $!";
433
434 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
435 # this closure keeps $state alive
436 while (my $peer = accept my $fh, $state{fh}) {
437 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
438 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
439 $accept->($fh, (Socket::inet_ntoa $host), $port);
440 }
441 });
442
443 defined wantarray
444 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
445 : ()
446}
447
4481; 3071;
449 308
450=back 309=back
451 310
452=head1 AUTHOR 311=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines