ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.46
Committed: Fri Jun 6 15:35:30 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-4_151, rel-4_152
Changes since 1.45: +1 -1 lines
Log Message:
4.151

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing
12 well-known functions by event-ised counterparts.
13
14 All functions documented without C<AnyEvent::Util::> prefix are exported
15 by default.
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::Util;
22
23 no warnings;
24 use strict;
25
26 use Carp ();
27 use Errno ();
28 use Socket ();
29
30 use AnyEvent ();
31
32 use base 'Exporter';
33
34 our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35 our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36
37 our $VERSION = 4.151;
38
BEGIN {
    # Try to load POSIX without triggering any user-installed __DIE__ hook,
    # then publish the outcome as the compile-time constant POSIX()
    # (1 when the module loaded, 0 otherwise).
    my $loaded = eval {
        local $SIG{__DIE__};
        require POSIX
    };
    my $flag = 1 * $loaded;

    eval "sub POSIX() { $flag }";
}
43
BEGIN {
    # TODO remove this once not used anymore
    # Keep a private handle on the current Socket::inet_aton, in case
    # Coro::LWP overrides it later.
    my $original = \&Socket::inet_aton;
    *socket_inet_aton = $original;
}
48
BEGIN {
    # Ask Socket for the address family first (it may not provide it).
    my $family = eval {
        local $SIG{__DIE__};
        &Socket::AF_INET6
    };

    # uhoh - fall back to per-platform guesses, first match wins
    unless ($family) {
        if ($^O =~ /linux/) {
            $family = 10;
        } elsif ($^O =~ /cygwin/i) {
            $family = 23;
        } elsif (AnyEvent::WIN32) {
            $family = 23;
        } elsif ($^O =~ /openbsd|netbsd/) {
            $family = 24;
        } elsif ($^O =~ /freebsd/) {
            $family = 28;
        }
    }

    # check if such sockets can actually be created, otherwise disable ipv6
    $family = 0
        unless $family
            && socket (my $probe, $family, &Socket::SOCK_STREAM, 0);

    eval "sub AF_INET6() { $family }";
    die if $@;

    $family
        or delete $AnyEvent::PROTOCOL{ipv6};
}
66
BEGIN {
    # Defines the WSA* error-code constants, in this order.
    my @names = qw(WSAEINVAL WSAEWOULDBLOCK WSAWOULDBLOCK WSAEINPROGRESS);
    my %value;

    if (AnyEvent::WIN32) {
        # broken windows perls use undocumented error codes...
        # TODO remove WSAWOULDBLOCK here and from @EXPORT_OK
        @value{@names} = (10022, 10035, 10035, 10036);
    } else {
        # these should never match any errno value
        # TODO (WSAWOULDBLOCK, see above)
        @value{@names} = ("-1e99") x @names;
    }

    eval "sub $_() { $value{$_} }"
        for @names;
}
82
83 =item ($r, $w) = portable_pipe
84
85 Calling C<pipe> in Perl is portable - except it doesn't really work on
86 sucky windows platforms (at least not with most perls - cygwin's perl
87 notably works fine).
88
89 On that platform, you actually get two file handles you cannot use select
90 on.
91
92 This function gives you a pipe that actually works even on the broken
93 Windows platform (by creating a pair of TCP sockets, so do not expect any
94 speed from that).
95
96 Returns the empty list on any errors.
97
98 =cut
99
# Creates a connected pair of handles and returns them as ($r, $w).
# Uses socketpair when AnyEvent::WIN32 is true and pipe otherwise.
# Returns the empty list when the handles cannot be created.
sub portable_pipe() {
    my ($reader, $writer);

    unless (AnyEvent::WIN32) {
        return
            unless pipe $reader, $writer;

        return ($reader, $writer);
    }

    return
        unless socketpair $reader, $writer, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0;

    return ($reader, $writer);
}
113
114 =item fork_call $coderef, @args, $cb->(@res)
115
116 Executes the given code reference asynchronously, by forking. Everything
117 the C<$coderef> returns will be transferred to the calling process (by
118 serialising and deserialising via L<Storable>).
119
120 If there are any errors, then the C<$cb> will be called without any
121 arguments. In that case, either C<$@> contains the exception, or C<$!>
122 contains an error number. In all other cases, C<$@> will be C<undef>ined.
123
124 The C<$coderef> must not ever call an event-polling function or use
125 event-based programming.
126
127 Note that forking can be expensive in large programs (RSS 200MB+). On
128 windows, it is abysmally slow, do not expect more than 5..20 forks/s on
129 that sucky platform (note this uses perl's pseudo-threads, so avoid those
130 like the plague).
131
132 =item $AnyEvent::Util::MAX_FORKS [default: 10]
133
134 The maximum number of child processes that C<fork_call> will fork in
135 parallel. Any additional requests will be queued until a slot becomes free
136 again.
137
138 The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
139 this value.
140
141 =cut
142
# Upper bound on concurrently running children, initialised from the
# PERL_ANYEVENT_MAX_FORKS environment variable; anything that is not a
# positive number selects the default of 10.
our $MAX_FORKS = int (1 * $ENV{PERL_ANYEVENT_MAX_FORKS});

