ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
Revision: 1.25
Committed: Tue Aug 4 21:07:37 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_1
Changes since 1.24: +1 -1 lines
Log Message:
0.1

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.22 $NODE # contains this node's noderef
10     NODE # returns this node's noderef
11     NODE $port # returns the noderef of the port
12 root 1.2
13     snd $port, type => data...;
14    
15 root 1.22 $SELF # receiving/own port id in rcv callbacks
16    
17 root 1.2 rcv $port, smartmatch => $cb->($port, @msg);
18    
19     # examples:
20     rcv $port2, ping => sub { snd $_[0], "pong"; 0 };
21     rcv $port1, pong => sub { warn "pong received\n" };
22     snd $port2, ping => $port1;
23    
24     # more, smarter, matches (_any_ is exported by this module)
25     rcv $port, [child_died => $pid] => sub { ...
26     rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
27    
28 root 1.1 =head1 DESCRIPTION
29    
30 root 1.2 This module (-family) implements a simple message passing framework.
31    
32     Despite its simplicity, you can securely message other processes running
33     on the same or other hosts.
34    
35 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
36     manual page.
37    
38     At the moment, this module family is severly broken and underdocumented,
39 root 1.21 so do not use. This was uploaded mainly to reserve the CPAN namespace -
40 root 1.23 stay tuned! The basic API should be finished, however.
41 root 1.6
42 root 1.2 =head1 CONCEPTS
43    
44     =over 4
45    
46     =item port
47    
48     A port is something you can send messages to with the C<snd> function, and
49     you can register C<rcv> handlers with. All C<rcv> handlers will receive
50     messages they match, messages will not be queued.
51    
52 root 1.3 =item port id - C<noderef#portname>
53 root 1.2
54 root 1.3 A port id is always the noderef, a hash-mark (C<#>) as separator, followed
55     by a port name (a printable string of unspecified format).
56 root 1.2
57     =item node
58    
59     A node is a single process containing at least one port - the node
60     port. You can send messages to node ports to let them create new ports,
61     among other things.
62    
63     Initially, nodes are either private (single-process only) or hidden
64 root 1.3 (connected to a master node only). Only when they epxlicitly "become
65     public" can you send them messages from unrelated other nodes.
66 root 1.2
67 root 1.5 =item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
68 root 1.2
69 root 1.3 A noderef is a string that either uniquely identifies a given node (for
70 root 1.2 private and hidden nodes), or contains a recipe on how to reach a given
71     node (for public nodes).
72    
73     =back
74    
75 root 1.3 =head1 VARIABLES/FUNCTIONS
76 root 1.2
77     =over 4
78    
79 root 1.1 =cut
80    
81     package AnyEvent::MP;
82    
83 root 1.8 use AnyEvent::MP::Base;
84 root 1.2
85 root 1.1 use common::sense;
86    
87 root 1.2 use Carp ();
88    
89 root 1.1 use AE ();
90    
91 root 1.2 use base "Exporter";
92    
93 root 1.25 our $VERSION = '0.1';
94 root 1.8 our @EXPORT = qw(
95 root 1.22 NODE $NODE *SELF node_of _any_
96 root 1.8 become_slave become_public
97 root 1.22 snd rcv mon kil reg psub
98     port
99 root 1.8 );
100 root 1.2
101 root 1.22 our $SELF;
102    
103     sub _self_die() {
104     my $msg = $@;
105     $msg =~ s/\n+$// unless ref $msg;
106     kil $SELF, die => $msg;
107     }
108    
109     =item $thisnode = NODE / $NODE
110    
111     The C<NODE> function returns, and the C<$NODE> variable contains
112     the noderef of the local node. The value is initialised by a call
113     to C<become_public> or C<become_slave>, after which all local port
114     identifiers become invalid.
115    
116     =item $noderef = node_of $portid
117    
118     Extracts and returns the noderef from a portid or a noderef.
119    
120     =item $SELF
121    
122     Contains the current port id while executing C<rcv> callbacks or C<psub>
123     blocks.
124 root 1.3
125 root 1.22 =item SELF, %SELF, @SELF...
126    
127     Due to some quirks in how perl exports variables, it is impossible to
128     just export C<$SELF>, all the symbols called C<SELF> are exported by this
129     module, but only C<$SELF> is currently used.
130 root 1.3
131     =item snd $portid, type => @data
132    
133     =item snd $portid, @msg
134    
135 root 1.8 Send the given message to the given port ID, which can identify either
136     a local or a remote port, and can be either a string or soemthignt hat
137     stringifies a sa port ID (such as a port object :).
138    
139     While the message can be about anything, it is highly recommended to use a
140     string as first element (a portid, or some word that indicates a request
141     type etc.).
142 root 1.3
143     The message data effectively becomes read-only after a call to this
144     function: modifying any argument is not allowed and can cause many
145     problems.
146    
147     The type of data you can transfer depends on the transport protocol: when
148     JSON is used, then only strings, numbers and arrays and hashes consisting
149     of those are allowed (no objects). When Storable is used, then anything
150     that Storable can serialise and deserialise is allowed, and for the local
151     node, anything can be passed.
152    
153 root 1.22 =item kil $portid[, @reason]
154    
155     Kill the specified port with the given C<@reason>.
156    
157     If no C<@reason> is specified, then the port is killed "normally" (linked
158     ports will not be kileld, or even notified).
159    
160     Otherwise, linked ports get killed with the same reason (second form of
161     C<mon>, see below).
162    
163     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
164     will be reported as reason C<< die => $@ >>.
165    
166     Transport/communication errors are reported as C<< transport_error =>
167     $message >>.
168    
169     =item $guard = mon $portid, $cb->(@reason)
170 root 1.18
171 root 1.21 =item $guard = mon $portid, $otherport
172    
173     =item $guard = mon $portid, $otherport, @msg
174    
175 root 1.22 Monitor the given port and do something when the port is killed.
176    
177     In the first form, the callback is simply called with any number
178     of C<@reason> elements (no @reason means that the port was deleted
179     "normally"). Note also that I<< the callback B<must> never die >>, so use
180     C<eval> if unsure.
181    
182     In the second form, the other port will be C<kil>'ed with C<@reason>, iff
183     a @reason was specified, i.e. on "normal" kils nothing happens, while
184     under all other conditions, the other port is killed with the same reason.
185 root 1.20
186 root 1.22 In the last form, a message of the form C<@msg, @reason> will be C<snd>.
187    
188     Example: call a given callback when C<$port> is killed.
189    
190     mon $port, sub { warn "port died because of <@_>\n" };
191    
192     Example: kill ourselves when C<$port> is killed abnormally.
193    
194     mon $port, $self;
195    
196     Example: send us a restart message another C<$port> is killed.
197    
198     mon $port, $self => "restart";
199 root 1.18
200     =cut
201    
202     sub mon {
203 root 1.21 my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);
204 root 1.18
205 root 1.22 my $node = $NODE{$noderef} || add_node $noderef;
206 root 1.18
207 root 1.21 #TODO: ports must not be references
208     if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
209     if (@_) {
210     # send a kill info message
211     my (@msg) = ($cb, @_);
212     $cb = sub { snd @msg, @_ };
213     } else {
214     # simply kill other port
215     my $port = $cb;
216 root 1.22 $cb = sub { kil $port, @_ if @_ };
217 root 1.21 }
218     }
219 root 1.18
220     $node->monitor ($port, $cb);
221    
222     defined wantarray
223     and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
224     }
225    
226 root 1.21 =item $guard = mon_guard $port, $ref, $ref...
227    
228     Monitors the given C<$port> and keeps the passed references. When the port
229     is killed, the references will be freed.
230    
231     Optionally returns a guard that will stop the monitoring.
232    
233     This function is useful when you create e.g. timers or other watchers and
234     want to free them when the port gets killed:
235    
236     $port->rcv (start => sub {
237     my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
238     undef $timer if 0.9 < rand;
239     });
240     });
241    
242     =cut
243    
244     sub mon_guard {
245     my ($port, @refs) = @_;
246    
247     mon $port, sub { 0 && @refs }
248     }
249    
250 root 1.24 =item lnk $port1, $port2
251    
252     Link two ports. This is simply a shorthand for:
253    
254     mon $port1, $port2;
255     mon $port2, $port1;
256    
257     It means that if either one is killed abnormally, the other one gets
258     killed as well.
259    
260 root 1.22 =item $local_port = port
261 root 1.2
262 root 1.22 Create a new local port object that supports message matching.
263 root 1.3
264 root 1.22 =item $portid = port { my @msg = @_; $finished }
265 root 1.10
266 root 1.15 Creates a "mini port", that is, a very lightweight port without any
267     pattern matching behind it, and returns its ID.
268    
269     The block will be called for every message received on the port. When the
270     callback returns a true value its job is considered "done" and the port
271     will be destroyed. Otherwise it will stay alive.
272    
273 root 1.17 The message will be passed as-is, no extra argument (i.e. no port id) will
274 root 1.15 be passed to the callback.
275    
276     If you need the local port id in the callback, this works nicely:
277    
278     my $port; $port = miniport {
279     snd $otherport, reply => $port;
280     };
281 root 1.10
282     =cut
283    
284 root 1.22 sub port(;&) {
285     my $id = "$UNIQ." . $ID++;
286     my $port = "$NODE#$id";
287    
288     if (@_) {
289     my $cb = shift;
290     $PORT{$id} = sub {
291     local $SELF = $port;
292     eval {
293     &$cb
294     and kil $id;
295     };
296     _self_die if $@;
297     };
298     } else {
299     my $self = bless {
300     id => "$NODE#$id",
301     }, "AnyEvent::MP::Port";
302    
303     $PORT_DATA{$id} = $self;
304     $PORT{$id} = sub {
305     local $SELF = $port;
306    
307     eval {
308     for (@{ $self->{rc0}{$_[0]} }) {
309     $_ && &{$_->[0]}
310     && undef $_;
311     }
312    
313     for (@{ $self->{rcv}{$_[0]} }) {
314     $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
315     && &{$_->[0]}
316     && undef $_;
317     }
318    
319     for (@{ $self->{any} }) {
320     $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
321     && &{$_->[0]}
322     && undef $_;
323     }
324     };
325     _self_die if $@;
326     };
327     }
328 root 1.10
329 root 1.22 $port
330 root 1.10 }
331    
332 root 1.22 =item reg $portid, $name
333 root 1.8
334 root 1.22 Registers the given port under the name C<$name>. If the name already
335     exists it is replaced.
336 root 1.8
337 root 1.22 A port can only be registered under one well known name.
338 root 1.8
339 root 1.22 A port automatically becomes unregistered when it is killed.
340 root 1.8
341     =cut
342    
343 root 1.22 sub reg(@) {
344     my ($portid, $name) = @_;
345 root 1.8
346 root 1.22 $REG{$name} = $portid;
347     }
348 root 1.18
349 root 1.22 =item rcv $portid, tagstring => $callback->(@msg), ...
350 root 1.3
351 root 1.22 =item rcv $portid, $smartmatch => $callback->(@msg), ...
352 root 1.3
353 root 1.22 =item rcv $portid, [$smartmatch...] => $callback->(@msg), ...
354 root 1.3
355 root 1.22 Register callbacks to be called on matching messages on the given port.
356 root 1.3
357     The callback has to return a true value when its work is done, after
358     which is will be removed, or a false value in which case it will stay
359     registered.
360    
361 root 1.22 The global C<$SELF> (exported by this module) contains C<$portid> while
362     executing the callback.
363    
364     Runtime errors wdurign callback execution will result in the port being
365     C<kil>ed.
366    
367 root 1.3 If the match is an array reference, then it will be matched against the
368     first elements of the message, otherwise only the first element is being
369     matched.
370    
371     Any element in the match that is specified as C<_any_> (a function
372     exported by this module) matches any single element of the message.
373    
374     While not required, it is highly recommended that the first matching
375     element is a string identifying the message. The one-string-only match is
376     also the most efficient match (by far).
377    
378     =cut
379    
380     sub rcv($@) {
381 root 1.22 my ($noderef, $port) = split /#/, shift, 2;
382 root 1.3
383 root 1.22 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
384     or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";
385    
386     my $self = $PORT_DATA{$port}
387     or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
388    
389     "AnyEvent::MP::Port" eq ref $self
390     or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";
391    
392     while (@_) {
393     my ($match, $cb) = splice @_, 0, 2;
394    
395     if (!ref $match) {
396     push @{ $self->{rc0}{$match} }, [$cb];
397     } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
398     my ($type, @match) = @$match;
399     @match
400     ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
401     : push @{ $self->{rc0}{$match->[0]} }, [$cb];
402     } else {
403     push @{ $self->{any} }, [$cb, $match];
404     }
405 root 1.3 }
406 root 1.2 }
407    
408 root 1.22 =item $closure = psub { BLOCK }
409 root 1.2
410 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
411     closure is executed, sets up the environment in the same way as in C<rcv>
412     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
413    
414     This is useful when you register callbacks from C<rcv> callbacks:
415    
416     rcv delayed_reply => sub {
417     my ($delay, @reply) = @_;
418     my $timer = AE::timer $delay, 0, psub {
419     snd @reply, $SELF;
420     };
421     };
422 root 1.3
423 root 1.8 =cut
424 root 1.3
425 root 1.22 sub psub(&) {
426     my $cb = shift;
427 root 1.3
428 root 1.22 my $port = $SELF
429     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
430 root 1.1
431 root 1.22 sub {
432     local $SELF = $port;
433 root 1.2
434 root 1.22 if (wantarray) {
435     my @res = eval { &$cb };
436     _self_die if $@;
437     @res
438     } else {
439     my $res = eval { &$cb };
440     _self_die if $@;
441     $res
442     }
443     }
444 root 1.2 }
445    
446 root 1.8 =back
447    
448     =head1 FUNCTIONS FOR NODES
449    
450     =over 4
451 root 1.2
452 root 1.8 =item become_public endpoint...
453    
454     Tells the node to become a public node, i.e. reachable from other nodes.
455    
456     If no arguments are given, or the first argument is C<undef>, then
457     AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the
458     local nodename resolves to.
459    
460     Otherwise the first argument must be an array-reference with transport
461     endpoints ("ip:port", "hostname:port") or port numbers (in which case the
462     local nodename is used as hostname). The endpoints are all resolved and
463     will become the node reference.
464 root 1.2
465 root 1.8 =cut
466 root 1.1
467 root 1.4 =back
468    
469     =head1 NODE MESSAGES
470    
471 root 1.5 Nodes understand the following messages sent to them. Many of them take
472     arguments called C<@reply>, which will simply be used to compose a reply
473     message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
474     the remaining arguments are simply the message data.
475 root 1.4
476     =over 4
477    
478     =cut
479    
480 root 1.22 =item lookup => $name, @reply
481 root 1.3
482 root 1.8 Replies with the port ID of the specified well-known port, or C<undef>.
483 root 1.3
484 root 1.7 =item devnull => ...
485    
486     Generic data sink/CPU heat conversion.
487    
488 root 1.4 =item relay => $port, @msg
489    
490     Simply forwards the message to the given port.
491    
492     =item eval => $string[ @reply]
493    
494     Evaluates the given string. If C<@reply> is given, then a message of the
495 root 1.5 form C<@reply, $@, @evalres> is sent.
496    
497     Example: crash another node.
498    
499     snd $othernode, eval => "exit";
500 root 1.4
501     =item time => @reply
502    
503     Replies the the current node time to C<@reply>.
504    
505 root 1.5 Example: tell the current node to send the current time to C<$myport> in a
506     C<timereply> message.
507    
508     snd $NODE, time => $myport, timereply => 1, 2;
509     # => snd $myport, timereply => 1, 2, <time>
510    
511 root 1.2 =back
512    
513 root 1.1 =head1 SEE ALSO
514    
515     L<AnyEvent>.
516    
517     =head1 AUTHOR
518    
519     Marc Lehmann <schmorp@schmorp.de>
520     http://home.schmorp.de/
521    
522     =cut
523    
524     1
525