ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.52
Committed: Wed Jul 16 22:08:16 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
Changes since 1.51: +10 -12 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing
12 well-known functions by event-ised counterparts.
13
14 All functions documented without C<AnyEvent::Util::> prefix are exported
15 by default.
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::Util;
22
23 no warnings;
24 use strict;
25
26 use Carp ();
27 use Errno ();
28 use Socket ();
29
30 use AnyEvent ();
31
32 use base 'Exporter';
33
# Functions imported by a plain "use AnyEvent::Util".
our @EXPORT = (
   'fh_nonblocking',
   'guard',
   'fork_call',
   'portable_pipe',
);

# Constants available on request.  WSAWOULDBLOCK duplicates WSAEWOULDBLOCK
# and is marked for removal in the block that defines it.
our @EXPORT_OK = (
   'AF_INET6',
   'WSAEWOULDBLOCK',
   'WSAEINPROGRESS',
   'WSAEINVAL',
   'WSAWOULDBLOCK',
);

our $VERSION = 4.21;
38
BEGIN {
   # Try to load POSIX with any __DIE__ hook disabled.  A failed load makes
   # the eval return undef, which the multiplication below turns into 0.
   my $loaded = eval {
      local $SIG{__DIE__};
      require POSIX
   };

   my $have_posix = 1 * $loaded;

   # define a constant sub POSIX in the current package reflecting the result
   eval "sub POSIX() { $have_posix }";
}
43
BEGIN {
   # TODO remove this once not used anymore
   # Capture the code reference at compile time, so we keep a copy of the
   # current Socket::inet_aton in case Coro::LWP overrides it later.
   my $socket_inet_aton = \&Socket::inet_aton;

   *socket_inet_aton = $socket_inet_aton;
}
48
BEGIN {
   # Ask Socket for the constant first; the eval (with __DIE__ hooks
   # disabled) swallows the failure if Socket cannot provide it.
   my $family = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };

   # uhoh - no usable value, fall back to hardcoded per-platform numbers.
   # The first matching rule wins, in this order.
   unless ($family) {
      if    ($^O =~ /linux/)          { $family = 10 }
      elsif ($^O =~ /cygwin/i)        { $family = 23 }
      elsif (AnyEvent::WIN32)         { $family = 23 }
      elsif ($^O =~ /openbsd|netbsd/) { $family = 24 }
      elsif ($^O =~ /freebsd/)        { $family = 28 }
   }

   # check if such sockets can actually be created, otherwise treat
   # ipv6 as unavailable
   my $usable = $family && socket (my $ipv6_socket, $family, &Socket::SOCK_STREAM, 0);
   $family = 0 unless $usable;

   # AF_INET6 becomes a constant: the family number, or 0 when unavailable
   eval "sub AF_INET6() { $family }"; die if $@;

   # without a working family, remove ipv6 from AnyEvent's protocol table
   delete $AnyEvent::PROTOCOL{ipv6} unless $family;
}
66
BEGIN {
   # broken windows perls use undocumented error codes...
   my @wsa_codes = (
      [WSAEINVAL      => 10022],
      [WSAEWOULDBLOCK => 10035],
      [WSAWOULDBLOCK  => 10035], # TODO remove here and from @EXPORT_OK
      [WSAEINPROGRESS => 10036],
   );

   for my $entry (@wsa_codes) {
      my ($name, $value) = @$entry;

      # everywhere else, use a value that should never match any errno value
      $value = "-1e99" unless AnyEvent::WIN32;

      eval "sub $name() { $value }";
   }
}
82
83 =item ($r, $w) = portable_pipe
84
85 Calling C<pipe> in Perl is portable - except it doesn't really work on
86 sucky windows platforms (at least not with most perls - cygwin's perl
87 notably works fine).
88
89 On that platform, you actually get two file handles you cannot use select
90 on.
91
92 This function gives you a pipe that actually works even on the broken
93 Windows platform (by creating a pair of TCP sockets, so do not expect any
94 speed from that).
95
96 Returns the empty list on any errors.
97
98 =cut
99
# Creates a unidirectional pipe and returns (read handle, write handle).
# On native windows a stream socketpair is used instead of pipe().
# Returns the empty list (undef in scalar context) on any error.
sub portable_pipe() {
   my ($reader, $writer);

   my $created = AnyEvent::WIN32
      ? socketpair ($reader, $writer, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0)
      : pipe ($reader, $writer);

   return unless $created;

   ($reader, $writer)
}
113
114 =item fork_call { CODE } @args, $cb->(@res)
115
116 Executes the given code block asynchronously, by forking. Everything the
117 block returns will be transferred to the calling process (by serialising and
118 deserialising via L<Storable>).
119
120 If there are any errors, then the C<$cb> will be called without any
121 arguments. In that case, either C<$@> contains the exception (and C<$!> is
122 irrelevant), or C<$!> contains an error number. In all other cases, C<$@>
123 will be C<undef>ined.
124
125 The code block must not ever call an event-polling function or use
126 event-based programming that might cause any callbacks registered in the
127 parent to run.
128
129 Due to the endlessly sucky and broken native windows perls (there is no
130 way to cleanly exit a child process on that platform that doesn't also
131 kill the parent), you have to make sure that your main program doesn't
132 exit as long as any C<fork_calls> are still in progress, otherwise the
133 program won't exit (we are open for improvements that don't require XS
134 hackery).
135
136 Note that forking can be expensive in large programs (RSS 200MB+). On
137 windows, it is abysmally slow, do not expect more than 5..20 forks/s on
138 that sucky platform (note this uses perl's pseudo-threads, so avoid those
139 like the plague).
140
141 Example: poor man's async disk I/O (better use L<IO::AIO>).
142
143 fork_call {
144 open my $fh, "</etc/passwd"
145 or die "passwd: $!";
146 local $/;
147 <$fh>
148 } sub {
149 my ($passwd) = @_;
150 ...
151 };
152
153 =item $AnyEvent::Util::MAX_FORKS [default: 10]
154
155 The maximum number of child processes that C<fork_call> will fork in
156 parallel. Any additional requests will be queued until a slot becomes free
157 again.
158
159 The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
160 this value.
161
162 =cut
163
# Upper bound on concurrently running fork_call children, initialised from
# PERL_ANYEVENT_MAX_FORKS; values that are not positive select the default
# of 10.
our $MAX_FORKS = do {
   my $limit = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};

   $limit <= 0 ? 10 : $limit
};

