ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.50
Committed: Wed Jul 16 21:17:59 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
Changes since 1.49: +29 -7 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing
12 well-known functions by event-ised counterparts.
13
14 All functions documented without C<AnyEvent::Util::> prefix are exported
15 by default.
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::Util;
22
23 no warnings;
24 use strict;
25
26 use Carp ();
27 use Errno ();
28 use Socket ();
29
30 use AnyEvent ();
31
32 use base 'Exporter';
33
34 our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35 our @EXPORT_OK = qw(AF_INET6 WSAEWOULDBLOCK WSAEINPROGRESS WSAEINVAL WSAWOULDBLOCK);
36
37 our $VERSION = 4.21;
38
39 BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42 }
43
44 BEGIN {
45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47 }
48
49 BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65 }
66
67 BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAEINVAL() { 10022 }";
71 eval "sub WSAEWOULDBLOCK() { 10035 }";
72 eval "sub WSAWOULDBLOCK() { 10035 }"; # TODO remove here ands from @export_ok
73 eval "sub WSAEINPROGRESS() { 10036 }";
74 } else {
75 # these should never match any errno value
76 eval "sub WSAEINVAL() { -1e99 }";
77 eval "sub WSAEWOULDBLOCK() { -1e99 }";
78 eval "sub WSAWOULDBLOCK() { -1e99 }"; # TODO
79 eval "sub WSAEINPROGRESS() { -1e99 }";
80 }
81 }
82
83 =item ($r, $w) = portable_pipe
84
85 Calling C<pipe> in Perl is portable - except it doesn't really work on
86 sucky windows platforms (at least not with most perls - cygwin's perl
87 notably works fine).
88
89 On that platform, you actually get two file handles you cannot use select
90 on.
91
92 This function gives you a pipe that actually works even on the broken
93 Windows platform (by creating a pair of TCP sockets, so do not expect any
94 speed from that).
95
96 Returns the empty list on any errors.
97
98 =cut
99
100 sub portable_pipe() {
101 my ($r, $w);
102
103 if (AnyEvent::WIN32) {
104 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
105 or return;
106 } else {
107 pipe $r, $w
108 or return;
109 }
110
111 ($r, $w)
112 }
113
114 =item fork_call { CODE } @args, $cb->(@res)
115
116 Executes the given code block asynchronously, by forking. Everything the
117 block returns will be transferred to the calling process (by serialising and
118 deserialising via L<Storable>).
119
120 If there are any errors, then the C<$cb> will be called without any
121 arguments. In that case, either C<$@> contains the exception (and C<$!> is
122 irrelevant), or C<$!> contains an error number. In all other cases, C<$@>
123 will be C<undef>ined.
124
125 The code block must not ever call an event-polling function or use
126 event-based programming that might cause any callbacks registered in the
127 parent to run.
128
129 Due to the endlessly sucky and broken native windows perls (there is no
130 way to cleanly exit a child process on that platform that doesn't also
131 kill the parent), you have to make sure that your main program doesn't
132 exit as long as any C<fork_calls> are still in progress, otherwise the
133 program won't exit (we are open for improvements that don't require XS
134 hackery).
135
136 Note that forking can be expensive in large programs (RSS 200MB+). On
137 windows, it is abysmally slow, do not expect more than 5..20 forks/s on
138 that sucky platform (note this uses perl's pseudo-threads, so avoid those
139 like the plague).
140
141 Example: poor man's async disk I/O (better use L<IO::AIO<).
142
143 fork_call {
144 open my $fh, "</etc/passwd"
145 or die "passwd: $!";
146 local $/;
147 <$fh>
148 } sub {
149 my ($passwd) = @_;
150 ...
151 };
152
153 =item $AnyEvent::Util::MAX_FORKS [default: 10]
154
155 The maximum number of child processes that C<fork_call> will fork in
156 parallel. Any additional requests will be queued until a slot becomes free
157 again.
158
159 The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
160 this value.
161
162 =cut
163
164 our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
165 $MAX_FORKS = 10 if $MAX_FORKS <= 0;
166
167 my $forks;
168 my @fork_queue;
169
170 sub _fork_schedule;
171 sub _fork_schedule {
172 while () {
173 return if $forks >= $MAX_FORKS;
174
175 my $job = shift @fork_queue
176 or return;
177
178 ++$forks;
179
180 my $coderef = shift @$job;
181 my $cb = pop @$job;
182
183 # gimme a break...
184 my ($r, $w) = portable_pipe
185 or ($forks and last) # allow failures when we have at least one job
186 or die "fork_call: $!";
187
188 my $pid = fork;
189
190 if ($pid != 0) {
191 # parent
192 close $w;
193
194 my $buf;
195
196 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
197 my $len = sysread $r, $buf, 65536, length $buf;
198
199 if ($len <= 0) {
200 undef $ww;
201 close $r;
202 --$forks;
203 _fork_schedule;
204
205 my $result = eval { Storable::thaw ($buf) };
206 $result = [$@] unless $result;
207 $@ = shift @$result;
208
209 $cb->(@$result);
210
211 kill 9, $pid if AnyEvent::WIN32; # work around the endlessly broken windows perls
212
213 # clean up the pid
214 waitpid $pid, 0;
215 }
216 });
217
218 } elsif (defined $pid) {
219 # child
220 close $r;
221
222 my $result = eval {
223 local $SIG{__DIE__};
224
225 Storable::freeze ([undef, $coderef->(@$job)])
226 };
227
228 $result = Storable::freeze (["$@"])
229 if $@;
230
231 # windows forces us to these contortions
232 my $ofs;
233
234 while () {
235 my $len = (length $result) - $ofs
236 or last;
237
238 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
239
240 last if $len <= 0;
241
242 $ofs += $len;
243 }
244
245 close $w;
246
247 if (AnyEvent::WIN32) {
248 sleep 3600 while 1; # yes, we can't kill ourselves, and windows has no working _exit
249 } else {
250 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
251 POSIX::_exit (0);
252 }
253 exit 1;
254
255 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
256 # we ignore some errors as long as we can run at least one job
257 # maybe we should wait a few seconds and retry instead
258 die "fork_call: $!";
259 }
260 }
261 }
262
263 sub fork_call(&@) {
264 require Storable;
265
266 push @fork_queue, [@_];
267 _fork_schedule;
268 }
269
270 # to be removed
271 sub dotted_quad($) {
272 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
273 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
274 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
275 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
276 }
277
278 # just a forwarder
279 sub inet_aton {
280 require AnyEvent::Socket;
281 *inet_aton = \&AnyEvent::Socket::inet_aton;
282 goto &inet_aton
283 }
284
285 =item fh_nonblocking $fh, $nonblocking
286
287 Sets the blocking state of the given filehandle (true == nonblocking,
288 false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
289 broken (i.e. windows) platforms.
290
291 =cut
292
293 sub fh_nonblocking($$) {
294 my ($fh, $nb) = @_;
295
296 require Fcntl;
297
298 if (AnyEvent::WIN32) {
299 $nb = (! ! $nb) + 0;
300 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
301 } else {
302 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
303 }
304 }
305
306 =item $guard = guard { CODE }
307
308 This function creates a special object that, when called, will execute the
309 code block.
310
311 This is often handy in continuation-passing style code to clean up some
312 resource regardless of where you break out of a process.
313
314 You can call one method on the returned object:
315
316 =item $guard->cancel
317
318 This simply causes the code block not to be invoked: it "cancels" the
319 guard.
320
321 =cut
322
323 sub AnyEvent::Util::Guard::DESTROY {
324 local $@;
325
326 eval {
327 local $SIG{__DIE__};
328 ${$_[0]}->();
329 };
330
331 warn "runtime error in AnyEvent::guard callback: $@" if $@;
332 }
333
334 sub AnyEvent::Util::Guard::cancel($) {
335 ${$_[0]} = sub { };
336 }
337
338 sub guard(&) {
339 bless \(my $cb = shift), AnyEvent::Util::Guard::
340 }
341
342 1;
343
344 =back
345
346 =head1 AUTHOR
347
348 Marc Lehmann <schmorp@schmorp.de>
349 http://home.schmorp.de/
350
351 =cut
352