ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.10 by elmex, Thu May 15 13:50:23 2008 UTC vs.
Revision 1.13 by root, Sat May 17 19:39:33 2008 UTC

21 21
22use strict; 22use strict;
23 23
24no warnings "uninitialized"; 24no warnings "uninitialized";
25 25
26use Errno qw/ENXIO/; 26use Errno;
27use Socket (); 27use Socket ();
28use IO::Socket::INET (); 28use IO::Socket::INET ();
29 29
30use AnyEvent; 30use AnyEvent;
31 31
138 } else { 138 } else {
139 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 139 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
140 } 140 }
141} 141}
142 142
143=item AnyEvent::Util::connect ($socket, $connect_cb->($socket), $error_cb->()[, $timeout]) 143sub AnyEvent::Util::Guard::DESTROY {
144 ${$_[0]}->();
145}
144 146
145Connects the socket C<$socket> non-blocking. C<$connect_cb> will be 147=item $guard = AnyEvent::Util::guard { CODE }
146called when the socket was successfully connected and became writable,
147the first argument to the C<$connect_cb> callback will be the C<$socket>
148itself.
149 148
150The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see 149This function creates a special object that, when called, will execute the
151above). 150code block.
152 151
153C<$error_cb> will be called when any error happened while connecting 152This is often handy in continuation-passing style code to clean up some
154the socket. C<$!> will be set to an appropriate error number. 153resource regardless of where you break out of a process.
155 154
156If C<$timeout> is given a timeout will be installed for the connect. If the 155=cut
157timeout was reached the C<$error_cb> callback will be called and C<$!> is set to
158C<ETIMEDOUT>.
159 156
160The return value of C<connect> will be a guard object that you have to keep 157sub guard(&) {
161referenced until you are done with the connect or received an error. 158 bless \(my $cb = shift), AnyEvent::Util::Guard::
162If you let the object's reference drop to zero the internal connect and timeout 159}
163watchers will be removed.
164 160
165Here is a short example, which creates a socket and does a blocking DNS lookup via 161=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb]
166L<IO::Socket::INET>:
167 162
168 my $sock = IO::Socket::INET->new ( 163This is a convenience function that creates a tcp socket and makes a 100%
169 PeerAddr => "www.google.com:80", 164non-blocking connect to the given C<$host> (which can be a hostname or a
170 Blocking => 0, 165textual IP address) and C<$port>.
171 ) or die "Couldn't make socket: $!\n";
172 166
173 my $hdl; 167Unless called in void context, it returns a guard object that will
168automatically abort connecting when it gets destroyed (it does not do
169anything to the socket after the conenct was successful).
174 170
175 my $watchobj = AnyEvent::Util::connect ($sock, sub { 171If the connect is successful, then the C<$connect_cb> will be invoked with
172the socket filehandle (in non-blocking mode) as first and the peer host
173(as a textual IP address) and peer port as second and third arguments,
174respectively.
175
176If the connect is unsuccessful, then the C<$connect_cb> will be invoked
177without any arguments and C<$!> will be set appropriately (with C<ENXIO>
178indicating a dns resolution failure).
179
180The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can
181be used as a normal perl file handle as well.
182
183Sometimes you need to "prepare" the socket before connecting, for example,
184to C<bind> it to some port, or you want a specific connect timeout that
185is lower than your kernel's default timeout. In this case you can specify
186a second callback, C<$prepare_cb>. It will be called with the file handle
187in not-yet-connected state as only argument and must return the connection
188timeout value (or C<0>, C<undef> or the empty list to indicate the default
189timeout is to be used).
190
191Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp
192socket (although only IPv4 is currently supported by this module).
193
194Simple Example: connect to localhost on port 22.
195
196 AnyEvent::Util::tcp_connect localhost => 22, sub {
197 my $fh = shift
198 or die "unable to connect: $!";
199 # do something
200 };
201
202Complex Example: connect to www.google.com on port 80 and make a simple
203GET request without much error handling. Also limit the connection timeout
204to 15 seconds.
205
206 AnyEvent::Util::tcp_connect "www.google.com", 80,
207 sub {
176 my ($sock) = @_; 208 my ($fh) = @_
209 or die "unable to connect: $!";
177 210
178 $hdl = 211 my $handle; # avoid direct assignment so on_eof has it in scope.
179 AnyEvent::Handle->new ( 212 $handle = new AnyEvent::Handle
180 fh => $sock, 213 fh => $fh,
181 on_eof => sub { 214 on_eof => sub {
182 print "received eof\n"; 215 undef $handle; # keep it alive till eof
183 undef $hdl 216 warn "done.\n";
184 } 217 };
185 );
186 218
187 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012"); 219 $handle->push_write ("GET / HTTP/1.0\015\012\015\012");
188 220
189 $hdl->push_read_line (sub { 221 $handle->push_read_line ("\015\012\015\012", sub {
190 my ($hdl, $line) = @_; 222 my ($handle, $line) = @_;
191 print "Yay, got line: $line\n";
192 });
193 223
194 }, sub { 224 # print response header
195 warn "Got error on connect: $!\n"; 225 print "HEADER\n$line\n\nBODY\n";
196 }, 10);
197 226
198=cut 227 $handle->on_read (sub {
199 228 # print response body
200sub connect { 229 print $_[0]->rbuf;
201 my ($socket, $c_cb, $e_cb, $tout) = @_; 230 $_[0]->rbuf = "";
202
203 fh_nonblocking ($socket, 1);
204
205 my $o = AnyEvent::Util::SocketHandle->new (
206 fh => $socket,
207 connect_cb => $c_cb,
208 error_cb => $e_cb,
209 timeout => $tout,
210 );
211
212 $o->connect;
213
214 $o
215}
216
217=item AnyEvent::Util::tcp_connect ($host, $port, $connect_cb->($socket), $error_cb->()[, $timeout])
218
219This is a shortcut function which behaves similar to the C<connect> function
220described above, except that it does a C<AnyEvent::Util::inet_aton> on C<$host>
221and creates a L<IO::Socket::INET> TCP connection for you, which will be
222passed as C<$socket> argument to the C<$connect_cb> callback above.
223
224In case the hostname couldn't be resolved C<$error_cb> will be called and C<$!>
225will be set to C<ENXIO>.
226
227For more details about the return value and the arguments see the C<connect>
228function above.
229
230Here is a short example:
231
232
233 my $hdl;
234 my $watchobj = AnyEvent::Util::tcp_connect ("www.google.com", 80, sub {
235 my ($sock) = @_;
236
237 $hdl =
238 AnyEvent::Handle->new (
239 fh => $sock,
240 on_eof => sub {
241 print "received eof\n";
242 undef $hdl
243 }
244 );
245
246 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012");
247
248 $hdl->push_read_line (sub {
249 my ($hdl, $line) = @_;
250 print "Yay, got line: $line\n";
251 });
252
253 }, sub {
254 warn "Got error on connect: $!\n";
255 }, 10);
256
257=cut
258
259sub tcp_connect {
260 my ($host, $port, $c_cb, $e_cb, $tout, %sockargs) = @_;
261
262 my $o = AnyEvent::Util::SocketHandle->new (
263 connect_cb => $c_cb,
264 error_cb => $e_cb,
265 timeout => $tout,
266 );
267
268 $o->start_timeout;
269
270 AnyEvent::Util::inet_aton ($host, sub {
271 my ($addr) = @_;
272
273 return if $o->{timed_out};
274
275 if ($addr) {
276 my $sock =
277 IO::Socket::INET->new (
278 PeerHost => Socket::inet_ntoa ($addr),
279 PeerPort => $port,
280 Blocking => 0,
281 %sockargs
282 ); 231 });
232 });
233 }, sub {
234 my ($fh) = @_;
235 # could call $fh->bind etc. here
283 236
284 unless ($sock) { 237 15
285 $o->error; 238 };
239
240=cut
241
242sub tcp_connect($$$;$) {
243 my ($host, $port, $connect, $prepare) = @_;
244
245 # see http://cr.yp.to/docs/connect.html for some background
246
247 my %state = ( fh => undef );
248
249 # name resolution
250 inet_aton $host, sub {
251 return unless exists $state{fh};
252
253 my $ipn = shift
254 or do {
255 %state = ();
256 $! = &Errno::ENXIO;
257 return $connect->();
258 };
259
260 # socket creation
261 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
262 or do {
263 %state = ();
264 return $connect->();
265 };
266
267 fh_nonblocking $state{fh}, 1;
268
269 # prepare and optional timeout
270 if ($prepare) {
271 my $timeout = $prepare->($state{fh});
272
273 $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
274 %state = ();
275 $! = &Errno::ETIMEDOUT;
276 $connect->();
277 }) if $timeout;
278 }
279
280 # called when the connect was successful, which,
281 # in theory, could be the case immediately (but never is in practise)
282 my $connected = sub {
283 my $fh = delete $state{fh};
284 %state = ();
285
286 # we are connected, or maybe there was an error
287 if (my $sin = getpeername $fh) {
288 my ($port, $host) = Socket::unpack_sockaddr_in $sin;
289 $connect->($fh, (Socket::inet_ntoa $host), $port);
290 } else {
291 # dummy read to fetch real error code
292 sysread $fh, my $buf, 1;
293 $connect->();
286 } 294 }
295 };
287 296
288 fh_nonblocking ($sock, 1); 297 # now connect
289 298 if (connect $state{fh}, Socket::pack_sockaddr_in $port, $ipn) {
290 $o->{fh} = $sock;
291
292 $o->connect; 299 $connected->();
293 300 } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
301 $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $connected);
294 } else { 302 } else {
295 $! = ENXIO; 303 %state = ();
296 $o->error; 304 $connect->();
305 }
306 };
307
308 defined wantarray
309 ? guard { %state = () } # break any circular dependencies and unregister watchers
310 : ()
311}
312
313=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb]
314
315Create and bind a tcp socket to the given host (any IPv4 host if undef,
316otherwise it must be an IPv4 or IPv6 address) and port (or an ephemeral
317port if given as zero or undef), set the SO_REUSEADDR flag and call
318C<listen>.
319
320For each new connection that could be C<accept>ed, call the C<$accept_cb>
321with the filehandle (in non-blocking mode) as first and the peer host and
322port as second and third arguments (see C<tcp_connect> for details).
323
324Croaks on any errors.
325
326If called in non-void context, then this function returns a guard object
327whose lifetime it tied to the tcp server: If the object gets destroyed,
328the server will be stopped (but existing accepted connections will
329continue).
330
331If you need more control over the listening socket, you can provide a
332C<$prepare_cb>, which is called just before the C<listen ()> call, with
333the listen file handle as first argument.
334
335It should return the length of the listen queue (or C<0> for the default).
336
337Example: bind on tcp port 8888 on the local machine and tell each client
338to go away.
339
340 AnyEvent::Util::tcp_server undef, 8888, sub {
341 my ($fh, $host, $port) = @_;
342
343 syswrite $fh, "The internet is full, $host:$port. Go away!\015\012";
344 };
345
346=cut
347
348sub tcp_server($$$;$) {
349 my ($host, $port, $accept, $prepare) = @_;
350
351 my %state;
352
353 socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0
354 or Carp::croak "socket: $!";
355
356 setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1
357 or Carp::croak "so_reuseaddr: $!";
358
359 bind $state{fh}, Socket::pack_sockaddr_in $port, Socket::inet_aton ($host || "0.0.0.0")
360 or Carp::croak "bind: $!";
361
362 fh_nonblocking $state{fh}, 1;
363
364 my $len = ($prepare && $prepare->($state{fh})) || 128;
365
366 listen $state{fh}, $len
367 or Carp::croak "listen: $!";
368
369 $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
370 # this closure keeps $state alive
371 while (my $peer = accept my $fh, $state{fh}) {
372 fh_nonblocking $fh, 1; # POSIX requires inheritance, the outside world does not
373 my ($port, $host) = Socket::unpack_sockaddr_in $peer;
374 $accept->($fh, (Socket::inet_ntoa $host), $port);
297 } 375 }
298 }); 376 });
299 377
300 $o 378 defined wantarray
301} 379 ? guard { %state = () } # clear fh and watcher, which breaks the circular dependency
302
303=item AnyEvent::Util::listen ($socket, $client_cb->($new_socket, $peer_ad), $error_cb->())
304
305This will listen and accept new connections on the C<$socket> in a non-blocking
306way. The callback C<$client_cb> will be called when a new client connection
307was accepted and the callback C<$error_cb> will be called in case of an error.
308C<$!> will be set to an approriate error number.
309
310The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see
311above).
312
313The first argument to C<$client_cb> will be the socket of the accepted client
314and the second argument the peer address.
315
316The return value is a guard object that you have to keep referenced as long as you
317want to accept new connections.
318
319Here is an example usage:
320
321 my $sock = IO::Socket::INET->new (
322 Listen => 5
323 ) or die "Couldn't make socket: $!\n";
324
325 my $watchobj = AnyEvent::Util::listen ($sock, sub {
326 my ($cl_sock, $cl_addr) = @_;
327
328 my ($port, $addr) = sockaddr_in ($cl_addr);
329 $addr = inet_ntoa ($addr);
330 print "Client connected: $addr:$port\n";
331
332 # ...
333
334 }, sub {
335 warn "Error on accept: $!"
336 });
337
338=cut
339
340sub listen {
341 my ($socket, $c_cb, $e_cb) = @_;
342
343 fh_nonblocking ($socket, 1);
344
345 my $o =
346 AnyEvent::Util::SocketHandle->new (
347 fh => $socket,
348 client_cb => $c_cb,
349 error_cb => $e_cb
350 ); 380 : ()
351
352 $o->listen;
353
354 $o
355}
356
357package AnyEvent::Util::SocketHandle;
358use Errno qw/ETIMEDOUT/;
359use Socket;
360use Scalar::Util qw/weaken/;
361
362sub new {
363 my $this = shift;
364 my $class = ref($this) || $this;
365 my $self = { @_ };
366 bless $self, $class;
367
368 return $self
369}
370
371sub error {
372 my ($self) = @_;
373 delete $self->{con_w};
374 delete $self->{list_w};
375 delete $self->{tmout};
376 $self->{error_cb}->();
377}
378
379sub listen {
380 my ($self) = @_;
381
382 weaken $self;
383
384 $self->{list_w} =
385 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
386 my ($new_sock, $paddr) = $self->{fh}->accept ();
387
388 unless (defined $new_sock) {
389 $self->error;
390 return;
391 }
392
393 $self->{client_cb}->($new_sock, $paddr);
394 });
395}
396
397sub start_timeout {
398 my ($self) = @_;
399
400 if (defined $self->{timeout}) {
401 $self->{tmout} =
402 AnyEvent->timer (after => $self->{timeout}, cb => sub {
403 delete $self->{tmout};
404 $! = ETIMEDOUT;
405 $self->error;
406 $self->{timed_out} = 1;
407 });
408 }
409}
410
411sub connect {
412 my ($self) = @_;
413
414 weaken $self;
415
416 $self->start_timeout;
417
418 $self->{con_w} =
419 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
420 delete $self->{con_w};
421 delete $self->{tmout};
422
423 if ($! = $self->{fh}->sockopt (SO_ERROR)) {
424 $self->error;
425
426 } else {
427 $self->{connect_cb}->($self->{fh});
428 }
429 });
430} 381}
431 382
4321; 3831;
433 384
434=back 385=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines