ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.5 by root, Sun Apr 27 20:17:46 2008 UTC vs.
Revision 1.16 by root, Sat May 17 21:34:15 2008 UTC

3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8
9 inet_aton $name, $cb->($ipn || undef);
10 8
11=head1 DESCRIPTION 9=head1 DESCRIPTION
12 10
13This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
14well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
15 13
14All functions documented without C<AnyEvent::Util::> prefix are exported
15by default.
16
16=over 4 17=over 4
17 18
18=cut 19=cut
19 20
20package AnyEvent::Util; 21package AnyEvent::Util;
21 22
22use strict; 23use strict;
23 24
24no warnings "uninitialized"; 25no warnings "uninitialized";
25 26
27use Errno;
26use Socket (); 28use Socket ();
29use IO::Socket::INET ();
27 30
28use AnyEvent; 31use AnyEvent;
29 32
30use base 'Exporter'; 33use base 'Exporter';
31 34
32#our @EXPORT = qw(gethostbyname gethostbyaddr); 35our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect);
33our @EXPORT_OK = qw(inet_aton);
34 36
35our $VERSION = '1.0'; 37our $VERSION = '1.0';
36 38
37our $MAXPARALLEL = 16; # max. number of parallel jobs 39our $MAXPARALLEL = 16; # max. number of parallel jobs
38 40
86my $has_ev_adns; 88my $has_ev_adns;
87 89
88sub has_ev_adns { 90sub has_ev_adns {
89 ($has_ev_adns ||= do { 91 ($has_ev_adns ||= do {
90 my $model = AnyEvent::detect; 92 my $model = AnyEvent::detect;
91 (($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV")
92 && eval { local $SIG{__DIE__}; require EV::ADNS }) 93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
93 ? 2 : 1 # so that || always detects as true 94 ? 2 : 1 # so that || always detects as true
94 }) - 1 # 2 => true, 1 => false 95 }) - 1 # 2 => true, 1 => false
95} 96}
96 97
97=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef) 98=item inet_aton $name_or_address, $cb->($binary_address_or_undef)
98 99
99Works almost exactly like its Socket counterpart, except that it uses a 100Works almost exactly like its Socket counterpart, except that it uses a
100callback. 101callback. Also, if a host has only an IPv6 address, this might be passed
102to the callback instead (use the length to detetc this - 4 for IPv4, 16
103for IPv6).
104
105This function uses various shortcuts and will fall back to either
106L<EV::ADNS> or your systems C<inet_aton>.
101 107
102=cut 108=cut
103 109
104sub inet_aton { 110sub inet_aton {
105 my ($name, $cb) = @_; 111 my ($name, $cb) = @_;
106 112
107 if (&dotted_quad) { 113 if (&dotted_quad) {
108 $cb->(Socket::inet_aton $name); 114 $cb->(Socket::inet_aton $name);
115 } elsif ($name eq "localhost") { # rfc2606 et al.
116 $cb->(v127.0.0.1);
109 } elsif (&has_ev_adns) { 117 } elsif (&has_ev_adns) {
118 # work around some idiotic ands rfc readings
119 # rather hackish support for AAAA records (should
120 # wait for adns_getaddrinfo...)
121
122 my $loop = 10; # follow cname chains up to this length
123 my $qt;
124 my $acb; $acb = sub {
125 my ($status, undef, @a) = @_;
126
127 if ($status == &EV::ADNS::s_ok) {
128 if ($qt eq "a") {
129 return $cb->(Socket::inet_aton $a[0]);
130 } elsif ($qt eq "aaaa") {
131 return $cb->($a[0]);
132 } elsif ($qt eq "cname") {
133 $name = $a[0];
134 $qt = "a";
110 EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub { 135 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
111 my (undef, undef, @a) = @_; 136 }
112 $cb->(@a ? Socket::inet_aton $a[0] : undef); 137 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
138 # follow cname chains
139 if ($loop--) {
140 $qt = "cname";
141 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
142 }
143 } elsif ($status == &EV::ADNS::s_nodata) {
144 if ($qt eq "a") {
145 # ask for raw AAAA (might not be a good method, but adns is too broken...)
146 $qt = "aaaa";
147 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
148 }
149 }
150
151 $cb->(undef);
113 }); 152 };
153
154 $qt = "a";
155 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
114 } else { 156 } else {
115 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_; 157 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_;
116 } 158 }
117} 159}
118 160
161=item fh_nonblocking $fh, $nonblocking
162
163Sets the blocking state of the given filehandle (true == nonblocking,
164false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
165broken (i.e. windows) platforms.
166
167=cut
168
169sub fh_nonblocking($$) {
170 my ($fh, $nb) = @_;
171
172 require Fcntl;
173
174 if ($^O eq "MSWin32") {
175 $nb = (! ! $nb) + 0;
176 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
177 } else {
178 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
179 }
180}
181
182=item $guard = guard { CODE }
183
184This function creates a special object that, when called, will execute the
185code block.
186
187This is often handy in continuation-passing style code to clean up some
188resource regardless of where you break out of a process.
189
190=cut
191
192sub AnyEvent::Util::Guard::DESTROY {
193 ${$_[0]}->();
194}
195
196sub guard(&) {
197 bless \(my $cb = shift), AnyEvent::Util::Guard::
198}
199
200=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
201
202This function is experimental.
203
204This is a convenience function that creates a tcp socket and makes a 100%
205non-blocking connect to the given C<$host> (which can be a hostname or a
206textual IP address) and C<$port>.
207
208Unless called in void context, it returns a guard object that will
209automatically abort connecting when it gets destroyed (it does not do
210anything to the socket after the conenct was successful).
211
212If the connect is successful, then the C<$connect_cb> will be invoked with
213the socket filehandle (in non-blocking mode) as first and the peer host
214(as a textual IP address) and peer port as second and third arguments,
215respectively.
216
217If the connect is unsuccessful, then the C<$connect_cb> will be invoked
218without any arguments and C<$!> will be set appropriately (with C<ENXIO>
219indicating a dns resolution failure).
220
221The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
222be used as a normal perl file handle as well.
223
224Sometimes you need to "prepare" the socket before connecting, for example,
225to C<bind> it to some port, or you want a specific connect timeout that
226is lower than your kernel's default timeout. In this case you can specify
227a second callback, C<$prepare_cb>. It will be called with the file handle
228in not-yet-connected state as only argument and must return the connection
229timeout value (or C<0>, C<undef> or the empty list to indicate the default
230timeout is to be used).
231
232Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
233socket (although only IPv4 is currently supported by this module).
234
235Simple Example: connect to localhost on port 22.
236
237 AnyEvent::Util::tcp_connect localhost => 22, sub {
238 my $fh = shift
239 or die "unable to connect: $!";
240 # do something
241 };
242
243Complex Example: connect to www.google.com on port 80 and make a simple
244GET request without much error handling. Also limit the connection timeout
245to 15 seconds.
246
247 AnyEvent::Util::tcp_connect "www.google.com", 80,
248 sub {
249 my ($fh) = @_
250 or die "unable to connect: $!";
251
252 my $handle; # avoid direct assignment so on_eof has it in scope.
253 $handle = new AnyEvent::Handle
254 fh => $fh,
255 on_eof => sub {
256 undef $handle; # keep it alive till eof
257 warn "done.\n";
258 };
259
260 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
261
262 $handle->push_read_line ("\015\012\015\012", sub {
263 my ($handle, $line) = @_;
264
265 # print response header
266 print "HEADER\n$line\n\nBODY\n";
267
268 $handle->on_read (sub {
269 # print response body
270 print $_[0]->rbuf;
271 $_[0]->rbuf = "";
272 });
273 });
274 }, sub {
275 my ($fh) = @_;
276 # could call $fh->bind etc. here
277
278 15
279 };
280
281=cut
282
283sub tcp_connect($$$;$) {
284 my ($host, $port, $connect, $prepare) = @_;
285
286 # see http://cr.yp.to/docs/connect.html for some background
287
288 my %state = ( fh => undef );
289
290 # name resolution
291 inet_aton $host, sub {
292 return unless exists $state{fh};
293
294 my $ipn = shift;
295
296 4 == length $ipn
297 or do {
298 %state = ();
299 $! = &Errno::ENXIO;
300 return $connect->();
301 };
302
303 # socket creation
304 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
305 or do {
306 %state = ();
307 return $connect->();
308 };
309
310 fh_nonblocking $state{fh}, 1;
311
312 # prepare and optional timeout
313 if ($prepare) {
314 my $timeout = $prepare->($state{fh});
315
316 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
317 %state = ();
318 $! = &Errno::ETIMEDOUT;
319 $connect->();
320 }) if $timeout;
321 }
322
323 # called when the connect was successful, which,
324 # in theory, could be the case immediately (but never is in practise)
325 my $connected = sub {
326 my $fh = delete $state{fh};
327 %state = ();
328
329 # we are connected, or maybe there was an error
330 if (my $sin = getpeername $fh) {
331 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
332 $connect->($fh, (Socket::inet_ntoa $host), $port);
333 } else {
334 # dummy read to fetch real error code
335 sysread $fh, my $buf, 1;
336 $connect->();
337 }
338 };
339
340 # now connect
341 if (connect $state{fh}, Socket::pack_sockaddr_in $port, $ipn) {
342 $connected->();
343 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
344 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
345 } else {
346 %state = ();
347 $connect->();
348 }
349 };
350
351 defined wantarray
352 ? guard { %state = () } # break any circular dependencies and unregister watchers
353 : ()
354}
355
356=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
357
358This function is experimental.
359
360Create and bind a tcp socket to the given host (any IPv4 host if undef,
361otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
362port if given as zero or undef), set the SO_REUSEADDR flag and call
363C<listen>.
364
365For each new connection that could be C<accept>ed, call the C<$accept_cb>
366with the filehandle (in non-blocking mode) as first and the peer host and
367port as second and third arguments (see C<tcp_connect> for details).
368
369Croaks on any errors.
370
371If called in non-void context, then this function returns a guard object
372whose lifetime it tied to the tcp server: If the object gets destroyed,
373the server will be stopped (but existing accepted connections will
374continue).
375
376If you need more control over the listening socket, you can provide a
377C<$prepare_cb>, which is called just before the C<listen ()> call, with
378the listen file handle as first argument.
379
380It should return the length of the listen queue (or C<0> for the default).
381
382Example: bind on tcp port 8888 on the local machine and tell each client
383to go away.
384
385 AnyEvent::Util::tcp_server undef, 8888, sub {
386 my ($fh, $host, $port) = @_;
387
388 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
389 };
390
391=cut
392
393sub tcp_server($$$;$) {
394 my ($host, $port, $accept, $prepare) = @_;
395
396 my %state;
397
398 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
399 or Carp::croak "socket: $!";
400
401 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
402 or Carp::croak "so_reuseaddr: $!";
403
404 bind $state{fh}, Socket::pack_sockaddr_in $port, Socket::inet_aton ($host || "0.0.0.0")
405 or Carp::croak "bind: $!";
406
407 fh_nonblocking $state{fh}, 1;
408
409 my $len = ($prepare && $prepare->($state{fh})) || 128;
410
411 listen $state{fh}, $len
412 or Carp::croak "listen: $!";
413
414 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
415 # this closure keeps $state alive
416 while (my $peer = accept my $fh, $state{fh}) {
417 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
418 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
419 $accept->($fh, (Socket::inet_ntoa $host), $port);
420 }
421 });
422
423 defined wantarray
424 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
425 : ()
426}
427
1191; 4281;
120 429
121=back 430=back
122 431
123=head1 AUTHOR 432=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines