/cvs/AnyEvent-MP/MP.pm
Revision: 1.122
Committed: Wed Feb 29 18:44:59 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.121: +6 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.89 AnyEvent::MP - erlang-style multi-processing/message-passing framework
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.75 $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11 root 1.2
12 root 1.38 $SELF # receiving/own port id in rcv callbacks
13    
14 root 1.48 # initialise the node so it can send/receive messages
15 root 1.72 configure;
16 root 1.48
17 root 1.75 # ports are message destinations
18 root 1.38
19     # sending messages
20 root 1.2 snd $port, type => data...;
21 root 1.38 snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23 root 1.2
24 root 1.50 # creating/using ports, the simple way
25 root 1.73 my $simple_port = port { my @msg = @_ };
26 root 1.22
27 root 1.52 # creating/using ports, tagged message matching
28 root 1.38 my $port = port;
29 root 1.73 rcv $port, ping => sub { snd $_[0], "pong" };
30     rcv $port, pong => sub { warn "pong received\n" };
31 root 1.2
32 root 1.48 # create a port on another node
33     my $port = spawn $node, $initfunc, @initdata;
34    
35 root 1.116 # destroy a port again
36 root 1.101 kil $port; # "normal" kill
37     kil $port, my_error => "everything is broken"; # error kill
38    
39 root 1.35 # monitoring
40 root 1.90 mon $localport, $cb->(@msg) # callback is invoked on death
41     mon $localport, $otherport # kill otherport on abnormal death
42     mon $localport, $otherport, @msg # send message on death
43 root 1.35
44 root 1.101 # temporarily execute code in port context
45     peval $port, sub { die "kill the port!" };
46    
47     # execute callbacks in $SELF port context
48     my $timer = AE::timer 1, 0, psub {
49     die "kill the port, delayed";
50     };
51    
52 root 1.45 =head1 CURRENT STATUS
53    
54 root 1.71 bin/aemp - stable.
55     AnyEvent::MP - stable API, should work.
56 elmex 1.77 AnyEvent::MP::Intro - explains most concepts.
57 root 1.88 AnyEvent::MP::Kernel - mostly stable API.
58     AnyEvent::MP::Global - stable API.
59 root 1.45
60 root 1.1 =head1 DESCRIPTION
61    
62 root 1.2 This module (-family) implements a simple message passing framework.
63    
64     Despite its simplicity, you can securely message other processes running
65 root 1.67 on the same or other hosts, and you can supervise entities remotely.
66 root 1.2
67 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
68 root 1.67 manual page and the examples under F<eg/>.
69 root 1.23
70 root 1.2 =head1 CONCEPTS
71    
72     =over 4
73    
74     =item port
75    
76 root 1.79 Not to be confused with a TCP port, a "port" is something you can send
77     messages to (with the C<snd> function).
78 root 1.29
79 root 1.53 Ports allow you to register C<rcv> handlers that can match all or just
80 root 1.64 some messages. Messages sent to ports will not be queued, regardless of
81     whether anything was listening for them or not.
82 root 1.2
83 root 1.119 Ports are represented by (printable) strings called "port IDs".
84    
85 root 1.67 =item port ID - C<nodeid#portname>
86 root 1.2
87 root 1.67 A port ID is the concatenation of a node ID, a hash-mark (C<#>) as
88     separator, and a port name (a printable string of unspecified format).
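
As a rough illustration of the C<nodeid#portname> format, a port ID can be
taken apart with a two-field split on the first hash-mark, which is the
same split the module's own C<rcv> and C<mon> code performs. The helper
name C<split_port_id> and the sample ID are made up for this sketch.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::MP): split a port ID into
# its node ID and port name at the first "#".
sub split_port_id {
   my ($port_id) = @_;
   my ($nodeid, $portname) = split /#/, $port_id, 2;
   return ($nodeid, $portname);
}

my ($nodeid, $portname) = split_port_id("host1:4040#abc.1");
print "node=$nodeid port=$portname\n";
```

The limit of 2 matters: only the first C<#> separates the node ID, so any
later hash-marks stay part of the port name.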
89 root 1.2
90     =item node
91    
92 root 1.53 A node is a single process containing at least one port - the node port,
93 root 1.67 which enables nodes to manage each other remotely, and to create new
94 root 1.53 ports.
95 root 1.2
96 root 1.67 Nodes are either public (have one or more listening ports) or private
97     (no listening ports). Private nodes cannot talk to other private nodes
98 root 1.119 currently, but all nodes can talk to public nodes.
99    
100     Nodes are represented by (printable) strings called "node IDs".
101 root 1.2
102 root 1.117 =item node ID - C<[A-Za-z0-9_\-.:]*>
103 root 1.2
104 root 1.64 A node ID is a string that uniquely identifies the node within a
105     network. Depending on the configuration used, node IDs can look like a
106     hostname, a hostname and a port, or a random string. AnyEvent::MP itself
107 root 1.119 doesn't interpret node IDs in any way except to uniquely identify a node.
108 root 1.64
109     =item binds - C<ip:port>
110    
111     Nodes can only talk to each other by creating some kind of connection to
112     each other. To do this, nodes should listen on one or more local transport
113 root 1.119 endpoints - binds.
114    
115     Currently, only standard C<ip:port> specifications can be used, which
116     specify TCP ports to listen on. So a bind is basically just a TCP socket
117     in listening mode that accepts connections from other nodes.
118 root 1.64
119 root 1.83 =item seed nodes
120 root 1.64
121 root 1.119 When a node starts, it knows nothing about the network it is in - it
122     needs to connect to at least one other node that is already in the
123     network. These other nodes are called "seed nodes".
124    
125     Seed nodes themselves are not special - they are seed nodes only because
126     some other node I<uses> them as such, but any node can be used as seed
127     node for other nodes, and each node can use a different set of seed nodes.
128 root 1.83
129     In addition to discovering the network, seed nodes are also used to
130 root 1.119 maintain the network - all nodes using the same seed node are part of
131     the same network. If a network is split into multiple subnets because e.g.
132     the network link between the parts goes down, then using the same seed
133     nodes for all nodes ensures that eventually the subnets get merged again.
134 root 1.83
135     Seed nodes are expected to be long-running, and at least one seed node
136 root 1.85 should always be available. They should also be relatively responsive - a
137     seed node that blocks for long periods will slow down everybody else.
138 root 1.83
139 root 1.119 For small networks, it's best if every node uses the same set of seed
140     nodes. For large networks, it can be useful to specify "regional" seed
141     nodes for most nodes in an area, and use all seed nodes as seed nodes for
142     each other. What's important is that all seed node connections form a
143     complete graph, so that the network cannot split into separate subnets
144     forever.
145    
146     Seed nodes are represented by seed IDs.
147    
148     =item seed IDs - C<host:port>
149 root 1.83
150 root 1.119 Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
151 elmex 1.96 TCP port) of nodes that should be used as seed nodes.
152 root 1.29
153 root 1.119 =item global nodes
154    
155     An AEMP network needs a discovery service - nodes need to know how to
156     connect to other nodes they only know by name. In addition, AEMP offers a
157     distributed "group database", which maps group names to a list of strings
158     - for example, to register worker ports.
159    
160     A network needs at least one global node to work, and allows every node to
161     be a global node.
162    
163     Any node that loads the L<AnyEvent::MP::Global> module becomes a global
164     node and tries to keep connections to all other nodes. So while it can
165     make sense to make every node "global" in small networks, it usually makes
166     sense to only make seed nodes into global nodes in large networks (nodes
167     keep connections to seed nodes and global nodes, so making them the same
168     reduces overhead).
169 root 1.67
170 root 1.2 =back
171    
172 root 1.3 =head1 VARIABLES/FUNCTIONS
173 root 1.2
174     =over 4
175    
176 root 1.1 =cut
177    
178     package AnyEvent::MP;
179    
180 root 1.121 use AnyEvent::MP::Config ();
181 root 1.44 use AnyEvent::MP::Kernel;
182 root 1.121 use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);
183 root 1.2
184 root 1.1 use common::sense;
185    
186 root 1.2 use Carp ();
187    
188 root 1.1 use AE ();
189    
190 root 1.2 use base "Exporter";
191    
192 root 1.121 our $VERSION = $AnyEvent::MP::Config::VERSION;
193 root 1.43
194 root 1.8 our @EXPORT = qw(
195 root 1.59 NODE $NODE *SELF node_of after
196 root 1.72 configure
197 root 1.101 snd rcv mon mon_guard kil psub peval spawn cal
198 root 1.22 port
199 root 1.8 );
200 root 1.2
201 root 1.22 our $SELF;
202    
203     sub _self_die() {
204     my $msg = $@;
205     $msg =~ s/\n+$// unless ref $msg;
206     kil $SELF, die => $msg;
207     }
208    
209     =item $thisnode = NODE / $NODE
210    
211 root 1.67 The C<NODE> function returns, and the C<$NODE> variable contains, the node
212 root 1.64 ID of the node running in the current process. This value is initialised by
213 root 1.72 a call to C<configure>.
214 root 1.22
215 root 1.63 =item $nodeid = node_of $port
216 root 1.22
217 root 1.67 Extracts and returns the node ID from a port ID or a node ID.
218 root 1.34
219 root 1.78 =item configure $profile, key => value...
220    
221 root 1.72 =item configure key => value...
222 root 1.34
223 root 1.64 Before a node can talk to other nodes on the network (i.e. enter
224 root 1.72 "distributed mode") it has to configure itself - the minimum a node needs
225 root 1.64 to know is its own name, and optionally it should know the addresses of
226     some other nodes in the network to discover other nodes.
227 root 1.34
228 root 1.121 This function configures a node - it must be called exactly once (or
229     never) before calling other AnyEvent::MP functions.
230    
231 root 1.108 The key/value pairs are basically the same ones as documented for the
232 root 1.121 F<aemp> command line utility (sans the set/del prefix), with two additions:
233    
234     =over 4
235    
236     =item norc => $boolean (default false)
237    
238     If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
239     be consulted - all configuration options must be specified in the
240     C<configure> call.
241 root 1.108
242 root 1.121 =item force => $boolean (default false)
243    
244     If true, then the values specified in the C<configure> call will take
245     precedence over any values configured via the rc file. The default is for
246     the rc file to override any options specified in the program.
247    
248     =back
249 root 1.34
250 root 1.72 =over 4
251    
252     =item step 1, gathering configuration from profiles
253    
254     The function first looks up a profile in the aemp configuration (see the
255     L<aemp> commandline utility). The profile name can be specified via the
256 root 1.78 named C<profile> parameter or can simply be the first parameter. If it is
257     missing, then the nodename (F<uname -n>) will be used as profile name.
258 root 1.34
259 root 1.72 The profile data is then gathered as follows:
260 root 1.69
261 elmex 1.77 First, all remaining key => value pairs (all of which are conveniently
262 root 1.72 undocumented at the moment) will be interpreted as configuration
263     data. Then they will be overwritten by any values specified in the global
264     default configuration (see the F<aemp> utility), then the chain of
265     profiles chosen by the profile name (and any C<parent> attributes).
266    
267     That means that the values specified in the profile have highest priority
268     and the values specified directly via C<configure> have lowest priority,
269     and can only be used to specify defaults.
270 root 1.49
271 root 1.64 If the profile specifies a node ID, then this will become the node ID of
272 root 1.122 this process. If not, then the profile name will be used as node ID, with
273     a slash (C</>) attached.
274    
275     If the node ID (or profile name) ends with a slash (C</>), then a random
276     string is appended to make it unique.
277 root 1.64
278 root 1.72 =item step 2, bind listener sockets
279    
280 root 1.64 The next step is to look up the binds in the profile, followed by binding
281     aemp protocol listeners on all binds specified (it is possible and valid
282     to have no binds, meaning that the node cannot be contacted from the
283     outside. This means the node cannot talk to other nodes that also have no
284     binds, but it can still talk to all "normal" nodes).
285    
286 root 1.70 If the profile does not specify a binds list, then a default of C<*> is
287 root 1.72 used, meaning the node will bind on a dynamically-assigned port on every
288     local IP address it finds.
289    
290     =item step 3, connect to seed nodes
291 root 1.64
292 root 1.119 As the last step, the seed ID list from the profile is passed to the
293 root 1.64 L<AnyEvent::MP::Global> module, which will then use it to keep
294 root 1.72 connectivity with at least one node at any point in time.
295 root 1.64
296 root 1.72 =back
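
The priority order described in step 1 can be sketched with plain hash
merging. This is only an illustration under assumptions: the data layout,
the helper name C<effective_config> and the sample profiles are
hypothetical, and the order within the profile chain (parents first, so
that children win) is an assumption the text above does not spell out.

```perl
use strict;
use warnings;

# Hypothetical data layout, for illustration only; the real aemp
# configuration storage is not described in this document.
my %global_default = (binds => "*");
my %profiles = (
   base => { seeds => "seed1:4040" },
   seed => { parent => "base", binds => "*:4040" },
);

# Merge in increasing priority: configure arguments, then the global
# defaults, then the profile chain (assumed: parents before children).
sub effective_config {
   my ($profile, %args) = @_;

   # collect the chain from the named profile up through its parents
   my @chain;
   for (my $name = $profile; defined $name && $profiles{$name}; $name = $profiles{$name}{parent}) {
      unshift @chain, $profiles{$name};
   }

   my %cfg = (%args, %global_default);
   %cfg = (%cfg, %$_) for @chain;
   delete $cfg{parent};
   return \%cfg;
}

my $cfg = effective_config("seed", binds => "127.0.0.1:5000", nodeid => "demo");
print "$_ = $cfg->{$_}\n" for sort keys %$cfg;
```

In this sketch the C<binds> value passed directly is only a default: the
profile's own C<binds> replaces it, while C<nodeid>, which no profile
sets, survives.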
297    
298 root 1.87 Example: become a distributed node using the local node name as profile.
299 root 1.72 This should be the most common form of invocation for "daemon"-type nodes.
300 root 1.34
301 root 1.72 configure
302 root 1.34
303 root 1.64 Example: become an anonymous node. This form is often used for commandline
304     clients.
305 root 1.34
306 root 1.72 configure nodeid => "anon/";
307    
308 root 1.120 Example: configure a node using a profile called seed, which is suitable
309 root 1.72 for a seed node as it binds on all local addresses on a fixed port (4040,
310     customary for aemp).
311    
312     # use the aemp commandline utility
313 root 1.122 # aemp profile seed binds '*:4040'
314 root 1.72
315     # then use it
316     configure profile => "seed";
317 root 1.34
318 root 1.72 # or simply use aemp from the shell again:
319     # aemp run profile seed
320 root 1.34
321 root 1.72 # or provide a nicer-to-remember nodeid
322     # aemp run profile seed nodeid "$(hostname)"
323 root 1.34
324 root 1.22 =item $SELF
325    
326     Contains the current port id while executing C<rcv> callbacks or C<psub>
327     blocks.
328 root 1.3
329 root 1.67 =item *SELF, SELF, %SELF, @SELF...
330 root 1.22
331     Due to some quirks in how perl exports variables, it is impossible to
332 root 1.67 just export C<$SELF>, so all the symbols named C<SELF> are exported by this
333 root 1.22 module, but only C<$SELF> is currently used.
334 root 1.3
335 root 1.33 =item snd $port, type => @data
336 root 1.3
337 root 1.33 =item snd $port, @msg
338 root 1.3
339 root 1.67 Send the given message to the given port, which can identify either a
340     local or a remote port, and must be a port ID.
341 root 1.8
342 root 1.67 While the message can be almost anything, it is highly recommended to
343     use a string as first element (a port ID, or some word that indicates a
344     request type etc.) and to consist of only simple perl values (scalars,
345     arrays, hashes) - if you think you need to pass an object, think again.
346    
347     The message data logically becomes read-only after a call to this
348     function: modifying any argument (or values referenced by them) is
349     forbidden, as there can be considerable time between the call to C<snd>
350     and the time the message is actually being serialised - in fact, it might
351     never be copied as within the same process it is simply handed to the
352     receiving port.
353 root 1.3
354     The type of data you can transfer depends on the transport protocol: when
355     JSON is used, then only strings, numbers and arrays and hashes consisting
356     of those are allowed (no objects). When Storable is used, then anything
357     that Storable can serialise and deserialise is allowed, and for the local
358 root 1.67 node, anything can be passed. Best rely only on the common denominator of
359     these.
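
One way to stay within the common denominator is to check a message
before sending it. The following is a minimal sketch, assuming that
"simple" means plain scalars plus unblessed arrays and hashes of those;
the function name C<is_plain_data> is hypothetical and not part of
AnyEvent::MP, and cyclic structures are not handled.

```perl
use strict;
use warnings;

# Hypothetical check: true if the value consists only of plain scalars
# and (nested) unblessed arrays and hashes.
sub is_plain_data {
   my ($value) = @_;
   my $type = ref $value;
   return 1 unless $type;                                  # plain scalar
   return !grep { !is_plain_data($_) } @$value        if $type eq "ARRAY";
   return !grep { !is_plain_data($_) } values %$value if $type eq "HASH";
   return 0;                                               # objects, code refs, ...
}

my @msg = (ping => 1, { list => [2, 3] });
print is_plain_data(\@msg) ? "ok to send\n" : "contains non-portable data\n";
```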
360 root 1.3
361 root 1.22 =item $local_port = port
362 root 1.2
363 root 1.50 Creates a new local port object and returns its port ID. Initially it has
364     no callbacks set and will throw an error when it receives messages.
365 root 1.10
366 root 1.50 =item $local_port = port { my @msg = @_ }
367 root 1.15
368 root 1.50 Creates a new local port, and returns its ID. Semantically the same as
369     creating a port and calling C<rcv $port, $callback> on it.
370 root 1.15
371 root 1.50 The block will be called for every message received on the port, with the
372     global variable C<$SELF> set to the port ID. Runtime errors will cause the
373     port to be C<kil>ed. The message will be passed as-is, no extra argument
374     (i.e. no port ID) will be passed to the callback.
375 root 1.15
376 root 1.50 If you want to stop/destroy the port, simply C<kil> it:
377 root 1.15
378 root 1.50 my $port = port {
379     my @msg = @_;
380     ...
381     kil $SELF;
382 root 1.15 };
383 root 1.10
384     =cut
385    
386 root 1.33 sub rcv($@);
387    
388 root 1.50 sub _kilme {
389     die "received message on port without callback";
390     }
391    
392 root 1.22 sub port(;&) {
393 root 1.121 my $id = "$UNIQ." . ++$ID;
394 root 1.22 my $port = "$NODE#$id";
395    
396 root 1.50 rcv $port, shift || \&_kilme;
397 root 1.10
398 root 1.22 $port
399 root 1.10 }
400    
401 root 1.50 =item rcv $local_port, $callback->(@msg)
402 root 1.31
403 root 1.50 Replaces the default callback on the specified port. There is no way to
404     remove the default callback: use C<sub { }> to disable it, or better
405     C<kil> the port when it is no longer needed.
406 root 1.3
407 root 1.33 The global C<$SELF> (exported by this module) contains C<$port> while
408 root 1.50 executing the callback. Runtime errors during callback execution will
409     result in the port being C<kil>ed.
410 root 1.22
411 root 1.50 The default callback receives all messages not matched by a more specific
412     C<tag> match.
413 root 1.22
414 root 1.50 =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
415 root 1.3
416 root 1.54 Register (or replace) callbacks to be called on messages starting with the
417     given tag on the given port (and return the port), or unregister it (when
418     C<$callback> is C<undef> or missing). There can only be one callback
419     registered for each tag.
420 root 1.3
421 root 1.50 The original message will be passed to the callback, after the first
422     element (the tag) has been removed. The callback will use the same
423     environment as the default callback (see above).
424 root 1.3
425 root 1.36 Example: create a port and bind receivers on it in one go.
426    
427     my $port = rcv port,
428 root 1.50 msg1 => sub { ... },
429     msg2 => sub { ... },
430 root 1.36 ;
431    
432     Example: create a port, bind receivers and send it in a message elsewhere
433     in one go:
434    
435     snd $otherport, reply =>
436     rcv port,
437 root 1.50 msg1 => sub { ... },
438 root 1.36 ...
439     ;
440    
441 root 1.54 Example: temporarily register a rcv callback for a tag matching some port
442 root 1.102 (e.g. for an rpc reply) and unregister it after a message was received.
443 root 1.54
444     rcv $port, $otherport => sub {
445     my @reply = @_;
446    
447     rcv $SELF, $otherport;
448     };
449    
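The matching rules above (a tagged callback sees the message without its
tag, everything else goes to the default callback unchanged) can be
modelled without any networking. This is a simplified stand-in, not the
module's implementation; C<make_dispatcher> is a hypothetical name.

```perl
use strict;
use warnings;

# Simplified stand-in for a tag-matching port: a default callback plus a
# tag => callback table. Names are hypothetical, not AnyEvent::MP internals.
sub make_dispatcher {
   my ($default, %by_tag) = @_;

   return sub {
      if (@_ && defined $_[0] && !ref $_[0] && (my $cb = $by_tag{$_[0]})) {
         shift;                 # the tag is removed before the callback runs
         return $cb->(@_);
      }
      return $default->(@_);    # unmatched messages are passed as-is
   };
}

my $dispatch = make_dispatcher(
   sub { "default(@_)" },
   ping => sub { "ping(@_)" },
);

print $dispatch->(ping => 1, 2), "\n";
print $dispatch->(other => 3), "\n";
```
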
450 root 1.3 =cut
451    
452     sub rcv($@) {
453 root 1.33 my $port = shift;
454 root 1.75 my ($nodeid, $portid) = split /#/, $port, 2;
455 root 1.3
456 root 1.75 $NODE{$nodeid} == $NODE{""}
457 root 1.33 or Carp::croak "$port: rcv can only be called on local ports, caught";
458 root 1.22
459 root 1.50 while (@_) {
460     if (ref $_[0]) {
461     if (my $self = $PORT_DATA{$portid}) {
462     "AnyEvent::MP::Port" eq ref $self
463     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
464 root 1.33
465 root 1.103 $self->[0] = shift;
466 root 1.50 } else {
467     my $cb = shift;
468     $PORT{$portid} = sub {
469     local $SELF = $port;
470     eval { &$cb }; _self_die if $@;
471     };
472     }
473     } elsif (defined $_[0]) {
474     my $self = $PORT_DATA{$portid} ||= do {
475 root 1.103 my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
476 root 1.50
477     $PORT{$portid} = sub {
478     local $SELF = $port;
479    
480     if (my $cb = $self->[1]{$_[0]}) {
481     shift;
482     eval { &$cb }; _self_die if $@;
483     } else {
484     &{ $self->[0] };
485 root 1.33 }
486     };
487 root 1.50
488     $self
489 root 1.33 };
490    
491 root 1.50 "AnyEvent::MP::Port" eq ref $self
492     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
493 root 1.22
494 root 1.50 my ($tag, $cb) = splice @_, 0, 2;
495 root 1.33
496 root 1.50 if (defined $cb) {
497     $self->[1]{$tag} = $cb;
498 root 1.33 } else {
499 root 1.50 delete $self->[1]{$tag};
500 root 1.33 }
501 root 1.22 }
502 root 1.3 }
503 root 1.31
504 root 1.33 $port
505 root 1.2 }
506    
507 root 1.101 =item peval $port, $coderef[, @args]
508    
509     Evaluates the given C<$coderef> within the context of C<$port>, that is,
510     when the code throws an exception the C<$port> will be killed.
511    
512     Any remaining args will be passed to the callback. Any return values will
513     be returned to the caller.
514    
515     This is useful when you temporarily want to execute code in the context of
516     a port.
517    
518     Example: create a port and run some initialisation code in its context.
519    
520     my $port = port { ... };
521    
522     peval $port, sub {
523     init
524     or die "unable to init";
525     };
526    
527     =cut
528    
529     sub peval($$) {
530     local $SELF = shift;
531     my $cb = shift;
532    
533     if (wantarray) {
534     my @res = eval { &$cb };
535     _self_die if $@;
536     @res
537     } else {
538     my $res = eval { &$cb };
539     _self_die if $@;
540     $res
541     }
542     }
543    
544 root 1.22 =item $closure = psub { BLOCK }
545 root 1.2
546 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
547     closure is executed, sets up the environment in the same way as in C<rcv>
548     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
549    
550 root 1.101 The effect is basically as if it returned C<< sub { peval $SELF, sub {
551 root 1.114 BLOCK }, @_ } >>.
552 root 1.101
553 root 1.22 This is useful when you register callbacks from C<rcv> callbacks:
554    
555     rcv delayed_reply => sub {
556     my ($delay, @reply) = @_;
557     my $timer = AE::timer $delay, 0, psub {
558     snd @reply, $SELF;
559     };
560     };
561 root 1.3
562 root 1.8 =cut
563 root 1.3
564 root 1.22 sub psub(&) {
565     my $cb = shift;
566 root 1.3
567 root 1.22 my $port = $SELF
568     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
569 root 1.1
570 root 1.22 sub {
571     local $SELF = $port;
572 root 1.2
573 root 1.22 if (wantarray) {
574     my @res = eval { &$cb };
575     _self_die if $@;
576     @res
577     } else {
578     my $res = eval { &$cb };
579     _self_die if $@;
580     $res
581     }
582     }
583 root 1.2 }
584    
585 root 1.67 =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
586 root 1.32
587 root 1.67 =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
588 root 1.36
589 root 1.67 =item $guard = mon $port # kill $SELF when $port dies
590 root 1.32
591 root 1.67 =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
592 root 1.32
593 root 1.42 Monitor the given port and do something when the port is killed or
594     messages to it were lost, and optionally return a guard that can be used
595     to stop monitoring again.
596    
597 root 1.36 In the first form (callback), the callback is simply called with any
598     number of C<@reason> elements (no @reason means that the port was deleted
599 root 1.32 "normally"). Note also that I<< the callback B<must> never die >>, so use
600     C<eval> if unsure.
601    
602 root 1.43 In the second form (another port given), the other port (C<$rcvport>)
603 elmex 1.77 will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
604 root 1.36 "normal" kils nothing happens, while under all other conditions, the other
605     port is killed with the same reason.
606 root 1.32
607 root 1.36 The third form (kill self) is the same as the second form, except that
608     C<$rcvport> defaults to C<$SELF>.
609    
610     In the last form (message), a message of the form C<@msg, @reason> will be
611     C<snd>.
612 root 1.32
613 root 1.79 Monitoring-actions are one-shot: once messages are lost (and a monitoring
614     alert was raised), they are removed and will not trigger again.
615    
616 root 1.37 As a rule of thumb, monitoring requests should always monitor a port from
617     a local port (or callback). The reason is that kill messages might get
618     lost, just like any other message. Another less obvious reason is that
619 elmex 1.77 even monitoring requests can get lost (for example, when the connection
620 root 1.37 to the other node goes down permanently). When monitoring a port locally
621     these problems do not exist.
622    
623 root 1.79 C<mon> effectively guarantees that, in the absence of hardware failures,
624     after starting the monitor, either all messages sent to the port will
625     arrive, or the monitoring action will be invoked after possible message
626     loss has been detected. No messages will be lost "in between" (after
627     the first lost message no further messages will be received by the
628     port). After the monitoring action was invoked, further messages might get
629     delivered again.
630    
631     Inter-host-connection timeouts and monitoring depend on the transport
632     used. The only transport currently implemented is TCP, and AnyEvent::MP
633     relies on TCP to detect node-downs (this can take 10-15 minutes on a
634 elmex 1.96 non-idle connection, and usually around two hours for idle connections).
635 root 1.79
636     This means that monitoring is good for program errors and cleaning up
637     stuff eventually, but it is no replacement for a timeout when you need
638     to ensure some maximum latency.
639    
640 root 1.32 Example: call a given callback when C<$port> is killed.
641    
642     mon $port, sub { warn "port died because of <@_>\n" };
643    
644     Example: kill ourselves when C<$port> is killed abnormally.
645    
646 root 1.36 mon $port;
647 root 1.32
648 root 1.36 Example: send us a restart message when another C<$port> is killed.
649 root 1.32
650     mon $port, $SELF => "restart";
651    
652     =cut
653    
654     sub mon {
655 root 1.75 my ($nodeid, $port) = split /#/, shift, 2;
656 root 1.32
657 root 1.75 my $node = $NODE{$nodeid} || add_node $nodeid;
658 root 1.32
659 root 1.41 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
660 root 1.32
661     unless (ref $cb) {
662     if (@_) {
663     # send a kill info message
664 root 1.41 my (@msg) = ($cb, @_);
665 root 1.32 $cb = sub { snd @msg, @_ };
666     } else {
667     # simply kill other port
668     my $port = $cb;
669     $cb = sub { kil $port, @_ if @_ };
670     }
671     }
672    
673     $node->monitor ($port, $cb);
674    
675     defined wantarray
676 root 1.94 and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) })
677 root 1.32 }
678    
679     =item $guard = mon_guard $port, $ref, $ref...
680    
681     Monitors the given C<$port> and keeps the passed references. When the port
682     is killed, the references will be freed.
683    
684     Optionally returns a guard that will stop the monitoring.
685    
686     This function is useful when you create e.g. timers or other watchers and
687 root 1.67 want to free them when the port gets killed (note the use of C<psub>):
688 root 1.32
689     rcv $port, start => sub {
690 root 1.67 my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
691 root 1.32 undef $timer if 0.9 < rand;
692     };
693     };
694    
695     =cut
696    
697     sub mon_guard {
698     my ($port, @refs) = @_;
699    
700 root 1.36 #TODO: mon-less form?
701    
702 root 1.32 mon $port, sub { 0 && @refs }
703     }
704    
705 root 1.33 =item kil $port[, @reason]
706 root 1.32
707     Kill the specified port with the given C<@reason>.
708    
709 root 1.107 If no C<@reason> is specified, then the port is killed "normally" -
710     monitor callbacks will be invoked, but the kil will not cause linked ports
711     (C<mon $mport, $lport> form) to get killed.
712 root 1.32
713 root 1.107 If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
714     form) get killed with the same reason.
715 root 1.32
716     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
717     will be reported as reason C<< die => $@ >>.
718    
719     Transport/communication errors are reported as C<< transport_error =>
720     $message >>.
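
The difference between a "normal" kill and a kill with a reason, as seen
by a linked port, boils down to whether C<@reason> is empty. The sketch
below models just that rule; C<make_link_cb> and the C<$kill> code
reference are hypothetical stand-ins for the real C<mon>/C<kil> pair.

```perl
use strict;
use warnings;

# Hypothetical stand-in for the callback behind "mon $mport, $lport":
# $kill is any code reference that plays the role of kil.
sub make_link_cb {
   my ($linked_port, $kill) = @_;
   return sub {
      # an empty @reason means a "normal" kill, which is not propagated
      $kill->($linked_port, @_) if @_;
   };
}

my @killed;
my $on_death = make_link_cb("node1#worker", sub { push @killed, [@_] });

$on_death->();                          # normal kill: nothing happens
$on_death->(my_error => "it broke");    # abnormal kill: forwarded
print scalar(@killed), " port(s) killed\n";
```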
721    
722 root 1.38 =cut
723    
724     =item $port = spawn $node, $initfunc[, @initdata]
725    
726     Creates a port on the node C<$node> (which can also be a port ID, in which
727     case it's the node where that port resides).
728    
729 root 1.67 The port ID of the newly created port is returned immediately, and it is
730     possible to immediately start sending messages or to monitor the port.
731 root 1.38
732 root 1.67 After the port has been created, the init function is called on the remote
733     node, in the same context as a C<rcv> callback. This function must be a
734     fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
735     specify a function in the main program, use C<::name>.
736 root 1.38
737     If the function doesn't exist, then the node tries to C<require>
738     the package, then the package above the package and so on (e.g.
739     C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
740     exists or it runs out of package names.
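
The order in which packages are tried can be derived from the function
name alone. The helper below is a hypothetical illustration of that
search order only; it does not load anything and is not the module's
own loader.

```perl
use strict;
use warnings;

# Hypothetical helper: list the packages that would be tried, innermost
# first, for a fully-qualified function name.
sub candidate_packages {
   my ($func) = @_;
   my @parts = grep length, split /::/, $func;
   pop @parts;                       # drop the function name itself
   my @candidates;
   while (@parts) {
      push @candidates, join "::", @parts;
      pop @parts;
   }
   return @candidates;
}

print join(", ", candidate_packages("MyApp::Chat::Server::init")), "\n";
```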
741    
742     The init function is then called with the newly-created port as context
743 root 1.82 object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
744     call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
745     the port might not get created.
746 root 1.38
747 root 1.67 A common idiom is to pass a local port, immediately monitor the spawned
748     port, and in the remote init function, immediately monitor the passed
749     local port. This two-way monitoring ensures that both ports get cleaned up
750     when there is a problem.
751 root 1.38
752 root 1.80 C<spawn> guarantees that the C<$initfunc> has no visible effects on the
753     caller before C<spawn> returns (by delaying invocation when spawn is
754     called for the local node).
755    
756 root 1.38 Example: spawn a chat server port on C<$othernode>.
757    
758     # this node, executed from within a port context:
759     my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
760     mon $server;
761    
762     # init function on C<$othernode>
763     sub connect {
764     my ($srcport) = @_;
765    
766     mon $srcport;
767    
768     rcv $SELF, sub {
769     ...
770     };
771     }
772    
773     =cut
774    
775     sub _spawn {
776     my $port = shift;
777     my $init = shift;
778    
779 root 1.82 # rcv will create the actual port
780 root 1.38 local $SELF = "$NODE#$port";
781     eval {
782     &{ load_func $init }
783     };
784     _self_die if $@;
785     }
786    
787     sub spawn(@) {
788 root 1.75    my ($nodeid, undef) = split /#/, shift, 2;
789 root 1.38
790 root 1.121    my $id = "$RUNIQ." . ++$ID;
791 root 1.38
792 root 1.39    $_[0] =~ /::/
793           or Carp::croak "spawn init function must be a fully-qualified name, caught";
794    
795 root 1.75    snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
796 root 1.38
797 root 1.75    "$nodeid#$id"
798 root 1.38 }
799    
800 root 1.121
801 root 1.59 =item after $timeout, @msg
802    
803     =item after $timeout, $callback
804    
805     Either sends the given message, or calls the given callback, after the
806     specified number of seconds.
807    
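A minimal sketch of how the two call forms are told apart, assuming only what the signatures above say: a code reference as first action element is treated as a callback, anything else as a message for C<snd>. C<run_action> and C<fake_snd> are hypothetical stand-ins, so the sketch runs without an event loop or a configured node.

```perl
use strict;
use warnings;

my @sent;   # records what would have been handed to snd

# hypothetical stand-in for snd, it only remembers its arguments
sub fake_snd { push @sent, [@_] }

# hypothetical helper: perform the action once the timer would have fired
sub run_action {
   my @action = @_;

   ref $action[0]
      ? $action[0]->()
      : fake_snd (@action);
}

my $called = 0;

run_action (sub { $called++ });             # callback form
run_action ("nodeA#port1", timeout => 5);   # message form, port IDs are plain strings

print "callback invoked $called time(s), ", scalar @sent, " message(s) sent\n";
```

Because port IDs are plain strings and never references, the C<ref> check is enough to distinguish the two forms.
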
808 root 1.67 This is simply a utility function that comes in handy at times - the
809     AnyEvent::MP author is not convinced of the wisdom of having it, though,
810     so it may go away in the future.
811 root 1.59
812     =cut
813    
814     sub after($@) {
815        my ($timeout, @action) = @_;
816    
817        my $t; $t = AE::timer $timeout, 0, sub {
818           undef $t;
819           ref $action[0]
820              ? $action[0]()
821              : snd @action;
822        };
823     }
824    
825 root 1.87 =item cal $port, @msg, $callback[, $timeout]
826    
827     A simple form of RPC - sends a message to the given C<$port> with the
828     given contents (C<@msg>), but adds a reply port to the message.
829    
830     The reply port is created temporarily just for the purpose of receiving
831     the reply, and will be C<kil>ed when no longer needed.
832    
833     A reply message sent to the port is passed to the C<$callback> as-is.
834    
835     If an optional time-out (in seconds) is given and it is not C<undef>,
836     then the callback will be called without any arguments after the time-out
837     has elapsed, and the port is C<kil>ed.
838    
839 root 1.98 If no time-out is given (or it is C<undef>), then the local port will
840     monitor the remote port instead, so it eventually gets cleaned up.
841 root 1.87
842     Currently this function returns the temporary port, but this "feature"
843     might go away in future versions unless you can make a convincing case that
844     this is indeed useful for something.
845    
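The optional trailing time-out is distinguished from the callback purely by position and type. The following sketch shows that convention in isolation. C<split_cal_args> is a hypothetical helper, not part of the module, and it works on a copy of its arguments.

```perl
use strict;
use warnings;

# hypothetical helper: split "$port, @msg, $callback[, $timeout]" into parts
sub split_cal_args {
   my @args = @_;

   # a trailing non-reference (a number or undef) is taken as the time-out
   my $timeout = ref $args[-1] ? undef : pop @args;
   my $cb      = pop @args;

   ($timeout, $cb, @args)   # @args is now ($port, @msg)
}

my $cb = sub { };

my ($t1, undef, @m1) = split_cal_args ("nodeA#port1", "ping", $cb, 5);
my ($t2, undef, @m2) = split_cal_args ("nodeA#port1", "ping", $cb);

print "with time-out: $t1, message: @m1\n";
print "without time-out: ", (defined $t2 ? $t2 : "undef"), ", message: @m2\n";
```

A consequence of this convention is that the callback must always be a reference, while the time-out must never be one.
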
846     =cut
847    
848     sub cal(@) {
849        my $timeout = ref $_[-1] ? undef : pop;
850        my $cb = pop;
851    
852        my $port = port {
853           undef $timeout;
854           kil $SELF;
855           &$cb;
856        };
857    
858        if (defined $timeout) {
859           $timeout = AE::timer $timeout, 0, sub {
860              undef $timeout;
861              kil $port;
862              $cb->();
863           };
864        } else {
865           mon $_[0], sub {
866              kil $port;
867              $cb->();
868           };
869        }
870    
871        push @_, $port;
872        &snd;
873    
874        $port
875     }
876    
877 root 1.8 =back
878    
879 root 1.26 =head1 AnyEvent::MP vs. Distributed Erlang
880    
881 root 1.35 AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
882     == aemp node, Erlang process == aemp port), so many of the documents and
883     programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
884 root 1.27 sample:
885    
886 root 1.95 http://www.erlang.se/doc/programming_rules.shtml
887     http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
888     http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
889     http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
890 root 1.27
891     Despite the similarities, there are also some important differences:
892 root 1.26
893     =over 4
894    
895 root 1.65 =item * Node IDs are arbitrary strings in AEMP.
896 root 1.26
897 root 1.65 Erlang relies on special naming and DNS to work everywhere in the same
898     way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
899 root 1.99 configuration or DNS), and possibly the addresses of some seed nodes, but
900     will otherwise discover other nodes (and their IDs) itself.
901 root 1.27
902 root 1.54 =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
903 root 1.51 uses "local ports are like remote ports".
904    
905     The failure modes for local ports are quite different (runtime errors
906     only) than for remote ports - when a local port dies, you I<know> it dies,
907     when a connection to another node dies, you know nothing about the other
908     port.
909    
910     Erlang pretends remote ports are as reliable as local ports, even when
911     they are not.
912    
913     AEMP encourages a "treat remote ports differently" philosophy, with local
914     ports being the special case/exception, where transport errors cannot
915     occur.
916    
917 root 1.26 =item * Erlang uses processes and a mailbox, AEMP does not queue.
918    
919 root 1.119 Erlang uses processes that selectively receive messages out of order, and
920     therefore needs a queue. AEMP is event-based, so queuing messages would serve
921     no useful purpose. For the same reason the pattern-matching abilities
922     of AnyEvent::MP are more limited, as there is little need to be able to
923 elmex 1.77 filter messages without dequeuing them.
924 root 1.26
925 root 1.119 This is not a philosophical difference, but simply stems from AnyEvent::MP
926     being event-based, while Erlang is process-based.
927    
928     You can have a look at L<Coro::MP> for a more Erlang-like process model on
929     top of AEMP and Coro threads.
930 root 1.26
931     =item * Erlang sends are synchronous, AEMP sends are asynchronous.
932    
933 root 1.119 Sending messages in Erlang is synchronous and blocks the process until
934     a connection has been established and the message sent (and so does not
935     need a queue that can overflow). AEMP sends return immediately, and
936     connection establishment is handled in the background.
937 root 1.26
938 root 1.51 =item * Erlang suffers from silent message loss, AEMP does not.
939 root 1.26
940 root 1.99 Erlang implements few guarantees on message delivery - messages can get
941     lost without any of the processes realising it (i.e. you send messages a,
942     b, and c, and the other side only receives messages a and c).
943 root 1.26
944 root 1.117 AEMP guarantees (modulo hardware errors) correct ordering, and the
945     guarantee that after one message is lost, all following ones sent to the
946     same port are lost as well, until monitoring raises an error, so there are
947     no silent "holes" in the message sequence.
948 root 1.26
949 root 1.119 If you want your software to be very reliable, you have to cope with
950     corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
951     simply tries to work better in common error cases, such as when a network
952     link goes down.
953    
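The difference between the two failure models can be made concrete with a toy model. It only illustrates the guarantee described above, not how AEMP implements it: C<deliver> is a hypothetical function that models a link which fails while the message at a given index is in transit.

```perl
use strict;
use warnings;

# hypothetical model: the link breaks when message number $lost_at
# (0-based) is in transit, undef means the link never breaks
sub deliver {
   my ($lost_at, @msgs) = @_;

   my @received;
   for my $i (0 .. $#msgs) {
      # once one message is lost, all following ones are lost as well
      last if defined $lost_at && $i >= $lost_at;
      push @received, $msgs[$i];
   }

   @received
}

my @got = deliver (1, qw(a b c));

print "received: @got\n";   # always a prefix of the sequence, never "a c"
```

In this model the receiver sees only a prefix of what was sent, which is what makes a monitoring error sufficient to detect the loss.
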
954 root 1.26 =item * Erlang can send messages to the wrong port, AEMP does not.
955    
956 root 1.119 In Erlang it is quite likely that a node that restarts reuses an Erlang
957     process ID known to other nodes for a completely different process,
958     causing messages destined for that process to end up in an unrelated
959     process.
960 root 1.26
961 root 1.119 AEMP does not reuse port IDs, so old messages or old port IDs floating
962 root 1.26 around in the network will not be sent to an unrelated port.
963    
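A minimal sketch of why a restart does not lead to ID reuse, assuming (as the C<spawn> code suggests) that a port ID is made up of the node ID, a value that differs for each run of the node, and a counter. The run values and the C<make_id_generator> helper below are made up for illustration.

```perl
use strict;
use warnings;

# hypothetical generator: $runiq stands for a value that differs between
# runs of the same node, the counter only ever increases within a run
sub make_id_generator {
   my ($nodeid, $runiq) = @_;
   my $counter = 0;

   sub { "$nodeid#$runiq." . ++$counter }
}

my $before_restart = make_id_generator ("nodeA", "run1");
my $after_restart  = make_id_generator ("nodeA", "run2");

my $old = $before_restart->();
my $new = $after_restart->();

print "old port: $old\nnew port: $new\n";
```

Even though both runs start counting from the same value, the resulting IDs differ, so a stale ID cannot accidentally name a new port.
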
964     =item * Erlang uses unprotected connections, AEMP uses secure
965     authentication and can use TLS.
966    
967 root 1.66 AEMP can use a proven protocol - TLS - to protect connections and
968 root 1.26 securely authenticate nodes.
969    
970 root 1.28 =item * The AEMP protocol is optimised for both text-based and binary
971     communications.
972    
973 root 1.66 The AEMP protocol, unlike the Erlang protocol, supports both programming
974 root 1.119 language independent text-only protocols (good for debugging), and binary,
975 root 1.67 language-specific serialisers (e.g. Storable). By default, unless TLS is
976     used, the protocol is actually completely text-based.
977 root 1.28
978     It has also been carefully designed to be implementable in other languages
979 root 1.66 with a minimum of work while gracefully degrading functionality to make the
980 root 1.28 protocol simple.
981    
982 root 1.35 =item * AEMP has more flexible monitoring options than Erlang.
983    
984 root 1.119 In Erlang, you can choose to receive I<all> exit signals as messages or
985     I<none>, there is no in-between, so monitoring single Erlang processes is
986     difficult to implement.
987    
988     Monitoring in AEMP is more flexible than in Erlang, as one can choose
989     between automatic kill, exit message or callback on a per-port basis.
990 root 1.35
991 root 1.37 =item * Erlang tries to hide remote/local connections, AEMP does not.
992 root 1.35
993 root 1.67 Monitoring in Erlang is not an indicator of process death/crashes, in the
994     same way as linking is (except linking is unreliable in Erlang).
995 root 1.37
996     In AEMP, you don't "look up" registered port names or send to named ports
997     that might or might not be persistent. Instead, you normally spawn a port
998 root 1.67 on the remote node. The init function monitors you, and you monitor the
999     remote port. Since both monitors are local to the node, they are much more
1000     reliable (no need for C<spawn_link>).
1001 root 1.37
1002     This also saves round-trips and avoids sending messages to the wrong port
1003     (hard to do in Erlang).
1004 root 1.35
1005 root 1.26 =back
1006    
1007 root 1.46 =head1 RATIONALE
1008    
1009     =over 4
1010    
1011 root 1.67 =item Why strings for port and node IDs, why not objects?
1012 root 1.46
1013     We considered "objects", but found that the actual number of methods
1014 root 1.67 that can be called is quite low. Since port and node IDs travel over
1015 root 1.46 the network frequently, the serialising/deserialising would add lots of
1016 root 1.67 overhead, as well as having to keep a proxy object everywhere.
1017 root 1.46
1018     Strings can easily be printed, easily serialised etc. and need no special
1019     procedures to be "valid".
1020    
1021 root 1.110 And as a result, a port with just a default receiver consists of a single
1022 root 1.117 code reference stored in a global hash - it can't become much cheaper.
1023 root 1.47
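To illustrate how little machinery plain strings need, here is a sketch using only core Perl. The C<%PORT> hash and the ID value are hypothetical stand-ins, not the module's internal data structures.

```perl
use strict;
use warnings;

my %PORT;   # hypothetical registry: port ID => default receiver

my $port_id = "nodeA#run1.1";   # made-up ID of the form "nodeid#portname"

# registering a port is a single hash store of a code reference
$PORT{$port_id} = sub { "received: @_" };

# the ID can be printed, compared and taken apart like any other string
my ($nodeid, $portname) = split /#/, $port_id, 2;
print "node $nodeid, port $portname\n";

# delivering a message is a hash lookup followed by a call
print $PORT{$port_id}->("ping", 42), "\n";
```

No constructor, proxy object or validation step is involved at any point, which is the property the answer above relies on.
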
1024 root 1.67 =item Why favour JSON, why not a real serialising format such as Storable?
1025 root 1.46
1026     In fact, any AnyEvent::MP node will happily accept Storable as framing
1027     format, but currently there is no way to make a node use Storable by
1028 root 1.67 default (although all nodes will accept it).
1029 root 1.46
1030     The default framing protocol is JSON because a) JSON::XS is many times
1031     faster for small messages and b) most importantly, after years of
1032     experience we found that object serialisation is causing more problems
1033 root 1.67 than it solves: Just like function calls, objects simply do not travel
1034 root 1.46 easily over the network, mostly because they will always be a copy, so you
1035     always have to re-think your design.
1036    
1037     Keeping your messages simple, concentrating on data structures rather than
1038     objects, will keep your messages clean, tidy and efficient.
1039    
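The point about objects can be seen with the core module L<JSON::PP>, used here only as a stand-in for whatever serialiser a node actually uses. Plain data structures survive the round trip as a copy, while a blessed object is rejected by default. The message contents and the C<MyApp::User> class are made up for this sketch.

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new;

# a message made of plain data: strings, arrays and hashes
my @msg  = ("chat", { nick => "alice", text => "hello" }, "nodeA#run1.1");
my $copy = $json->decode ($json->encode (\@msg));

print "round trip ok: $copy->[1]{nick} says $copy->[1]{text}\n";

# a blessed object is not plain data, so encoding it fails by default
my $user = bless { nick => "alice" }, "MyApp::User";
my $ok   = eval { $json->encode ([$user]); 1 };

print "object encoded: ", ($ok ? "yes" : "no"), "\n";
```

Note that C<$copy> is a new data structure, so changes made to it on the receiving side never affect the sender's data.
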
1040     =back
1041    
1042 root 1.1 =head1 SEE ALSO
1043    
1044 root 1.68 L<AnyEvent::MP::Intro> - a gentle introduction.
1045    
1046     L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
1047    
1048 root 1.113 L<AnyEvent::MP::Global> - network maintenance and port groups, to find
1049 root 1.68 your applications.
1050    
1051 root 1.105 L<AnyEvent::MP::DataConn> - establish data connections between nodes.
1052    
1053 root 1.81 L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
1054     all nodes.
1055    
1056 root 1.1 L<AnyEvent>.
1057    
1058     =head1 AUTHOR
1059    
1060     Marc Lehmann <schmorp@schmorp.de>
1061     http://home.schmorp.de/
1062    
1063     =cut
1064    
1065     1
1066