ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.10
Committed: Wed Oct 3 01:48:06 2007 UTC (16 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-4_0, rel-4_01, rel-4_03, rel-4_02
Changes since 1.9: +5 -2 lines
Log Message:
temporary check-in, non-working version

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing perl
12     functions by non-blocking counterparts.
13    
14 root 1.4 This module is an AnyEvent user. Refer to the L<AnyEvent|AnyEvent>
15     documentation to see how to integrate it into your own programs.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package Coro::Util;
22    
23 root 1.5 use strict;
24    
25 root 1.4 no warnings "uninitialized";
26    
27     use AnyEvent;
28 root 1.1
29 root 1.4 use Coro::State;
30 root 1.8 use Coro::Handle;
31     use Coro::Storable ();
32 root 1.1 use Coro::Semaphore;
33    
34     use base 'Exporter';
35    
36 root 1.5 our @EXPORT = qw(gethostbyname gethostbyaddr);
37 root 1.8 our @EXPORT_OK = qw(inet_aton fork_eval);
38 root 1.1
39 root 1.5 our $VERSION = 2.0;
40 root 1.1
41 root 1.5 our $MAXPARALLEL = 16; # max. number of parallel jobs
42 root 1.1
43     my $jobs = new Coro::Semaphore $MAXPARALLEL;
44    
# Run $sub in a forked child process and hand its result list back to the
# calling coroutine.
#
# $sub is invoked in the child with the remaining arguments of _do_asy as its
# @_.  The child hex-encodes every returned value, joins them with "\0" and
# writes the string to its STDOUT, which is a pipe to the parent.  The parent
# collects the pipe contents from an AnyEvent io watcher and schedules away
# until the pipe reaches EOF.  A slot of the $jobs semaphore is held for the
# whole duration of the call.
#
# Returns the decoded list in list context and its first element otherwise.
# Dies if the child process cannot be created.
sub _do_asy(&;@) {
   my $sub = shift;

   $jobs->down;

   my $fh;
   my $pid = open $fh, "-|";

   unless (defined $pid) {
      # A failed fork yields undef, which compares equal to 0: without this
      # check the parent itself would run the child branch and _exit.
      my $err = "$!";
      $jobs->up;
      die "Coro::Util: unable to fork: $err";
   }

   if ($pid == 0) {
      # child: &$sub deliberately passes the current @_ on to the callback.
      # The eval keeps an exception from unwinding into the parent's code.
      my $ok = eval {
         syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
         1
      };
      Coro::State::_exit ($ok ? 0 : 1);
   }

   my $buf;
   my $current = $Coro::current;
   my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
      # keep appending to $buf for as long as data arrives
      sysread $fh, $buf, 16384, length $buf
         and return;

      # EOF or read error: drop the watcher and wake the waiting coroutine
      undef $w;
      $current->ready;
   });

   &Coro::schedule;
   # we may be woken up for other reasons, so wait until the watcher is gone
   &Coro::schedule while $w;

   $jobs->up;
   my @r = map { pack "H*", $_ } split /\0/, $buf;
   wantarray ? @r : $r[0];
}
72    
# Check whether the argument is an IPv4 address in dotted-quad notation, i.e.
# exactly four decimal octets (0..255) separated by dots.
#
# Returns true/false in scalar context; in list context the four octets are
# returned on success and the empty list on failure.
sub dotted_quad($) {
   # /x is essential here: the pattern is spread over several lines, and
   # without it the newlines and indentation would have to match literally.
   # \z (instead of $) rejects a trailing newline.
   $_[0] =~ /\A(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
            \.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
            \.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
            \.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)\z/x
}
79    
80 root 1.1 =item gethostbyname, gethostbyaddr
81    
82     Work exactly like their perl counterparts, but do not block. Currently
83 root 1.5 this is being implemented with forking, so it's not exactly low-cost.
84 root 1.1
85     =cut
86    
87 root 1.4 my $netdns = eval { die; require Net::DNS::Resolver; new Net::DNS::Resolver; };
88    
# Non-blocking replacement for the builtin gethostbyname: the lookup itself
# is performed by the builtin inside a helper process started by _do_asy.
sub gethostbyname($) {
   # a Net::DNS based backend is not implemented, so bail out if one is set
   die if $netdns;

   return _do_asy { CORE::gethostbyname $_[0] } @_;
}
97    
# Non-blocking replacement for the builtin gethostbyaddr: the lookup itself
# is performed by the builtin inside a helper process started by _do_asy.
sub gethostbyaddr($$) {
   # a Net::DNS based backend is not implemented, so bail out if one is set
   die if $netdns;

   return _do_asy { CORE::gethostbyaddr $_[0], $_[1] } @_;
}
105    
106     =item Coro::Util::inet_aton
107    
108     Works almost exactly like its Socket counterpart, except that it does not
109     block. Is implemented with forking, so not exactly low-cost.
110    
111     =cut
112    
113     use Socket;
114    
115     our $inet_aton = \&Socket::inet_aton;
116    
# Non-blocking variant of Socket::inet_aton.  Arguments that dotted_quad
# accepts are converted directly in this process; everything else is passed
# to the saved Socket::inet_aton inside a helper process via _do_asy.
sub inet_aton {
   require Socket;

   # fast path: dotted quads are converted in-process
   return $inet_aton->($_[0])
      if dotted_quad $_[0];

   # anything else is converted in a forked helper
   return _do_asy { $inet_aton->($_[0]) } @_;
}
126    
127 root 1.8 =item @result = Coro::Util::fork_eval { ... }, @args
128    
129     Executes the given code block or code reference with the given arguments
130     in a separate process, returning the results. The return values must be
131     serialisable with Coro::Storable. It may, of course, block.
132    
133 root 1.10 Note that using event handling in the sub is not usually a good idea as
134     you will inherit a mixed set of watchers from the parent.
135 root 1.8
136     Exceptions will be correctly forwarded to the caller.
137    
138     This function is useful for pushing cpu-intensive computations into a
139     different process, for example to take advantage of multiple CPUs. It's
140     also useful if you want to simply run some blocking functions (such as
141     C<system()>) and do not care about the overhead enough to code your own
142     pid watcher etc.
143    
144 root 1.10 This function might keep a pool of processes in some future version, as
145     fork can be rather slow in large processes.
146    
147 root 1.8 Example: execute some external program (convert image to rgba raw form)
148     and add a long computation (extract the alpha channel) in a separate
149     process, making sure that never more than $NUMCPUS processes are being
150     run.
151    
152     my $cpulock = new Coro::Semaphore $NUMCPUS;
153    
154     sub do_it {
155     my ($path) = @_;
156    
157     my $guard = $cpulock->guard;
158    
159     Coro::Util::fork_eval {
160     open my $fh, "convert -depth 8 \Q$path\E rgba: |"
161     or die "$path: $!";
162    
163     local $/;
164     # make my eyes hurt
165     pack "C*", unpack "(xxxC)*", <$fh>
166     }
167     }
168    
169     my $alphachannel = do_it "/tmp/img.png";
170    
171     =cut
172    
# Run $cb->(@args) in a forked child process and return its results.
#
# The child serialises either the list returned by the callback (as an array
# reference) or the stringified exception (as a scalar reference) with
# Storable and writes it to a pipe.  The parent reads the pipe through an
# unblocked Coro::Handle, thaws the data and reaps the child.
#
# Returns the callback's result list in list context, its last element
# otherwise.  Dies with the child's exception text if the callback died, and
# dies if the pipe or the fork cannot be created or if no result could be
# read back from the child.
sub fork_eval(&@) {
   my ($cb, @args) = @_;

   pipe my $fh1, my $fh2
      or die "pipe: $!";

   my $pid = fork;

   defined $pid
      or die "fork_eval: $!";

   if ($pid) {
      # parent: drop our copy of the write end so readline can see EOF
      undef $fh2;

      # Read and thaw inside an eval so the child is reaped even on failure.
      my $res;
      my $ok = eval {
         $res = Coro::Storable::thaw +(Coro::Handle::unblock $fh1)->readline (undef);
         1
      };
      my $err = $@;

      waitpid $pid, 0; # should not block, we expect the child to simply behave

      die $err unless $ok;
      die "fork_eval: child process did not return a result" unless ref $res;
      die $$res unless "ARRAY" eq ref $res;

      return wantarray ? @$res : $res->[-1];
   }

   # child
   delete $SIG{__WARN__};
   delete $SIG{__DIE__};
   Coro::killall;
   # just in case, this hack effectively disables event processing
   # in the child. cleaner and slower would be to cancel all
   # event watchers, but we are event-model agnostic.
   undef $Coro::idle;
   $Coro::current->prio (Coro::PRIO_MAX);

   eval {
      undef $fh1;

      my @res = eval { $cb->(@args) };
      # remember the exception right away, before anything can clobber $@
      my $err = $@;

      open my $fh, ">", \my $buf
         or die "fork_eval: cannot open fh-to-buf in child: $!";
      Storable::store_fd $err ? \"$err" : \@res, $fh;
      close $fh;

      # syswrite may write less than requested, so loop until all is sent
      my $ofs = 0;
      while ($ofs < length $buf) {
         my $len = syswrite $fh2, $buf, (length $buf) - $ofs, $ofs;

         unless (defined $len) {
            next if $!{EINTR}; # interrupted by a signal, simply retry
            die "fork_eval: cannot write result to parent: $!";
         }

         $ofs += $len;
      }

      close $fh2;
   };

   warn $@ if $@;
   Coro::State::_exit 0;
}
222    
223 root 1.1 1;
224    
225 root 1.7 =back
226    
227 root 1.1 =head1 AUTHOR
228    
229 root 1.4 Marc Lehmann <schmorp@schmorp.de>
230     http://home.schmorp.de/
231 root 1.1
232     =cut
233