ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.10 by elmex, Thu May 15 13:50:23 2008 UTC vs.
Revision 1.21 by root, Wed May 21 14:34:03 2008 UTC

3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8
9 inet_aton $name, $cb->($ipn || undef);
10 8
11=head1 DESCRIPTION 9=head1 DESCRIPTION
12 10
13This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
14well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
15 13
14All functions documented without C<AnyEvent::Util::> prefix are exported
15by default.
16
16=over 4 17=over 4
17 18
18=cut 19=cut
19 20
20package AnyEvent::Util; 21package AnyEvent::Util;
21 22
23no warnings;
22use strict; 24use strict;
23 25
24no warnings "uninitialized"; 26use Carp ();
25 27use Errno ();
26use Errno qw/ENXIO/;
27use Socket (); 28use Socket ();
28use IO::Socket::INET (); 29use IO::Socket::INET ();
29 30
30use AnyEvent; 31use AnyEvent;
31 32
32use base 'Exporter'; 33use base 'Exporter';
33 34
34#our @EXPORT = qw(gethostbyname gethostbyaddr); 35BEGIN {
35our @EXPORT_OK = qw(inet_aton); 36 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
37}
38
39our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect);
36 40
37our $VERSION = '1.0'; 41our $VERSION = '1.0';
38 42
39our $MAXPARALLEL = 16; # max. number of parallel jobs 43our $MAXPARALLEL = 16; # max. number of parallel jobs
40 44
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS }) 97 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true 98 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false 99 }) - 1 # 2 => true, 1 => false
96} 100}
97 101
98=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef) 102=item inet_aton $name_or_address, $cb->(@addresses)
99 103
100Works almost exactly like its Socket counterpart, except that it uses a 104Works similarly to its Socket counterpart, except that it uses a
101callback. 105callback. Also, if a host has only an IPv6 address, this might be passed
106to the callback instead (use the length to detect this - 4 for IPv4, 16
107for IPv6).
108
109This function uses various shortcuts and will fall back to either
110L<EV::ADNS> or your systems C<inet_aton>.
111
112Unlike the L<Socket> function, you can get multiple IP addresses as result
113(currently only when EV::ADNS is being used).
102 114
103=cut 115=cut
104 116
105sub inet_aton { 117sub inet_aton {
106 my ($name, $cb) = @_; 118 my ($name, $cb) = @_;
107 119
108 if (&dotted_quad) { 120 if (&dotted_quad) {
109 $cb->(Socket::inet_aton $name); 121 $cb->(socket_inet_aton $name);
110 } elsif ($name eq "localhost") { # rfc2606 et al. 122 } elsif ($name eq "localhost") { # rfc2606 et al.
111 $cb->(v127.0.0.1); 123 $cb->(v127.0.0.1);
112 } elsif (&has_ev_adns) { 124 } elsif (&has_ev_adns) {
125 # work around some idiotic ands rfc readings
126 # rather hackish support for AAAA records (should
127 # wait for adns_getaddrinfo...)
128
129 my $loop = 10; # follow cname chains up to this length
130 my $qt;
131 my $acb; $acb = sub {
132 my ($status, undef, @a) = @_;
133
134 if ($status == &EV::ADNS::s_ok) {
135 if ($qt eq "a") {
136 return $cb->(map +(socket_inet_aton $_), @a);
137 } elsif ($qt eq "aaaa") {
138 return $cb->(@a);
139 } elsif ($qt eq "cname") {
140 $name = $a[0]; # there can only be one :)
141 $qt = "a";
113 EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub { 142 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
114 my (undef, undef, @a) = @_; 143 }
115 $cb->(@a ? Socket::inet_aton $a[0] : undef); 144 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
145 # follow cname chains
146 if ($loop--) {
147 $qt = "cname";
148 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
149 }
150 } elsif ($status == &EV::ADNS::s_nodata) {
151 if ($qt eq "a") {
152 # ask for raw AAAA (might not be a good method, but adns is too broken...)
153 $qt = "aaaa";
154 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
155 }
156 }
157
158 $cb->();
116 }); 159 };
160
161 $qt = "a";
162 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
117 } else { 163 } else {
118 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_; 164 _do_asy $cb, sub {
165 my $ipn = socket_inet_aton $_[0];
166 $ipn ? ($ipn) : ()
167 }, @_;
119 } 168 }
120} 169}
121 170
122=item AnyEvent::Util::fh_nonblocking $fh, $nonblocking 171=item fh_nonblocking $fh, $nonblocking
123 172
124Sets the blocking state of the given filehandle (true == nonblocking, 173Sets the blocking state of the given filehandle (true == nonblocking,
125false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on 174false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
126broken (i.e. windows) platforms. 175broken (i.e. windows) platforms.
127 176
138 } else { 187 } else {
139 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 188 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
140 } 189 }
141} 190}
142 191
143=item AnyEvent::Util::connect ($socket, $connect_cb->($socket), $error_cb->()[, $timeout]) 192=item $guard = guard { CODE }
144 193
145Connects the socket C<$socket> non-blocking. C<$connect_cb> will be 194This function creates a special object that, when called, will execute the
146called when the socket was successfully connected and became writable, 195code block.
147the first argument to the C<$connect_cb> callback will be the C<$socket>
148itself.
149 196
150The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see 197This is often handy in continuation-passing style code to clean up some
151above). 198resource regardless of where you break out of a process.
152 199
153C<$error_cb> will be called when any error happened while connecting 200=cut
154the socket. C<$!> will be set to an appropriate error number.
155 201
156If C<$timeout> is given a timeout will be installed for the connect. If the 202sub AnyEvent::Util::Guard::DESTROY {
157timeout was reached the C<$error_cb> callback will be called and C<$!> is set to 203 ${$_[0]}->();
158C<ETIMEDOUT>. 204}
159 205
160The return value of C<connect> will be a guard object that you have to keep 206sub guard(&) {
161referenced until you are done with the connect or received an error. 207 bless \(my $cb = shift), AnyEvent::Util::Guard::
162If you let the object's reference drop to zero the internal connect and timeout 208}
163watchers will be removed.
164 209
165Here is a short example, which creates a socket and does a blocking DNS lookup via 210sub _tcp_port($) {
166L<IO::Socket::INET>: 211 $_[0] =~ /^(\d*)$/ and return $1*1;
167 212
168 my $sock = IO::Socket::INET->new ( 213 (getservbyname $_[0], "tcp")[2]
169 PeerAddr => "www.google.com:80", 214 or Carp::croak "$_[0]: service unknown"
170 Blocking => 0, 215}
171 ) or die "Couldn't make socket: $!\n";
172 216
173 my $hdl; 217=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
174 218
175 my $watchobj = AnyEvent::Util::connect ($sock, sub { 219This function is experimental.
220
221This is a convenience function that creates a tcp socket and makes a 100%
222non-blocking connect to the given C<$host> (which can be a hostname or a
223textual IP address) and C<$port> (which can be a numeric port number or a
224service name).
225
226Unless called in void context, it returns a guard object that will
227automatically abort connecting when it gets destroyed (it does not do
228anything to the socket after the connect was successful).
229
230If the connect is successful, then the C<$connect_cb> will be invoked with
231the socket filehandle (in non-blocking mode) as first and the peer host
232(as a textual IP address) and peer port as second and third arguments,
233respectively.
234
235If the connect is unsuccessful, then the C<$connect_cb> will be invoked
236without any arguments and C<$!> will be set appropriately (with C<ENXIO>
237indicating a dns resolution failure).
238
239The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
240be used as a normal perl file handle as well.
241
242Sometimes you need to "prepare" the socket before connecting, for example,
243to C<bind> it to some port, or you want a specific connect timeout that
244is lower than your kernel's default timeout. In this case you can specify
245a second callback, C<$prepare_cb>. It will be called with the file handle
246in not-yet-connected state as only argument and must return the connection
247timeout value (or C<0>, C<undef> or the empty list to indicate the default
248timeout is to be used).
249
250Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
251socket (although only IPv4 is currently supported by this module).
252
253Simple Example: connect to localhost on port 22.
254
255 AnyEvent::Util::tcp_connect localhost => 22, sub {
256 my $fh = shift
257 or die "unable to connect: $!";
258 # do something
259 };
260
261Complex Example: connect to www.google.com on port 80 and make a simple
262GET request without much error handling. Also limit the connection timeout
263to 15 seconds.
264
265 AnyEvent::Util::tcp_connect "www.google.com", "http",
266 sub {
176 my ($sock) = @_; 267 my ($fh) = @_
268 or die "unable to connect: $!";
177 269
178 $hdl = 270 my $handle; # avoid direct assignment so on_eof has it in scope.
179 AnyEvent::Handle->new ( 271 $handle = new AnyEvent::Handle
180 fh => $sock, 272 fh => $fh,
181 on_eof => sub { 273 on_eof => sub {
182 print "received eof\n"; 274 undef $handle; # keep it alive till eof
183 undef $hdl 275 warn "done.\n";
184 } 276 };
185 );
186 277
187 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012"); 278 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
188 279
189 $hdl->push_read_line (sub { 280 $handle->push_read_line ("\015\012\015\012", sub {
190 my ($hdl, $line) = @_; 281 my ($handle, $line) = @_;
191 print "Yay, got line: $line\n";
192 });
193 282
194 }, sub { 283 # print response header
195 warn "Got error on connect: $!\n"; 284 print "HEADER\n$line\n\nBODY\n";
196 }, 10);
197 285
198=cut 286 $handle->on_read (sub {
199 287 # print response body
200sub connect { 288 print $_[0]->rbuf;
201 my ($socket, $c_cb, $e_cb, $tout) = @_; 289 $_[0]->rbuf = "";
202
203 fh_nonblocking ($socket, 1);
204
205 my $o = AnyEvent::Util::SocketHandle->new (
206 fh => $socket,
207 connect_cb => $c_cb,
208 error_cb => $e_cb,
209 timeout => $tout,
210 );
211
212 $o->connect;
213
214 $o
215}
216
217=item AnyEvent::Util::tcp_connect ($host, $port, $connect_cb->($socket), $error_cb->()[, $timeout])
218
219This is a shortcut function which behaves similar to the C<connect> function
220described above, except that it does a C<AnyEvent::Util::inet_aton> on C<$host>
221and creates a L<IO::Socket::INET> TCP connection for you, which will be
222passed as C<$socket> argument to the C<$connect_cb> callback above.
223
224In case the hostname couldn't be resolved C<$error_cb> will be called and C<$!>
225will be set to C<ENXIO>.
226
227For more details about the return value and the arguments see the C<connect>
228function above.
229
230Here is a short example:
231
232
233 my $hdl;
234 my $watchobj = AnyEvent::Util::tcp_connect ("www.google.com", 80, sub {
235 my ($sock) = @_;
236
237 $hdl =
238 AnyEvent::Handle->new (
239 fh => $sock,
240 on_eof => sub {
241 print "received eof\n";
242 undef $hdl
243 }
244 );
245
246 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012");
247
248 $hdl->push_read_line (sub {
249 my ($hdl, $line) = @_;
250 print "Yay, got line: $line\n";
251 });
252
253 }, sub {
254 warn "Got error on connect: $!\n";
255 }, 10);
256
257=cut
258
259sub tcp_connect {
260 my ($host, $port, $c_cb, $e_cb, $tout, %sockargs) = @_;
261
262 my $o = AnyEvent::Util::SocketHandle->new (
263 connect_cb => $c_cb,
264 error_cb => $e_cb,
265 timeout => $tout,
266 );
267
268 $o->start_timeout;
269
270 AnyEvent::Util::inet_aton ($host, sub {
271 my ($addr) = @_;
272
273 return if $o->{timed_out};
274
275 if ($addr) {
276 my $sock =
277 IO::Socket::INET->new (
278 PeerHost => Socket::inet_ntoa ($addr),
279 PeerPort => $port,
280 Blocking => 0,
281 %sockargs
282 ); 290 });
291 });
292 }, sub {
293 my ($fh) = @_;
294 # could call $fh->bind etc. here
283 295
284 unless ($sock) { 296 15
285 $o->error; 297 };
298
299=cut
300
301sub tcp_connect($$$;$) {
302 my ($host, $port, $connect, $prepare) = @_;
303
304 # see http://cr.yp.to/docs/connect.html for some background
305
306 my %state = ( fh => undef );
307
308 # name resolution
309 inet_aton $host, sub {
310 return unless exists $state{fh};
311
312 my $ipn = shift;
313
314 4 == length $ipn
315 or do {
316 %state = ();
317 $! = &Errno::ENXIO;
318 return $connect->();
319 };
320
321 # socket creation
322 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
323 or do {
324 %state = ();
325 return $connect->();
326 };
327
328 fh_nonblocking $state{fh}, 1;
329
330 # prepare and optional timeout
331 if ($prepare) {
332 my $timeout = $prepare->($state{fh});
333
334 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
335 %state = ();
336 $! = &Errno::ETIMEDOUT;
337 $connect->();
338 }) if $timeout;
339 }
340
341 # called when the connect was successful, which,
342 # in theory, could be the case immediately (but never is in practise)
343 my $connected = sub {
344 my $fh = delete $state{fh};
345 %state = ();
346
347 # we are connected, or maybe there was an error
348 if (my $sin = getpeername $fh) {
349 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
350 $connect->($fh, (Socket::inet_ntoa $host), $port);
351 } else {
352 # dummy read to fetch real error code
353 sysread $fh, my $buf, 1 if $! == &Errno::ENOTCONN;
354 $connect->();
286 } 355 }
356 };
287 357
288 fh_nonblocking ($sock, 1); 358 # now connect
289 359 if (connect $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, $ipn) {
290 $o->{fh} = $sock;
291
292 $o->connect; 360 $connected->();
293 361 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
362 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
294 } else { 363 } else {
295 $! = ENXIO; 364 %state = ();
296 $o->error; 365 $connect->();
366 }
367 };
368
369 defined wantarray
370 ? guard { %state = () } # break any circular dependencies and unregister watchers
371 : ()
372}
373
374=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
375
376This function is experimental.
377
378Create and bind a tcp socket to the given host (any IPv4 host if undef,
379otherwise it must be an IPv4 or IPv6 address) and port (service name or
380numeric port number, or an ephemeral port if given as zero or undef), set
381the SO_REUSEADDR flag and call C<listen>.
382
383For each new connection that could be C<accept>ed, call the C<$accept_cb>
384with the filehandle (in non-blocking mode) as first and the peer host and
385port as second and third arguments (see C<tcp_connect> for details).
386
387Croaks on any errors.
388
389If called in non-void context, then this function returns a guard object
390whose lifetime it tied to the tcp server: If the object gets destroyed,
391the server will be stopped (but existing accepted connections will
392continue).
393
394If you need more control over the listening socket, you can provide a
395C<$prepare_cb>, which is called just before the C<listen ()> call, with
396the listen file handle as first argument.
397
398It should return the length of the listen queue (or C<0> for the default).
399
400Example: bind on tcp port 8888 on the local machine and tell each client
401to go away.
402
403 AnyEvent::Util::tcp_server undef, 8888, sub {
404 my ($fh, $host, $port) = @_;
405
406 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
407 };
408
409=cut
410
411sub tcp_server($$$;$) {
412 my ($host, $port, $accept, $prepare) = @_;
413
414 my %state;
415
416 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
417 or Carp::croak "socket: $!";
418
419 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
420 or Carp::croak "so_reuseaddr: $!";
421
422 bind $state{fh}, Socket::pack_sockaddr_in _tcp_port $port, socket_inet_aton ($host || "0.0.0.0")
423 or Carp::croak "bind: $!";
424
425 fh_nonblocking $state{fh}, 1;
426
427 my $len = ($prepare && $prepare->($state{fh})) || 128;
428
429 listen $state{fh}, $len
430 or Carp::croak "listen: $!";
431
432 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
433 # this closure keeps $state alive
434 while (my $peer = accept my $fh, $state{fh}) {
435 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
436 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
437 $accept->($fh, (Socket::inet_ntoa $host), $port);
297 } 438 }
298 }); 439 });
299 440
300 $o 441 defined wantarray
301} 442 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
302
303=item AnyEvent::Util::listen ($socket, $client_cb->($new_socket, $peer_ad), $error_cb->())
304
305This will listen and accept new connections on the C<$socket> in a non-blocking
306way. The callback C<$client_cb> will be called when a new client connection
307was accepted and the callback C<$error_cb> will be called in case of an error.
308C<$!> will be set to an approriate error number.
309
310The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see
311above).
312
313The first argument to C<$client_cb> will be the socket of the accepted client
314and the second argument the peer address.
315
316The return value is a guard object that you have to keep referenced as long as you
317want to accept new connections.
318
319Here is an example usage:
320
321 my $sock = IO::Socket::INET->new (
322 Listen => 5
323 ) or die "Couldn't make socket: $!\n";
324
325 my $watchobj = AnyEvent::Util::listen ($sock, sub {
326 my ($cl_sock, $cl_addr) = @_;
327
328 my ($port, $addr) = sockaddr_in ($cl_addr);
329 $addr = inet_ntoa ($addr);
330 print "Client connected: $addr:$port\n";
331
332 # ...
333
334 }, sub {
335 warn "Error on accept: $!"
336 });
337
338=cut
339
340sub listen {
341 my ($socket, $c_cb, $e_cb) = @_;
342
343 fh_nonblocking ($socket, 1);
344
345 my $o =
346 AnyEvent::Util::SocketHandle->new (
347 fh => $socket,
348 client_cb => $c_cb,
349 error_cb => $e_cb
350 ); 443 : ()
351
352 $o->listen;
353
354 $o
355}
356
357package AnyEvent::Util::SocketHandle;
358use Errno qw/ETIMEDOUT/;
359use Socket;
360use Scalar::Util qw/weaken/;
361
362sub new {
363 my $this = shift;
364 my $class = ref($this) || $this;
365 my $self = { @_ };
366 bless $self, $class;
367
368 return $self
369}
370
371sub error {
372 my ($self) = @_;
373 delete $self->{con_w};
374 delete $self->{list_w};
375 delete $self->{tmout};
376 $self->{error_cb}->();
377}
378
379sub listen {
380 my ($self) = @_;
381
382 weaken $self;
383
384 $self->{list_w} =
385 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
386 my ($new_sock, $paddr) = $self->{fh}->accept ();
387
388 unless (defined $new_sock) {
389 $self->error;
390 return;
391 }
392
393 $self->{client_cb}->($new_sock, $paddr);
394 });
395}
396
397sub start_timeout {
398 my ($self) = @_;
399
400 if (defined $self->{timeout}) {
401 $self->{tmout} =
402 AnyEvent->timer (after => $self->{timeout}, cb => sub {
403 delete $self->{tmout};
404 $! = ETIMEDOUT;
405 $self->error;
406 $self->{timed_out} = 1;
407 });
408 }
409}
410
411sub connect {
412 my ($self) = @_;
413
414 weaken $self;
415
416 $self->start_timeout;
417
418 $self->{con_w} =
419 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
420 delete $self->{con_w};
421 delete $self->{tmout};
422
423 if ($! = $self->{fh}->sockopt (SO_ERROR)) {
424 $self->error;
425
426 } else {
427 $self->{connect_cb}->($self->{fh});
428 }
429 });
430} 444}
431 445
4321; 4461;
433 447
434=back 448=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines