ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.12
Committed: Tue Oct 23 01:48:33 2007 UTC (16 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-4_13
Changes since 1.11: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing perl
12     functions by non-blocking counterparts.
13    
14 root 1.4 This module is an AnyEvent user. Refer to the L<AnyEvent|AnyEvent>
15     documentation to see how to integrate it into your own programs.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package Coro::Util;
22    
23 root 1.5 use strict;
24    
25 root 1.4 no warnings "uninitialized";
26    
27     use AnyEvent;
28 root 1.1
29 root 1.4 use Coro::State;
30 root 1.8 use Coro::Handle;
31     use Coro::Storable ();
32 root 1.1 use Coro::Semaphore;
33    
34     use base 'Exporter';
35    
36 root 1.5 our @EXPORT = qw(gethostbyname gethostbyaddr);
37 root 1.8 our @EXPORT_OK = qw(inet_aton fork_eval);
38 root 1.1
39 root 1.5 our $VERSION = 2.0;
40 root 1.1
# Upper bound on the number of helper processes that _do_asy keeps running
# at the same time.
our $MAXPARALLEL = 16; # max. number of parallel jobs

# Counting semaphore enforcing that bound. It is constructed exactly once,
# while the module is being loaded, from the value $MAXPARALLEL holds at
# that moment.
my $jobs = Coro::Semaphore->new ($MAXPARALLEL);
44    
# _do_asy { BLOCK } @args
#
# Runs BLOCK in a forked child process and hands its result list back to
# the calling coroutine, which is suspended (not blocked) while the child
# works. BLOCK is invoked as "&$sub", i.e. it sees @args as its own @_.
#
# The child hex-encodes every returned value, joins them with NUL bytes and
# writes the string to the pipe created by the forking open. Because of
# that encoding undef values arrive as empty strings, and split drops
# trailing empty fields.
#
# At most as many children as $jobs has slots run concurrently; further
# callers wait in $jobs->down.
#
# Returns the decoded list in list context, its first element otherwise.
# Dies with "fork: ..." when the child cannot be created.
sub _do_asy(&;@) {
    my $sub = shift;

    $jobs->down;

    my $fh;
    my $pid = open $fh, "-|";

    if (!defined $pid) {
        # Give the slot back before bailing out, otherwise every failed
        # fork would permanently reduce the number of usable job slots.
        # $! is saved first so releasing the semaphore cannot clobber it.
        my $err = "$!";
        $jobs->up;
        die "fork: $err";
    } elsif (!$pid) {
        # child: STDOUT is the write end of the pipe to the parent
        syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
        Coro::State::_exit 0;
    }

    # parent: collect the child's output through an I/O watcher
    my $buf;
    my $current = $Coro::current;
    my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
        # keep the watcher while data keeps arriving; a false result
        # (0 at end of file, undef on error) ends the collection
        sysread $fh, $buf, 16384, length $buf
            and return;

        undef $w;
        $current->ready;
    });

    # sleep until the watcher has removed itself; the loop guards against
    # being woken up for some other reason
    &Coro::schedule;
    &Coro::schedule while $w;

    $jobs->up;

    my @r = map { pack "H*", $_ } split /\0/, $buf;
    wantarray ? @r : $r[0];
}
76    
# dotted_quad $string
#
# Returns true if $string consists of exactly four decimal numbers in the
# range 0..255 separated by dots, and nothing else. The pattern is
# anchored with \A and \z instead of ^ and $, because $ would also match
# before a trailing newline and let "1.2.3.4\n" pass as an address.
sub dotted_quad($) {
    my $octet = qr/(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)/;

    $_[0] =~ /\A $octet (?: \. $octet ){3} \z/x
}
83    
84 root 1.1 =item gethostbyname, gethostbyaddr
85    
86     Work exactly like their perl counterparts, but do not block. Currently
87 root 1.5 this is being implemented with forking, so it's not exactly low-cost.
88 root 1.1
89     =cut
90    
# Resolving via Net::DNS is not implemented: the leading "die" makes the
# eval fail on purpose, so $netdns is always undef and the forking code
# paths are the ones that actually run.
my $netdns = eval { die; require Net::DNS::Resolver; Net::DNS::Resolver->new };

# Non-blocking replacement for the gethostbyname builtin; takes the host
# name as its only argument.
sub gethostbyname($) {
    if ($netdns) {
        #$netdns->query($_[0]);
        die;
    } else {
        # The block is called with the arguments passed to _do_asy, so
        # $_[0] is the host name. The builtin is qualified with CORE::
        # because this sub has the same name.
        _do_asy { CORE::gethostbyname $_[0] } @_
    }
}
101    
# Non-blocking replacement for the gethostbyaddr builtin; takes the packed
# address and the address type, like the builtin.
sub gethostbyaddr($$) {
    if ($netdns) {
        # placeholder for a Net::DNS based implementation
        die;
    } else {
        # The block is called with the arguments passed to _do_asy, so
        # $_[0] and $_[1] are this sub's arguments. The builtin is
        # qualified with CORE:: because this sub has the same name.
        _do_asy { CORE::gethostbyaddr $_[0], $_[1] } @_
    }
}
109    
110     =item Coro::Util::inet_aton
111    
112     Works almost exactly like its Socket counterpart, except that it does not
113     block. It is implemented with forking, so it is not exactly low-cost.
114    
115     =cut
116    
use Socket;

