ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.42
Committed: Wed Jun 4 11:45:21 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-4_13
Changes since 1.41: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7 root 1.40 use AnyEvent::Util;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing
12     well-known functions by event-ised counterparts.
13    
14 root 1.14 All functions documented without C<AnyEvent::Util::> prefix are exported
15     by default.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package AnyEvent::Util;
22    
23 root 1.21 no warnings;
24 root 1.1 use strict;
25    
26 root 1.19 use Carp ();
27 root 1.18 use Errno ();
28 root 1.1 use Socket ();
29    
30 root 1.33 use AnyEvent ();
31 root 1.1
32     use base 'Exporter';
33    
34 root 1.34 our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35 root 1.38 our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36 root 1.34
37 root 1.42 our $VERSION = 4.13;
38 root 1.34
# Compile-time setup, part 1: publish the constant sub POSIX() (1 when the
# POSIX module could be loaded, 0 otherwise).
BEGIN {
   my $have_posix = eval {
      local $SIG{__DIE__}; # keep foreign die handlers out of the probe
      require POSIX
   };

   # a failed load leaves undef here, which the multiplication turns into 0
   $have_posix *= 1;

   eval "sub POSIX() { $have_posix }";
}

# Compile-time setup, part 2: keep a private alias to Socket::inet_aton.
BEGIN {
   # TODO remove this once not used anymore
   # take a copy, in case Coro::LWP overrides it
   *socket_inet_aton = \&Socket::inet_aton;
}
48    
# Compile-time setup: determine the numeric AF_INET6 address family and
# publish it as the constant sub AF_INET6(). The constant is 0 when no value
# is known or when a socket of that family cannot be created; in that case
# the ipv6 entry is also removed from %AnyEvent::PROTOCOL.
BEGIN {
   # ask Socket first; it may not provide the constant at all
   my $family = eval { local $SIG{__DIE__}; Socket::AF_INET6 () };

   # uhoh - fall back to hard-coded per-platform values, first match wins
   if (!$family) {
      if    ($^O =~ /linux/)          { $family = 10 }
      elsif ($^O =~ /cygwin/i)        { $family = 23 }
      elsif (AnyEvent::WIN32)         { $family = 23 }
      elsif ($^O =~ /openbsd|netbsd/) { $family = 24 }
      elsif ($^O =~ /freebsd/)        { $family = 28 }
   }

   # check if such sockets can actually be created
   my $usable = $family && socket (my $probe, $family, Socket::SOCK_STREAM (), 0);
   $family = 0 unless $usable;

   eval "sub AF_INET6() { $family }";
   die if $@;

   delete $AnyEvent::PROTOCOL{ipv6} unless $family;
}
66    
# Compile-time setup: define the WSA* error-code constants.
BEGIN {
   # broken windows perls use undocumented error codes...
   my @wsa_codes = (
      [WSAEINVAL      => 10022],
      [WSAEWOULDBLOCK => 10035],
      [WSAWOULDBLOCK  => 10035], # TODO remove here and from @EXPORT_OK
      [WSAEINPROGRESS => 10036],
   );

   for my $entry (@wsa_codes) {
      my ($name, $code) = @$entry;

      # off windows, use a value that should never match any errno value
      my $value = AnyEvent::WIN32 ? $code : "-1e99";

      eval "sub $name() { $value }";
   }
}
82    
83 root 1.34 =item ($r, $w) = portable_pipe
84    
85     Calling C<pipe> in Perl is portable - except it doesn't really work on
86     sucky windows platforms (at least not with most perls - cygwin's perl
87     notably works fine).
88    
89     On that platform, you actually get two file handles you cannot use select
90     on.
91    
92     This function gives you a pipe that actually works even on the broken
93     Windows platform (by creating a pair of TCP sockets, so do not expect any
94     speed from that).
95    
96     Returns the empty list on any errors.
97    
98     =cut
99    
# Creates a unidirectional channel and returns its (read, write) handles.
# Uses socketpair when AnyEvent::WIN32 is true and a plain pipe otherwise.
# Returns the empty list when the handles could not be created.
sub portable_pipe() {
   my ($reader, $writer);

   my $ok = AnyEvent::WIN32
      ? socketpair ($reader, $writer, Socket::AF_UNIX (), Socket::SOCK_STREAM (), 0)
      : pipe ($reader, $writer);

   return unless $ok;

   ($reader, $writer)
}
113    
114     =item fork_call $coderef, @args, $cb->(@res)
115    
116     Executes the given code reference asynchronously, by forking. Everything
117     the C<$coderef> returns will be transferred to the calling process (by
118     serialising and deserialising via L<Storable>).
119    
120     If there are any errors, then the C<$cb> will be called without any
121     arguments. In that case, either C<$@> contains the exception, or C<$!>
122     contains an error number. In all other cases, C<$@> will be C<undef>ined.
123    
124     The C<$coderef> must not ever call an event-polling function or use
125     event-based programming.
126    
127 root 1.35 Note that forking can be expensive in large programs (RSS 200MB+). On
128     windows, it is abysmally slow, do not expect more than 5..20 forks/s on
129     that sucky platform (note this uses perl's pseudo-threads, so avoid those
130     like the plague).
131 root 1.34
132     =item $AnyEvent::Util::MAX_FORKS [default: 10]
133    
134     The maximum number of child processes that C<fork_call> will fork in
135     parallel. Any additional requests will be queued until a slot becomes free
136     again.
137    
138     The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
139     this value.
140    
141     =cut
142    
# Upper limit of concurrently running fork_call children, taken from the
# environment; anything that does not yield a positive integer means 10.
our $MAX_FORKS = int (1 * $ENV{PERL_ANYEVENT_MAX_FORKS});

