ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro/Coro/Util.pm
Revision: 1.115
Committed: Mon Mar 16 11:12:53 2020 UTC (4 years, 2 months ago) by root
Branch: MAIN
CVS Tags: rel-6_57, HEAD
Changes since 1.114: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 Coro::Util - various utility functions.
4
5 =head1 SYNOPSIS
6
7 use Coro::Util;
8
9 =head1 DESCRIPTION
10
11 This module implements various utility functions, mostly replacing perl
12 functions by non-blocking counterparts.
13
14 Many of these functions exist for the sole purpose of emulating existing
15 interfaces, no matter how bad or limited they are (e.g. no IPv6 support).
16
17 This module is an AnyEvent user. Refer to the L<AnyEvent>
18 documentation to see how to integrate it into your own programs.
19
20 =over 4
21
22 =cut
23
24 package Coro::Util;
25
26 use common::sense;
27
28 use Socket ();
29
30 use AnyEvent ();
31 use AnyEvent::Socket ();
32
33 use Coro::State;
34 use Coro::Handle;
35 use Coro::Storable ();
36 use Coro::AnyEvent ();
37 use Coro::Semaphore;
38
39 use base 'Exporter';
40
41 our @EXPORT = qw(gethostbyname gethostbyaddr);
42 our @EXPORT_OK = qw(inet_aton fork_eval);
43
44 our $VERSION = 6.57;
45
46 our $MAXPARALLEL = 16; # max. number of parallel jobs
47
48 my $jobs = new Coro::Semaphore $MAXPARALLEL;
49
50 sub _do_asy(&;@) {
51 my $sub = shift;
52 $jobs->down;
53 my $fh;
54
55 my $pid = open $fh, "-|";
56
57 if (!defined $pid) {
58 die "fork: $!";
59 } elsif (!$pid) {
60 syswrite STDOUT, join "\0", map { unpack "H*", $_ } &$sub;
61 Coro::Util::_exit 0;
62 }
63
64 my $buf;
65 my $wakeup = Coro::rouse_cb;
66 my $w; $w = AE::io $fh, 0, sub {
67 sysread $fh, $buf, 16384, length $buf
68 and return;
69
70 undef $w;
71 $wakeup->();
72 };
73
74 Coro::rouse_wait;
75
76 $jobs->up;
77 my @r = map { pack "H*", $_ } split /\0/, $buf;
78 wantarray ? @r : $r[0];
79 }
80
81 =item $ipn = Coro::Util::inet_aton $hostname || $ip
82
83 Works almost exactly like its C<Socket::inet_aton> counterpart, except
84 that it does not block other coroutines.
85
86 Does not handle multihomed hosts or IPv6 - consider using
87 C<AnyEvent::Socket::resolve_sockaddr> with the L<Coro> rouse functions
88 instead.
89
90 =cut
91
92 sub inet_aton {
93 AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;
94 (grep length == 4, Coro::rouse_wait)[0]
95 }
96
97 =item gethostbyname, gethostbyaddr
98
99 Work similarly to their Perl counterparts, but do not block. Uses
100 C<AnyEvent::Util::inet_aton> internally.
101
102 Does not handle multihomed hosts or IPv6 - consider using
103 C<AnyEvent::Socket::resolve_sockaddr> or C<AnyEvent::DNS::reverse_lookup>
104 with the L<Coro> rouse functions instead.
105
106 =cut
107
108 sub gethostbyname($) {
109 AnyEvent::Socket::inet_aton $_[0], Coro::rouse_cb;
110
111 ($_[0], $_[0], &Socket::AF_INET, 4, map +(AnyEvent::Socket::format_address $_), grep length == 4, Coro::rouse_wait)
112 }
113
114 sub gethostbyaddr($$) {
115 _do_asy { gethostbyaddr $_[0], $_[1] } @_
116 }
117
118 =item @result = Coro::Util::fork_eval { ... }, @args
119
120 Executes the given code block or code reference with the given arguments
121 in a separate process, returning the results. The return values must be
122 serialisable with Coro::Storable. It may, of course, block.
123
124 Note that using event handling in the sub is not usually a good idea as
125 you will inherit a mixed set of watchers from the parent.
126
127 Exceptions will be correctly forwarded to the caller.
128
129 This function is useful for pushing cpu-intensive computations into a
130 different process, for example to take advantage of multiple CPU's. Its
131 also useful if you want to simply run some blocking functions (such as
132 C<system()>) and do not care about the overhead enough to code your own
133 pid watcher etc.
134
135 This function might keep a pool of processes in some future version, as
136 fork can be rather slow in large processes.
137
138 You should also look at C<AnyEvent::Util::fork_eval>, which is newer and
139 more compatible to totally broken Perl implementations such as the one
140 from ActiveState.
141
142 Example: execute some external program (convert image to rgba raw form)
143 and add a long computation (extract the alpha channel) in a separate
144 process, making sure that never more then $NUMCPUS processes are being
145 run.
146
147 my $cpulock = new Coro::Semaphore $NUMCPUS;
148
149 sub do_it {
150 my ($path) = @_;
151
152 my $guard = $cpulock->guard;
153
154 Coro::Util::fork_eval {
155 open my $fh, "convert -depth 8 \Q$path\E rgba:"
156 or die "$path: $!";
157
158 local $/;
159 # make my eyes hurt
160 pack "C*", unpack "(xxxC)*", <$fh>
161 }
162 }
163
164 my $alphachannel = do_it "/tmp/img.png";
165
166 =cut
167
168 sub fork_eval(&@) {
169 my ($cb, @args) = @_;
170
171 pipe my $fh1, my $fh2
172 or die "pipe: $!";
173
174 my $pid = fork;
175
176 if ($pid) {
177 undef $fh2;
178
179 my $res = Coro::Storable::thaw +(Coro::Handle::unblock $fh1)->readline (undef);
180 waitpid $pid, 0; # should not block, we expect the child to simply behave
181
182 die $$res unless "ARRAY" eq ref $res;
183
184 return wantarray ? @$res : $res->[-1];
185
186 } elsif (defined $pid) {
187 delete $SIG{__WARN__};
188 delete $SIG{__DIE__};
189 # just in case, this hack effectively disables event processing
190 # in the child. cleaner and slower would be to canceling all
191 # event watchers, but we are event-model agnostic.
192 undef $Coro::idle;
193 $Coro::current->prio (Coro::PRIO_MAX);
194
195 eval {
196 undef $fh1;
197
198 my @res = eval { $cb->(@args) };
199
200 open my $fh, ">", \my $buf
201 or die "fork_eval: cannot open fh-to-buf in child: $!";
202 Storable::store_fd $@ ? \"$@" : \@res, $fh;
203 close $fh;
204
205 syswrite $fh2, $buf;
206 close $fh2;
207 };
208
209 warn $@ if $@;
210 Coro::Util::_exit 0;
211
212 } else {
213 die "fork_eval: $!";
214 }
215 }
216
217 # make sure store_fd is preloaded
218 eval { Storable::store_fd undef, undef };
219
220 1;
221
222 =back
223
224 =head1 AUTHOR/SUPPORT/CONTACT
225
226 Marc A. Lehmann <schmorp@schmorp.de>
227 http://software.schmorp.de/pkg/Coro.html
228
229 =cut
230