ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.13
Committed: Sat Oct 27 14:07:31 2007 UTC (16 years, 7 months ago) by root
Branch: MAIN
Changes since 1.12: +0 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use Coro::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing perl
12 functions by non-blocking counterparts.
13
14 This module is an AnyEvent user. Refer to the L<AnyEvent|AnyEvent>
15 documentation to see how to integrate it into your own programs.
16
17 =over 4
18
19 =cut
20
21 package Coro::Util;
22
23 use strict;
24
25 no warnings "uninitialized";
26
27 use AnyEvent;
28
29 use Coro::State;
30 use Coro::Handle;
31 use Coro::Storable ();
32 use Coro::Semaphore;
33
34 use base 'Exporter';
35
36 our @EXPORT = qw(gethostbyname gethostbyaddr);
37 our @EXPORT_OK = qw(inet_aton fork_eval);
38
39 our $VERSION = 2.0;
40
our $MAXPARALLEL = 16; # max. number of parallel jobs

# Counting semaphore limiting how many helper processes _do_asy runs at the
# same time. It is sized from $MAXPARALLEL once, when the module is loaded.
my $jobs = Coro::Semaphore->new ($MAXPARALLEL);

# _do_asy { BLOCK } @args
#
# Runs BLOCK in a forked child process and returns its result list to the
# calling coroutine without blocking the rest of the program.
#
# BLOCK is invoked as &$sub, i.e. it sees @args as its own @_. In the child
# each returned value is hex-encoded and the values are joined with NUL
# bytes; the parent collects that stream through an AnyEvent I/O watcher
# while the calling coroutine is descheduled, then decodes it again.
#
# Returns the decoded list in list context and its first element in scalar
# context. Dies with "fork: ..." if the child process cannot be created.
sub _do_asy(&;@) {
    my $sub = shift;

    $jobs->down;

    my $fh;
    my $pid = open $fh, "-|";

    if (!defined $pid) {
        # Give the slot back before bailing out, otherwise every failed fork
        # would permanently reduce the number of available jobs.
        my $err = "$!";
        $jobs->up;
        die "fork: $err";
    } elsif (!$pid) {
        # Child: STDOUT is the pipe to the parent. The eval guarantees that
        # the child always exits here, even when the block throws, instead
        # of unwinding into a copy of the parent's call stack.
        eval {
            syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
        };
        Coro::State::_exit ($@ ? 1 : 0);
    }

    my $buf;
    my $current = $Coro::current;
    my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
        my $got = sysread $fh, $buf, 16384, length $buf;

        # got some data, wait for more
        return if $got;

        # transient error: keep the watcher and retry on the next event
        return if !defined $got && ($!{EINTR} || $!{EAGAIN});

        # end of file (or a hard error): stop watching, wake up the caller
        undef $w;
        $current->ready;
    });

    # Sleep until the watcher callback has cleared $w; any other wake-up
    # simply puts us back to sleep.
    &Coro::schedule;
    &Coro::schedule while $w;

    $jobs->up;

    my @r = map { pack "H*", $_ } split /\0/, $buf;
    wantarray ? @r : $r[0];
}
76
# dotted_quad $string
#
# Returns true if $string consists of exactly four decimal octets (0-255)
# separated by dots, and nothing else.
#
# The match is anchored with \A and \z rather than ^ and $, because $ would
# also accept a string that ends in a newline ("1.2.3.4\n").
sub dotted_quad($) {
    my $octet = qr/(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)/;

    $_[0] =~ /\A$octet(?:\.$octet){3}\z/
}
83
84 =item gethostbyname, gethostbyaddr
85
86 Work exactly like their perl counterparts, but do not block. Currently
87 this is being implemented with forking, so it's not exactly low-cost.
88
89 =cut
90
# Placeholder for a Net::DNS based resolver. The leading "die" aborts the
# eval before anything is loaded, so $netdns is always undef and the
# forking implementation below is the one in use.
my $netdns = eval { die; require Net::DNS::Resolver; Net::DNS::Resolver->new; };

# gethostbyname $name
#
# Non-blocking replacement for the builtin: the lookup runs in a child
# process via _do_asy.
#
# In list context returns what the builtin returns in list context
# (name, aliases, address type, length, addresses...). In scalar context
# returns the first packed address, as the builtin does - not the canonical
# name, which is merely the first element of the list form. Returns the
# empty list / undef when the lookup fails.
sub gethostbyname($) {
    if ($netdns) {
        #$netdns->query($_[0]);
        die;
    }

    # The block always runs in list context inside _do_asy, so fetch the
    # full list and pick the scalar-context value here.
    my @r = _do_asy { CORE::gethostbyname $_[0] } @_;

    wantarray ? @r : $r[4];
}
101
# gethostbyaddr $addr, $addrtype
#
# Non-blocking replacement for the builtin: the lookup runs in a child
# process via _do_asy. Returns the builtin's list-context result in list
# context and its first element in scalar context.
sub gethostbyaddr($$) {
    # the Net::DNS code path is not implemented
    die if $netdns;

    return _do_asy { CORE::gethostbyaddr $_[0], $_[1] } @_;
}
109
110 =item Coro::Util::inet_aton
111
112 Works almost exactly like its Socket counterpart, except that it does not
113 block. Is implemented with forking, so not exactly low-cost.
114
115 =cut
116
use Socket;

