ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.29
Committed: Sat May 31 12:10:55 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-4_743, rel-4_742
Changes since 1.28: +1 -1 lines
Log Message:
4.742

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing perl
12     functions by non-blocking counterparts.
13    
14 root 1.22 This module is an AnyEvent user. Refer to the L<AnyEvent>
15 root 1.4 documentation to see how to integrate it into your own programs.
16    
17 root 1.1 =over 4
18    
19     =cut
20    
21     package Coro::Util;
22    
23 root 1.25 no warnings;
24 root 1.5 use strict;
25    
26 root 1.19 use Socket ();
27    
28 root 1.22 use AnyEvent ();
29 root 1.23 use AnyEvent::Socket ();
30 root 1.1
31 root 1.4 use Coro::State;
32 root 1.8 use Coro::Handle;
33     use Coro::Storable ();
34 root 1.1 use Coro::Semaphore;
35    
36     use base 'Exporter';
37    
38 root 1.5 our @EXPORT = qw(gethostbyname gethostbyaddr);
39 root 1.8 our @EXPORT_OK = qw(inet_aton fork_eval);
40 root 1.1
41 root 1.29 our $VERSION = 4.742;
42 root 1.1
43 root 1.5 our $MAXPARALLEL = 16; # max. number of parallel jobs
44 root 1.1
45     my $jobs = new Coro::Semaphore $MAXPARALLEL;
46    
47     sub _do_asy(&;@) {
48     my $sub = shift;
49     $jobs->down;
50     my $fh;
51 root 1.6
52 root 1.12 my $pid = open $fh, "-|";
53    
54     if (!defined $pid) {
55     die "fork: $!";
56     } elsif (!$pid) {
57 root 1.1 syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
58 root 1.4 Coro::State::_exit 0;
59 root 1.1 }
60 root 1.6
61 root 1.1 my $buf;
62 root 1.4 my $current = $Coro::current;
63     my $w; $w = AnyEvent->io (fh => $fh, poll => 'r', cb => sub {
64     sysread $fh, $buf, 16384, length $buf
65     and return;
66    
67     undef $w;
68     $current->ready;
69     });
70 root 1.6
71     &Coro::schedule;
72     &Coro::schedule while $w;
73    
74 root 1.1 $jobs->up;
75     my @r = map { pack "H*", $_ } split /\0/, $buf;
76     wantarray ? @r : $r[0];
77     }
78    
79 root 1.22 =item $ipn = Coro::Util::inet_aton $hostname || $ip
80    
81 root 1.23 Works almost exactly like its AnyEvent::Socket counterpart, except that it does not
82 root 1.22 block.
83    
84     =cut
85    
86     sub inet_aton {
87     my $current = $Coro::current;
88 root 1.23 my @res;
89 root 1.22
90 root 1.23 AnyEvent::Socket::inet_aton $_[0], sub {
91     @res = shift;
92 root 1.22 $current->ready;
93     undef $current;
94     };
95    
96     Coro::schedule while $current;
97    
98 root 1.23 wantarray ? @res : $res[0]
99 root 1.19 }
100    
101 root 1.1 =item gethostbyname, gethostbyaddr
102    
103 root 1.22 Work similarly to their perl counterparts, but do not block. Uses
104     C<Anyevent::Util::inet_aton> internally.
105 root 1.1
106     =cut
107    
108     sub gethostbyname($) {
109 root 1.22 my $current = $Coro::current;
110 root 1.23 my @res = inet_aton $_[0];
111 root 1.22
112 root 1.23 ($_[0], $_[0], &Socket::AF_INET, 4, map +(format_ip $_), grep length == 4, @res)
113 root 1.1 }
114    
115     sub gethostbyaddr($$) {
116 root 1.19 _do_asy { gethostbyaddr $_[0], $_[1] } @_
117 root 1.5 }
118    
119 root 1.8 =item @result = Coro::Util::fork_eval { ... }, @args
120    
121     Executes the given code block or code reference with the given arguments
122     in a separate process, returning the results. The return values must be
123     serialisable with Coro::Storable. It may, of course, block.
124    
125 root 1.10 Note that using event handling in the sub is not usually a good idea as
126     you will inherit a mixed set of watchers from the parent.
127 root 1.8
128     Exceptions will be correctly forwarded to the caller.
129    
130     This function is useful for pushing cpu-intensive computations into a
131     different process, for example to take advantage of multiple CPU's. Its
132     also useful if you want to simply run some blocking functions (such as
133     C<system()>) and do not care about the overhead enough to code your own
134     pid watcher etc.
135    
136 root 1.10 This function might keep a pool of processes in some future version, as
137     fork can be rather slow in large processes.
138    
139 root 1.8 Example: execute some external program (convert image to rgba raw form)
140     and add a long computation (extract the alpha channel) in a separate
141     process, making sure that never more then $NUMCPUS processes are being
142     run.
143    
144     my $cpulock = new Coro::Semaphore $NUMCPUS;
145    
146     sub do_it {
147     my ($path) = @_;
148    
149     my $guard = $cpulock->guard;
150    
151     Coro::Util::fork_eval {
152     open my $fh, "convert -depth 8 \Q$path\E rgba:"
153     or die "$path: $!";
154    
155     local $/;
156     # make my eyes hurt
157     pack "C*", unpack "(xxxC)*", <$fh>
158     }
159     }
160    
161     my $alphachannel = do_it "/tmp/img.png";
162    
163     =cut
164    
165     sub fork_eval(&@) {
166     my ($cb, @args) = @_;
167    
168     pipe my $fh1, my $fh2
169     or die "pipe: $!";
170    
171     my $pid = fork;
172    
173     if ($pid) {
174     undef $fh2;
175    
176     my $res = Coro::Storable::thaw +(Coro::Handle::unblock $fh1)->readline (undef);
177     waitpid $pid, 0; # should not block, we expect the child to simply behave
178    
179     die $$res unless "ARRAY" eq ref $res;
180    
181     return wantarray ? @$res : $res->[-1];
182    
183     } elsif (defined $pid) {
184     delete $SIG{__WARN__};
185     delete $SIG{__DIE__};
186     # just in case, this hack effectively disables event processing
187 root 1.9 # in the child. cleaner and slower would be to canceling all
188     # event watchers, but we are event-model agnostic.
189 root 1.8 undef $Coro::idle;
190     $Coro::current->prio (Coro::PRIO_MAX);
191    
192     eval {
193     undef $fh1;
194    
195     my @res = eval { $cb->(@args) };
196    
197     open my $fh, ">", \my $buf
198     or die "fork_eval: cannot open fh-to-buf in child: $!";
199     Storable::store_fd $@ ? \"$@" : \@res, $fh;
200     close $fh;
201    
202     syswrite $fh2, $buf;
203     close $fh2;
204     };
205    
206     warn $@ if $@;
207     Coro::State::_exit 0;
208    
209     } else {
210     die "fork_eval: $!";
211     }
212     }
213    
214 root 1.14 # make sure store_fd is preloaded
215     eval { Storable::store_fd undef, undef };
216    
217 root 1.1 1;
218    
219 root 1.7 =back
220    
221 root 1.1 =head1 AUTHOR
222    
223 root 1.4 Marc Lehmann <schmorp@schmorp.de>
224     http://home.schmorp.de/
225 root 1.1
226     =cut
227