/cvs/AnyEvent-MP/MP.pm
Revision: 1.30
Committed: Tue Aug 4 23:35:51 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.29: +5 -4 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE          # contains this node's noderef
   NODE           # returns this node's noderef
   node_of $port  # returns the noderef of the port

   snd $port, type => data...;

   $SELF          # receiving/own port id in rcv callbacks

   rcv $port, smartmatch => $cb->(@msg);

   # examples:
   rcv $port2, ping => sub { snd $_[1], "pong"; 0 };
   rcv $port1, pong => sub { warn "pong received\n" };
   snd $port2, ping => $port1;

   # more, smarter, matches (_any_ is exported by this module)
   rcv $port, [child_died => $pid] => sub { ...
   rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3

=head1 DESCRIPTION

This module (-family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page.

At the moment, this module family is severely broken and underdocumented,
so do not use it. This was uploaded mainly to reserve the CPAN namespace -
stay tuned! The basic API should be finished, however.

=head1 CONCEPTS

=over 4

=item port

A port is something you can send messages to (with the C<snd> function).

Some ports allow you to register C<rcv> handlers that can match specific
messages. Every C<rcv> handler receives the messages it matches; messages
are not queued.

=item port id - C<noderef#portname>

A port id is normally the concatenation of a noderef, a hash-mark (C<#>)
as separator, and a port name (a printable string of unspecified
format). An exception is the node port, whose ID is identical to its node
reference.

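The split described above can be sketched in plain Perl. It mirrors the C<split /#/, ..., 2> used by C<mon> and C<rcv> further down; the helper name C<split_port_id> is hypothetical and not part of this module's API.

```perl
use strict;
use warnings;

# Hypothetical helper: split a port id into noderef and port name.
# Only the first "#" separates the parts; for a node port (no "#")
# the port name is returned as undef.
sub split_port_id {
   my ($portid) = @_;
   my ($noderef, $portname) = split /#/, $portid, 2;
   return ($noderef, $portname);
}

my ($noderef, $portname) = split_port_id("10.0.0.1:4040#abc.7");
print "$noderef | $portname\n";    # prints "10.0.0.1:4040 | abc.7"
```

For a node port such as C<10.0.0.1:4040>, the second value is undefined.
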
=item node

A node is a single process containing at least one port - the node
port. You can send messages to node ports to find existing ports or to
create new ports, among other things.

Nodes are either private (single-process only), slaves (connected to a
master node only) or public nodes (connectable from unrelated nodes).

=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>

A node reference is a string that either simply identifies the node (for
private and slave nodes), or contains a recipe on how to reach a given
node (for public nodes).

This recipe is simply a comma-separated list of C<address:port> pairs (for
TCP/IP, other protocols might look different).

Node references come in two flavours: resolved (containing only numerical
addresses) or unresolved (where hostnames are used instead of addresses).

Before using an unresolved node reference in a message you first have to
resolve it.

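A rough way to tell the two flavours apart is sketched below. It assumes TCP/IP recipes whose addresses are IPv4 dotted quads; IPv6 addresses, other protocols and the C<id>/C<id@noderef> forms are deliberately not handled, and C<is_resolved> is a hypothetical name.

```perl
use strict;
use warnings;

# Hypothetical check: a noderef counts as resolved here only if every
# comma-separated component is a numeric IPv4 address plus a port.
sub is_resolved {
   my ($noderef) = @_;
   my @parts = split /,/, $noderef;
   return 0 unless @parts;
   for my $part (@parts) {
      return 0 unless $part =~ /^\d{1,3}(?:\.\d{1,3}){3}:\d+$/;
   }
   return 1;
}

print is_resolved("10.0.0.1:4040,10.0.0.2:4040") ? "resolved\n" : "unresolved\n";
print is_resolved("localhost:4040") ? "resolved\n" : "unresolved\n";
```
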
=back

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Base;

use common::sense;

use Carp ();

use AE ();

use base "Exporter";

our $VERSION = '0.1';
our @EXPORT = qw(
   NODE $NODE *SELF node_of _any_
   resolve_node
   become_slave become_public
   snd rcv mon kil reg psub
   port
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the
noderef of the local node. The value is initialised by a call to
C<become_public> or C<become_slave>, after which all local port
identifiers become invalid.

=item $noderef = node_of $portid

Extracts and returns the noderef from a portid or a noderef.

=item $cv = resolve_node $noderef

Takes an unresolved node reference that may contain hostnames and
abbreviated IDs, resolves all of them and returns a resolved node
reference.

In addition to C<address:port> pairs allowed in resolved noderefs, the
following forms are supported:

=over 4

=item the empty string

An empty-string component gets resolved as if the default port (4040) was
specified.

=item naked port numbers (e.g. C<1234>)

These are resolved by prepending the local nodename and a colon, to be
further resolved.

=item hostnames (e.g. C<localhost:1234>, C<localhost>)

These are resolved by using AnyEvent::DNS to resolve them, optionally
looking up SRV records for the C<aemp=4040> port, if no port was
specified.

=back

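The rewriting rules above can be sketched without any network access. This is an illustration under assumptions: it only normalises the component strings, it performs no DNS or SRV lookups and returns a plain string rather than a condition variable, it assumes a hostname without a port falls back to the default port, and C<normalise_noderef> and its C<$local_nodename> parameter are hypothetical.

```perl
use strict;
use warnings;

use constant DEFAULT_PORT => 4040;

# Hypothetical helper: apply the rewriting rules to every component of an
# unresolved noderef. Hostnames stay as they are; a real resolver would now
# look them up (via AnyEvent::DNS, possibly using SRV records).
sub normalise_noderef {
   my ($noderef, $local_nodename) = @_;

   # split returns nothing for an empty string, so treat it as one component
   my @parts = length $noderef ? split(/,/, $noderef, -1) : ("");

   for my $part (@parts) {
      $part = DEFAULT_PORT if $part eq "";                     # empty string
      $part = "$local_nodename:$part" if $part =~ /^\d+$/;    # naked port
      $part .= ":" . DEFAULT_PORT unless $part =~ /:\d+$/;     # host only (assumed)
   }

   return join ",", @parts;
}

print normalise_noderef("localhost,1234", "myhost"), "\n";
# prints "localhost:4040,myhost:1234"
```
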
=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
export just C<$SELF>: all the symbols called C<SELF> are exported by this
module, but only C<$SELF> is currently used.

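C<$SELF> is only meaningful while a callback runs because it is set with Perl's C<local>, as the C<port> implementation below does. A stripped-down sketch, where C<deliver_to> is a hypothetical stand-in for the module's internal message delivery:

```perl
use strict;
use warnings;

our $SELF;

# Hypothetical stand-in for message delivery: $SELF holds the receiving
# port id only while the callback runs (dynamic scoping via "local").
sub deliver_to {
   my ($portid, $cb, @msg) = @_;
   local $SELF = $portid;
   return $cb->(@msg);
}

print deliver_to("node#p1", sub { "$SELF got $_[0]" }, "ping"), "\n";
# prints "node#p1 got ping"
print defined $SELF ? "still set\n" : "unset again\n";
# prints "unset again"
```

Because C<local> restores the previous value on exit, nested deliveries see their own port id and the outer one is back afterwards.
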
=item snd $portid, type => @data

=item snd $portid, @msg

Send the given message to the given port ID, which can identify either a
local or a remote port, and can be either a string or something that
stringifies to a port ID (such as a port object :).

While the message can be about anything, it is highly recommended to use a
string as first element (a portid, or some word that indicates a request
type etc.).

The message data effectively becomes read-only after a call to this
function: modifying any argument is not allowed and can cause many
problems.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed.

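When a JSON transport may be involved, a message can be checked before sending. A sketch using the core JSON::PP module, which by default refuses blessed objects and code references; whether a given AnyEvent::MP transport actually uses JSON::PP is an assumption, so treat C<json_safe> (a hypothetical name) as a rough pre-flight check only. It says nothing about Storable transfers.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical pre-flight check: true if the message could go over a JSON
# transport, i.e. it only contains strings, numbers, arrays and hashes.
# JSON::PP dies on blessed objects and code references by default.
sub json_safe {
   my (@msg) = @_;
   return eval { JSON::PP->new->encode(\@msg); 1 } ? 1 : 0;
}

print json_safe(reply => 1, [2, 3], { a => "b" }) ? "ok\n" : "not ok\n";   # ok
print json_safe(obj => bless({}, "Some::Class")) ? "ok\n" : "not ok\n";    # not ok
```
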
=item kil $portid[, @reason]

Kill the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally" (linked
ports will not be killed, or even notified).

Otherwise, linked ports get killed with the same reason (second form of
C<mon>, see below).

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.

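The C<< die => $@ >> reason is derived the way C<_self_die> above does it: plain-string errors lose their trailing newlines, while exception objects are passed through untouched. As a standalone sketch (the name C<die_reason> is hypothetical):

```perl
use strict;
use warnings;

# Hypothetical helper mirroring _self_die: turn an error into a kil reason.
sub die_reason {
   my ($err) = @_;
   $err =~ s/\n+$// unless ref $err;
   return (die => $err);
}

eval { die "disk full\n" };
my @reason = die_reason($@);
print join("|", @reason), "\n";    # prints "die|disk full"
```
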
=item $guard = mon $portid, $cb->(@reason)

=item $guard = mon $portid, $otherport

=item $guard = mon $portid, $otherport, @msg

Monitor the given port and do something when the port is killed.

In the first form, the callback is simply called with any number of
C<@reason> elements (no @reason means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the second form, the other port will be C<kil>'ed with C<@reason>, iff
a @reason was specified, i.e. on "normal" kills nothing happens, while
under all other conditions, the other port is killed with the same reason.

In the last form, a message of the form C<@msg, @reason> is sent to
C<$otherport> with C<snd>, whether or not the kill was "normal".

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port, $SELF;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $SELF => "restart";

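All three forms reduce to a single callback, which is what the first part of the C<mon> implementation below does. The same logic restated outside the module, with C<snd> and C<kil> replaced by hypothetical recording stubs (C<fake_snd>, C<fake_kil>) so the behaviour can be inspected without a running node; C<mon_callback> is likewise an invented name:

```perl
use strict;
use warnings;

my @log;    # records what the stubs were asked to do
sub fake_snd { push @log, [snd => @_] }
sub fake_kil { push @log, [kil => @_] }

# Hypothetical restatement of how mon turns its arguments into a callback.
sub mon_callback {
   my ($cb, @msg) = @_;

   return $cb if ref $cb;                          # form 1: plain callback

   if (@msg) {                                     # form 3: always send
      return sub { fake_snd($cb, @msg, @_) };
   }

   my $other = $cb;                                # form 2: kill on abnormal
   return sub { fake_kil($other, @_) if @_ };
}

mon_callback("p2")->();                  # normal kill: nothing happens
mon_callback("p2")->(die => "oops");     # abnormal: p2 gets killed too
mon_callback("p3", "restart")->();       # message is sent either way
```
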
=cut

sub mon {
   my ($noderef, $port) = split /#/, shift, 2;

   my $node = $NODE{$noderef} || add_node $noderef;

   my $cb = shift;

   unless (ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed:

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
         undef $timer if 0.9 < rand;
      };
   };

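The trick behind C<mon_guard> is that its monitoring callback, C<sub { 0 && @refs }>, closes over C<@refs> without ever using it, so the references live exactly as long as the callback. The effect can be observed in isolation with the core C<Scalar::Util::weaken>; C<keep_alive> is a hypothetical stand-in for the registered monitor callback.

```perl
use strict;
use warnings;
use Scalar::Util qw(weaken);

# Same closure shape as mon_guard: never uses @refs, but keeps it alive.
sub keep_alive {
   my (@refs) = @_;
   return sub { 0 && @refs };
}

my $resource = { name => "timer" };
my $keeper   = keep_alive($resource);

my $watch = $resource;    # a weak reference does not keep the hash alive
weaken $watch;

undef $resource;
print defined $watch ? "alive\n" : "freed\n";    # alive: the closure holds it
undef $keeper;                                   # like the monitor going away
print defined $watch ? "alive\n" : "freed\n";    # freed
```
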
=cut

sub mon_guard {
   my ($port, @refs) = @_;

   mon $port, sub { 0 && @refs }
}

=item lnk $port1, $port2

Link two ports. This is simply a shorthand for:

   mon $port1, $port2;
   mon $port2, $port1;

It means that if either one is killed abnormally, the other one gets
killed as well.

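The propagation rule can be modelled with a small in-memory table. This is a hypothetical toy model (C<%LINKS>, C<%DEAD>, C<link_ports> and C<kill_port> are invented here) that runs synchronously in one process; it does not reflect how AnyEvent::MP actually tracks links across nodes.

```perl
use strict;
use warnings;

my %LINKS;    # port => { linked port => 1 }
my %DEAD;     # port => [reason...] for every killed port

sub link_ports {
   my ($p1, $p2) = @_;
   $LINKS{$p1}{$p2} = 1;
   $LINKS{$p2}{$p1} = 1;
}

# Kill a port; only an abnormal kill (non-empty @reason) spreads to links.
sub kill_port {
   my ($port, @reason) = @_;
   return if $DEAD{$port};                          # already dead
   $DEAD{$port} = [@reason];

   my $linked = delete $LINKS{$port} || {};
   delete $LINKS{$_}{$port} for keys %$linked;      # forget the back-links

   if (@reason) {
      kill_port($_, @reason) for sort keys %$linked;
   }
}

link_ports(a => "b");
kill_port(a => die => "oops");    # b is killed with the same reason
link_ports(c => "d");
kill_port("c");                   # normal kill: d stays alive
```
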
=item $local_port = port

Create a new local port object that supports message matching.

=item $portid = port { my @msg = @_; $finished }

Creates a "mini port", that is, a very lightweight port without any
pattern matching behind it, and returns its ID.

The block will be called for every message received on the port. When the
callback returns a true value its job is considered "done" and the port
will be destroyed. Otherwise it will stay alive.

The message will be passed as-is, no extra argument (i.e. no port id) will
be passed to the callback; the port id is, however, available in C<$SELF>
while the block runs.

If you need the local port id in the callback, this works nicely:

   my $port; $port = port {
      snd $otherport, reply => $port;
   };

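The "return true when done" protocol can be illustrated with a toy dispatcher. C<%PORT>, C<make_miniport> and C<deliver> are hypothetical; they only imitate the shape of the C<port> implementation below (callbacks stored in a hash keyed by port name), without C<$SELF>, error handling or networking.

```perl
use strict;
use warnings;

my %PORT;
my $ID = 0;

sub make_miniport {
   my ($cb) = @_;
   my $id = "mini." . $ID++;
   $PORT{$id} = $cb;
   return $id;
}

# Deliver a message; a true return value from the callback destroys the port.
sub deliver {
   my ($id, @msg) = @_;
   my $cb = $PORT{$id} or return 0;    # unknown or already destroyed
   delete $PORT{$id} if $cb->(@msg);
   return 1;
}

my $count = 0;
my $p = make_miniport(sub { ++$count >= 3 });    # done after three messages
deliver($p, tick => $_) for 1 .. 5;
# $count is now 3: the last two messages found no port
```
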
=cut

sub port(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";

   if (@_) {
      my $cb = shift;
      $PORT{$id} = sub {
         local $SELF = $port;
         eval {
            &$cb
               and kil $port; # kil takes the full port id, not the local name
         };
         _self_die if $@;
      };
   } else {
      my $self = bless {
         id => "$NODE#$id",
      }, "AnyEvent::MP::Port";

      $PORT_DATA{$id} = $self;
      $PORT{$id} = sub {
         local $SELF = $port;

         eval {
            for (@{ $self->{rc0}{$_[0]} }) {
               $_ && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{rcv}{$_[0]} }) {
               $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{any} }) {
               $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }
         };
         _self_die if $@;
      };
   }

   $port
}

=item reg $portid, $name

Registers the given port under the name C<$name>. If the name already
exists it is replaced.

A port can only be registered under one well known name.

A port automatically becomes unregistered when it is killed.

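The three rules stated above can be kept consistent with two hashes. This is a hypothetical sketch: the C<reg> function below stores only C<$REG{$name}>, and C<%NAME_OF>, C<register> and C<unregister_port> are invented for illustration.

```perl
use strict;
use warnings;

my %REG;        # name => port id
my %NAME_OF;    # port id => name

sub register {
   my ($portid, $name) = @_;

   # a port has at most one name: drop its previous registration
   if (defined(my $old = $NAME_OF{$portid})) {
      delete $REG{$old};
   }

   # a name has at most one port: replacing evicts the previous owner
   if (defined(my $prev = $REG{$name})) {
      delete $NAME_OF{$prev};
   }

   $REG{$name}       = $portid;
   $NAME_OF{$portid} = $name;
}

# to be called when a port is killed
sub unregister_port {
   my ($portid) = @_;
   my $name = delete $NAME_OF{$portid};
   delete $REG{$name} if defined $name;
}

register("node#1" => "logger");
register("node#1" => "log");      # re-registering drops "logger"
unregister_port("node#1");        # e.g. when the port is killed
```
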
=cut

sub reg(@) {
   my ($portid, $name) = @_;

   $REG{$name} = $portid;
}

=item rcv $portid, tagstring => $callback->(@msg), ...

=item rcv $portid, $smartmatch => $callback->(@msg), ...

=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...

Register callbacks to be called on matching messages on the given port.
The callback receives the complete message, including the matched first
element, in C<@_>.

The callback has to return a true value when its work is done, after
which it will be removed, or a false value in which case it will stay
registered.

The global C<$SELF> (exported by this module) contains C<$portid> while
executing the callback.

Runtime errors during callback execution will result in the port being
C<kil>ed.

If the match is an array reference, then it will be matched against the
first elements of the message, otherwise only the first element is
matched.

Any element in the match that is specified as C<_any_> (a function
exported by this module) matches any single element of the message.

While not required, it is highly recommended that the first matching
element is a string identifying the message. The one-string-only match is
also the most efficient match (by far).

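The prefix matching described above can be expressed without smart matching (C<~~>), which the implementation below relies on but which perl 5.18 marked as experimental. In this sketch C<$ANY> is a hypothetical sentinel standing in for C<_any_>, C<prefix_match> is an invented name, and elements are compared as strings, so smart match's other behaviours (regexes, code references, nested structures) are not reproduced.

```perl
use strict;
use warnings;

# Hypothetical sentinel: a unique reference that matches any single element.
our $ANY = \"any";

# True if each pattern element equals the corresponding message element
# (string comparison) or is $ANY; extra message elements are ignored.
sub prefix_match {
   my ($pattern, @msg) = @_;
   return 0 if @$pattern > @msg;
   for my $i (0 .. $#$pattern) {
      my $p = $pattern->[$i];
      next if ref $p && $p == $ANY;
      return 0 unless defined $msg[$i] && $p eq $msg[$i];
   }
   return 1;
}

print prefix_match([child_died => 42], child_died => 42, "core") ? "match\n" : "no match\n";    # match
print prefix_match([$ANY, $ANY, 3], "a", "b", 4) ? "match\n" : "no match\n";                    # no match
```
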
=cut

sub rcv($@) {
   my ($noderef, $port) = split /#/, shift, 2;

   ($NODE{$noderef} || add_node $noderef) == $NODE{""}
      or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";

   my $self = $PORT_DATA{$port}
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   "AnyEvent::MP::Port" eq ref $self
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   while (@_) {
      my ($match, $cb) = splice @_, 0, 2;

      if (!ref $match) {
         push @{ $self->{rc0}{$match} }, [$cb];
      } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
         my ($type, @match) = @$match;
         @match
            ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
            : push @{ $self->{rc0}{$match->[0]} }, [$cb];
      } else {
         push @{ $self->{any} }, [$cb, $match];
      }
   }
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, it sets up the environment in the same way as in
C<rcv> callbacks, i.e. runtime errors will cause the port to get
C<kil>ed.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv $port, delayed_reply => sub {
      my (undef, $delay, @reply) = @_;
      my $timer; $timer = AE::timer $delay, 0, psub {
         undef $timer;
         snd @reply, $SELF;
      };
   };

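The core of C<psub> is to copy C<$SELF> into a lexical when the closure is created and to C<local>ise it again each time the closure runs. A reduced sketch without the C<eval>/C<kil> error handling; C<make_psub> is a hypothetical name, and the timer is replaced by a direct call:

```perl
use strict;
use warnings;

our $SELF;

sub make_psub {
   my ($cb) = @_;
   my $port = $SELF;                     # captured at creation time
   defined $port or die "no current port\n";
   return sub {
      local $SELF = $port;               # restored for every invocation
      return $cb->(@_);
   };
}

my $later;
{
   local $SELF = "node#p7";              # as if inside an rcv callback
   $later = make_psub(sub { "reply from $SELF" });
}
print $later->(), "\n";                  # prints "reply from node#p7"
```
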
=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=back

=head1 FUNCTIONS FOR NODES

=over 4

=item become_public $noderef

Tells the node to become a public node, i.e. reachable from other nodes.

The first argument is the (unresolved) node reference of the local node
(if missing then the empty string is used).

It is quite common to not specify anything, in which case the local node
tries to listen on the default port, or to only specify a port number, in
which case AnyEvent::MP tries to guess the local addresses.

=cut

=back

=head1 NODE MESSAGES

Nodes understand the following messages sent to them. Many of them take
arguments called C<@reply>, which will simply be used to compose a reply
message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
the remaining arguments are simply the message data.

While other messages exist, they are not public and subject to change.

=over 4

=cut

=item lookup => $name, @reply

Replies with the port ID of the specified well-known port, or C<undef>.

=item devnull => ...

Generic data sink/CPU heat conversion.

=item relay => $port, @msg

Simply forwards the message to the given port.

=item eval => $string[, @reply]

Evaluates the given string. If C<@reply> is given, then a message of the
form C<@reply, $@, @evalres> is sent.

Example: crash another node.

   snd $othernode, eval => "exit";

=item time => @reply

Replies with the current node time to C<@reply>.

Example: tell the current node to send the current time to C<$myport> in a
C<timereply> message.

   snd $NODE, time => $myport, timereply => 1, 2;
   # => snd $myport, timereply => 1, 2, <time>

=back

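The C<@reply> convention means "append the result to the given port, type and data". A sketch of a node-side handler for the C<lookup> and C<time> messages; C<handle_node_message>, the C<fake_snd> recording stub and the C<%REG> contents are hypothetical, and using perl's built-in C<time> as the clock is an assumption:

```perl
use strict;
use warnings;

my @sent;                               # messages "sent" by the stub
sub fake_snd { push @sent, [@_] }

my %REG = (logger => "node#log.1");     # well-known name => port id

# Hypothetical dispatcher for two of the documented node messages.
sub handle_node_message {
   my ($type, @args) = @_;

   if ($type eq "lookup") {
      my ($name, @reply) = @args;
      fake_snd(@reply, $REG{$name});    # undef for unknown names
   } elsif ($type eq "time") {
      my @reply = @args;
      fake_snd(@reply, time);
   }
}

handle_node_message(time => "me#1", timereply => 1, 2);
# @sent now holds ["me#1", "timereply", 1, 2, <current time>]
```
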
=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed erlang (erlang node
== aemp node, erlang process == aemp port), so many of the documents and
programming techniques employed by erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf      # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf  # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node references contain the recipe on how to contact them.

Erlang relies on special naming and DNS to work everywhere in the
same way. AEMP relies on each node knowing its own address(es), with
convenience functionality.

This means that AEMP requires a less tightly controlled environment at the
cost of longer node references and a slightly higher management overhead.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages, and therefore
needs a queue. AEMP is event based, queuing messages would serve no useful
purpose.

(But see L<Coro::MP> for a more erlang-like process model on top of AEMP).

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in erlang is synchronous and blocks the process. AEMP
sends are immediate, connection establishment is handled in the
background.

=item * Erlang can silently lose messages, AEMP cannot.

Erlang makes few guarantees on message delivery - messages can get lost
without any of the processes realising it (i.e. you send messages a, b,
and c, and the other side only receives messages a and c).

AEMP guarantees correct ordering, and that there are no holes in the
message sequence.

=item * In erlang, processes can be declared dead and later be found to be
alive.

In erlang it can happen that a monitored process is declared dead and
linked processes get killed, but later it turns out that the process is
still alive - and can receive messages.

In AEMP, when port monitoring detects a port as dead, then that port will
eventually be killed - it cannot happen that a node detects a port as dead
and then later sends messages to it, finding it is still alive.

=item * Erlang can send messages to the wrong port, AEMP does not.

In erlang it is quite possible that a node that restarts reuses a process
ID known to other nodes for a completely different process, causing
messages destined for that process to end up in an unrelated process.

AEMP never reuses port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol - SSL/TLS - to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the erlang protocol, supports both
language-independent text-only protocols (good for debugging) and binary,
language-specific serialisers (e.g. Storable).

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=back

=head1 SEE ALSO

L<AnyEvent>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1