ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.10 by elmex, Thu May 15 13:50:23 2008 UTC vs.
Revision 1.18 by root, Tue May 20 15:04:43 2008 UTC

3AnyEvent::Util - various utility functions. 3AnyEvent::Util - various utility functions.
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8
9 inet_aton $name, $cb->($ipn || undef);
10 8
11=head1 DESCRIPTION 9=head1 DESCRIPTION
12 10
13This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
14well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
15 13
14All functions documented without C<AnyEvent::Util::> prefix are exported
15by default.
16
16=over 4 17=over 4
17 18
18=cut 19=cut
19 20
20package AnyEvent::Util; 21package AnyEvent::Util;
21 22
22use strict; 23use strict;
23 24
24no warnings "uninitialized"; 25no warnings "uninitialized";
25 26
26use Errno qw/ENXIO/; 27use Errno ();
27use Socket (); 28use Socket ();
28use IO::Socket::INET (); 29use IO::Socket::INET ();
29 30
30use AnyEvent; 31use AnyEvent;
31 32
32use base 'Exporter'; 33use base 'Exporter';
33 34
34#our @EXPORT = qw(gethostbyname gethostbyaddr); 35BEGIN {
35our @EXPORT_OK = qw(inet_aton); 36 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
37}
38
39our @EXPORT = qw(inet_aton fh_nonblocking guard tcp_server tcp_connect);
36 40
37our $VERSION = '1.0'; 41our $VERSION = '1.0';
38 42
39our $MAXPARALLEL = 16; # max. number of parallel jobs 43our $MAXPARALLEL = 16; # max. number of parallel jobs
40 44
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS }) 97 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true 98 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false 99 }) - 1 # 2 => true, 1 => false
96} 100}
97 101
98=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef) 102=item inet_aton $name_or_address, $cb->(@addresses)
99 103
100Works almost exactly like its Socket counterpart, except that it uses a 104Works similarly to its Socket counterpart, except that it uses a
101callback. 105callback. Also, if a host has only an IPv6 address, this might be passed
106to the callback instead (use the length to detect this - 4 for IPv4, 16
107for IPv6).
108
109This function uses various shortcuts and will fall back to either
110L<EV::ADNS> or your systems C<inet_aton>.
111
112Unlike the L<Socket> function, you can get multiple IP addresses as result
113(currently only when EV::ADNS is being used).
102 114
103=cut 115=cut
104 116
105sub inet_aton { 117sub inet_aton {
106 my ($name, $cb) = @_; 118 my ($name, $cb) = @_;
107 119
108 if (&dotted_quad) { 120 if (&dotted_quad) {
109 $cb->(Socket::inet_aton $name); 121 $cb->(socket_inet_aton $name);
110 } elsif ($name eq "localhost") { # rfc2606 et al. 122 } elsif ($name eq "localhost") { # rfc2606 et al.
111 $cb->(v127.0.0.1); 123 $cb->(v127.0.0.1);
112 } elsif (&has_ev_adns) { 124 } elsif (&has_ev_adns) {
125 # work around some idiotic ands rfc readings
126 # rather hackish support for AAAA records (should
127 # wait for adns_getaddrinfo...)
128
129 my $loop = 10; # follow cname chains up to this length
130 my $qt;
131 my $acb; $acb = sub {
132 my ($status, undef, @a) = @_;
133
134 if ($status == &EV::ADNS::s_ok) {
135 if ($qt eq "a") {
136 return $cb->(map +(socket_inet_aton $_), @a);
137 } elsif ($qt eq "aaaa") {
138 return $cb->(@a);
139 } elsif ($qt eq "cname") {
140 $name = $a[0]; # there can only be one :)
141 $qt = "a";
113 EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub { 142 return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
114 my (undef, undef, @a) = @_; 143 }
115 $cb->(@a ? Socket::inet_aton $a[0] : undef); 144 } elsif ($status == &EV::ADNS::s_prohibitedcname) {
145 # follow cname chains
146 if ($loop--) {
147 $qt = "cname";
148 return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
149 }
150 } elsif ($status == &EV::ADNS::s_nodata) {
151 if ($qt eq "a") {
152 # ask for raw AAAA (might not be a good method, but adns is too broken...)
153 $qt = "aaaa";
154 return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
155 }
156 }
157
158 $cb->();
116 }); 159 };
160
161 $qt = "a";
162 EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
117 } else { 163 } else {
118 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_; 164 _do_asy $cb, sub {
165 my $ipn = socket_inet_aton $_[0];
166 $ipn ? ($ipn) : ()
167 }, @_;
119 } 168 }
120} 169}
121 170
122=item AnyEvent::Util::fh_nonblocking $fh, $nonblocking 171=item fh_nonblocking $fh, $nonblocking
123 172
124Sets the blocking state of the given filehandle (true == nonblocking, 173Sets the blocking state of the given filehandle (true == nonblocking,
125false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on 174false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
126broken (i.e. windows) platforms. 175broken (i.e. windows) platforms.
127 176
138 } else { 187 } else {
139 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 188 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
140 } 189 }
141} 190}
142 191
143=item AnyEvent::Util::connect ($socket, $connect_cb->($socket), $error_cb->()[, $timeout]) 192=item $guard = guard { CODE }
144 193
145Connects the socket C<$socket> non-blocking. C<$connect_cb> will be 194This function creates a special object that, when called, will execute the
146called when the socket was successfully connected and became writable, 195code block.
147the first argument to the C<$connect_cb> callback will be the C<$socket>
148itself.
149 196
150The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see 197This is often handy in continuation-passing style code to clean up some
151above). 198resource regardless of where you break out of a process.
152 199
153C<$error_cb> will be called when any error happened while connecting 200=cut
154the socket. C<$!> will be set to an appropriate error number.
155 201
156If C<$timeout> is given a timeout will be installed for the connect. If the 202sub AnyEvent::Util::Guard::DESTROY {
157timeout was reached the C<$error_cb> callback will be called and C<$!> is set to 203 ${$_[0]}->();
158C<ETIMEDOUT>. 204}
159 205
160The return value of C<connect> will be a guard object that you have to keep 206sub guard(&) {
161referenced until you are done with the connect or received an error. 207 bless \(my $cb = shift), AnyEvent::Util::Guard::
162If you let the object's reference drop to zero the internal connect and timeout 208}
163watchers will be removed.
164 209
165Here is a short example, which creates a socket and does a blocking DNS lookup via 210=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
166L<IO::Socket::INET>:
167 211
168 my $sock = IO::Socket::INET->new ( 212This function is experimental.
169 PeerAddr => "www.google.com:80",
170 Blocking => 0,
171 ) or die "Couldn't make socket: $!\n";
172 213
173 my $hdl; 214This is a convenience function that creates a tcp socket and makes a 100%
215non-blocking connect to the given C<$host> (which can be a hostname or a
216textual IP address) and C<$port>.
174 217
175 my $watchobj = AnyEvent::Util::connect ($sock, sub { 218Unless called in void context, it returns a guard object that will
219automatically abort connecting when it gets destroyed (it does not do
220anything to the socket after the conenct was successful).
221
222If the connect is successful, then the C<$connect_cb> will be invoked with
223the socket filehandle (in non-blocking mode) as first and the peer host
224(as a textual IP address) and peer port as second and third arguments,
225respectively.
226
227If the connect is unsuccessful, then the C<$connect_cb> will be invoked
228without any arguments and C<$!> will be set appropriately (with C<ENXIO>
229indicating a dns resolution failure).
230
231The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
232be used as a normal perl file handle as well.
233
234Sometimes you need to "prepare" the socket before connecting, for example,
235to C<bind> it to some port, or you want a specific connect timeout that
236is lower than your kernel's default timeout. In this case you can specify
237a second callback, C<$prepare_cb>. It will be called with the file handle
238in not-yet-connected state as only argument and must return the connection
239timeout value (or C<0>, C<undef> or the empty list to indicate the default
240timeout is to be used).
241
242Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
243socket (although only IPv4 is currently supported by this module).
244
245Simple Example: connect to localhost on port 22.
246
247 AnyEvent::Util::tcp_connect localhost => 22, sub {
248 my $fh = shift
249 or die "unable to connect: $!";
250 # do something
251 };
252
253Complex Example: connect to www.google.com on port 80 and make a simple
254GET request without much error handling. Also limit the connection timeout
255to 15 seconds.
256
257 AnyEvent::Util::tcp_connect "www.google.com", 80,
258 sub {
176 my ($sock) = @_; 259 my ($fh) = @_
260 or die "unable to connect: $!";
177 261
178 $hdl = 262 my $handle; # avoid direct assignment so on_eof has it in scope.
179 AnyEvent::Handle->new ( 263 $handle = new AnyEvent::Handle
180 fh => $sock, 264 fh => $fh,
181 on_eof => sub { 265 on_eof => sub {
182 print "received eof\n"; 266 undef $handle; # keep it alive till eof
183 undef $hdl 267 warn "done.\n";
184 } 268 };
185 );
186 269
187 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012"); 270 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
188 271
189 $hdl->push_read_line (sub { 272 $handle->push_read_line ("\015\012\015\012", sub {
190 my ($hdl, $line) = @_; 273 my ($handle, $line) = @_;
191 print "Yay, got line: $line\n";
192 });
193 274
194 }, sub { 275 # print response header
195 warn "Got error on connect: $!\n"; 276 print "HEADER\n$line\n\nBODY\n";
196 }, 10);
197 277
198=cut 278 $handle->on_read (sub {
199 279 # print response body
200sub connect { 280 print $_[0]->rbuf;
201 my ($socket, $c_cb, $e_cb, $tout) = @_; 281 $_[0]->rbuf = "";
202
203 fh_nonblocking ($socket, 1);
204
205 my $o = AnyEvent::Util::SocketHandle->new (
206 fh => $socket,
207 connect_cb => $c_cb,
208 error_cb => $e_cb,
209 timeout => $tout,
210 );
211
212 $o->connect;
213
214 $o
215}
216
217=item AnyEvent::Util::tcp_connect ($host, $port, $connect_cb->($socket), $error_cb->()[, $timeout])
218
219This is a shortcut function which behaves similar to the C<connect> function
220described above, except that it does a C<AnyEvent::Util::inet_aton> on C<$host>
221and creates a L<IO::Socket::INET> TCP connection for you, which will be
222passed as C<$socket> argument to the C<$connect_cb> callback above.
223
224In case the hostname couldn't be resolved C<$error_cb> will be called and C<$!>
225will be set to C<ENXIO>.
226
227For more details about the return value and the arguments see the C<connect>
228function above.
229
230Here is a short example:
231
232
233 my $hdl;
234 my $watchobj = AnyEvent::Util::tcp_connect ("www.google.com", 80, sub {
235 my ($sock) = @_;
236
237 $hdl =
238 AnyEvent::Handle->new (
239 fh => $sock,
240 on_eof => sub {
241 print "received eof\n";
242 undef $hdl
243 }
244 );
245
246 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012");
247
248 $hdl->push_read_line (sub {
249 my ($hdl, $line) = @_;
250 print "Yay, got line: $line\n";
251 });
252
253 }, sub {
254 warn "Got error on connect: $!\n";
255 }, 10);
256
257=cut
258
259sub tcp_connect {
260 my ($host, $port, $c_cb, $e_cb, $tout, %sockargs) = @_;
261
262 my $o = AnyEvent::Util::SocketHandle->new (
263 connect_cb => $c_cb,
264 error_cb => $e_cb,
265 timeout => $tout,
266 );
267
268 $o->start_timeout;
269
270 AnyEvent::Util::inet_aton ($host, sub {
271 my ($addr) = @_;
272
273 return if $o->{timed_out};
274
275 if ($addr) {
276 my $sock =
277 IO::Socket::INET->new (
278 PeerHost => Socket::inet_ntoa ($addr),
279 PeerPort => $port,
280 Blocking => 0,
281 %sockargs
282 ); 282 });
283 });
284 }, sub {
285 my ($fh) = @_;
286 # could call $fh->bind etc. here
283 287
284 unless ($sock) { 288 15
285 $o->error; 289 };
290
291=cut
292
293sub tcp_connect($$$;$) {
294 my ($host, $port, $connect, $prepare) = @_;
295
296 # see http://cr.yp.to/docs/connect.html for some background
297
298 my %state = ( fh => undef );
299
300 # name resolution
301 inet_aton $host, sub {
302 return unless exists $state{fh};
303
304 my $ipn = shift;
305
306 4 == length $ipn
307 or do {
308 %state = ();
309 $! = &Errno::ENXIO;
310 return $connect->();
311 };
312
313 # socket creation
314 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
315 or do {
316 %state = ();
317 return $connect->();
318 };
319
320 fh_nonblocking $state{fh}, 1;
321
322 # prepare and optional timeout
323 if ($prepare) {
324 my $timeout = $prepare->($state{fh});
325
326 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
327 %state = ();
328 $! = &Errno::ETIMEDOUT;
329 $connect->();
330 }) if $timeout;
331 }
332
333 # called when the connect was successful, which,
334 # in theory, could be the case immediately (but never is in practise)
335 my $connected = sub {
336 my $fh = delete $state{fh};
337 %state = ();
338
339 # we are connected, or maybe there was an error
340 if (my $sin = getpeername $fh) {
341 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
342 $connect->($fh, (Socket::inet_ntoa $host), $port);
343 } else {
344 # dummy read to fetch real error code
345 sysread $fh, my $buf, 1 if $! == &Errno::ENOTCONN;
346 $connect->();
286 } 347 }
348 };
287 349
288 fh_nonblocking ($sock, 1); 350 # now connect
289 351 if (connect $state{fh}, Socket::pack_sockaddr_in $port, $ipn) {
290 $o->{fh} = $sock;
291
292 $o->connect; 352 $connected->();
293 353 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
354 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
294 } else { 355 } else {
295 $! = ENXIO; 356 %state = ();
296 $o->error; 357 $connect->();
358 }
359 };
360
361 defined wantarray
362 ? guard { %state = () } # break any circular dependencies and unregister watchers
363 : ()
364}
365
366=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
367
368This function is experimental.
369
370Create and bind a tcp socket to the given host (any IPv4 host if undef,
371otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
372port if given as zero or undef), set the SO_REUSEADDR flag and call
373C<listen>.
374
375For each new connection that could be C<accept>ed, call the C<$accept_cb>
376with the filehandle (in non-blocking mode) as first and the peer host and
377port as second and third arguments (see C<tcp_connect> for details).
378
379Croaks on any errors.
380
381If called in non-void context, then this function returns a guard object
382whose lifetime it tied to the tcp server: If the object gets destroyed,
383the server will be stopped (but existing accepted connections will
384continue).
385
386If you need more control over the listening socket, you can provide a
387C<$prepare_cb>, which is called just before the C<listen ()> call, with
388the listen file handle as first argument.
389
390It should return the length of the listen queue (or C<0> for the default).
391
392Example: bind on tcp port 8888 on the local machine and tell each client
393to go away.
394
395 AnyEvent::Util::tcp_server undef, 8888, sub {
396 my ($fh, $host, $port) = @_;
397
398 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
399 };
400
401=cut
402
403sub tcp_server($$$;$) {
404 my ($host, $port, $accept, $prepare) = @_;
405
406 my %state;
407
408 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
409 or Carp::croak "socket: $!";
410
411 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
412 or Carp::croak "so_reuseaddr: $!";
413
414 bind $state{fh}, Socket::pack_sockaddr_in $port, socket_inet_aton ($host || "0.0.0.0")
415 or Carp::croak "bind: $!";
416
417 fh_nonblocking $state{fh}, 1;
418
419 my $len = ($prepare && $prepare->($state{fh})) || 128;
420
421 listen $state{fh}, $len
422 or Carp::croak "listen: $!";
423
424 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
425 # this closure keeps $state alive
426 while (my $peer = accept my $fh, $state{fh}) {
427 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
428 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
429 $accept->($fh, (Socket::inet_ntoa $host), $port);
297 } 430 }
298 }); 431 });
299 432
300 $o 433 defined wantarray
301} 434 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
302
303=item AnyEvent::Util::listen ($socket, $client_cb->($new_socket, $peer_ad), $error_cb->())
304
305This will listen and accept new connections on the C<$socket> in a non-blocking
306way. The callback C<$client_cb> will be called when a new client connection
307was accepted and the callback C<$error_cb> will be called in case of an error.
308C<$!> will be set to an approriate error number.
309
310The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see
311above).
312
313The first argument to C<$client_cb> will be the socket of the accepted client
314and the second argument the peer address.
315
316The return value is a guard object that you have to keep referenced as long as you
317want to accept new connections.
318
319Here is an example usage:
320
321 my $sock = IO::Socket::INET->new (
322 Listen => 5
323 ) or die "Couldn't make socket: $!\n";
324
325 my $watchobj = AnyEvent::Util::listen ($sock, sub {
326 my ($cl_sock, $cl_addr) = @_;
327
328 my ($port, $addr) = sockaddr_in ($cl_addr);
329 $addr = inet_ntoa ($addr);
330 print "Client connected: $addr:$port\n";
331
332 # ...
333
334 }, sub {
335 warn "Error on accept: $!"
336 });
337
338=cut
339
340sub listen {
341 my ($socket, $c_cb, $e_cb) = @_;
342
343 fh_nonblocking ($socket, 1);
344
345 my $o =
346 AnyEvent::Util::SocketHandle->new (
347 fh => $socket,
348 client_cb => $c_cb,
349 error_cb => $e_cb
350 ); 435 : ()
351
352 $o->listen;
353
354 $o
355}
356
357package AnyEvent::Util::SocketHandle;
358use Errno qw/ETIMEDOUT/;
359use Socket;
360use Scalar::Util qw/weaken/;
361
362sub new {
363 my $this = shift;
364 my $class = ref($this) || $this;
365 my $self = { @_ };
366 bless $self, $class;
367
368 return $self
369}
370
371sub error {
372 my ($self) = @_;
373 delete $self->{con_w};
374 delete $self->{list_w};
375 delete $self->{tmout};
376 $self->{error_cb}->();
377}
378
379sub listen {
380 my ($self) = @_;
381
382 weaken $self;
383
384 $self->{list_w} =
385 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
386 my ($new_sock, $paddr) = $self->{fh}->accept ();
387
388 unless (defined $new_sock) {
389 $self->error;
390 return;
391 }
392
393 $self->{client_cb}->($new_sock, $paddr);
394 });
395}
396
397sub start_timeout {
398 my ($self) = @_;
399
400 if (defined $self->{timeout}) {
401 $self->{tmout} =
402 AnyEvent->timer (after => $self->{timeout}, cb => sub {
403 delete $self->{tmout};
404 $! = ETIMEDOUT;
405 $self->error;
406 $self->{timed_out} = 1;
407 });
408 }
409}
410
411sub connect {
412 my ($self) = @_;
413
414 weaken $self;
415
416 $self->start_timeout;
417
418 $self->{con_w} =
419 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
420 delete $self->{con_w};
421 delete $self->{tmout};
422
423 if ($! = $self->{fh}->sockopt (SO_ERROR)) {
424 $self->error;
425
426 } else {
427 $self->{connect_cb}->($self->{fh});
428 }
429 });
430} 436}
431 437
4321; 4381;
433 439
434=back 440=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines