1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::Util - various utility functions. |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent::Util; |
8 |
|
9 |
=head1 DESCRIPTION |
10 |
|
11 |
This module implements various utility functions, mostly replacing |
12 |
well-known functions by event-ised counterparts. |
13 |
|
14 |
All functions documented without C<AnyEvent::Util::> prefix are exported |
15 |
by default. |
16 |
|
17 |
=over 4 |
18 |
|
19 |
=cut |
20 |
|
21 |
package AnyEvent::Util; |
22 |
|
23 |
use strict; |
24 |
|
25 |
no warnings "uninitialized"; |
26 |
|
27 |
use Carp (); |
28 |
use Errno (); |
29 |
use Socket (); |
30 |
use IO::Socket::INET (); |
31 |
|
32 |
use AnyEvent; |
33 |
|
34 |
use base 'Exporter'; |
35 |
|
36 |
# Take our own alias of Socket::inet_aton at compile time, so that a later
# override of the original (e.g. by Coro::LWP) does not affect this module.
BEGIN {
   *socket_inet_aton = \&Socket::inet_aton;
}

# functions exported by default
our @EXPORT = qw(
   inet_aton
   fh_nonblocking
   guard
   tcp_server
   tcp_connect
);

our $VERSION = '1.0';

# max. number of parallel jobs started by _schedule
our $MAXPARALLEL = 16;

