ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.109
Committed: Thu Aug 31 16:28:49 2017 UTC (6 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-6_514
Changes since 1.108: +1 -1 lines
Log Message:
6.514

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::Util - various utility functions.
4    
5     =head1 SYNOPSIS
6    
7     use Coro::Util;
8    
9     =head1 DESCRIPTION
10    
11     This module implements various utility functions, mostly replacing perl
12     functions by non-blocking counterparts.
13    
14 root 1.63 Many of these functions exist for the sole purpose of emulating existing
15     interfaces, no matter how bad or limited they are (e.g. no IPv6 support).
16    
17 root 1.22 This module is an AnyEvent user. Refer to the L<AnyEvent>
18 root 1.4 documentation to see how to integrate it into your own programs.
19    
20 root 1.1 =over 4
21    
22     =cut
23    
24     package Coro::Util;
25    
26 root 1.60 use common::sense;
27 root 1.5
28 root 1.19 use Socket ();
29    
30 root 1.22 use AnyEvent ();
31 root 1.23 use AnyEvent::Socket ();
32 root 1.1
33 root 1.4 use Coro::State;
34 root 1.8 use Coro::Handle;
35     use Coro::Storable ();
36 root 1.53 use Coro::AnyEvent ();
37 root 1.1 use Coro::Semaphore;
38    
39     use base 'Exporter';
40    
# Symbols exported by default, followed by those available on request.
our @EXPORT    = qw(gethostbyname gethostbyaddr);
our @EXPORT_OK = qw(inet_aton fork_eval);

our $VERSION = 6.514;

# Upper bound on the number of helper processes _do_asy runs at once.
# The value is read here, when the semaphore below is constructed.
our $MAXPARALLEL = 16;

# Counting semaphore that _do_asy acquires around every forked job.
my $jobs = Coro::Semaphore->new ($MAXPARALLEL);
49    
# Run $sub in a forked child process and hand its list result back to the
# calling coroutine without blocking the other coroutines.
#
# The arguments following the code block are passed to $sub as its @_.
# The child hex-encodes every value $sub returns, joins the pieces with NUL
# bytes and writes them to its STDOUT, which the forking open connects to
# $fh in the parent.  The parent collects the data from an AnyEvent I/O
# watcher while the caller sleeps in rouse_wait, then decodes it again.
#
# Returns the decoded list in list context and its first element in scalar
# context.  Dies with "fork: ..." if the child cannot be created.
sub _do_asy(&;@) {
    my $sub = shift;

    # Take one of the $MAXPARALLEL job slots.  The guard gives the slot back
    # when it goes out of scope, so the slot is also returned when we die
    # below or when this coroutine is unwound while waiting.
    my $guard = $jobs->guard;

    # Forking open: returns the child's pid in the parent, 0 in the child
    # and undef when the fork failed.
    my $pid = open my $fh, "-|";

    defined $pid
        or die "fork: $!";

    if (!$pid) {
        # Child.  Control must never return into the caller's code inside
        # this forked copy, so a dying $sub is trapped and reported here.
        eval {
            syswrite STDOUT, join("\0", map { unpack "H*", $_ } $sub->(@_));
        };
        warn $@ if $@;
        Coro::Util::_exit(0);
    }

    my $buf = "";
    my $wakeup = Coro::rouse_cb();
    my $w; $w = AE::io($fh, 0, sub {
        # Append whatever is available; a true result means more may follow.
        sysread $fh, $buf, 16384, length $buf
            and return;

        # A false result (end of file or read error) ends the transfer:
        # drop the watcher and wake the waiting coroutine.
        undef $w;
        $wakeup->();
    });

    Coro::rouse_wait();

    my @r = map { pack "H*", $_ } split /\0/, $buf;
    return wantarray ? @r : $r[0];
}
80    
81 root 1.22 =item $ipn = Coro::Util::inet_aton $hostname || $ip
82    
83 root 1.63 Works almost exactly like its C<Socket::inet_aton> counterpart, except
84     that it does not block other coroutines.
85    
86     Does not handle multihomed hosts or IPv6 - consider using
87     C<AnyEvent::Socket::resolve_sockaddr> with the L<Coro> rouse functions
88     instead.
89 root 1.22
90     =cut
91    
sub inet_aton {
    # Start the lookup with a rouse callback, then sleep until it fires.
    my $on_result = Coro::rouse_cb();
    AnyEvent::Socket::inet_aton($_[0], $on_result);

    # Keep only the 4-byte results and hand back the first of them.
    my @four_byte = grep { length($_) == 4 } Coro::rouse_wait();
    return (@four_byte)[0];
}
96    
97 root 1.1 =item gethostbyname, gethostbyaddr
98    
99 root 1.63 Work similarly to their Perl counterparts, but do not block. C<gethostbyname>
100     uses C<AnyEvent::Socket::inet_aton> internally.
101    
102     Does not handle multihomed hosts or IPv6 - consider using
103     C<AnyEvent::Socket::resolve_sockaddr> or C<AnyEvent::DNS::reverse_lookup>
104     with the L<Coro> rouse functions instead.
105 root 1.1
106     =cut
107    
sub gethostbyname($) {
    # Resolve asynchronously; the calling coroutine sleeps until the
    # callback created by rouse_cb has been invoked.
    AnyEvent::Socket::inet_aton($_[0], Coro::rouse_cb());

    # Only the 4-byte results are kept.
    my @four_byte = grep { length($_) == 4 } Coro::rouse_wait();

    # NOTE(review): every address is passed through format_address before it
    # is returned, whereas the core builtin returns packed addresses -
    # confirm that callers expect the formatted form.
    my @addresses = map { AnyEvent::Socket::format_address($_) } @four_byte;

    # The queried name is reported both as the name and as the alias field.
    return ($_[0], $_[0], Socket::AF_INET(), 4, @addresses);
}
113    
sub gethostbyaddr($$) {
    my ($addr, $family) = @_;

    # The blocking lookup runs in a helper process started by _do_asy.
    # CORE:: spells out that the builtin, not this wrapper, is called there.
    return _do_asy { CORE::gethostbyaddr($_[0], $_[1]) } $addr, $family;
}
117    
118 root 1.8 =item @result = Coro::Util::fork_eval { ... }, @args
119    
120     Executes the given code block or code reference with the given arguments
121     in a separate process, returning the results. The return values must be
122     serialisable with Coro::Storable. It may, of course, block.
123    
124 root 1.10 Note that using event handling in the sub is not usually a good idea as
125     you will inherit a mixed set of watchers from the parent.
126 root 1.8
127     Exceptions will be correctly forwarded to the caller.
128    
129     This function is useful for pushing cpu-intensive computations into a
130     different process, for example to take advantage of multiple CPUs. It's
131     also useful if you want to simply run some blocking functions (such as
132     C<system()>) and do not care about the overhead enough to code your own
133     pid watcher etc.
134    
135 root 1.10 This function might keep a pool of processes in some future version, as
136     fork can be rather slow in large processes.
137    
138 root 1.63 You should also look at C<AnyEvent::Util::fork_eval>, which is newer and
139     more compatible with totally broken Perl implementations such as the one
140     from ActiveState.
141    
142 root 1.8 Example: execute some external program (convert image to rgba raw form)
143     and add a long computation (extract the alpha channel) in a separate
144     process, making sure that never more than $NUMCPUS processes are being
145     run.
146    
147     my $cpulock = new Coro::Semaphore $NUMCPUS;
148    
149     sub do_it {
150     my ($path) = @_;
151    
152     my $guard = $cpulock->guard;
153    
154     Coro::Util::fork_eval {
155     open my $fh, "convert -depth 8 \Q$path\E rgba:"
156     or die "$path: $!";
157    
158     local $/;
159     # make my eyes hurt
160     pack "C*", unpack "(xxxC)*", <$fh>
161     }
162     }
163    
164     my $alphachannel = do_it "/tmp/img.png";
165    
166     =cut
167    
# Run $cb->(@args) in a forked child and return its result to the caller.
#
# The child serialises either the list returned by $cb or, if $cb died, the
# stringified exception, and writes it to a pipe.  The parent reads the pipe
# through a Coro::Handle, reaps the child and thaws the reply.
#
# Returns the child's list in list context and its last element in scalar
# context.  Dies with the child's exception text if $cb died there, with
# "pipe: ..." / "fork_eval: ..." if the pipe or the fork failed, and with a
# "fork_eval: child process returned no result ..." message if the reply is
# neither of the two expected shapes.
sub fork_eval(&@) {
    my ($cb, @args) = @_;

    pipe my $fh1, my $fh2
        or die "pipe: $!";

    my $pid = fork;

    defined $pid
        or die "fork_eval: $!";

    if ($pid) {
        # Parent: drop our copy of the write end so that reading $fh1 reaches
        # end of file once the child has closed its copy.
        undef $fh2;

        my $frozen = Coro::Handle::unblock($fh1)->readline(undef);

        # Reap the child before decoding, so that a failing thaw cannot
        # leave a zombie behind.
        waitpid $pid, 0; # should not block, we expect the child to simply behave
        my $status = $?;

        my $res = Coro::Storable::thaw($frozen);

        return wantarray ? @$res : $res->[-1]
            if "ARRAY" eq ref $res;

        # A scalar reference carries the exception text from the child.
        die $$res
            if "SCALAR" eq ref $res;

        # Anything else means the child did not send a usable reply.
        die "fork_eval: child process returned no result (wait status $status)";
    }

    # Child from here on.
    delete $SIG{__WARN__};
    delete $SIG{__DIE__};
    # just in case, this hack effectively disables event processing
    # in the child. cleaner and slower would be to cancel all
    # event watchers, but we are event-model agnostic.
    undef $Coro::idle;
    $Coro::current->prio (Coro::PRIO_MAX());

    eval {
        undef $fh1;

        my @res = eval { $cb->(@args) };
        # Decide right away what to send: the exception text or the results.
        my $reply = $@ ? \"$@" : \@res;

        # Serialise into an in-memory buffer first, then send that buffer
        # through the pipe with a single syswrite.
        open my $fh, ">", \my $buf
            or die "fork_eval: cannot open fh-to-buf in child: $!";
        Storable::store_fd($reply, $fh);
        close $fh;

        syswrite $fh2, $buf;
        close $fh2;
    };

    warn $@ if $@;
    Coro::Util::_exit(0);
}
216    
# Call store_fd once while the module is being loaded, so that it is
# preloaded before fork_eval uses it; any error raised by this dummy call
# is swallowed by the eval.
eval { Storable::store_fd(undef, undef) };

1;
221    
222 root 1.7 =back
223    
224 root 1.96 =head1 AUTHOR/SUPPORT/CONTACT
225 root 1.1
226 root 1.96 Marc A. Lehmann <schmorp@schmorp.de>
227     http://software.schmorp.de/pkg/Coro.html
228 root 1.1
229     =cut
230