ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.10 by elmex, Thu May 15 13:50:23 2008 UTC vs.
Revision 1.46 by root, Fri Jun 6 15:35:30 2008 UTC

2 2
3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8
9 inet_aton $name, $cb->($ipn || undef);
10 8
11=head1 DESCRIPTION 9=head1 DESCRIPTION
12 10
13This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
14well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
15 13
14All functions documented without C<AnyEvent::Util::> prefix are exported
15by default.
16
16=over 4 17=over 4
17 18
18=cut 19=cut
19 20
20package AnyEvent::Util; 21package AnyEvent::Util;
21 22
23no warnings;
22use strict; 24use strict;
23 25
24no warnings "uninitialized"; 26use Carp ();
25 27use Errno ();
26use Errno qw/ENXIO/;
27use Socket (); 28use Socket ();
28use IO::Socket::INET ();
29 29
30use AnyEvent; 30use AnyEvent ();
31 31
32use base 'Exporter'; 32use base 'Exporter';
33 33
34#our @EXPORT = qw(gethostbyname gethostbyaddr); 34our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35our @EXPORT_OK = qw(inet_aton); 35our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36 36
37our $VERSION = '1.0'; 37our $VERSION = 4.151;
38 38
39our $MAXPARALLEL = 16; # max. number of parallel jobs 39BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42}
40 43
41our $running; 44BEGIN {
42our @queue; 45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47}
43 48
49BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65}
66
67BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAEINVAL() { 10022 }";
71 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 eval "sub WSAEINPROGRESS() { 10036 }";
74 } else {
75 # these should never match any errno value
76 eval "sub WSAEINVAL() { -1e99 }";
77 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 eval "sub WSAEINPROGRESS() { -1e99 }";
80 }
81}
82
83=item ($r, $w) = portable_pipe
84
85Calling C<pipe> in Perl is portable - except it doesn't really work on
86sucky windows platforms (at least not with most perls - cygwin's perl
87notably works fine).
88
89On that platform, you actually get two file handles you cannot use select
90on.
91
92This function gives you a pipe that actually works even on the broken
93Windows platform (by creating a pair of TCP sockets, so do not expect any
94speed from that).
95
96Returns the empty list on any errors.
97
98=cut
99
100sub portable_pipe() {
101 my ($r, $w);
102
103 if (AnyEvent::WIN32) {
104 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105 or return;
106 } else {
107 pipe $r, $w
108 or return;
109 }
110
111 ($r, $w)
112}
113
114=item fork_call $coderef, @args, $cb->(@res)
115
116Executes the given code reference asynchronously, by forking. Everything
117the C<$coderef> returns will transferred to the calling process (by
118serialising and deserialising via L<Storable>).
119
120If there are any errors, then the C<$cb> will be called without any
121arguments. In that case, either C<$@> contains the exception, or C<$!>
122contains an error number. In all other cases, C<$@> will be C<undef>ined.
123
124The C<$coderef> must not ever call an event-polling function or use
125event-based programming.
126
127Note that forking can be expensive in large programs (RSS 200MB+). On
128windows, it is abysmally slow, do not expect more than 5..20 forks/s on
129that sucky platform (note this uses perl's pseudo-threads, so avoid those
130like the plague).
131
132=item $AnyEvent::Util::MAX_FORKS [default: 10]
133
134The maximum number of child processes that C<fork_call> will fork in
135parallel. Any additional requests will be queued until a slot becomes free
136again.
137
138The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
139this value.
140
141=cut
142
143our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
144$MAX_FORKS = 10 if $MAX_FORKS <= 0;
145
146my $forks;
147my @fork_queue;
148
44sub _schedule; 149sub _fork_schedule;
45sub _schedule { 150sub _fork_schedule {
46 return unless @queue; 151 while () {
47 return if $running >= $MAXPARALLEL; 152 return if $forks >= $MAX_FORKS;
48 153
49 ++$running; 154 my $job = shift @fork_queue
50 my ($cb, $sub, @args) = @{shift @queue}; 155 or return;
51 156
52 if (eval { local $SIG{__DIE__}; require POSIX }) { 157 ++$forks;
53 my $pid = open my $fh, "-|";
54 158
159 my $coderef = shift @$job;
160 my $cb = pop @$job;
161
162 # gimme a break...
163 my ($r, $w) = portable_pipe
164 or ($forks and last) # allow failures when we have at least one job
165 or die "fork_call: $!";
166
167 my $pid = fork;
168
169 if ($pid != 0) {
170 # parent
171 close $w;
172
173 my $buf;
174
175 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
176 my $len = sysread $r, $buf, 65536, length $buf;
177
178 if ($len <= 0) {
179 undef $ww;
180 close $r;
181 --$forks;
182 _fork_schedule;
183
184 my $result = eval { Storable::thaw ($buf) };
185 $result = [$@] unless $result;
186 $@ = shift @$result;
187
188 $cb->(@$result);
189
190 # clean up the pid
191 waitpid $pid, 0;
192 }
193 });
194
55 if (!defined $pid) { 195 } elsif (defined $pid) {
56 die "fork: $!"; 196 # child
57 } elsif (!$pid) { 197 close $r;
58 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); 198
199 my $result = eval {
200 local $SIG{__DIE__};
201
202 Storable::freeze ([undef, $coderef->(@$job)])
203 };
204
205 $result = Storable::freeze (["$@"])
206 if $@;
207
208 # windows forces us to these contortions
209 my $ofs;
210
211 while () {
212 my $len = (length $result) - $ofs
213 or last;
214
215 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
216
217 last if $len <= 0;
218
219 $ofs += $len;
220 }
221
222 close $w;
223
224 if (AnyEvent::WIN32) {
225 kill 9, $$; # yeah, windows for the win
226 } else {
227 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
59 POSIX::_exit (0); 228 POSIX::_exit (0);
229 }
230 exit 1;
231
232 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
233 # we ignore some errors as long as we can run at least one job
234 # maybe we should wait a few seconds and retry instead
235 die "fork_call: $!";
60 } 236 }
61
62 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
63 --$running;
64 _schedule;
65 undef $w;
66
67 my $buf;
68 sysread $fh, $buf, 16384, length $buf;
69 $cb->(map { pack "H*", $_ } split /\0/, $buf);
70 });
71 } else {
72 $cb->($sub->(@args));
73 } 237 }
74} 238}
75 239
76sub _do_asy { 240sub fork_call {
241 require Storable;
242
77 push @queue, [@_]; 243 push @fork_queue, [@_];
78 _schedule; 244 _fork_schedule;
79} 245}
80 246
247# to be removed
81sub dotted_quad($) { 248sub dotted_quad($) {
82 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 249 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
83 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 250 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
84 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 251 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
85 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 252 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
86} 253}
87 254
88my $has_ev_adns; 255# just a forwarder
89
90sub has_ev_adns {
91 ($has_ev_adns ||= do {
92 my $model = AnyEvent::detect;
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false
96}
97
98=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef)
99
100Works almost exactly like its Socket counterpart, except that it uses a
101callback.
102
103=cut
104
105sub inet_aton { 256sub inet_aton {
106 my ($name, $cb) = @_; 257 require AnyEvent::Socket;
107 258 *inet_aton = \&AnyEvent::Socket::inet_aton;
108 if (&dotted_quad) { 259 goto &inet_aton
109 $cb->(Socket::inet_aton $name);
110 } elsif ($name eq "localhost") { # rfc2606 et al.
111 $cb->(v127.0.0.1);
112 } elsif (&has_ev_adns) {
113 EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub {
114 my (undef, undef, @a) = @_;
115 $cb->(@a ? Socket::inet_aton $a[0] : undef);
116 });
117 } else {
118 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_;
119 }
120} 260}
121 261
122=item AnyEvent::Util::fh_nonblocking $fh, $nonblocking 262=item fh_nonblocking $fh, $nonblocking
123 263
124Sets the blocking state of the given filehandle (true == nonblocking, 264Sets the blocking state of the given filehandle (true == nonblocking,
125false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on 265false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
126broken (i.e. windows) platforms. 266broken (i.e. windows) platforms.
127 267
130sub fh_nonblocking($$) { 270sub fh_nonblocking($$) {
131 my ($fh, $nb) = @_; 271 my ($fh, $nb) = @_;
132 272
133 require Fcntl; 273 require Fcntl;
134 274
135 if ($^O eq "MSWin32") { 275 if (AnyEvent::WIN32) {
136 $nb = (! ! $nb) + 0; 276 $nb = (! ! $nb) + 0;
137 ioctl $fh, 0x8004667e, \$nb; # FIONBIO 277 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
138 } else { 278 } else {
139 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 279 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
140 } 280 }
141} 281}
142 282
143=item AnyEvent::Util::connect ($socket, $connect_cb->($socket), $error_cb->()[, $timeout]) 283=item $guard = guard { CODE }
144 284
145Connects the socket C<$socket> non-blocking. C<$connect_cb> will be 285This function creates a special object that, when called, will execute the
146called when the socket was successfully connected and became writable, 286code block.
147the first argument to the C<$connect_cb> callback will be the C<$socket>
148itself.
149 287
150The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see 288This is often handy in continuation-passing style code to clean up some
151above). 289resource regardless of where you break out of a process.
152 290
153C<$error_cb> will be called when any error happened while connecting 291You can call one method on the returned object:
154the socket. C<$!> will be set to an appropriate error number.
155 292
156If C<$timeout> is given a timeout will be installed for the connect. If the 293=item $guard->cancel
157timeout was reached the C<$error_cb> callback will be called and C<$!> is set to
158C<ETIMEDOUT>.
159 294
160The return value of C<connect> will be a guard object that you have to keep 295This simply causes the code block not to be invoked: it "cancels" the
161referenced until you are done with the connect or received an error. 296guard.
162If you let the object's reference drop to zero the internal connect and timeout
163watchers will be removed.
164 297
165Here is a short example, which creates a socket and does a blocking DNS lookup via
166L<IO::Socket::INET>:
167
168 my $sock = IO::Socket::INET->new (
169 PeerAddr => "www.google.com:80",
170 Blocking => 0,
171 ) or die "Couldn't make socket: $!\n";
172
173 my $hdl;
174
175 my $watchobj = AnyEvent::Util::connect ($sock, sub {
176 my ($sock) = @_;
177
178 $hdl =
179 AnyEvent::Handle->new (
180 fh => $sock,
181 on_eof => sub {
182 print "received eof\n";
183 undef $hdl
184 }
185 );
186
187 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012");
188
189 $hdl->push_read_line (sub {
190 my ($hdl, $line) = @_;
191 print "Yay, got line: $line\n";
192 });
193
194 }, sub {
195 warn "Got error on connect: $!\n";
196 }, 10);
197
198=cut 298=cut
199 299
200sub connect { 300sub AnyEvent::Util::Guard::DESTROY {
201 my ($socket, $c_cb, $e_cb, $tout) = @_; 301 local $@;
202 302
203 fh_nonblocking ($socket, 1); 303 eval {
204 304 local $SIG{__DIE__};
205 my $o = AnyEvent::Util::SocketHandle->new ( 305 ${$_[0]}->();
206 fh => $socket,
207 connect_cb => $c_cb,
208 error_cb => $e_cb,
209 timeout => $tout,
210 );
211
212 $o->connect;
213
214 $o
215}
216
217=item AnyEvent::Util::tcp_connect ($host, $port, $connect_cb->($socket), $error_cb->()[, $timeout])
218
219This is a shortcut function which behaves similar to the C<connect> function
220described above, except that it does a C<AnyEvent::Util::inet_aton> on C<$host>
221and creates a L<IO::Socket::INET> TCP connection for you, which will be
222passed as C<$socket> argument to the C<$connect_cb> callback above.
223
224In case the hostname couldn't be resolved C<$error_cb> will be called and C<$!>
225will be set to C<ENXIO>.
226
227For more details about the return value and the arguments see the C<connect>
228function above.
229
230Here is a short example:
231
232
233 my $hdl;
234 my $watchobj = AnyEvent::Util::tcp_connect ("www.google.com", 80, sub {
235 my ($sock) = @_;
236
237 $hdl =
238 AnyEvent::Handle->new (
239 fh => $sock,
240 on_eof => sub {
241 print "received eof\n";
242 undef $hdl
243 }
244 );
245
246 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012");
247
248 $hdl->push_read_line (sub {
249 my ($hdl, $line) = @_;
250 print "Yay, got line: $line\n";
251 });
252
253 }, sub {
254 warn "Got error on connect: $!\n";
255 }, 10);
256
257=cut
258
259sub tcp_connect {
260 my ($host, $port, $c_cb, $e_cb, $tout, %sockargs) = @_;
261
262 my $o = AnyEvent::Util::SocketHandle->new (
263 connect_cb => $c_cb,
264 error_cb => $e_cb,
265 timeout => $tout,
266 );
267
268 $o->start_timeout;
269
270 AnyEvent::Util::inet_aton ($host, sub {
271 my ($addr) = @_;
272
273 return if $o->{timed_out};
274
275 if ($addr) {
276 my $sock =
277 IO::Socket::INET->new (
278 PeerHost => Socket::inet_ntoa ($addr),
279 PeerPort => $port,
280 Blocking => 0,
281 %sockargs
282 );
283
284 unless ($sock) {
285 $o->error;
286 }
287
288 fh_nonblocking ($sock, 1);
289
290 $o->{fh} = $sock;
291
292 $o->connect;
293
294 } else {
295 $! = ENXIO;
296 $o->error;
297 }
298 }); 306 };
299 307
300 $o 308 warn "runtime error in AnyEvent::guard callback: $@" if $@;
301} 309}
302 310
303=item AnyEvent::Util::listen ($socket, $client_cb->($new_socket, $peer_ad), $error_cb->()) 311sub AnyEvent::Util::Guard::cancel($) {
304 312 ${$_[0]} = sub { };
305This will listen and accept new connections on the C<$socket> in a non-blocking
306way. The callback C<$client_cb> will be called when a new client connection
307was accepted and the callback C<$error_cb> will be called in case of an error.
308C<$!> will be set to an approriate error number.
309
310The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see
311above).
312
313The first argument to C<$client_cb> will be the socket of the accepted client
314and the second argument the peer address.
315
316The return value is a guard object that you have to keep referenced as long as you
317want to accept new connections.
318
319Here is an example usage:
320
321 my $sock = IO::Socket::INET->new (
322 Listen => 5
323 ) or die "Couldn't make socket: $!\n";
324
325 my $watchobj = AnyEvent::Util::listen ($sock, sub {
326 my ($cl_sock, $cl_addr) = @_;
327
328 my ($port, $addr) = sockaddr_in ($cl_addr);
329 $addr = inet_ntoa ($addr);
330 print "Client connected: $addr:$port\n";
331
332 # ...
333
334 }, sub {
335 warn "Error on accept: $!"
336 });
337
338=cut
339
340sub listen {
341 my ($socket, $c_cb, $e_cb) = @_;
342
343 fh_nonblocking ($socket, 1);
344
345 my $o =
346 AnyEvent::Util::SocketHandle->new (
347 fh => $socket,
348 client_cb => $c_cb,
349 error_cb => $e_cb
350 );
351
352 $o->listen;
353
354 $o
355} 313}
356 314
357package AnyEvent::Util::SocketHandle; 315sub guard(&) {
358use Errno qw/ETIMEDOUT/; 316 bless \(my $cb = shift), AnyEvent::Util::Guard::
359use Socket;
360use Scalar::Util qw/weaken/;
361
362sub new {
363 my $this = shift;
364 my $class = ref($this) || $this;
365 my $self = { @_ };
366 bless $self, $class;
367
368 return $self
369}
370
371sub error {
372 my ($self) = @_;
373 delete $self->{con_w};
374 delete $self->{list_w};
375 delete $self->{tmout};
376 $self->{error_cb}->();
377}
378
379sub listen {
380 my ($self) = @_;
381
382 weaken $self;
383
384 $self->{list_w} =
385 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
386 my ($new_sock, $paddr) = $self->{fh}->accept ();
387
388 unless (defined $new_sock) {
389 $self->error;
390 return;
391 }
392
393 $self->{client_cb}->($new_sock, $paddr);
394 });
395}
396
397sub start_timeout {
398 my ($self) = @_;
399
400 if (defined $self->{timeout}) {
401 $self->{tmout} =
402 AnyEvent->timer (after => $self->{timeout}, cb => sub {
403 delete $self->{tmout};
404 $! = ETIMEDOUT;
405 $self->error;
406 $self->{timed_out} = 1;
407 });
408 }
409}
410
411sub connect {
412 my ($self) = @_;
413
414 weaken $self;
415
416 $self->start_timeout;
417
418 $self->{con_w} =
419 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
420 delete $self->{con_w};
421 delete $self->{tmout};
422
423 if ($! = $self->{fh}->sockopt (SO_ERROR)) {
424 $self->error;
425
426 } else {
427 $self->{connect_cb}->($self->{fh});
428 }
429 });
430} 317}
431 318
4321; 3191;
433 320
434=back 321=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines