ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.47
Committed: Mon Nov 24 07:55:28 2008 UTC (15 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-5_1, rel-5_11
Changes since 1.46: +1 -1 lines
Log Message:
5.1

File Contents

# Content
1 =head1 NAME
2
3 Coro::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use Coro::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing perl
12 functions by non-blocking counterparts.
13
14 This module is an AnyEvent user. Refer to the L<AnyEvent>
15 documentation to see how to integrate it into your own programs.
16
17 =over 4
18
19 =cut
20
21 package Coro::Util;
22
23 no warnings;
24 use strict;
25
26 use Socket ();
27
28 use AnyEvent ();
29 use AnyEvent::Socket ();
30
31 use Coro::State;
32 use Coro::Handle;
33 use Coro::Storable ();
34 use Coro::Semaphore;
35
36 use base 'Exporter';
37
# Symbols exported by default, and symbols exported on request.
our @EXPORT    = qw(gethostbyname gethostbyaddr);
our @EXPORT_OK = qw(inet_aton fork_eval);

our $VERSION = 5.1;

# Upper bound on the number of helper processes run concurrently by
# _do_asy. The value is only read once, when the semaphore below is
# created, so changing it after this module has been loaded has no effect.
our $MAXPARALLEL = 16; # max. number of parallel jobs

# Counting semaphore used by _do_asy to throttle its helper processes.
# Direct method call instead of the indirect-object form "new Class ARGS".
my $jobs = Coro::Semaphore->new ($MAXPARALLEL);
46
# _do_asy { BLOCK } @args
#
# Runs BLOCK with @args (in list context) inside a forked child process and
# hands its result list back to the calling coroutine, which waits via a
# rouse callback while the result is being read. At most $MAXPARALLEL such
# children run at the same time (throttled through $jobs).
#
# The child hex-encodes every result value and joins them with "\0"; the
# parent reverses that encoding, so arbitrary byte strings survive the pipe.
#
# Returns the result list in list context, its first element otherwise.
# Dies with "fork: ..." when the child cannot be created.
sub _do_asy(&;@) {
   my $sub = shift;

   # Hold one job slot for the duration of the call. The guard gives the
   # slot back when it goes out of scope, so it is also released when we
   # die below (a plain down/up pair leaked the slot on fork failure).
   my $slot = $jobs->guard;

   my $fh;
   my $pid = open $fh, "-|";

   if (!defined $pid) {
      die "fork: $!";
   } elsif (!$pid) {
      # Child: our STDOUT is the pipe to the parent. The eval makes sure an
      # exception thrown by the callback cannot unwind into the forked copy
      # of the parent program - the child must always end in _exit.
      eval {
         syswrite STDOUT, join "\0", map { unpack "H*", $_ } $sub->(@_);
      };
      Coro::State::_exit 0;
   }

   my $buf;
   my $wakeup = Coro::rouse_cb;
   my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
      # keep appending to $buf for as long as sysread delivers data
      sysread $fh, $buf, 16384, length $buf
         and return;

      # EOF (or read error): stop watching and wake up the waiting coroutine
      undef $w;
      $wakeup->();
   });

   Coro::rouse_wait;

   # A negative limit keeps trailing empty fields, so a result list that
   # ends in an empty string is not silently shortened.
   my @r = map { pack "H*", $_ } split /\0/, $buf, -1;
   wantarray ? @r : $r[0];
}
77
78 =item $ipn = Coro::Util::inet_aton $hostname || $ip
79
80 Works almost exactly like its AnyEvent::Socket counterpart, except that it
81 does not block other coroutines and returns the results.
82
83 =cut
84
# Resolves the host name or address given as first argument by starting
# AnyEvent::Socket::inet_aton with a rouse callback and waiting for that
# callback, so only the calling coroutine is suspended. Returns all results
# in list context and the first result in scalar context.
sub inet_aton {
   my ($host) = @_;

   AnyEvent::Socket::inet_aton ($host, Coro::rouse_cb);

   my @ipn = Coro::rouse_wait;

   return @ipn if wantarray;
   $ipn[0]
}
90
91 =item gethostbyname, gethostbyaddr
92
93 Work similarly to their perl counterparts, but do not block. C<gethostbyname>
94 uses C<AnyEvent::Socket::inet_aton>, C<gethostbyaddr> a forked helper process.
95
96 =cut
97
# Non-blocking stand-in for perl's gethostbyname, meant to be called in list
# context. Resolves the name through inet_aton and returns
#
#    ($name, $name, AF_INET, 4, @addresses)
#
# where the queried name doubles as canonical name and alias field. Only
# results that are exactly 4 octets long are kept, and each of them is passed
# through AnyEvent::Socket::format_address.
#
# format_address is called fully qualified because AnyEvent::Socket is
# loaded with an empty import list; the previously used unqualified
# "format_ip" did not resolve to any function in this package.
sub gethostbyname($) {
   my @res = inet_aton ($_[0]);

   (
      $_[0], $_[0], Socket::AF_INET (), 4,
      map { AnyEvent::Socket::format_address ($_) }
         grep { 4 == length $_ } @res
   )
}
103
# Non-blocking stand-in for perl's gethostbyaddr: the builtin (spelled
# CORE::gethostbyaddr to make clear it is not this sub) is run by _do_asy
# with the address and address family as its arguments, and whatever
# _do_asy returns is returned to the caller.
sub gethostbyaddr($$) {
   my ($addr, $family) = @_;

   _do_asy { CORE::gethostbyaddr $_[0], $_[1] } $addr, $family
}
107
108 =item @result = Coro::Util::fork_eval { ... }, @args
109
110 Executes the given code block or code reference with the given arguments
111 in a separate process, returning the results. The return values must be
112 serialisable with Coro::Storable. It may, of course, block.
113
114 Note that using event handling in the sub is not usually a good idea as
115 you will inherit a mixed set of watchers from the parent.
116
117 Exceptions will be correctly forwarded to the caller.
118
119 This function is useful for pushing cpu-intensive computations into a
120 different process, for example to take advantage of multiple CPUs. It's
121 also useful if you want to simply run some blocking functions (such as
122 C<system()>) and do not care about the overhead enough to code your own
123 pid watcher etc.
124
125 This function might keep a pool of processes in some future version, as
126 fork can be rather slow in large processes.
127
128 Example: execute some external program (convert image to rgba raw form)
129 and add a long computation (extract the alpha channel) in a separate
130 process, making sure that never more than $NUMCPUS processes are being
131 run.
132
133 my $cpulock = new Coro::Semaphore $NUMCPUS;
134
135 sub do_it {
136 my ($path) = @_;
137
138 my $guard = $cpulock->guard;
139
140 Coro::Util::fork_eval {
141 open my $fh, "convert -depth 8 \Q$path\E rgba: |"
142 or die "$path: $!";
143
144 local $/;
145 # make my eyes hurt
146 pack "C*", unpack "(xxxC)*", <$fh>
147 }
148 }
149
150 my $alphachannel = do_it "/tmp/img.png";
151
152 =cut
153
# fork_eval { BLOCK } @args
#
# Forks, runs BLOCK with @args in the child and transfers the outcome back
# to the parent through a pipe, serialised with Storable:
#
#   - on success the child sends a reference to the result list,
#   - on failure it sends a reference to the stringified exception.
#
# The parent reads the pipe through a Coro::Handle, reaps the child and then
# either returns the results (the whole list in list context, the last
# element in scalar context) or re-throws the child's exception text.
#
# Dies with "pipe: ..." / "fork_eval: ..." if the pipe or the fork fails, and
# with a "fork_eval: ..." message if the child did not deliver any result.
sub fork_eval(&@) {
   my ($cb, @args) = @_;

   pipe my $fh1, my $fh2
      or die "pipe: $!";

   my $pid = fork;

   if ($pid) {
      # parent: drop the write end so we see EOF once the child is done
      undef $fh2;

      my $data = (Coro::Handle::unblock ($fh1))->readline (undef);

      # Reap the child before thawing, so that an exception thrown while
      # thawing cannot leave a zombie behind.
      waitpid $pid, 0; # should not block, we expect the child to simply behave

      my $res = (defined $data) && (length $data)
         ? Coro::Storable::thaw ($data)
         : undef;

      # The child was unable to send anything usable: report that clearly
      # instead of failing while dereferencing an undefined value below.
      die "fork_eval: child process did not return a result"
         unless ref $res;

      # anything but an array reference is a reference to the error text
      die $$res unless "ARRAY" eq ref $res;

      return wantarray ? @$res : $res->[-1];

   } elsif (defined $pid) {
      # child
      delete $SIG{__WARN__};
      delete $SIG{__DIE__};
      # just in case, this hack effectively disables event processing
      # in the child. cleaner and slower would be to cancel all
      # event watchers, but we are event-model agnostic.
      undef $Coro::idle;
      $Coro::current->prio (Coro::PRIO_MAX);

      eval {
         undef $fh1;

         # run the callback; $@ is inspected right away, below
         my @res = eval { $cb->(@args) };

         # serialise into an in-memory buffer first, then send it in one go
         open my $fh, ">", \my $buf
            or die "fork_eval: cannot open fh-to-buf in child: $!";
         Storable::store_fd $@ ? \"$@" : \@res, $fh;
         close $fh;

         syswrite $fh2, $buf;
         close $fh2;
      };

      warn $@ if $@;
      Coro::State::_exit 0;

   } else {
      # fork failed
      die "fork_eval: $!";
   }
}
202
# make sure store_fd is preloaded: a dummy call with undef arguments is
# made once at module load time, and any error it raises is discarded by
# the surrounding eval.
# NOTE(review): presumably this spares the forked child in fork_eval from
# having to load the function itself - confirm.
eval { Storable::store_fd undef, undef };
205
206 1;
207
208 =back
209
210 =head1 AUTHOR
211
212 Marc Lehmann <schmorp@schmorp.de>
213 http://home.schmorp.de/
214
215 =cut
216