ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.24
Committed: Sun May 25 03:05:42 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-4_72
Changes since 1.23: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use Coro::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing perl
12 functions by non-blocking counterparts.
13
14 This module is an AnyEvent user. Refer to the L<AnyEvent>
15 documentation to see how to integrate it into your own programs.
16
17 =over 4
18
19 =cut
20
21 package Coro::Util;
22
23 use strict;
24
25 no warnings "uninitialized";
26
27 use Socket ();
28
29 use AnyEvent ();
30 use AnyEvent::Socket ();
31
32 use Coro::State;
33 use Coro::Handle;
34 use Coro::Storable ();
35 use Coro::Semaphore;
36
37 use base 'Exporter';
38
39 our @EXPORT = qw(gethostbyname gethostbyaddr);
40 our @EXPORT_OK = qw(inet_aton fork_eval);
41
42 our $VERSION = 4.72;
43
# Upper bound on the number of helper processes _do_asy runs in parallel.
# The value is read exactly once, when the semaphore below is created at
# module load time; changing it afterwards does not resize the semaphore.
our $MAXPARALLEL = 16;

# Counting semaphore that hands out the job slots used by _do_asy.
# (Direct method call instead of indirect-object "new Class ARGS" syntax,
# which can be misparsed.)
my $jobs = Coro::Semaphore->new ($MAXPARALLEL);
47
# _do_asy { BLOCK } @args
#
# Runs BLOCK in a forked child process (BLOCK sees @args as its @_) and
# returns the list it produced, while only the calling coroutine waits.
# The number of concurrently running jobs is limited by the $jobs semaphore.
#
# Transport format: the child hex-encodes every result element, joins them
# with "\0" and writes that to its STDOUT; the parent reads the pipe until
# EOF and decodes again.
#
# Returns the whole list in list context, the first element otherwise.
# Dies with "fork: ..." if the child process cannot be created.
sub _do_asy(&;@) {
    my $sub = shift;

    # Take a job slot. The guard gives it back on *every* exit path,
    # including the die below when fork fails - a manual ->up at the end
    # would be skipped there and leak the slot permanently.
    my $guard = $jobs->guard;

    my $fh;
    my $pid = open $fh, "-|";

    if (!defined $pid) {
        die "fork: $!";
    } elsif (!$pid) {
        # Child. The eval makes sure an exception thrown by the callback
        # cannot unwind into the caller's code inside the forked process;
        # the child always terminates right here.
        # &$sub deliberately passes the remaining @_ through to the callback.
        eval {
            syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
        };
        Coro::State::_exit ($@ ? 1 : 0);
    }

    my $buf;
    my $current = $Coro::current;
    my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
        my $len = sysread $fh, $buf, 16384, length $buf;

        # got data: keep the watcher and wait for more
        return if $len;

        # transient error: not EOF, try again on the next readiness event
        return if !defined $len && ($!{EINTR} || $!{EAGAIN});

        # EOF (or a hard error): stop watching and wake up the waiting coroutine
        undef $w;
        $current->ready;
    });

    # sleep until the watcher callback has seen EOF and cleared $w
    &Coro::schedule;
    &Coro::schedule while $w;

    my @r = map { pack "H*", $_ } split /\0/, $buf;
    wantarray ? @r : $r[0];
}
79
80 =item $ipn = Coro::Util::inet_aton $hostname || $ip
81
82 Works almost exactly like its AnyEvent::Socket counterpart, except that it returns the
83 result directly (instead of passing it to a callback), blocking only the calling coroutine.
84
85 =cut
86
# Coroutine-friendly front end to AnyEvent::Socket::inet_aton: starts the
# lookup for the name or address in $_[0], puts the calling coroutine to
# sleep until the callback has fired, and then returns what the callback
# received as its first argument (as a one-element list in list context,
# as a plain scalar otherwise).
sub inet_aton {
    my $waiter = $Coro::current;
    my @addr;

    AnyEvent::Socket::inet_aton ($_[0], sub {
        # only the first callback argument is kept
        @addr = ($_[0]);
        $waiter->ready;
        # clearing $waiter tells the loop below that the result is in
        $waiter = undef;
    });

    while ($waiter) {
        Coro::schedule ();
    }

    return @addr if wantarray;
    return $addr[0];
}
101
102 =item gethostbyname, gethostbyaddr
103
104 Work similarly to their perl counterparts, but do not block. C<gethostbyname>
105 uses C<AnyEvent::Socket::inet_aton> internally.
106
107 =cut
108
# gethostbyname $name
#
# Non-blocking stand-in for perl's gethostbyname, resolving via this
# module's inet_aton. Returns a list laid out like the builtin's list-context
# result: (name, aliases, address type, address length, addresses...), where
# both name and aliases are simply the argument as given, the type is
# AF_INET and the length is 4. Only 4-octet (IPv4) results are kept, and
# each one is passed through AnyEvent::Socket::format_ip.
sub gethostbyname($) {
    my @res = inet_aton ($_[0]);

    # format_ip must be fully qualified: AnyEvent::Socket is loaded with an
    # empty import list, so a bare "format_ip $_" would be parsed as the
    # method call $_->format_ip and fail at runtime.
    ($_[0], $_[0], &Socket::AF_INET, 4,
     map +(AnyEvent::Socket::format_ip ($_)), grep length == 4, @res)
}
115
# gethostbyaddr $addr, $family
#
# Non-blocking stand-in for perl's gethostbyaddr: the builtin lookup is
# executed in a helper process via _do_asy and its result handed back.
# The call inside the block is spelled CORE::gethostbyaddr to make explicit
# that it is the builtin, not this wrapper, that runs in the child.
sub gethostbyaddr($$) {
    my ($addr, $family) = @_;

    return _do_asy { CORE::gethostbyaddr $_[0], $_[1] } $addr, $family;
}
119
120 =item @result = Coro::Util::fork_eval { ... }, @args
121
122 Executes the given code block or code reference with the given arguments
123 in a separate process, returning the results. The return values must be
124 serialisable with Coro::Storable. It may, of course, block.
125
126 Note that using event handling in the sub is not usually a good idea as
127 you will inherit a mixed set of watchers from the parent.
128
129 Exceptions will be correctly forwarded to the caller.
130
131 This function is useful for pushing cpu-intensive computations into a
132 different process, for example to take advantage of multiple CPUs. It's
133 also useful if you want to simply run some blocking functions (such as
134 C<system()>) and do not care about the overhead enough to code your own
135 pid watcher etc.
136
137 This function might keep a pool of processes in some future version, as
138 fork can be rather slow in large processes.
139
140 Example: execute some external program (convert image to rgba raw form)
141 and add a long computation (extract the alpha channel) in a separate
142 process, making sure that never more than $NUMCPUS processes are being
143 run.
144
145 my $cpulock = new Coro::Semaphore $NUMCPUS;
146
147 sub do_it {
148 my ($path) = @_;
149
150 my $guard = $cpulock->guard;
151
152 Coro::Util::fork_eval {
153 open my $fh, "convert -depth 8 \Q$path\E rgba:- |"
154 or die "$path: $!";
155
156 local $/;
157 # make my eyes hurt
158 pack "C*", unpack "(xxxC)*", <$fh>
159 }
160 }
161
162 my $alphachannel = do_it "/tmp/img.png";
163
164 =cut
165
# fork_eval { BLOCK } @args
#
# Forks, runs BLOCK with @args in the child, and transfers the outcome back
# to the parent through a pipe in Storable format:
#   - an array reference holding the return values on success,
#   - a reference to the stringified exception if BLOCK died.
#
# In the parent only the calling coroutine waits for the result. Returns the
# result list in list context and its last element in scalar context.
# Dies with the child's exception text if BLOCK died, with "pipe: ..." or
# "fork_eval: ..." if pipe or fork fail, and with a "fork_eval: ..." message
# if the child did not deliver a usable result.
sub fork_eval(&@) {
    my ($cb, @args) = @_;

    pipe my $fh1, my $fh2
        or die "pipe: $!";

    my $pid = fork;

    if ($pid) {
        # parent: close our copy of the write end so reading sees EOF as
        # soon as the child has closed its copy
        undef $fh2;

        my $reader = Coro::Handle::unblock ($fh1);
        my $frozen = $reader->readline (undef);

        # Decode inside an eval so that a truncated or garbled result cannot
        # make us skip the waitpid below and leave a zombie behind.
        my $res        = eval { Coro::Storable::thaw ($frozen) };
        my $thaw_error = $@;

        waitpid $pid, 0; # should not block, we expect the child to simply behave

        return wantarray ? @$res : $res->[-1]
            if "ARRAY" eq ref $res;

        # the child forwards exceptions as a reference to their text
        die $$res
            if "SCALAR" eq ref $res;

        # neither result nor exception: the child exited early or sent junk
        my $reason = length $thaw_error ? ": $thaw_error" : "";
        die "fork_eval: child process did not deliver a valid result$reason";

    } elsif (defined $pid) {
        # child
        delete $SIG{__WARN__};
        delete $SIG{__DIE__};
        # just in case, this hack effectively disables event processing
        # in the child. cleaner and slower would be to cancel all
        # event watchers, but we are event-model agnostic.
        undef $Coro::idle;
        $Coro::current->prio (Coro::PRIO_MAX);

        eval {
            undef $fh1;

            my @res = eval { $cb->(@args) };
            my $err = $@; # save it before anything else can touch $@

            # serialise into an in-memory buffer first
            open my $fh, ">", \my $buf
                or die "fork_eval: cannot open fh-to-buf in child: $!";
            Storable::store_fd (($err ? \"$err" : \@res), $fh);
            close $fh;

            # A single syswrite may write only part of the buffer (e.g. when
            # interrupted by a signal), so loop until everything is out.
            my $ofs = 0;
            while ($ofs < length $buf) {
                my $len = syswrite $fh2, $buf, length ($buf) - $ofs, $ofs;

                if (defined $len) {
                    $ofs += $len;
                } elsif (!$!{EINTR}) {
                    die "fork_eval: cannot write result to parent: $!";
                }
            }
            close $fh2;
        };

        warn $@ if $@;
        Coro::State::_exit (0);

    } else {
        die "fork_eval: $!";
    }
}
214
215 # make sure store_fd is preloaded
216 eval { Storable::store_fd undef, undef };
217
218 1;
219
220 =back
221
222 =head1 AUTHOR
223
224 Marc Lehmann <schmorp@schmorp.de>
225 http://home.schmorp.de/
226
227 =cut
228