/cvs/AnyEvent-MP/MP.pm
Revision: 1.32
Committed: Wed Aug 5 19:58:46 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.31: +110 -109 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE      # contains this node's noderef
   NODE       # returns this node's noderef
   NODE $port # returns the noderef of the port

   snd $port, type => data...;

   $SELF      # receiving/own port id in rcv callbacks

   rcv $port, smartmatch => $cb->(@msg);

   # examples:
   rcv $port2, ping => sub { snd $_[1], "pong"; 0 };
   rcv $port1, pong => sub { warn "pong received\n" };
   snd $port2, ping => $port1;

   # more, smarter, matches (_any_ is exported by this module)
   rcv $port, [child_died => $pid] => sub { ...
   rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3

=head1 DESCRIPTION

This module (-family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page.

At the moment, this module family is severely broken and under-documented,
so do not use it. This was uploaded mainly to reserve the CPAN namespace -
stay tuned! The basic API should be finished, however.

=head1 CONCEPTS

=over 4

=item port

A port is something you can send messages to (with the C<snd> function).

Some ports allow you to register C<rcv> handlers that can match specific
messages. All C<rcv> handlers will receive the messages they match;
messages will not be queued.

=item port id - C<noderef#portname>

A port id is normally the concatenation of a noderef, a hash-mark (C<#>) as
separator, and a port name (a printable string of unspecified format). An
exception is the node port, whose ID is identical to its node
reference.
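
The split into noderef and port name can be illustrated in plain Perl. The
following is only a sketch that mirrors the two-field C<split /#/> used by
C<rcv> and C<mon> further down; C<split_portid> is a hypothetical helper,
not part of this module:

   use strict;
   use warnings;

   # hypothetical helper: split a port id into (noderef, port name);
   # a node port id contains no "#", so its port name comes back undef
   sub split_portid {
      my ($portid) = @_;
      my ($noderef, $name) = split /#/, $portid, 2;
      return ($noderef, $name);
   }

   my ($node, $name) = split_portid("10.0.0.1:4040#abc.7");
   # $node is "10.0.0.1:4040", $name is "abc.7"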

=item node

A node is a single process containing at least one port - the node
port. You can send messages to node ports to find existing ports or to
create new ports, among other things.

Nodes are either private (single-process only), slaves (connected to a
master node only) or public nodes (connectable from unrelated nodes).

=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>

A node reference is a string that either simply identifies the node (for
private and slave nodes), or contains a recipe on how to reach a given
node (for public nodes).

This recipe is simply a comma-separated list of C<address:port> pairs (for
TCP/IP, other protocols might look different).

Node references come in two flavours: resolved (containing only numerical
addresses) or unresolved (where hostnames are used instead of addresses).

Before using an unresolved node reference in a message you first have to
resolve it.
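
For TCP/IP, taking a node reference apart is a matter of two splits. A
minimal sketch with the hypothetical helpers C<parse_noderef> and
C<is_resolved>; note that the "numerical address" test below only
recognises dotted IPv4 addresses, which is a simplifying assumption:

   use strict;
   use warnings;

   # hypothetical: split "host:port,host:port" into [host, port] pairs
   sub parse_noderef {
      my ($noderef) = @_;
      my @pairs;
      for my $part (split /,/, $noderef) {
         my ($host, $port) = $part =~ /^(.*):(\d+)$/;
         push @pairs, [$host, $port];
      }
      return @pairs;
   }

   # hypothetical: true if every address is a dotted IPv4 number
   sub is_resolved {
      my ($noderef) = @_;
      for my $pair (parse_noderef($noderef)) {
         return 0 unless defined $pair->[0]
                         && $pair->[0] =~ /^\d+\.\d+\.\d+\.\d+$/;
      }
      return 1;
   }

With these, C<is_resolved("10.0.0.1:4040,10.0.0.2:4041")> is true, while
C<is_resolved("localhost:4040")> is false.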

=back

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Base;

use common::sense;

use Carp ();

use AE ();

use base "Exporter";

our $VERSION = '0.1';
our @EXPORT = qw(
   NODE $NODE *SELF node_of _any_
   resolve_node initialise_node
   snd rcv mon mon_guard kil reg psub
   port
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the
noderef of the local node. The value is initialised by a call to
C<become_public> or C<become_slave>, after which all local port
identifiers become invalid.

=item $noderef = node_of $portid

Extracts and returns the noderef from a portid or a noderef.

=item $cv = resolve_node $noderef

Takes an unresolved node reference that may contain hostnames and
abbreviated IDs, resolves all of them and returns a condition variable
(C<$cv>) that receives the resolved node reference.

In addition to C<address:port> pairs allowed in resolved noderefs, the
following forms are supported:

=over 4

=item the empty string

An empty-string component gets resolved as if the default port (4040) was
specified.

=item naked port numbers (e.g. C<1234>)

These are resolved by prepending the local nodename and a colon, to be
further resolved.

=item hostnames (e.g. C<localhost:1234>, C<localhost>)

These are resolved by using AnyEvent::DNS to resolve them, optionally
looking up SRV records for the C<aemp=4040> port, if no port was
specified.

=back
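
The first two rules are purely textual and apply before any DNS lookup, so
they can be sketched without AnyEvent::DNS. In this hypothetical
C<normalise_noderef> helper the local node name is passed in explicitly,
and hostname/SRV resolution is deliberately left out:

   use strict;
   use warnings;

   # hypothetical: apply the "empty string" and "naked port number"
   # rules to each comma-separated component of a noderef
   sub normalise_noderef {
      my ($noderef, $local_name) = @_;

      # split returns nothing for an empty string, so special-case it
      my @components = length $noderef ? split(/,/, $noderef, -1) : ("");

      for (@components) {
         $_ = "4040" if $_ eq "";            # default port
         $_ = "$local_name:$_" if /^\d+$/;   # naked port number
      }

      # hostnames are left as-is; they still need a DNS lookup
      return join ",", @components;
   }

   my $ref = normalise_noderef("1234,,example.org", "myhost");
   # $ref is "myhost:1234,myhost:4040,example.org"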

=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
export just C<$SELF>, so all the symbols called C<SELF> are exported by
this module, but only C<$SELF> is currently used.

=item snd $portid, type => @data

=item snd $portid, @msg

Send the given message to the given port ID, which can identify either
a local or a remote port, and can be either a string or something that
stringifies as a port ID (such as a port object :).

While the message can be about anything, it is highly recommended to use a
string as first element (a portid, or some word that indicates a request
type etc.).

The message data effectively becomes read-only after a call to this
function: modifying any argument is not allowed and can cause many
problems.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed.
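
With the JSON transport it can pay off to check a message before sending
it. The following is a minimal sketch of such a check, assuming exactly the
restriction described above (plain scalars plus unblessed array and hash
references built from them); C<json_safe> is a hypothetical helper, not
part of AnyEvent::MP:

   use strict;
   use warnings;

   # hypothetical: true if $data contains only plain scalars (including
   # undef) and unblessed array/hash references built from them
   sub json_safe {
      my ($data) = @_;
      my $type = ref $data;

      return 1 if $type eq "";                                # plain scalar
      return !grep { !json_safe($_) } @$data        if $type eq "ARRAY";
      return !grep { !json_safe($_) } values %$data if $type eq "HASH";
      return 0;                                  # object, code ref, ...
   }

   json_safe ["ping", 1, { a => [2, "x"] }];   # true
   json_safe [bless {}, "Some::Class"];        # false: contains an object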

=item $local_port = port

Create a new local port object that can be used either as a pattern
matching port ("full port") or a single-callback port ("miniport"),
depending on how C<rcv> callbacks are bound to the object.

=item $portid = port { my @msg = @_; $finished }

Creates a "mini port", that is, a very lightweight port without any
pattern matching behind it, and returns its ID.

The block will be called for every message received on the port. When the
callback returns a true value its job is considered "done" and the port
will be destroyed. Otherwise it will stay alive.

The message will be passed as-is; no extra argument (i.e. no port id) will
be passed to the callback.

If you need the local port id in the callback, this works nicely:

   my $port; $port = port {
      snd $otherport, reply => $port;
   };
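
The life cycle of a miniport (the block runs once per message, and a true
return value destroys the port) can be modelled with a plain hash of
callbacks. This is a single-process sketch with hypothetical names
(C<%miniport>, C<make_miniport>, C<deliver>); it does not use the real
C<port> function or its internal tables:

   use strict;
   use warnings;

   my %miniport;      # hypothetical: port name => callback
   my $next_id = 0;

   sub make_miniport {
      my ($cb) = @_;
      my $id = "port." . $next_id++;
      $miniport{$id} = $cb;
      return $id;
   }

   # pass the message as-is; a true return value destroys the port
   sub deliver {
      my ($id, @msg) = @_;
      my $cb = $miniport{$id}
         or return;                  # port already gone: drop message
      delete $miniport{$id} if $cb->(@msg);
   }

   my @seen;
   my $id = make_miniport(sub { push @seen, $_[0]; $_[0] eq "stop" });

   deliver($id, "hello");   # callback returns false, port stays alive
   deliver($id, "stop");    # callback returns true, port is destroyed
   deliver($id, "late");    # no such port any more, silently dropped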

=cut

sub port(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";

   if (@_) {
      my $cb = shift;
      $PORT{$id} = sub {
         local $SELF = $port;
         eval {
            &$cb
               and kil $id;
         };
         _self_die if $@;
      };
   } else {
      my $self = bless {
         id => "$NODE#$id",
      }, "AnyEvent::MP::Port";

      $PORT_DATA{$id} = $self;
      $PORT{$id} = sub {
         local $SELF = $port;

         eval {
            for (@{ $self->{rc0}{$_[0]} }) {
               $_ && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{rcv}{$_[0]} }) {
               $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{any} }) {
               $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }
         };
         _self_die if $@;
      };
   }

   $port
}

=item reg $portid, $name

Registers the given port under the name C<$name>. If the name already
exists it is replaced.

A port can only be registered under one well-known name.

A port automatically becomes unregistered when it is killed.
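
Taken together, these rules amount to two hashes kept in sync. The sketch
below is a hypothetical single-process model of them (C<register>,
C<unregister_killed> and C<lookup_name> are illustrative names only); the
actual C<reg> function below just stores the port id in C<%REG>:

   use strict;
   use warnings;

   my %port_of;   # hypothetical: name => port id
   my %name_of;   # hypothetical: port id => name

   sub register {
      my ($portid, $name) = @_;

      # a port has at most one name: forget its previous one
      if (defined(my $old = $name_of{$portid})) {
         delete $port_of{$old};
      }

      # an existing name is taken over by the new port
      if (defined(my $prev = $port_of{$name})) {
         delete $name_of{$prev};
      }

      $port_of{$name}   = $portid;
      $name_of{$portid} = $name;
   }

   # to be called when a port is killed
   sub unregister_killed {
      my ($portid) = @_;
      my $name = delete $name_of{$portid};
      delete $port_of{$name} if defined $name;
   }

   sub lookup_name { $port_of{ $_[0] } }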

=cut

sub reg(@) {
   my ($portid, $name) = @_;

   $REG{$name} = $portid;
}

=item rcv $portid, $callback->(@msg)

Replaces the callback on the specified miniport (or newly created port
object, see C<port>). Full ports are configured with the following calls:

=item rcv $portid, tagstring => $callback->(@msg), ...

=item rcv $portid, $smartmatch => $callback->(@msg), ...

=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...

Register callbacks to be called on matching messages on the given full
port (or newly created port).

The callback has to return a true value when its work is done, after
which it will be removed, or a false value in which case it will stay
registered.

The global C<$SELF> (exported by this module) contains C<$portid> while
executing the callback.

Runtime errors during callback execution will result in the port being
C<kil>ed.

If the match is an array reference, then it will be matched against the
first elements of the message, otherwise only the first element is
matched.

Any element in the match that is specified as C<_any_> (a function
exported by this module) matches any single element of the message.

While not required, it is highly recommended that the first matching
element is a string identifying the message. The one-string-only match is
also the most efficient match (by far).
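
The matching rules can be sketched without smartmatch, which later perl
versions mark as experimental and deprecated. The hypothetical C<matches>
function below compares a plain string with the first message element
only, compares an array reference element by element with the start of the
message, and uses a hypothetical C<$ANY> sentinel in place of C<_any_>.
Unlike smartmatch, it compares every other element as a string with C<eq>:

   use strict;
   use warnings;

   my $ANY = \"any";   # hypothetical stand-in for _any_

   # true if the message @msg matches $match
   sub matches {
      my ($match, @msg) = @_;
      my @pattern = ref $match ? @$match : ($match);

      return 0 if @msg < @pattern;               # message too short

      for my $i (0 .. $#pattern) {
         next if ref $pattern[$i] && $pattern[$i] == $ANY;   # wildcard
         return 0 unless defined $msg[$i] && $msg[$i] eq $pattern[$i];
      }

      return 1;
   }

   matches [child_died => 42], child_died => 42, "extra";   # true
   matches [$ANY, $ANY, 3], "a", "b", 3;                      # true
   matches "ping", "pong";                                    # false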

=cut

sub rcv($@) {
   my $portid = shift;
   my ($noderef, $port) = split /#/, $portid, 2;

   ($NODE{$noderef} || add_node $noderef) == $NODE{""}
      or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";

   my $self = $PORT_DATA{$port}
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   "AnyEvent::MP::Port" eq ref $self
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   while (@_) {
      my ($match, $cb) = splice @_, 0, 2;

      if (!ref $match) {
         push @{ $self->{rc0}{$match} }, [$cb];
      } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
         my ($type, @match) = @$match;
         @match
            ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
            : push @{ $self->{rc0}{$match->[0]} }, [$cb];
      } else {
         push @{ $self->{any} }, [$cb, $match];
      }
   }

   $portid
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, it sets up the environment in the same way as in
C<rcv> callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv $port, delayed_reply => sub {
      my (undef, $delay, @reply) = @_;   # $_[0] is "delayed_reply"
      my $timer; $timer = AE::timer $delay, 0, psub {
         undef $timer;   # the closure keeps the timer alive until it fires
         snd @reply, $SELF;
      };
      0   # stay registered for further requests
   };
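
The core idea of C<psub>, remembering the current value of a dynamically
scoped variable and restoring it with C<local> whenever the closure runs
later, works for any package variable. A stripped-down sketch without the
error handling that C<psub> adds; C<$CURRENT> and C<capture> are
hypothetical names:

   use strict;
   use warnings;

   our $CURRENT;   # hypothetical stand-in for $SELF

   sub capture(&) {
      my ($cb) = @_;
      my $saved = $CURRENT;          # value at closure creation time

      return sub {
         local $CURRENT = $saved;    # restored while the closure runs
         $cb->(@_);
      };
   }

   my $closure;
   {
      local $CURRENT = "node#port.1";
      $closure = capture { "running as $CURRENT" };
   }

   # $CURRENT is undef again here, yet the closure still sees the old value
   my $result = $closure->();   # "running as node#port.1"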

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=item $guard = mon $portid, $cb->(@reason)

=item $guard = mon $portid, $otherport

=item $guard = mon $portid, $otherport, @msg

Monitor the given port and do something when the port is killed.

In the first form, the callback is simply called with any number
of C<@reason> elements (no @reason means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the second form, the other port will be C<kil>ed with the same
C<@reason>, but only if a C<@reason> was specified: on "normal" kills
nothing happens.

In the last form, a message of the form C<@msg, @reason> will be sent to
the other port with C<snd>.

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port, $self;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $self => "restart";
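
As the C<mon> implementation below shows, all three forms are reduced to a
single callback that receives C<@reason>. That reduction can be sketched in
isolation; C<fake_snd> and C<fake_kil> are hypothetical stubs that only
record what would have been done, so the logic runs without any nodes:

   use strict;
   use warnings;

   my @log;   # records actions instead of performing them
   sub fake_snd { push @log, [snd => @_] }
   sub fake_kil { push @log, [kil => @_] }

   # hypothetical: turn the arguments after $portid into the callback
   sub mon_callback {
      my ($cb, @msg) = @_;

      return $cb if ref $cb;                            # first form
      return sub { fake_snd($cb, @msg, @_) } if @msg;   # third form
      return sub { fake_kil($cb, @_) if @_ };           # second form
   }

   mon_callback("n#other")->();                       # normal kill: no action
   mon_callback("n#other")->(die => "oops");          # kil "n#other", die => "oops"
   mon_callback("n#self", "restart")->(die => "x");   # snd "n#self", "restart", die => "x"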

=cut

sub mon {
   my ($noderef, $port) = split /#/, shift, 2;

   my $node = $NODE{$noderef} || add_node $noderef;

   my $cb = shift;

   unless (ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed:

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
         undef $timer if 0.9 < rand;
      };
   };
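
C<mon_guard> keeps the references alive merely by mentioning them in the
monitor callback (C<sub { 0 && @refs }>): the expression never uses them at
runtime, but the closure still captures them, so they live exactly as long
as the callback does. The underlying Perl behaviour can be demonstrated on
its own with objects of a hypothetical C<Tracker> class whose destructor
counts how often it runs:

   use strict;
   use warnings;

   my $destroyed = 0;
   sub Tracker::DESTROY { $destroyed++ }   # hypothetical class

   my $keeper = do {
      my @refs = (bless({}, "Tracker"), bless({}, "Tracker"));
      sub { 0 && @refs };   # captures @refs without ever using them
   };

   my $before = $destroyed;   # 0: both objects are still alive
   undef $keeper;             # release the closure...
   my $after  = $destroyed;   # 2: ...and the objects with it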

=cut

sub mon_guard {
   my ($port, @refs) = @_;

   mon $port, sub { 0 && @refs }
}

=item lnk $port1, $port2

Link two ports. This is simply a shorthand for:

   mon $port1, $port2;
   mon $port2, $port1;

It means that if either one is killed abnormally, the other one gets
killed as well.
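
The effect of linking, including the fact that a "normal" kill propagates
nothing, can be modelled in a few lines. This is a hypothetical in-memory
simulation (C<%alive>, C<%monitors>, C<sim_mon>, C<sim_kil> and C<sim_lnk>
are illustrative names), not how nodes actually exchange kill
notifications. Skipping ports that are already dead is what stops the kill
from bouncing back and forth between the two linked ports:

   use strict;
   use warnings;

   my %alive;      # port => 1 while the port exists
   my %monitors;   # port => [callbacks to run with @reason]

   sub sim_kil {
      my ($port, @reason) = @_;
      return unless delete $alive{$port};           # already dead
      $_->(@reason) for @{ delete($monitors{$port}) || [] };
   }

   # like the second form of mon: kill $other only on abnormal kills
   sub sim_mon {
      my ($port, $other) = @_;
      push @{ $monitors{$port} }, sub { sim_kil($other, @_) if @_ };
   }

   sub sim_lnk { sim_mon($_[0], $_[1]); sim_mon($_[1], $_[0]) }

   $alive{$_} = 1 for qw(a b c d);
   sim_lnk("a", "b");
   sim_lnk("c", "d");

   sim_kil("a", die => "oops");   # b gets killed as well
   sim_kil("c");                  # normal kill: d survives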

=item kil $portid[, @reason]

Kill the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally" (linked
ports will not be killed, or even notified).

Otherwise, linked ports get killed with the same reason (second form of
C<mon>, see above).

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.
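
For errors raised inside C<rcv> callbacks or C<psub> blocks, the reason is
derived from C<$@> the same way C<_self_die> above does it: trailing
newlines are stripped from string errors, while exception objects are
passed on untouched. As a standalone sketch (C<reason_from_error> is a
hypothetical name):

   use strict;
   use warnings;

   # hypothetical: build the kil reason from an eval error, as _self_die does
   sub reason_from_error {
      my ($msg) = @_;
      $msg =~ s/\n+$// unless ref $msg;
      return (die => $msg);
   }

   eval { die "connection refused\n" };
   my @reason = reason_from_error($@);   # ("die", "connection refused")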

=back

=head1 FUNCTIONS FOR NODES

=over 4

=item become_public $noderef

Tells the node to become a public node, i.e. reachable from other nodes.

The first argument is the (unresolved) node reference of the local node
(if missing then the empty string is used).

It is quite common to not specify anything, in which case the local node
tries to listen on the default port, or to only specify a port number, in
which case AnyEvent::MP tries to guess the local addresses.

=cut

=back

=head1 NODE MESSAGES

Nodes understand the following messages sent to them. Many of them take
arguments called C<@reply>, which will simply be used to compose a reply
message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
the remaining arguments are simply the message data.
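
Put differently, a node answers such a request by appending its result to
the C<@reply> list it was given and passing the whole list to C<snd>. A
hypothetical C<reply_message> helper shows the shape of that list:

   use strict;
   use warnings;

   # hypothetical: the message a node sends back for a request that
   # carried @reply; $reply->[0] is the port, $reply->[1] the type
   sub reply_message {
      my ($reply, @result) = @_;
      return (@$reply, @result);
   }

   # answering "time => $myport, timereply => 1, 2":
   my @msg = reply_message(["n#myport", timereply => 1, 2], time);
   # @msg is ("n#myport", "timereply", 1, 2, <current time>)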

While other messages exist, they are not public and subject to change.

=over 4

=cut

=item lookup => $name, @reply

Replies with the port ID of the specified well-known port, or C<undef>.

=item devnull => ...

Generic data sink/CPU heat conversion.

=item relay => $port, @msg

Simply forwards the message to the given port.

=item eval => $string[, @reply]

Evaluates the given string. If C<@reply> is given, then a message of the
form C<@reply, $@, @evalres> is sent.

Example: crash another node.

   snd $othernode, eval => "exit";

=item time => @reply

Replies with the current node time to C<@reply>.

Example: tell the current node to send the current time to C<$myport> in a
C<timereply> message.

   snd $NODE, time => $myport, timereply => 1, 2;
   # => snd $myport, timereply => 1, 2, <time>

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed erlang (erlang node
== aemp node, erlang process == aemp port), so many of the documents and
programming techniques employed by erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf      # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf  # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node references contain the recipe on how to contact them.

Erlang relies on special naming and DNS to work everywhere in the
same way. AEMP relies on each node knowing its own address(es), with
convenience functionality.

This means that AEMP requires a less tightly controlled environment at the
cost of longer node references and a slightly higher management overhead.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages, and therefore
needs a queue. AEMP is event based, queuing messages would serve no useful
purpose.

(But see L<Coro::MP> for a more erlang-like process model on top of AEMP).

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in erlang is synchronous and blocks the process. AEMP
sends are immediate, connection establishment is handled in the
background.

=item * Erlang can silently lose messages, AEMP cannot.

Erlang makes few guarantees on message delivery - messages can get lost
without any of the processes realising it (i.e. you send messages a, b,
and c, and the other side only receives messages a and c).

AEMP guarantees correct ordering, and that there are no holes in the
message sequence.

=item * In erlang, processes can be declared dead and later be found to be
alive.

In erlang it can happen that a monitored process is declared dead and
linked processes get killed, but later it turns out that the process is
still alive - and can receive messages.

In AEMP, when port monitoring detects a port as dead, then that port will
eventually be killed - it cannot happen that a node detects a port as dead
and then later sends messages to it, finding it is still alive.

=item * Erlang can send messages to the wrong port, AEMP does not.

In erlang it is quite possible that a node that restarts reuses a process
ID known to other nodes for a completely different process, causing
messages destined for that process to end up in an unrelated process.

AEMP never reuses port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol - SSL/TLS - to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the erlang protocol, supports both
language-independent text-only protocols (good for debugging) and binary,
language-specific serialisers (e.g. Storable).

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=back

=head1 SEE ALSO

L<AnyEvent>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1