ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent/lib/AnyEvent/Util.pm
(Generate patch)

Comparing AnyEvent/lib/AnyEvent/Util.pm (file contents):
Revision 1.10 by elmex, Thu May 15 13:50:23 2008 UTC vs.
Revision 1.32 by root, Mon May 26 05:09:53 2008 UTC

4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::Util; 7 use AnyEvent::Util;
8 8
9 inet_aton $name, $cb->($ipn || undef);
10
11=head1 DESCRIPTION 9=head1 DESCRIPTION
12 10
13This module implements various utility functions, mostly replacing 11This module implements various utility functions, mostly replacing
14well-known functions by event-ised counterparts. 12well-known functions by event-ised counterparts.
13
14All functions documented without C<AnyEvent::Util::> prefix are exported
15by default.
15 16
16=over 4 17=over 4
17 18
18=cut 19=cut
19 20
20package AnyEvent::Util; 21package AnyEvent::Util;
21 22
23no warnings;
22use strict; 24use strict;
23 25
24no warnings "uninitialized"; 26use Carp ();
27use Errno ();
28use Socket ();
25 29
26use Errno qw/ENXIO/; 30use AnyEvent qw(WIN32);
27use Socket ();
28use IO::Socket::INET ();
29
30use AnyEvent;
31 31
32use base 'Exporter'; 32use base 'Exporter';
33 33
34#our @EXPORT = qw(gethostbyname gethostbyaddr); 34BEGIN {
35our @EXPORT_OK = qw(inet_aton); 35 *socket_inet_aton = \&Socket::inet_aton; # take a copy, in case Coro::LWP overrides it
36}
37
38
39BEGIN {
40 my $af_inet6 = eval { local $SIG{__DIE__}; &Socket::AF_INET6 };
41
42 # uhoh
43 $af_inet6 ||= 10 if $^O =~ /linux/;
44 $af_inet6 ||= 23 if $^O =~ /cygwin/i;
45 $af_inet6 ||= 23 if WIN32;
46 $af_inet6 ||= 24 if $^O =~ /openbsd|netbsd/;
47 $af_inet6 ||= 28 if $^O =~ /freebsd/;
48
49 $af_inet6 && socket my $ipv6_socket, $af_inet6, &Socket::SOCK_STREAM, 0 # check if they can be created
50 or $af_inet6 = 0;
51
52 eval "sub AF_INET6() { $af_inet6 }"; die if $@;
53
54 delete $AnyEvent::PROTOCOL{ipv6} unless $af_inet6;
55}
56
57BEGIN {
58 # broken windows perls use undocumented error codes...
59 if (WIN32) {
60 eval "sub WSAWOULDBLOCK() { 10035 }";
61 eval "sub WSAEINPROGRESS() { 10036 }";
62 } else {
63 eval "sub WSAWOULDBLOCK() { -1e99 }"; # should never match any errno value
64 eval "sub WSAEINPROGRESS() { -1e99 }"; # should never match any errno value
65 }
66}
67
68our @EXPORT = qw(fh_nonblocking guard);
69our @EXPORT_OK = qw(AF_INET6 WSAWOULDBLOCK WSAEINPROGRESS);
36 70
37our $VERSION = '1.0'; 71our $VERSION = '1.0';
38 72
39our $MAXPARALLEL = 16; # max. number of parallel jobs 73our $MAXPARALLEL = 16; # max. number of parallel jobs
40 74
76sub _do_asy { 110sub _do_asy {
77 push @queue, [@_]; 111 push @queue, [@_];
78 _schedule; 112 _schedule;
79} 113}
80 114
115# to be removed
81sub dotted_quad($) { 116sub dotted_quad($) {
82 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 117 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
83 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 118 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
84 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) 119 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
85 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x 120 \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
86} 121}
87 122
88my $has_ev_adns; 123# just a forwarder
89 124sub inet_aton {
90sub has_ev_adns { 125 require AnyEvent::Socket;
91 ($has_ev_adns ||= do { 126 *inet_aton = \&AnyEvent::Socket::inet_aton;
92 my $model = AnyEvent::detect; 127 goto &inet_aton
93 ($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS })
94 ? 2 : 1 # so that || always detects as true
95 }) - 1 # 2 => true, 1 => false
96} 128}
97 129
98=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef)
99
100Works almost exactly like its Socket counterpart, except that it uses a
101callback.
102
103=cut
104
105sub inet_aton {
106 my ($name, $cb) = @_;
107
108 if (&dotted_quad) {
109 $cb->(Socket::inet_aton $name);
110 } elsif ($name eq "localhost") { # rfc2606 et al.
111 $cb->(v127.0.0.1);
112 } elsif (&has_ev_adns) {
113 EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub {
114 my (undef, undef, @a) = @_;
115 $cb->(@a ? Socket::inet_aton $a[0] : undef);
116 });
117 } else {
118 _do_asy $cb, sub { Socket::inet_aton $_[0] }, @_;
119 }
120}
121
122=item AnyEvent::Util::fh_nonblocking $fh, $nonblocking 130=item fh_nonblocking $fh, $nonblocking
123 131
124Sets the blocking state of the given filehandle (true == nonblocking, 132Sets the blocking state of the given filehandle (true == nonblocking,
125false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on 133false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on
126broken (i.e. windows) platforms. 134broken (i.e. windows) platforms.
127 135
130sub fh_nonblocking($$) { 138sub fh_nonblocking($$) {
131 my ($fh, $nb) = @_; 139 my ($fh, $nb) = @_;
132 140
133 require Fcntl; 141 require Fcntl;
134 142
135 if ($^O eq "MSWin32") { 143 if (WIN32) {
136 $nb = (! ! $nb) + 0; 144 $nb = (! ! $nb) + 0;
137 ioctl $fh, 0x8004667e, \$nb; # FIONBIO 145 ioctl $fh, 0x8004667e, \$nb; # FIONBIO
138 } else { 146 } else {
139 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; 147 fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
140 } 148 }
141} 149}
142 150
143=item AnyEvent::Util::connect ($socket, $connect_cb->($socket), $error_cb->()[, $timeout]) 151=item $guard = guard { CODE }
144 152
145Connects the socket C<$socket> non-blocking. C<$connect_cb> will be 153This function creates a special object that, when called, will execute the
146called when the socket was successfully connected and became writable, 154code block.
147the first argument to the C<$connect_cb> callback will be the C<$socket>
148itself.
149 155
150The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see 156This is often handy in continuation-passing style code to clean up some
151above). 157resource regardless of where you break out of a process.
152 158
153C<$error_cb> will be called when any error happened while connecting 159You can call one method on the returned object:
154the socket. C<$!> will be set to an appropriate error number.
155 160
156If C<$timeout> is given a timeout will be installed for the connect. If the 161=item $guard->cancel
157timeout was reached the C<$error_cb> callback will be called and C<$!> is set to
158C<ETIMEDOUT>.
159 162
160The return value of C<connect> will be a guard object that you have to keep 163This simply causes the code block not to be invoked: it "cancels" the
161referenced until you are done with the connect or received an error. 164guard.
162If you let the object's reference drop to zero the internal connect and timeout
163watchers will be removed.
164
165Here is a short example, which creates a socket and does a blocking DNS lookup via
166L<IO::Socket::INET>:
167
168 my $sock = IO::Socket::INET->new (
169 PeerAddr => "www.google.com:80",
170 Blocking => 0,
171 ) or die "Couldn't make socket: $!\n";
172
173 my $hdl;
174
175 my $watchobj = AnyEvent::Util::connect ($sock, sub {
176 my ($sock) = @_;
177
178 $hdl =
179 AnyEvent::Handle->new (
180 fh => $sock,
181 on_eof => sub {
182 print "received eof\n";
183 undef $hdl
184 }
185 );
186
187 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012");
188
189 $hdl->push_read_line (sub {
190 my ($hdl, $line) = @_;
191 print "Yay, got line: $line\n";
192 });
193
194 }, sub {
195 warn "Got error on connect: $!\n";
196 }, 10);
197 165
198=cut 166=cut
199 167
200sub connect { 168sub AnyEvent::Util::Guard::DESTROY {
201 my ($socket, $c_cb, $e_cb, $tout) = @_; 169 ${$_[0]}->();
202
203 fh_nonblocking ($socket, 1);
204
205 my $o = AnyEvent::Util::SocketHandle->new (
206 fh => $socket,
207 connect_cb => $c_cb,
208 error_cb => $e_cb,
209 timeout => $tout,
210 );
211
212 $o->connect;
213
214 $o
215} 170}
216 171
217=item AnyEvent::Util::tcp_connect ($host, $port, $connect_cb->($socket), $error_cb->()[, $timeout]) 172sub AnyEvent::Util::Guard::cancel($) {
218 173 ${$_[0]} = sub { };
219This is a shortcut function which behaves similar to the C<connect> function
220described above, except that it does a C<AnyEvent::Util::inet_aton> on C<$host>
221and creates a L<IO::Socket::INET> TCP connection for you, which will be
222passed as C<$socket> argument to the C<$connect_cb> callback above.
223
224In case the hostname couldn't be resolved C<$error_cb> will be called and C<$!>
225will be set to C<ENXIO>.
226
227For more details about the return value and the arguments see the C<connect>
228function above.
229
230Here is a short example:
231
232
233 my $hdl;
234 my $watchobj = AnyEvent::Util::tcp_connect ("www.google.com", 80, sub {
235 my ($sock) = @_;
236
237 $hdl =
238 AnyEvent::Handle->new (
239 fh => $sock,
240 on_eof => sub {
241 print "received eof\n";
242 undef $hdl
243 }
244 );
245
246 $hdl->push_write ("GET / HTTP/1.0\015\012\015\012");
247
248 $hdl->push_read_line (sub {
249 my ($hdl, $line) = @_;
250 print "Yay, got line: $line\n";
251 });
252
253 }, sub {
254 warn "Got error on connect: $!\n";
255 }, 10);
256
257=cut
258
259sub tcp_connect {
260 my ($host, $port, $c_cb, $e_cb, $tout, %sockargs) = @_;
261
262 my $o = AnyEvent::Util::SocketHandle->new (
263 connect_cb => $c_cb,
264 error_cb => $e_cb,
265 timeout => $tout,
266 );
267
268 $o->start_timeout;
269
270 AnyEvent::Util::inet_aton ($host, sub {
271 my ($addr) = @_;
272
273 return if $o->{timed_out};
274
275 if ($addr) {
276 my $sock =
277 IO::Socket::INET->new (
278 PeerHost => Socket::inet_ntoa ($addr),
279 PeerPort => $port,
280 Blocking => 0,
281 %sockargs
282 );
283
284 unless ($sock) {
285 $o->error;
286 }
287
288 fh_nonblocking ($sock, 1);
289
290 $o->{fh} = $sock;
291
292 $o->connect;
293
294 } else {
295 $! = ENXIO;
296 $o->error;
297 }
298 });
299
300 $o
301} 174}
302 175
303=item AnyEvent::Util::listen ($socket, $client_cb->($new_socket, $peer_ad), $error_cb->()) 176sub guard(&) {
304 177 bless \(my $cb = shift), AnyEvent::Util::Guard::
305This will listen and accept new connections on the C<$socket> in a non-blocking
306way. The callback C<$client_cb> will be called when a new client connection
307was accepted and the callback C<$error_cb> will be called in case of an error.
308C<$!> will be set to an approriate error number.
309
310The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see
311above).
312
313The first argument to C<$client_cb> will be the socket of the accepted client
314and the second argument the peer address.
315
316The return value is a guard object that you have to keep referenced as long as you
317want to accept new connections.
318
319Here is an example usage:
320
321 my $sock = IO::Socket::INET->new (
322 Listen => 5
323 ) or die "Couldn't make socket: $!\n";
324
325 my $watchobj = AnyEvent::Util::listen ($sock, sub {
326 my ($cl_sock, $cl_addr) = @_;
327
328 my ($port, $addr) = sockaddr_in ($cl_addr);
329 $addr = inet_ntoa ($addr);
330 print "Client connected: $addr:$port\n";
331
332 # ...
333
334 }, sub {
335 warn "Error on accept: $!"
336 });
337
338=cut
339
340sub listen {
341 my ($socket, $c_cb, $e_cb) = @_;
342
343 fh_nonblocking ($socket, 1);
344
345 my $o =
346 AnyEvent::Util::SocketHandle->new (
347 fh => $socket,
348 client_cb => $c_cb,
349 error_cb => $e_cb
350 );
351
352 $o->listen;
353
354 $o
355}
356
357package AnyEvent::Util::SocketHandle;
358use Errno qw/ETIMEDOUT/;
359use Socket;
360use Scalar::Util qw/weaken/;
361
362sub new {
363 my $this = shift;
364 my $class = ref($this) || $this;
365 my $self = { @_ };
366 bless $self, $class;
367
368 return $self
369}
370
371sub error {
372 my ($self) = @_;
373 delete $self->{con_w};
374 delete $self->{list_w};
375 delete $self->{tmout};
376 $self->{error_cb}->();
377}
378
379sub listen {
380 my ($self) = @_;
381
382 weaken $self;
383
384 $self->{list_w} =
385 AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub {
386 my ($new_sock, $paddr) = $self->{fh}->accept ();
387
388 unless (defined $new_sock) {
389 $self->error;
390 return;
391 }
392
393 $self->{client_cb}->($new_sock, $paddr);
394 });
395}
396
397sub start_timeout {
398 my ($self) = @_;
399
400 if (defined $self->{timeout}) {
401 $self->{tmout} =
402 AnyEvent->timer (after => $self->{timeout}, cb => sub {
403 delete $self->{tmout};
404 $! = ETIMEDOUT;
405 $self->error;
406 $self->{timed_out} = 1;
407 });
408 }
409}
410
411sub connect {
412 my ($self) = @_;
413
414 weaken $self;
415
416 $self->start_timeout;
417
418 $self->{con_w} =
419 AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub {
420 delete $self->{con_w};
421 delete $self->{tmout};
422
423 if ($! = $self->{fh}->sockopt (SO_ERROR)) {
424 $self->error;
425
426 } else {
427 $self->{connect_cb}->($self->{fh});
428 }
429 });
430} 178}
431 179
4321; 1801;
433 181
434=back 182=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines