1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
AnyEvent::Util - various utility functions. |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use AnyEvent::Util; |
8 |
|
|
|
9 |
root |
1.4 |
inet_aton $name, $cb->($ipn || undef); |
10 |
root |
1.1 |
|
11 |
|
|
=head1 DESCRIPTION |
12 |
|
|
|
13 |
|
|
This module implements various utility functions, mostly replacing |
14 |
|
|
well-known functions by event-ised counterparts. |
15 |
|
|
|
16 |
|
|
=over 4 |
17 |
|
|
|
18 |
|
|
=cut |
19 |
|
|
|
20 |
|
|
package AnyEvent::Util; |
21 |
|
|
|
22 |
|
|
use strict; |
23 |
|
|
|
24 |
|
|
no warnings "uninitialized"; |
25 |
|
|
|
26 |
root |
1.11 |
use Errno; |
27 |
root |
1.1 |
use Socket (); |
28 |
elmex |
1.10 |
use IO::Socket::INET (); |
29 |
root |
1.1 |
|
30 |
|
|
use AnyEvent; |
31 |
|
|
|
32 |
|
|
use base 'Exporter'; |
33 |
|
|
|
34 |
|
|
#our @EXPORT = qw(gethostbyname gethostbyaddr); |
35 |
root |
1.4 |
our @EXPORT_OK = qw(inet_aton); |
36 |
root |
1.1 |
|
37 |
|
|
our $VERSION = '1.0'; |
38 |
|
|
|
39 |
|
|
our $MAXPARALLEL = 16; # max. number of parallel jobs |
40 |
|
|
|
41 |
|
|
our $running; |
42 |
|
|
our @queue; |
43 |
|
|
|
44 |
|
|
sub _schedule; |
45 |
|
|
sub _schedule { |
46 |
|
|
return unless @queue; |
47 |
|
|
return if $running >= $MAXPARALLEL; |
48 |
|
|
|
49 |
|
|
++$running; |
50 |
|
|
my ($cb, $sub, @args) = @{shift @queue}; |
51 |
|
|
|
52 |
root |
1.2 |
if (eval { local $SIG{__DIE__}; require POSIX }) { |
53 |
root |
1.1 |
my $pid = open my $fh, "-|"; |
54 |
|
|
|
55 |
|
|
if (!defined $pid) { |
56 |
|
|
die "fork: $!"; |
57 |
|
|
} elsif (!$pid) { |
58 |
|
|
syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args); |
59 |
|
|
POSIX::_exit (0); |
60 |
|
|
} |
61 |
|
|
|
62 |
|
|
my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub { |
63 |
|
|
--$running; |
64 |
|
|
_schedule; |
65 |
|
|
undef $w; |
66 |
|
|
|
67 |
|
|
my $buf; |
68 |
|
|
sysread $fh, $buf, 16384, length $buf; |
69 |
|
|
$cb->(map { pack "H*", $_ } split /\0/, $buf); |
70 |
|
|
}); |
71 |
|
|
} else { |
72 |
|
|
$cb->($sub->(@args)); |
73 |
|
|
} |
74 |
|
|
} |
75 |
|
|
|
76 |
|
|
sub _do_asy { |
77 |
|
|
push @queue, [@_]; |
78 |
|
|
_schedule; |
79 |
|
|
} |
80 |
|
|
|
81 |
|
|
sub dotted_quad($) { |
82 |
|
|
$_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) |
83 |
|
|
\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) |
84 |
|
|
\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) |
85 |
|
|
\.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x |
86 |
|
|
} |
87 |
|
|
|
88 |
|
|
my $has_ev_adns; |
89 |
|
|
|
90 |
|
|
sub has_ev_adns { |
91 |
|
|
($has_ev_adns ||= do { |
92 |
|
|
my $model = AnyEvent::detect; |
93 |
root |
1.7 |
($model eq "AnyEvent::Impl::EV" && eval { local $SIG{__DIE__}; require EV::ADNS }) |
94 |
root |
1.3 |
? 2 : 1 # so that || always detects as true |
95 |
|
|
}) - 1 # 2 => true, 1 => false |
96 |
root |
1.1 |
} |
97 |
|
|
|
98 |
|
|
=item AnyEvent::Util::inet_aton $name_or_address, $cb->($binary_address_or_undef) |
99 |
|
|
|
100 |
|
|
Works almost exactly like its Socket counterpart, except that it uses a |
101 |
|
|
callback. |
102 |
|
|
|
103 |
|
|
=cut |
104 |
|
|
|
105 |
|
|
sub inet_aton { |
106 |
|
|
my ($name, $cb) = @_; |
107 |
|
|
|
108 |
|
|
if (&dotted_quad) { |
109 |
|
|
$cb->(Socket::inet_aton $name); |
110 |
root |
1.6 |
} elsif ($name eq "localhost") { # rfc2606 et al. |
111 |
|
|
$cb->(v127.0.0.1); |
112 |
root |
1.4 |
} elsif (&has_ev_adns) { |
113 |
root |
1.5 |
EV::ADNS::submit ($name, &EV::ADNS::r_addr, 0, sub { |
114 |
root |
1.4 |
my (undef, undef, @a) = @_; |
115 |
root |
1.1 |
$cb->(@a ? Socket::inet_aton $a[0] : undef); |
116 |
|
|
}); |
117 |
|
|
} else { |
118 |
|
|
_do_asy $cb, sub { Socket::inet_aton $_[0] }, @_; |
119 |
|
|
} |
120 |
|
|
} |
121 |
|
|
|
122 |
root |
1.6 |
=item AnyEvent::Util::fh_nonblocking $fh, $nonblocking |
123 |
|
|
|
124 |
|
|
Sets the blocking state of the given filehandle (true == nonblocking, |
125 |
|
|
false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on |
126 |
|
|
broken (i.e. windows) platforms. |
127 |
|
|
|
128 |
|
|
=cut |
129 |
|
|
|
130 |
|
|
sub fh_nonblocking($$) { |
131 |
|
|
my ($fh, $nb) = @_; |
132 |
|
|
|
133 |
|
|
require Fcntl; |
134 |
|
|
|
135 |
|
|
if ($^O eq "MSWin32") { |
136 |
|
|
$nb = (! ! $nb) + 0; |
137 |
|
|
ioctl $fh, 0x8004667e, \$nb; # FIONBIO |
138 |
|
|
} else { |
139 |
|
|
fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0; |
140 |
|
|
} |
141 |
|
|
} |
142 |
|
|
|
143 |
root |
1.11 |
sub AnyEvent::Util::Guard::DESTROY { |
144 |
|
|
${$_[0]}->(); |
145 |
|
|
} |
146 |
elmex |
1.8 |
|
147 |
root |
1.11 |
=item $guard = AnyEvent::Util::guard { CODE } |
148 |
elmex |
1.8 |
|
149 |
root |
1.11 |
This function creates a special object that, when called, will execute the |
150 |
|
|
code block. |
151 |
elmex |
1.8 |
|
152 |
root |
1.11 |
This is often handy in continuation-passing style code to clean up some |
153 |
|
|
resource regardless of where you break out of a process. |
154 |
elmex |
1.8 |
|
155 |
|
|
=cut |
156 |
|
|
|
157 |
root |
1.11 |
sub guard(&) { |
158 |
|
|
bless \(my $cb = shift), AnyEvent::Util::Guard:: |
159 |
elmex |
1.8 |
} |
160 |
|
|
|
161 |
root |
1.11 |
=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb] |
162 |
elmex |
1.10 |
|
163 |
root |
1.11 |
This is a convenience function that creates a tcp socket and makes a 100% |
164 |
|
|
non-blocking connect to the given C<$host> (which can be a hostname or a |
165 |
|
|
textual IP address) and C<$port>. |
166 |
|
|
|
167 |
|
|
Unless called in void context, it returns a guard object that will |
168 |
|
|
automatically abort connecting when it gets destroyed (it does not do |
169 |
|
|
anything to the socket after the conenct was successful). |
170 |
|
|
|
171 |
|
|
If the connect is successful, then the C<$connect_cb> will be invoked with |
172 |
|
|
the socket filehandle (in non-blocking mode) as first and the peer host |
173 |
|
|
(as a textual IP address) and peer port as second and third arguments, |
174 |
|
|
respectively. |
175 |
|
|
|
176 |
|
|
If the connect is unsuccessful, then the C<$connect_cb> will be invoked |
177 |
|
|
without any arguments and C<$!> will be set appropriately (with C<ENXIO> |
178 |
|
|
indicating a dns resolution failure). |
179 |
|
|
|
180 |
|
|
The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can |
181 |
|
|
be used as a normal perl file handle as well. |
182 |
|
|
|
183 |
|
|
Sometimes you need to "prepare" the socket before connecting, for example, |
184 |
|
|
to C<bind> it to some port, or you want a specific connect timeout that |
185 |
|
|
is lower than your kernel's default timeout. In this case you can specify |
186 |
|
|
a second callback, C<$prepare_cb>. It will be called with the file handle |
187 |
|
|
in not-yet-connected state as only argument and must return the connection |
188 |
|
|
timeout value (or C<0>, C<undef> or the empty list to indicate the default |
189 |
|
|
timeout is to be used). |
190 |
|
|
|
191 |
|
|
Note that the socket could be either a IPv4 TCP socket or an IPv6 tcp |
192 |
|
|
socket (although only IPv4 is currently supported by this module). |
193 |
|
|
|
194 |
|
|
Simple Example: connect to localhost on port 22. |
195 |
|
|
|
196 |
|
|
AnyEvent::Util::tcp_connect localhost => 22, sub { |
197 |
|
|
my $fh = shift |
198 |
|
|
or die "unable to connect: $!"; |
199 |
|
|
# do something |
200 |
|
|
}; |
201 |
|
|
|
202 |
|
|
Complex Example: connect to www.google.com on port 80 and make a simple |
203 |
|
|
GET request without much error handling. Also limit the connection timeout |
204 |
|
|
to 15 seconds. |
205 |
|
|
|
206 |
|
|
AnyEvent::Util::tcp_connect "www.google.com", 80, |
207 |
|
|
sub { |
208 |
|
|
my ($fh) = @_ |
209 |
|
|
or die "unable to connect: $!"; |
210 |
|
|
|
211 |
|
|
my $handle; # avoid direct assignment so on_eof has it in scope. |
212 |
|
|
$handle = new AnyEvent::Handle |
213 |
|
|
fh => $fh, |
214 |
|
|
on_eof => sub { |
215 |
|
|
undef $handle; # keep it alive till eof |
216 |
|
|
warn "done.\n"; |
217 |
|
|
}; |
218 |
|
|
|
219 |
|
|
$handle->push_write ("GET / HTTP/1.0\015\012\015\012"); |
220 |
|
|
|
221 |
|
|
$handle->push_read_line ("\015\012\015\012", sub { |
222 |
|
|
my ($handle, $line) = @_; |
223 |
|
|
|
224 |
|
|
# print response header |
225 |
|
|
print "HEADER\n$line\n\nBODY\n"; |
226 |
|
|
|
227 |
|
|
$handle->on_read (sub { |
228 |
|
|
# print response body |
229 |
|
|
print $_[0]->rbuf; |
230 |
|
|
$_[0]->rbuf = ""; |
231 |
|
|
}); |
232 |
|
|
}); |
233 |
|
|
}, sub { |
234 |
|
|
my ($fh) = @_; |
235 |
|
|
# could call $fh->bind etc. here |
236 |
elmex |
1.10 |
|
237 |
root |
1.11 |
15 |
238 |
|
|
}; |
239 |
elmex |
1.10 |
|
240 |
|
|
|
241 |
root |
1.11 |
=cut |
242 |
elmex |
1.10 |
|
243 |
root |
1.11 |
sub tcp_connect($$$;$) { |
244 |
|
|
my ($host, $port, $connect, $prepare) = @_; |
245 |
elmex |
1.10 |
|
246 |
root |
1.11 |
# see http://cr.yp.to/docs/connect.html for some background |
247 |
elmex |
1.10 |
|
248 |
root |
1.11 |
my $state = {}; |
249 |
elmex |
1.10 |
|
250 |
root |
1.11 |
# name resolution |
251 |
|
|
inet_aton $host, sub { |
252 |
|
|
return unless $state; |
253 |
|
|
|
254 |
|
|
my $ipn = shift |
255 |
|
|
or do { |
256 |
|
|
undef $state; |
257 |
|
|
$! = &Errno::ENXIO; |
258 |
|
|
return $connect->(); |
259 |
|
|
}; |
260 |
|
|
|
261 |
|
|
# socket creation |
262 |
|
|
socket $state->{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0 |
263 |
|
|
or do { |
264 |
|
|
undef $state; |
265 |
|
|
return $connect->(); |
266 |
|
|
}; |
267 |
|
|
|
268 |
|
|
fh_nonblocking $state->{fh}, 1; |
269 |
|
|
|
270 |
|
|
# prepare and optional timeout |
271 |
|
|
if ($prepare) { |
272 |
|
|
my $timeout = $prepare->($state->{fh}); |
273 |
|
|
|
274 |
|
|
$state->{to} = AnyEvent->timer (after => $timeout, cb => sub { |
275 |
|
|
undef $state; |
276 |
|
|
$! = &Errno::ETIMEDOUT; |
277 |
|
|
$connect->(); |
278 |
|
|
}) if $timeout; |
279 |
|
|
} |
280 |
elmex |
1.10 |
|
281 |
root |
1.11 |
# called when the connect was successful, which, |
282 |
|
|
# in theory, could be the case immediately (but never is in practise) |
283 |
|
|
my $connected = sub { |
284 |
|
|
my $fh = delete $state->{fh}; |
285 |
|
|
undef $state; |
286 |
|
|
|
287 |
|
|
# we are connected, or maybe there was an error |
288 |
|
|
if (my $sin = getpeername $fh) { |
289 |
|
|
my ($port, $host) = Socket::unpack_sockaddr_in $sin; |
290 |
|
|
$connect->($fh, (Socket::inet_ntoa $host), $port); |
291 |
|
|
} else { |
292 |
|
|
# dummy read to fetch real error code |
293 |
|
|
sysread $fh, my $buf, 1; |
294 |
|
|
$connect->(); |
295 |
elmex |
1.10 |
} |
296 |
root |
1.11 |
}; |
297 |
elmex |
1.10 |
|
298 |
root |
1.11 |
# now connect |
299 |
|
|
if (connect $state->{fh}, Socket::pack_sockaddr_in $port, $ipn) { |
300 |
|
|
$connected->(); |
301 |
|
|
} elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX |
302 |
|
|
$state->{ww} = AnyEvent->io (fh => $state->{fh}, poll => 'w', cb => $connected); |
303 |
elmex |
1.10 |
} else { |
304 |
root |
1.11 |
undef $state; |
305 |
|
|
$connect->(); |
306 |
elmex |
1.10 |
} |
307 |
root |
1.11 |
}; |
308 |
elmex |
1.10 |
|
309 |
root |
1.11 |
defined wantarray |
310 |
|
|
? guard { undef $state } |
311 |
|
|
: () |
312 |
elmex |
1.10 |
} |
313 |
|
|
|
314 |
root |
1.11 |
=item AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb] |
315 |
|
|
|
316 |
|
|
#TODO# |
317 |
elmex |
1.9 |
|
318 |
|
|
This will listen and accept new connections on the C<$socket> in a non-blocking |
319 |
|
|
way. The callback C<$client_cb> will be called when a new client connection |
320 |
|
|
was accepted and the callback C<$error_cb> will be called in case of an error. |
321 |
|
|
C<$!> will be set to an approriate error number. |
322 |
|
|
|
323 |
|
|
The blocking state of C<$socket> will be set to nonblocking via C<fh_nonblocking> (see |
324 |
|
|
above). |
325 |
|
|
|
326 |
|
|
The first argument to C<$client_cb> will be the socket of the accepted client |
327 |
|
|
and the second argument the peer address. |
328 |
|
|
|
329 |
|
|
The return value is a guard object that you have to keep referenced as long as you |
330 |
|
|
want to accept new connections. |
331 |
|
|
|
332 |
|
|
Here is an example usage: |
333 |
|
|
|
334 |
|
|
my $sock = IO::Socket::INET->new ( |
335 |
|
|
Listen => 5 |
336 |
|
|
) or die "Couldn't make socket: $!\n"; |
337 |
|
|
|
338 |
|
|
my $watchobj = AnyEvent::Util::listen ($sock, sub { |
339 |
|
|
my ($cl_sock, $cl_addr) = @_; |
340 |
|
|
|
341 |
|
|
my ($port, $addr) = sockaddr_in ($cl_addr); |
342 |
|
|
$addr = inet_ntoa ($addr); |
343 |
|
|
print "Client connected: $addr:$port\n"; |
344 |
|
|
|
345 |
|
|
# ... |
346 |
|
|
|
347 |
|
|
}, sub { |
348 |
|
|
warn "Error on accept: $!" |
349 |
|
|
}); |
350 |
|
|
|
351 |
|
|
=cut |
352 |
|
|
|
353 |
|
|
sub listen { |
354 |
|
|
my ($socket, $c_cb, $e_cb) = @_; |
355 |
|
|
|
356 |
|
|
fh_nonblocking ($socket, 1); |
357 |
|
|
|
358 |
|
|
my $o = |
359 |
|
|
AnyEvent::Util::SocketHandle->new ( |
360 |
|
|
fh => $socket, |
361 |
|
|
client_cb => $c_cb, |
362 |
|
|
error_cb => $e_cb |
363 |
|
|
); |
364 |
|
|
|
365 |
|
|
$o->listen; |
366 |
|
|
|
367 |
|
|
$o |
368 |
|
|
} |
369 |
|
|
|
370 |
elmex |
1.8 |
package AnyEvent::Util::SocketHandle; |
371 |
|
|
use Errno qw/ETIMEDOUT/; |
372 |
|
|
use Socket; |
373 |
|
|
use Scalar::Util qw/weaken/; |
374 |
|
|
|
375 |
|
|
sub new { |
376 |
|
|
my $this = shift; |
377 |
|
|
my $class = ref($this) || $this; |
378 |
|
|
my $self = { @_ }; |
379 |
|
|
bless $self, $class; |
380 |
|
|
|
381 |
|
|
return $self |
382 |
|
|
} |
383 |
|
|
|
384 |
|
|
sub error { |
385 |
|
|
my ($self) = @_; |
386 |
|
|
delete $self->{con_w}; |
387 |
elmex |
1.9 |
delete $self->{list_w}; |
388 |
elmex |
1.8 |
delete $self->{tmout}; |
389 |
|
|
$self->{error_cb}->(); |
390 |
|
|
} |
391 |
|
|
|
392 |
elmex |
1.9 |
sub listen { |
393 |
|
|
my ($self) = @_; |
394 |
|
|
|
395 |
|
|
weaken $self; |
396 |
|
|
|
397 |
|
|
$self->{list_w} = |
398 |
|
|
AnyEvent->io (poll => 'r', fh => $self->{fh}, cb => sub { |
399 |
|
|
my ($new_sock, $paddr) = $self->{fh}->accept (); |
400 |
|
|
|
401 |
|
|
unless (defined $new_sock) { |
402 |
|
|
$self->error; |
403 |
|
|
return; |
404 |
|
|
} |
405 |
|
|
|
406 |
|
|
$self->{client_cb}->($new_sock, $paddr); |
407 |
|
|
}); |
408 |
|
|
} |
409 |
|
|
|
410 |
elmex |
1.10 |
sub start_timeout { |
411 |
elmex |
1.8 |
my ($self) = @_; |
412 |
|
|
|
413 |
|
|
if (defined $self->{timeout}) { |
414 |
|
|
$self->{tmout} = |
415 |
|
|
AnyEvent->timer (after => $self->{timeout}, cb => sub { |
416 |
elmex |
1.10 |
delete $self->{tmout}; |
417 |
elmex |
1.8 |
$! = ETIMEDOUT; |
418 |
|
|
$self->error; |
419 |
elmex |
1.10 |
$self->{timed_out} = 1; |
420 |
elmex |
1.8 |
}); |
421 |
|
|
} |
422 |
elmex |
1.10 |
} |
423 |
|
|
|
424 |
|
|
sub connect { |
425 |
|
|
my ($self) = @_; |
426 |
|
|
|
427 |
|
|
weaken $self; |
428 |
|
|
|
429 |
|
|
$self->start_timeout; |
430 |
elmex |
1.8 |
|
431 |
|
|
$self->{con_w} = |
432 |
|
|
AnyEvent->io (poll => 'w', fh => $self->{fh}, cb => sub { |
433 |
|
|
delete $self->{con_w}; |
434 |
|
|
delete $self->{tmout}; |
435 |
|
|
|
436 |
|
|
if ($! = $self->{fh}->sockopt (SO_ERROR)) { |
437 |
|
|
$self->error; |
438 |
|
|
|
439 |
|
|
} else { |
440 |
|
|
$self->{connect_cb}->($self->{fh}); |
441 |
|
|
} |
442 |
|
|
}); |
443 |
|
|
} |
444 |
|
|
|
445 |
root |
1.1 |
1; |
446 |
|
|
|
447 |
|
|
=back |
448 |
|
|
|
449 |
|
|
=head1 AUTHOR |
450 |
|
|
|
451 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
452 |
|
|
http://home.schmorp.de/ |
453 |
|
|
|
454 |
|
|
=cut |
455 |
|
|
|