ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.52
Committed: Wed Jul 16 22:08:16 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
Changes since 1.51: +10 -12 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7 root 1.40 use AnyEvent::Util;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing
12     well-known functions by event-ised counterparts.
13    
14 root 1.14 All functions documented without C<AnyEvent::Util::> prefix are exported
15     by default.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package AnyEvent::Util;
22    
23 root 1.21 no warnings;
24 root 1.1 use strict;
25    
26 root 1.19 use Carp ();
27 root 1.18 use Errno ();
28 root 1.1 use Socket ();
29    
30 root 1.33 use AnyEvent ();
31 root 1.1
32     use base 'Exporter';
33    
34 root 1.34 our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35 root 1.38 our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36 root 1.34
37 root 1.50 our $VERSION = 4.21;
38 root 1.34
39 root 1.18 BEGIN {
40 root 1.34 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41     eval "sub POSIX() { $posix }";
42     }
43    
44     BEGIN {
45     # TODO remove this once not used anymore
46 root 1.18 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47     }
48    
49 root 1.26 BEGIN {
50 root 1.31 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51 root 1.30
52     # uhoh
53     $af_inet6 ||= 10 if $^O =~ /linux/;
54 root 1.32 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 root 1.33 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 root 1.30 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57     $af_inet6 ||= 28 if $^O =~ /freebsd/;
58    
59 root 1.29 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60     or $af_inet6 = 0;
61 root 1.30
62 root 1.26 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63    
64     delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65     }
66    
67     BEGIN {
68     # broken windows perls use undocumented error codes...
69 root 1.33 if (AnyEvent::WIN32) {
70 root 1.36 eval "sub WSAEINVAL() { 10022 }";
71 root 1.37 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 root 1.38 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 root 1.28 eval "sub WSAEINPROGRESS() { 10036 }";
74 root 1.26 } else {
75 root 1.36 # these should never match any errno value
76     eval "sub WSAEINVAL() { -1e99 }";
77 root 1.37 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 root 1.38 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 root 1.36 eval "sub WSAEINPROGRESS() { -1e99 }";
80 root 1.26 }
81     }
82    
83 root 1.34 =item ($r, $w) = portable_pipe
84    
85     Calling C<pipe> in Perl is portable - except it doesn't really work on
86     sucky windows platforms (at least not with most perls - cygwin's perl
87     notably works fine).
88    
89     On that platform, you actually get two file handles you cannot use select
90     on.
91    
92     This function gives you a pipe that actually works even on the broken
93     Windows platform (by creating a pair of TCP sockets, so do not expect any
94     speed from that).
95    
96     Returns the empty list on any errors.
97    
98     =cut
99    
100     sub portable_pipe() {
101     my ($r, $w);
102    
103     if (AnyEvent::WIN32) {
104     socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105     or return;
106     } else {
107     pipe $r, $w
108     or return;
109     }
110    
111     ($r, $w)
112     }
113    
114 root 1.48 =item fork_call { CODE } @args, $cb->(@res)
115 root 1.34
116 root 1.50 Executes the given code block asynchronously, by forking. Everything the
117     block returns will be transferred to the calling process (by serialising and
118     deserialising via L<Storable>).
119 root 1.34
120     If there are any errors, then the C<$cb> will be called without any
121 root 1.49 arguments. In that case, either C<$@> contains the exception (and C<$!> is
122     irrelevant), or C<$!> contains an error number. In all other cases, C<$@>
123     will be C<undef>ined.
124 root 1.34
125 root 1.50 The code block must not ever call an event-polling function or use
126     event-based programming that might cause any callbacks registered in the
127     parent to run.
128    
129     Due to the endlessly sucky and broken native windows perls (there is no
130     way to cleanly exit a child process on that platform that doesn't also
131     kill the parent), you have to make sure that your main program doesn't
132     exit as long as any C<fork_calls> are still in progress, otherwise the
133     program won't exit (we are open for improvements that don't require XS
134     hackery).
135 root 1.34
136 root 1.35 Note that forking can be expensive in large programs (RSS 200MB+). On
137     windows, it is abysmally slow, do not expect more than 5..20 forks/s on
138     that sucky platform (note this uses perl's pseudo-threads, so avoid those
139     like the plague).
140 root 1.34
141 root 1.51 Example: poor man's async disk I/O (better use L<IO::AIO>).
142 root 1.50
143     fork_call {
144     open my $fh, "</etc/passwd"
145     or die "passwd: $!";
146     local $/;
147     <$fh>
148     } sub {
149     my ($passwd) = @_;
150     ...
151     };
152    
153 root 1.34 =item $AnyEvent::Util::MAX_FORKS [default: 10]
154    
155     The maximum number of child processes that C<fork_call> will fork in
156     parallel. Any additional requests will be queued until a slot becomes free
157     again.
158    
159     The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
160     this value.
161    
162     =cut
163    
164     our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
165     $MAX_FORKS = 10 if $MAX_FORKS <= 0;
166    
167     my $forks;
168     my @fork_queue;
169    
170     sub _fork_schedule;
171     sub _fork_schedule {
172 root 1.52 require Storable;
173 root 1.1
174 root 1.52 while ($forks < $MAX_FORKS) {
175 root 1.34 my $job = shift @fork_queue
176 root 1.52 or last;
177 root 1.1
178 root 1.34 ++$forks;
179 root 1.1
180 root 1.34 my $coderef = shift @$job;
181     my $cb = pop @$job;
182    
183     # gimme a break...
184     my ($r, $w) = portable_pipe
185     or ($forks and last) # allow failures when we have at least one job
186     or die "fork_call: $!";
187 root 1.1
188 root 1.34 my $pid = fork;
189 root 1.1
190 root 1.35 if ($pid != 0) {
191 root 1.34 # parent
192     close $w;
193 root 1.1
194     my $buf;
195 root 1.34
196     my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
197     my $len = sysread $r, $buf, 65536, length $buf;
198    
199     if ($len <= 0) {
200     undef $ww;
201     close $r;
202     --$forks;
203     _fork_schedule;
204    
205     my $result = eval { Storable::thaw ($buf) };
206     $result = [$@] unless $result;
207     $@ = shift @$result;
208    
209     $cb->(@$result);
210    
211 root 1.52 # work around the endlessly broken windows perls
212     kill 9, $pid if AnyEvent::WIN32;
213 root 1.50
214 root 1.34 # clean up the pid
215     waitpid $pid, 0;
216     }
217     });
218    
219     } elsif (defined $pid) {
220     # child
221     close $r;
222    
223     my $result = eval {
224     local $SIG{__DIE__};
225    
226     Storable::freeze ([undef, $coderef->(@$job)])
227     };
228    
229     $result = Storable::freeze (["$@"])
230     if $@;
231    
232     # windows forces us to these contortions
233     my $ofs;
234    
235     while () {
236     my $len = (length $result) - $ofs
237     or last;
238    
239     $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
240    
241     last if $len <= 0;
242    
243     $ofs += $len;
244     }
245    
246 root 1.52 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
247 root 1.35 if (AnyEvent::WIN32) {
248 root 1.52 shutdown $w, 1; # signal parent to please kill us
249     sleep 10; # give parent a chance to clean up
250     sysread $w, my $buf, 1; # this *might* detect the parent exiting in some cases.
251 root 1.34 }
252 root 1.52 POSIX::_exit (0);
253 root 1.35 exit 1;
254 root 1.34
255     } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
256     # we ignore some errors as long as we can run at least one job
257     # maybe we should wait a few seconds and retry instead
258     die "fork_call: $!";
259     }
260 root 1.1 }
261     }
262    
263 root 1.48 sub fork_call(&@) {
264 root 1.34 push @fork_queue, [@_];
265     _fork_schedule;
266 root 1.1 }
267    
268 root 1.24 # to be removed
269 root 1.1 sub dotted_quad($) {
270     $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
271     \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
272     \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
273     \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
274     }
275    
276 root 1.23 # just a forwarder
277 root 1.1 sub inet_aton {
278 root 1.23 require AnyEvent::Socket;
279     *inet_aton = \&AnyEvent::Socket::inet_aton;
280     goto &inet_aton
281 root 1.1 }
282    
283 root 1.14 =item fh_nonblocking $fh, $nonblocking
284 root 1.6
285     Sets the blocking state of the given filehandle (true == nonblocking,
286     false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
287     broken (i.e. windows) platforms.
288    
289     =cut
290    
291     sub fh_nonblocking($$) {
292     my ($fh, $nb) = @_;
293    
294     require Fcntl;
295    
296 root 1.33 if (AnyEvent::WIN32) {
297 root 1.6 $nb = (! ! $nb) + 0;
298     ioctl $fh, 0x8004667e, \$nb; # FIONBIO
299     } else {
300     fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
301     }
302     }
303    
304 root 1.14 =item $guard = guard { CODE }
305 elmex 1.8
306 root 1.11 This function creates a special object that, when called, will execute the
307     code block.
308 elmex 1.8
309 root 1.11 This is often handy in continuation-passing style code to clean up some
310     resource regardless of where you break out of a process.
311 elmex 1.8
312 root 1.25 You can call one method on the returned object:
313    
314     =item $guard->cancel
315    
316     This simply causes the code block not to be invoked: it "cancels" the
317     guard.
318    
319 elmex 1.8 =cut
320    
321 root 1.14 sub AnyEvent::Util::Guard::DESTROY {
322 root 1.43 local $@;
323    
324     eval {
325     local $SIG{__DIE__};
326     ${$_[0]}->();
327     };
328    
329     warn "runtime error in AnyEvent::guard callback: $@" if $@;
330 root 1.14 }
331    
332 root 1.25 sub AnyEvent::Util::Guard::cancel($) {
333     ${$_[0]} = sub { };
334     }
335    
336 root 1.11 sub guard(&) {
337     bless \(my $cb = shift), AnyEvent::Util::Guard::
338 elmex 1.8 }
339    
340 root 1.1 1;
341    
342     =back
343    
344     =head1 AUTHOR
345    
346     Marc Lehmann <schmorp@schmorp.de>
347     http://home.schmorp.de/
348    
349     =cut
350