/cvs/AnyEvent-MP/MP.pm
Revision: 1.34
Committed: Wed Aug 5 23:50:46 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.33: +64 -68 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE          # contains this node's noderef
   NODE           # returns this node's noderef
   node_of $port  # returns the noderef of the port

   snd $port, type => data...;

   $SELF          # receiving/own port id in rcv callbacks

   rcv $port, smartmatch => $cb->(@msg);

   # examples (callbacks receive the whole message, tag included):
   rcv $port2, ping => sub { snd $_[1], "pong"; 0 };
   rcv $port1, pong => sub { warn "pong received\n" };
   snd $port2, ping => $port1;

   # more, smarter, matches (_any_ is exported by this module)
   rcv $port, [child_died => $pid] => sub { ... };
   rcv $port, [_any_, _any_, 3] => sub { ... };  # $_[2] is 3

=head1 DESCRIPTION

This module (and its family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page.

At the moment, this module family is severely broken and underdocumented,
so do not use it. This was uploaded mainly to reserve the CPAN namespace -
stay tuned! The basic API should be finished, however.

=head1 CONCEPTS

=over 4

=item port

A port is something you can send messages to (with the C<snd> function).

Some ports allow you to register C<rcv> handlers that can match specific
messages. All C<rcv> handlers will receive the messages they match;
messages will not be queued.

=item port id - C<noderef#portname>

A port id is normally the concatenation of a noderef, a hash-mark (C<#>) as
separator, and a port name (a printable string of unspecified format). An
exception is the node port, whose ID is identical to its node
reference.

=item node

A node is a single process containing at least one port - the node
port. You can send messages to node ports to find existing ports or to
create new ports, among other things.

Nodes are either private (single-process only), slaves (connected to a
master node only) or public nodes (connectable from unrelated nodes).

=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>

A node reference is a string that either simply identifies the node (for
private and slave nodes), or contains a recipe on how to reach a given
node (for public nodes).

This recipe is simply a comma-separated list of C<address:port> pairs (for
TCP/IP, other protocols might look different).

Node references come in two flavours: resolved (containing only numerical
addresses) or unresolved (where hostnames are used instead of addresses).

Before using an unresolved node reference in a message you first have to
resolve it.

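To make the two flavours concrete, here is a small standalone sketch in
plain Perl (not part of AnyEvent::MP) that splits a TCP/IP recipe into its
C<address:port> pairs and decides whether it is resolved. The helper name
C<noderef_is_resolved> is hypothetical, it assumes IPv4 dotted-quad
addresses only, and it ignores the C<id@noderef> and bare C<id> forms.

   use strict;
   use warnings;

   # Hypothetical helper, IPv4 only: a recipe counts as "resolved" when
   # every comma-separated address:port pair uses a numerical address.
   sub noderef_is_resolved {
      my ($noderef) = @_;

      for my $pair (split /,/, $noderef) {
         my ($address, $port) = $pair =~ /^(.*):(\d+)\z/
            or return 0;                     # no explicit port
         return 0 unless $address =~ /^\d{1,3}(?:\.\d{1,3}){3}\z/;
      }

      return 1;
   }

   for my $ref ("10.0.0.1:4040,192.168.1.5:4041", "mp.example.net:4040") {
      printf "%s is %s\n", $ref,
         noderef_is_resolved ($ref) ? "resolved" : "unresolved";
   }
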
=back

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Base;

use common::sense;

use Carp ();

use AE ();

use base "Exporter";

our $VERSION = '0.1';
our @EXPORT = qw(
   NODE $NODE *SELF node_of _any_
   resolve_node initialise_node
   snd rcv mon kil reg psub
   port
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the
noderef of the local node. The value is initialised by a call to
C<initialise_node>, after which all local port identifiers become invalid.

=item $noderef = node_of $port

Extracts and returns the noderef from a port id or a noderef.

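Since a port id is a noderef, a C<#> and a port name (and a node port's id
is just its noderef), extracting the noderef amounts to splitting at the
first C<#>. This standalone sketch mirrors the C<split /#/, $port, 2> that
C<rcv> and C<mon> use internally; C<my_node_of> is a hypothetical name
chosen so it does not clash with the real C<node_of>, and the port ids are
made up.

   use strict;
   use warnings;

   # Hypothetical stand-in for node_of: everything before the first "#"
   # is the noderef; node ports contain no "#" and come back unchanged.
   sub my_node_of {
      my ($port) = @_;
      my ($noderef) = split /#/, $port, 2;
      return $noderef;
   }

   printf "%s\n", my_node_of ("10.0.0.1:4040#abc.7");   # 10.0.0.1:4040
   printf "%s\n", my_node_of ("10.0.0.1:4040");         # 10.0.0.1:4040
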
=item initialise_node $noderef, $seednode, $seednode...

=item initialise_node "slave/", $master, $master...

Before a node can talk to other nodes on the network it has to initialise
itself - the minimum a node needs to know is its own name, and optionally
it should know the noderefs of some other nodes in the network.

This function initialises a node - it must be called exactly once (or
never) before calling other AnyEvent::MP functions.

All arguments are noderefs, which can be either resolved or unresolved.

There are two types of networked nodes, public nodes and slave nodes:

=over 4

=item public nodes

For public nodes, C<$noderef> must either be a (possibly unresolved)
noderef, in which case it will be resolved, or C<undef> (or missing), in
which case the noderef will be guessed.

Afterwards, the node will bind itself on all endpoints and try to connect
to all additional C<$seednode>s that are specified. Seednodes are optional
and can be used to quickly bootstrap the node into an existing network.

=item slave nodes

When the C<$noderef> is the special string C<slave/>, then the node will
become a slave node. Slave nodes cannot be contacted from outside and will
route most of their traffic to the master node that they attach to.

At least one additional noderef is required: the node will try to connect
to all of them and will become a slave attached to the first node it can
successfully connect to.

=back

This function will block until all nodes have been resolved and, for slave
nodes, until it has successfully established a connection to a master
server.

Example: become a public node listening on the default port.

   initialise_node;

Example: become a public node, and try to contact some well-known master
servers to become part of the network.

   initialise_node undef, "master1", "master2";

Example: become a public node listening on port C<4041>.

   initialise_node 4041;

Example: become a public node, only visible on localhost port 4044.

   initialise_node "localhost:4044";

Example: become a slave node to any of the specified master servers.

   initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";

=item $cv = resolve_node $noderef

Takes an unresolved node reference that may contain hostnames and
abbreviated IDs, resolves all of them and returns a resolved node
reference.

In addition to C<address:port> pairs allowed in resolved noderefs, the
following forms are supported:

=over 4

=item the empty string

An empty-string component gets resolved as if the default port (4040) was
specified.

=item naked port numbers (e.g. C<1234>)

These are resolved by prepending the local nodename and a colon, to be
further resolved.

=item hostnames (e.g. C<localhost:1234>, C<localhost>)

These are resolved using AnyEvent::DNS, optionally looking up SRV records
for the C<aemp=4040> port if no port was specified.

=back

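The first two rules are purely textual, so they can be illustrated without
any networking. The sketch below is a hypothetical helper
(C<normalise_component> is not an AnyEvent::MP function) that applies them
to each comma-separated component, assuming a local nodename of
C<myhost>; the DNS and SRV lookups that the real C<resolve_node> performs
via AnyEvent::DNS are deliberately left out.

   use strict;
   use warnings;

   my $DEFAULT_PORT = 4040;

   # Hypothetical helper: apply the two textual rules to one component;
   # hostnames are returned as-is, to be looked up via DNS later.
   sub normalise_component {
      my ($component, $nodename) = @_;

      $component = $DEFAULT_PORT if $component eq "";   # empty string
      $component = "$nodename:$component"
         if $component =~ /^\d+\z/;                     # naked port

      return $component;
   }

   my @parts = map { normalise_component ($_, "myhost") }
                  split /,/, ",1234,example.net:4041", -1;
   print join (",", @parts), "\n";
   # myhost:4040,myhost:1234,example.net:4041
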
=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
just export C<$SELF>; all the symbols called C<SELF> are exported by this
module, but only C<$SELF> is currently used.

=item snd $port, type => @data

=item snd $port, @msg

Send the given message to the given port ID, which can identify either
a local or a remote port, and can be either a string or something that
stringifies as a port ID (such as a port object :).

While the message can be about anything, it is highly recommended to use a
string as first element (a port id, or some word that indicates a request
type etc.).

The message data effectively becomes read-only after a call to this
function: modifying any argument is not allowed and can cause many
problems.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed.

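Whether a message fits the JSON restriction can be checked before sending
it. The following is a hypothetical helper (not provided by AnyEvent::MP)
that accepts only plain scalars and nested, unblessed array and hash
references, rejecting code references and objects; it assumes the data
contains no circular references.

   use strict;
   use warnings;

   # Hypothetical helper: true if every element is a plain scalar or an
   # unblessed ARRAY/HASH reference containing only such values.
   # Assumes the data contains no circular references.
   sub json_transferable {
      for my $value (@_) {
         my $type = ref $value;

         next if $type eq "";                       # string or number
         if ($type eq "ARRAY") { json_transferable (@$value) or return 0; next }
         if ($type eq "HASH")  { json_transferable (values %$value) or return 0; next }
         return 0;                                  # code refs, objects...
      }

      return 1;
   }

   printf "%d %d\n",
      json_transferable (ping => [1, { a => "b" }]),
      json_transferable (reply => sub { 1 });
   # 1 0
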
=item $local_port = port

Creates a new local port and returns its ID. The port can be used either
as a pattern matching port ("full port") or a single-callback port
("miniport"), depending on how C<rcv> callbacks are bound to it.

=item $port = port { my @msg = @_; $finished }

Creates a "miniport", that is, a very lightweight port without any pattern
matching behind it, and returns its ID. Semantically the same as creating
a port and calling C<rcv $port, $callback> on it.

The block will be called for every message received on the port. When the
callback returns a true value its job is considered "done" and the port
will be destroyed. Otherwise it will stay alive.

The message will be passed as-is; no extra argument (i.e. no port id) will
be passed to the callback.

If you need the local port id in the callback, this works nicely:

   my $port; $port = port {
      snd $otherport, reply => $port;
   };

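The lifetime rule for miniports (a true return value destroys the port)
can be modelled in a few lines of plain Perl. This is a toy, single-process
sketch, not AnyEvent::MP code: C<toy_port>, C<toy_snd> and the C<%PORT>
table are hypothetical stand-ins, and no event loop or networking is
involved.

   use strict;
   use warnings;

   my %PORT;      # port id => callback
   my $ID = 0;

   # Create a toy miniport: register the block, return a made-up port id.
   sub toy_port(&) {
      my ($cb) = @_;
      my $id = "toynode#" . $ID++;
      $PORT{$id} = $cb;
      return $id;
   }

   # Deliver a message; a true return value destroys the port.
   sub toy_snd {
      my ($id, @msg) = @_;
      my $cb = $PORT{$id}
         or return;                  # toy: ignore unknown/destroyed ports
      delete $PORT{$id} if $cb->(@msg);
   }

   my $count = 0;
   my $port = toy_port { ++$count >= 2 };   # done after the second message

   toy_snd $port, "first";
   toy_snd $port, "second";
   toy_snd $port, "third";                  # port is gone, nothing happens

   printf "%d %s\n", $count, exists $PORT{$port} ? "alive" : "destroyed";
   # 2 destroyed
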
=cut

sub rcv($@);

sub port(;&) {
   my $id = "$UNIQ." . $ID++;
   my $port = "$NODE#$id";

   if (@_) {
      rcv $port, shift;
   } else {
      $PORT{$id} = sub { }; # nop
   }

   $port
}

=item reg $port, $name

Registers the given port under the name C<$name>. If the name already
exists it is replaced.

A port can only be registered under one well known name.

A port automatically becomes unregistered when it is killed.

=cut

sub reg(@) {
   my ($port, $name) = @_;

   $REG{$name} = $port;
}

=item rcv $port, $callback->(@msg)

Replaces the callback on the specified miniport (after converting it to
one if required).

=item rcv $port, tagstring => $callback->(@msg), ...

=item rcv $port, $smartmatch => $callback->(@msg), ...

=item rcv $port, [$smartmatch...] => $callback->(@msg), ...

Register callbacks to be called on matching messages on the given full
port (after converting it to one if required). The callback receives the
complete message, including the matched first element.

The callback has to return a true value when its work is done, after
which it will be removed, or a false value in which case it will stay
registered.

The global C<$SELF> (exported by this module) contains C<$port> while
executing the callback.

Runtime errors during callback execution will result in the port being
C<kil>ed.

If the match is an array reference, then it will be matched against the
first elements of the message, otherwise only the first element is
matched.

Any element in the match that is specified as C<_any_> (a function
exported by this module) matches any single element of the message.

While not required, it is highly recommended that the first matching
element is a string identifying the message. The one-string-only match is
also the most efficient match (by far).

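The matching rule can be restated in plain Perl. The sketch below is a
hypothetical re-implementation, not the module's code: it avoids
smartmatch (which the real implementation uses and which later perl
versions have deprecated), compares elements with string equality (an
assumption that differs from smartmatch for values such as C<3.0> versus
C<3>), and uses a made-up C<$ANY> sentinel in place of C<_any_>.

   use strict;
   use warnings;

   my $ANY = \"any";   # hypothetical stand-in for _any_

   # Does @msg match $match? A plain string is compared with the first
   # element, an array reference with the first elements, in order.
   sub message_matches {
      my ($match, @msg) = @_;
      my @pattern = ref $match eq "ARRAY" ? @$match : ($match);

      return 0 if @msg < @pattern;

      for my $i (0 .. $#pattern) {
         next if ref $pattern[$i] && $pattern[$i] == $ANY;   # wildcard
         return 0 unless defined $msg[$i] && $msg[$i] eq $pattern[$i];
      }

      return 1;
   }

   printf "%d %d %d\n",
      message_matches ("ping", ping => 1),
      message_matches ([child_died => 42], child_died => 42, "signal"),
      message_matches ([$ANY, $ANY, 3], a => b => 4);
   # 1 1 0
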
=cut

sub rcv($@) {
   my $port = shift;
   my ($noderef, $portid) = split /#/, $port, 2;

   ($NODE{$noderef} || add_node $noderef) == $NODE{""}
      or Carp::croak "$port: rcv can only be called on local ports, caught";

   if (@_ == 1) {
      my $cb = shift;
      delete $PORT_DATA{$portid};
      $PORT{$portid} = sub {
         local $SELF = $port;
         eval {
            &$cb
               and kil $port;
         };
         _self_die if $@;
      };
   } else {
      my $self = $PORT_DATA{$portid} ||= do {
         my $self = bless {
            id => $port,
         }, "AnyEvent::MP::Port";

         $PORT{$portid} = sub {
            local $SELF = $port;

            eval {
               for (@{ $self->{rc0}{$_[0]} }) {
                  $_ && &{$_->[0]}
                     && undef $_;
               }

               for (@{ $self->{rcv}{$_[0]} }) {
                  $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
                     && &{$_->[0]}
                     && undef $_;
               }

               for (@{ $self->{any} }) {
                  $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
                     && &{$_->[0]}
                     && undef $_;
               }
            };
            _self_die if $@;
         };

         $self
      };

      "AnyEvent::MP::Port" eq ref $self
         or Carp::croak "$port: rcv can only be called on message matching ports, caught";

      while (@_) {
         my ($match, $cb) = splice @_, 0, 2;

         if (!ref $match) {
            push @{ $self->{rc0}{$match} }, [$cb];
         } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
            my ($type, @match) = @$match;
            @match
               ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
               : push @{ $self->{rc0}{$match->[0]} }, [$cb];
         } else {
            push @{ $self->{any} }, [$cb, $match];
         }
      }
   }

   $port
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv $port, delayed_reply => sub {
      my (undef, $delay, @reply) = @_;   # first element is the tag
      my $timer; $timer = AE::timer $delay, 0, psub {
         undef $timer;                    # one-shot: release the timer
         snd @reply, $SELF;
      };
      0  # stay registered for further requests
   };

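The core trick of C<psub>, capturing the current value of a dynamically
scoped variable and restoring it with C<local> whenever the closure runs
later, can be shown without AnyEvent::MP. In this toy sketch C<toy_psub>
is a hypothetical name, and the C<eval>/C<kil> error handling of the real
C<psub> is omitted.

   use strict;
   use warnings;

   our $SELF;                     # dynamically scoped, like AnyEvent::MP's

   # Remember $SELF now, and re-establish it each time the closure runs.
   sub toy_psub(&) {
      my ($cb) = @_;
      my $port = $SELF;
      return sub { local $SELF = $port; $cb->(@_) };
   }

   my $later;
   {
      local $SELF = "node#port.1";          # as if inside an rcv callback
      $later = toy_psub { "running as $SELF" };
   }

   printf "%s\n", defined $SELF ? $SELF : "undef";   # undef
   printf "%s\n", $later->();                         # running as node#port.1
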
=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=item $guard = mon $port, $cb->(@reason)

=item $guard = mon $port, $otherport

=item $guard = mon $port, $otherport, @msg

Monitor the given port and do something when the port is killed.

In the first form, the callback is simply called with any number
of C<@reason> elements (no C<@reason> means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the second form, the other port will be C<kil>'ed with C<@reason>, iff
a C<@reason> was specified, i.e. on "normal" kils nothing happens, while
under all other conditions, the other port is killed with the same reason.

In the last form, a message of the form C<@msg, @reason> will be sent to
C<$otherport> with C<snd>.

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port, $self;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $self => "restart";

=cut

sub mon {
   my ($noderef, $port) = split /#/, shift, 2;

   my $node = $NODE{$noderef} || add_node $noderef;

   my $cb = shift;

   unless (ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed:

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub {
         undef $timer if 0.9 < rand;
      };
   };

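C<mon_guard> relies on a closure that merely mentions C<@refs>: as long as
the monitor callback exists, the referenced objects stay alive, and once
it is discarded they are freed. The standalone sketch below demonstrates
that Perl behaviour with a made-up C<Watcher> class whose destructor
counts how often it runs; no AnyEvent::MP code is involved, and
C<undef $monitor> merely stands in for the monitor being dropped when the
port is killed.

   use strict;
   use warnings;

   package Watcher;                   # hypothetical class for the demo
   our $freed = 0;
   sub new     { bless {}, shift }
   sub DESTROY { $freed++ }

   package main;

   my $monitor;
   {
      my @refs = (Watcher->new);
      $monitor = sub { 0 && @refs };  # never uses @refs, but keeps them alive
   }

   printf "freed=%d\n", $Watcher::freed;   # freed=0
   undef $monitor;                         # e.g. the monitored port was killed
   printf "freed=%d\n", $Watcher::freed;   # freed=1
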
=cut

sub mon_guard {
   my ($port, @refs) = @_;

   mon $port, sub { 0 && @refs }
}

=item lnk $port1, $port2

Link two ports. This is simply a shorthand for:

   mon $port1, $port2;
   mon $port2, $port1;

It means that if either one is killed abnormally, the other one gets
killed as well.

=item kil $port[, @reason]

Kill the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally" (linked
ports will not be killed, or even notified).

Otherwise, linked ports get killed with the same reason (second form of
C<mon>, see above).

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.

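The difference between normal and abnormal kills for linked ports can be
modelled in plain Perl. This is a toy, single-process sketch of the
semantics described for C<kil>, C<lnk> and the second form of C<mon>;
C<toy_kil>, C<toy_mon>, C<toy_lnk> and the C<%ALIVE>/C<%MON> tables are
hypothetical and do not reflect how AnyEvent::MP is implemented.

   use strict;
   use warnings;

   my (%ALIVE, %MON);    # port => 1, port => [monitor callbacks]

   # Kill a port and run its monitors with the (possibly empty) reason.
   sub toy_kil {
      my ($port, @reason) = @_;
      delete $ALIVE{$port}
         or return;                         # already dead
      $_->(@reason) for @{ delete $MON{$port} || [] };
   }

   # Like the second form of mon: kill $other only on abnormal kills.
   sub toy_mon {
      my ($port, $other) = @_;
      push @{ $MON{$port} }, sub { toy_kil ($other, @_) if @_ };
   }

   sub toy_lnk { toy_mon $_[0], $_[1]; toy_mon $_[1], $_[0] }

   $ALIVE{$_} = 1 for qw(a b c d);
   toy_lnk "a", "b";
   toy_lnk "c", "d";

   toy_kil "a";                       # normal kill: b is left alone
   toy_kil "c", die => "oops";        # abnormal kill: d is killed as well

   printf "%s\n", join ",", sort keys %ALIVE;   # b
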
=back

=head1 NODE MESSAGES

Nodes understand the following messages sent to them. Many of them take
arguments called C<@reply>, which will simply be used to compose a reply
message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
the remaining arguments are simply the message data.

While other messages exist, they are not public and subject to change.

=over 4

=cut

=item lookup => $name, @reply

Replies with the port ID of the specified well-known port, or C<undef>.

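The C<@reply> convention boils down to appending the result to the reply
prefix. As a sketch in plain Perl, a hypothetical C<lookup_reply> helper
(not part of AnyEvent::MP) computes the message a node would send back
for a C<lookup> request, using a made-up C<%REG> table and made-up port
ids.

   use strict;
   use warnings;

   my %REG = (timer => "10.0.0.1:4040#abc.3");   # made-up registry

   # Reply message for "lookup => $name, @reply": the reply prefix
   # ($reply[0] = destination port, then type and extra data),
   # followed by the registered port id or undef.
   sub lookup_reply {
      my ($name, @reply) = @_;
      return (@reply, $REG{$name});
   }

   my @msg = lookup_reply timer => "10.0.0.2:4040#x.1", found => 42;
   printf "%s\n", join " ", @msg;
   # 10.0.0.2:4040#x.1 found 42 10.0.0.1:4040#abc.3
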
=item devnull => ...

Generic data sink/CPU heat conversion.

=item relay => $port, @msg

Simply forwards the message to the given port.

=item eval => $string[, @reply]

Evaluates the given string. If C<@reply> is given, then a message of the
form C<@reply, $@, @evalres> is sent.

Example: crash another node.

   snd $othernode, eval => "exit";

=item time => @reply

Replies with the current node time to C<@reply>.

Example: tell the current node to send the current time to C<$myport> in a
C<timereply> message.

   snd $NODE, time => $myport, timereply => 1, 2;
   # => snd $myport, timereply => 1, 2, <time>

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed erlang (erlang node
== aemp node, erlang process == aemp port), so many of the documents and
programming techniques employed by erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf      # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf  # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node references contain the recipe on how to contact them.

Erlang relies on special naming and DNS to work everywhere in the
same way. AEMP relies on each node knowing its own address(es), with
convenience functionality.

This means that AEMP requires a less tightly controlled environment at the
cost of longer node references and a slightly higher management overhead.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages, and therefore
needs a queue. AEMP is event based, so queuing messages would serve no
useful purpose.

(But see L<Coro::MP> for a more erlang-like process model on top of AEMP).

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in erlang is synchronous and blocks the process. AEMP
sends are immediate; connection establishment is handled in the
background.

=item * Erlang can silently lose messages, AEMP cannot.

Erlang makes few guarantees on message delivery - messages can get lost
without any of the processes realising it (i.e. you send messages a, b,
and c, and the other side only receives messages a and c).

AEMP guarantees correct ordering, and that there are no holes in the
message sequence.

=item * In erlang, processes can be declared dead and later be found to be
alive.

In erlang it can happen that a monitored process is declared dead and
linked processes get killed, but later it turns out that the process is
still alive - and can receive messages.

In AEMP, when port monitoring detects a port as dead, then that port will
eventually be killed - it cannot happen that a node detects a port as dead
and then later sends messages to it, finding it is still alive.

=item * Erlang can send messages to the wrong port, AEMP does not.

In erlang it is quite possible that a node that restarts reuses a process
ID known to other nodes for a completely different process, causing
messages destined for that process to end up in an unrelated process.

AEMP never reuses port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol - SSL/TLS - to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the erlang protocol, supports both
language-independent text-only protocols (good for debugging) and binary,
language-specific serialisers (e.g. Storable).

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=back

=head1 SEE ALSO

L<AnyEvent>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1