ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.17
Committed: Thu Nov 1 16:00:46 2007 UTC (16 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-4_21, rel-4_2
Changes since 1.16: +28 -28 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing perl
12     functions by non-blocking counterparts.
13    
14 root 1.4 This module is an AnyEvent user. Refer to the L<AnyEvent|AnyEvent>
15     documentation to see how to integrate it into your own programs.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package Coro::Util;
22    
23 root 1.5 use strict;
24    
25 root 1.4 no warnings "uninitialized";
26    
27     use AnyEvent;
28 root 1.1
29 root 1.4 use Coro::State;
30 root 1.8 use Coro::Handle;
31     use Coro::Storable ();
32 root 1.1 use Coro::Semaphore;
33    
34     use base 'Exporter';
35    
36 root 1.5 our @EXPORT = qw(gethostbyname gethostbyaddr);
37 root 1.8 our @EXPORT_OK = qw(inet_aton fork_eval);
38 root 1.1
39 root 1.5 our $VERSION = 2.0;
40 root 1.1
41 root 1.5 our $MAXPARALLEL = 16; # max. number of parallel jobs
42 root 1.1
43     my $jobs = new Coro::Semaphore $MAXPARALLEL;
44    
45     sub _do_asy(&;@) {
46     my $sub = shift;
47     $jobs->down;
48     my $fh;
49 root 1.6
50 root 1.12 my $pid = open $fh, "-|";
51    
52     if (!defined $pid) {
53     die "fork: $!";
54     } elsif (!$pid) {
55 root 1.1 syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
56 root 1.4 Coro::State::_exit 0;
57 root 1.1 }
58 root 1.6
59 root 1.1 my $buf;
60 root 1.4 my $current = $Coro::current;
61     my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
62     sysread $fh, $buf, 16384, length $buf
63     and return;
64    
65     undef $w;
66     $current->ready;
67     });
68 root 1.6
69     &Coro::schedule;
70     &Coro::schedule while $w;
71    
72 root 1.1 $jobs->up;
73     my @r = map { pack "H*", $_ } split /\0/, $buf;
74     wantarray ? @r : $r[0];
75     }
76    
77 root 1.5 sub dotted_quad($) {
78 root 1.11 $_[0] =~ /^(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
79     \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
80     \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)
81     \.(?:25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[0-9][0-9]?)$/x
82 root 1.5 }
83    
84 root 1.1 =item gethostbyname, gethostbyaddr
85    
86     Work exactly like their perl counterparts, but do not block. Currently
87 root 1.5 this is being implemented with forking, so it's not exactly low-cost.
88 root 1.1
89     =cut
90    
91 root 1.4 my $netdns = eval { die; require Net::DNS::Resolver; new Net::DNS::Resolver; };
92    
93 root 1.1 sub gethostbyname($) {
94 root 1.4 if ($netdns) {
95     #$netdns->query($_[0]);
96     die;
97 root 1.17 } elsif (AnyEvent::detect eq "EV::AnyEvent") {
98     require EV::DNS;
99     require Socket;
100    
101     my $current = $Coro::current;
102     my ($result, @ptrs);
103    
104     EV::DNS::resolve_ipv4 ($_[0], 0, sub {
105     ($result, undef, undef, @ptrs) = @_;
106     $current->ready;
107     });
108     Coro::schedule while !defined $result;
109     return @ptrs
110     ? ($_[0], undef, &Socket::AF_INET, 4, @ptrs)
111     : ();
112 root 1.4 } else {
113 root 1.15 return _do_asy { gethostbyname $_[0] } @_
114 root 1.4 }
115 root 1.1 }
116    
117     sub gethostbyaddr($$) {
118 root 1.4 if ($netdns) {
119     die;
120     } else {
121 root 1.5 _do_asy { gethostbyaddr $_[0], $_[1] } @_
122     }
123     }
124    
125     =item Coro::Util::inet_aton
126    
127     Works almost exactly like its Socket counterpart, except that it does not
128     block. Is implemented with forking, so not exactly low-cost.
129    
130     =cut
131    
132     use Socket;
133    
134     our $inet_aton = \&Socket::inet_aton;
135    
136     sub inet_aton {
137     require Socket;
138    
139     if (dotted_quad $_[0]) {
140     $inet_aton->($_[0])
141 root 1.17 } elsif (AnyEvent::detect eq "EV::AnyEvent") {
142     require EV::DNS;
143     require Socket;
144    
145     my $current = $Coro::current;
146     my ($result, @ptrs);
147    
148     EV::DNS::resolve_ipv4 ($_[0], 0, sub {
149     ($result, undef, undef, @ptrs) = @_;
150     $current->ready;
151     });
152     Coro::schedule while !defined $result;
153     return $ptrs[0];
154 root 1.5 } else {
155 root 1.15 return _do_asy { $inet_aton->($_[0]) } @_
156 root 1.4 }
157 root 1.1 }
158    
159 root 1.8 =item @result = Coro::Util::fork_eval { ... }, @args
160    
161     Executes the given code block or code reference with the given arguments
162     in a separate process, returning the results. The return values must be
163     serialisable with Coro::Storable. It may, of course, block.
164    
165 root 1.10 Note that using event handling in the sub is not usually a good idea as
166     you will inherit a mixed set of watchers from the parent.
167 root 1.8
168     Exceptions will be correctly forwarded to the caller.
169    
170     This function is useful for pushing cpu-intensive computations into a
171     different process, for example to take advantage of multiple CPU's. Its
172     also useful if you want to simply run some blocking functions (such as
173     C<system()>) and do not care about the overhead enough to code your own
174     pid watcher etc.
175    
176 root 1.10 This function might keep a pool of processes in some future version, as
177     fork can be rather slow in large processes.
178    
179 root 1.8 Example: execute some external program (convert image to rgba raw form)
180     and add a long computation (extract the alpha channel) in a separate
181     process, making sure that never more then $NUMCPUS processes are being
182     run.
183    
184     my $cpulock = new Coro::Semaphore $NUMCPUS;
185    
186     sub do_it {
187     my ($path) = @_;
188    
189     my $guard = $cpulock->guard;
190    
191     Coro::Util::fork_eval {
192     open my $fh, "convert -depth 8 \Q$path\E rgba:"
193     or die "$path: $!";
194    
195     local $/;
196     # make my eyes hurt
197     pack "C*", unpack "(xxxC)*", <$fh>
198     }
199     }
200    
201     my $alphachannel = do_it "/tmp/img.png";
202    
203     =cut
204    
205     sub fork_eval(&@) {
206     my ($cb, @args) = @_;
207    
208     pipe my $fh1, my $fh2
209     or die "pipe: $!";
210    
211     my $pid = fork;
212    
213     if ($pid) {
214     undef $fh2;
215    
216     my $res = Coro::Storable::thaw +(Coro::Handle::unblock $fh1)->readline (undef);
217     waitpid $pid, 0; # should not block, we expect the child to simply behave
218    
219     die $$res unless "ARRAY" eq ref $res;
220    
221     return wantarray ? @$res : $res->[-1];
222    
223     } elsif (defined $pid) {
224     delete $SIG{__WARN__};
225     delete $SIG{__DIE__};
226     # just in case, this hack effectively disables event processing
227 root 1.9 # in the child. cleaner and slower would be to canceling all
228     # event watchers, but we are event-model agnostic.
229 root 1.8 undef $Coro::idle;
230     $Coro::current->prio (Coro::PRIO_MAX);
231    
232     eval {
233     undef $fh1;
234    
235     my @res = eval { $cb->(@args) };
236    
237     open my $fh, ">", \my $buf
238     or die "fork_eval: cannot open fh-to-buf in child: $!";
239     Storable::store_fd $@ ? \"$@" : \@res, $fh;
240     close $fh;
241    
242     syswrite $fh2, $buf;
243     close $fh2;
244     };
245    
246     warn $@ if $@;
247     Coro::State::_exit 0;
248    
249     } else {
250     die "fork_eval: $!";
251     }
252     }
253    
254 root 1.14 # make sure store_fd is preloaded
255     eval { Storable::store_fd undef, undef };
256    
257 root 1.1 1;
258    
259 root 1.7 =back
260    
261 root 1.1 =head1 AUTHOR
262    
263 root 1.4 Marc Lehmann <schmorp@schmorp.de>
264     http://home.schmorp.de/
265 root 1.1
266     =cut
267