/cvs/AnyEvent-MP/MP.pm
Revision: 1.126
Committed: Sat Mar 3 19:43:41 2012 UTC by root
Branch: MAIN
Changes since 1.125: +31 -13 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.89 AnyEvent::MP - erlang-style multi-processing/message-passing framework
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.75 $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11 root 1.2
12 root 1.38 $SELF # receiving/own port id in rcv callbacks
13    
14 root 1.48 # initialise the node so it can send/receive messages
15 root 1.72 configure;
16 root 1.48
17 root 1.75 # ports are message destinations
18 root 1.38
19     # sending messages
20 root 1.2 snd $port, type => data...;
21 root 1.38 snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23 root 1.2
24 root 1.50 # creating/using ports, the simple way
25 root 1.73 my $simple_port = port { my @msg = @_ };
26 root 1.22
27 root 1.52 # creating/using ports, tagged message matching
28 root 1.38 my $port = port;
29 root 1.73 rcv $port, ping => sub { snd $_[0], "pong" };
30     rcv $port, pong => sub { warn "pong received\n" };
31 root 1.2
32 root 1.48 # create a port on another node
33     my $port = spawn $node, $initfunc, @initdata;
34    
35 root 1.116 # destroy a port again
36 root 1.101 kil $port; # "normal" kill
37     kil $port, my_error => "everything is broken"; # error kill
38    
39 root 1.35 # monitoring
40 root 1.90 mon $localport, $cb->(@msg) # callback is invoked on death
41     mon $localport, $otherport # kill otherport on abnormal death
42     mon $localport, $otherport, @msg # send message on death
43 root 1.35
44 root 1.101 # temporarily execute code in port context
45     peval $port, sub { die "kill the port!" };
46    
47     # execute callbacks in $SELF port context
48     my $timer = AE::timer 1, 0, psub {
49     die "kill the port, delayed";
50     };
51    
52 root 1.45 =head1 CURRENT STATUS
53    
54 root 1.71 bin/aemp - stable.
55     AnyEvent::MP - stable API, should work.
56 elmex 1.77 AnyEvent::MP::Intro - explains most concepts.
57 root 1.88 AnyEvent::MP::Kernel - mostly stable API.
58     AnyEvent::MP::Global - stable API.
59 root 1.45
60 root 1.1 =head1 DESCRIPTION
61    
62 root 1.2 This module (-family) implements a simple message passing framework.
63    
64     Despite its simplicity, you can securely message other processes running
65 root 1.67 on the same or other hosts, and you can supervise entities remotely.
66 root 1.2
67 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
68 root 1.67 manual page and the examples under F<eg/>.
69 root 1.23
70 root 1.2 =head1 CONCEPTS
71    
72     =over 4
73    
74     =item port
75    
76 root 1.79 Not to be confused with a TCP port, a "port" is something you can send
77     messages to (with the C<snd> function).
78 root 1.29
79 root 1.53 Ports allow you to register C<rcv> handlers that can match all or just
80 root 1.64 some messages. Messages sent to ports will not be queued, regardless of
81     whether anything was listening for them or not.
82 root 1.2
83 root 1.119 Ports are represented by (printable) strings called "port IDs".
84    
85 root 1.67 =item port ID - C<nodeid#portname>
86 root 1.2
87 root 1.123 A port ID is the concatenation of a node ID, a hash-mark (C<#>)
88     as separator, and a port name (a printable string of unspecified
89     format created by AnyEvent::MP).
90 root 1.2
91     =item node
92    
93 root 1.53 A node is a single process containing at least one port - the node port,
94 root 1.67 which enables nodes to manage each other remotely, and to create new
95 root 1.53 ports.
96 root 1.2
97 root 1.67 Nodes are either public (have one or more listening ports) or private
98     (no listening ports). Private nodes cannot talk to other private nodes
99 root 1.119 currently, but all nodes can talk to public nodes.
100    
101     Nodes are represented by (printable) strings called "node IDs".
102 root 1.2
103 root 1.117 =item node ID - C<[A-Za-z0-9_\-.:]*>
104 root 1.2
105 root 1.64 A node ID is a string that uniquely identifies the node within a
106     network. Depending on the configuration used, node IDs can look like a
107     hostname, a hostname and a port, or a random string. AnyEvent::MP itself
108 root 1.119 doesn't interpret node IDs in any way except to uniquely identify a node.
109 root 1.64
110     =item binds - C<ip:port>
111    
112     Nodes can only talk to each other by creating some kind of connection to
113     each other. To do this, nodes should listen on one or more local transport
114 root 1.119 endpoints - binds.
115    
116     Currently, only standard C<ip:port> specifications can be used, which
117     specify TCP ports to listen on. So a bind is basically just a TCP socket
118     in listening mode that accepts connections from other nodes.
119 root 1.64
120 root 1.83 =item seed nodes
121 root 1.64
122 root 1.119 When a node starts, it knows nothing about the network it is in - it
123     needs to connect to at least one other node that is already in the
124     network. These other nodes are called "seed nodes".
125    
126     Seed nodes themselves are not special - they are seed nodes only because
127     some other node I<uses> them as such, but any node can be used as seed
128     node for other nodes, and each node can use a different set of seed nodes.
129 root 1.83
130     In addition to discovering the network, seed nodes are also used to
131 root 1.119 maintain the network - all nodes using the same seed node are part of
132     the same network. If a network is split into multiple subnets because e.g.
133     the network link between the parts goes down, then using the same seed
134     nodes for all nodes ensures that eventually the subnets get merged again.
135 root 1.83
136     Seed nodes are expected to be long-running, and at least one seed node
137 root 1.85 should always be available. They should also be relatively responsive - a
138     seed node that blocks for long periods will slow down everybody else.
139 root 1.83
140 root 1.119 For small networks, it's best if every node uses the same set of seed
141     nodes. For large networks, it can be useful to specify "regional" seed
142     nodes for most nodes in an area, and use all seed nodes as seed nodes for
143     each other. What's important is that all seed node connections form a
144     complete graph, so that the network cannot split into separate subnets
145     forever.
146    
147     Seed nodes are represented by seed IDs.
148    
149     =item seed IDs - C<host:port>
150 root 1.83
151 root 1.119 Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
152 elmex 1.96 TCP port) of nodes that should be used as seed nodes.
153 root 1.29
154 root 1.119 =item global nodes
155    
156     An AEMP network needs a discovery service - nodes need to know how to
157     connect to other nodes they only know by name. In addition, AEMP offers a
158     distributed "group database", which maps group names to a list of strings
159     - for example, to register worker ports.
160    
161     A network needs at least one global node to work, and allows every node to
162     be a global node.
163    
164     Any node that loads the L<AnyEvent::MP::Global> module becomes a global
165     node and tries to keep connections to all other nodes. So while it can
166     make sense to make every node "global" in small networks, it usually makes
167     sense to only make seed nodes into global nodes in large networks (nodes
168     keep connections to seed nodes and global nodes, so making them the same
169     reduces overhead).
170 root 1.67
171 root 1.2 =back
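Whether a given set of seed node configurations satisfies the "complete graph" requirement described under C<seed nodes> can be checked mechanically. The following is a minimal sketch, not part of AnyEvent::MP: it assumes the seed configuration is available as a plain hash that maps each seed ID to the seed IDs that node uses, and that a connection in either direction counts. The name C<seeds_fully_meshed> is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: %uses maps each seed ID to the seed IDs it is
# configured to use. A connection counts in both directions, so the check
# is whether the undirected graph over all seed nodes is complete.
sub seeds_fully_meshed {
    my (%uses) = @_;
    my @seeds = sort keys %uses;

    # build an undirected adjacency set from the directed "uses" lists
    my %linked;
    for my $from (@seeds) {
        for my $to (@{ $uses{$from} }) {
            $linked{"$from $to"} = $linked{"$to $from"} = 1;
        }
    }

    # every unordered pair of seed nodes must be linked
    for my $i (0 .. $#seeds) {
        for my $j ($i + 1 .. $#seeds) {
            return 0 unless $linked{"$seeds[$i] $seeds[$j]"};
        }
    }
    return 1;
}

# three seed nodes that all reach each other
my %seeds = (
    "a:4040" => ["b:4040", "c:4040"],
    "b:4040" => ["c:4040"],
    "c:4040" => [],
);
print seeds_fully_meshed(%seeds) ? "complete\n" : "network may split\n";   # complete
```

Two "regional" groups whose seed nodes never list each other would fail this check, which is exactly the situation in which the network could stay split forever.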
172    
173 root 1.3 =head1 VARIABLES/FUNCTIONS
174 root 1.2
175     =over 4
176    
177 root 1.1 =cut
178    
179     package AnyEvent::MP;
180    
181 root 1.121 use AnyEvent::MP::Config ();
182 root 1.44 use AnyEvent::MP::Kernel;
183 root 1.121 use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);
184 root 1.2
185 root 1.1 use common::sense;
186    
187 root 1.2 use Carp ();
188    
189 root 1.1 use AE ();
190 root 1.124 use Guard ();
191 root 1.1
192 root 1.2 use base "Exporter";
193    
194 root 1.121 our $VERSION = $AnyEvent::MP::Config::VERSION;
195 root 1.43
196 root 1.8 our @EXPORT = qw(
197 root 1.59 NODE $NODE *SELF node_of after
198 root 1.72 configure
199 root 1.101 snd rcv mon mon_guard kil psub peval spawn cal
200 root 1.22 port
201 root 1.124 db_set db_del db_reg
202 root 1.8 );
203 root 1.2
204 root 1.22 our $SELF;
205    
206     sub _self_die() {
207     my $msg = $@;
208     $msg =~ s/\n+$// unless ref $msg;
209     kil $SELF, die => $msg;
210     }
211    
212     =item $thisnode = NODE / $NODE
213    
214 root 1.67 The C<NODE> function returns, and the C<$NODE> variable contains, the node
215 root 1.64 ID of the node running in the current process. This value is initialised by
216 root 1.72 a call to C<configure>.
217 root 1.22
218 root 1.63 =item $nodeid = node_of $port
219 root 1.22
220 root 1.67 Extracts and returns the node ID from a port ID or a node ID.
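Because a port ID is just a node ID, a C<#> and a port name, extracting the node ID is a plain string split. The sketch below mirrors the C<split /#/, $port, 2> idiom that C<rcv>, C<mon> and C<spawn> use further down in this file; C<toy_node_of> and C<toy_port_name> are hypothetical names, and this is not the module's own C<node_of> implementation.

```perl
use strict;
use warnings;

# Hypothetical re-implementation for illustration: split "nodeid#portname"
# at the first "#" only. A bare node ID contains no "#", so it is returned
# unchanged.
sub toy_node_of {
    my ($id) = @_;
    my ($nodeid, undef) = split /#/, $id, 2;
    return $nodeid;
}

# the port name part (undef for a bare node ID)
sub toy_port_name {
    my ($id) = @_;
    my (undef, $name) = split /#/, $id, 2;
    return $name;
}

print toy_node_of("aemp/cerebro/xyz#17"), "\n";   # aemp/cerebro/xyz
```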
221 root 1.34
222 root 1.78 =item configure $profile, key => value...
223    
224 root 1.72 =item configure key => value...
225 root 1.34
226 root 1.64 Before a node can talk to other nodes on the network (i.e. enter
227 root 1.72 "distributed mode") it has to configure itself - the minimum a node needs
228 root 1.64 to know is its own name, and optionally it should know the addresses of
229     some other nodes in the network to discover other nodes.
230 root 1.34
231 root 1.121 This function configures a node - it must be called exactly once (or
232     never) before calling other AnyEvent::MP functions.
233    
234 root 1.108 The key/value pairs are basically the same ones as documented for the
235 root 1.121 F<aemp> command line utility (sans the set/del prefix), with two additions:
236    
237     =over 4
238    
239     =item norc => $boolean (default false)
240    
241     If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
242     be consulted - all configuration options must be specified in the
243     C<configure> call.
244 root 1.108
245 root 1.121 =item force => $boolean (default false)
246    
247     If true, then the values specified in the C<configure> call will take
248     precedence over any values configured via the rc file. The default is for
249     the rc file to override any options specified in the program.
250    
251     =back
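The two flags only decide which side wins when the same key is set both in the program and in the rc file. Modelled as a hash merge where later entries win, and ignoring profile chains and the global default, this could look like the sketch below; C<merge_config> is a hypothetical name and the flat-hash layout is an assumption, not the module's internal representation.

```perl
use strict;
use warnings;

# Hypothetical helper illustrating norc/force precedence. Both inputs are
# assumed to be flat hashes; profiles and the global default are ignored.
sub merge_config {
    my ($args, $rc) = @_;
    my %args  = %$args;
    my $force = delete $args{force};
    my $norc  = delete $args{norc};
    my %rc    = $norc ? () : %$rc;     # norc: do not consult the rc file

    # in a hash list assignment, later keys overwrite earlier ones
    return $force
        ? +{ %rc, %args }              # force: configure() values win
        : +{ %args, %rc };             # default: rc file values win
}

my $rc = { binds => "*:4040" };
print merge_config({ binds => "*:1234" }, $rc)->{binds}, "\n";             # *:4040
print merge_config({ binds => "*:1234", force => 1 }, $rc)->{binds}, "\n"; # *:1234
```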
252 root 1.34
253 root 1.72 =over 4
254    
255     =item step 1, gathering configuration from profiles
256    
257     The function first looks up a profile in the aemp configuration (see the
258     L<aemp> commandline utility). The profile name can be specified via the
259 root 1.78 named C<profile> parameter or can simply be the first parameter. If it is
260     missing, then the nodename (F<uname -n>) will be used as profile name.
261 root 1.34
262 root 1.72 The profile data is then gathered as follows:
263 root 1.69
264 elmex 1.77 First, all remaining key => value pairs (all of which are conveniently
265 root 1.72 undocumented at the moment) will be interpreted as configuration
266     data. Then they will be overwritten by any values specified in the global
267     default configuration (see the F<aemp> utility), then the chain of
268     profiles chosen by the profile name (and any C<parent> attributes).
269    
270     That means that the values specified in the profile have highest priority
271     and the values specified directly via C<configure> have lowest priority,
272     and can only be used to specify defaults.
273 root 1.49
274 root 1.64 If the profile specifies a node ID, then this will become the node ID of
275 root 1.122 this process. If not, then the profile name will be used as node ID, with
276 root 1.126 a unique random string (C</%u>) appended.
277 root 1.122
278 root 1.126 The node ID can contain some C<%> sequences that are expanded: C<%n>
279     is expanded to the local nodename, C<%u> is replaced by a random
280     string to make the node unique. For example, the F<aemp> commandline
281     utility uses C<aemp/%n/%u> as nodename, which might expand to
282     C<aemp/cerebro/ZQDGSIkRhEZQDGSIkRhE>.
283 root 1.64
284 root 1.72 =item step 2, bind listener sockets
285    
286 root 1.64 The next step is to look up the binds in the profile, followed by binding
287     aemp protocol listeners on all binds specified (it is possible and valid
288     to have no binds, meaning that the node cannot be contacted from the
289     outside. This means the node cannot talk to other nodes that also have no
290     binds, but it can still talk to all "normal" nodes).
291    
292 root 1.70 If the profile does not specify a binds list, then a default of C<*> is
293 root 1.72 used, meaning the node will bind on a dynamically-assigned port on every
294     local IP address it finds.
295    
296     =item step 3, connect to seed nodes
297 root 1.64
298 root 1.119 As the last step, the seed ID list from the profile is passed to the
299 root 1.64 L<AnyEvent::MP::Global> module, which will then use it to keep
300 root 1.72 connectivity with at least one node at any point in time.
301 root 1.64
302 root 1.72 =back
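The C<%n>/C<%u> expansion of node IDs from step 1 amounts to a simple substitution. In this sketch the host name comes from the core C<Sys::Hostname> module as a stand-in for the local node name, and the unique part is an arbitrary 20-character alphanumeric string; the real module's alphabet and length for C<%u> may differ, and C<expand_nodeid> is a hypothetical name.

```perl
use strict;
use warnings;
use Sys::Hostname ();

# Hypothetical helper: expand %n (local node name) and %u (random unique
# string) in a node ID template. The %u format is an assumption.
sub expand_nodeid {
    my ($template, $hostname) = @_;
    $hostname //= Sys::Hostname::hostname();

    my @chars = ('A' .. 'Z', 'a' .. 'z', '0' .. '9');
    my $uniq  = join '', map { $chars[rand @chars] } 1 .. 20;

    # single pass: replacement text is not rescanned for % sequences
    (my $id = $template) =~ s/%([nu])/$1 eq "n" ? $hostname : $uniq/ge;
    return $id;
}

print expand_nodeid("myscript/%n/%u"), "\n";
```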
303    
304 root 1.87 Example: become a distributed node using the local node name as profile.
305 root 1.72 This should be the most common form of invocation for "daemon"-type nodes.
306 root 1.34
307 root 1.72 configure
308 root 1.34
309 root 1.126 Example: become a semi-anonymous node. This form is often used for
310     commandline clients.
311 root 1.34
312 root 1.126 configure nodeid => "myscript/%n/%u";
313 root 1.72
314 root 1.120 Example: configure a node using a profile called seed, which is suitable
315 root 1.72 for a seed node as it binds on all local addresses on a fixed port (4040,
316     customary for aemp).
317    
318     # use the aemp commandline utility
319 root 1.122 # aemp profile seed binds '*:4040'
320 root 1.72
321     # then use it
322     configure profile => "seed";
323 root 1.34
324 root 1.72 # or simply use aemp from the shell again:
325     # aemp run profile seed
326 root 1.34
327 root 1.72 # or provide a nicer-to-remember nodeid
328     # aemp run profile seed nodeid "$(hostname)"
329 root 1.34
330 root 1.22 =item $SELF
331    
332     Contains the current port id while executing C<rcv> callbacks or C<psub>
333     blocks.
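C<$SELF> is an ordinary package variable that is temporarily set with C<local> around each callback, as the C<rcv> and C<psub> code further down in this file does. The following standalone sketch reproduces that dynamic-scoping behaviour with a hypothetical C<run_in_port> helper; it does not use AnyEvent::MP.

```perl
use strict;
use warnings;

our $SELF;    # package variable, like AnyEvent::MP's $SELF

# Hypothetical helper: run a callback with $SELF set to a port ID. "local"
# restores the previous value when the helper returns, even if it dies.
sub run_in_port {
    my ($port, $cb, @msg) = @_;
    local $SELF = $port;
    return $cb->(@msg);
}

sub whoami { defined $SELF ? $SELF : "(none)" }

print whoami(), "\n";                          # (none)
print run_in_port("node#1", \&whoami), "\n";   # node#1
print whoami(), "\n";                          # (none)
```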
334 root 1.3
335 root 1.67 =item *SELF, SELF, %SELF, @SELF...
336 root 1.22
337     Due to some quirks in how perl exports variables, it is impossible to
338 root 1.67 just export C<$SELF>, so all the symbols named C<SELF> are exported by this
339 root 1.22 module, but only C<$SELF> is currently used.
340 root 1.3
341 root 1.33 =item snd $port, type => @data
342 root 1.3
343 root 1.33 =item snd $port, @msg
344 root 1.3
345 root 1.67 Send the given message to the given port, which can identify either a
346     local or a remote port, and must be a port ID.
347 root 1.8
348 root 1.67 While the message can be almost anything, it is highly recommended to
349     use a string as first element (a port ID, or some word that indicates a
350     request type etc.) and to consist of only simple perl values (scalars,
351     arrays, hashes) - if you think you need to pass an object, think again.
352    
353     The message data logically becomes read-only after a call to this
354     function: modifying any argument (or values referenced by them) is
355     forbidden, as there can be considerable time between the call to C<snd>
356     and the time the message is actually being serialised - in fact, it might
357     never be copied as within the same process it is simply handed to the
358     receiving port.
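The hazard is easiest to see when delivery is deferred and nothing is copied. The toy code below, with hypothetical C<toy_snd> and C<toy_deliver> functions, only models that situation; it is not how AnyEvent::MP hands messages to ports internally.

```perl
use strict;
use warnings;

# Hypothetical stand-in for deferred in-process delivery: the argument
# list is copied, but referenced data is shared, not copied.
my @pending;
sub toy_snd     { push @pending, [@_] }
sub toy_deliver { my @out = @pending; @pending = (); @out }

my %job = (id => 1, state => "new");
toy_snd("node#port", job => \%job);
$job{state} = "changed";                  # forbidden after snd

my ($msg) = toy_deliver();
print $msg->[2]{state}, "\n";             # changed, not new
```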
359 root 1.3
360     The type of data you can transfer depends on the transport protocol: when
361     JSON is used, then only strings, numbers and arrays and hashes consisting
362     of those are allowed (no objects). When Storable is used, then anything
363     that Storable can serialise and deserialise is allowed, and for the local
364 root 1.67 node, anything can be passed. Best rely only on the common denominator of
365     these.
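One way to check that a message stays within that common denominator during development is to run it through a strict JSON encoder. This sketch uses the core C<JSON::PP> module purely as a convenient stand-in; it does not reproduce AnyEvent::MP's actual serialisation, and C<json_safe> is a hypothetical helper.

```perl
use strict;
use warnings;
use JSON::PP ();

# Hypothetical helper: true if the message consists only of values a
# strict JSON encoder accepts (strings, numbers, arrays, hashes).
sub json_safe {
    my @msg = @_;
    return eval { JSON::PP->new->encode(\@msg); 1 } ? 1 : 0;
}

print json_safe(reply => [1, 2], { ok => "yes" }) ? "portable\n" : "not portable\n";
print json_safe(reply => bless({}, "My::Object")) ? "portable\n" : "not portable\n";
```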
366 root 1.3
367 root 1.22 =item $local_port = port
368 root 1.2
369 root 1.50 Creates a new local port object and returns its port ID. Initially it has
370     no callbacks set and will throw an error when it receives messages.
371 root 1.10
372 root 1.50 =item $local_port = port { my @msg = @_ }
373 root 1.15
374 root 1.50 Creates a new local port, and returns its ID. Semantically the same as
375     creating a port and calling C<rcv $port, $callback> on it.
376 root 1.15
377 root 1.50 The block will be called for every message received on the port, with the
378     global variable C<$SELF> set to the port ID. Runtime errors will cause the
379     port to be C<kil>ed. The message will be passed as-is, no extra argument
380     (i.e. no port ID) will be passed to the callback.
381 root 1.15
382 root 1.50 If you want to stop/destroy the port, simply C<kil> it:
383 root 1.15
384 root 1.50 my $port = port {
385     my @msg = @_;
386     ...
387     kil $SELF;
388 root 1.15 };
389 root 1.10
390     =cut
391    
392 root 1.33 sub rcv($@);
393    
394 root 1.50 sub _kilme {
395     die "received message on port without callback";
396     }
397    
398 root 1.22 sub port(;&) {
399 root 1.123 my $id = $UNIQ . ++$ID;
400 root 1.22 my $port = "$NODE#$id";
401    
402 root 1.50 rcv $port, shift || \&_kilme;
403 root 1.10
404 root 1.22 $port
405 root 1.10 }
406    
407 root 1.50 =item rcv $local_port, $callback->(@msg)
408 root 1.31
409 root 1.50 Replaces the default callback on the specified port. There is no way to
410     remove the default callback: use C<sub { }> to disable it, or better
411     C<kil> the port when it is no longer needed.
412 root 1.3
413 root 1.33 The global C<$SELF> (exported by this module) contains C<$port> while
414 root 1.50 executing the callback. Runtime errors during callback execution will
415     result in the port being C<kil>ed.
416 root 1.22
417 root 1.50 The default callback receives all messages not matched by a more specific
418     C<tag> match.
419 root 1.22
420 root 1.50 =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
421 root 1.3
422 root 1.54 Register (or replace) callbacks to be called on messages starting with the
423     given tag on the given port (and return the port), or unregister it (when
424     C<$callback> is C<undef> or missing). There can only be one callback
425     registered for each tag.
426 root 1.3
427 root 1.50 The original message will be passed to the callback, after the first
428     element (the tag) has been removed. The callback will use the same
429     environment as the default callback (see above).
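The dispatch rule for tagged callbacks (look up the first message element as a tag, strip it and call the matching callback, otherwise fall back to the default callback) can be modelled in a few lines of plain Perl. This is a simplified sketch with a hypothetical C<make_dispatcher> function; it leaves out the C<$SELF> handling and the error-to-C<kil> conversion that the real C<rcv> performs.

```perl
use strict;
use warnings;

# Hypothetical model of tag dispatch: %tags maps a tag to a callback that
# receives the message without its tag; $default gets the full message.
sub make_dispatcher {
    my ($default, %tags) = @_;
    return sub {
        my @msg = @_;
        if (defined $msg[0] && (my $cb = $tags{$msg[0]})) {
            shift @msg;                  # remove the tag
            return $cb->(@msg);
        }
        return $default->(@msg);         # untagged or unknown tag
    };
}

my $dispatch = make_dispatcher(
    sub { "default: @_" },
    ping => sub { "ping from $_[0]" },
);

print $dispatch->(ping => "node#7"), "\n";    # ping from node#7
print $dispatch->(hello => 1), "\n";          # default: hello 1
```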
430 root 1.3
431 root 1.36 Example: create a port and bind receivers on it in one go.
432    
433     my $port = rcv port,
434 root 1.50 msg1 => sub { ... },
435     msg2 => sub { ... },
436 root 1.36 ;
437    
438     Example: create a port, bind receivers and send it in a message elsewhere
439     in one go:
440    
441     snd $otherport, reply =>
442     rcv port,
443 root 1.50 msg1 => sub { ... },
444 root 1.36 ...
445     ;
446    
447 root 1.54 Example: temporarily register a rcv callback for a tag matching some port
448 root 1.102 (e.g. for an rpc reply) and unregister it after a message was received.
449 root 1.54
450     rcv $port, $otherport => sub {
451     my @reply = @_;
452    
453     rcv $SELF, $otherport;
454     };
455    
456 root 1.3 =cut
457    
458     sub rcv($@) {
459 root 1.33 my $port = shift;
460 root 1.75 my ($nodeid, $portid) = split /#/, $port, 2;
461 root 1.3
462 root 1.75 $NODE{$nodeid} == $NODE{""}
463 root 1.33 or Carp::croak "$port: rcv can only be called on local ports, caught";
464 root 1.22
465 root 1.50 while (@_) {
466     if (ref $_[0]) {
467     if (my $self = $PORT_DATA{$portid}) {
468     "AnyEvent::MP::Port" eq ref $self
469     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
470 root 1.33
471 root 1.103 $self->[0] = shift;
472 root 1.50 } else {
473     my $cb = shift;
474     $PORT{$portid} = sub {
475     local $SELF = $port;
476     eval { &$cb }; _self_die if $@;
477     };
478     }
479     } elsif (defined $_[0]) {
480     my $self = $PORT_DATA{$portid} ||= do {
481 root 1.103 my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
482 root 1.50
483     $PORT{$portid} = sub {
484     local $SELF = $port;
485    
486     if (my $cb = $self->[1]{$_[0]}) {
487     shift;
488     eval { &$cb }; _self_die if $@;
489     } else {
490     &{ $self->[0] };
491 root 1.33 }
492     };
493 root 1.50
494     $self
495 root 1.33 };
496    
497 root 1.50 "AnyEvent::MP::Port" eq ref $self
498     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
499 root 1.22
500 root 1.50 my ($tag, $cb) = splice @_, 0, 2;
501 root 1.33
502 root 1.50 if (defined $cb) {
503     $self->[1]{$tag} = $cb;
504 root 1.33 } else {
505 root 1.50 delete $self->[1]{$tag};
506 root 1.33 }
507 root 1.22 }
508 root 1.3 }
509 root 1.31
510 root 1.33 $port
511 root 1.2 }
512    
513 root 1.101 =item peval $port, $coderef[, @args]
514    
515     Evaluates the given C<$coderef> within the context of C<$port>, that is,
516     when the code throws an exception the C<$port> will be killed.
517    
518     Any remaining args will be passed to the callback. Any return values will
519     be returned to the caller.
520    
521     This is useful when you temporarily want to execute code in the context of
522     a port.
523    
524     Example: create a port and run some initialisation code in its context.
525    
526     my $port = port { ... };
527    
528     peval $port, sub {
529     init
530     or die "unable to init";
531     };
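The contract described above (set the port context, pass through extra arguments and the caller's context, turn an exception into a kill) can be sketched without the event framework. Here C<toy_kil> only records the kill instead of destroying anything, and C<toy_peval> is a hypothetical name; the real C<peval> calls the module's own C<kil> with a C<< die => $msg >> reason.

```perl
use strict;
use warnings;

our $SELF;
my @killed;    # records [port, reason...] instead of destroying anything

sub toy_kil { push @killed, [@_] }

# Hypothetical model of peval: localise $SELF, forward extra arguments and
# the caller's context, and convert an exception into a kill of the port.
sub toy_peval {
    my ($port, $cb, @args) = @_;
    local $SELF = $port;

    my @res = wantarray ? eval { $cb->(@args) } : scalar eval { $cb->(@args) };
    if ($@) {
        my $msg = $@;
        $msg =~ s/\n+$// unless ref $msg;
        toy_kil($SELF, die => $msg);
        return;
    }
    return wantarray ? @res : $res[0];
}

my $sum = toy_peval("node#3", sub { $_[0] + $_[1] }, 2, 3);    # 5
toy_peval("node#3", sub { die "init failed\n" });
print "$killed[0][0]: $killed[0][2]\n";                        # node#3: init failed
```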
532    
533     =cut
534    
535     sub peval($$@) {
536     local $SELF = shift;
537     my $cb = shift;
538    
539     if (wantarray) {
540     my @res = eval { &$cb };
541     _self_die if $@;
542     @res
543     } else {
544     my $res = eval { &$cb };
545     _self_die if $@;
546     $res
547     }
548     }
549    
550 root 1.22 =item $closure = psub { BLOCK }
551 root 1.2
552 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
553     closure is executed, sets up the environment in the same way as in C<rcv>
554     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
555    
556 root 1.101 The effect is basically as if it returned C<< sub { peval $SELF, sub {
557 root 1.114 BLOCK }, @_ } >>.
558 root 1.101
559 root 1.22 This is useful when you register callbacks from C<rcv> callbacks:
560    
561     rcv $port, delayed_reply => sub {
562     my ($delay, @reply) = @_;
563     my $timer; $timer = AE::timer $delay, 0, psub {
564     undef $timer; snd @reply, $SELF;
565     };
566     };
567 root 1.3
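The C<peval> equivalence suggests how C<psub> can be built: capture C<$SELF> when the closure is created and re-establish it whenever the closure runs. A minimal sketch with the hypothetical name C<toy_psub>, deliberately leaving out the exception-to-C<kil> conversion that the real C<psub> adds:

```perl
use strict;
use warnings;

our $SELF;

# Hypothetical model of psub: remember the current $SELF and restore it
# each time the returned closure is called (error handling omitted).
sub toy_psub(&) {
    my $cb   = shift;
    my $port = $SELF
        or die "toy_psub called outside a port context\n";
    return sub {
        local $SELF = $port;
        $cb->(@_);
    };
}

my $later;
{
    local $SELF = "node#9";             # as inside an rcv callback
    $later = toy_psub { "running in $SELF with @_" };
}
# $SELF is undef again here, but the closure restores the captured port
print $later->("x", "y"), "\n";         # running in node#9 with x y
```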
568 root 1.8 =cut
569 root 1.3
570 root 1.22 sub psub(&) {
571     my $cb = shift;
572 root 1.3
573 root 1.22 my $port = $SELF
574     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
575 root 1.1
576 root 1.22 sub {
577     local $SELF = $port;
578 root 1.2
579 root 1.22 if (wantarray) {
580     my @res = eval { &$cb };
581     _self_die if $@;
582     @res
583     } else {
584     my $res = eval { &$cb };
585     _self_die if $@;
586     $res
587     }
588     }
589 root 1.2 }
590    
591 root 1.67 =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
592 root 1.32
593 root 1.67 =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
594 root 1.36
595 root 1.67 =item $guard = mon $port # kill $SELF when $port dies
596 root 1.32
597 root 1.67 =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
598 root 1.32
599 root 1.42 Monitor the given port and do something when the port is killed or
600     messages to it were lost, and optionally return a guard that can be used
601     to stop monitoring again.
602    
603 root 1.36 In the first form (callback), the callback is simply called with any
604     number of C<@reason> elements (no @reason means that the port was deleted
605 root 1.32 "normally"). Note also that I<< the callback B<must> never die >>, so use
606     C<eval> if unsure.
607    
608 root 1.43 In the second form (another port given), the other port (C<$rcvport>)
609 elmex 1.77 will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
610 root 1.36 "normal" kils nothing happens, while under all other conditions, the other
611     port is killed with the same reason.
612 root 1.32
613 root 1.36 The third form (kill self) is the same as the second form, except that
614     C<$rcvport> defaults to C<$SELF>.
615    
616     In the last form (message), a message of the form C<@msg, @reason> will be
617     C<snd>.
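The four calling forms differ only in how the action is turned into a single callback that receives C<@reason>. The sketch below mirrors that normalisation from the C<mon> source further down, using recording stand-ins C<toy_snd> and C<toy_kil> so that it runs without AnyEvent::MP; C<mon_action> is likewise a hypothetical name, and the monitored port itself is left out.

```perl
use strict;
use warnings;

our $SELF;
my @log;    # records what the stand-ins were asked to do

sub toy_snd { push @log, ["snd", @_] }
sub toy_kil { push @log, ["kil", @_] }

# Hypothetical: turn mon's optional arguments into one callback taking @reason.
sub mon_action {
    my $cb = @_ ? shift : $SELF;
    die "no action given and \$SELF not set\n" unless defined $cb;

    return $cb if ref $cb;                        # form 1: code reference

    if (@_) {                                     # form 4: port + message
        my @msg = ($cb, @_);
        return sub { toy_snd(@msg, @_) };
    }

    my $port = $cb;                               # forms 2 and 3: kill port,
    return sub { toy_kil($port, @_) if @_ };      # but only on abnormal death
}

mon_action("me#1")->();                           # normal death: nothing logged
mon_action("me#1")->(die => "oops");              # kil me#1 die oops
mon_action("me#1", "restart")->(die => "oops");   # snd me#1 restart die oops
```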
618 root 1.32
619 root 1.79 Monitoring-actions are one-shot: once messages are lost (and a monitoring
620     alert was raised), they are removed and will not trigger again.
621    
622 root 1.37 As a rule of thumb, monitoring requests should always monitor a port from
623     a local port (or callback). The reason is that kill messages might get
624     lost, just like any other message. Another less obvious reason is that
625 elmex 1.77 even monitoring requests can get lost (for example, when the connection
626 root 1.37 to the other node goes down permanently). When monitoring a port locally
627     these problems do not exist.
628    
629 root 1.79 C<mon> effectively guarantees that, in the absence of hardware failures,
630     after starting the monitor, either all messages sent to the port will
631     arrive, or the monitoring action will be invoked after possible message
632     loss has been detected. No messages will be lost "in between" (after
633     the first lost message no further messages will be received by the
634     port). After the monitoring action was invoked, further messages might get
635     delivered again.
636    
637     Inter-host-connection timeouts and monitoring depend on the transport
638     used. The only transport currently implemented is TCP, and AnyEvent::MP
639     relies on TCP to detect node-downs (this can take 10-15 minutes on a
640 elmex 1.96 non-idle connection, and usually around two hours for idle connections).
641 root 1.79
642     This means that monitoring is good for program errors and cleaning up
643     stuff eventually, but it is no replacement for a timeout when you need
644     to ensure some maximum latency.
645    
646 root 1.32 Example: call a given callback when C<$port> is killed.
647    
648     mon $port, sub { warn "port died because of <@_>\n" };
649    
650     Example: kill ourselves when C<$port> is killed abnormally.
651    
652 root 1.36 mon $port;
653 root 1.32
654 root 1.36 Example: send us a restart message when another C<$port> is killed.
655 root 1.32
656     mon $port, $SELF => "restart";
657    
658     =cut
659    
660     sub mon {
661 root 1.75 my ($nodeid, $port) = split /#/, shift, 2;
662 root 1.32
663 root 1.75 my $node = $NODE{$nodeid} || add_node $nodeid;
664 root 1.32
665 root 1.41 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
666 root 1.32
667     unless (ref $cb) {
668     if (@_) {
669     # send a kill info message
670 root 1.41 my (@msg) = ($cb, @_);
671 root 1.32 $cb = sub { snd @msg, @_ };
672     } else {
673     # simply kill other port
674     my $port = $cb;
675     $cb = sub { kil $port, @_ if @_ };
676     }
677     }
678    
679     $node->monitor ($port, $cb);
680    
681     defined wantarray
682 root 1.124 and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
683 root 1.32 }
684    
685     =item $guard = mon_guard $port, $ref, $ref...
686    
687     Monitors the given C<$port> and keeps the passed references. When the port
688     is killed, the references will be freed.
689    
690     Optionally returns a guard that will stop the monitoring.
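The C<mon_guard> source further down is just C<mon $port, sub { 0 && @refs }>: the callback never does anything useful, but because the closure mentions C<@refs>, those references stay alive for as long as the closure does. This standalone sketch demonstrates that closure-capture trick with an object whose destructor increments a counter; the package C<Watcher> and the function C<keep_alive> are hypothetical.

```perl
use strict;
use warnings;

my $destroyed = 0;

{
    package Watcher;    # hypothetical stand-in for a timer or I/O watcher
    sub new     { bless {}, shift }
    sub DESTROY { $destroyed++ }
}

# Same trick as mon_guard: the closure refers to @refs, so the referenced
# objects live exactly as long as the closure does.
sub keep_alive {
    my @refs = @_;
    return sub { 0 && @refs };
}

my $cb = keep_alive(Watcher->new);
print "destroyed: $destroyed\n";    # destroyed: 0
undef $cb;                          # e.g. the monitor fires or is cancelled
print "destroyed: $destroyed\n";    # destroyed: 1
```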
691    
692     This function is useful when you create e.g. timers or other watchers and
693 root 1.67 want to free them when the port gets killed (note the use of C<psub>):
694 root 1.32
695     rcv $port, start => sub {
696 root 1.67 my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
697 root 1.32 undef $timer if 0.9 < rand;
698     };
699     };
700    
701     =cut
702    
703     sub mon_guard {
704     my ($port, @refs) = @_;
705    
706 root 1.36 #TODO: mon-less form?
707    
708 root 1.32 mon $port, sub { 0 && @refs }
709     }
710    
711 root 1.33 =item kil $port[, @reason]
712 root 1.32
713     Kill the specified port with the given C<@reason>.
714    
715 root 1.107 If no C<@reason> is specified, then the port is killed "normally" -
716     monitor callbacks will be invoked, but the kil will not cause linked ports
717     (C<mon $mport, $lport> form) to get killed.
718 root 1.32
719 root 1.107 If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
720     form) get killed with the same reason.
721 root 1.32
722     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
723     will be reported as reason C<< die => $@ >>.
724    
725     Transport/communication errors are reported as C<< transport_error =>
726     $message >>.
727    
728 root 1.38 =cut
729    
730     =item $port = spawn $node, $initfunc[, @initdata]
731    
732     Creates a port on the node C<$node> (which can also be a port ID, in which
733     case it's the node where that port resides).
734    
735 root 1.67 The port ID of the newly created port is returned immediately, and it is
736     possible to immediately start sending messages or to monitor the port.
737 root 1.38
738 root 1.67 After the port has been created, the init function is called on the remote
739     node, in the same context as a C<rcv> callback. This function must be a
740     fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
741     specify a function in the main program, use C<::name>.
742 root 1.38
743     If the function doesn't exist, then the node tries to C<require>
744     the package, then the package above the package and so on (e.g.
745     C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
746     exists or it runs out of package names.
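The order in which packages are tried can be written down directly. This sketch only computes the candidate list for a fully-qualified function name; the module itself additionally calls C<require> on each candidate and stops as soon as the function exists. The name C<candidate_packages> is hypothetical, and in this sketch a main-program name such as C<::init> simply yields no candidates.

```perl
use strict;
use warnings;

# Hypothetical helper: list the packages that would be tried, innermost
# first, for a name such as "MyApp::Chat::Server::connect".
sub candidate_packages {
    my ($func) = @_;
    my @parts = grep { length } split /::/, $func;
    pop @parts;                        # drop the function name itself

    my @packages;
    while (@parts) {
        push @packages, join "::", @parts;
        pop @parts;                    # move one package level up
    }
    return @packages;
}

print join(", ", candidate_packages("MyApp::Chat::Server::connect")), "\n";
# MyApp::Chat::Server, MyApp::Chat, MyApp
```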
747    
748     The init function is then called with the newly-created port as context
749 root 1.82 object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
750     call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
751     the port might not get created.
752 root 1.38
753 root 1.67 A common idiom is to pass a local port, immediately monitor the spawned
754     port, and in the remote init function, immediately monitor the passed
755     local port. This two-way monitoring ensures that both ports get cleaned up
756     when there is a problem.
757 root 1.38
758 root 1.80 C<spawn> guarantees that the C<$initfunc> has no visible effects on the
759     caller before C<spawn> returns (by delaying invocation when spawn is
760     called for the local node).
761    
762 root 1.38 Example: spawn a chat server port on C<$othernode>.
763    
764     # this node, executed from within a port context:
765     my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
766     mon $server;
767    
768     # init function on $othernode
769     sub connect {
770     my ($srcport) = @_;
771    
772     mon $srcport;
773    
774     rcv $SELF, sub {
775     ...
776     };
777     }
778    
=cut

sub _spawn {
   my $port = shift;
   my $init = shift;

   # rcv will create the actual port
   local $SELF = "$NODE#$port";
   eval {
      # call the init function with the remaining arguments (@initdata)
      &{ load_func $init }
   };
   _self_die if $@;
}

sub spawn(@) {
   # accept a node ID or a port ID, and keep only the node part
   my ($nodeid, undef) = split /#/, shift, 2;

   # generate the new port's ID locally, so it can be returned at once
   my $id = $RUNIQ . ++$ID;

   $_[0] =~ /::/
      or Carp::croak "spawn init function must be a fully-qualified name, caught";

   # ask the target node to run _spawn, which calls the init function
   # with $SELF set to the new port
   snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;

   "$nodeid#$id"
}

=item after $timeout, @msg

=item after $timeout, $callback

Either sends the given message, or calls the given callback, after the
specified number of seconds.

This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.
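
The two calling forms are distinguished only by whether the first element after C<$timeout> is a reference. The plain-Perl sketch below mirrors that dispatch rule. C<dispatch_action> and C<fake_snd> are hypothetical names, and C<fake_snd> merely records messages in place of AnyEvent::MP's C<snd>, so the example runs without a node.

```perl

   use strict;
   use warnings;

   # Hypothetical stand-in for snd: records messages instead of sending them
   my @sent;
   sub fake_snd { push @sent, [@_] }

   # Hypothetical helper mirroring the rule used by "after": a reference in
   # first position is called as a callback, anything else is treated as a
   # message whose first element is the destination port.
   sub dispatch_action {
      my (@action) = @_;

      ref $action[0]
         ? $action[0]()
         : fake_snd(@action);
   }

```

With AnyEvent::MP itself, the two forms would be written e.g. as C<< after 5, $port, timeout => 1 >> and C<< after 5, sub { ... } >>. The real function additionally keeps its timer alive by referencing C<$t> inside the timer's own callback, releasing it with C<undef $t> once the timer has fired.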

=cut

sub after($@) {
   my ($timeout, @action) = @_;

   my $t; $t = AE::timer $timeout, 0, sub {
      undef $t;
      ref $action[0]
         ? $action[0]()
         : snd @action;
   };
}

=item cal $port, @msg, $callback[, $timeout]

A simple form of RPC - sends a message to the given C<$port> with the
given contents (C<@msg>), but appends a reply port to the message.

The reply port is created temporarily just for the purpose of receiving
the reply, and will be C<kil>ed when no longer needed.

A reply message sent to the port is passed to the C<$callback> as-is.

If an optional time-out (in seconds) is given and it is not C<undef>,
then the callback will be called without any arguments after the time-out
has elapsed, and the port is C<kil>ed.

If no time-out is given (or it is C<undef>), then the local port will
monitor the remote port instead, so it eventually gets cleaned up (the
callback is then called without any arguments if the remote port dies).

Currently this function returns the temporary port, but this "feature"
might go in future versions unless you can make a convincing case that
this is indeed useful for something.
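
Because the time-out is optional and comes last, C<cal> decides whether one was given by looking at the final argument: anything that is not a reference (a number, or C<undef>) is taken to be the time-out, and the callback is then the next argument from the end. The sketch below reproduces only that argument handling in plain Perl. C<split_cal_args> is a hypothetical name, not an AnyEvent::MP function.

```perl

   use strict;
   use warnings;

   # Hypothetical helper reproducing how cal splits its arguments.
   # Returns (\@port_and_msg, $callback, $timeout_or_undef).
   sub split_cal_args {
      my @args = @_;

      # a non-reference last argument (a number or undef) is the time-out
      my $timeout = ref $args[-1] ? undef : pop @args;
      my $cb      = pop @args;   # the callback comes next from the end

      (\@args, $cb, $timeout)
   }

```

One consequence of this rule is that C<$callback> must be a code reference. With the real module, a call with a 30-second time-out might look like C<< cal $port, get_status => sub { ... }, 30 >>, where C<get_status> is a made-up message tag.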

=cut

sub cal(@) {
   # an optional trailing time-out is anything that is not a reference
   my $timeout = ref $_[-1] ? undef : pop;
   my $cb = pop;

   # temporary reply port: on the first reply, cancel the time-out,
   # kill the port and pass the reply to the callback
   my $port = port {
      undef $timeout;
      kil $SELF;
      &$cb;
   };

   if (defined $timeout) {
      # time-out: kill the reply port, call the callback without arguments
      $timeout = AE::timer $timeout, 0, sub {
         undef $timeout;
         kil $port;
         $cb->();
      };
   } else {
      # no time-out: do the same if the destination port dies
      mon $_[0], sub {
         kil $port;
         $cb->();
      };
   }

   # append the reply port to the message and send it
   push @_, $port;
   &snd;

   $port
}

=back

=head1 DISTRIBUTED DATABASE

AnyEvent::MP comes with a simple distributed database. The database will
be mirrored asynchronously on all global nodes. Other nodes bind to one of
the global nodes for their needs.

The database consists of a two-level hash - the top-level hash contains
hashes, which in turn contain the values.

The top-level hash key is called "family", and the second-level hash key
is called "subkey" or simply "key".

The family name must start with a letter and consist only of letters,
digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>), pretty much
like Perl module names.

As the family namespace is global, it is recommended to prefix family names
with the name of the application or module using it.

The subkeys must be non-empty strings, with no further restrictions.

The values should preferably be strings, but other Perl scalars should
work as well (such as undef, array references and hash references).
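
The naming rules and the two-level layout can be modelled locally in a few lines of Perl. This is a hypothetical, single-process sketch: C<%db>, C<valid_family>, C<valid_subkey> and C<model_set> are illustrative names, not AnyEvent::MP functions. It does not replicate anything between nodes, it only shows the C<< family => { subkey => value } >> structure and which names are accepted.

```perl

   use strict;
   use warnings;

   # Hypothetical, local model of the layout: $db{$family}{$subkey} = $value
   my %db;

   # family: starts with a letter, then letters, digits, "_" or ":"
   sub valid_family { defined $_[0] && $_[0] =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/ }

   # subkey: any non-empty string
   sub valid_subkey { defined $_[0] && length $_[0] }

   # Hypothetical stand-in for db_set: validate the names, then store
   sub model_set {
      my ($family, $subkey, $value) = @_;

      die "invalid family name\n"      unless valid_family($family);
      die "subkey must not be empty\n" unless valid_subkey($subkey);

      $db{$family}{$subkey} = $value;   # $value may be undef or a reference
   }

```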

Every database entry is owned by one node - adding the same family/subkey
combination on multiple nodes will not cause discomfort for AnyEvent::MP,
but the result might be nondeterministic, i.e. the key might have
different values on different nodes.

Different subkeys in the same family can be owned by different nodes
without problems, and in fact, this is the common method to create worker
pools. For example, a worker port for image scaling might do this:

   db_set my_image_scalers => $port;

And clients looking for an image scaler will want to get the
C<my_image_scalers> keys:

   db_keys "my_image_scalers" => 60 => sub {
      #d##TODO#

=over

=item db_set $family => $subkey [=> $value]

Sets (or replaces) a key in the database - if C<$value> is omitted,
C<undef> is used instead.

=item db_del $family => $subkey

Deletes a key from the database.

=item $guard = db_reg $family => $subkey [=> $value]

Sets the key in the database and returns a guard. When the guard is
destroyed, the key is deleted from the database. If C<$value> is missing,
then C<undef> is used.
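
The guard behaviour of C<db_reg> follows the usual Perl scope-guard pattern: an object whose C<DESTROY> method undoes an action when the last reference to it goes away. The sketch below models it against a local hash. C<ModelGuard> and C<model_reg> are hypothetical names, and this is not how AnyEvent::MP implements its guard internally.

```perl

   use strict;
   use warnings;

   my %db;   # hypothetical local model: $db{$family}{$subkey} = $value

   # Hypothetical guard class: removes its entry when destroyed
   package ModelGuard {
      sub new {
         my ($class, $db, $family, $subkey) = @_;
         bless [$db, $family, $subkey], $class
      }

      sub DESTROY {
         my ($db, $family, $subkey) = @{ $_[0] };
         delete $db->{$family}{$subkey};
      }
   }

   # Hypothetical stand-in for db_reg: set the key, return a guard
   sub model_reg {
      my ($family, $subkey, $value) = @_;
      $db{$family}{$subkey} = $value;
      ModelGuard->new(\%db, $family, $subkey)
   }

```

In the worker-pool example above, a worker could therefore keep C<< my $guard = db_reg my_image_scalers => $port; >> around for as long as it accepts jobs, and its entry disappears as soon as the guard is dropped.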

=cut

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
== aemp node, Erlang process == aemp port), so many of the documents and
programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf      # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf  # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node IDs are arbitrary strings in AEMP.

Erlang relies on special naming and DNS to work everywhere in the same
way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
configuration or DNS), and possibly the addresses of some seed nodes, but
will otherwise discover other nodes (and their IDs) itself.

=item * Erlang has a "remote ports are like local ports" philosophy, AEMP
uses "local ports are like remote ports".

The failure modes for local ports are quite different (runtime errors
only) than for remote ports - when a local port dies, you I<know> it dies,
but when a connection to another node dies, you know nothing about the
other port.

Erlang pretends remote ports are as reliable as local ports, even when
they are not.

AEMP encourages a "treat remote ports differently" philosophy, with local
ports being the special case/exception, where transport errors cannot
occur.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages out of order, and
therefore needs a queue. AEMP is event-based, so queuing messages would
serve no useful purpose. For the same reason the pattern-matching abilities
of AnyEvent::MP are more limited, as there is little need to be able to
filter messages without dequeuing them.

This is not a philosophical difference, but simply stems from AnyEvent::MP
being event-based, while Erlang is process-based.

You can have a look at L<Coro::MP> for a more Erlang-like process model on
top of AEMP and Coro threads.

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in Erlang is synchronous and blocks the process until
a connection has been established and the message sent (and so does not
need a queue that can overflow). AEMP sends return immediately; connection
establishment is handled in the background.

=item * Erlang suffers from silent message loss, AEMP does not.

Erlang implements few guarantees on message delivery - messages can get
lost without any of the processes realising it (e.g. you send messages a,
b, and c, and the other side only receives messages a and c).

AEMP guarantees (modulo hardware errors) correct ordering, and the
guarantee that after one message is lost, all following ones sent to the
same port are lost as well, until monitoring raises an error, so there are
no silent "holes" in the message sequence.

If you want your software to be very reliable, you have to cope with
corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
simply tries to work better in common error cases, such as when a network
link goes down.

=item * Erlang can send messages to the wrong port, AEMP does not.

In Erlang it is quite likely that a node that restarts reuses an Erlang
process ID known to other nodes for a completely different process,
causing messages destined for that process to end up in an unrelated
process.

AEMP does not reuse port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol - TLS - to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the Erlang protocol, supports both
programming-language-independent text-only protocols (good for debugging),
and binary, language-specific serialisers (e.g. Storable). By default,
unless TLS is used, the protocol is actually completely text-based.

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=item * AEMP has more flexible monitoring options than Erlang.

In Erlang, you can choose to receive I<all> exit signals as messages or
I<none>; there is no in-between, so monitoring single Erlang processes is
difficult to implement.

Monitoring in AEMP is more flexible than in Erlang, as one can choose
between automatic kill, exit message or callback on a per-port basis.

=item * Erlang tries to hide remote/local connections, AEMP does not.

Monitoring in Erlang is not an indicator of process death/crashes, in the
same way as linking is (except linking is unreliable in Erlang).

In AEMP, you don't "look up" registered port names or send to named ports
that might or might not be persistent. Instead, you normally spawn a port
on the remote node. The init function monitors you, and you monitor the
remote port. Since both monitors are local to the node, they are much more
reliable (no need for C<spawn_link>).

This also saves round-trips and avoids sending messages to the wrong port
(something that is hard to avoid in Erlang).

=back

=head1 RATIONALE

=over 4

=item Why strings for port and node IDs, why not objects?

We considered "objects", but found that the actual number of methods
that can be called is quite low. Since port and node IDs travel over
the network frequently, the serialising/deserialising would add lots of
overhead, as well as having to keep a proxy object everywhere.

Strings can easily be printed, easily serialised etc. and need no special
procedures to be "valid".

And as a result, a port with just a default receiver consists of a single
code reference stored in a global hash - it can't become much cheaper.
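
To make the cost argument concrete, here is a toy, single-process model in which port IDs are plain strings and a port with only a default receiver is nothing more than a code reference in a hash. The names C<%port_table>, C<model_port> and C<model_snd> are hypothetical. The sketch illustrates the idea and does not reproduce AnyEvent::MP's internal data structures.

```perl

   use strict;
   use warnings;

   # Hypothetical table: port ID (a plain string) => default receiver
   my %port_table;

   # Hypothetical: register a receiver under a string ID
   sub model_port {
      my ($id, $cb) = @_;
      $port_table{$id} = $cb;
      $id
   }

   # Hypothetical: deliver a message by looking up the ID
   sub model_snd {
      my ($id, @msg) = @_;
      my $cb = $port_table{$id}
         or return;   # unknown ID: nothing to deliver to in this model
      $cb->(@msg);
   }

```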

=item Why favour JSON, why not a real serialising format such as Storable?

In fact, any AnyEvent::MP node will happily accept Storable as framing
format, but currently there is no way to make a node use Storable by
default.

The default framing protocol is JSON because a) JSON::XS is many times
faster for small messages and b) most importantly, after years of
experience we found that object serialisation causes more problems
than it solves: just like function calls, objects simply do not travel
easily over the network, mostly because they will always be a copy, so you
always have to re-think your design.
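
The "always a copy" problem is easy to demonstrate with Storable, which ships with Perl: serialising an object and deserialising it again, roughly as would happen on the way to another node, yields an independent object, so changes made by the receiver never reach the sender's original. C<Counter> is a hypothetical class used only for this illustration.

```perl

   use strict;
   use warnings;
   use Storable qw(freeze thaw);

   # Hypothetical toy class used only for this illustration
   package Counter {
      sub new { bless { n => 0 }, shift }
      sub inc { $_[0]{n}++ }
   }

   my $original = Counter->new;

   # roughly what a remote node would receive: a deserialised copy
   my $received = thaw(freeze($original));

   $received->inc;   # modifies the copy only

```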

Keeping your messages simple, concentrating on data structures rather than
objects, will keep your messages clean, tidy and efficient.

=back

=head1 SEE ALSO

L<AnyEvent::MP::Intro> - a gentle introduction.

L<AnyEvent::MP::Kernel> - more, lower-level stuff.

L<AnyEvent::MP::Global> - network maintenance and port groups, to find
your applications.

L<AnyEvent::MP::DataConn> - establish data connections between nodes.

L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
all nodes.

L<AnyEvent>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut

1