/cvs/AnyEvent-MP/MP.pm
Revision: 1.29
Committed: Tue Aug 4 23:16:57 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.28: +63 -21 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE      # contains this node's noderef
   NODE       # returns this node's noderef
   NODE $port # returns the noderef of the port

   snd $port, type => data...;

   $SELF      # receiving/own port id in rcv callbacks

   rcv $port, smartmatch => $cb->(@msg);

   # examples:
   rcv $port2, ping => sub { snd $_[1], "pong"; 0 };
   rcv $port1, pong => sub { warn "pong received\n" };
   snd $port2, ping => $port1;

   # more, smarter, matches (_any_ is exported by this module)
   rcv $port, [child_died => $pid] => sub { ...
   rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3

=head1 DESCRIPTION

This module (and its family of modules) implements a simple message
passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page.

At the moment, this module family is severely broken and underdocumented,
so do not use it. It was uploaded mainly to reserve the CPAN namespace, so
stay tuned! The basic API should be finished, however.

=head1 CONCEPTS

=over 4

=item port

A port is something you can send messages to (with the C<snd> function).

Some ports allow you to register C<rcv> handlers that can match specific
messages. Every C<rcv> handler receives the messages it matches; messages
are not queued.

=item port id - C<noderef#portname>

A port id is normally the concatenation of a noderef, a hash-mark (C<#>)
as separator, and a port name (a printable string of unspecified format).
An exception is the node port, whose ID is identical to its node
reference.

=item node

A node is a single process containing at least one port: the node
port. You can send messages to node ports to find existing ports or to
create new ports, among other things.

Nodes are either private (single-process only), slaves (connected to a
master node only) or public nodes (connectable from unrelated nodes).

=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>

A node reference is a string that either simply identifies the node (for
private and slave nodes), or contains a recipe on how to reach a given
node (for public nodes).

This recipe is simply a comma-separated list of C<address:port> pairs (for
TCP/IP; other protocols might look different).

Node references come in two flavours: resolved (containing only numerical
addresses) or unresolved (where hostnames are used instead of addresses).

Before using an unresolved node reference in a message, you first have to
resolve it (see C<resolve_node>).
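
As an illustration of the resolved/unresolved distinction, here is a
minimal sketch in plain Perl. It assumes a TCP/IP recipe made of
C<address:port> pairs and treats only dotted-quad IPv4 addresses as
numerical; IPv6 literals, the C<id@noderef> and C<id> forms and other
protocols are not handled, and C<noderef_is_resolved> is a hypothetical
helper, not part of this module:

   use strict;
   use warnings;

   # Hypothetical helper: true if every address:port pair in the recipe
   # uses a numerical (IPv4) address, false otherwise.
   sub noderef_is_resolved {
      my ($noderef) = @_;
      return 0 unless length $noderef;

      for my $pair (split /,/, $noderef) {
         my ($addr) = $pair =~ /^(.*):\d+$/
            or return 0;                       # no explicit port
         return 0 unless $addr =~ /^\d{1,3}(?:\.\d{1,3}){3}$/;
      }

      1
   }

   for my $ref ("10.0.0.1:4040,192.168.1.2:4041", "localhost:4040") {
      my $state = noderef_is_resolved ($ref) ? "resolved" : "unresolved";
      print "$ref: $state\n";
   }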

=back

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Base;

use common::sense;

use Carp ();

use AE ();

use base "Exporter";

our $VERSION = '0.1';
our @EXPORT = qw(
   NODE $NODE *SELF node_of _any_
   resolve_node
   become_slave become_public
   snd rcv mon kil reg psub
   port
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the
noderef of the local node. The value is initialised by a call to
C<become_public> or C<become_slave>, after which all local port
identifiers become invalid.

=item $noderef = node_of $portid

Extracts and returns the noderef from a port ID or a noderef.
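
Both C<mon> and C<rcv> below take port IDs apart with
C<split /#/, $id, 2>, so the extraction can be sketched the same way,
assuming the C<noderef#portname> format described under CONCEPTS.
C<my_node_of> is a hypothetical name used for illustration, not the
module's own implementation:

   use strict;
   use warnings;

   # Hypothetical stand-in for node_of: the noderef is everything before
   # the first "#"; an ID without "#" (a node port) is returned unchanged.
   sub my_node_of {
      my ($id) = @_;
      (split /#/, $id, 2)[0]
   }

   printf "%s\n", my_node_of ("10.0.0.1:4040#port7");   # 10.0.0.1:4040
   printf "%s\n", my_node_of ("10.0.0.1:4040");         # 10.0.0.1:4040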

=item $cv = resolve_node $noderef

Takes an unresolved node reference that may contain hostnames and
abbreviated IDs, resolves all of them and returns a resolved node
reference (via the returned condition variable C<$cv>).

In addition to the C<address:port> pairs allowed in resolved noderefs, the
following forms are supported:

=over 4

=item the empty string

An empty-string component is resolved as if the default port (4040) had
been specified.

=item naked port numbers (e.g. C<1234>)

These are resolved by prepending the local nodename and a colon; the
result is then resolved further.

=item hostnames (e.g. C<localhost:1234>, C<localhost>)

These are resolved using AnyEvent::DNS. If no port was specified, SRV
records for the C<aemp=4040> port are optionally looked up as well.

=back
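
The purely textual part of these rules can be sketched as follows. This
is not the module's code: C<expand_noderef> is a hypothetical helper, the
DNS and SRV steps are left out entirely, the local nodename is assumed to
be the system hostname unless one is passed in, and a hostname without a
port is assumed to fall back to the default port 4040:

   use strict;
   use warnings;
   use Sys::Hostname ();

   my $DEFAULT_PORT = 4040;

   # Hypothetical helper: rewrites empty components, naked ports and
   # port-less hostnames into address:port form. No DNS or SRV lookups
   # are done, so hostnames stay unresolved.
   sub expand_noderef {
      my ($noderef, $nodename) = @_;
      $nodename = Sys::Hostname::hostname () unless defined $nodename;

      my @parts = split /,/, $noderef, -1;
      @parts = ("") unless @parts;               # "" is one empty component

      for (@parts) {
         $_ = $DEFAULT_PORT     if $_ eq "";     # empty => default port
         $_ = "$nodename:$_"    if /^\d+$/;      # naked port => nodename:port
         $_ .= ":$DEFAULT_PORT" unless /:\d+$/;  # hostname only (assumption)
      }

      join ",", @parts
   }

   printf "%s\n", expand_noderef ("", "myhost");                    # myhost:4040
   printf "%s\n", expand_noderef ("localhost:1234,5678", "myhost"); # localhost:1234,myhost:5678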

=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
export just C<$SELF>, so all the symbols called C<SELF> are exported by
this module, but only C<$SELF> is currently used.

=item snd $portid, type => @data

=item snd $portid, @msg

Send the given message to the given port ID, which can identify either a
local or a remote port, and can be either a string or something that
stringifies to a port ID (such as a port object :).

While the message can be about anything, it is highly recommended to use a
string as the first element (a port ID, or some word that indicates a
request type etc.).

The message data effectively becomes read-only after a call to this
function: modifying any argument is not allowed and can cause many
problems.

The type of data you can transfer depends on the transport protocol: when
JSON is used, only strings, numbers, and arrays and hashes consisting of
those are allowed (no objects). When Storable is used, anything that
Storable can serialise and deserialise is allowed, and for the local node,
anything can be passed.
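
To make the JSON restriction concrete, here is a sketch of a check that a
message only contains data of the allowed kinds. C<json_transferable> is
a hypothetical helper, not part of this module; accepting C<undef> (which
JSON encodes as null) is an assumption of the sketch, and self-referencing
structures are not guarded against:

   use strict;
   use warnings;
   use Scalar::Util qw(blessed reftype);

   # Hypothetical helper: true if all items are plain scalars or unblessed
   # array/hash references built (recursively) from plain scalars.
   sub json_transferable {
      for my $item (@_) {
         next unless ref $item;                   # plain string/number/undef
         return 0 if blessed $item;               # objects are not allowed

         my $type = reftype $item;
         if ($type eq "ARRAY") {
            json_transferable (@$item) or return 0;
         } elsif ($type eq "HASH") {
            json_transferable (values %$item) or return 0;
         } else {
            return 0;                             # code refs, scalar refs, ...
         }
      }

      1
   }

   my $ok  = json_transferable (ping => [1, 2, { a => "b" }]);    # 1
   my $bad = json_transferable (ping => bless {}, "Some::Class"); # 0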

=item kil $portid[, @reason]

Kill the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally" (linked
ports will not be killed, or even notified).

Otherwise, linked ports get killed with the same reason (second form of
C<mon>, see below).

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.

=item $guard = mon $portid, $cb->(@reason)

=item $guard = mon $portid, $otherport

=item $guard = mon $portid, $otherport, @msg

Monitor the given port and do something when the port is killed.

In the first form, the callback is simply called with any number
of C<@reason> elements (no @reason means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the second form, the other port will be C<kil>'ed with C<@reason>, iff
a @reason was specified, i.e. on "normal" kills nothing happens, while
under all other conditions, the other port is killed with the same reason.

In the last form, a message of the form C<@msg, @reason> will be sent
with C<snd>.

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port, $SELF;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $SELF => "restart";
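
The reason-dependent behaviour of the second form can be modelled in a
few lines of plain Perl. This is a single-process toy that only mimics
the documented semantics: C<toy_kil>, C<toy_mon>, C<%MONITORS> and
C<%DEAD> are hypothetical names, and the real C<mon> hands its callback
to the node object instead. The wrapper pushed by C<toy_mon> is the same
one C<mon> builds for this form:

   use strict;
   use warnings;

   my %MONITORS;   # port id => list of callbacks (toy registry)
   my %DEAD;       # port id => recorded kill reason

   sub toy_kil {
      my ($port, @reason) = @_;
      return if exists $DEAD{$port};               # already dead
      $DEAD{$port} = @reason ? "@reason" : "normal";
      my $monitors = delete($MONITORS{$port}) || [];
      $_->(@reason) for @$monitors;
   }

   sub toy_mon {
      my ($port, $other) = @_;
      # only abnormal kills (a non-empty @reason) reach the other port
      push @{ $MONITORS{$port} }, sub { toy_kil ($other, @_) if @_ };
   }

   toy_mon ("node#a", "node#b");
   toy_mon ("node#c", "node#d");

   toy_kil ("node#a", die => "oops");   # abnormal kill: propagates to node#b
   toy_kil ("node#c");                  # normal kill: node#d is not touched

   for my $port ("node#a", "node#b", "node#c", "node#d") {
      printf "%s: %s\n", $port, $DEAD{$port} // "alive";
   }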

=cut

sub mon {
   my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift);

   my $node = $NODE{$noderef} || add_node $noderef;

   #TODO: ports must not be references
   if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed:

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
         undef $timer if 0.9 < rand;
      };
   };
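
C<mon_guard> keeps the references alive simply by closing over them in a
callback that never uses them (C<sub { 0 && @refs }>). The sketch below
demonstrates that trick in isolation; C<Tracker> and C<keep_alive> are
hypothetical names, and the destructor only records when an object is
freed:

   use strict;
   use warnings;

   package Tracker;                  # hypothetical helper class
   my @freed;
   sub new     { my ($class, $name) = @_; bless { name => $name }, $class }
   sub DESTROY { push @freed, $_[0]{name} }

   package main;

   # Same trick as mon_guard: the closure mentions @refs, so the objects
   # stay alive for as long as the closure itself exists.
   sub keep_alive {
      my (@refs) = @_;
      sub { 0 && @refs }
   }

   my $cb = keep_alive (Tracker->new ("timer"));
   my $before = "@freed";            # empty: the object is still referenced
   undef $cb;                        # drop the closure, and with it @refs
   my $after = "@freed";             # "timer": the object has been freed
   print "before: '$before', after: '$after'\n";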

=cut

sub mon_guard {
   my ($port, @refs) = @_;

   mon $port, sub { 0 && @refs }
}

=item lnk $port1, $port2

Link two ports. This is simply a shorthand for:

   mon $port1, $port2;
   mon $port2, $port1;

It means that if either one is killed abnormally, the other one gets
killed as well.

=item $local_port = port

Creates a new local port that supports message matching and returns its
port ID.

=item $portid = port { my @msg = @_; $finished }

Creates a "mini port", that is, a very lightweight port without any
pattern matching behind it, and returns its ID.

The block will be called for every message received on the port. When the
callback returns a true value its job is considered "done" and the port
will be destroyed. Otherwise it will stay alive.

The message will be passed as-is; no extra argument (i.e. no port id) is
passed to the callback (C<$SELF> is set while the block runs, though).

If you need the local port id in the callback, this works nicely:

   my $port; $port = port {
      snd $otherport, reply => $port;
   };
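
The lifetime rule for mini ports (a true return value destroys the port)
can be modelled in plain Perl as below. Everything runs in one process;
C<toy_port>, C<toy_snd> and C<%TOY_PORT> are hypothetical names, and
dropping messages sent to a removed port is a choice of this toy, not
documented behaviour:

   use strict;
   use warnings;

   my %TOY_PORT;      # toy port table: port id => callback
   my $toy_id = 0;

   # Hypothetical stand-in for "port { ... }": registers the block and
   # returns a new port id.
   sub toy_port(&) {
      my ($cb) = @_;
      my $id = "node#" . $toy_id++;
      $TOY_PORT{$id} = $cb;
      $id
   }

   # Delivers a message; a true return value means "done", so the toy
   # port is removed. Messages to unknown ids are dropped in this toy.
   sub toy_snd {
      my ($id, @msg) = @_;
      my $cb = $TOY_PORT{$id}
         or return;
      delete $TOY_PORT{$id} if $cb->(@msg);
   }

   my @seen;
   my $p = toy_port { push @seen, $_[0]; @seen >= 3 };   # done after 3 messages

   toy_snd $p, "msg$_" for 1 .. 5;
   print "received: @seen\n";                             # received: msg1 msg2 msg3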

=cut

sub port(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";

   if (@_) {
      my $cb = shift;
      $PORT{$id} = sub {
         local $SELF = $port;
         eval {
            &$cb
               and kil $id;
         };
         _self_die if $@;
      };
   } else {
      my $self = bless {
         id => "$NODE#$id",
      }, "AnyEvent::MP::Port";

      $PORT_DATA{$id} = $self;
      $PORT{$id} = sub {
         local $SELF = $port;

         eval {
            for (@{ $self->{rc0}{$_[0]} }) {
               $_ && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{rcv}{$_[0]} }) {
               $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }

            for (@{ $self->{any} }) {
               $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
                  && &{$_->[0]}
                  && undef $_;
            }
         };
         _self_die if $@;
      };
   }

   $port
}

=item reg $portid, $name

Registers the given port under the name C<$name>. If the name already
exists it is replaced.

A port can only be registered under one well-known name.

A port automatically becomes unregistered when it is killed.

=cut

sub reg(@) {
   my ($portid, $name) = @_;

   $REG{$name} = $portid;
}

=item rcv $portid, tagstring => $callback->(@msg), ...

=item rcv $portid, $smartmatch => $callback->(@msg), ...

=item rcv $portid, [$smartmatch...] => $callback->(@msg), ...

Register callbacks to be called on matching messages on the given port.

The callback receives the complete message, including its first element,
in C<@_>. It has to return a true value when its work is done, after
which it will be removed, or a false value in which case it will stay
registered.

The global C<$SELF> (exported by this module) contains C<$portid> while
executing the callback.

Runtime errors during callback execution will result in the port being
C<kil>ed.

If the match is an array reference, then it will be matched against the
first elements of the message, otherwise only the first element is
matched.

Any element in the match that is specified as C<_any_> (a function
exported by this module) matches any single element of the message.

While not required, it is highly recommended that the first matching
element is a string identifying the message. The one-string-only match is
also the most efficient match (by far).
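
The array-reference rule amounts to a prefix comparison with a wildcard.
The sketch below illustrates it without smartmatch; C<prefix_matches> and
the C<$ANY> sentinel are hypothetical stand-ins for the module's
smartmatch-based code and its C<_any_> function, and comparing elements
as strings with C<eq> is an assumption of the sketch:

   use strict;
   use warnings;

   my $ANY = \"any";   # hypothetical wildcard sentinel (stands in for _any_)

   # True if the first elements of @msg match @$pattern, element by element.
   sub prefix_matches {
      my ($pattern, @msg) = @_;
      return 0 if @msg < @$pattern;                               # too short
      for my $i (0 .. $#$pattern) {
         next if ref $pattern->[$i] && $pattern->[$i] == $ANY;   # wildcard
         return 0 unless defined $msg[$i] && $msg[$i] eq $pattern->[$i];
      }
      1
   }

   my @msg = (child_died => 1234, 3);
   print "matched child_died\n"    if prefix_matches ["child_died", 1234], @msg;
   print "matched third element\n" if prefix_matches [$ANY, $ANY, 3], @msg;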

=cut

sub rcv($@) {
   my ($noderef, $port) = split /#/, shift, 2;

   ($NODE{$noderef} || add_node $noderef) == $NODE{""}
      or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught";

   my $self = $PORT_DATA{$port}
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   "AnyEvent::MP::Port" eq ref $self
      or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught";

   while (@_) {
      my ($match, $cb) = splice @_, 0, 2;

      if (!ref $match) {
         push @{ $self->{rc0}{$match} }, [$cb];
      } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
         my ($type, @match) = @$match;
         @match
            ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
            : push @{ $self->{rc0}{$match->[0]} }, [$cb];
      } else {
         push @{ $self->{any} }, [$cb, $match];
      }
   }
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, it sets up the environment in the same way as in
C<rcv> callbacks, i.e. runtime errors will cause the port to get
C<kil>ed.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv $port, delayed_reply => sub {
      my (undef, $delay, @reply) = @_;
      my $timer; $timer = AE::timer $delay, 0, psub {
         snd @reply, $SELF;
         undef $timer;
      };
   };
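
C<psub> has to remember C<$SELF> because C<local> is dynamically scoped:
the value is only visible while the C<rcv> callback is running, not when a
timer fires later. The self-contained sketch below shows this with a
hypothetical C<toy_psub> that mirrors only the capture-and-C<local> part
of the implementation (no error handling); the deferred callbacks are
simply kept in an array instead of being run by an event loop:

   use strict;
   use warnings;

   our $SELF;         # dynamically scoped, like this module's $SELF
   my @deferred;      # stands in for timers that fire later

   # Hypothetical toy_psub: remembers $SELF at creation time and
   # re-establishes it with local whenever the closure runs.
   sub toy_psub(&) {
      my ($cb, $port) = (shift, $SELF);
      sub { local $SELF = $port; $cb->(@_) }
   }

   {
      local $SELF = "node#port1";      # as while an rcv callback runs
      push @deferred, sub { defined $SELF ? $SELF : "undef" };   # plain closure
      push @deferred, toy_psub { $SELF };                        # psub-style
   }

   # Later, after the "callback" has returned:
   printf "%s\n", $_->() for @deferred;   # prints "undef", then "node#port1"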

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=back

=head1 FUNCTIONS FOR NODES

=over 4

=item become_public $noderef

Tells the node to become a public node, i.e. reachable from other nodes.

The first argument is the (unresolved) node reference of the local node
(if missing, the empty string is used).

It is quite common to specify nothing, in which case the local node tries
to listen on the default port, or to specify only a port number, in which
case AnyEvent::MP tries to guess the local addresses.

=cut

=back

=head1 NODE MESSAGES

Nodes understand the following messages sent to them. Many of them take
arguments called C<@reply>, which will simply be used to compose a reply
message: C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
the remaining arguments are simply the message data.

While other messages exist, they are not public and subject to change.

=over 4

=cut

=item lookup => $name, @reply

Replies with the port ID of the specified well-known port, or C<undef>.

=item devnull => ...

Generic data sink/CPU heat conversion.

=item relay => $port, @msg

Simply forwards the message to the given port.

=item eval => $string[, @reply]

Evaluates the given string. If C<@reply> is given, then a message of the
form C<@reply, $@, @evalres> is sent.

Example: crash another node.

   snd $othernode, eval => "exit";
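
How the C<eval> reply is composed can be sketched locally. C<eval_reply>
is a hypothetical function that returns the reply message instead of
sending it (a real node would C<snd> it), and evaluating the string in
list context is an assumption of this sketch:

   use strict;
   use warnings;

   # Hypothetical: builds the message the "eval" node message would send
   # back, i.e. (@reply, $@, @evalres), or nothing if @reply is empty.
   sub eval_reply {
      my ($string, @reply) = @_;
      my @evalres = eval $string;
      return unless @reply;
      (@reply, $@, @evalres)
   }

   my @ok  = eval_reply ("1 + 2", "node#port1", "result");
   # ("node#port1", "result", "", 3)
   my @err = eval_reply ('die "boom\n"', "node#port1", "result");
   # ("node#port1", "result", "boom\n")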

=item time => @reply

Replies with the current node time to C<@reply>.

Example: tell the current node to send the current time to C<$myport> in a
C<timereply> message.

   snd $NODE, time => $myport, timereply => 1, 2;
   # => snd $myport, timereply => 1, 2, <time>

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed erlang (erlang node
== aemp node, erlang process == aemp port), so many of the documents and
programming techniques employed by erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf      # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf  # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node references contain the recipe on how to contact them.

Erlang relies on special naming and DNS to work everywhere in the
same way. AEMP relies on each node knowing its own address(es), aided by
convenience functionality.

This means that AEMP requires a less tightly controlled environment at the
cost of longer node references and a slightly higher management overhead.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages, and therefore
needs a queue. AEMP is event based, so queuing messages would serve no
useful purpose.

(But see L<Coro::MP> for a more erlang-like process model on top of AEMP).

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in erlang is synchronous and blocks the process. AEMP
sends are immediate; connection establishment is handled in the
background.

=item * Erlang can silently lose messages, AEMP cannot.

Erlang makes few guarantees on message delivery: messages can get lost
without any of the processes realising it (i.e. you send messages a, b,
and c, and the other side only receives messages a and c).

AEMP guarantees correct ordering, and that there are no holes in the
message sequence.

=item * In erlang, processes can be declared dead and later be found to be
alive.

In erlang it can happen that a monitored process is declared dead and
linked processes get killed, but later it turns out that the process is
still alive, and can receive messages.

In AEMP, when port monitoring detects a port as dead, then that port will
eventually be killed; it cannot happen that a node detects a port as dead
and then later sends messages to it, finding it is still alive.

=item * Erlang can send messages to the wrong port, AEMP does not.

In erlang it is quite possible that a node that restarts reuses a process
ID known to other nodes for a completely different process, causing
messages destined for that process to end up in an unrelated process.

AEMP never reuses port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol (SSL/TLS) to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the erlang protocol, supports both
language-independent text-only protocols (good for debugging) and binary,
language-specific serialisers (e.g. Storable).

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=back

=head1 SEE ALSO

L<AnyEvent>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1
