/cvs/AnyEvent-MP/MP.pm
Revision: 1.123
Committed: Thu Mar 1 19:37:59 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.122: +5 -4 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - erlang-style multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE      # contains this node's node ID
   NODE       # returns this node's node ID

   $SELF      # receiving/own port id in rcv callbacks

   # initialise the node so it can send/receive messages
   configure;

   # ports are message destinations

   # sending messages
   snd $port, type => data...;
   snd $port, @msg;
   snd @msg_with_first_element_being_a_port;

   # creating/using ports, the simple way
   my $simple_port = port { my @msg = @_ };

   # creating/using ports, tagged message matching
   my $port = port;
   rcv $port, ping => sub { snd $_[0], "pong" };
   rcv $port, pong => sub { warn "pong received\n" };

   # create a port on another node
   my $port = spawn $node, $initfunc, @initdata;

   # destroy a port again
   kil $port;  # "normal" kill
   kil $port, my_error => "everything is broken"; # error kill

   # monitoring
   mon $localport, $cb->(@msg)      # callback is invoked on death
   mon $localport, $otherport       # kill otherport on abnormal death
   mon $localport, $otherport, @msg # send message on death

   # temporarily execute code in port context
   peval $port, sub { die "kill the port!" };

   # execute callbacks in $SELF port context
   my $timer = AE::timer 1, 0, psub {
      die "kill the port, delayed";
   };

=head1 CURRENT STATUS

   bin/aemp                - stable.
   AnyEvent::MP            - stable API, should work.
   AnyEvent::MP::Intro     - explains most concepts.
   AnyEvent::MP::Kernel    - mostly stable API.
   AnyEvent::MP::Global    - stable API.

=head1 DESCRIPTION

This module (-family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts, and you can supervise entities remotely.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page and the examples under F<eg/>.

=head1 CONCEPTS

=over 4

=item port

Not to be confused with a TCP port, a "port" is something you can send
messages to (with the C<snd> function).

Ports allow you to register C<rcv> handlers that can match all or just
some messages. Messages sent to ports will not be queued, regardless of
whether anything was listening for them or not.

Ports are represented by (printable) strings called "port IDs".

=item port ID - C<nodeid#portname>

A port ID is the concatenation of a node ID, a hash-mark (C<#>)
as separator, and a port name (a printable string of unspecified
format created by AnyEvent::MP).

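Node IDs never contain a C<#> (see the node ID format below), so splitting a port ID at its first hash-mark recovers both parts. The standalone sketch below does this in plain Perl. C<split_port_id> is a hypothetical helper used for illustration and is not an AnyEvent::MP function. The C<split /#/, $id, 2> idiom it relies on is the same one used by this module's own C<rcv>, C<mon> and C<spawn> code.

```perl

   use strict;
   use warnings;

   # Hypothetical helper, not part of AnyEvent::MP: split "nodeid#portname"
   # at the first "#" only, using the same split /#/, $id, 2 idiom as the
   # module's rcv, mon and spawn code.
   sub split_port_id {
      my ($id) = @_;
      my ($nodeid, $portname) = split /#/, $id, 2;
      return ($nodeid, $portname);   # $portname is undef for a bare node ID
   }

   my ($node, $name) = split_port_id("worker1#Xy7z");
   print "$node | $name\n";          # prints "worker1 | Xy7z"

```

Limiting C<split> to two fields keeps the port name intact, even if its unspecified format were ever to contain another C<#>.
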
=item node

A node is a single process containing at least one port - the node port,
which enables nodes to manage each other remotely, and to create new
ports.

Nodes are either public (have one or more listening ports) or private
(no listening ports). Private nodes currently cannot talk to other private
nodes, but all nodes can talk to public nodes.

Nodes are represented by (printable) strings called "node IDs".

=item node ID - C<[A-Za-z0-9_\-.:]*>

A node ID is a string that uniquely identifies the node within a
network. Depending on the configuration used, node IDs can look like a
hostname, a hostname and a port, or a random string. AnyEvent::MP itself
doesn't interpret node IDs in any way except to uniquely identify a node.

=item binds - C<ip:port>

Nodes can only talk to each other by creating some kind of connection to
each other. To do this, nodes should listen on one or more local transport
endpoints - binds.

Currently, only standard C<ip:port> specifications can be used, which
specify TCP ports to listen on. So a bind is basically just a TCP socket
in listening mode that accepts connections from other nodes.

=item seed nodes

When a node starts, it knows nothing about the network it is in - it
needs to connect to at least one other node that is already in the
network. These other nodes are called "seed nodes".

Seed nodes themselves are not special - they are seed nodes only because
some other node I<uses> them as such, but any node can be used as seed
node for other nodes, and each node can use a different set of seed nodes.

In addition to discovering the network, seed nodes are also used to
maintain the network - all nodes using the same seed node are part of
the same network. If a network is split into multiple subnets because e.g.
the network link between the parts goes down, then using the same seed
nodes for all nodes ensures that eventually the subnets get merged again.

Seed nodes are expected to be long-running, and at least one seed node
should always be available. They should also be relatively responsive - a
seed node that blocks for long periods will slow down everybody else.

For small networks, it's best if every node uses the same set of seed
nodes. For large networks, it can be useful to specify "regional" seed
nodes for most nodes in an area, and use all seed nodes as seed nodes for
each other. What's important is that the connections between all seed
nodes form a complete graph, so that the network cannot split into
separate subnets forever.

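The plain-Perl sketch below shows one way to check a planned seed layout against the complete-graph requirement. It rests on several assumptions. The C<%uses> table, which maps each seed node to the seeds it is configured with, and the node names are made up. A connection is assumed to exist when either side lists the other. Only seeds that appear as keys are checked.

```perl

   use strict;
   use warnings;

   # Assumed layout: seed node => list of seeds it is configured to use.
   # The names are made up for illustration.
   my %uses = (
      seed_eu   => [qw(seed_us seed_asia)],
      seed_us   => [qw(seed_eu seed_asia)],
      seed_asia => [qw(seed_eu)],
   );

   # Count "A uses B" as an undirected A-B connection (an assumption), then
   # check that every pair of listed seeds is directly connected.
   sub seeds_complete {
      my ($uses) = @_;
      my %edge;
      for my $from (keys %$uses) {
         $edge{"$from $_"} = $edge{"$_ $from"} = 1 for @{ $uses->{$from} };
      }
      my @seeds = sort keys %$uses;
      for my $i (0 .. $#seeds) {
         for my $j ($i + 1 .. $#seeds) {
            return 0 unless $edge{"$seeds[$i] $seeds[$j]"};
         }
      }
      return 1;
   }

   printf "%s\n", seeds_complete(\%uses) ? "complete" : "can split";   # prints "complete"

```

Here C<seed_asia> lists only C<seed_eu>, but the layout is still complete because C<seed_us> lists C<seed_asia>.
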
Seed nodes are represented by seed IDs.

=item seed IDs - C<host:port>

Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
TCP port) of nodes that should be used as seed nodes.

=item global nodes

An AEMP network needs a discovery service - nodes need to know how to
connect to other nodes they only know by name. In addition, AEMP offers a
distributed "group database", which maps group names to a list of strings
(for example, to register worker ports).

A network needs at least one global node to work, and allows every node to
be a global node.

Any node that loads the L<AnyEvent::MP::Global> module becomes a global
node and tries to keep connections to all other nodes. So while it can
make sense to make every node "global" in small networks, it usually makes
sense to only make seed nodes into global nodes in large networks (nodes
keep connections to seed nodes and global nodes, so making them the same
reduces overhead).

=back

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Config ();
use AnyEvent::MP::Kernel;
use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);

use common::sense;

use Carp ();

use AE ();

use base "Exporter";

our $VERSION = $AnyEvent::MP::Config::VERSION;

our @EXPORT = qw(
   NODE $NODE *SELF node_of after
   configure
   snd rcv mon mon_guard kil psub peval spawn cal
   port
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the node
ID of the node running in the current process. This value is initialised by
a call to C<configure>.

=item $nodeid = node_of $port

Extracts and returns the node ID from a port ID or a node ID.

=item configure $profile, key => value...

=item configure key => value...

Before a node can talk to other nodes on the network (i.e. enter
"distributed mode") it has to configure itself - the minimum a node needs
to know is its own name, and optionally it should know the addresses of
some other nodes in the network to discover other nodes.

This function configures a node - it must be called exactly once (or
never) before calling other AnyEvent::MP functions.

The key/value pairs are basically the same ones as documented for the
F<aemp> command line utility (sans the set/del prefix), with two additions:

=over 4

=item norc => $boolean (default false)

If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
be consulted - all configuration options must be specified in the
C<configure> call.

=item force => $boolean (default false)

If true, then the values specified in the C<configure> call will take
precedence over any values configured via the rc file. The default is for
the rc file to override any options specified in the program.

=back

=over 4

=item step 1, gathering configuration from profiles

The function first looks up a profile in the aemp configuration (see the
L<aemp> commandline utility). The profile name can be specified via the
named C<profile> parameter, or it can simply be the first parameter. If it
is missing, then the nodename (F<uname -n>) will be used as profile name.

The profile data is then gathered as follows:

First, all remaining key => value pairs (the aemp-style keys described
above) will be interpreted as configuration data. Then they will be
overwritten by any values specified in the global default configuration
(see the F<aemp> utility), then the chain of profiles chosen by the
profile name (and any C<parent> attributes).

That means that, unless C<force> is true, the values specified in the
profile have highest priority and the values specified directly via
C<configure> have lowest priority, and can only be used to specify
defaults.

If the profile specifies a node ID, then this will become the node ID of
this process. If not, then the profile name will be used as node ID, with
a slash (C</>) appended.

If the node ID (or profile name) ends with a slash (C</>), then a random
string is appended to make it unique.

=item step 2, bind listener sockets

The next step is to look up the binds in the profile, followed by binding
aemp protocol listeners on all binds specified (it is possible and valid
to have no binds, meaning that the node cannot be contacted from the
outside. This means the node cannot talk to other nodes that also have no
binds, but it can still talk to all "normal" nodes).

If the profile does not specify a binds list, then a default of C<*> is
used, meaning the node will bind on a dynamically-assigned port on every
local IP address it finds.

=item step 3, connect to seed nodes

As the last step, the seed ID list from the profile is passed to the
L<AnyEvent::MP::Global> module, which will then use it to keep
connectivity with at least one node at any point in time.

=back

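The priority order of step 1 and the node ID rule can be modelled with plain hashes, as in the sketch below. This is a standalone illustration under stated assumptions and not the module's configuration code:

=over 4

=item * C<%global> and C<%profiles> stand in for the aemp configuration file.

=item * C<effective_config> and C<node_id_for> are hypothetical names.

=item * The C<force> and C<norc> options are ignored.

=item * The eight-character random suffix is a made-up format.

=back

```perl

   use strict;
   use warnings;

   # Stand-ins for the aemp configuration file (made-up values); "parent"
   # links a profile to the profile it inherits from.
   my %global   = (binds => ["*"]);
   my %profiles = (
      base => { seeds => ["seed.example:4040"] },
      seed => { parent => "base", binds => ["*:4040"] },
   );

   # Lowest to highest priority: configure() arguments, global defaults,
   # then the profile chain with parents first, so the named profile wins.
   sub effective_config {
      my ($profile, %args) = @_;
      my @chain;
      for (my $p = $profile; defined $p && $profiles{$p}; $p = $profiles{$p}{parent}) {
         unshift @chain, $profiles{$p};
      }
      my %cfg = (%args, %global);
      %cfg = (%cfg, %$_) for @chain;
      delete $cfg{parent};
      return \%cfg;
   }

   # Explicit nodeid, else "profile/"; a trailing "/" gets a random suffix
   # (eight lowercase letters/digits here, an assumed format).
   sub node_id_for {
      my ($profile, $cfg) = @_;
      my $id = defined $cfg->{nodeid} ? $cfg->{nodeid} : "$profile/";
      $id .= join "", map { ("a" .. "z", 0 .. 9)[rand 36] } 1 .. 8 if $id =~ m{/\z};
      return $id;
   }

   my $cfg = effective_config("seed", binds => ["*:1234"]);
   print "binds: $cfg->{binds}[0]\n";   # prints "binds: *:4040"
   printf "nodeid: %s\n", node_id_for("seed", $cfg);   # "seed/" plus 8 random characters

```

The C<binds> value passed in the call loses to the profile, which matches the "can only be used to specify defaults" rule above.
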
Example: become a distributed node using the local node name as profile.
This should be the most common form of invocation for "daemon"-type nodes.

   configure

Example: become an anonymous node. This form is often used for commandline
clients.

   configure nodeid => "anon/";

Example: configure a node using a profile called seed, which is suitable
for a seed node as it binds on all local addresses on a fixed port (4040,
customary for aemp).

   # use the aemp commandline utility
   # aemp profile seed binds '*:4040'

   # then use it
   configure profile => "seed";

   # or simply use aemp from the shell again:
   # aemp run profile seed

   # or provide a nicer-to-remember nodeid
   # aemp run profile seed nodeid "$(hostname)"

=item $SELF

Contains the current port ID while executing C<rcv> callbacks or C<psub>
blocks.

=item *SELF, SELF, %SELF, @SELF...

Due to some quirks in how Perl exports variables, it is impossible to
just export C<$SELF>; all the symbols named C<SELF> are exported by this
module, but only C<$SELF> is currently used.

=item snd $port, type => @data

=item snd $port, @msg

Send the given message to the given port, which can identify either a
local or a remote port, and must be a port ID.

While the message can be almost anything, it is highly recommended to
use a string as first element (a port ID, or some word that indicates a
request type etc.) and for the message to consist of only simple Perl
values (scalars, arrays, hashes) - if you think you need to pass an
object, think again.

The message data logically becomes read-only after a call to this
function: modifying any argument (or values referenced by them) is
forbidden, as there can be considerable time between the call to C<snd>
and the time the message is actually being serialised - in fact, it might
never be copied as within the same process it is simply handed to the
receiving port.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed. Best rely only on the common denominator of
these.

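The read-only rule matters most for local delivery, where the data is handed over rather than copied. The toy model below shows the failure mode in plain Perl. C<toy_snd> and C<@inbox> are hypothetical stand-ins for a local port. Like in-process delivery, they keep references to the sent data instead of copying it, so a hash modified after sending reaches the receiver in its modified state.

```perl

   use strict;
   use warnings;

   # Toy local "port" (hypothetical): the sent list is stored, but referenced
   # data is shared with the sender rather than copied, as for local delivery.
   my @inbox;
   sub toy_snd { push @inbox, [@_] }

   my %job = (file => "a.txt");
   toy_snd("worker#1", job => \%job);
   $job{file} = "b.txt";               # forbidden after snd

   print "$inbox[0][2]{file}\n";       # prints "b.txt", not "a.txt"

```
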
=item $local_port = port

Creates a new local port object and returns its port ID. Initially it has
no callbacks set and will throw an error when it receives messages.

=item $local_port = port { my @msg = @_ }

Creates a new local port, and returns its ID. Semantically the same as
creating a port and calling C<rcv $port, $callback> on it.

The block will be called for every message received on the port, with the
global variable C<$SELF> set to the port ID. Runtime errors will cause the
port to be C<kil>ed. The message will be passed as-is, no extra argument
(i.e. no port ID) will be passed to the callback.

If you want to stop/destroy the port, simply C<kil> it:

   my $port = port {
      my @msg = @_;
      ...
      kil $SELF;
   };

=cut

sub rcv($@);

sub _kilme {
   die "received message on port without callback";
}

sub port(;&) {
   my $id = $UNIQ . ++$ID;
   my $port = "$NODE#$id";

   rcv $port, shift || \&_kilme;

   $port
}

=item rcv $local_port, $callback->(@msg)

Replaces the default callback on the specified port. There is no way to
remove the default callback: use C<sub { }> to disable it, or better
C<kil> the port when it is no longer needed.

The global C<$SELF> (exported by this module) contains C<$port> while
executing the callback. Runtime errors during callback execution will
result in the port being C<kil>ed.

The default callback receives all messages not matched by a more specific
C<tag> match.

=item rcv $local_port, tag => $callback->(@msg_without_tag), ...

Register (or replace) callbacks to be called on messages starting with the
given tag on the given port (and return the port), or unregister it (when
C<$callback> is C<undef> or missing). There can only be one callback
registered for each tag.

The original message will be passed to the callback, after the first
element (the tag) has been removed. The callback will use the same
environment as the default callback (see above).

Example: create a port and bind receivers on it in one go.

   my $port = rcv port,
      msg1 => sub { ... },
      msg2 => sub { ... },
   ;

Example: create a port, bind receivers and send it in a message elsewhere
in one go:

   snd $otherport, reply =>
      rcv port,
         msg1 => sub { ... },
         ...
   ;

Example: temporarily register a rcv callback for a tag matching some port
(e.g. for an rpc reply) and unregister it after a message was received.

   rcv $port, $otherport => sub {
      my @reply = @_;

      rcv $SELF, $otherport;
   };

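The dispatch rules described above can be modelled in a few lines of plain Perl:

=over 4

=item * A matching tag takes precedence.

=item * The tag is removed before the callback is called.

=item * Everything else goes to the default callback unchanged.

=item * An C<undef> or missing callback unregisters a tag.

=back

The sketch below is a simplified stand-in with hypothetical names (C<make_port>, C<toy_rcv>, C<deliver>). It leaves out C<$SELF>, error handling and the network, and it is not the module's implementation.

```perl

   use strict;
   use warnings;

   # Hypothetical model of a tag-matching port: a default callback plus a
   # tag => callback table.
   sub make_port {
      return { default => sub { die "received message on port without callback\n" }, tags => {} };
   }

   sub toy_rcv {
      my ($port, @pairs) = @_;
      while (@pairs) {
         if (ref $pairs[0]) {                # a lone coderef replaces the default
            $port->{default} = shift @pairs;
         } else {
            my ($tag, $cb) = splice @pairs, 0, 2;
            if (defined $cb) { $port->{tags}{$tag} = $cb }
            else             { delete $port->{tags}{$tag} }   # undef/missing unregisters
         }
      }
      return $port;
   }

   # A matching tag is shifted off before its callback runs; otherwise the
   # default callback gets the message unchanged.
   sub deliver {
      my ($port, @msg) = @_;
      if (defined $msg[0] && (my $cb = $port->{tags}{$msg[0]})) {
         shift @msg;
         return $cb->(@msg);
      }
      return $port->{default}->(@msg);
   }

   my $port = toy_rcv(make_port(),
      ping => sub { "pong to @_" },
      sub { "default got @_" },
   );
   printf "%s\n", deliver($port, ping => "me#1");   # prints "pong to me#1"
   printf "%s\n", deliver($port, hello => 1);       # prints "default got hello 1"

```
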
=cut

sub rcv($@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   $NODE{$nodeid} == $NODE{""}
      or Carp::croak "$port: rcv can only be called on local ports, caught";

   while (@_) {
      if (ref $_[0]) {
         if (my $self = $PORT_DATA{$portid}) {
            "AnyEvent::MP::Port" eq ref $self
               or Carp::croak "$port: rcv can only be called on message matching ports, caught";

            $self->[0] = shift;
         } else {
            my $cb = shift;
            $PORT{$portid} = sub {
               local $SELF = $port;
               eval { &$cb }; _self_die if $@;
            };
         }
      } elsif (defined $_[0]) {
         my $self = $PORT_DATA{$portid} ||= do {
            my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";

            $PORT{$portid} = sub {
               local $SELF = $port;

               if (my $cb = $self->[1]{$_[0]}) {
                  shift;
                  eval { &$cb }; _self_die if $@;
               } else {
                  &{ $self->[0] };
               }
            };

            $self
         };

         "AnyEvent::MP::Port" eq ref $self
            or Carp::croak "$port: rcv can only be called on message matching ports, caught";

         my ($tag, $cb) = splice @_, 0, 2;

         if (defined $cb) {
            $self->[1]{$tag} = $cb;
         } else {
            delete $self->[1]{$tag};
         }
      }
   }

   $port
}

=item peval $port, $coderef[, @args]

Evaluates the given C<$coderef> within the context of C<$port>, that is,
when the code throws an exception the C<$port> will be killed.

Any remaining args will be passed to the callback. Any return values will
be returned to the caller.

This is useful when you temporarily want to execute code in the context of
a port.

Example: create a port and run some initialisation code in its context.

   my $port = port { ... };

   peval $port, sub {
      init
         or die "unable to init";
   };

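C<peval>, C<psub> and C<rcv> callbacks share the same error handling. The code runs with C<$SELF> localised. If it dies, the port is killed with reason C<< die => $message >>, and trailing newlines are removed from string exceptions. The plain-Perl model below illustrates this. C<toy_peval>, C<toy_kil> and C<@killed> are hypothetical stand-ins so that the sketch runs without AnyEvent::MP. The real code calls C<kil> through C<_self_die> instead.

```perl

   use strict;
   use warnings;

   our $SELF;                           # models the exported $SELF
   my @killed;
   sub toy_kil { push @killed, [@_] }   # hypothetical stand-in for kil

   # Run $cb in the context of a port: $SELF is localised, and an exception
   # "kills" the port with (die => $message) instead of propagating.
   sub toy_peval {
      local $SELF = shift;
      my $cb = shift;
      my @res = wantarray ? eval { $cb->(@_) } : scalar eval { $cb->(@_) };
      if ($@) {
         my $msg = $@;
         $msg =~ s/\n+$// unless ref $msg;   # same cleanup as _self_die
         toy_kil($SELF, die => $msg);
         return;
      }
      return wantarray ? @res : $res[0];
   }

   printf "%s\n", toy_peval("node#p1", sub { "ran in $SELF" });   # prints "ran in node#p1"
   toy_peval("node#p2", sub { die "init failed\n" });
   print "@{ $killed[0] }\n";                                     # prints "node#p2 die init failed"

```
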
=cut

sub peval($$) {
   local $SELF = shift;
   my $cb = shift;

   if (wantarray) {
      my @res = eval { &$cb };
      _self_die if $@;
      @res
   } else {
      my $res = eval { &$cb };
      _self_die if $@;
      $res
   }
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.

This is useful when you register callbacks from C<rcv> callbacks (the
timer keeps itself alive until it fires):

   rcv $port, delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer; $timer = AE::timer $delay, 0, psub {
         undef $timer;
         snd @reply, $SELF;
      };
   };

=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=item $guard = mon $port, $cb->(@reason) # call $cb when $port dies

=item $guard = mon $port, $rcvport # kill $rcvport when $port dies

=item $guard = mon $port # kill $SELF when $port dies

=item $guard = mon $port, $rcvport, @msg # send a message when $port dies

Monitor the given port and do something when the port is killed or
messages to it were lost, and optionally return a guard that can be used
to stop monitoring again.

In the first form (callback), the callback is simply called with any
number of C<@reason> elements (no @reason means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the second form (another port given), the other port (C<$rcvport>)
will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
"normal" kils nothing happens, while under all other conditions, the other
port is killed with the same reason.

The third form (kill self) is the same as the second form, except that
C<$rcvport> defaults to C<$SELF>.

In the last form (message), a message of the form C<@msg, @reason> will be
sent to C<$rcvport> with C<snd>.

Monitoring-actions are one-shot: once messages are lost (and a monitoring
alert was raised), they are removed and will not trigger again.

As a rule of thumb, monitoring requests should always monitor a port from
a local port (or callback). The reason is that kill messages might get
lost, just like any other message. Another less obvious reason is that
even monitoring requests can get lost (for example, when the connection
to the other node goes down permanently). When monitoring a port locally
these problems do not exist.

C<mon> effectively guarantees that, in the absence of hardware failures,
after starting the monitor, either all messages sent to the port will
arrive, or the monitoring action will be invoked after possible message
loss has been detected. No messages will be lost "in between" (after
the first lost message no further messages will be received by the
port). After the monitoring action was invoked, further messages might get
delivered again.

Inter-host-connection timeouts and monitoring depend on the transport
used. The only transport currently implemented is TCP, and AnyEvent::MP
relies on TCP to detect node-downs (this can take 10-15 minutes on a
non-idle connection, and usually around two hours for idle connections).

This means that monitoring is good for program errors and cleaning up
stuff eventually, but it is no replacement for a timeout when you need
to ensure some maximum latency.

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $SELF => "restart";

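All four forms reduce to a single callback that receives C<@reason>. The plain-Perl sketch below shows that reduction, following the same branches as the C<mon> code in this file. C<toy_kil> and C<toy_snd> are hypothetical stand-ins that only record what would happen. The one-argument form corresponds to passing C<$SELF> as the target.

```perl

   use strict;
   use warnings;

   my (@kills, @sends);
   sub toy_kil { push @kills, [@_] }   # hypothetical stand-ins that only
   sub toy_snd { push @sends, [@_] }   # record what would have happened

   # Reduce a mon target to one callback taking @reason: a coderef is used
   # as-is, a port plus @msg sends (@msg, @reason) to that port, and a bare
   # port is killed only when a @reason is present.
   sub mon_action {
      my ($target, @msg) = @_;
      return $target if ref $target;
      return sub { toy_snd($target, @msg, @_) } if @msg;
      return sub { toy_kil($target, @_) if @_ };
   }

   mon_action("me#1")->();                        # "normal" kil: nothing happens
   mon_action("me#1")->(die => "oops");           # abnormal: me#1 gets killed
   mon_action("me#1", "restart")->(transport_error => "lost");
   print scalar(@kills), " kill(s), sent: @{ $sends[0] }\n";
   # prints "1 kill(s), sent: me#1 restart transport_error lost"

```
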
=cut

sub mon {
   my ($nodeid, $port) = split /#/, shift, 2;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';

   unless (ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) })
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed (note the use of C<psub>):

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
         undef $timer if 0.9 < rand;
      };
   };

=cut

sub mon_guard {
   my ($port, @refs) = @_;

   #TODO: mon-less form?

   mon $port, sub { 0 && @refs }
}

=item kil $port[, @reason]

Kill the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally" -
monitor callbacks will be invoked, but the kil will not cause linked ports
(C<mon $mport, $lport> form) to get killed.

If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
form) get killed with the same reason.

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.

=cut

=item $port = spawn $node, $initfunc[, @initdata]

Creates a port on the node C<$node> (which can also be a port ID, in which
case it's the node where that port resides).

The port ID of the newly created port is returned immediately, and it is
possible to immediately start sending messages or to monitor the port.

After the port has been created, the init function is called on the remote
node, in the same context as a C<rcv> callback. This function must be a
fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
specify a function in the main program, use C<::name>.

If the function doesn't exist, then the node tries to C<require>
the package, then the package above the package and so on (e.g.
C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
exists or it runs out of package names.

The init function is then called with the newly-created port as context
object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
the port might not get created.

A common idiom is to pass a local port, immediately monitor the spawned
port, and in the remote init function, immediately monitor the passed
local port. This two-way monitoring ensures that both ports get cleaned up
when there is a problem.

C<spawn> guarantees that the C<$initfunc> has no visible effects on the
caller before C<spawn> returns (by delaying invocation when spawn is
called for the local node).

Example: spawn a chat server port on C<$othernode>.

   # this node, executed from within a port context:
   my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
   mon $server;

   # init function on $othernode
   sub connect {
      my ($srcport) = @_;

      mon $srcport;

      rcv $SELF, sub {
         ...
      };
   }

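The package search order described above, from the innermost package up to the top-level one, can be computed from the function name alone. The plain-Perl sketch below does only that, using a hypothetical C<candidate_packages> helper. It does not attempt the C<require> calls themselves. It returns no packages for a main-program name such as C<::name>, on the assumption that nothing needs to be loaded in that case.

```perl

   use strict;
   use warnings;

   # Hypothetical helper: packages to try, innermost first, for a
   # fully-qualified init function name. No require is attempted here.
   sub candidate_packages {
      my ($func) = @_;
      my @parts = split /::/, $func;
      pop @parts;                            # drop the function name itself
      my @pkgs;
      while (@parts) {
         my $pkg = join "::", @parts;
         push @pkgs, $pkg if length $pkg;    # "::name" (main program) yields none
         pop @parts;
      }
      return @pkgs;
   }

   print join(", ", candidate_packages("MyApp::Chat::Server::connect")), "\n";
   # prints "MyApp::Chat::Server, MyApp::Chat, MyApp"

```
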
=cut

sub _spawn {
   my $port = shift;
   my $init = shift;

   # rcv will create the actual port
   local $SELF = "$NODE#$port";
   eval {
      &{ load_func $init }
   };
   _self_die if $@;
}

sub spawn(@) {
   my ($nodeid, undef) = split /#/, shift, 2;

   my $id = $RUNIQ . ++$ID;

   $_[0] =~ /::/
      or Carp::croak "spawn init function must be a fully-qualified name, caught";

   snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;

   "$nodeid#$id"
}

=item after $timeout, @msg

=item after $timeout, $callback

Either sends the given message or calls the given callback after the
specified number of seconds.

This is simply a utility function that comes in handy at times; the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.
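
Both forms can be sketched as follows, assuming AnyEvent::MP is installed
and the node only talks to itself. C<@log> and C<$done> exist only for
this illustration.

   use strict;
   use warnings;
   use AnyEvent;
   use AnyEvent::MP;

   configure;   # initialise this node before using ports

   my @log;
   my $done = AE::cv;

   # a local port that records every message it receives
   my $port = port {
      push @log, [@_];
      $done->send;
   };

   # callback form: run code after 0.1 seconds
   after 0.1, sub { push @log, ["callback"] };

   # message form: does  snd $port, timeout => "no reply"  after 0.2 seconds
   after 0.2, $port, timeout => "no reply";

   $done->recv;   # run the event loop until the port got its message

Because the timer's own callback keeps it alive until it fires, there is
no guard object that has to be stored by the caller.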

=cut

sub after($@) {
   my ($timeout, @action) = @_;

   # $t is referenced by its own callback, which keeps the timer alive until it fires
   my $t; $t = AE::timer $timeout, 0, sub {
      undef $t;
      ref $action[0]
         ? $action[0]()
         : snd @action;
   };
}

=item cal $port, @msg, $callback[, $timeout]

A simple form of RPC: sends a message with the given contents (C<@msg>)
to the given C<$port>, but appends a reply port as the last element of
the message.

The reply port is created temporarily just for the purpose of receiving
the reply, and will be C<kil>ed when no longer needed.

A reply message sent to the port is passed to the C<$callback> as-is.

If an optional time-out (in seconds) is given and it is not C<undef>,
then the callback will be called without any arguments once the time-out
has elapsed without a reply, and the reply port is C<kil>ed.

If no time-out is given (or it is C<undef>), then the local port will
monitor the remote port instead, so it eventually gets cleaned up.

Currently this function returns the temporary port, but this "feature"
might go in future versions unless you can make a convincing case that
this is indeed useful for something.
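
A minimal round trip could look like this. It assumes AnyEvent::MP is
installed and a single node talking to itself; the C<add> request and all
variable names are made up for this illustration. Since C<cal> appends
the reply port to the message, the server finds it as the last element.

   use strict;
   use warnings;
   use AnyEvent;
   use AnyEvent::MP;

   configure;

   # a local "server" port handling (hypothetical) add requests;
   # rcv strips the tag, and cal appended the reply port last
   my $server = port;
   rcv $server, add => sub {
      my ($x, $y, $reply) = @_;
      snd $reply, $x + $y;
   };

   my $result;
   my $done = AE::cv;

   # sends (add => 2, 3, <reply port>), giving up after 5 seconds
   cal $server, add => 2, 3, sub {
      $result = @_ ? $_[0] : "timeout";   # no arguments means time-out
      $done->send;
   }, 5;

   $done->recv;
   print "result: $result\n";   # prints: result: 5

Passing C<undef> (or nothing) instead of C<5> would make C<cal> monitor
C<$server> rather than start a timer.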

=cut

sub cal(@) {
   my $timeout = ref $_[-1] ? undef : pop;
   my $cb = pop;

   # reply port: on reply, cancel any time-out timer, kill this port
   # and pass the reply message to the callback as-is
   my $port = port {
      undef $timeout;
      kil $SELF;
      &$cb;
   };

   if (defined $timeout) {
      # no reply in time: kill the reply port, call back without arguments
      $timeout = AE::timer $timeout, 0, sub {
         undef $timeout;
         kil $port;
         $cb->();
      };
   } else {
      # no time-out: clean up the reply port when the remote port dies
      mon $_[0], sub {
         kil $port;
         $cb->();
      };
   }

   # append the reply port to the message and send it
   push @_, $port;
   &snd;

   $port
}

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
== aemp node, Erlang process == aemp port), so many of the documents and
programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node IDs are arbitrary strings in AEMP.

Erlang relies on special naming and DNS to work everywhere in the same
way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
configuration or DNS), and possibly the addresses of some seed nodes, but
will otherwise discover other nodes (and their IDs) itself.

=item * Erlang has a "remote ports are like local ports" philosophy, AEMP
uses "local ports are like remote ports".

The failure modes for local ports are quite different (runtime errors
only) from those for remote ports: when a local port dies, you I<know> it
dies, but when a connection to another node dies, you know nothing about
the other port.

Erlang pretends remote ports are as reliable as local ports, even when
they are not.

AEMP encourages a "treat remote ports differently" philosophy, with local
ports being the special case/exception, where transport errors cannot
occur.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages out of order, and
therefore needs a queue. AEMP is event-based, so queuing messages would
serve no useful purpose. For the same reason, the pattern-matching
abilities of AnyEvent::MP are more limited, as there is little need to be
able to filter messages without dequeuing them.

This is not a philosophical difference, but simply stems from AnyEvent::MP
being event-based, while Erlang is process-based.

You can have a look at L<Coro::MP> for a more Erlang-like process model on
top of AEMP and Coro threads.

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in Erlang is synchronous and blocks the process until
a connection has been established and the message sent (and so does not
need a queue that can overflow). AEMP sends return immediately;
connection establishment is handled in the background.

=item * Erlang suffers from silent message loss, AEMP does not.

Erlang implements few guarantees on message delivery: messages can get
lost without any of the processes realising it (e.g. you send messages a,
b, and c, and the other side only receives messages a and c).

AEMP guarantees (modulo hardware errors) correct ordering, and that after
one message is lost, all following ones sent to the same port are lost as
well until monitoring raises an error, so there are no silent "holes" in
the message sequence.

If you want your software to be very reliable, you have to cope with
corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
simply tries to work better in common error cases, such as when a network
link goes down.

=item * Erlang can send messages to the wrong port, AEMP does not.

In Erlang it is quite likely that a node that restarts reuses an Erlang
process ID known to other nodes for a completely different process,
causing messages destined for that process to end up in an unrelated
process.

AEMP does not reuse port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol, TLS, to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the Erlang protocol, supports both
programming-language-independent text-only protocols (good for debugging)
and binary, language-specific serialisers (e.g. Storable). By default,
unless TLS is used, the protocol is actually completely text-based.

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=item * AEMP has more flexible monitoring options than Erlang.

In Erlang, you can choose to receive I<all> exit signals as messages or
I<none>; there is no in-between, so monitoring single Erlang processes is
difficult to implement.

Monitoring in AEMP is more flexible than in Erlang, as one can choose
between automatic kill, exit message or callback on a per-port basis.

=item * Erlang tries to hide remote/local connections, AEMP does not.

Monitoring in Erlang is not an indicator of process death/crashes in the
same way as linking is (except that linking is unreliable in Erlang).

In AEMP, you don't "look up" registered port names or send to named ports
that might or might not be persistent. Instead, you normally spawn a port
on the remote node. The init function monitors you, and you monitor the
remote port. Since both monitors are local to the node, they are much more
reliable (no need for C<spawn_link>).

This also saves round-trips and avoids sending messages to the wrong port,
which is hard to avoid in Erlang.

=back

=head1 RATIONALE

=over 4

=item Why strings for port and node IDs, why not objects?

We considered "objects", but found that the actual number of methods
that can be called is quite low. Since port and node IDs travel over
the network frequently, the serialising/deserialising would add lots of
overhead, as well as having to keep a proxy object everywhere.

Strings can easily be printed, easily serialised etc. and need no special
procedures to be "valid".

As a result, a port with just a default receiver consists of a single
code reference stored in a global hash; it can't become much cheaper.
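
The idea can be modelled in a few lines of plain Perl. This is a
deliberately simplified, hypothetical model (C<%PORT>, C<new_port>,
C<send_local>, C<kill_port> and the node ID C<mynode> are made up here),
not AnyEvent::MP's actual implementation. It borrows only the
C<nodeid#portid> shape of the IDs that C<spawn> returns, and shows how a
plain string doubles as a hash key.

   use strict;
   use warnings;

   # Hypothetical model, NOT AnyEvent::MP's real internals.
   my $NODE = "mynode";   # assumed node ID
   my $ID   = 0;
   my %PORT;              # port ID string => receiver code reference

   sub new_port {
      my ($cb) = @_;
      my $id = "$NODE#" . ++$ID;   # "nodeid#portid" style ID
      $PORT{$id} = $cb;            # the whole port: one hash entry
      return $id;
   }

   sub send_local {
      my ($id, @msg) = @_;
      my $cb = $PORT{$id}
         or return;                # unknown (or killed) port: nothing to call
      $cb->(@msg);
   }

   sub kill_port { delete $PORT{ $_[0] } }

   my @received;
   my $port = new_port(sub { push @received, [@_] });

   print "$port\n";               # prints: mynode#1
   send_local($port, ping => 42);
   kill_port($port);

In this model, killing a port is just deleting one hash entry, and the ID
can be printed, logged or sent over the network without any conversion.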

=item Why favour JSON, why not a real serialising format such as Storable?

In fact, any AnyEvent::MP node will happily accept Storable as framing
format, but currently there is no way to make a node use Storable by
default.

The default framing protocol is JSON because a) JSON::XS is many times
faster for small messages and b) most importantly, after years of
experience we found that object serialisation causes more problems
than it solves: just like function calls, objects simply do not travel
easily over the network, mostly because they will always be a copy, so you
always have to re-think your design.

Keeping your messages simple, concentrating on data structures rather than
objects, will keep your messages clean, tidy and efficient.
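
The copy problem can be seen with JSON::PP, the pure-Perl core module
whose interface JSON::XS largely shares. The C<Point> class is invented
for this illustration. By default the encoder refuses objects outright,
and when told to convert them, only the data travels, so the receiving
side gets a plain hash, not a C<Point>.

   use strict;
   use warnings;
   use JSON::PP;

   # a made-up class for illustration
   package Point;
   sub new     { my ($class, %args) = @_; bless { %args }, $class }
   sub TO_JSON { return +{ %{ $_[0] } } }   # plain, unblessed copy of the data

   package main;

   my $json = JSON::PP->new->canonical;    # canonical: sorted keys
   my $obj  = Point->new(x => 1, y => 2);

   # by default, blessed references are rejected (encode croaks)
   my $ok = eval { $json->encode($obj); 1 };
   print $ok ? "encoded\n" : "refused to encode an object\n";

   # with convert_blessed, TO_JSON is called and only the data is sent...
   my $wire = $json->convert_blessed->encode($obj);
   print "$wire\n";   # prints: {"x":1,"y":2}

   # ...so the receiver decodes a plain hash, not a Point object
   my $copy = $json->decode($wire);
   print ref $copy, "\n";   # prints: HASH

Sending the plain structure C<< { x => 1, y => 2 } >> in the first place
avoids the question altogether.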

=back

=head1 SEE ALSO

L<AnyEvent::MP::Intro> - a gentle introduction.

L<AnyEvent::MP::Kernel> - more, lower-level, stuff.

L<AnyEvent::MP::Global> - network maintenance and port groups, to find
your applications.

L<AnyEvent::MP::DataConn> - establish data connections between nodes.

L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
all nodes.

L<AnyEvent>.

=head1 AUTHOR

   Marc Lehmann <schmorp@schmorp.de>
   http://home.schmorp.de/

=cut

1