ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/Coro-MP/MP.pm
Revision: 1.6
Committed: Fri Oct 2 20:41:31 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
Changes since 1.5: +3 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     Coro::MP - erlang-style multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use Coro::MP;
8    
9     $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11    
12     $SELF # receiving/own port id in rcv callbacks
13    
14     # initialise the node so it can send/receive messages
15     configure;
16    
17     # ports are message destinations
18    
19     # sending messages
20     snd $port, type => data...;
21     snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23    
24     # creating/using ports
25 root 1.2 my $port = port_async {
26 root 1.1 # thread context, $SELF is set to $port
27    
28     # returning will "kil" the $port with an empty reason
29     };
30    
31     # simple receive
32     my $port = port {
33 root 1.2 my (undef, @data) = get "tag";
34 root 1.1 };
35     snd $port, tag => "data0", "data1";
36    
37     # create a port on another node
38     my $port = spawn $node, $initfunc, @initdata;
39    
40     # monitoring
41     mon $localport, $cb->(@msg) # callback is invoked on death
42     mon $localport, $otherport # kill otherport on abnormal death
43     mon $localport, $otherport, @msg # send message on death
44    
45     =head1 DESCRIPTION
46    
47     This module (-family) implements a simple message passing framework.
48    
49     Despite its simplicity, you can securely message other processes running
50     on the same or other hosts, and you can supervise entities remotely.
51    
52     This module depends heavily on L<AnyEvent::MP>, in fact, many functions
53     exported by this module are identical to AnyEvent::MP functions. This
54     module family is simply the Coro API to AnyEvent::MP.
55    
56     Care has been taken to stay compatible with AnyEvent::MP, even if
57     sometimes this required a less natural API (C<spawn> should indeed spawn a
58     thread, not just call an initfunc for example).
59    
60     For an introduction to AnyEvent::MP, see the L<AnyEvent::MP::Intro> manual
61     page.
62    
63     =head1 VARIABLES/FUNCTIONS
64    
65     =over 4
66    
67     =cut
68    
69     package Coro::MP;
70    
71     use common::sense;
72    
73     use Carp ();
74    
75     use AnyEvent::MP::Kernel;
76 root 1.4 use AnyEvent::MP;
77 root 1.1 use Coro;
78     use Coro::AnyEvent ();
79    
80     use AE ();
81    
82     use base "Exporter";
83    
84     our $VERSION = "0.1";
85    
86     our @EXPORT = qw(
87     NODE $NODE *SELF node_of
88     configure
89 root 1.3 port snd rcv mon kil psub cal spawn
90 root 1.4
91 root 1.3 port_async rcv_async get get_cond syncal pasync
92 root 1.1 );
93    
94 root 1.4 sub _new_coro {
95     my ($port, $threadcb) = @_;
96    
97     my $coro = async_pool {
98     eval { $threadcb->() };
99     kil $SELF, die => $@ if $@;
100     };
101     $coro->swap_sv (\$SELF, \$port);
102    
103     # killing the port cancels the coro
104 root 1.6 # delying kil messages inside aemp guarantees
105     # (hopefully) that $coro != $Coro::current.
106     mon $port, sub { $coro->cancel (@_) };
107 root 1.4
108     # cancelling the coro kills the port
109     $coro->on_destroy (sub { kil $port, @_ });
110 root 1.1
111 root 1.4 $coro
112 root 1.1 }
113    
114     =item NODE, $NODE, node_of, configure
115    
116     =item $SELF, *SELF, SELF, %SELF, @SELF...
117    
118 root 1.3 =item snd, mon, kil
119 root 1.1
120     These variables and functions work exactly as in AnyEvent::MP, in fact,
121 root 1.3 they are exactly the same functions, and are used in much the same way.
122    
123     =item rcv
124    
125     This function works exactly as C<AnyEvent::MP::rcv>, and is in fact
126     compatible with Coro::MP ports. However, the canonical way to receive
127     messages with Coro::MP is to use C<get> or C<get_cond>.
128    
129     =item port
130    
131     This function is exactly the same as C<AnyEvent::MP::port> and creates new
132     ports. You can attach a thread to them by calling C<rcv_async> or you can
133     do a create and attach in one operation using C<port_async>.
134    
135     =item psub
136    
137     This function works exactly as C<AnyEvent::MP::psub> - you could use it to
138     run callbacks within a port context (good for monitoring), but you cannot
139     C<get> messages unless the callback executes within the thread attached to
140     the port.
141    
142     Since creating a thread with port context requires somewhta annoying
143     syntax, there is a C<pasync> function that handles that for you - note
144     that within such a thread, you still cannot C<get> messages.
145 root 1.1
146     =item spawn
147    
148 root 1.3 This function is identical to C<AnyEvent::MP::spawn>. This means that
149 root 1.1 it doesn't spawn a new thread as one would expect, but simply calls an
150     init function. The init function, however, can attach a new thread easily:
151    
152     sub initfun {
153     my (@args) = @_;
154    
155 root 1.3 rcv_async $SELF, sub {
156 root 1.1 # thread-code
157     };
158     }
159    
160 root 1.3 =item cal
161    
162     This function is identical to C<AnyEvent::MP::cal>. The easiest way to
163     make a synchronous call is to use Coro's rouse functionality:
164    
165     # send 1, 2, 3 to $port and wait up to 30s for reply
166     cal $port, 1, 2, 3, rouse_cb, 30;
167     my @reply = rouse_wait;
168    
169     You can also use C<syncal> if you want, and are ok with learning yet
170 root 1.4 another function with a weird name:
171 root 1.3
172     my @reply = syncal 30, $port, 1, 2, 3;
173    
174 root 1.1 =item $local_port = port_async { ... }
175    
176     Creates a new local port, and returns its ID. A new thread is created and
177     attached to the port (see C<rcv_async>, below, for details).
178    
179     =cut
180    
181     sub rcv_async($$);
182    
183     sub port_async(;&) {
184     my $id = "$UNIQ." . $ID++;
185     my $port = "$NODE#$id";
186    
187     @_
188     ? rcv_async $port, shift
189     : AnyEvent::MP::rcv $port, undef;
190    
191     $port
192     }
193    
194     =item rcv_async $port, $threadcb
195    
196     This function creates and attaches a thread on a port. The thread is set
197     to execute C<$threadcb> and is put into the ready queue. The thread will
198     receive all messages not filtered away by tagged receive callbacks (as set
199     by C<AnyEvent::MP::rcv>) - it simply replaces the default callback of an
200     AnyEvent::MP port.
201    
202     The special variable C<$SELF> will be set to C<$port> during thread
203     execution.
204    
205     When C<$threadcb> returns or the thread is canceled, the return/cancel
206     values become the C<kil> reason.
207    
208     It is not allowed to call C<rcv_async> more than once on a given port.
209    
210     =cut
211    
212     sub rcv_async($$) {
213     my ($port, $threadcb) = @_;
214    
215     my (@queue, $coro);
216    
217     AnyEvent::MP::rcv $port, sub {
218     push @queue, \@_; # TODO, take copy?
219     $coro->ready; # TODO, maybe too many unwanted wake-ups?
220     };
221    
222 root 1.4 $coro = _new_coro $port, $threadcb;
223 root 1.1 $coro->{_coro_mp_queue} = \@queue;
224     }
225    
226     =item @msg = get $tag
227    
228     =item @msg = get $tag, $timeout
229    
230     Find, dequeue and return the next message with the specified C<$tag>. If
231     no matching message is currently queued, wait up to C<$timeout> seconds
232     (or forever if no C<$timeout> has been specified or it is C<undef>) for
233     one to arrive.
234    
235     Returns the message with the initial tag removed. In case of a timeout,
236     the empty list. The function I<must> be called in list context.
237    
238     Note that empty messages cannot be distinguished from a timeout when using
239     C<rcv>.
240    
241     =cut
242    
243     sub get($;$) {
244     my ($tag, $timeout) = @_;
245    
246     my $queue = $Coro::current->{_coro_mp_queue}
247     or Carp::croak "Coro::MP::get called from thread not attached to any port";
248    
249     my $i;
250    
251     while () {
252     $queue->[$_][0] eq $tag
253     and return @{ splice @$queue, $_, 1 }
254     for $i..$#$queue;
255    
256     $i = @$queue;
257    
258     # wait for more messages
259     if (ref $timeout) {
260     schedule;
261     defined $i or return; # timeout
262    
263     } elsif (defined $timeout) {
264     $timeout or return;
265    
266     my $current = $Coro::current;
267     $timeout = AE::timer $timeout, 0, sub {
268     undef $i;
269     $current->ready;
270     };
271     } else {
272     $timeout = \$i; # dummy
273     }
274     }
275     }
276    
277 root 1.3 =item @msg = get_cond { condition... } [$timeout]
278    
279     Similarly to C<get>, looks for a matching message. Unlike C<get>,
280     "matching" is not defined by a tag alone, but by a predicate, a piece of
281 root 1.4 code that is executed on each candidate message in turn, with C<@_> set to
282     the message contents.
283 root 1.3
284 root 1.4 The predicate code is supposed to return the empty list if the message
285     didn't match. If it returns anything else, then the message is removed
286     from the queue and returned to the caller.
287 root 1.3
288 root 1.4 In addition, if the predicate returns a code reference, then it is
289     immediately called invoked on the removed message.
290 root 1.3
291     If a C<$timeout> is specified and is not C<undef>, then, after this many
292     seconds have been passed without a matching message arriving, the empty
293     list will be returned.
294    
295     TODO
296    
297     =cut
298    
299 root 1.4 sub get_cond(&;$) {
300 root 1.3 my ($cond, $timeout) = @_;
301    
302     my $queue = $Coro::current->{_coro_mp_queue}
303     or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";
304    
305     my ($i, $ok);
306    
307     while () {
308     do
309     {
310     local *_ = $queue->[$_];
311     if ($ok = &$cond) {
312     splice @$queue, $_, 1;
313     &$ok if "CODE" eq ref $ok;
314     return @_;
315     }
316     }
317     for $i..$#$queue;
318    
319     $i = @$queue;
320    
321     # wait for more messages
322     if (ref $timeout) {
323     schedule;
324     defined $i or return; # timeout
325    
326     } elsif (defined $timeout) {
327     $timeout or return;
328    
329     my $current = $Coro::current;
330     $timeout = AE::timer $timeout, 0, sub {
331     undef $i;
332     $current->ready;
333     };
334     } else {
335     $timeout = \$i; # dummy
336     }
337     }
338     }
339    
340     =item $async = pasync { BLOCK }
341    
342     Sometimes you want to run a thread within a port context, for error
343     handling.
344    
345     This function creates a new, ready, thread (using C<Coro::async>), sets
346     C<$SELF> to the the current value of C<$SELF> while it executing, and
347     calls the given BLOCK.
348    
349     This is very similar to C<psub> - note that while the BLOCK exeuctes in
350     C<$SELF> port context, you cannot call C<get>, as C<$SELF> can only be
351     attached to one thread.
352    
353     =cut
354    
355     sub pasync(&) {
356 root 1.4 _new_coro $SELF, $_[0]
357 root 1.1 }
358    
359 root 1.3 =item @reply = syncal $port, @msg, $callback[, $timeout]
360 root 1.1
361 root 1.3 The synchronous form of C<cal>, a simple form of RPC - it sends a message
362     to the given C<$port> with the given contents (C<@msg>), but adds a reply
363     port to the message.
364 root 1.1
365     The reply port is created temporarily just for the purpose of receiving
366     the reply, and will be C<kil>ed when no longer needed.
367    
368 root 1.3 Then it will wait until a reply message arrives, which will be returned to
369     the caller.
370 root 1.1
371 root 1.3 If the C<$timeout> is defined, then after this many seconds, when no
372     message has arrived, the port will be C<kil>ed and an empty list will be
373     returned.
374 root 1.1
375 root 1.3 If the C<$timeout> is undef, then the local port will monitor the remote
376     port instead, so it eventually gets cleaned-up.
377 root 1.1
378 root 1.3 =cut
379 root 1.1
380 root 1.3 sub syncal($@) {
381     my ($timeout, @msg) = @_;
382 root 1.1
383 root 1.3 cal @msg, Coro::rouse_cb, $timeout;
384     Coro::rouse_wait
385 root 1.1 }
386    
387     =back
388    
389     =head1 SEE ALSO
390    
391     L<AnyEvent::MP::Intro> - a gentle introduction.
392    
393     L<AnyEvent::MP> - like Coro::MP, but event-based.
394    
395     L<AnyEvent>.
396    
397     =head1 AUTHOR
398    
399     Marc Lehmann <schmorp@schmorp.de>
400     http://home.schmorp.de/
401    
402     =cut
403    
404     1
405