ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.60
Committed: Thu Oct 30 03:43:14 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-4_31
Changes since 1.59: +1 -1 lines
Log Message:
4.31

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing
12 well-known functions by event-ised counterparts.
13
14 All functions documented without C<AnyEvent::Util::> prefix are exported
15 by default.
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::Util;
22
23 no warnings;
24 use strict;
25
26 use Carp ();
27 use Errno ();
28 use Socket ();
29
30 use AnyEvent ();
31
32 use base 'Exporter';
33
34 our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35 our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36
37 our $VERSION = 4.31;
38
39 BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42 }
43
44 BEGIN {
45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47 }
48
49 BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65 }
66
67 BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAEINVAL() { 10022 }";
71 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 eval "sub WSAEINPROGRESS() { 10036 }";
74 } else {
75 # these should never match any errno value
76 eval "sub WSAEINVAL() { -1e99 }";
77 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 eval "sub WSAEINPROGRESS() { -1e99 }";
80 }
81 }
82
83 =item ($r, $w) = portable_pipe
84
85 Calling C<pipe> in Perl is portable - except it doesn't really work on
86 sucky windows platforms (at least not with most perls - cygwin's perl
87 notably works fine).
88
89 On that platform, you actually get two file handles you cannot use select
90 on.
91
92 This function gives you a pipe that actually works even on the broken
93 Windows platform (by creating a pair of TCP sockets, so do not expect any
94 speed from that).
95
96 Returns the empty list on any errors.
97
98 =cut
99
100 sub portable_pipe() {
101 my ($r, $w);
102
103 if (AnyEvent::WIN32) {
104 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105 or return;
106 } else {
107 pipe $r, $w
108 or return;
109 }
110
111 ($r, $w)
112 }
113
114 =item fork_call { CODE } @args, $cb->(@res)
115
116 Executes the given code block asynchronously, by forking. Everything the
117 block returns will be transferred to the calling process (by serialising and
118 deserialising via L<Storable>).
119
120 If there are any errors, then the C<$cb> will be called without any
121 arguments. In that case, either C<$@> contains the exception (and C<$!> is
122 irrelevant), or C<$!> contains an error number. In all other cases, C<$@>
123 will be C<undef>ined.
124
125 The code block must not ever call an event-polling function or use
126 event-based programming that might cause any callbacks registered in the
127 parent to run.
128
129 Win32 spoilers: Due to the endlessly sucky and broken native windows
130 perls (there is no way to cleanly exit a child process on that platform
131 that doesn't also kill the parent), you have to make sure that your main
132 program doesn't exit as long as any C<fork_calls> are still in progress,
133 otherwise the program won't exit. Also, on most windows platforms some
134 memory will leak for every invocation. We are open for improvements that
135 don't require XS hackery.
136
137 Note that forking can be expensive in large programs (RSS 200MB+). On
138 windows, it is abysmally slow, do not expect more than 5..20 forks/s on
139 that sucky platform (note this uses perl's pseudo-threads, so avoid those
140 like the plague).
141
142 Example: poor man's async disk I/O (better use L<IO::AIO>).
143
144 fork_call {
145 open my $fh, "</etc/passwd"
146 or die "passwd: $!";
147 local $/;
148 <$fh>
149 } sub {
150 my ($passwd) = @_;
151 ...
152 };
153
154 =item $AnyEvent::Util::MAX_FORKS [default: 10]
155
156 The maximum number of child processes that C<fork_call> will fork in
157 parallel. Any additional requests will be queued until a slot becomes free
158 again.
159
160 The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
161 this value.
162
163 =cut
164
165 our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
166 $MAX_FORKS = 10 if $MAX_FORKS <= 0;
167
168 my $forks;
169 my @fork_queue;
170
171 sub _fork_schedule;
172 sub _fork_schedule {
173 require Storable;
174
175 while ($forks < $MAX_FORKS) {
176 my $job = shift @fork_queue
177 or last;
178
179 ++$forks;
180
181 my $coderef = shift @$job;
182 my $cb = pop @$job;
183
184 # gimme a break...
185 my ($r, $w) = portable_pipe
186 or ($forks and last) # allow failures when we have at least one job
187 or die "fork_call: $!";
188
189 my $pid = fork;
190
191 if ($pid != 0) {
192 # parent
193 close $w;
194
195 my $buf;
196
197 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
198 my $len = sysread $r, $buf, 65536, length $buf;
199
200 if ($len <= 0) {
201 undef $ww;
202 close $r;
203 --$forks;
204 _fork_schedule;
205
206 my $result = eval { Storable::thaw ($buf) };
207 $result = [$@] unless $result;
208 $@ = shift @$result;
209
210 $cb->(@$result);
211
212 # work around the endlessly broken windows perls
213 kill 9, $pid if AnyEvent::WIN32;
214
215 # clean up the pid
216 waitpid $pid, 0;
217 }
218 });
219
220 } elsif (defined $pid) {
221 # child
222 close $r;
223
224 my $result = eval {
225 local $SIG{__DIE__};
226
227 Storable::freeze ([undef, $coderef->(@$job)])
228 };
229
230 $result = Storable::freeze (["$@"])
231 if $@;
232
233 # windows forces us to these contortions
234 my $ofs;
235
236 while () {
237 my $len = (length $result) - $ofs
238 or last;
239
240 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
241
242 last if $len <= 0;
243
244 $ofs += $len;
245 }
246
247 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
248 if (AnyEvent::WIN32) {
249 shutdown $w, 1; # signal parent to please kill us
250 sleep 10; # give parent a chance to clean up
251 sysread $w, my $buf, 1; # this *might* detect the parent exiting in some cases.
252 }
253 POSIX::_exit (0);
254 exit 1;
255
256 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
257 # we ignore some errors as long as we can run at least one job
258 # maybe we should wait a few seconds and retry instead
259 die "fork_call: $!";
260 }
261 }
262 }
263
264 sub fork_call(&@) {
265 push @fork_queue, [@_];
266 _fork_schedule;
267 }
268
269 END {
270 if (AnyEvent::WIN32) {
271 while ($forks) {
272 @fork_queue = ();
273 AnyEvent->one_event;
274 }
275 }
276 }
277
278 # to be removed
279 sub dotted_quad($) {
280 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
281 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
282 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
283 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
284 }
285
286 # just a forwarder
287 sub inet_aton {
288 require AnyEvent::Socket;
289 *inet_aton = \&AnyEvent::Socket::inet_aton;
290 goto &inet_aton
291 }
292
293 =item fh_nonblocking $fh, $nonblocking
294
295 Sets the blocking state of the given filehandle (true == nonblocking,
296 false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
297 broken (i.e. windows) platforms.
298
299 =cut
300
301 sub fh_nonblocking($$) {
302 my ($fh, $nb) = @_;
303
304 require Fcntl;
305
306 if (AnyEvent::WIN32) {
307 $nb = (! ! $nb) + 0;
308 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
309 } else {
310 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
311 }
312 }
313
314 =item $guard = guard { CODE }
315
316 This function creates a special object that, when called, will execute the
317 code block.
318
319 This is often handy in continuation-passing style code to clean up some
320 resource regardless of where you break out of a process.
321
322 You can call one method on the returned object:
323
324 =item $guard->cancel
325
326 This simply causes the code block not to be invoked: it "cancels" the
327 guard.
328
329 =cut
330
331 sub AnyEvent::Util::Guard::DESTROY {
332 local $@;
333
334 eval {
335 local $SIG{__DIE__};
336 ${$_[0]}->();
337 };
338
339 warn "runtime error in AnyEvent::guard callback: $@" if $@;
340 }
341
342 sub AnyEvent::Util::Guard::cancel($) {
343 ${$_[0]} = sub { };
344 }
345
346 sub guard(&) {
347 bless \(my $cb = shift), AnyEvent::Util::Guard::
348 }
349
350 1;
351
352 =back
353
354 =head1 AUTHOR
355
356 Marc Lehmann <schmorp@schmorp.de>
357 http://home.schmorp.de/
358
359 =cut
360