if ($MAX_FORKS <= 0) {
   $MAX_FORKS = 10;
}

# $forks: number of children currently running
# @fork_queue: jobs ([$coderef, @args, $cb]) waiting for a free slot
my ($forks, @fork_queue);
148    
# Starts queued fork_call jobs until the queue is empty or $MAX_FORKS
# children are running.
#
# Each job is an array [$coderef, @args, $cb]. The child runs
# $coderef->(@args), freezes [undef, @results] (or ["$@"] on an exception)
# and writes that to a pipe. The parent collects the data, and on end of
# data thaws it, sets $@ from the first element and calls $cb with the rest.
#
# If the pipe cannot be created, or fork fails with EAGAIN/ENOMEM, while
# other children are still running, the job is put back at the front of the
# queue; it is retried when a running child finishes and this function is
# called again. A slot is only counted once the fork has succeeded, so a
# failed attempt neither loses the job nor leaks a slot.
#
# Dies with "fork_call: $!" on any other fork error, or on any failure
# while no child is running (nothing would ever retry the job).
sub _fork_schedule;
sub _fork_schedule {
   while ($forks < $MAX_FORKS) {
      my $job = shift @fork_queue
         or return;

      # unpack a copy, so $job can be requeued unchanged on failure
      my ($coderef, @args) = @$job;
      my $cb = pop @args;

      # gimme a break...
      my ($r, $w) = portable_pipe;

      unless ($r) {
         # allow failures when we have at least one job
         die "fork_call: $!" unless $forks;

         unshift @fork_queue, $job;
         return;
      }

      my $pid = fork;

      if ($pid) {
         # parent
         ++$forks;
         close $w;

         my $buf = "";

         my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
            my $len = sysread $r, $buf, 65536, length $buf;

            # interrupted read, not end of data: wait for the next event
            return if !defined $len && ($! == &Errno::EINTR || $! == &Errno::EAGAIN);

            # got some data, more may follow
            return if $len > 0;

            # end of data (or a hard read error): the job is finished
            undef $ww;
            close $r;
            --$forks;
            _fork_schedule ();

            # an unusable buffer yields an error-only result
            my $result = eval { Storable::thaw ($buf) };
            $result = [$@] unless $result;
            $@ = shift @$result;

            $cb->(@$result);

            # clean up the pid
            waitpid $pid, 0;
         });

      } elsif (defined $pid) {
         # child
         close $r;

         my $result = eval {
            local $SIG{__DIE__};

            Storable::freeze ([undef, $coderef->(@args)])
         };

         $result = Storable::freeze (["$@"])
            if $@;

         # windows forces us to these contortions: write in chunks of at most 64k
         my $ofs = 0;

         while () {
            my $len = (length $result) - $ofs
               or last;

            $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;

            last if $len <= 0;

            $ofs += $len;
         }

         close $w;

         if (AnyEvent::WIN32) {
            kill 9, $$; # yeah, windows for the win
         } else {
            # on native windows, _exit KILLS YOUR FORKED CHILDREN!
            POSIX::_exit (0);
         }
         exit 1;

      } else {
         # fork failed. we ignore some errors as long as we can run at least one job
         # maybe we should wait a few seconds and retry instead
         my $transient = $! == &Errno::EAGAIN || $! == &Errno::ENOMEM;

         die "fork_call: $!" unless $transient && $forks;

         unshift @fork_queue, $job;
         return;
      }
   }
}
239    
# Queues a job ($coderef, @args, $cb) and starts it as soon as a fork slot
# is available. Storable is loaded on first use.
sub fork_call {
   require Storable;

   my @job = @_;
   push @fork_queue, \@job;

   _fork_schedule ()
}
246    
# to be removed
# Returns true if the argument is an IPv4 address in dotted-quad notation:
# exactly four decimal numbers in the range 0..255, separated by dots.
sub dotted_quad($) {
   # \A and \z anchor to the real start and end of the string; a plain $
   # would also accept a trailing newline ("1.2.3.4\n")
   $_[0] =~ /\A(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
             \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
             \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
             \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)\z/x
}
254    
# just a forwarder: loads AnyEvent::Socket on first use, replaces itself
# with AnyEvent::Socket::inet_aton and passes the current call on to it.
sub inet_aton {
   require AnyEvent::Socket;

   # install the real implementation under our name, so later calls skip this stub
   my $impl = \&AnyEvent::Socket::inet_aton;
   *inet_aton = $impl;

   # tail-call, handing over the caller's @_ unchanged
   goto &$impl
}
261    
262 root 1.14 =item fh_nonblocking $fh, $nonblocking
263 root 1.6
264     Sets the blocking state of the given filehandle (true == nonblocking,
265     false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
266     broken (i.e. windows) platforms.
267    
268     =cut
269    
# Sets the blocking state of $fh: a true $nb makes it nonblocking, a false
# $nb makes it blocking.
#
# Returns the result of the underlying ioctl/fcntl call, which is undef on
# failure (with $! set).
sub fh_nonblocking($$) {
   my ($fh, $nb) = @_;

   if (AnyEvent::WIN32) {
      # FIONBIO reads an unsigned long through the buffer (nonzero means
      # nonblocking). perl's ioctl passes a pointer to the string value, so
      # the argument has to be a packed long - a reference would be
      # stringified and always look nonzero
      my $arg = pack "L", $nb ? 1 : 0;
      ioctl $fh, 0x8004667e, $arg; # FIONBIO
   } else {
      require Fcntl;

      # read-modify-write, so other status flags (e.g. O_APPEND) survive
      my $flags = fcntl $fh, Fcntl::F_GETFL (), 0;
      return $flags unless defined $flags;

      $flags += 0; # fcntl reports zero as "0 but true"

      $flags = $nb
         ? $flags |  Fcntl::O_NONBLOCK ()
         : $flags & ~Fcntl::O_NONBLOCK ();

      fcntl $fh, Fcntl::F_SETFL (), $flags;
   }
}
282    
283 root 1.14 =item $guard = guard { CODE }
284 elmex 1.8
285 root 1.11 This function creates a special object that, when destroyed, will execute the
286     code block.
287 elmex 1.8
288 root 1.11 This is often handy in continuation-passing style code to clean up some
289     resource regardless of where you break out of a process.
290 elmex 1.8
291 root 1.25 You can call one method on the returned object:
292    
293     =item $guard->cancel
294    
295     This simply causes the code block not to be invoked: it "cancels" the
296     guard.
297    
298 elmex 1.8 =cut
299    
# A guard object is a blessed reference to a scalar holding a code reference.

# Destructor: runs the stored code reference (without arguments).
sub AnyEvent::Util::Guard::DESTROY {
   my ($self) = @_;

   $$self->();
}

# Cancels the guard by swapping the stored code for an empty sub.
sub AnyEvent::Util::Guard::cancel($) {
   my ($self) = @_;

   $$self = sub { };
}

# Wraps the given code block into a new guard object.
sub guard(&) {
   my $code = shift;

   bless \$code, 'AnyEvent::Util::Guard'
}
311    
312 root 1.1 1;
313    
314     =back
315    
316     =head1 AUTHOR
317    
318     Marc Lehmann <schmorp@schmorp.de>
319     http://home.schmorp.de/
320    
321     =cut
322