ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.18
Committed: Sun Nov 25 13:53:48 2007 UTC (16 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-4_22
Changes since 1.17: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing perl
12     functions by non-blocking counterparts.
13    
14 root 1.4 This module is an AnyEvent user. Refer to the L<AnyEvent|AnyEvent>
15     documentation to see how to integrate it into your own programs.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package Coro::Util;
22    
23 root 1.5 use strict;
24    
25 root 1.4 no warnings "uninitialized";
26    
27     use AnyEvent;
28 root 1.1
29 root 1.4 use Coro::State;
30 root 1.8 use Coro::Handle;
31     use Coro::Storable ();
32 root 1.1 use Coro::Semaphore;
33    
34     use base 'Exporter';
35    
36 root 1.5 our @EXPORT = qw(gethostbyname gethostbyaddr);
37 root 1.8 our @EXPORT_OK = qw(inet_aton fork_eval);
38 root 1.1
39 root 1.5 our $VERSION = 2.0;
40 root 1.1
41 root 1.5 our $MAXPARALLEL = 16; # max. number of parallel jobs
42 root 1.1
43     my $jobs = new Coro::Semaphore $MAXPARALLEL;
44    
# Run $sub (which sees the remaining arguments as its @_) in a forked child
# and hand its results back to the calling coroutine without blocking the
# whole process.
#
# The child hex-encodes each return value, joins the values with NUL bytes
# and writes them to its STDOUT, which is the write end of the pipe set up by
# the forking open. The parent collects the data from an AnyEvent io watcher
# and decodes it once sysread stops delivering data.
#
# Returns the decoded list in list context and its first element otherwise.
# Dies with "fork: ..." if the child process cannot be created.
sub _do_asy(&;@) {
   my $sub = shift;

   # take one of the $MAXPARALLEL job slots (may block this coroutine)
   $jobs->down;

   my $fh;
   my $pid = open $fh, "-|";

   if (!defined $pid) {
      # give the slot back first, otherwise every failed fork would
      # permanently reduce the number of jobs that can run in parallel
      my $error = "$!";
      $jobs->up;
      die "fork: $error";
   } elsif (!$pid) {
      # child: "&$sub" without parens passes the current @_ to the callback
      syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
      Coro::State::_exit (0);
   }

   my $buf;
   my $current = $Coro::current;
   my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
      # append to $buf for as long as sysread returns data
      sysread $fh, $buf, 16384, length $buf
         and return;

      # end-of-file (or read error): stop watching and wake up the caller
      undef $w;
      $current->ready;
   });

   &Coro::schedule;
   &Coro::schedule while $w;

   $jobs->up;

   # the negative limit keeps trailing empty values instead of dropping them
   my @r = map { pack "H*", $_ } split /\0/, $buf, -1;
   wantarray ? @r : $r[0];
}
76    
# Return true if the argument is an IPv4 address in dotted-quad notation,
# i.e. exactly four decimal octets in the range 0..255 separated by dots.
#
# The pattern is anchored with \A and \z, so a trailing newline is rejected.
sub dotted_quad($) {
   my $octet = qr/(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)/;

   $_[0] =~ /\A $octet (?: \. $octet ){3} \z/x
}
83    
84 root 1.1 =item gethostbyname, gethostbyaddr
85    
86     Work exactly like their perl counterparts, but do not block. Currently
87 root 1.5 this is being implemented with forking, so it's not exactly low-cost.
88 root 1.1
89     =cut
90    
# Resolver object for a Net::DNS based implementation. The leading "die"
# makes the eval fail on purpose, so $netdns is always undef and the
# Net::DNS branches of the resolver functions are never taken.
my $netdns = eval { die; require Net::DNS::Resolver; Net::DNS::Resolver->new };

# Non-blocking replacement for perl's gethostbyname.
#
# List context: returns ($name, $aliases, $addrtype, $length, @addrs), or the
# empty list if the name cannot be resolved.
# Scalar context: returns the first packed address (or undef), which is what
# the builtin returns in scalar context.
#
# With the EV based AnyEvent backends the lookup is done by EV::DNS, otherwise
# the blocking builtin is run in a forked helper process via _do_asy.
sub gethostbyname($) {
   my $model = AnyEvent::detect ();

   if ($netdns) {
      #$netdns->query($_[0]);
      die;
   } elsif ($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV") {
      require EV::DNS;
      require Socket;

      my $current = $Coro::current;
      my ($result, @ptrs);

      EV::DNS::resolve_ipv4 ($_[0], 0, sub {
         ($result, undef, undef, @ptrs) = @_;
         $current->ready;
      });

      # sleep until the resolver callback has stored a result
      Coro::schedule () while !defined $result;

      # bare return: empty list in list context, undef in scalar context
      return unless @ptrs;

      # a comma list ending in @ptrs would yield the address count here
      return $ptrs[0] unless wantarray;

      return ($_[0], undef, Socket::AF_INET (), 4, @ptrs);
   } else {
      # inside the blocks "gethostbyname" is the perl builtin, not this sub
      return _do_asy { gethostbyname $_[0] } @_
         if wantarray;

      # _do_asy always calls its block in list context, so force the scalar
      # form of the builtin to get the address rather than the name
      my $addr = _do_asy { scalar gethostbyname $_[0] } @_;
      return $addr;
   }
}
117    
# Non-blocking gethostbyaddr: the lookup itself is executed in a forked
# helper process by _do_asy, and its result is passed back to the caller
# in the caller's context.
sub gethostbyaddr($$) {
   # the Net::DNS code path was never implemented
   die if $netdns;

   return _do_asy { gethostbyaddr $_[0], $_[1] } @_;
}
125    
126     =item Coro::Util::inet_aton
127    
128     Works almost exactly like its Socket counterpart, except that it does not
129     block. Is implemented with forking, so not exactly low-cost.
130    
131     =cut
132    
133     use Socket;
134    
# The underlying resolver: called directly for dotted quads and from
# within the forked helper for everything else.
our $inet_aton = \&Socket::inet_aton;

