ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.18
Committed: Sun Nov 25 13:53:48 2007 UTC (16 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-4_22
Changes since 1.17: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use Coro::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing perl
12 functions by non-blocking counterparts.
13
14 This module is an AnyEvent user. Refer to the L<AnyEvent|AnyEvent>
15 documentation to see how to integrate it into your own programs.
16
17 =over 4
18
19 =cut
20
21 package Coro::Util;
22
23 use strict;
24
25 no warnings "uninitialized";
26
27 use AnyEvent;
28
29 use Coro::State;
30 use Coro::Handle;
31 use Coro::Storable ();
32 use Coro::Semaphore;
33
34 use base 'Exporter';
35
# the resolver replacements are exported by default, the remaining
# functions only on request.
our @EXPORT    = qw(gethostbyname gethostbyaddr);
our @EXPORT_OK = qw(inet_aton fork_eval);

our $VERSION = 2.0;

# max. number of parallel jobs
our $MAXPARALLEL = 16;

# counting semaphore limiting the number of concurrently running helper
# processes. it is created once, from the value $MAXPARALLEL has at load time.
my $jobs = Coro::Semaphore->new ($MAXPARALLEL);
44
# _do_asy { BLOCK } @args
#
# runs BLOCK in a forked child process and hands its results back to the
# calling coroutine, which is descheduled (instead of blocking the whole
# process) while the child is working.
#
# BLOCK is called in list context with @args in @_. every return value is
# transferred as a hex-encoded string, so only plain strings survive the
# trip. returns the list of results in list context and the first result
# in scalar context. dies with "fork: ..." when no child could be created.
sub _do_asy(&;@) {
   my $sub = shift;

   # wait for a free job slot
   $jobs->down;

   my $fh;
   my $pid = open $fh, "-|";

   if (!defined $pid) {
      # give the slot back, otherwise each failed fork would permanently
      # reduce the number of jobs that can run in parallel.
      my $err = "$!";
      $jobs->up;
      die "fork: $err";
   } elsif (!$pid) {
      # child: &$sub without parens calls the block with our remaining @_.
      # hex-encoding the values frees "\0" for use as separator.
      syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
      Coro::State::_exit 0;
   }

   my $buf;
   my $current = $Coro::current;
   my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
      # keep collecting as long as sysread delivers data
      sysread $fh, $buf, 16384, length $buf
         and return;

      # eof or read error: drop the watcher and wake up the waiting coroutine
      undef $w;
      $current->ready;
   });

   # the coroutine may be readied by others, so only continue once the
   # watcher has been destroyed by the callback above.
   &Coro::schedule;
   &Coro::schedule while $w;

   $jobs->up;

   # a negative limit keeps trailing empty fields, so empty strings at the
   # end of the result list are not silently dropped.
   my @r = map { pack "H*", $_ } split /\0/, $buf, -1;
   wantarray ? @r : $r[0];
}
76
# dotted_quad $string
#
# returns true if $string is an ipv4 address in dotted-quad notation, that
# is, exactly four decimal numbers in the range 0..255 separated by dots.
# the whole string has to match: \A and \z are used instead of ^ and $,
# because $ would also accept a trailing newline.
sub dotted_quad($) {
   # one decimal octet, 0..255, at most three digits
   my $octet = qr/(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)/;

   $_[0] =~ /\A $octet (?: \. $octet ){3} \z/x
}
83
84 =item gethostbyname, gethostbyaddr
85
86 Work exactly like their perl counterparts, but do not block. Currently
87 this is being implemented with forking, so it's not exactly low-cost.
88
89 =cut
90
# placeholder for a Net::DNS based resolver. the leading "die" aborts the
# eval before anything is loaded, so $netdns always stays undef and the
# Net::DNS code paths below are never taken.
my $netdns = eval {
   die;
   require Net::DNS::Resolver;
   Net::DNS::Resolver->new;
};
92
# gethostbyname $name
#
# non-blocking replacement for the builtin. with the EV based event models
# the lookup is done by EV::DNS, otherwise the builtin is run in a forked
# child via _do_asy.
#
# list context:   ($name, $aliases, $addrtype, $length, @addrs), or the
#                 empty list when nothing was found.
# scalar context: the first address, or undef - as with the builtin, a
#                 lookup by name yields the address, not the name.
sub gethostbyname($) {
   my $model = AnyEvent::detect;

   if ($netdns) {
      #$netdns->query($_[0]);
      die;
   } elsif ($model eq "AnyEvent::Impl::CoroEV" or $model eq "AnyEvent::Impl::EV") {
      require EV::DNS;
      require Socket;

      my $current = $Coro::current;
      my ($result, @ptrs);

      EV::DNS::resolve_ipv4 ($_[0], 0, sub {
         ($result, undef, undef, @ptrs) = @_;
         $current->ready;
      });

      # the coroutine may be woken up by others, so wait for the callback
      Coro::schedule while !defined $result;

      # nothing found: empty list / undef
      return unless @ptrs;

      return wantarray
         ? ($_[0], undef, &Socket::AF_INET, 4, @ptrs)
         : $ptrs[0];
   } else {
      # always fetch the full list, _do_asy would return the first
      # element (the name) in scalar context.
      my @r = _do_asy { gethostbyname $_[0] } @_;

      # the addresses follow the four fixed fields
      return wantarray ? @r : $r[4];
   }
}
117
# gethostbyaddr $addr, $addrtype
#
# non-blocking replacement for the builtin: the lookup is run in a forked
# child via _do_asy, and whatever _do_asy returns is passed on.
sub gethostbyaddr($$) {
   # the Net::DNS code path is not implemented
   die if $netdns;

   return _do_asy { gethostbyaddr $_[0], $_[1] } @_;
}
125
126 =item Coro::Util::inet_aton
127
128 Works almost exactly like its Socket counterpart, except that it does not
129 block. Is implemented with forking, so not exactly low-cost.
130
131 =cut
132
133 use Socket;
134
135 our $inet_aton = \&Socket::inet_aton;
136
# inet_aton $name_or_address
#
# dotted-quad addresses are converted directly with Socket::inet_aton.
# everything else is resolved without blocking: by EV::DNS under the EV
# based event models, by a forked child (_do_asy) otherwise.
sub inet_aton {
   require Socket;

   my $model = AnyEvent::detect;

   # literal addresses need no lookup at all
   return $inet_aton->($_[0])
      if dotted_quad ($_[0]);

   my $have_ev = $model eq "AnyEvent::Impl::CoroEV"
              || $model eq "AnyEvent::Impl::EV";

   # no asynchronous resolver available, let a child do the blocking call
   return _do_asy { $inet_aton->($_[0]) } @_
      unless $have_ev;

   require EV::DNS;

   my $self = $Coro::current;
   my ($status, @addrs);

   EV::DNS::resolve_ipv4 ($_[0], 0, sub {
      ($status, undef, undef, @addrs) = @_;
      $self->ready;
   });

   # wait until the resolver callback has stored its result
   Coro::schedule while !defined $status;

   return $addrs[0];
}
160
161 =item @result = Coro::Util::fork_eval { ... }, @args
162
163 Executes the given code block or code reference with the given arguments
164 in a separate process, returning the results. The return values must be
165 serialisable with Coro::Storable. It may, of course, block.
166
167 Note that using event handling in the sub is not usually a good idea as
168 you will inherit a mixed set of watchers from the parent.
169
170 Exceptions will be correctly forwarded to the caller.
171
172 This function is useful for pushing cpu-intensive computations into a
173 different process, for example to take advantage of multiple CPUs. It's
174 also useful if you want to simply run some blocking functions (such as
175 C<system()>) and do not care about the overhead enough to code your own
176 pid watcher etc.
177
178 This function might keep a pool of processes in some future version, as
179 fork can be rather slow in large processes.
180
181 Example: execute some external program (convert image to rgba raw form)
182 and add a long computation (extract the alpha channel) in a separate
183 process, making sure that never more than $NUMCPUS processes are being
184 run.
185
186 my $cpulock = new Coro::Semaphore $NUMCPUS;
187
188 sub do_it {
189 my ($path) = @_;
190
191 my $guard = $cpulock->guard;
192
193 Coro::Util::fork_eval {
194 open my $fh, "convert -depth 8 \Q$path\E rgba: |"
195 or die "$path: $!";
196
197 local $/;
198 # make my eyes hurt
199 pack "C*", unpack "(xxxC)*", <$fh>
200 }
201 }
202
203 my $alphachannel = do_it "/tmp/img.png";
204
205 =cut
206
# fork_eval { BLOCK } @args
#
# runs BLOCK with @args in a forked child process and returns its result
# (all values in list context, the last value in scalar context). the
# result is serialised in the child and sent to the parent over a pipe.
#
# an exception thrown by BLOCK is sent over as a string and rethrown in the
# parent. dies with "pipe: ..." or "fork_eval: ..." when the pipe or the
# child cannot be created, or when the child does not deliver a result.
sub fork_eval(&@) {
   my ($cb, @args) = @_;

   pipe my $fh1, my $fh2
      or die "pipe: $!";

   my $pid = fork;

   if ($pid) {
      # parent: close our copy of the write end, so the read below sees
      # eof once the child has closed its copy.
      undef $fh2;

      my $frozen = (Coro::Handle::unblock $fh1)->readline (undef);

      # reap the child before looking at the data, so it cannot be left
      # behind as a zombie when thawing fails.
      waitpid $pid, 0; # should not block, we expect the child to simply behave

      # the child sends nothing at all when it could not serialise the result
      my $res = length ($frozen) ? Coro::Storable::thaw ($frozen) : undef;

      # a scalar reference carries the stringified exception of the callback
      die $$res if "SCALAR" eq ref $res;

      die "fork_eval: child process did not return a valid result"
         unless "ARRAY" eq ref $res;

      return wantarray ? @$res : $res->[-1];

   } elsif (defined $pid) {
      # child: do not run the signal hooks inherited from the parent
      delete $SIG{__WARN__};
      delete $SIG{__DIE__};
      # just in case, this hack effectively disables event processing
      # in the child. cleaner and slower would be to cancel all
      # event watchers, but we are event-model agnostic.
      undef $Coro::idle;
      $Coro::current->prio (Coro::PRIO_MAX);

      eval {
         undef $fh1;

         my @res = eval { $cb->(@args) };

         # serialise into a memory buffer first, then send it in one go.
         # $@ still holds the outcome of the callback at this point.
         open my $fh, ">", \my $buf
            or die "fork_eval: cannot open fh-to-buf in child: $!";
         Storable::store_fd $@ ? \"$@" : \@res, $fh;
         close $fh;

         syswrite $fh2, $buf;
         close $fh2;
      };

      # failures of the transfer itself can only be reported locally
      warn $@ if $@;
      Coro::State::_exit 0;

   } else {
      die "fork_eval: $!";
   }
}
255
# make sure store_fd is preloaded: call it once with dummy arguments at
# load time and ignore the resulting error.
eval { Storable::store_fd (undef, undef) };
258
259 1;
260
261 =back
262
263 =head1 AUTHOR
264
265 Marc Lehmann <schmorp@schmorp.de>
266 http://home.schmorp.de/
267
268 =cut
269