# scheduler state: number of running children and the jobs still waiting
my ($forks, @fork_queue);
169
# Starts queued fork_call jobs while fewer than $MAX_FORKS children are
# running.  It is called whenever a job is queued and again each time a
# child has finished.
#
# Each job is an array [$coderef, @args, $cb].  The child runs
# $coderef->(@args) and writes the Storable-frozen result to a pipe; the
# parent reads it and calls $cb with the thawed values.
#
# Failure handling: a slot is only counted once fork has succeeded.  If the
# pipe cannot be created, or fork fails with EAGAIN/ENOMEM, while other
# children are still running, the job is put back at the head of the queue
# and retried when the next child finishes.  If no child is running (so
# nothing would ever trigger a retry), or fork fails for another reason,
# this dies with "fork_call: $!".
sub _fork_schedule;
sub _fork_schedule {
   require Storable;

   while ($forks < $MAX_FORKS) {
      my $job = shift @fork_queue
         or last;

      # gimme a break...
      my ($r, $w) = portable_pipe ();

      unless ($r) {
         # allow failures when we have at least one job running
         $forks
            or die "fork_call: $!";

         unshift @fork_queue, $job;
         last;
      }

      my $coderef = shift @$job;
      my $cb      = pop @$job;

      my $pid = fork;

      if ($pid) {
         # parent
         ++$forks;
         close $w;

         my $buf;

         my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
            my $len = sysread $r, $buf, 65536, length $buf;

            # keep accumulating until EOF (or a read error)
            if ($len <= 0) {
               undef $ww;
               close $r;
               --$forks;

               # Start waiting jobs right away, but hold back any scheduling
               # error until this job's result has been delivered and its
               # child has been reaped.
               my $scheduled = eval { local $SIG{__DIE__}; _fork_schedule; 1 };
               my $schedule_error = $@;

               # the child sends [$exception, @results]
               my $result = eval { Storable::thaw ($buf) };
               $result = [$@] unless $result;
               $@ = shift @$result;

               $cb->(@$result);

               # work around the endlessly broken windows perls
               kill 9, $pid if AnyEvent::WIN32;

               # clean up the pid
               waitpid $pid, 0;

               die $schedule_error unless $scheduled;
            }
         });

      } elsif (defined $pid) {
         # child
         close $r;

         my $result = eval {
            local $SIG{__DIE__};

            Storable::freeze ([undef, $coderef->(@$job)])
         };

         # on failure, send only the stringified exception
         $result = Storable::freeze (["$@"])
            if $@;

         # windows forces us to these contortions
         my $ofs;

         # write in chunks of at most 64k, stop on any write error
         while () {
            my $len = (length $result) - $ofs
               or last;

            $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;

            last if $len <= 0;

            $ofs += $len;
         }

         # on native windows, _exit KILLS YOUR FORKED CHILDREN!
         if (AnyEvent::WIN32) {
            shutdown $w, 1; # signal parent to please kill us
            sleep 10; # give parent a chance to clean up
            sysread $w, my $buf, 1; # this *might* detect the parent exiting in some cases.
         }
         POSIX::_exit (0);
         exit 1;

      } else {
         # fork failed - we ignore some errors as long as we can run at
         # least one job
         # maybe we should wait a few seconds and retry instead
         ($! == &Errno::EAGAIN || $! == &Errno::ENOMEM) && $forks
            or die "fork_call: $!";

         # put the complete job back; the pipe handles are closed when they
         # go out of scope
         unshift @fork_queue, [$coderef, @$job, $cb];
         last;
      }
   }
}
262
# Queues a job - the code block, its arguments and the result callback as
# the last element - and lets the scheduler start it as soon as a slot is
# free.
sub fork_call(&@) {
   my @job = @_;

   push @fork_queue, \@job;

   _fork_schedule;
}
267
268 # to be removed
# Returns true if the argument is an IPv4 address in dotted-quad notation:
# exactly four decimal octets (0..255, at most three digits each)
# separated by dots, and nothing else.
sub dotted_quad($) {
   my $octet = qr/(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)/;

   # \A and \z anchor to the real string ends - a plain $ would also accept
   # a trailing newline
   $_[0] =~ /\A $octet (?: \. $octet ){3} \z/x
}
275
276 # just a forwarder
# Loads AnyEvent::Socket on first use, replaces this stub by the real
# implementation and then jumps to it with the current arguments.
sub inet_aton {
   require AnyEvent::Socket;

   my $implementation = \&AnyEvent::Socket::inet_aton;

   *inet_aton = $implementation;
   goto &$implementation
}
282
283 =item fh_nonblocking $fh, $nonblocking
284
285 Sets the blocking state of the given filehandle (true == nonblocking,
286 false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
287 broken (i.e. windows) platforms.
288
289 =cut
290
# Switches $fh to non-blocking mode when $nb is true and to blocking mode
# otherwise.  Returns the result of the underlying ioctl/fcntl call, which
# is undef (with $! set) on failure.
sub fh_nonblocking($$) {
   my ($fh, $nb) = @_;

   require Fcntl;

   if (AnyEvent::WIN32) {
      # FIONBIO expects a pointer to an unsigned long, so pass a packed
      # buffer: ioctl hands over a pointer only for string arguments.
      my $arg = pack "L", $nb ? 1 : 0;
      ioctl $fh, 0x8004667e, $arg; # FIONBIO
   } else {
      my $o_nonblock = &Fcntl::O_NONBLOCK;

      # Read the current status flags and change only O_NONBLOCK, so that
      # other flags (O_APPEND, ...) set on the handle are kept.
      my $flags = fcntl $fh, &Fcntl::F_GETFL, 0;
      return $flags unless defined $flags;

      $flags += 0; # fcntl reports zero as "0 but true"
      $flags = $nb ? $flags | $o_nonblock : $flags & ~$o_nonblock;

      fcntl $fh, &Fcntl::F_SETFL, $flags;
   }
}
303
304 =item $guard = guard { CODE }
305
306 This function creates a special object that, when destroyed, will execute
307 the code block.
308
309 This is often handy in continuation-passing style code to clean up some
310 resource regardless of where you break out of a process.
311
312 You can call one method on the returned object:
313
314 =item $guard->cancel
315
316 This simply causes the code block not to be invoked: it "cancels" the
317 guard.
318
319 =cut
320
# Runs the guarded code block when the guard object is destroyed.  The
# caller's $@ is left untouched, and an exception thrown by the block is
# turned into a warning instead of being propagated.
sub AnyEvent::Util::Guard::DESTROY {
   my ($self) = @_;

   local $@;

   eval {
      local $SIG{__DIE__};
      $$self->();
   };

   $@ and warn "runtime error in AnyEvent::guard callback: $@";
}
331
# Disarms the guard by replacing its code block with an empty sub, so
# nothing happens when the object is destroyed.
sub AnyEvent::Util::Guard::cancel($) {
   my ($self) = @_;

   $$self = sub { };
}
335
# Wraps the given code block in an AnyEvent::Util::Guard object - a
# blessed reference to a scalar holding the code reference.
sub guard(&) {
   my $cb = shift;

   bless \$cb, 'AnyEvent::Util::Guard'
}
339
340 1;
341
342 =back
343
344 =head1 AUTHOR
345
346 Marc Lehmann <schmorp@schmorp.de>
347 http://home.schmorp.de/
348
349 =cut
350