our $running; # number of jobs currently counted as in progress
our @queue;   # pending jobs, each one an array ref: [$cb, $sub, @args]
48 |
|
49 |
# Start the next queued job, unless the queue is empty or $MAXPARALLEL jobs
# are already in progress.
#
# Each queue entry is [$cb, $sub, @args]: $sub->(@args) is evaluated and the
# list it returns is passed to $cb. When POSIX can be loaded, $sub runs in a
# forked child, which writes the hex-encoded, NUL-joined result list to a
# pipe that the parent reads from an AnyEvent io watcher. Otherwise $sub is
# run synchronously in this process.
#
# Dies with "fork: ..." when the child process cannot be created.
sub _schedule;
sub _schedule {
   return unless @queue;
   return if $running >= $MAXPARALLEL;

   ++$running;
   my ($cb, $sub, @args) = @{shift @queue};

   if (eval { local $SIG{__DIE__}; require POSIX }) {
      my $pid = open my $fh, "-|";

      if (!defined $pid) {
         --$running; # the job never started, so give its slot back
         die "fork: $!";
      } elsif (!$pid) {
         # child: STDOUT is the pipe to the parent
         syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@args);
         POSIX::_exit (0);
      }

      my $buf = "";
      my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
         my $len = sysread $fh, $buf, 16384, length $buf;

         # keep collecting until end of file, as the result does not
         # have to arrive (or fit) in a single read
         return if $len;
         return if !defined $len && $! == &Errno::EINTR;

         undef $w;
         --$running;
         _schedule;

         $cb->(map { pack "H*", $_ } split /\0/, $buf);
      });
   } else {
      my @res = $sub->(@args);

      # give the slot back here as well, otherwise synchronous jobs
      # would permanently use up $MAXPARALLEL
      --$running;

      $cb->(@res);
      _schedule;
   }
}
80 |
|
81 |
# Queue a job and try to start it right away.
#
# The arguments ($cb, $sub, @args) are stored as a single array ref on
# @queue, which is where _schedule picks them up.
sub _do_asy {
   my @job = @_;

   push @queue, \@job;
   _schedule ();
}
85 |
|
86 |
# Returns true if the argument is an IPv4 address in dotted-quad notation,
# i.e. four dot-separated decimal numbers, each one between 0 and 255.
sub dotted_quad($) {
   # anchored with \A and \z, as $ would also accept a trailing newline
   $_[0] =~ /\A (?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
         (?: \. (?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?) ){3} \z/x
}
92 |
|
93 |
# cached detection result: undef = not checked yet, 2 = usable, 1 = not usable
my $has_ev_adns;

# Returns 1 if the detected event model is AnyEvent::Impl::EV and EV::ADNS
# could be loaded, and 0 otherwise. The detection itself only runs once.
sub has_ev_adns {
   unless ($has_ev_adns) {
      my $usable = AnyEvent::detect () eq "AnyEvent::Impl::EV"
                   && eval { local $SIG{__DIE__}; require EV::ADNS };

      # store 2/1 instead of 1/0, so that a negative result is true as
      # well and therefore gets cached, too
      $has_ev_adns = $usable ? 2 : 1;
   }

   $has_ev_adns - 1
}
102 |
|
103 |
=item inet_aton $name_or_address, $cb->(@addresses) |
104 |
|
105 |
Works similarly to its Socket counterpart, except that it uses a |
106 |
callback. Also, if a host has only an IPv6 address, this might be passed |
107 |
to the callback instead (use the length to detect this - 4 for IPv4, 16 |
108 |
for IPv6). |
109 |
|
110 |
This function uses various shortcuts and will fall back to either
L<EV::ADNS> or your system's C<inet_aton>.
112 |
|
113 |
Unlike the L<Socket> function, you can get multiple IP addresses as result |
114 |
(currently only when EV::ADNS is being used). |
115 |
|
116 |
=cut |
117 |
|
118 |
# Resolve $name and pass the resulting packed address(es) to $cb, or call
# $cb without arguments when the resolution failed.
#
# Dotted quads and "localhost" are answered immediately. Otherwise EV::ADNS
# is used when available, and as a last resort the blocking inet_aton is run
# as a background job via _do_asy.
sub inet_aton {
   my ($name, $cb) = @_;

   if (dotted_quad ($name)) {
      $cb->(socket_inet_aton ($name));
   } elsif ($name eq "localhost") { # rfc2606 et al.
      $cb->(v127.0.0.1);
   } elsif (has_ev_adns ()) {
      # work around some idiotic adns rfc readings
      # rather hackish support for AAAA records (should
      # wait for adns_getaddrinfo...)

      my $loop = 10; # follow cname chains up to this length
      my $qt;        # type of the query currently in flight

      # $acb refers to itself, so it must be undefined on every path that
      # does not submit another query, or the closure (and everything it
      # captures) is never freed
      my $acb; $acb = sub {
         my ($status, undef, @a) = @_;

         if ($status == &EV::ADNS::s_ok) {
            if ($qt eq "a") {
               undef $acb;
               return $cb->(map +(socket_inet_aton ($_)), @a);
            } elsif ($qt eq "aaaa") {
               undef $acb;
               return $cb->(@a);
            } elsif ($qt eq "cname") {
               $name = $a[0]; # there can only be one :)
               $qt = "a";
               return EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
            }
         } elsif ($status == &EV::ADNS::s_prohibitedcname) {
            # follow cname chains
            if ($loop--) {
               $qt = "cname";
               return EV::ADNS::submit ($name, &EV::ADNS::r_cname, 0, $acb);
            }
         } elsif ($status == &EV::ADNS::s_nodata) {
            if ($qt eq "a") {
               # ask for raw AAAA (might not be a good method, but adns is too broken...)
               $qt = "aaaa";
               return EV::ADNS::submit ($name, &EV::ADNS::r_unknown | 28, 0, $acb);
            }
         }

         # anything else counts as resolution failure
         undef $acb;
         $cb->();
      };

      $qt = "a";
      EV::ADNS::submit ($name, &EV::ADNS::r_a, 0, $acb);
   } else {
      # the job function receives @_ as arguments, so $_[0] is the name
      _do_asy ($cb, sub {
         my $ipn = socket_inet_aton ($_[0]);
         $ipn ? ($ipn) : ()
      }, @_);
   }
}
171 |
|
172 |
=item fh_nonblocking $fh, $nonblocking |
173 |
|
174 |
Sets the blocking state of the given filehandle (true == nonblocking, |
175 |
false == blocking). Uses fcntl on anything sensible and ioctl FIONBIO on |
176 |
broken (i.e. windows) platforms. |
177 |
|
178 |
=cut |
179 |
|
180 |
# Switch $fh to non-blocking mode if $nb is true, and to blocking mode
# otherwise. Returns the result of the underlying fcntl/ioctl call.
sub fh_nonblocking($$) {
   my ($fh, $nb) = @_;

   require Fcntl;

   if ($^O eq "MSWin32") {
      # FIONBIO reads the flag from a buffer, so pass a packed unsigned
      # long - a reference would hand over some address instead of a
      # pointer to the flag value
      ioctl $fh, 0x8004667e, pack "L", $nb ? 1 : 0; # FIONBIO
   } else {
      # note that this replaces all file status flags, not just O_NONBLOCK
      fcntl $fh, &Fcntl::F_SETFL, $nb ? &Fcntl::O_NONBLOCK : 0;
   }
}
192 |
|
193 |
=item $guard = guard { CODE } |
194 |
|
195 |
This function creates a special object that, when destroyed, will execute
the code block.
197 |
|
198 |
This is often handy in continuation-passing style code to clean up some |
199 |
resource regardless of where you break out of a process. |
200 |
|
201 |
=cut |
202 |
|
203 |
# Destructor of guard objects. The object itself is a reference to a code
# reference, which is invoked (without arguments) when the guard goes away.
sub AnyEvent::Util::Guard::DESTROY {
   my ($self) = @_;

   $$self->();
}
206 |
|
207 |
# Wrap the given code block in an AnyEvent::Util::Guard object. The object
# is a blessed reference to a private copy of the code reference.
sub guard(&) {
   my $code = shift;

   return bless \$code, "AnyEvent::Util::Guard";
}
210 |
|
211 |
# Map a port specification to a numeric TCP port.
#
# A string consisting only of digits (including the empty string, which
# yields 0) is returned as a number. Anything else is looked up as a tcp
# service name. Croaks with "<name>: service unknown" if the lookup fails.
sub _tcp_port($) {
   # the capture group is essential: without it $1 is not set by this
   # match and every numeric port would come out as 0
   $_[0] =~ /^(\d*)$/ and return $1 * 1;

   (getservbyname $_[0], "tcp")[2]
      or Carp::croak "$_[0]: service unknown"
}
217 |
|
218 |
=item my $guard = AnyEvent::Util::tcp_connect $host, $port, $connect_cb[, $prepare_cb] |
219 |
|
220 |
This function is experimental. |
221 |
|
222 |
This is a convenience function that creates a tcp socket and makes a 100% |
223 |
non-blocking connect to the given C<$host> (which can be a hostname or a |
224 |
textual IP address) and C<$port> (which can be a numeric port number or a |
225 |
service name). |
226 |
|
227 |
Unless called in void context, it returns a guard object that will |
228 |
automatically abort connecting when it gets destroyed (it does not do |
229 |
anything to the socket after the connect was successful). |
230 |
|
231 |
If the connect is successful, then the C<$connect_cb> will be invoked with |
232 |
the socket filehandle (in non-blocking mode) as first and the peer host |
233 |
(as a textual IP address) and peer port as second and third arguments, |
234 |
respectively. |
235 |
|
236 |
If the connect is unsuccessful, then the C<$connect_cb> will be invoked |
237 |
without any arguments and C<$!> will be set appropriately (with C<ENXIO> |
238 |
indicating a dns resolution failure). |
239 |
|
240 |
The filehandle is suitable to be plugged into L<AnyEvent::Handle>, but can |
241 |
be used as a normal perl file handle as well. |
242 |
|
243 |
Sometimes you need to "prepare" the socket before connecting, for example, |
244 |
to C<bind> it to some port, or you want a specific connect timeout that |
245 |
is lower than your kernel's default timeout. In this case you can specify |
246 |
a second callback, C<$prepare_cb>. It will be called with the file handle |
247 |
in not-yet-connected state as only argument and must return the connection |
248 |
timeout value (or C<0>, C<undef> or the empty list to indicate the default |
249 |
timeout is to be used). |
250 |
|
251 |
Note that the socket could be either an IPv4 TCP socket or an IPv6 TCP
socket (although only IPv4 is currently supported by this module).
253 |
|
254 |
Simple Example: connect to localhost on port 22. |
255 |
|
256 |
AnyEvent::Util::tcp_connect localhost => 22, sub { |
257 |
my $fh = shift |
258 |
or die "unable to connect: $!"; |
259 |
# do something |
260 |
}; |
261 |
|
262 |
Complex Example: connect to www.google.com on port 80 and make a simple |
263 |
GET request without much error handling. Also limit the connection timeout |
264 |
to 15 seconds. |
265 |
|
266 |
AnyEvent::Util::tcp_connect "www.google.com", "http", |
267 |
sub { |
268 |
my ($fh) = @_ |
269 |
or die "unable to connect: $!"; |
270 |
|
271 |
my $handle; # avoid direct assignment so on_eof has it in scope. |
272 |
$handle = new AnyEvent::Handle |
273 |
fh => $fh, |
274 |
on_eof => sub { |
275 |
undef $handle; # keep it alive till eof |
276 |
warn "done.\n"; |
277 |
}; |
278 |
|
279 |
$handle->push_write ("GET / HTTP/1.0\015\012\015\012"); |
280 |
|
281 |
$handle->push_read_line ("\015\012\015\012", sub { |
282 |
my ($handle, $line) = @_; |
283 |
|
284 |
# print response header |
285 |
print "HEADER\n$line\n\nBODY\n"; |
286 |
|
287 |
$handle->on_read (sub { |
288 |
# print response body |
289 |
print $_[0]->rbuf; |
290 |
$_[0]->rbuf = ""; |
291 |
}); |
292 |
}); |
293 |
}, sub { |
294 |
my ($fh) = @_; |
295 |
# could call $fh->bind etc. here |
296 |
|
297 |
15 |
298 |
}; |
299 |
|
300 |
=cut |
301 |
|
302 |
# Non-blocking TCP connect, see the POD entry for the calling conventions.
#
# All resources of a pending connect (socket, timeout timer, write watcher)
# are kept in %state, so emptying %state is all it takes to abort.
sub tcp_connect($$$;$) {
   my ($host, $port, $connect, $prepare) = @_;

   # see http://cr.yp.to/docs/connect.html for some background

   # the fh key exists for as long as the connect is neither done nor aborted
   my %state = (fh => undef);

   # name resolution
   inet_aton ($host, sub {
      my ($ipn) = @_;

      # %state was emptied while the resolver was still busy
      return unless exists $state{fh};

      # only 4-octet (IPv4) results can be used
      if (4 != length $ipn) {
         %state = ();
         $! = &Errno::ENXIO;
         return $connect->();
      }

      # socket creation
      unless (socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0) {
         %state = ();
         return $connect->();
      }

      fh_nonblocking ($state{fh}, 1);

      # prepare and optional timeout
      if ($prepare) {
         my $timeout = $prepare->($state{fh});

         if ($timeout) {
            $state{to} = AnyEvent->timer (after => $timeout, cb => sub {
               %state = ();
               $! = &Errno::ETIMEDOUT;
               $connect->();
            });
         }
      }

      # called when the connect attempt has finished, which, in theory,
      # could be the case immediately (but never is in practice)
      my $finish = sub {
         my $sock = delete $state{fh};
         %state = ();

         # we are connected, or maybe there was an error
         my $peer = getpeername $sock;

         if ($peer) {
            my ($peer_port, $peer_addr) = Socket::unpack_sockaddr_in ($peer);
            $connect->($sock, Socket::inet_ntoa ($peer_addr), $peer_port);
         } else {
            # dummy read to fetch real error code
            sysread $sock, my $buf, 1 if $! == &Errno::ENOTCONN;
            $connect->();
         }
      };

      # now connect
      my $sin = Socket::pack_sockaddr_in (_tcp_port ($port), $ipn);

      if (connect $state{fh}, $sin) {
         $finish->();
      } elsif ($! == &Errno::EINPROGRESS || $! == &Errno::EWOULDBLOCK) { # EINPROGRESS is POSIX
         $state{ww} = AnyEvent->io (fh => $state{fh}, poll => 'w', cb => $finish);
      } else {
         %state = ();
         $connect->();
      }
   });

   # no guard object in void context
   return unless defined wantarray;

   guard { %state = () } # break any circular dependencies and unregister watchers
}
374 |
|
375 |
=item $guard = AnyEvent::Util::tcp_server $host, $port, $accept_cb[, $prepare_cb] |
376 |
|
377 |
This function is experimental. |
378 |
|
379 |
Create and bind a tcp socket to the given host (any IPv4 host if undef, |
380 |
otherwise it must be an IPv4 or IPv6 address) and port (service name or |
381 |
numeric port number, or an ephemeral port if given as zero or undef), set |
382 |
the SO_REUSEADDR flag and call C<listen>. |
383 |
|
384 |
For each new connection that could be C<accept>ed, call the C<$accept_cb> |
385 |
with the filehandle (in non-blocking mode) as first and the peer host and |
386 |
port as second and third arguments (see C<tcp_connect> for details). |
387 |
|
388 |
Croaks on any errors. |
389 |
|
390 |
If called in non-void context, then this function returns a guard object
whose lifetime is tied to the tcp server: If the object gets destroyed,
the server will be stopped (but existing accepted connections will
continue).
394 |
|
395 |
If you need more control over the listening socket, you can provide a |
396 |
C<$prepare_cb>, which is called just before the C<listen ()> call, with |
397 |
the listen file handle as first argument. |
398 |
|
399 |
It should return the length of the listen queue (or C<0> for the default). |
400 |
|
401 |
Example: bind on tcp port 8888 on the local machine and tell each client |
402 |
to go away. |
403 |
|
404 |
AnyEvent::Util::tcp_server undef, 8888, sub { |
405 |
my ($fh, $host, $port) = @_; |
406 |
|
407 |
syswrite $fh, "The internet is full, $host:$port. Go away!\015\012"; |
408 |
}; |
409 |
|
410 |
=cut |
411 |
|
412 |
# Create a listening TCP socket and accept connections on it, see the POD
# entry for the calling conventions. Croaks if any of the setup steps fail.
sub tcp_server($$$;$) {
   my ($host, $port, $accept, $prepare) = @_;

   # holds the listening socket (fh) and its accept watcher (aw)
   my %state;

   unless (socket $state{fh}, &Socket::AF_INET, &Socket::SOCK_STREAM, 0) {
      Carp::croak ("socket: $!");
   }

   unless (setsockopt $state{fh}, &Socket::SOL_SOCKET, &Socket::SO_REUSEADDR, 1) {
      Carp::croak ("so_reuseaddr: $!");
   }

   # a false $host selects 0.0.0.0
   my $sin = Socket::pack_sockaddr_in (_tcp_port ($port), socket_inet_aton ($host || "0.0.0.0"));

   unless (bind $state{fh}, $sin) {
      Carp::croak ("bind: $!");
   }

   fh_nonblocking ($state{fh}, 1);

   # the prepare callback may choose the listen queue length, a false
   # value (or no callback at all) selects 128
   my $len = $prepare ? $prepare->($state{fh}) : 0;
   $len ||= 128;

   unless (listen $state{fh}, $len) {
      Carp::croak ("listen: $!");
   }

   $state{aw} = AnyEvent->io (fh => $state{fh}, poll => 'r', cb => sub {
      # this closure keeps %state alive
      while (my $peer = accept my $client, $state{fh}) {
         fh_nonblocking ($client, 1); # POSIX requires inheritance, the outside world does not
         my ($peer_port, $peer_addr) = Socket::unpack_sockaddr_in ($peer);
         $accept->($client, Socket::inet_ntoa ($peer_addr), $peer_port);
      }
   });

   # no guard object in void context
   return unless defined wantarray;

   guard { %state = () } # clear fh and watcher, which breaks the circular dependency
}
446 |
|
447 |
1; |
448 |
|
449 |
=back |
450 |
|
451 |
=head1 AUTHOR |
452 |
|
453 |
Marc Lehmann <schmorp@schmorp.de> |
454 |
http://home.schmorp.de/ |
455 |
|
456 |
=cut |
457 |
|