/cvs/Coro-MP/MP.pm
Revision: 1.16
Committed: Wed Sep 26 18:34:02 2012 UTC by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.15: +1 -1 lines

File Contents

=head1 NAME

Coro::MP - erlang-style multi-processing/message-passing framework

=head1 SYNOPSIS

   use Coro::MP;

   # exports everything that AnyEvent::MP exports as well.
   # new stuff compared to AnyEvent::MP:

   # creating/using ports from threads
   my $port = port_async {
      # thread context, $SELF is set to $port

      # returning will "kil" the $port with an empty reason
   };

   # attach to an existing port
   spawn $NODE, "::initfunc";
   sub ::initfunc {
      rcv_async $SELF, sub {
         ...
      };
   }

   # simple "tag" receives:
   my ($pid) = get "pid", 30
      or die "no pid message received after 30s";

   # conditional receive
   my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };
   my @next_msg = get_cond { 1 } 30; # 30s timeout

   # run thread in port context
   peval_async $port, sub {
      die "kill the port\n";
   };

   # synchronous "cal"
   my @retval = syncal 30, $port, tag => $data;

=head1 DESCRIPTION

This module (-family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts, and you can supervise entities remotely.

This module depends heavily on L<AnyEvent::MP>; in fact, many functions
exported by this module are identical to AnyEvent::MP functions. This
module family is simply the Coro API to AnyEvent::MP.

Care has been taken to stay compatible with AnyEvent::MP, even if
sometimes this required a less natural API (C<spawn> should indeed spawn a
thread, not just call an initfunc, for example).

For an introduction, see L<AnyEvent::MP::Intro>, which also covers
C<Coro::MP>.

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package Coro::MP;

use common::sense;

use Carp ();

use AnyEvent::MP::Kernel;
use AnyEvent::MP;
use Coro;
use Coro::AnyEvent ();

use AE ();

use base "Exporter";

our $VERSION = "0.1";

our @EXPORT = (@AnyEvent::MP::EXPORT, qw(
   port_async rcv_async get get_cond syncal peval_async
));
our @EXPORT_OK = (@AnyEvent::MP::EXPORT_OK);

sub _new_coro {
   my ($port, $threadcb) = @_;

   my $coro = async_pool {
      # we do it inside the thread, because this hopefully gives us the correct $SELF,
      # as $SELF should not be localised when threads are switched.
      $Coro::current->swap_sv (\$SELF, \$port);
      eval { $threadcb->() };
      kil $SELF, die => $@ if $@;
   };

   # killing the port cancels the coro
   # delaying kil messages inside aemp guarantees
   # (hopefully) that $coro != $Coro::current.
   mon $port, sub { $coro->cancel (@_) };

   # cancelling the coro kills the port
   $coro->on_destroy (sub { kil $port, @_ });

   $coro
}

=item NODE, $NODE, node_of, configure

=item $SELF, *SELF, SELF, %SELF, @SELF...

=item snd, mon, kil, psub

These variables and functions work exactly as in AnyEvent::MP; in fact,
they are exactly the same functions, and are used in much the same way.

=item rcv

This function works exactly as C<AnyEvent::MP::rcv>, and is in fact
compatible with Coro::MP ports. However, the canonical way to receive
messages with Coro::MP is to use C<get> or C<get_cond>.

=item port

This function is exactly the same as C<AnyEvent::MP::port> and creates new
ports. You can attach a thread to them by calling C<rcv_async> or you can
do a create and attach in one operation using C<port_async>.

=item peval

This function works exactly as C<AnyEvent::MP::peval> - you could use it to
run callbacks within a port context (good for monitoring), but you cannot
C<get> messages unless the callback executes within the thread attached to
the port.

Since creating a thread with port context requires somewhat annoying
syntax, there is a C<peval_async> function that handles that for you -
note that within such a thread, you still cannot C<get> messages.

=item spawn

This function is identical to C<AnyEvent::MP::spawn>. This means that
it doesn't spawn a new thread as one would expect, but simply calls an
init function. The init function, however, can attach a new thread easily:

   sub initfun {
      my (@args) = @_;

      rcv_async $SELF, sub {
         # thread-code
      };
   }

=item cal

This function is identical to C<AnyEvent::MP::cal>. The easiest way to
make a synchronous call is to use Coro's rouse functionality:

   # send 1, 2, 3 to $port and wait up to 30s for reply
   cal $port, 1, 2, 3, rouse_cb, 30;
   my @reply = rouse_wait;

You can also use C<syncal> if you want, and feel fine with learning yet
another function with a weird name:

   my @reply = syncal 30, $port, 1, 2, 3;

=item $local_port = port_async { ... }

Creates a new local port, and returns its ID. A new thread is created and
attached to the port (see C<rcv_async>, below, for details).

=cut

sub rcv_async($$);

sub port_async(;&) {
   my $id = "$AnyEvent::MP::Kernel::UNIQ." . $AnyEvent::MP::Kernel::ID++;
   my $port = "$NODE#$id";

   @_
      ? rcv_async $port, shift
      : AnyEvent::MP::rcv $port, undef;

   $port
}

=item rcv_async $port, $threadcb

This function creates and attaches a thread on a port. The thread is set
to execute C<$threadcb> and is put into the ready queue. The thread will
receive all messages not filtered away by tagged receive callbacks (as set
by C<AnyEvent::MP::rcv>) - it simply replaces the default callback of an
AnyEvent::MP port.

The special variable C<$SELF> will be set to C<$port> during thread
execution.

When C<$threadcb> returns or the thread is canceled, the return/cancel
values become the C<kil> reason.

It is not allowed to call C<rcv_async> more than once on a given port.

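The routing described above can be pictured with a small pure-Perl model.
This is only a sketch under assumptions: C<%tagged>, C<@queue> and
C<deliver> are hypothetical names invented for illustration, nothing here
blocks or uses Coro, and the real dispatch happens inside AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical model of one port: tagged callbacks plus a default queue.
my %tagged;   # tag => callback, standing in for tagged rcv callbacks
my @queue;    # messages left over for the attached thread

sub deliver {
   my (@msg) = @_;

   if (my $cb = $tagged{$msg[0]}) {
      $cb->(@msg[1 .. $#msg]);   # a tagged callback consumes the message
   } else {
      push @queue, [@msg];       # default: queue it for the thread
   }
}

my @pings;
$tagged{ping} = sub { push @pings, [@_] };

deliver (ping => 1);
deliver (log  => "hello");
deliver (ping => 2);

printf "%d message(s) handled by callback, %d queued for the thread\n",
   scalar @pings, scalar @queue;
```

In this model only the C<log> message ends up in the queue, which is the
part a thread attached with C<rcv_async> would later see.
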
=cut

sub rcv_async($$) {
   my ($port, $threadcb) = @_;

   my (@queue, $coro);

   AnyEvent::MP::rcv $port, sub {
      push @queue, \@_; # TODO, take copy?
      $coro->ready; # TODO, maybe too many unwanted wake-ups?
   };

   $coro = _new_coro $port, $threadcb;
   $coro->{_coro_mp_queue} = \@queue;
}

=item @msg = get $tag

=item @msg = get $tag, $timeout

Find, dequeue and return the next message with the specified C<$tag>. If
no matching message is currently queued, wait up to C<$timeout> seconds
(or forever if no C<$timeout> has been specified or it is C<undef>) for
one to arrive.

Returns the message with the initial tag removed. In case of a timeout,
the empty list. The function I<must> be called in list context.

Note that empty messages cannot be distinguished from a timeout when using
C<get>.

Example: send a "log" message to C<$SELF> and then get and print it.

   snd $SELF, log => "text";
   my ($text) = get "log";
   print "log message: $text\n";

Example: receive C<p1> and C<p2> messages, regardless of the order they
arrive in on the port.

   my @p1 = get "p1";
   my @p2 = get "p2";

Example: assume a message with tag C<now> is already in the queue and fetch
it. If no message was there, do not wait, but die.

   my @msg = get "now", 0
      or die "expected now message to be there, but it wasn't";

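The order-independence in the C<p1>/C<p2> example comes from scanning the
whole queue for the tag rather than only looking at its head. A minimal
pure-Perl sketch of that scan-and-remove step follows; C<@queue> and
C<take_tagged> are hypothetical stand-ins, the sketch never waits, and it
hands back the stored message unchanged as an array reference.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the per-thread message queue, in arrival order.
my @queue = (
   [p2  => "second"],
   [log => "text"],
   [p1  => "first"],
);

# Non-blocking model of a tagged receive: remove and return the first
# queued message whose first element equals $tag, or undef if none matches.
sub take_tagged {
   my ($tag) = @_;

   for my $i (0 .. $#queue) {
      next unless $queue[$i][0] eq $tag;
      my ($msg) = splice @queue, $i, 1;
      return $msg;
   }

   undef   # nothing matched; a blocking receive would wait here instead
}

my $p1 = take_tagged ("p1");   # found although it arrived last
my $p2 = take_tagged ("p2");

print "p1 payload: $p1->[1]\n";
print "p2 payload: $p2->[1]\n";
print "still queued: ", join (", ", map $_->[0], @queue), "\n";
```

Messages with other tags, such as C<log> here, stay queued in their
original order for a later receive.
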
=cut

sub get($;$) {
   my ($tag, $timeout) = @_;

   my $queue = $Coro::current->{_coro_mp_queue}
      or Carp::croak "Coro::MP::get called from thread not attached to any port";

   my $i;

   while () {
      $queue->[$_][0] eq $tag
         and return @{ splice @$queue, $_, 1 }
         for $i..$#$queue;

      $i = @$queue;

      # wait for more messages
      if (ref $timeout) {
         schedule;
         defined $i or return; # timeout

      } elsif (defined $timeout) {
         $timeout or return;

         my $current = $Coro::current;
         $timeout = AE::timer $timeout, 0, sub {
            undef $i;
            $current->ready;
         };
      } else {
         $timeout = \$i; # dummy
      }
   }
}

=item @msg = get_cond { condition... } [$timeout]

Similarly to C<get>, looks for a matching message. Unlike C<get>,
"matching" is not defined by a tag alone, but by a predicate, a piece of
code that is executed on each candidate message in turn, with C<@_> set to
the message contents.

The predicate code is supposed to return the empty list if the message
didn't match. If it returns anything else, then the message is removed
from the queue and returned to the caller.

In addition, if the predicate returns a code reference, then it is
immediately invoked on the removed message.

If a C<$timeout> is specified and is not C<undef>, then, after this many
seconds have passed without a matching message arriving, the empty
list will be returned.

Example: fetch the next message, wait as long as necessary.

   my @msg = get_cond { 1 };

Example: fetch the next message whose tag starts with C<group1_>.

   my ($tag, @data) = get_cond { $_[0] =~ /^group1_/ };

Example: check whether a message with tag C<child_exit> and a second
element of C<$pid> is in the queue already.

   if (
      my (undef, $pid, $status) =
         get_cond {
            $_[0] eq "child_exit" && $_[1] == $pid
         } 0
   ) {
      warn "child $pid did exit with status $status\n";
   }

Example: implement a server that reacts to C<log>, C<exit> and C<reverse>
messages, and exits after 30 seconds of idling.

   my $reverser = port_async {
      while() {
         get_cond {
            $_[0] eq "exit" and return sub {
               last; # yes, this is valid
            };
            $_[0] eq "log" and return sub {
               print "log: $_[1]\n";
            };
            $_[0] eq "reverse" and return sub {
               my (undef, $text, @reply) = @_;
               snd @reply, scalar reverse $text;
            };

            die "unexpected message $_[0] received";
         } 30
            or last;
      }
   };

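The predicate protocol (an empty list means "no match", any other value
means "match", and a code reference is additionally invoked on the removed
message) can be modelled without Coro. The following is a sketch under
assumptions: C<@queue> and C<take_if> are hypothetical names, the sketch
never waits for new messages, and it passes a copy of each message where
the real function aliases C<@_>.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the per-thread message queue.
my @queue = (
   [log     => "starting"],
   [reverse => "Rotator"],
   [log     => "done"],
);

# Non-blocking model of a conditional receive.
sub take_if {
   my ($cond) = @_;

   for my $i (0 .. $#queue) {
      my @msg = @{ $queue[$i] };

      # the predicate sees the message contents in @_
      my $ok = $cond->(@msg) or next;

      splice @queue, $i, 1;               # matched: dequeue the message
      $ok->(@msg) if ref $ok eq "CODE";   # run the returned handler, if any
      return @msg;
   }

   ()   # nothing matched
}

my @seen;
my @msg = take_if (sub {
   $_[0] eq "reverse" and return sub { push @seen, scalar reverse $_[1] };
   ()
});

print "matched message: @msg\n";
print "handler saw: $seen[0]\n";
print scalar @queue, " message(s) still queued\n";
```

Returning a handler from the predicate keeps matching and acting on a
message in one place, which is what the server example above relies on.
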
=cut

sub _true { 1 }

sub get_cond(;&$) {
   my ($cond, $timeout) = @_;

   my $queue = $Coro::current->{_coro_mp_queue}
      or Carp::croak "Coro::MP::get_cond called from thread not attached to any port";

   my ($i, $ok);

   $cond ||= \&_true;

   while () {
      do
         {
            local *_ = $queue->[$_];
            if ($ok = &$cond) {
               splice @$queue, $_, 1;
               &$ok if "CODE" eq ref $ok;
               return @_;
            }
         }
         for $i..$#$queue;

      $i = @$queue;

      # wait for more messages
      if (ref $timeout) {
         schedule;
         defined $i or return; # timeout

      } elsif (defined $timeout) {
         $timeout or return;

         my $current = $Coro::current;
         $timeout = AE::timer $timeout, 0, sub {
            undef $i;
            $current->ready;
         };
      } else {
         $timeout = \$i; # dummy
      }
   }
}

=item $async = peval_async $port, sub { BLOCK }

Sometimes you want to run a thread within a port context, for error
handling.

This function creates a new, ready, thread (using C<Coro::async_pool>), sets
C<$SELF> to C<$port> while it is executing, and calls the given BLOCK.

This is very similar to C<psub> - note that while the BLOCK executes in
C<$SELF> port context, you cannot call C<get>, as C<$SELF> can only be
attached to one thread.

Example: execute some Coro::AIO code concurrently in another thread, but
make sure any errors C<kil> the originating port.

   port_async {
      ...
      peval_async $SELF, sub {
         # $SELF set, but cannot call get etc. here

         my $fh = aio_open ...
            or die "open: $!";

         aio_close $fh;
      };
   };

=cut

sub peval_async($$) {
   _new_coro $_[0], $_[1]
}

=item @reply = syncal $timeout, $port => @msg

The synchronous form of C<cal>, a simple form of RPC - it sends a message
to the given C<$port> with the given contents (C<@msg>), but adds a reply
port to the message.

The reply port is created temporarily just for the purpose of receiving
the reply, and will be C<kil>ed when no longer needed.

Then it will wait until a reply message arrives, which will be returned to
the caller.

If the C<$timeout> is defined, then after this many seconds, when no
message has arrived, the port will be C<kil>ed and an empty list will be
returned.

If the C<$timeout> is undef, then the local port will monitor the remote
port instead, so it eventually gets cleaned-up.

Example: call the string reverse example from C<get_cond>.

   my ($reversed) = syncal 1, $reverser, reverse => "Rotator";

=cut

sub syncal($@) {
   my ($timeout, @msg) = @_;

   cal @msg, Coro::rouse_cb, $timeout;
   Coro::rouse_wait
}

=back

=head1 SEE ALSO

L<AnyEvent::MP::Intro> - a gentle introduction.

L<AnyEvent::MP> - like Coro::MP, but event-based.

L<AnyEvent>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1
