ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.12 by root, Sat May 17 19:31:36 2008 UTC vs.
Revision 1.35 by root, Tue May 27 03:13:44 2008 UTC

3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8
9 inet_aton $name, $cb->($ipn || undef);
10 8
11=head1 DESCRIPTION 9=head1 DESCRIPTION
12 10
13This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
14well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
15 13
14All functions documented without C<AnyEvent::Util::> prefix are exported
15by default.
16
16=over 4 17=over 4
17 18
18=cut 19=cut
19 20
20package AnyEvent::Util; 21package AnyEvent::Util;
21 22
23no warnings;
22use strict; 24use strict;
23 25
24no warnings "uninitialized"; 26use Carp ();
25
26use Errno; 27use Errno ();
27use Socket (); 28use Socket ();
28use IO::Socket::INET ();
29 29
30use AnyEvent; 30use AnyEvent ();
31 31
32use base 'Exporter'; 32use base 'Exporter';
33 33
34#our @EXPORT = qw(gethostbyname gethostbyaddr); 34our @EXPORT = qw(fh_nonblocking guard fork_call portable_pipe);
35our @EXPORT_OK = qw(inet_aton); 35our @EXPORT_OK = qw(AF_INET6 WSAWOULDBLOCK WSAEINPROGRESS);
36 36
37our $VERSION = '1.0'; 37our $VERSION = '1.0';
38 38
39our $MAXPARALLEL = 16; # max. number of parallel jobs 39BEGIN {
40 my $posix = 1 * eval { local $SIG{__DIE__}; require POSIX };
41 eval "sub POSIX() { $posix }";
42}
40 43
41our $running; 44BEGIN {
42our @queue; 45 # TODO remove this once not used anymore
46 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
47}
43 48
49BEGIN {
50 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
51
52 # uhoh
53 $af_inet6 ||= 10 if $^O =~ /linux/;
54 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
55 $af_inet6 ||= 23 if AnyEvent::WIN32;
56 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
57 $af_inet6 ||= 28 if $^O =~ /freebsd/;
58
59 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
60 or $af_inet6 = 0;
61
62 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
63
64 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
65}
66
67BEGIN {
68 # broken windows perls use undocumented error codes...
69 if (AnyEvent::WIN32) {
70 eval "sub WSAWOULDBLOCK() { 10035 }";
71 eval "sub WSAEINPROGRESS() { 10036 }";
72 } else {
73 eval "sub WSAWOULDBLOCK() { -1e99 }"; # should never match any errno value
74 eval "sub WSAEINPROGRESS() { -1e99 }"; # should never match any errno value
75 }
76}
77
78=item ($r, $w) = portable_pipe
79
80Calling C<pipe> in Perl is portable - except it doesn't really work on
81sucky windows platforms (at least not with most perls - cygwin's perl
82notably works fine).
83
84On that platform, you actually get two file handles you cannot use select
85on.
86
87This function gives you a pipe that actually works even on the broken
88Windows platform (by creating a pair of TCP sockets, so do not expect any
89speed from that).
90
91Returns the empty list on any errors.
92
93=cut
94
95sub portable_pipe() {
96 my ($r, $w);
97
98 if (AnyEvent::WIN32) {
99 socketpair $r, $w, &Socket::AF_UNIX, &Socket::SOCK_STREAM, 0
100 or return;
101 } else {
102 pipe $r, $w
103 or return;
104 }
105
106 ($r, $w)
107}
108
109=item fork_call $coderef, @args, $cb->(@res)
110
111Executes the given code reference asynchronously, by forking. Everything
112the C<$coderef> returns will transferred to the calling process (by
113serialising and deserialising via L<Storable>).
114
115If there are any errors, then the C<$cb> will be called without any
116arguments. In that case, either C<$@> contains the exception, or C<$!>
117contains an error number. In all other cases, C<$@> will be C<undef>ined.
118
119The C<$coderef> must not ever call an event-polling function or use
120event-based programming.
121
122Note that forking can be expensive in large programs (RSS 200MB+). On
123windows, it is abysmally slow, do not expect more than 5..20 forks/s on
124that sucky platform (note this uses perl's pseudo-threads, so avoid those
125like the plague).
126
127=item $AnyEvent::Util::MAX_FORKS [default: 10]
128
129The maximum number of child processes that C<fork_call> will fork in
130parallel. Any additional requests will be queued until a slot becomes free
131again.
132
133The environment variable C<PERL_ANYEVENT_MAX_FORKS> is used to initialise
134this value.
135
136=cut
137
138our $MAX_FORKS = int 1 * $ENV{PERL_ANYEVENT_MAX_FORKS};
139$MAX_FORKS = 10 if $MAX_FORKS <= 0;
140
141my $forks;
142my @fork_queue;
143
44sub _schedule; 144sub _fork_schedule;
45sub _schedule { 145sub _fork_schedule {
46 return unless @queue; 146 while () {
47 return if $running >= $MAXPARALLEL; 147 return if $forks >= $MAX_FORKS;
48 148
49 ++$running; 149 my $job = shift @fork_queue
50 my ($cb, $sub, @args) = @{shift @queue}; 150 or return;
51 151
52 if (eval { local $SIG{__DIE__}; require POSIX }) { 152 ++$forks;
53 my $pid = open my $fh, "-|";
54 153
154 my $coderef = shift @$job;
155 my $cb = pop @$job;
156
157 # gimme a break...
158 my ($r, $w) = portable_pipe
159 or ($forks and last) # allow failures when we have at least one job
160 or die "fork_call: $!";
161
162 my $pid = fork;
163
164 if ($pid != 0) {
165 # parent
166 close $w;
167
168 my $buf;
169
170 my $ww; $ww = AnyEvent->io (fh => $r, poll => 'r', cb => sub {
171 my $len = sysread $r, $buf, 65536, length $buf;
172
173 if ($len <= 0) {
174 undef $ww;
175 close $r;
176 --$forks;
177 _fork_schedule;
178
179 my $result = eval { Storable::thaw ($buf) };
180 $result = [$@] unless $result;
181 $@ = shift @$result;
182
183 $cb->(@$result);
184
185 # clean up the pid
186 waitpid $pid, 0;
187 }
188 });
189
55 if (!defined $pid) { 190 } elsif (defined $pid) {
56 die "fork: $!"; 191 # child
57 } elsif (!$pid) { 192 close $r;
58 syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); 193
194 my $result = eval {
195 local $SIG{__DIE__};
196
197 Storable::freeze ([undef, $coderef->(@$job)])
198 };
199
200 $result = Storable::freeze (["$@"])
201 if $@;
202
203 # windows forces us to these contortions
204 my $ofs;
205
206 while () {
207 my $len = (length $result) - $ofs
208 or last;
209
210 $len = syswrite $w, $result, $len < 65536 ? $len : 65536, $ofs;
211
212 last if $len <= 0;
213
214 $ofs += $len;
215 }
216
217 close $w;
218
219 if (AnyEvent::WIN32) {
220 kill 9, $$; # yeah, windows for the win
221 } else {
222 # on native windows, _exit KILLS YOUR FORKED CHILDREN!
59 POSIX::_exit (0); 223 POSIX::_exit (0);
224 }
225 exit 1;
226
227 } elsif (($! != &Errno::EAGAIN && $! != &Errno::ENOMEM) || !$forks) {
228 # we ignore some errors as long as we can run at least one job
229 # maybe we should wait a few seconds and retry instead
230 die "fork_call: $!";
60 } 231 }
61
62 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
63 --$running;
64 _schedule;
65 undef $w;
66
67 my $buf;
68 sysread $fh, $buf, 16384, length $buf;
69 $cb->(map { pack "H*", $_ } split /\0/, $buf);
70 });
71 } else {
72 $cb->($sub->(@args));
73 } 232 }
74} 233}
75 234
76sub _do_asy { 235sub fork_call {
236 require Storable;
237
77 push @queue, [@_]; 238 push @fork_queue, [@_];
78 _schedule; 239 _fork_schedule;
79} 240}
80 241
242# to be removed
81sub dotted_quad($) { 243sub dotted_quad($) {
82 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 244 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
83 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 245 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
84 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 246 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
85 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 247 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
86} 248}
87 249
88my $has_ev_adns; 250# just a forwarder
89
90sub has_ev_adns {
91 ($has_ev_adns ||= do {
92 my $model = AnyEvent::detect;
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false
96}
97
98=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef)
99
100Works almost exactly like its Socket counterpart, except that it uses a
101callback.
102
103=cut
104
105sub inet_aton { 251sub inet_aton {
106 my ($name, $cb) = @_; 252 require AnyEvent::Socket;
107 253 *inet_aton = \&AnyEvent::Socket::inet_aton;
108 if (&dotted_quad) { 254 goto &inet_aton
109 $cb->(Socket::inet_aton $name);
110 } elsif ($name eq "localhost") { # rfc2606 et al.
111 $cb->(v127.0.0.1);
112 } elsif (&has_ev_adns) {
113 EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub {
114 my (undef, undef, @a) = @_;
115 $cb->(@a ? Socket::inet_aton $a[0] : undef);
116 });
117 } else {
118 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_;
119 }
120} 255}
121 256
122=item AnyEvent::Util::fh_nonblocking $fh, $nonblocking 257=item fh_nonblocking $fh, $nonblocking
123 258
124Sets the blocking state of the given filehandle (true == nonblocking, 259Sets the blocking state of the given filehandle (true == nonblocking,
125false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on 260false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
126broken (i.e. windows) platforms. 261broken (i.e. windows) platforms.
127 262
130sub fh_nonblocking($$) { 265sub fh_nonblocking($$) {
131 my ($fh, $nb) = @_; 266 my ($fh, $nb) = @_;
132 267
133 require Fcntl; 268 require Fcntl;
134 269
135 if ($^O eq "MSWin32") { 270 if (AnyEvent::WIN32) {
136 $nb = (! ! $nb) + 0; 271 $nb = (! ! $nb) + 0;
137 ioctl $fh, 0x8004667e, \$nb; # FIONBIO 272 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
138 } else { 273 } else {
139 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 274 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
140 } 275 }
141} 276}
142 277
278=item $guard = guard { CODE }
279
280This function creates a special object that, when called, will execute the
281code block.
282
283This is often handy in continuation-passing style code to clean up some
284resource regardless of where you break out of a process.
285
286You can call one method on the returned object:
287
288=item $guard->cancel
289
290This simply causes the code block not to be invoked: it "cancels" the
291guard.
292
293=cut
294
143sub AnyEvent::Util::Guard::DESTROY { 295sub AnyEvent::Util::Guard::DESTROY {
144 ${$_[0]}->(); 296 ${$_[0]}->();
145} 297}
146 298
147=item $guard = AnyEvent::Util::guard { CODE } 299sub AnyEvent::Util::Guard::cancel($) {
148 300 ${$_[0]} = sub { };
149This function creates a special object that, when called, will execute the 301}
150code block.
151
152This is often handy in continuation-passing style code to clean up some
153resource regardless of where you break out of a process.
154
155=cut
156 302
157sub guard(&) { 303sub guard(&) {
158 bless \(my $cb = shift), AnyEvent::Util::Guard:: 304 bless \(my $cb = shift), AnyEvent::Util::Guard::
159} 305}
160 306
161=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
162
163This is a convenience function that creates a tcp socket and makes a 100%
164non-blocking connect to the given C<$host> (which can be a hostname or a
165textual IP address) and C<$port>.
166
167Unless called in void context, it returns a guard object that will
168automatically abort connecting when it gets destroyed (it does not do
169anything to the socket after the conenct was successful).
170
171If the connect is successful, then the C<$connect_cb> will be invoked with
172the socket filehandle (in non-blocking mode) as first and the peer host
173(as a textual IP address) and peer port as second and third arguments,
174respectively.
175
176If the connect is unsuccessful, then the C<$connect_cb> will be invoked
177without any arguments and C<$!> will be set appropriately (with C<ENXIO>
178indicating a dns resolution failure).
179
180The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
181be used as a normal perl file handle as well.
182
183Sometimes you need to "prepare" the socket before connecting, for example,
184to C<bind> it to some port, or you want a specific connect timeout that
185is lower than your kernel's default timeout. In this case you can specify
186a second callback, C<$prepare_cb>. It will be called with the file handle
187in not-yet-connected state as only argument and must return the connection
188timeout value (or C<0>, C<undef> or the empty list to indicate the default
189timeout is to be used).
190
191Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
192socket (although only IPv4 is currently supported by this module).
193
194Simple Example: connect to localhost on port 22.
195
196 AnyEvent::Util::tcp_connect localhost => 22, sub {
197 my $fh = shift
198 or die "unable to connect: $!";
199 # do something
200 };
201
202Complex Example: connect to www.google.com on port 80 and make a simple
203GET request without much error handling. Also limit the connection timeout
204to 15 seconds.
205
206 AnyEvent::Util::tcp_connect "www.google.com", 80,
207 sub {
208 my ($fh) = @_
209 or die "unable to connect: $!";
210
211 my $handle; # avoid direct assignment so on_eof has it in scope.
212 $handle = new AnyEvent::Handle
213 fh => $fh,
214 on_eof => sub {
215 undef $handle; # keep it alive till eof
216 warn "done.\n";
217 };
218
219 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
220
221 $handle->push_read_line ("\015\012\015\012", sub {
222 my ($handle, $line) = @_;
223
224 # print response header
225 print "HEADER\n$line\n\nBODY\n";
226
227 $handle->on_read (sub {
228 # print response body
229 print $_[0]->rbuf;
230 $_[0]->rbuf = "";
231 });
232 });
233 }, sub {
234 my ($fh) = @_;
235 # could call $fh->bind etc. here
236
237 15
238 };
239
240=cut
241
242sub tcp_connect($$$;$) {
243 my ($host, $port, $connect, $prepare) = @_;
244
245 # see http://cr.yp.to/docs/connect.html for some background
246
247 my %state = ( fh => undef );
248
249 # name resolution
250 inet_aton $host, sub {
251 return unless exists $state{fh};
252
253 my $ipn = shift
254 or do {
255 %state = ();
256 $! = &Errno::ENXIO;
257 return $connect->();
258 };
259
260 # socket creation
261 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
262 or do {
263 %state = ();
264 return $connect->();
265 };
266
267 fh_nonblocking $state{fh}, 1;
268
269 # prepare and optional timeout
270 if ($prepare) {
271 my $timeout = $prepare->($state{fh});
272
273 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
274 %state = ();
275 $! = &Errno::ETIMEDOUT;
276 $connect->();
277 }) if $timeout;
278 }
279
280 # called when the connect was successful, which,
281 # in theory, could be the case immediately (but never is in practise)
282 my $connected = sub {
283 my $fh = delete $state{fh};
284 %state = ();
285
286 # we are connected, or maybe there was an error
287 if (my $sin = getpeername $fh) {
288 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
289 $connect->($fh, (Socket::inet_ntoa $host), $port);
290 } else {
291 # dummy read to fetch real error code
292 sysread $fh, my $buf, 1;
293 $connect->();
294 }
295 };
296
297 # now connect
298 if (connect $state{fh}, Socket::pack_sockaddr_in $port, $ipn) {
299 $connected->();
300 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
301 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
302 } else {
303 %state = ();
304 $connect->();
305 }
306 };
307
308 defined wantarray
309 ? guard { %state = () }
310 : ()
311}
312
313=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
314
315Create and bind a tcp socket to the given host (any IPv4 host if undef,
316otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
317port if given as zero or undef), set the SO_REUSEADDR flag and call
318C<listen>.
319
320For each new connection that could be C<accept>ed, call the C<$accept_cb>
321with the filehandle (in non-blocking mode) as first and the peer host and
322port as second and third arguments (see C<tcp_connect> for details).
323
324Croaks on any errors.
325
326If called in non-void context, then this function returns a guard object
327whose lifetime it tied to the tcp server: If the object gets destroyed,
328the server will be stopped (but existing accepted connections will
329continue).
330
331If you need more control over the listening socket, you can provide a
332C<$prepare_cb>, which is called just before the C<listen ()> call, with
333the listen file handle as first argument.
334
335It should return the length of the listen queue (or C<0> for the default).
336
337Example: bind on tcp port 8888 on the local machine and tell each client
338to go away.
339
340 AnyEvent::Util::tcp_server undef, 8888, sub {
341 my ($fh, $host, $port) = @_;
342
343 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
344 };
345
346=cut
347
348sub tcp_server($$$;$) {
349 my ($host, $port, $accept, $prepare) = @_;
350
351 my %state;
352
353 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
354 or Carp::croak "socket: $!";
355
356 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
357 or Carp::croak "so_reuseaddr: $!";
358
359 bind $state{fh}, Socket::pack_sockaddr_in $port, Socket::inet_aton ($host || "0.0.0.0")
360 or Carp::croak "bind: $!";
361
362 fh_nonblocking $state{fh}, 1;
363
364 my $len = ($prepare && $prepare->($state{fh})) || 128;
365
366 listen $state{fh}, $len
367 or Carp::croak "listen: $!";
368
369 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
370 # this closure keeps $state alive
371 while (my $peer = accept my $fh, $state{fh}) {
372 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
373 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
374 $accept->($fh, (Socket::inet_ntoa $host), $port);
375 }
376 });
377
378 defined wantarray
379 ? guard { %state = () }
380 : ()
381}
382
3831; 3071;
384 308
385=back 309=back
386 310
387=head1 AUTHOR 311=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines