ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
Revision: 1.35
Committed: Tue May 27 03:13:44 2008 UTC (16 years, 1 month ago) by root
Branch: MAIN
Changes since 1.34: +12 -8 lines
Log Message:
windows sucks bigbigtime

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing
12 well-known functions by event-ised counterparts.
13
14 All functions documented without C<AnyEvent::Util::> prefix are exported
15 by default.
16
17 =over 4
18
19 =cut
20
21 package AnyEvent::Util;
22
23 no warnings;
24 use strict;
25
26 use Carp ();
27 use Errno ();
28 use Socket ();
29
30 use AnyEvent ();
31
32 use base 'Exporter';
33
34 our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35 our @EXPORT_OK = qw(AF_INET6 WSAWOULDBLOCK WSAEINPROGRESS);
36
37 our $VERSION = '1.0';
38
39 BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42 }
43
44 BEGIN {
45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47 }
48
49 BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65 }
66
67 BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAWOULDBLOCK() { 10035 }";
71 eval "sub WSAEINPROGRESS() { 10036 }";
72 } else {
73 eval "sub WSAWOULDBLOCK() { -1e99 }"; # should never match any errno value
74 eval "sub WSAEINPROGRESS() { -1e99 }"; # should never match any errno value
75 }
76 }
77
78 =item ($r, $w) = portable_pipe
79
80 Calling C<pipe> in Perl is portable - except it doesn't really work on
81 sucky windows platforms (at least not with most perls - cygwin's perl
82 notably works fine).
83
84 On that platform, you actually get two file handles you cannot use select
85 on.
86
87 This function gives you a pipe that actually works even on the broken
88 Windows platform (by creating a pair of TCP sockets, so do not expect any
89 speed from that).
90
91 Returns the empty list on any errors.
92
93 =cut
94
95 sub portable_pipe() {
96 my ($r, $w);
97
98 if (AnyEvent::WIN32) {
99 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
100 or return;
101 } else {
102 pipe $r, $w
103 or return;
104 }
105
106 ($r, $w)
107 }
108
109 =item fork_call $coderef, @args, $cb->(@res)
110
111 Executes the given code reference asynchronously, by forking. Everything
112 the C<$coderef> returns will transferred to the calling process (by
113 serialising and deserialising via L<Storable>).
114
115 If there are any errors, then the C<$cb> will be called without any
116 arguments. In that case, either C<$@> contains the exception, or C<$!>
117 contains an error number. In all other cases, C<$@> will be C<undef>ined.
118
119 The C<$coderef> must not ever call an event-polling function or use
120 event-based programming.
121
122 Note that forking can be expensive in large programs (RSS 200MB+). On
123 windows, it is abysmally slow, do not expect more than 5..20 forks/s on
124 that sucky platform (note this uses perl's pseudo-threads, so avoid those
125 like the plague).
126
127 =item $AnyEvent::Util::MAX_FORKS [default: 10]
128
129 The maximum number of child processes that C<fork_call> will fork in
130 parallel. Any additional requests will be queued until a slot becomes free
131 again.
132
133 The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
134 this value.
135
136 =cut
137
138 our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
139 $MAX_FORKS = 10 if $MAX_FORKS <= 0;
140
141 my $forks;
142 my @fork_queue;
143
144 sub _fork_schedule;
145 sub _fork_schedule {
146 while () {
147 return if $forks >= $MAX_FORKS;
148
149 my $job = shift @fork_queue
150 or return;
151
152 ++$forks;
153
154 my $coderef = shift @$job;
155 my $cb = pop @$job;
156
157 # gimme a break...
158 my ($r, $w) = portable_pipe
159 or ($forks and last) # allow failures when we have at least one job
160 or die "fork_call: $!";
161
162 my $pid = fork;
163
164 if ($pid != 0) {
165 # parent
166 close $w;
167
168 my $buf;
169
170 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
171 my $len = sysread $r, $buf, 65536, length $buf;
172
173 if ($len <= 0) {
174 undef $ww;
175 close $r;
176 --$forks;
177 _fork_schedule;
178
179 my $result = eval { Storable::thaw ($buf) };
180 $result = [$@] unless $result;
181 $@ = shift @$result;
182
183 $cb->(@$result);
184
185 # clean up the pid
186 waitpid $pid, 0;
187 }
188 });
189
190 } elsif (defined $pid) {
191 # child
192 close $r;
193
194 my $result = eval {
195 local $SIG{__DIE__};
196
197 Storable::freeze ([undef, $coderef->(@$job)])
198 };
199
200 $result = Storable::freeze (["$@"])
201 if $@;
202
203 # windows forces us to these contortions
204 my $ofs;
205
206 while () {
207 my $len = (length $result) - $ofs
208 or last;
209
210 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
211
212 last if $len <= 0;
213
214 $ofs += $len;
215 }
216
217 close $w;
218
219 if (AnyEvent::WIN32) {
220 kill 9, $$; # yeah, windows for the win
221 } else {
222 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
223 POSIX::_exit (0);
224 }
225 exit 1;
226
227 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
228 # we ignore some errors as long as we can run at least one job
229 # maybe we should wait a few seconds and retry instead
230 die "fork_call: $!";
231 }
232 }
233 }
234
235 sub fork_call {
236 require Storable;
237
238 push @fork_queue, [@_];
239 _fork_schedule;
240 }
241
242 # to be removed
243 sub dotted_quad($) {
244 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
245 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
246 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
247 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
248 }
249
250 # just a forwarder
251 sub inet_aton {
252 require AnyEvent::Socket;
253 *inet_aton = \&AnyEvent::Socket::inet_aton;
254 goto &inet_aton
255 }
256
257 =item fh_nonblocking $fh, $nonblocking
258
259 Sets the blocking state of the given filehandle (true == nonblocking,
260 false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
261 broken (i.e. windows) platforms.
262
263 =cut
264
265 sub fh_nonblocking($$) {
266 my ($fh, $nb) = @_;
267
268 require Fcntl;
269
270 if (AnyEvent::WIN32) {
271 $nb = (! ! $nb) + 0;
272 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
273 } else {
274 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
275 }
276 }
277
278 =item $guard = guard { CODE }
279
280 This function creates a special object that, when called, will execute the
281 code block.
282
283 This is often handy in continuation-passing style code to clean up some
284 resource regardless of where you break out of a process.
285
286 You can call one method on the returned object:
287
288 =item $guard->cancel
289
290 This simply causes the code block not to be invoked: it "cancels" the
291 guard.
292
293 =cut
294
295 sub AnyEvent::Util::Guard::DESTROY {
296 ${$_[0]}->();
297 }
298
299 sub AnyEvent::Util::Guard::cancel($) {
300 ${$_[0]} = sub { };
301 }
302
303 sub guard(&) {
304 bless \(my $cb = shift), AnyEvent::Util::Guard::
305 }
306
307 1;
308
309 =back
310
311 =head1 AUTHOR
312
313 Marc Lehmann <schmorp@schmorp.de>
314 http://home.schmorp.de/
315
316 =cut
317