ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.28
Committed: Fri May 30 21:34:52 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-4_741
Changes since 1.27: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use Coro::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing perl
12 functions by non-blocking counterparts.
13
14 This module is an AnyEvent user. Refer to the L<AnyEvent>
15 documentation to see how to integrate it into your own programs.
16
17 =over 4
18
19 =cut
20
21 package Coro::Util;
22
23 no warnings;
24 use strict;
25
26 use Socket ();
27
28 use AnyEvent ();
29 use AnyEvent::Socket ();
30
31 use Coro::State;
32 use Coro::Handle;
33 use Coro::Storable ();
34 use Coro::Semaphore;
35
36 use base 'Exporter';
37
38 our @EXPORT = qw(gethostbyname gethostbyaddr);
39 our @EXPORT_OK = qw(inet_aton fork_eval);
40
41 our $VERSION = 4.741;
42
43 our $MAXPARALLEL = 16; # max. number of parallel jobs
44
45 my $jobs = new Coro::Semaphore $MAXPARALLEL;
46
47 sub _do_asy(&;@) {
48 my $sub = shift;
49 $jobs->down;
50 my $fh;
51
52 my $pid = open $fh, "-|";
53
54 if (!defined $pid) {
55 die "fork: $!";
56 } elsif (!$pid) {
57 syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
58 Coro::State::_exit 0;
59 }
60
61 my $buf;
62 my $current = $Coro::current;
63 my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
64 sysread $fh, $buf, 16384, length $buf
65 and return;
66
67 undef $w;
68 $current->ready;
69 });
70
71 &Coro::schedule;
72 &Coro::schedule while $w;
73
74 $jobs->up;
75 my @r = map { pack "H*", $_ } split /\0/, $buf;
76 wantarray ? @r : $r[0];
77 }
78
79 =item $ipn = Coro::Util::inet_aton $hostname || $ip
80
81 Works almost exactly like its AnyEvent::Socket counterpart, except that it does not
82 block.
83
84 =cut
85
86 sub inet_aton {
87 my $current = $Coro::current;
88 my @res;
89
90 AnyEvent::Socket::inet_aton $_[0], sub {
91 @res = shift;
92 $current->ready;
93 undef $current;
94 };
95
96 Coro::schedule while $current;
97
98 wantarray ? @res : $res[0]
99 }
100
101 =item gethostbyname, gethostbyaddr
102
103 Work similarly to their perl counterparts, but do not block. Uses
104 C<Anyevent::Util::inet_aton> internally.
105
106 =cut
107
108 sub gethostbyname($) {
109 my $current = $Coro::current;
110 my @res = inet_aton $_[0];
111
112 ($_[0], $_[0], &Socket::AF_INET, 4, map +(format_ip $_), grep length == 4, @res)
113 }
114
115 sub gethostbyaddr($$) {
116 _do_asy { gethostbyaddr $_[0], $_[1] } @_
117 }
118
119 =item @result = Coro::Util::fork_eval { ... }, @args
120
121 Executes the given code block or code reference with the given arguments
122 in a separate process, returning the results. The return values must be
123 serialisable with Coro::Storable. It may, of course, block.
124
125 Note that using event handling in the sub is not usually a good idea as
126 you will inherit a mixed set of watchers from the parent.
127
128 Exceptions will be correctly forwarded to the caller.
129
130 This function is useful for pushing cpu-intensive computations into a
131 different process, for example to take advantage of multiple CPU's. Its
132 also useful if you want to simply run some blocking functions (such as
133 C<system()>) and do not care about the overhead enough to code your own
134 pid watcher etc.
135
136 This function might keep a pool of processes in some future version, as
137 fork can be rather slow in large processes.
138
139 Example: execute some external program (convert image to rgba raw form)
140 and add a long computation (extract the alpha channel) in a separate
141 process, making sure that never more then $NUMCPUS processes are being
142 run.
143
144 my $cpulock = new Coro::Semaphore $NUMCPUS;
145
146 sub do_it {
147 my ($path) = @_;
148
149 my $guard = $cpulock->guard;
150
151 Coro::Util::fork_eval {
152 open my $fh, "convert -depth 8 \Q$path\E rgba:"
153 or die "$path: $!";
154
155 local $/;
156 # make my eyes hurt
157 pack "C*", unpack "(xxxC)*", <$fh>
158 }
159 }
160
161 my $alphachannel = do_it "/tmp/img.png";
162
163 =cut
164
165 sub fork_eval(&@) {
166 my ($cb, @args) = @_;
167
168 pipe my $fh1, my $fh2
169 or die "pipe: $!";
170
171 my $pid = fork;
172
173 if ($pid) {
174 undef $fh2;
175
176 my $res = Coro::Storable::thaw +(Coro::Handle::unblock $fh1)->readline (undef);
177 waitpid $pid, 0; # should not block, we expect the child to simply behave
178
179 die $$res unless "ARRAY" eq ref $res;
180
181 return wantarray ? @$res : $res->[-1];
182
183 } elsif (defined $pid) {
184 delete $SIG{__WARN__};
185 delete $SIG{__DIE__};
186 # just in case, this hack effectively disables event processing
187 # in the child. cleaner and slower would be to canceling all
188 # event watchers, but we are event-model agnostic.
189 undef $Coro::idle;
190 $Coro::current->prio (Coro::PRIO_MAX);
191
192 eval {
193 undef $fh1;
194
195 my @res = eval { $cb->(@args) };
196
197 open my $fh, ">", \my $buf
198 or die "fork_eval: cannot open fh-to-buf in child: $!";
199 Storable::store_fd $@ ? \"$@" : \@res, $fh;
200 close $fh;
201
202 syswrite $fh2, $buf;
203 close $fh2;
204 };
205
206 warn $@ if $@;
207 Coro::State::_exit 0;
208
209 } else {
210 die "fork_eval: $!";
211 }
212 }
213
214 # make sure store_fd is preloaded
215 eval { Storable::store_fd undef, undef };
216
217 1;
218
219 =back
220
221 =head1 AUTHOR
222
223 Marc Lehmann <schmorp@schmorp.de>
224 http://home.schmorp.de/
225
226 =cut
227