/cvs/AnyEvent-MP/MP.pm
Revision: 1.31
Committed: Wed Aug 5 19:55:58 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.30: +14 -5 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE      # contains this node's noderef
   NODE       # returns this node's noderef
   NODE $port # returns the noderef of the port

   snd $port, type => data...;

   $SELF      # receiving/own port id in rcv callbacks

   rcv $port, smartmatch => $cb->(@msg);

   # examples:
   rcv $port2, ping => sub { snd $_[1], "pong"; 0 };
   rcv $port1, pong => sub { warn "pong received\n" };
   snd $port2, ping => $port1;

   # more, smarter, matches (_any_ is exported by this module)
   rcv $port, [child_died => $pid] => sub { ...
   rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3

=head1 DESCRIPTION

This module (-family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page.

At the moment, this module family is severely broken and under-documented,
so do not use it. This was uploaded mainly to reserve the CPAN namespace -
stay tuned! The basic API should be finished, however.

=head1 CONCEPTS

=over 4

=item port

A port is something you can send messages to (with the C<snd> function).

Some ports allow you to register C<rcv> handlers that can match specific
messages. All C<rcv> handlers receive the messages they match; messages
are not queued.

=item port id - C<noderef#portname>

A port id is normally the concatenation of a noderef, a hash-mark (C<#>) as
separator, and a port name (a printable string of unspecified format). An
exception is the node port, whose ID is identical to its node
reference.

=item node

A node is a single process containing at least one port - the node
port. You can send messages to node ports to find existing ports or to
create new ports, among other things.

Nodes are either private (single-process only), slaves (connected to a
master node only) or public nodes (connectable from unrelated nodes).

=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>

A node reference is a string that either simply identifies the node (for
private and slave nodes), or contains a recipe on how to reach a given
node (for public nodes).

This recipe is simply a comma-separated list of C<address:port> pairs (for
TCP/IP; other protocols might look different).

Node references come in two flavours: resolved (containing only numerical
addresses) or unresolved (where hostnames are used instead of addresses).

Before using an unresolved node reference in a message you first have to
resolve it.
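
As an illustration of the recipe format, the sketch below splits a resolved TCP/IP noderef such as C<10.0.0.1:4040,10.0.0.2:4041> into address/port pairs. The helper C<parse_noderef> is hypothetical and not part of AnyEvent::MP; it only handles plain C<address:port> components, not the C<id@noderef> or C<id> forms.

```perl

   use strict;
   use warnings;

   # Hypothetical helper (not part of AnyEvent::MP): split a resolved
   # TCP/IP node reference into [address, port] pairs.
   sub parse_noderef {
      my ($noderef) = @_;
      my @pairs;

      for my $component (split /,/, $noderef) {
         # (.*) is greedy, so this splits at the last colon
         my ($addr, $port) = $component =~ /^(.*):(\d+)$/
            or die "not an address:port pair: '$component'\n";
         push @pairs, [$addr, $port];
      }

      return @pairs;
   }

   my @pairs = parse_noderef "10.0.0.1:4040,10.0.0.2:4041";
   print "$_->[0] port $_->[1]\n" for @pairs;

```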

=back

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Base;

use common::sense;

use Carp ();

use AE ();

use base "Exporter";

our $VERSION = '0.1';
our @EXPORT = qw(
   NODE $NODE *SELF node_of _any_
   resolve_node initialise_node
   snd rcv mon kil reg psub
   port
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains
the noderef of the local node. The value is initialised by a call
to C<become_public> or C<become_slave>, after which all local port
identifiers become invalid.

=item $noderef = node_of $portid

Extracts and returns the noderef from a portid or a noderef.
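
Since a port id is a noderef, a C<#> and a port name, and a node port id is just the noderef, extracting the noderef can be done by splitting at the first C<#> (the same C<split /#/, ..., 2> idiom that C<mon> uses below). A standalone sketch under the hypothetical name C<my_node_of>; the port names in it are made-up examples:

```perl

   use strict;
   use warnings;

   # Hypothetical re-implementation for illustration only: a port id is
   # "noderef#portname", and a node port id is just the noderef.
   sub my_node_of {
      my ($portid) = @_;
      my ($noderef) = split /#/, $portid, 2;   # keep the part before the first #
      return $noderef;
   }

   print my_node_of ("10.0.0.1:4040#abc.7"), "\n";
   print my_node_of ("10.0.0.1:4040"), "\n";

```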

=item $cv = resolve_node $noderef

Takes an unresolved node reference that may contain hostnames and
abbreviated IDs, resolves all of them and returns a resolved node
reference.

In addition to C<address:port> pairs allowed in resolved noderefs, the
following forms are supported:

=over 4

=item the empty string

An empty-string component gets resolved as if the default port (4040) was
specified.

=item naked port numbers (e.g. C<1234>)

These are resolved by prepending the local nodename and a colon, to be
further resolved.

=item hostnames (e.g. C<localhost:1234>, C<localhost>)

These are resolved by using AnyEvent::DNS to resolve them, optionally
looking up SRV records for the C<aemp=4040> port, if no port was
specified.

=back
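
The purely textual part of these rules can be sketched as a function over the comma-separated components. This is an illustration only: C<normalise_components> and its C<$local_nodename> argument are hypothetical, no DNS or SRV lookups are done, and appending the default port to a bare hostname is an assumption about the fallback when no SRV record is found.

```perl

   use strict;
   use warnings;

   my $DEFAULT_PORT = 4040;

   # Hypothetical sketch of the rewriting done before DNS resolution.
   sub normalise_components {
      my ($noderef, $local_nodename) = @_;

      # split returns nothing for an empty string, so treat that as one
      # empty component; the -1 limit keeps trailing empty components
      my @components = length $noderef ? split (/,/, $noderef, -1) : ("");

      for my $c (@components) {
         # empty component: as if the default port had been given
         $c = $DEFAULT_PORT if $c eq "";

         # naked port number: prepend the local nodename
         $c = "$local_nodename:$c" if $c =~ /^\d+$/;

         # hostname without port (assumption: fall back to the default port;
         # the real code may use an SRV record instead)
         $c .= ":$DEFAULT_PORT" unless $c =~ /:\d+$/;
      }

      return join ",", @components;
   }

   print normalise_components ("", "myhost"), "\n";
   print normalise_components ("1234,otherhost", "myhost"), "\n";

```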

=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
export just C<$SELF>, so all the symbols called C<SELF> are exported by
this module, but only C<$SELF> is currently used.

=item snd $portid, type => @data

=item snd $portid, @msg

Send the given message to the given port ID, which can identify either
a local or a remote port, and can be either a string or something that
stringifies as a port ID (such as a port object :).

While the message can be about anything, it is highly recommended to use a
string as first element (a portid, or some word that indicates a request
type etc.).

The message data effectively becomes read-only after a call to this
function: modifying any argument is not allowed and can cause many
problems.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed.
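
The following sketch shows the practical difference between the two serialisers, independent of how AnyEvent::MP's transports actually invoke them. It round-trips a blessed object (of the made-up class C<My::Point>) through the core modules JSON::PP and Storable: with default settings JSON::PP refuses to encode the object, while Storable's C<freeze>/C<thaw> restores it together with its class.

```perl

   use strict;
   use warnings;
   use JSON::PP ();
   use Storable qw(freeze thaw);

   # a made-up object type, just for this demonstration
   my $obj = bless { x => 1, y => 2 }, "My::Point";

   # JSON: plain strings, numbers, arrays and hashes encode fine ...
   my $json = JSON::PP->new->canonical;   # canonical: sorted hash keys
   my $text = $json->encode ([ "pos", { x => 1, y => 2 } ]);

   # ... but a blessed reference is rejected with default settings
   my $json_ok = eval { $json->encode ([ "pos", $obj ]); 1 };

   # Storable: the object survives the round trip, class included
   my $copy = thaw (freeze ([ "pos", $obj ]));

   print "json text: $text\n";
   print "json accepts object: ", ($json_ok ? "yes" : "no"), "\n";
   print "storable class: ", ref $copy->[1], "\n";

```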

=item kil $portid[, @reason]

Kill the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally" (linked
ports will not be killed, or even notified).

Otherwise, linked ports get killed with the same reason (second form of
C<mon>, see below).

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.
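
The C<< die => $@ >> reason is produced by running the callback inside C<eval>; the module's internal C<_self_die> helper additionally strips trailing newlines from string errors and passes error objects through unchanged. A self-contained sketch of that normalisation, where the hypothetical C<run_and_get_reason> returns the reason list instead of actually killing a port:

```perl

   use strict;
   use warnings;

   # Hypothetical helper: run a callback and return the kill reason that
   # would be used if it dies, or an empty list if it succeeds.
   sub run_and_get_reason {
      my ($cb, @msg) = @_;
      eval { $cb->(@msg); 1 } and return;

      my $msg = $@;
      # strip trailing newlines from string errors, keep references as-is
      $msg =~ s/\n+$// unless ref $msg;
      return (die => $msg);
   }

   my @ok     = run_and_get_reason (sub { 42 });
   my @failed = run_and_get_reason (sub { die "bad request\n" }, "ping");
   my @object = run_and_get_reason (sub { die +{ code => 500 } });

   print scalar @ok, " / @failed / ", ref $object[1], "\n";

```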

=item $guard = mon $portid, $cb->(@reason)

=item $guard = mon $portid, $otherport

=item $guard = mon $portid, $otherport, @msg

Monitor the given port and do something when the port is killed.

In the first form, the callback is simply called with any number
of C<@reason> elements (no C<@reason> means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the second form, the other port will be C<kil>'ed with C<@reason>, iff
a C<@reason> was specified, i.e. on "normal" kills nothing happens, while
under all other conditions, the other port is killed with the same reason.

In the last form, a message of the form C<@msg, @reason> will be sent
with C<snd>, whether the port was killed normally or not.

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port, $self;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $self => "restart";
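
Internally, all three forms are reduced to a single callback (see the C<unless (ref $cb)> branch in C<mon> below). This standalone sketch mirrors that reduction, but uses hypothetical recording stubs C<fake_kil> and C<fake_snd> instead of the real functions, so the behaviour on normal and abnormal kills can be observed; the port ids are made up.

```perl

   use strict;
   use warnings;

   my @log;   # records what the stubs would have done

   # hypothetical stand-ins for the real kil and snd
   sub fake_kil { push @log, "kil @_" }
   sub fake_snd { push @log, "snd @_" }

   # reduce the three mon forms to one callback, like mon does
   sub make_monitor_cb {
      my $cb = shift;
      return $cb if ref $cb;              # first form: a code reference

      if (@_) {                           # third form: $otherport, @msg
         my @msg = ($cb, @_);
         return sub { fake_snd @msg, @_ };
      }

      my $other = $cb;                    # second form: just $otherport
      return sub { fake_kil $other, @_ if @_ };
   }

   my $kill_cb    = make_monitor_cb ("nodeA#p2");
   my $restart_cb = make_monitor_cb ("nodeA#p3", "restart");

   $kill_cb->();                          # normal kill: nothing happens
   $kill_cb->(die => "oops");             # abnormal kill: propagated
   $restart_cb->();                       # message is sent even on a normal kill

   print "$_\n" for @log;

```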

=cut

sub mon {
   my ($noderef, $port) = split /#/, shift, 2;

   my $node = $NODE{$noderef} || add_node $noderef;

   my $cb = shift;

   unless (ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed:

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
         undef $timer if 0.9 < rand;
      };
   };
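
C<mon_guard> relies on plain Perl closure semantics: the monitoring callback C<sub { 0 && @refs }> mentions C<@refs>, so it keeps those references alive for as long as the callback exists, without ever using them. The sketch below demonstrates this without AnyEvent::MP, using a hypothetical C<Tracker> class whose destructor records when an object is freed (the C<package NAME BLOCK> syntax needs perl 5.14 or newer).

```perl

   use strict;
   use warnings;

   my @freed;

   # hypothetical class that notes when its objects are destroyed
   package Tracker {
      sub new     { my ($class, $name) = @_; bless { name => $name }, $class }
      sub DESTROY { push @freed, $_[0]{name} }
   }

   # same trick as mon_guard: the closure mentions @refs, so it keeps
   # every reference alive, but never actually uses them
   sub keep_alive {
      my @refs = @_;
      return sub { 0 && @refs };
   }

   my $cb = keep_alive (Tracker->new ("timer"));
   my $freed_while_held = scalar @freed;

   undef $cb;    # like the port being killed: the closure goes away
   print "freed while held: $freed_while_held, after: @freed\n";

```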

=cut

sub mon_guard {
   my ($port, @refs) = @_;

   mon $port, sub { 0 && @refs }
}

=item lnk $port1, $port2

Link two ports. This is simply a shorthand for:

   mon $port1, $port2;
   mon $port2, $port1;

It means that if either one is killed abnormally, the other one gets
killed as well.
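
The effect of a link can be illustrated with a toy, single-process model. C<toy_mon>, C<toy_kil> and C<toy_lnk> are hypothetical and much simpler than the real functions: ports are plain strings, only the second form of C<mon> is modelled, and kills are delivered synchronously. A normal kill leaves the linked port alive, while an abnormal one takes it down with the same reason.

```perl

   use strict;
   use warnings;

   my %monitors;   # port => [callbacks to run when it is killed]
   my %alive;      # port => 1 while the port exists
   my %reason;     # port => reason it was killed with

   sub toy_kil {
      my ($port, @reason) = @_;
      return unless delete ($alive{$port});   # already dead: nothing to do
      $reason{$port} = "@reason";
      $_->(@reason) for @{ delete ($monitors{$port}) || [] };
   }

   # second form of mon only: kill $other with the same reason, if any
   sub toy_mon {
      my ($port, $other) = @_;
      push @{ $monitors{$port} }, sub { toy_kil $other, @_ if @_ };
   }

   sub toy_lnk { toy_mon $_[0], $_[1]; toy_mon $_[1], $_[0] }

   %alive = map { $_ => 1 } qw(a b c d);
   toy_lnk "a", "b";
   toy_lnk "c", "d";

   toy_kil "a";                        # normal kill
   toy_kil "c", die => "crashed";      # abnormal kill

   print "$_: ", ($alive{$_} ? "alive" : "dead"), "\n" for qw(a b c d);

```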

=item $local_port = port

Create a new local port object that can be used either as a pattern
matching port ("full port") or a single-callback port ("miniport"),
depending on how C<rcv> callbacks are bound to the object.

=item $portid = port { my @msg = @_; $finished }

Creates a "mini port", that is, a very lightweight port without any
pattern matching behind it, and returns its ID.

The block will be called for every message received on the port. When the
callback returns a true value its job is considered "done" and the port
will be destroyed. Otherwise it will stay alive.

The message will be passed as-is: no extra argument (i.e. no port id) will
be passed to the callback.

If you need the local port id in the callback, this works nicely:

   my $port; $port = port {
      snd $otherport, reply => $port;
   };
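
The mini port lifecycle (run the block for every message, destroy the port once the block returns true) can be modelled with a hash of callbacks, loosely like the C<%PORT> table used by C<port> below. C<toy_port>, C<toy_deliver> and the id format are hypothetical; there is no C<$SELF>, error handling or networking in this sketch.

```perl

   use strict;
   use warnings;

   my %PORTS;       # id => callback
   my $next_id = 0;

   sub toy_port {
      my ($cb) = @_;
      my $id = "local#" . $next_id++;
      $PORTS{$id} = $cb;
      return $id;
   }

   # deliver one message; remove the port when the callback returns true
   sub toy_deliver {
      my ($id, @msg) = @_;
      my $cb = $PORTS{$id} or return 0;    # unknown/dead port: dropped
      delete $PORTS{$id} if $cb->(@msg);
      return 1;
   }

   my @seen;
   my $id = toy_port (sub { push @seen, "@_"; $_[0] eq "quit" });

   toy_deliver $id, hello => 1;
   toy_deliver $id, "quit";
   my $delivered_after_quit = toy_deliver $id, "late";

   print "seen: @seen; still alive: ", (exists $PORTS{$id} ? 1 : 0), "\n";

```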

=cut

sub port(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";

   if (@_) {
      my $cb = shift;
      $PORT{$id} = sub {
         local $SELF = $port;
         eval {
            &$cb
               and kil $id;
         };
         _self_die if $@;
      };
   } else {
      my $self = bless {
         id => "$NODE#$id",
      }, "AnyEvent::MP::Port";

      $PORT_DATA{$id} = $self;
      $PORT{$id} = sub {
         local $SELF = $port;

         eval {
            for (@{ $self->{rc0}{$_[0]} }) {
               $_ && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{rcv}{$_[0]} }) {
               $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{any} }) {
               $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }
         };
         _self_die if $@;
      };
   }

   $port
}

=item reg $portid, $name

Registers the given port under the name C<$name>. If the name already
exists it is replaced.

A port can only be registered under one well known name.

A port automatically becomes unregistered when it is killed.

=cut

sub reg(@) {
   my ($portid, $name) = @_;

   $REG{$name} = $portid;
}

=item rcv $portid, $callback->(@msg)

Replaces the callback on the specified miniport (or newly created port
object, see C<port>). Full ports are configured with the following calls:

=item rcv $portid, tagstring => $callback->(@msg), ...

=item rcv $portid, $smartmatch => $callback->(@msg), ...

=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...

Register callbacks to be called on matching messages on the given port.

The callback has to return a true value when its work is done, after
which it will be removed, or a false value in which case it will stay
registered.

The global C<$SELF> (exported by this module) contains C<$portid> while
executing the callback.

Runtime errors during callback execution will result in the port being
C<kil>ed.

If the match is an array reference, then it will be matched against the
first elements of the message, otherwise only the first element is
matched.

Any element in the match that is specified as C<_any_> (a function
exported by this module) matches any single element of the message.

While not required, it is highly recommended that the first matching
element is a string identifying the message. The one-string-only match is
also the most efficient match (by far).
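
A sketch of these matching rules without the smartmatch operator (which later perl versions deprecate). C<matches> and the sentinel C<$ANY> (standing in for C<_any_>) are hypothetical, and elements are simply compared as strings, whereas C<~~> picks the comparison based on the operand types. The module itself dispatches through tables keyed by the first message element, which is why the one-string match is the cheapest.

```perl

   use strict;
   use warnings;

   # hypothetical sentinel playing the role of _any_
   my $ANY = \"any";

   # does @msg match $match? a plain string matches the first element only;
   # an array reference matches that many leading elements
   sub matches {
      my ($match, @msg) = @_;
      my @pattern = ref $match eq "ARRAY" ? @$match : ($match);

      return 0 if @msg < @pattern;
      for my $i (0 .. $#pattern) {
         next if ref $pattern[$i] && $pattern[$i] == $ANY;   # wildcard
         return 0 unless defined $msg[$i] && $msg[$i] eq $pattern[$i];
      }
      return 1;
   }

   my @msg = (child_died => 1234, 3);

   my @results = (
      matches ("child_died", @msg),
      matches ([child_died => 1234], @msg),
      matches ([$ANY, $ANY, 3], @msg),
      matches ([child_died => 999], @msg),
   );
   print "@results\n";

```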

=cut

sub rcv($@) {
   my $portid = shift;
   my ($noderef, $port) = split /#/, $portid, 2;

   ($NODE{$noderef} || add_node $noderef) == $NODE{""}
      or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";

   my $self = $PORT_DATA{$port}
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   "AnyEvent::MP::Port" eq ref $self
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   while (@_) {
      my ($match, $cb) = splice @_, 0, 2;

      if (!ref $match) {
         push @{ $self->{rc0}{$match} }, [$cb];
      } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
         my ($type, @match) = @$match;
         @match
            ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
            : push @{ $self->{rc0}{$match->[0]} }, [$cb];
      } else {
         push @{ $self->{any} }, [$cb, $match];
      }
   }

   $portid
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv $port, delayed_reply => sub {
      my (undef, $delay, @reply) = @_;
      my $timer; $timer = AE::timer $delay, 0, psub {
         undef $timer;
         snd @reply, $SELF;
      };
      0
   };
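
At its core, C<psub> captures the current value of a dynamically scoped variable when the closure is created and re-establishes it with C<local> whenever the closure runs. A stripped-down sketch of just that mechanism, using hypothetical names (C<$CURRENT> for C<$SELF>, C<capture> for C<psub>) and leaving out the C<eval>/C<kil> error handling:

```perl

   use strict;
   use warnings;

   our $CURRENT;   # plays the role of $SELF

   # like psub: remember $CURRENT now, restore it when the closure runs
   sub capture(&) {
      my $cb = shift;
      my $saved = $CURRENT
         or die "capture can only be called while \$CURRENT is set\n";

      return sub {
         local $CURRENT = $saved;
         $cb->(@_);
      };
   }

   my $deferred;
   {
      local $CURRENT = "node#port1";    # as if inside an rcv callback
      $deferred = capture { "running for $CURRENT" };
   }

   # later, e.g. from a timer: $CURRENT is unset out here ...
   my $outside = defined $CURRENT ? $CURRENT : "undef";
   # ... but the closure sees the value from when it was created
   my $inside  = $deferred->();

   print "$outside / $inside\n";

```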

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=back

=head1 FUNCTIONS FOR NODES

=over 4

=item become_public $noderef

Tells the node to become a public node, i.e. reachable from other nodes.

The first argument is the (unresolved) node reference of the local node
(if missing then the empty string is used).

It is quite common to not specify anything, in which case the local node
tries to listen on the default port, or to only specify a port number, in
which case AnyEvent::MP tries to guess the local addresses.

=cut

=back

=head1 NODE MESSAGES

Nodes understand the following messages sent to them. Many of them take
arguments called C<@reply>, which will simply be used to compose a reply
message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
the remaining arguments are simply the message data.

While other messages exist, they are not public and subject to change.
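
Put differently, a node answers such a request by sending the C<@reply> list with its result appended, so the answer arrives at C<$reply[0]> as a message of type C<$reply[1]>. A minimal model of this convention for the C<time> and C<lookup> messages follows; C<node_receive>, C<%handlers>, C<%registry> and the recording C<fake_snd> are hypothetical stand-ins, and the node time is a fixed, made-up value.

```perl

   use strict;
   use warnings;

   my @sent;                                   # what would have been sent
   sub fake_snd { push @sent, [@_] }

   my %registry = (logger => "nodeA#log.1");   # well-known ports
   my $now      = 1234567890;                  # fixed, made-up "node time"

   # each handler gets the message arguments after the type
   my %handlers = (
      time   => sub { my @reply = @_; fake_snd @reply, $now },
      lookup => sub {
         my ($name, @reply) = @_;
         fake_snd @reply, $registry{$name};    # undef if not registered
      },
   );

   sub node_receive {
      my ($type, @args) = @_;
      my $handler = $handlers{$type} or return;
      $handler->(@args);
   }

   node_receive time   => "me#p1", timereply => 1, 2;
   node_receive lookup => "logger", "me#p1", "found";

   print join (" ", map { defined ($_) ? $_ : "undef" } @$_), "\n" for @sent;

```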

=over 4

=cut

=item lookup => $name, @reply

Replies with the port ID of the specified well-known port, or C<undef>.

=item devnull => ...

Generic data sink/CPU heat conversion.

=item relay => $port, @msg

Simply forwards the message to the given port.

=item eval => $string[ @reply]

Evaluates the given string. If C<@reply> is given, then a message of the
form C<@reply, $@, @evalres> is sent.

Example: crash another node.

   snd $othernode, eval => "exit";

=item time => @reply

Replies with the current node time to C<@reply>.

Example: tell the current node to send the current time to C<$myport> in a
C<timereply> message.

   snd $NODE, time => $myport, timereply => 1, 2;
   # => snd $myport, timereply => 1, 2, <time>

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed erlang (erlang node
== aemp node, erlang process == aemp port), so many of the documents and
programming techniques employed by erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf      # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf  # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node references contain the recipe on how to contact them.

Erlang relies on special naming and DNS to work everywhere in the
same way. AEMP relies on each node knowing its own address(es), with
some convenience functionality to help.

This means that AEMP requires a less tightly controlled environment at the
cost of longer node references and a slightly higher management overhead.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages, and therefore
needs a queue. AEMP is event based, so queuing messages would serve no
useful purpose.

(But see L<Coro::MP> for a more erlang-like process model on top of AEMP).

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in erlang is synchronous and blocks the process. AEMP
sends are immediate; connection establishment is handled in the
background.

=item * Erlang can silently lose messages, AEMP cannot.

Erlang makes few guarantees on message delivery - messages can get lost
without any of the processes realising it (i.e. you send messages a, b,
and c, and the other side only receives messages a and c).

AEMP guarantees correct ordering, and guarantees that there are no
holes in the message sequence.

=item * In erlang, processes can be declared dead and later be found to be
alive.

In erlang it can happen that a monitored process is declared dead and
linked processes get killed, but later it turns out that the process is
still alive - and can receive messages.

In AEMP, when port monitoring detects a port as dead, then that port will
eventually be killed - it cannot happen that a node detects a port as dead
and then later sends messages to it, finding it is still alive.

=item * Erlang can send messages to the wrong port, AEMP does not.

In erlang it is quite possible that a node that restarts reuses a process
ID known to other nodes for a completely different process, causing
messages destined for that process to end up in an unrelated process.

AEMP never reuses port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol - SSL/TLS - to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the erlang protocol, supports both
language-independent text-only protocols (good for debugging) and binary,
language-specific serialisers (e.g. Storable).

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=back

=head1 SEE ALSO

L<AnyEvent>.

=head1 AUTHOR

   Marc Lehmann <schmorp@schmorp.de>
   http://home.schmorp.de/

=cut

1