# Reference to the real (blocking) implementation. It is kept in a variable
# because this package defines its own sub named inet_aton.
our $inet_aton = \&Socket::inet_aton;

# inet_aton $host
#
# Converts $host to a packed address. Dotted-quad strings are converted
# directly in this process; anything else is handed to a child process via
# _do_asy.
sub inet_aton {
    require Socket;

    return $inet_aton->($_[0])
        if dotted_quad $_[0];

    _do_asy { $inet_aton->($_[0]) } @_
}
130
131 =item @result = Coro::Util::fork_eval { ... }, @args
132
133 Executes the given code block or code reference with the given arguments
134 in a separate process, returning the results. The return values must be
135 serialisable with Coro::Storable. It may, of course, block.
136
137 Note that using event handling in the sub is not usually a good idea as
138 you will inherit a mixed set of watchers from the parent.
139
140 Exceptions will be correctly forwarded to the caller.
141
142 This function is useful for pushing cpu-intensive computations into a
143 different process, for example to take advantage of multiple CPUs. It's
144 also useful if you want to simply run some blocking functions (such as
145 C<system()>) and do not care about the overhead enough to code your own
146 pid watcher etc.
147
148 This function might keep a pool of processes in some future version, as
149 fork can be rather slow in large processes.
150
151 Example: execute some external program (convert image to rgba raw form)
152 and add a long computation (extract the alpha channel) in a separate
153 process, making sure that never more than $NUMCPUS processes are being
154 run.
155
156 my $cpulock = new Coro::Semaphore $NUMCPUS;
157
158 sub do_it {
159 my ($path) = @_;
160
161 my $guard = $cpulock->guard;
162
163 Coro::Util::fork_eval {
164 open my $fh, "convert -depth 8 \Q$path\E rgba: |"
165 or die "$path: $!";
166
167 local $/;
168 # make my eyes hurt
169 pack "C*", unpack "(xxxC)*", <$fh>
170 }
171 }
172
173 my $alphachannel = do_it "/tmp/img.png";
174
175 =cut
176
# fork_eval { BLOCK } @args
#
# Calls BLOCK with @args in a forked child process and hands the result
# back to the caller through a pipe.
#
# The child evaluates BLOCK in list context and serialises either the
# result list or, if BLOCK died, the stringified exception. The parent
# reads the pipe through a Coro::Handle and thaws the data.
#
# Returns the result list in list context and its last element in scalar
# context. Dies with the child's exception text if BLOCK died, with
# "pipe: ..." / "fork_eval: ..." if the pipe or the process cannot be
# created, and with a "fork_eval: ..." message if the child exited without
# sending back a result.
sub fork_eval(&@) {
    my ($cb, @args) = @_;

    pipe my $reader, my $writer
        or die "pipe: $!";

    my $pid = fork;

    defined $pid
        or die "fork_eval: $!";

    if ($pid) {
        # Parent: close our copy of the write end so that we see end of
        # file as soon as the child is done.
        undef $writer;

        my $frozen = Coro::Handle::unblock ($reader)->readline (undef);

        # Reap the child before thawing, so that a failure while thawing
        # cannot leave a zombie behind. This should not block, we expect
        # the child to simply behave.
        waitpid $pid, 0;

        my $res = defined $frozen && length $frozen
            ? Coro::Storable::thaw ($frozen)
            : undef;

        ref $res
            or die "fork_eval: child process exited without returning a result";

        # anything but an array reference is a reference to the error text
        die $$res unless "ARRAY" eq ref $res;

        return wantarray ? @$res : $res->[-1];
    }

    # Child from here on.
    delete $SIG{__WARN__};
    delete $SIG{__DIE__};
    # just in case, this hack effectively disables event processing
    # in the child. cleaner and slower would be to cancel all
    # event watchers, but we are event-model agnostic.
    undef $Coro::idle;
    $Coro::current->prio (Coro::PRIO_MAX);

    eval {
        undef $reader;

        my @res = eval { $cb->(@args) };
        # save the error right away, before anything else can touch $@
        my $err = $@;

        # serialise into a buffer first, then send it in a single write
        open my $fh, ">", \my $buf
            or die "fork_eval: cannot open fh-to-buf in child: $!";
        Storable::store_fd ($err ? \"$err" : \@res, $fh);
        close $fh;

        syswrite $writer, $buf;
        close $writer;
    };

    warn $@ if $@;
    Coro::State::_exit (0);
}
225
226 1;
227
228 =back
229
230 =head1 AUTHOR
231
232 Marc Lehmann <schmorp@schmorp.de>
233 http://home.schmorp.de/
234
235 =cut
236