if ($MAX_FORKS <= 0) {
    $MAX_FORKS = 10;
}

my $forks;      # number of children currently running
my @fork_queue; # jobs waiting for a free slot
148
# Starts queued fork_call jobs until the queue is empty or $MAX_FORKS
# children are running.
#
# Each queue entry is an array ref of the form [$coderef, @args, $cb].
# For every job a pipe is created and a child is forked:
#
#  - the child runs $coderef->(@args), serialises [undef, @results] (or
#    ["$@"] on exception) with Storable, writes that to the pipe and exits;
#  - the parent reads the pipe until EOF, frees the slot, schedules further
#    jobs, sets $@ from the first element of the thawed data, invokes
#    $cb->(@results) and finally reaps the child.
#
# A slot ($forks) is only taken once the fork has succeeded. When the pipe
# or the fork cannot be created while other children are still running
# (fork: only for EAGAIN/ENOMEM), the job is put back at the head of the
# queue and is retried when one of those children finishes. In every other
# failure case the sub dies with "fork_call: $!".
sub _fork_schedule;
sub _fork_schedule {
    while ($forks < $MAX_FORKS) {
        my $job = shift @fork_queue
            or return;

        # gimme a break...
        my ($r, $w) = portable_pipe ();

        my $pid;
        $pid = fork
            if $r;

        unless (defined $pid) {
            # no child was started, so no slot has been taken so far.
            # pipe failures are always considered temporary, fork failures
            # only when they indicate resource shortage.
            my $transient = !$r
                || $! == &Errno::EAGAIN
                || $! == &Errno::ENOMEM;

            # we ignore some errors as long as we can run at least one job
            # maybe we should wait a few seconds and retry instead
            if ($forks && $transient) {
                # keep the job, a finishing child will call us again
                unshift @fork_queue, $job;
                return;
            }

            die "fork_call: $!";
        }

        ++$forks;

        my ($coderef, @args) = @$job;
        my $cb = pop @args;

        if ($pid) {
            # parent
            close $w;

            my $buf = "";

            # $ww is referenced from inside its own callback, the cycle is
            # broken by the "undef $ww" below
            my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
                my $len = sysread $r, $buf, 65536, length $buf;

                # keep collecting until EOF (0) or a read error (undef)
                return if $len > 0;

                undef $ww;
                close $r;
                --$forks;
                _fork_schedule;

                # an unparsable (e.g. empty) reply is reported via $@
                my $result = eval { Storable::thaw ($buf) };
                $result = [$@] unless $result;
                $@ = shift @$result;

                $cb->(@$result);

                # clean up the pid
                waitpid $pid, 0;
            });

        } else {
            # child
            close $r;

            my $result = eval {
                local $SIG{__DIE__};

                Storable::freeze ([undef, $coderef->(@args)])
            };

            $result = Storable::freeze (["$@"])
                if $@;

            # windows forces us to these contortions:
            # write the reply in chunks of at most 64k
            my $ofs = 0;

            while () {
                my $len = (length $result) - $ofs
                    or last;

                $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;

                last if $len <= 0;

                $ofs += $len;
            }

            close $w;

            if (AnyEvent::WIN32) {
                kill 9, $$; # yeah, windows for the win
            } else {
                # on native windows, _exit KILLS YOUR FORKED CHILDREN!
                POSIX::_exit (0);
            }
            exit 1;
        }
    }
}
239
# fork_call $coderef, @args, $cb
#
# Queues the job (a copy of the argument list) and runs the scheduler,
# which starts it as soon as a fork slot is available. Storable is loaded
# on first use, as it is needed to transfer the results.
sub fork_call {
    my @job = @_;

    require Storable;

    push @fork_queue, \@job;
    _fork_schedule ();
}
246
# to be removed
#
# Returns true if the argument is an IPv4 address in dotted-quad notation,
# i.e. exactly four decimal components in the range 0..255 (one leading
# zero is tolerated for one-digit values), separated by dots.
#
# The pattern is anchored with \A and \z, so the whole string has to match:
# with "$" a string ending in a newline ("1.2.3.4\n") would be accepted, too.
sub dotted_quad($) {
    $_[0] =~ /\A(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
              \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
              \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
              \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)\z/x
}
254
# just a forwarder: the first call loads AnyEvent::Socket, installs its
# inet_aton under this name (so later calls go there directly) and then
# jumps to it with the original arguments.
sub inet_aton {
    require AnyEvent::Socket;

    my $impl = \&AnyEvent::Socket::inet_aton;
    *inet_aton = $impl;

    goto &$impl
}
261
262 =item fh_nonblocking $fh, $nonblocking
263
264 Sets the blocking state of the given filehandle (true == nonblocking,
265 false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
266 broken (i.e. windows) platforms.
267
268 =cut
269
# fh_nonblocking $fh, $nonblocking
#
# Switches $fh to nonblocking mode when $nonblocking is true and to
# blocking mode otherwise. Returns the result of the underlying
# fcntl/ioctl call.
sub fh_nonblocking($$) {
    my ($fh, $nb) = @_;

    require Fcntl;

    unless (AnyEvent::WIN32) {
        # F_SETFL replaces the status flags with either O_NONBLOCK or 0
        my $flags = $nb ? &Fcntl::O_NONBLOCK : 0;

        return fcntl $fh, &Fcntl::F_SETFL, $flags;
    }

    # normalise the flag to the number 1 or 0
    my $enable = (! ! $nb) + 0;

    return ioctl $fh, 0x8004667e, \$enable; # FIONBIO
}
282
283 =item $guard = guard { CODE }
284
285 This function creates a special object that, when destroyed, will execute the
286 code block.
287
288 This is often handy in continuation-passing style code to clean up some
289 resource regardless of where you break out of a process.
290
291 You can call one method on the returned object:
292
293 =item $guard->cancel
294
295 This simply causes the code block not to be invoked: it "cancels" the
296 guard.
297
298 =cut
299
# Runs the guarded code block when the guard object is destroyed.
# The caller's $@ is left untouched, __DIE__ hooks are disabled while the
# block runs, and an exception thrown by the block is turned into a warning.
sub AnyEvent::Util::Guard::DESTROY {
    my ($self) = @_;

    local $@;

    eval {
        local $SIG{__DIE__};
        $$self->();
    };

    warn "runtime error in AnyEvent::guard callback: $@" if $@;
}
310
# $guard->cancel
#
# Disarms the guard by replacing its code block with an empty sub, so
# nothing happens when the object is destroyed.
sub AnyEvent::Util::Guard::cancel($) {
    my ($self) = @_;

    $$self = sub { };
}
314
# $guard = guard { CODE }
#
# Wraps the code block in an AnyEvent::Util::Guard object (a blessed
# reference to a scalar holding the code reference).
sub guard(&) {
    my $code = shift;

    return bless \$code, 'AnyEvent::Util::Guard';
}
318
319 1;
320
321 =back
322
323 =head1 AUTHOR
324
325 Marc Lehmann <schmorp@schmorp.de>
326 http://home.schmorp.de/
327
328 =cut
329