# Code reference to the (blocking) Socket implementation; as a package
# variable it is reachable as $Coro::Util::inet_aton.
our $inet_aton = \&Socket::inet_aton;

# Non-blocking variant of Socket::inet_aton. Strings that already are
# dotted quads are converted directly in this process; everything else is
# passed to $inet_aton inside a helper process started by _do_asy.
sub inet_aton {
    require Socket;

    my $is_literal = dotted_quad ($_[0]);

    return $inet_aton->($_[0])
        if $is_literal;

    _do_asy { $inet_aton->($_[0]) } @_
}
130    
131 root 1.8 =item @result = Coro::Util::fork_eval { ... } @args
132    
133     Executes the given code block or code reference with the given arguments
134     in a separate process, returning the results. The return values must be
135     serialisable with Coro::Storable. It may, of course, block.
136    
137 root 1.10 Note that using event handling in the sub is not usually a good idea as
138     you will inherit a mixed set of watchers from the parent.
139 root 1.8
140     Exceptions will be correctly forwarded to the caller.
141    
142     This function is useful for pushing cpu-intensive computations into a
143     different process, for example to take advantage of multiple CPUs. It's
144     also useful if you want to simply run some blocking functions (such as
145     C<system()>) and do not care about the overhead enough to code your own
146     pid watcher etc.
147    
148 root 1.10 This function might keep a pool of processes in some future version, as
149     fork can be rather slow in large processes.
150    
151 root 1.8 Example: execute some external program (convert image to rgba raw form)
152     and add a long computation (extract the alpha channel) in a separate
153     process, making sure that never more than $NUMCPUS processes are being
154     run.
155    
156     my $cpulock = new Coro::Semaphore $NUMCPUS;
157    
158     sub do_it {
159     my ($path) = @_;
160    
161     my $guard = $cpulock->guard;
162    
163     Coro::Util::fork_eval {
164     open my $fh, "convert -depth 8 \Q$path\E rgba: |"
165     or die "$path: $!";
166    
167     local $/;
168     # make my eyes hurt
169     pack "C*", unpack "(xxxC)*", <$fh>
170     }
171     }
172    
173     my $alphachannel = do_it "/tmp/img.png";
174    
175     =cut
176    
# fork_eval { BLOCK } @args
#
# Forks, calls BLOCK with @args in the child and sends the outcome back
# to the parent through a pipe in Storable format: a reference to the
# result array on success, a reference to the stringified exception
# otherwise.
#
# The parent reads the pipe through a Coro::Handle, reaps the child and
# then either returns the results (the whole list in list context, the
# last element otherwise) or re-throws the child's exception.
#
# Dies with "pipe: ..." or "fork_eval: ..." when the pipe or the child
# cannot be created, or when the child does not deliver a usable result.
sub fork_eval(&@) {
    my ($cb, @args) = @_;

    pipe my $fh1, my $fh2
        or die "pipe: $!";

    my $pid = fork;

    if ($pid) {
        # parent: close our copy of the write end so that we see end of
        # file as soon as the child is done with it
        undef $fh2;

        my $raw = (Coro::Handle::unblock $fh1)->readline (undef);

        # Reap the child before decoding, so it does not stay behind as a
        # zombie when decoding throws.
        waitpid $pid, 0; # should not block, we expect the child to simply behave
        my $status = $?;

        # A child that went away without writing anything would otherwise
        # surface as an obscure dereferencing error further down.
        length $raw
            or die "fork_eval: child process returned no result (wait status $status)";

        my $res = Coro::Storable::thaw ($raw);

        ref $res
            or die "fork_eval: unable to decode the result sent by the child process";

        # anything but an array reference is the forwarded exception
        die $$res unless "ARRAY" eq ref $res;

        return wantarray ? @$res : $res->[-1];

    } elsif (defined $pid) {
        # child
        delete $SIG{__WARN__};
        delete $SIG{__DIE__};
        Coro::killall;
        # just in case, this hack effectively disables event processing
        # in the child. cleaner and slower would be to cancel all
        # event watchers, but we are event-model agnostic.
        undef $Coro::idle;
        $Coro::current->prio (Coro::PRIO_MAX);

        eval {
            undef $fh1;

            my @res = eval { $cb->(@args) };

            # serialise into a memory buffer first, then send the buffer
            # with a single syswrite
            open my $fh, ">", \my $buf
                or die "fork_eval: cannot open fh-to-buf in child: $!";
            Storable::store_fd $@ ? \"$@" : \@res, $fh;
            close $fh;

            syswrite $fh2, $buf;
            close $fh2;
        };

        warn $@ if $@;
        Coro::State::_exit 0;

    } else {
        die "fork_eval: $!";
    }
}
226    
227 root 1.1 1;
228    
229 root 1.7 =back
230    
231 root 1.1 =head1 AUTHOR
232    
233 root 1.4 Marc Lehmann <schmorp@schmorp.de>
234     http://home.schmorp.de/
235 root 1.1
236     =cut
237