# Non-blocking inet_aton. Dotted quads are converted immediately, other
# names are resolved through EV::DNS when an EV based AnyEvent backend is in
# use, and through a forked helper process (_do_asy) otherwise.
sub inet_aton {
   require Socket;

   my $model = AnyEvent::detect ();

   # literal addresses need no lookup at all
   return $inet_aton->($_[0])
      if dotted_quad ($_[0]);

   my $have_ev = $model eq "AnyEvent::Impl::CoroEV"
              || $model eq "AnyEvent::Impl::EV";

   # no EV: run the blocking resolver in a child process
   return _do_asy { $inet_aton->($_[0]) } @_
      unless $have_ev;

   require EV::DNS;

   my $self = $Coro::current;
   my ($status, @addrs);

   EV::DNS::resolve_ipv4 ($_[0], 0, sub {
      ($status, undef, undef, @addrs) = @_;
      $self->ready;
   });

   # sleep until the resolver callback has delivered a status
   Coro::schedule () until defined $status;

   return $addrs[0];
}
160    
161 root 1.8 =item @result = Coro::Util::fork_eval { ... }, @args
162    
163     Executes the given code block or code reference with the given arguments
164     in a separate process, returning the results. The return values must be
165     serialisable with Coro::Storable. It may, of course, block.
166    
167 root 1.10 Note that using event handling in the sub is not usually a good idea as
168     you will inherit a mixed set of watchers from the parent.
169 root 1.8
170     Exceptions will be correctly forwarded to the caller.
171    
172     This function is useful for pushing cpu-intensive computations into a
173     different process, for example to take advantage of multiple CPUs. It's
174     also useful if you want to simply run some blocking functions (such as
175     C<system()>) and do not care about the overhead enough to code your own
176     pid watcher etc.
177    
178 root 1.10 This function might keep a pool of processes in some future version, as
179     fork can be rather slow in large processes.
180    
181 root 1.8 Example: execute some external program (convert image to rgba raw form)
182     and add a long computation (extract the alpha channel) in a separate
183     process, making sure that never more than $NUMCPUS processes are being
184     run.
185    
186     my $cpulock = new Coro::Semaphore $NUMCPUS;
187    
188     sub do_it {
189     my ($path) = @_;
190    
191     my $guard = $cpulock->guard;
192    
193     Coro::Util::fork_eval {
194     open my $fh, "convert -depth 8 \Q$path\E rgba: |"
195     or die "$path: $!";
196    
197     local $/;
198     # make my eyes hurt
199     pack "C*", unpack "(xxxC)*", <$fh>
200     }
201     }
202    
203     my $alphachannel = do_it "/tmp/img.png";
204    
205     =cut
206    
# Run $cb->(@args) in a forked child process and return its results.
#
# The child serialises either the list of return values or, if the callback
# died, a reference to the stringified exception, and writes the result to a
# pipe. The parent reads the pipe through a Coro::Handle, so only the calling
# coroutine waits.
#
# Returns the callback's result list in list context and its last element in
# scalar context. Dies with the child's exception text if the callback died,
# and with a "fork_eval: ..." message if pipe/fork fail or the child did not
# deliver a usable result.
sub fork_eval(&@) {
   my ($cb, @args) = @_;

   pipe my $fh1, my $fh2
      or die "pipe: $!";

   my $pid = fork;

   if ($pid) {
      # parent: drop our copy of the write end so the read sees end-of-file
      undef $fh2;

      my $data = (Coro::Handle::unblock ($fh1))->readline (undef);

      # reap the child before decoding, so that a reply that cannot be
      # thawed does not leave a zombie process behind
      waitpid $pid, 0; # should not block, we expect the child to simply behave

      # the child writes nothing if it failed before or while serialising
      die "fork_eval: child process returned no result"
         unless defined $data and length $data;

      my $res = Coro::Storable::thaw ($data);

      die "fork_eval: unable to decode result from child process"
         unless ref $res;

      # anything but an array is a reference to the child's exception text
      die $$res unless "ARRAY" eq ref $res;

      return wantarray ? @$res : $res->[-1];

   } elsif (defined $pid) {
      delete $SIG{__WARN__};
      delete $SIG{__DIE__};
      # just in case, this hack effectively disables event processing
      # in the child. cleaner and slower would be to cancel all
      # event watchers, but we are event-model agnostic.
      undef $Coro::idle;
      $Coro::current->prio (Coro::PRIO_MAX ());

      eval {
         undef $fh1;

         my @res = eval { $cb->(@args) };
         # capture the exception right away, before anything can reset $@
         my $error = $@;

         open my $fh, ">", \my $buf
            or die "fork_eval: cannot open fh-to-buf in child: $!";
         Storable::store_fd (($error ? \"$error" : \@res), $fh);
         close $fh;

         syswrite $fh2, $buf;
         close $fh2;
      };

      warn $@ if $@;
      Coro::State::_exit (0);

   } else {
      die "fork_eval: $!";
   }
}

# make sure store_fd is preloaded
eval { Storable::store_fd (undef, undef) };
258    
259 root 1.1 1;
260    
261 root 1.7 =back
262    
263 root 1.1 =head1 AUTHOR
264    
265 root 1.4 Marc Lehmann <schmorp@schmorp.de>
266     http://home.schmorp.de/
267 root 1.1
268     =cut
269