ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.52
Committed: Tue Jun 23 23:40:06 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-5_14
Changes since 1.51: +1 -1 lines
Log Message:
5.14

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing perl
12     functions by non-blocking counterparts.
13    
14 root 1.22 This module is an AnyEvent user. Refer to the L<AnyEvent>
15 root 1.4 documentation to see how to integrate it into your own programs.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package Coro::Util;
22    
23 root 1.25 no warnings;
24 root 1.5 use strict;
25    
26 root 1.19 use Socket ();
27    
28 root 1.22 use AnyEvent ();
29 root 1.23 use AnyEvent::Socket ();
30 root 1.1
31 root 1.4 use Coro::State;
32 root 1.8 use Coro::Handle;
33     use Coro::Storable ();
34 root 1.1 use Coro::Semaphore;
35    
36     use base 'Exporter';
37    
# gethostbyname/gethostbyaddr are exported by default, the rest on request.
our @EXPORT    = qw(gethostbyname gethostbyaddr);
our @EXPORT_OK = qw(inet_aton fork_eval);

our $VERSION = 5.14;

# Upper bound on the number of jobs _do_asy runs in parallel.
our $MAXPARALLEL = 16;

# Counting semaphore guarding _do_asy. It is sized from $MAXPARALLEL as it
# is when this file is loaded; later changes to the variable do not resize it.
my $jobs = Coro::Semaphore->new ($MAXPARALLEL);
46    
# _do_asy { BLOCK } @args
#
# Runs BLOCK in a forked child process, with @args as its @_, and hands its
# return values back to the caller. While the child runs, only the calling
# coroutine waits (via an AnyEvent io watcher and Coro::rouse_wait).
#
# At most as many children as $jobs has slots run at the same time.
#
# Returns the list of values in list context, the first value otherwise.
# Dies with "fork: ..." when the child process cannot be created.
sub _do_asy(&;@) {
   my $sub = shift;

   # Take a job slot. The guard gives the slot back when it is destroyed,
   # so the slot is also released when we die below (e.g. fork failure).
   my $guard = $jobs->guard;

   my $fh;
   my $pid = open $fh, "-|";

   if (!defined $pid) {
      die "fork: $!";
   } elsif (!$pid) {
      # Child: STDOUT is the pipe to the parent. Every return value is
      # hex-encoded so that "\0" can be used as separator.
      # &$sub without parens passes our current @_ (the remaining args).
      # The eval keeps an exception from unwinding into the caller's code
      # inside the forked copy of the process.
      eval { syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub };
      warn $@ if $@;
      Coro::State::_exit ($@ ? 1 : 0);
   }

   # Parent: collect everything the child writes until sysread returns a
   # false value, then wake up the waiting coroutine.
   my $buf;
   my $wakeup = Coro::rouse_cb;
   my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
      sysread $fh, $buf, 16384, length $buf
         and return;

      undef $w;
      $wakeup->();
   });

   Coro::rouse_wait;

   # Done with the child, free the job slot before decoding.
   undef $guard;

   my @r = map { pack "H*", $_ } split /\0/, $buf;
   wantarray ? @r : $r[0];
}
77    
78 root 1.22 =item $ipn = Coro::Util::inet_aton $hostname || $ip
79    
80 root 1.45 Works almost exactly like its AnyEvent::Socket counterpart, except that it
81     does not block other coroutines and returns the results.
82 root 1.22
83     =cut
84    
# Starts an AnyEvent::Socket::inet_aton lookup for the first argument and
# suspends the calling coroutine until the callback has been invoked.
# Returns all callback arguments in list context, the first one otherwise.
sub inet_aton {
   my $done = Coro::rouse_cb;
   AnyEvent::Socket::inet_aton $_[0], $done;

   my @addrs = Coro::rouse_wait;

   return @addrs if wantarray;
   $addrs[0]
}
90    
91 root 1.1 =item gethostbyname, gethostbyaddr
92    
93 root 1.22 Work similarly to their perl counterparts, but do not block. Uses
94     C<AnyEvent::Socket::inet_aton> internally.
95 root 1.1
96     =cut
97    
# Non-blocking replacement for perl's gethostbyname, built on inet_aton.
#
# Returns a list laid out like the builtin's: name, aliases, address type,
# address length, addresses. Name and aliases are both simply the argument,
# the type is AF_INET and the length 4. Only results that are 4 octets long
# are kept, and each is passed through AnyEvent::Socket::format_address
# before being returned.
sub gethostbyname($) {
   my @res = inet_aton ($_[0]);

   # format_address is called fully qualified because AnyEvent::Socket is
   # loaded without importing any of its functions.
   (
      $_[0], $_[0], &Socket::AF_INET, 4,
      map { AnyEvent::Socket::format_address ($_) }
         grep { length ($_) == 4 } @res
   )
}
103    
# Non-blocking replacement for perl's gethostbyaddr: the builtin is run in
# a child process through _do_asy and its result is passed back.
sub gethostbyaddr($$) {
   # CORE:: spells out that the builtin is meant, not this sub.
   _do_asy (sub { CORE::gethostbyaddr $_[0], $_[1] }, @_)
}
107    
108 root 1.8 =item @result = Coro::Util::fork_eval { ... }, @args
109    
110     Executes the given code block or code reference with the given arguments
111     in a separate process, returning the results. The return values must be
112     serialisable with Coro::Storable. It may, of course, block.
113    
114 root 1.10 Note that using event handling in the sub is not usually a good idea as
115     you will inherit a mixed set of watchers from the parent.
116 root 1.8
117     Exceptions will be correctly forwarded to the caller.
118    
119     This function is useful for pushing cpu-intensive computations into a
120     different process, for example to take advantage of multiple CPUs. It's
121     also useful if you want to simply run some blocking functions (such as
122     C<system()>) and do not care about the overhead enough to code your own
123     pid watcher etc.
124    
125 root 1.10 This function might keep a pool of processes in some future version, as
126     fork can be rather slow in large processes.
127    
128 root 1.8 Example: execute some external program (convert image to rgba raw form)
129     and add a long computation (extract the alpha channel) in a separate
130     process, making sure that never more than $NUMCPUS processes are being
131     run.
132    
133     my $cpulock = new Coro::Semaphore $NUMCPUS;
134    
135     sub do_it {
136     my ($path) = @_;
137    
138     my $guard = $cpulock->guard;
139    
140     Coro::Util::fork_eval {
141     open my $fh, "convert -depth 8 \Q$path\E rgba: |"
142     or die "$path: $!";
143    
144     local $/;
145     # make my eyes hurt
146     pack "C*", unpack "(xxxC)*", <$fh>
147     }
148     }
149    
150     my $alphachannel = do_it "/tmp/img.png";
151    
152     =cut
153    
# fork_eval { BLOCK } @args
#
# Forks, calls the callback with @args in the child and sends the result to
# the parent over a pipe, serialised with Storable::store_fd. The parent
# reads the pipe through Coro::Handle and decodes it with
# Coro::Storable::thaw.
#
# Returns the callback's return values in list context, the last of them
# otherwise. Dies with the child's error text when the callback died, with
# "pipe: ..." / "fork_eval: ..." when pipe or fork fail, and with a
# "fork_eval: ..." message when the child delivered no usable result.
sub fork_eval(&@) {
   my ($cb, @args) = @_;

   pipe my $fh1, my $fh2
      or die "pipe: $!";

   my $pid = fork;

   if ($pid) {
      # parent: drop our copy of the write end, we only read.
      undef $fh2;

      my $res = Coro::Storable::thaw +(Coro::Handle::unblock $fh1)->readline (undef);
      waitpid $pid, 0; # should not block, we expect the child to simply behave

      # a scalar ref carries the error text of a callback that died
      die $$res if "SCALAR" eq ref $res;

      # anything else that is not the result array means the child did not
      # manage to send a result at all
      die "fork_eval: child process did not return a usable result"
         unless "ARRAY" eq ref $res;

      return wantarray ? @$res : $res->[-1];

   } elsif (defined $pid) {
      delete $SIG{__WARN__};
      delete $SIG{__DIE__};
      # just in case, this hack effectively disables event processing
      # in the child. cleaner and slower would be to cancel all
      # event watchers, but we are event-model agnostic.
      undef $Coro::idle;
      $Coro::current->prio (Coro::PRIO_MAX);

      eval {
         undef $fh1;

         my @res = eval { $cb->(@args) };
         # save the error right away, before any later call can change $@
         my $err = $@;

         # serialise into a buffer first, then send it with a single write
         open my $fh, ">", \my $buf
            or die "fork_eval: cannot open fh-to-buf in child: $!";
         Storable::store_fd $err ? \"$err" : \@res, $fh;
         close $fh;

         syswrite $fh2, $buf;
         close $fh2;
      };

      warn $@ if $@;
      Coro::State::_exit 0;

   } else {
      die "fork_eval: $!";
   }
}
202    
# Call store_fd once while the module is loaded so that it is preloaded
# before fork_eval needs it in a child. The arguments are dummies; any
# error from this call is swallowed by the eval.
eval { Storable::store_fd (undef, undef) };

1;
207    
208 root 1.7 =back
209    
210 root 1.1 =head1 AUTHOR
211    
212 root 1.4 Marc Lehmann <schmorp@schmorp.de>
213     http://home.schmorp.de/
214 root 1.1
215     =cut
216