ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.5 by root, Sun Apr 27 20:17:46 2008 UTC vs.
Revision 1.39 by root, Fri May 30 21:38:46 2008 UTC

3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8
9 inet_aton $name, $cb->($ipn || undef);
10 8
11=head1 DESCRIPTION 9=head1 DESCRIPTION
12 10
13This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
14well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
15 13
14All functions documented without C<AnyEvent::Util::> prefix are exported
15by default.
16
16=over 4 17=over 4
17 18
18=cut 19=cut
19 20
20package AnyEvent::Util; 21package AnyEvent::Util;
21 22
23no warnings;
22use strict; 24use strict;
23 25
24no warnings "uninitialized"; 26use Carp ();
25 27use Errno ();
26use Socket (); 28use Socket ();
27 29
28use AnyEvent; 30use AnyEvent ();
29 31
30use base 'Exporter'; 32use base 'Exporter';
31 33
32#our @EXPORT = qw(gethostbyname gethostbyaddr); 34our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
33our @EXPORT_OK = qw(inet_aton); 35our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
34 36
35our $VERSION = '1.0'; 37our $VERSION = 4.1;
36 38
37our $MAXPARALLEL = 16; # max. number of parallel jobs 39BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42}
38 43
39our $running; 44BEGIN {
40our @queue; 45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47}
41 48
49BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65}
66
67BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAEINVAL() { 10022 }";
71 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 eval "sub WSAEINPROGRESS() { 10036 }";
74 } else {
75 # these should never match any errno value
76 eval "sub WSAEINVAL() { -1e99 }";
77 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 eval "sub WSAEINPROGRESS() { -1e99 }";
80 }
81}
82
83=item ($r, $w) = portable_pipe
84
85Calling C<pipe> in Perl is portable - except it doesn't really work on
86sucky windows platforms (at least not with most perls - cygwin's perl
87notably works fine).
88
89On that platform, you actually get two file handles you cannot use select
90on.
91
92This function gives you a pipe that actually works even on the broken
93Windows platform (by creating a pair of TCP sockets, so do not expect any
94speed from that).
95
96Returns the empty list on any errors.
97
98=cut
99
100sub portable_pipe() {
101 my ($r, $w);
102
103 if (AnyEvent::WIN32) {
104 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105 or return;
106 } else {
107 pipe $r, $w
108 or return;
109 }
110
111 ($r, $w)
112}
113
114=item fork_call $coderef, @args, $cb->(@res)
115
116Executes the given code reference asynchronously, by forking. Everything
117the C<$coderef> returns will transferred to the calling process (by
118serialising and deserialising via L<Storable>).
119
120If there are any errors, then the C<$cb> will be called without any
121arguments. In that case, either C<$@> contains the exception, or C<$!>
122contains an error number. In all other cases, C<$@> will be C<undef>ined.
123
124The C<$coderef> must not ever call an event-polling function or use
125event-based programming.
126
127Note that forking can be expensive in large programs (RSS 200MB+). On
128windows, it is abysmally slow, do not expect more than 5..20 forks/s on
129that sucky platform (note this uses perl's pseudo-threads, so avoid those
130like the plague).
131
132=item $AnyEvent::Util::MAX_FORKS [default: 10]
133
134The maximum number of child processes that C<fork_call> will fork in
135parallel. Any additional requests will be queued until a slot becomes free
136again.
137
138The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
139this value.
140
141=cut
142
143our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
144$MAX_FORKS = 10 if $MAX_FORKS <= 0;
145
146my $forks;
147my @fork_queue;
148
42sub _schedule; 149sub _fork_schedule;
43sub _schedule { 150sub _fork_schedule {
44 return unless @queue; 151 while () {
45 return if $running >= $MAXPARALLEL; 152 return if $forks >= $MAX_FORKS;
46 153
47 ++$running; 154 my $job = shift @fork_queue
48 my ($cb, $sub, @args) = @{shift @queue}; 155 or return;
49 156
50 if (eval { local $SIG{__DIE__}; require POSIX }) { 157 ++$forks;
51 my $pid = open my $fh, "-|";
52 158
159 my $coderef = shift @$job;
160 my $cb = pop @$job;
161
162 # gimme a break...
163 my ($r, $w) = portable_pipe
164 or ($forks and last) # allow failures when we have at least one job
165 or die "fork_call: $!";
166
167 my $pid = fork;
168
169 if ($pid != 0) {
170 # parent
171 close $w;
172
173 my $buf;
174
175 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
176 my $len = sysread $r, $buf, 65536, length $buf;
177
178 if ($len <= 0) {
179 undef $ww;
180 close $r;
181 --$forks;
182 _fork_schedule;
183
184 my $result = eval { Storable::thaw ($buf) };
185 $result = [$@] unless $result;
186 $@ = shift @$result;
187
188 $cb->(@$result);
189
190 # clean up the pid
191 waitpid $pid, 0;
192 }
193 });
194
53 if (!defined $pid) { 195 } elsif (defined $pid) {
54 die "fork: $!"; 196 # child
55 } elsif (!$pid) { 197 close $r;
56 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); 198
199 my $result = eval {
200 local $SIG{__DIE__};
201
202 Storable::freeze ([undef, $coderef->(@$job)])
203 };
204
205 $result = Storable::freeze (["$@"])
206 if $@;
207
208 # windows forces us to these contortions
209 my $ofs;
210
211 while () {
212 my $len = (length $result) - $ofs
213 or last;
214
215 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
216
217 last if $len <= 0;
218
219 $ofs += $len;
220 }
221
222 close $w;
223
224 if (AnyEvent::WIN32) {
225 kill 9, $$; # yeah, windows for the win
226 } else {
227 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
57 POSIX::_exit (0); 228 POSIX::_exit (0);
229 }
230 exit 1;
231
232 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
233 # we ignore some errors as long as we can run at least one job
234 # maybe we should wait a few seconds and retry instead
235 die "fork_call: $!";
58 } 236 }
59
60 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
61 --$running;
62 _schedule;
63 undef $w;
64
65 my $buf;
66 sysread $fh, $buf, 16384, length $buf;
67 $cb->(map { pack "H*", $_ } split /\0/, $buf);
68 });
69 } else {
70 $cb->($sub->(@args));
71 } 237 }
72} 238}
73 239
74sub _do_asy { 240sub fork_call {
241 require Storable;
242
75 push @queue, [@_]; 243 push @fork_queue, [@_];
76 _schedule; 244 _fork_schedule;
77} 245}
78 246
247# to be removed
79sub dotted_quad($) { 248sub dotted_quad($) {
80 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 249 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
81 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 250 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
82 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 251 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
83 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 252 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
84} 253}
85 254
86my $has_ev_adns; 255# just a forwarder
87
88sub has_ev_adns {
89 ($has_ev_adns ||= do {
90 my $model = AnyEvent::detect;
91 (($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV")
92 && eval { local $SIG{__DIE__}; require EV::ADNS })
93 ? 2 : 1 # so that || always detects as true
94 }) - 1 # 2 => true, 1 => false
95}
96
97=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef)
98
99Works almost exactly like its Socket counterpart, except that it uses a
100callback.
101
102=cut
103
104sub inet_aton { 256sub inet_aton {
105 my ($name, $cb) = @_; 257 require AnyEvent::Socket;
258 *inet_aton = \&AnyEvent::Socket::inet_aton;
259 goto &inet_aton
260}
106 261
107 if (&dotted_quad) { 262=item fh_nonblocking $fh, $nonblocking
108 $cb->(Socket::inet_aton $name); 263
109 } elsif (&has_ev_adns) { 264Sets the blocking state of the given filehandle (true == nonblocking,
110 EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub { 265false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
111 my (undef, undef, @a) = @_; 266broken (i.e. windows) platforms.
112 $cb->(@a ? Socket::inet_aton $a[0] : undef); 267
113 }); 268=cut
269
270sub fh_nonblocking($$) {
271 my ($fh, $nb) = @_;
272
273 require Fcntl;
274
275 if (AnyEvent::WIN32) {
276 $nb = (! ! $nb) + 0;
277 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
114 } else { 278 } else {
115 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_; 279 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
116 } 280 }
281}
282
283=item $guard = guard { CODE }
284
285This function creates a special object that, when called, will execute the
286code block.
287
288This is often handy in continuation-passing style code to clean up some
289resource regardless of where you break out of a process.
290
291You can call one method on the returned object:
292
293=item $guard->cancel
294
295This simply causes the code block not to be invoked: it "cancels" the
296guard.
297
298=cut
299
300sub AnyEvent::Util::Guard::DESTROY {
301 ${$_[0]}->();
302}
303
304sub AnyEvent::Util::Guard::cancel($) {
305 ${$_[0]} = sub { };
306}
307
308sub guard(&) {
309 bless \(my $cb = shift), AnyEvent::Util::Guard::
117} 310}
118 311
1191; 3121;
120 313
121=back 314=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines