1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
Coro::MP - erlang-style multi-processing/message-passing framework |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use Coro::MP; |
8 |
|
|
|
9 |
|
|
$NODE # contains this node's node ID |
10 |
|
|
NODE # returns this node's node ID |
11 |
|
|
|
12 |
|
|
$SELF # receiving/own port id in rcv callbacks |
13 |
|
|
|
14 |
|
|
# initialise the node so it can send/receive messages |
15 |
|
|
configure; |
16 |
|
|
|
17 |
|
|
# ports are message destinations |
18 |
|
|
|
19 |
|
|
# sending messages |
20 |
|
|
snd $port, type => data...; |
21 |
|
|
snd $port, @msg; |
22 |
|
|
snd @msg_with_first_element_being_a_port; |
23 |
|
|
|
24 |
|
|
# creating/using ports |
25 |
root |
1.2 |
my $port = port_async { |
26 |
root |
1.1 |
# thread context, $SELF is set to $port |
27 |
|
|
|
28 |
|
|
# returning will "kil" the $port with an empty reason |
29 |
|
|
}; |
30 |
|
|
|
31 |
|
|
# simple receive |
32 |
|
|
my $port = port { |
33 |
root |
1.2 |
my (undef, @data) = get "tag"; |
34 |
root |
1.1 |
}; |
35 |
|
|
snd $port, tag => "data0", "data1"; |
36 |
|
|
|
37 |
|
|
# create a port on another node |
38 |
|
|
my $port = spawn $node, $initfunc, @initdata; |
39 |
|
|
|
40 |
|
|
# monitoring |
41 |
|
|
mon $localport, $cb->(@msg) # callback is invoked on death |
42 |
|
|
mon $localport, $otherport # kill otherport on abnormal death |
43 |
|
|
mon $localport, $otherport, @msg # send message on death |
44 |
|
|
|
45 |
|
|
=head1 DESCRIPTION |
46 |
|
|
|
47 |
|
|
This module (-family) implements a simple message passing framework. |
48 |
|
|
|
49 |
|
|
Despite its simplicity, you can securely message other processes running |
50 |
|
|
on the same or other hosts, and you can supervise entities remotely. |
51 |
|
|
|
52 |
|
|
This module depends heavily on L<AnyEvent::MP>, in fact, many functions |
53 |
|
|
exported by this module are identical to AnyEvent::MP functions. This |
54 |
|
|
module family is simply the Coro API to AnyEvent::MP. |
55 |
|
|
|
56 |
|
|
Care has been taken to stay compatible with AnyEvent::MP, even if |
57 |
|
|
sometimes this required a less natural API (C<spawn> should indeed spawn a |
58 |
|
|
thread, not just call an initfunc for example). |
59 |
|
|
|
60 |
|
|
For an introduction to AnyEvent::MP, see the L<AnyEvent::MP::Intro> manual |
61 |
|
|
page. |
62 |
|
|
|
63 |
|
|
=head1 VARIABLES/FUNCTIONS |
64 |
|
|
|
65 |
|
|
=over 4 |
66 |
|
|
|
67 |
|
|
=cut |
68 |
|
|
|
69 |
|
|
package Coro::MP; |
70 |
|
|
|
71 |
|
|
use common::sense; |
72 |
|
|
|
73 |
|
|
use Carp (); |
74 |
|
|
|
75 |
|
|
use AnyEvent::MP::Kernel; |
76 |
root |
1.4 |
use AnyEvent::MP; |
77 |
root |
1.1 |
use Coro; |
78 |
|
|
use Coro::AnyEvent (); |
79 |
|
|
|
80 |
|
|
use AE (); |
81 |
|
|
|
82 |
|
|
use base "Exporter"; |
83 |
|
|
|
84 |
|
|
# Module version.
our $VERSION = "0.1";

# Names exported by default.
our @EXPORT = (
    # documented below as being the AnyEvent::MP variables and functions
    qw(NODE $NODE *SELF node_of),
    qw(configure),
    qw(port snd rcv mon kil psub cal spawn),

    # the thread-based additions defined in this file
    qw(port_async rcv_async get get_cond syncal pasync),
);
93 |
|
|
|
94 |
root |
1.4 |
# Start a pooled Coro thread that runs $threadcb on behalf of $port and tie
# the lifetimes of the thread and the port together.
#
#   $port     - port ID; swapped in for $SELF in the new thread via swap_sv
#   $threadcb - code reference, called without arguments inside the thread
#
# Returns the new Coro object.
sub _new_coro {
    my ($port, $threadcb) = @_;

    my $thread = async_pool {
        eval { $threadcb->() };

        # an exception in the thread body kills the port with a "die" reason
        kil($SELF, die => $@) if $@;
    };

    # swap $SELF and $port for this thread
    $thread->swap_sv(\$SELF, \$port);

    # killing the port cancels the coro
    # delaying kil messages inside aemp guarantees
    # (hopefully) that $thread != $Coro::current.
    mon($port, sub { $thread->cancel(@_) });

    # cancelling the coro kills the port
    $thread->on_destroy(sub { kil($port, @_) });

    return $thread;
}
113 |
|
|
|
114 |
|
|
=item NODE, $NODE, node_of, configure |
115 |
|
|
|
116 |
|
|
=item $SELF, *SELF, SELF, %SELF, @SELF... |
117 |
|
|
|
118 |
root |
1.3 |
=item snd, mon, kil |
119 |
root |
1.1 |
|
120 |
|
|
These variables and functions work exactly as in AnyEvent::MP, in fact, |
121 |
root |
1.3 |
they are exactly the same functions, and are used in much the same way. |
122 |
|
|
|
123 |
|
|
=item rcv |
124 |
|
|
|
125 |
|
|
This function works exactly as C<AnyEvent::MP::rcv>, and is in fact |
126 |
|
|
compatible with Coro::MP ports. However, the canonical way to receive |
127 |
|
|
messages with Coro::MP is to use C<get> or C<get_cond>. |
128 |
|
|
|
129 |
|
|
=item port |
130 |
|
|
|
131 |
|
|
This function is exactly the same as C<AnyEvent::MP::port> and creates new |
132 |
|
|
ports. You can attach a thread to them by calling C<rcv_async> or you can |
133 |
|
|
do a create and attach in one operation using C<port_async>. |
134 |
|
|
|
135 |
|
|
=item psub |
136 |
|
|
|
137 |
|
|
This function works exactly as C<AnyEvent::MP::psub> - you could use it to |
138 |
|
|
run callbacks within a port context (good for monitoring), but you cannot |
139 |
|
|
C<get> messages unless the callback executes within the thread attached to |
140 |
|
|
the port. |
141 |
|
|
|
142 |
|
|
Since creating a thread with port context requires somewhat annoying |
143 |
|
|
syntax, there is a C<pasync> function that handles that for you - note |
144 |
|
|
that within such a thread, you still cannot C<get> messages. |
145 |
root |
1.1 |
|
146 |
|
|
=item spawn |
147 |
|
|
|
148 |
root |
1.3 |
This function is identical to C<AnyEvent::MP::spawn>. This means that |
149 |
root |
1.1 |
it doesn't spawn a new thread as one would expect, but simply calls an |
150 |
|
|
init function. The init function, however, can attach a new thread easily: |
151 |
|
|
|
152 |
|
|
sub initfun { |
153 |
|
|
my (@args) = @_; |
154 |
|
|
|
155 |
root |
1.3 |
rcv_async $SELF, sub { |
156 |
root |
1.1 |
# thread-code |
157 |
|
|
}; |
158 |
|
|
} |
159 |
|
|
|
160 |
root |
1.3 |
=item cal |
161 |
|
|
|
162 |
|
|
This function is identical to C<AnyEvent::MP::cal>. The easiest way to |
163 |
|
|
make a synchronous call is to use Coro's rouse functionality: |
164 |
|
|
|
165 |
|
|
# send 1, 2, 3 to $port and wait up to 30s for reply |
166 |
|
|
cal $port, 1, 2, 3, rouse_cb, 30; |
167 |
|
|
my @reply = rouse_wait; |
168 |
|
|
|
169 |
|
|
You can also use C<syncal> if you want, and are ok with learning yet |
170 |
root |
1.4 |
another function with a weird name: |
171 |
root |
1.3 |
|
172 |
|
|
my @reply = syncal 30, $port, 1, 2, 3; |
173 |
|
|
|
174 |
root |
1.1 |
=item $local_port = port_async { ... } |
175 |
|
|
|
176 |
|
|
Creates a new local port, and returns its ID. A new thread is created and |
177 |
|
|
attached to the port (see C<rcv_async>, below, for details). |
178 |
|
|
|
179 |
|
|
=cut |
180 |
|
|
|
181 |
|
|
# Forward declaration: rcv_async is defined further down, but port_async
# needs its prototype to be known when it is compiled.
sub rcv_async($$);

# Create a new local port ID. If a code block is given, a thread running it
# is attached to the port via rcv_async; otherwise the port is registered
# through AnyEvent::MP::rcv with an undefined callback.
#
# Returns the new port ID.
sub port_async(;&) {
    my $local_id = "$UNIQ." . $ID++;
    my $port_id  = "$NODE#$local_id";

    if (@_) {
        rcv_async($port_id, shift);
    }
    else {
        AnyEvent::MP::rcv($port_id, undef);
    }

    return $port_id;
}
193 |
|
|
|
194 |
|
|
=item rcv_async $port, $threadcb |
195 |
|
|
|
196 |
|
|
This function creates and attaches a thread on a port. The thread is set |
197 |
|
|
to execute C<$threadcb> and is put into the ready queue. The thread will |
198 |
|
|
receive all messages not filtered away by tagged receive callbacks (as set |
199 |
|
|
by C<AnyEvent::MP::rcv>) - it simply replaces the default callback of an |
200 |
|
|
AnyEvent::MP port. |
201 |
|
|
|
202 |
|
|
The special variable C<$SELF> will be set to C<$port> during thread |
203 |
|
|
execution. |
204 |
|
|
|
205 |
|
|
When C<$threadcb> returns or the thread is canceled, the return/cancel |
206 |
|
|
values become the C<kil> reason. |
207 |
|
|
|
208 |
|
|
It is not allowed to call C<rcv_async> more than once on a given port. |
209 |
|
|
|
210 |
|
|
=cut |
211 |
|
|
|
212 |
|
|
# Attach a new thread running $threadcb to $port.
#
#   $port     - port ID to attach the thread to
#   $threadcb - code reference executed by the new thread
#
# Messages delivered to the callback installed here are appended to a
# per-thread queue, which get and get_cond read through the thread's
# _coro_mp_queue slot.
sub rcv_async($$) {
    my ($port, $threadcb) = @_;

    my @inbox;     # messages received but not yet dequeued
    my $thread;    # assigned below, after the receive callback is installed

    # queue every incoming message and wake the thread up
    AnyEvent::MP::rcv($port, sub {
        push @inbox, \@_; # TODO, take copy?
        $thread->ready; # TODO, maybe too many unwanted wake-ups?
    });

    $thread = _new_coro($port, $threadcb);

    # make the queue reachable from inside the thread
    $thread->{_coro_mp_queue} = \@inbox;
}
225 |
|
|
|
226 |
|
|
=item @msg = get $tag |
227 |
|
|
|
228 |
|
|
=item @msg = get $tag, $timeout |
229 |
|
|
|
230 |
|
|
Find, dequeue and return the next message with the specified C<$tag>. If |
231 |
|
|
no matching message is currently queued, wait up to C<$timeout> seconds |
232 |
|
|
(or forever if no C<$timeout> has been specified or it is C<undef>) for |
233 |
|
|
one to arrive. |
234 |
|
|
|
235 |
|
|
Returns the message, including its initial tag. In case of a timeout, |
236 |
|
|
the empty list. The function I<must> be called in list context. |
237 |
|
|
|
238 |
|
|
Note that empty messages cannot be distinguished from a timeout when using |
239 |
|
|
C<rcv>. |
240 |
|
|
|
241 |
|
|
=cut |
242 |
|
|
|
243 |
|
|
# Dequeue and return the first queued message whose first element equals
# $tag, waiting for one to arrive if necessary.
#
#   $tag     - value compared (with eq) against the first element of each message
#   $timeout - optional: undef waits forever, a false value returns at once
#              when nothing matches, anything else is a timer in seconds
#
# Returns the elements of the matching message (the tag is its first
# element), or nothing on timeout. Croaks when the current thread has no
# message queue attached.
sub get($;$) {
    my ($tag, $timeout) = @_;

    my $queue = $Coro::current->{_coro_mp_queue}
        or Carp::croak "Coro::MP::get called from thread not attached to any port";

    # index of the first message not yet examined; the timer callback
    # undefines it to signal that the timeout has expired
    my $next;

    while () {
        # only look at messages that arrived since the previous pass
        for my $idx ($next .. $#$queue) {
            next unless $queue->[$idx][0] eq $tag;

            my $msg = splice @$queue, $idx, 1;
            return @$msg;
        }

        $next = @$queue;

        # wait for more messages
        if (ref $timeout) {
            # waiting has been set up on an earlier pass: sleep until woken
            schedule;
            return unless defined $next; # timeout
        }
        elsif (defined $timeout) {
            return unless $timeout;

            # replace the number by the timer watcher (expected to be a
            # reference), so that later passes take the branch above
            my $waiter = $Coro::current;
            $timeout = AE::timer $timeout, 0, sub {
                undef $next;
                $waiter->ready;
            };
        }
        else {
            # no timeout: any reference will do to select the waiting branch
            $timeout = \$next; # dummy
        }
    }
}
276 |
|
|
|
277 |
root |
1.3 |
=item @msg = get_cond { condition... } [$timeout] |
278 |
|
|
|
279 |
|
|
Similarly to C<get>, looks for a matching message. Unlike C<get>, |
280 |
|
|
"matching" is not defined by a tag alone, but by a predicate, a piece of |
281 |
root |
1.4 |
code that is executed on each candidate message in turn, with C<@_> set to |
282 |
|
|
the message contents. |
283 |
root |
1.3 |
|
284 |
root |
1.4 |
The predicate code is supposed to return a false value if the message |
285 |
|
|
didn't match. If it returns anything else, then the message is removed |
286 |
|
|
from the queue and returned to the caller. |
287 |
root |
1.3 |
|
288 |
root |
1.4 |
In addition, if the predicate returns a code reference, then it is |
289 |
|
|
immediately invoked on the removed message. |
290 |
root |
1.3 |
|
291 |
|
|
If a C<$timeout> is specified and is not C<undef>, then, after this many |
292 |
|
|
seconds have been passed without a matching message arriving, the empty |
293 |
|
|
list will be returned. |
294 |
|
|
|
295 |
|
|
TODO |
296 |
|
|
|
297 |
|
|
=cut |
298 |
|
|
|
299 |
root |
1.4 |
# Dequeue and return the first queued message accepted by the predicate
# $cond, waiting for one to arrive if necessary.
#
#   $cond    - code block called once per candidate message with @_ aliased
#              to that message; a true result selects the message
#   $timeout - optional: undef waits forever, a false value returns at once
#              when nothing matches, anything else is a timer in seconds
#
# If the predicate returns a code reference, it is called (again with @_ set
# to the message) after the message has been removed from the queue.
#
# Returns the elements of the selected message, or nothing on timeout.
# Croaks when the current thread has no message queue attached.
sub get_cond(&;$) {
    my ($cond, $timeout) = @_;

    my $queue = $Coro::current->{_coro_mp_queue}
        or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";

    # $i: index of the first message not yet examined; the timer callback
    # undefines it to signal that the timeout has expired
    my ($i, $ok);

    while () {
        # The index must be a lexical: "local *_" below localises the whole
        # glob, which resets $_ as well as @_, so using $_ as the splice
        # offset would always remove the first queued message.
        for my $idx ($i .. $#$queue) {
            # make the candidate message visible as @_ to the predicate
            local *_ = $queue->[$idx];

            if ($ok = &$cond) {
                splice @$queue, $idx, 1;
                &$ok if "CODE" eq ref $ok;
                return @_;
            }
        }

        $i = @$queue;

        # wait for more messages
        if (ref $timeout) {
            # waiting has been set up on an earlier pass: sleep until woken
            schedule;
            defined $i or return; # timeout
        }
        elsif (defined $timeout) {
            $timeout or return;

            # replace the number by the timer watcher (expected to be a
            # reference), so that later passes take the branch above
            my $current = $Coro::current;
            $timeout = AE::timer $timeout, 0, sub {
                undef $i;
                $current->ready;
            };
        }
        else {
            # no timeout: any reference will do to select the waiting branch
            $timeout = \$i; # dummy
        }
    }
}
339 |
|
|
|
340 |
|
|
=item $async = pasync { BLOCK } |
341 |
|
|
|
342 |
|
|
Sometimes you want to run a thread within a port context, for error |
343 |
|
|
handling. |
344 |
|
|
|
345 |
|
|
This function creates a new, ready, thread (using C<Coro::async_pool>), sets |
346 |
|
|
C<$SELF> to the current value of C<$SELF> while it is executing, and |
347 |
|
|
calls the given BLOCK. |
348 |
|
|
|
349 |
|
|
This is very similar to C<psub> - note that while the BLOCK executes in |
350 |
|
|
C<$SELF> port context, you cannot call C<get>, as C<$SELF> can only be |
351 |
|
|
attached to one thread. |
352 |
|
|
|
353 |
|
|
=cut |
354 |
|
|
|
355 |
|
|
# Run the given block in a new thread created by _new_coro, using the
# current value of $SELF as its port. Returns whatever _new_coro returns.
sub pasync(&) {
    my ($block) = @_;

    return _new_coro($SELF, $block);
}
358 |
|
|
|
359 |
root |
1.3 |
=item @reply = syncal $timeout, $port, @msg |
360 |
root |
1.1 |
|
361 |
root |
1.3 |
The synchronous form of C<cal>, a simple form of RPC - it sends a message |
362 |
|
|
to the given C<$port> with the given contents (C<@msg>), but adds a reply |
363 |
|
|
port to the message. |
364 |
root |
1.1 |
|
365 |
|
|
The reply port is created temporarily just for the purpose of receiving |
366 |
|
|
the reply, and will be C<kil>ed when no longer needed. |
367 |
|
|
|
368 |
root |
1.3 |
Then it will wait until a reply message arrives, which will be returned to |
369 |
|
|
the caller. |
370 |
root |
1.1 |
|
371 |
root |
1.3 |
If the C<$timeout> is defined, then after this many seconds, when no |
372 |
|
|
message has arrived, the port will be C<kil>ed and an empty list will be |
373 |
|
|
returned. |
374 |
root |
1.1 |
|
375 |
root |
1.3 |
If the C<$timeout> is undef, then the local port will monitor the remote |
376 |
|
|
port instead, so it eventually gets cleaned-up. |
377 |
root |
1.1 |
|
378 |
root |
1.3 |
=cut |
379 |
root |
1.1 |
|
380 |
root |
1.3 |
# Synchronous variant of cal: issue the call with a Coro rouse callback as
# the reply handler and block the current thread until it has been invoked.
#
#   $timeout - passed through to cal as its trailing timeout argument
#   @msg     - destination port followed by the message contents
#
# Returns whatever Coro::rouse_wait returns.
sub syncal($@) {
    my $timeout = shift;
    my @msg     = @_;

    cal(@msg, Coro::rouse_cb(), $timeout);

    return Coro::rouse_wait();
}
386 |
|
|
|
387 |
|
|
=back |
388 |
|
|
|
389 |
|
|
=head1 SEE ALSO |
390 |
|
|
|
391 |
|
|
L<AnyEvent::MP::Intro> - a gentle introduction. |
392 |
|
|
|
393 |
|
|
L<AnyEvent::MP> - like Coro::MP, but event-based. |
394 |
|
|
|
395 |
|
|
L<AnyEvent>. |
396 |
|
|
|
397 |
|
|
=head1 AUTHOR |
398 |
|
|
|
399 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
400 |
|
|
http://home.schmorp.de/ |
401 |
|
|
|
402 |
|
|
=cut |
403 |
|
|
|
404 |
|
|
1 |
405 |
|
|
|