/cvs/AnyEvent-MP/MP.pm
Revision: 1.133
Committed: Mon Mar 12 10:34:06 2012 UTC by root
Branch: MAIN
Changes since 1.132: +14 -3 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - erlang-style multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE # contains this node's node ID
   NODE  # returns this node's node ID

   $SELF # receiving/own port id in rcv callbacks

   # initialise the node so it can send/receive messages
   configure;

   # ports are message destinations

   # sending messages
   snd $port, type => data...;
   snd $port, @msg;
   snd @msg_with_first_element_being_a_port;

   # creating/using ports, the simple way
   my $simple_port = port { my @msg = @_ };

   # creating/using ports, tagged message matching
   my $port = port;
   rcv $port, ping => sub { snd $_[0], "pong" };
   rcv $port, pong => sub { warn "pong received\n" };

   # create a port on another node
   my $port = spawn $node, $initfunc, @initdata;

   # destroy a port again
   kil $port; # "normal" kill
   kil $port, my_error => "everything is broken"; # error kill

   # monitoring
   mon $port, $cb->(@msg) # callback is invoked on death
   mon $port, $localport # kill localport on abnormal death
   mon $port, $localport, @msg # send message on death

   # temporarily execute code in port context
   peval $port, sub { die "kill the port!" };

   # execute callbacks in $SELF port context
   my $timer = AE::timer 1, 0, psub {
      die "kill the port, delayed";
   };

=head1 CURRENT STATUS

   bin/aemp - stable.
   AnyEvent::MP - stable API, should work.
   AnyEvent::MP::Intro - explains most concepts.
   AnyEvent::MP::Kernel - mostly stable API.
   AnyEvent::MP::Global - stable API.

=head1 DESCRIPTION

This module (-family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts, and you can supervise entities remotely.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page and the examples under F<eg/>.

=head1 CONCEPTS

=over 4

=item port

Not to be confused with a TCP port, a "port" is something you can send
messages to (with the C<snd> function).

Ports allow you to register C<rcv> handlers that can match all or just
some messages. Messages sent to ports will not be queued, regardless of
whether anything was listening for them or not.

Ports are represented by (printable) strings called "port IDs".

=item port ID - C<nodeid#portname>

A port ID is the concatenation of a node ID, a hash-mark (C<#>)
as separator, and a port name (a printable string of unspecified
format created by AnyEvent::MP).

=item node

A node is a single process containing at least one port - the node port,
which enables nodes to manage each other remotely, and to create new
ports.

Nodes are either public (have one or more listening ports) or private
(no listening ports). Private nodes cannot talk to other private nodes
currently, but all nodes can talk to public nodes.

Nodes are represented by (printable) strings called "node IDs".

=item node ID - C<[A-Za-z0-9_\-.:]*>

A node ID is a string that uniquely identifies the node within a
network. Depending on the configuration used, node IDs can look like a
hostname, a hostname and a port, or a random string. AnyEvent::MP itself
doesn't interpret node IDs in any way except to uniquely identify a node.

=item binds - C<ip:port>

Nodes can only talk to each other by creating some kind of connection to
each other. To do this, nodes should listen on one or more local transport
endpoints - binds.

Currently, only standard C<ip:port> specifications can be used, which
specify TCP ports to listen on. So a bind is basically just a TCP socket
in listening mode that accepts connections from other nodes.

=item seed nodes

When a node starts, it knows nothing about the network it is in - it
needs to connect to at least one other node that is already in the
network. These other nodes are called "seed nodes".

Seed nodes themselves are not special - they are seed nodes only because
some other node I<uses> them as such, but any node can be used as seed
node for other nodes, and each node can use a different set of seed nodes.

In addition to discovering the network, seed nodes are also used to
maintain the network - all nodes using the same seed node are part of
the same network. If a network is split into multiple subnets because e.g.
the network link between the parts goes down, then using the same seed
nodes for all nodes ensures that eventually the subnets get merged again.

Seed nodes are expected to be long-running, and at least one seed node
should always be available. They should also be relatively responsive - a
seed node that blocks for long periods will slow down everybody else.

For small networks, it's best if every node uses the same set of seed
nodes. For large networks, it can be useful to specify "regional" seed
nodes for most nodes in an area, and use all seed nodes as seed nodes for
each other. What's important is that the connections between all seed
nodes form a complete graph, so that the network cannot split into
separate subnets forever.

Seed nodes are represented by seed IDs.

=item seed IDs - C<host:port>

Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
TCP port) of nodes that should be used as seed nodes.

=item global nodes

An AEMP network needs a discovery service - nodes need to know how to
connect to other nodes they only know by name. In addition, AEMP offers a
distributed "group database", which maps group names to a list of
strings - for example, to register worker ports.

A network needs at least one global node to work, and allows every node to
be a global node.

Any node that loads the L<AnyEvent::MP::Global> module becomes a global
node and tries to keep connections to all other nodes. So while it can
make sense to make every node "global" in small networks, it usually makes
sense to only make seed nodes into global nodes in large networks (nodes
keep connections to seed nodes and global nodes, so making them the same
reduces overhead).

=back

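The requirement that the connections between all seed nodes form a complete graph can be checked mechanically before deploying a configuration. The following is only a sketch: C<missing_seed_links> is a hypothetical helper, and it assumes the seed configuration is available as a hash mapping each seed ID to the seed IDs that node uses, counting a pair as connected when either side lists the other.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::MP: returns the seed node pairs
# that are not connected. A pair counts as connected if either node lists
# the other one in its (assumed) seed configuration.
sub missing_seed_links {
   my (%seeds_of) = @_;    # seed ID => [seed IDs it connects to]

   my %linked;
   for my $from (keys %seeds_of) {
      for my $to (@{ $seeds_of{$from} }) {
         $linked{$from}{$to} = $linked{$to}{$from} = 1;
      }
   }

   my @ids = sort keys %seeds_of;
   my @missing;
   for my $i (0 .. $#ids) {
      for my $j ($i + 1 .. $#ids) {
         push @missing, "$ids[$i] <-> $ids[$j]"
            unless $linked{$ids[$i]}{$ids[$j]};
      }
   }

   @missing
}

# a:4040 lists both other seeds, but b:4040 and c:4040 only list a:4040,
# so the pair b/c is not directly connected
my @missing = missing_seed_links(
   "a:4040" => ["b:4040", "c:4040"],
   "b:4040" => ["a:4040"],
   "c:4040" => ["a:4040"],
);
print "missing: @missing\n";
```

Using every seed node as a seed for every other seed node, as recommended above for large networks, makes this list empty by construction.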
=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Config ();
use AnyEvent::MP::Kernel;
use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);

use common::sense;

use Carp ();

use AE ();
use Guard ();

use base "Exporter";

our $VERSION = $AnyEvent::MP::Config::VERSION;

our @EXPORT = qw(
   NODE $NODE *SELF node_of after
   configure
   snd rcv mon mon_guard kil psub peval spawn cal
   port
   db_set db_del db_reg
   db_mon db_family db_keys db_values
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the node
ID of the node running in the current process. This value is initialised by
a call to C<configure>.

=item $nodeid = node_of $port

Extracts and returns the node ID from a port ID or a node ID.

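Because a port ID is a node ID, a C<#> and a port name, extracting the node ID amounts to splitting at the first C<#>, the same C<split /#/, $port, 2> idiom this file uses in C<rcv>, C<mon> and C<spawn>. A stand-alone sketch, with C<my_node_of> as a hypothetical name (the real C<node_of> is not shown in this part of the file):

```perl
use strict;
use warnings;

# Hypothetical stand-in for node_of (the real one is not shown here):
# the node ID is everything before the first "#", and a string without
# any "#" is already a node ID.
sub my_node_of {
   my ($id) = @_;
   my ($nodeid) = split /#/, $id, 2;
   $nodeid
}

print my_node_of("cerebro#p17"), "\n";    # port ID: node part is extracted
print my_node_of("cerebro"), "\n";        # node ID: returned unchanged
```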
=item configure $profile, key => value...

=item configure key => value...

Before a node can talk to other nodes on the network (i.e. enter
"distributed mode") it has to configure itself - the minimum a node needs
to know is its own name, and optionally it should know the addresses of
some other nodes in the network to discover other nodes.

This function configures a node - it must be called exactly once (or
never) before calling other AnyEvent::MP functions.

The key/value pairs are basically the same ones as documented for the
F<aemp> command line utility (sans the set/del prefix), with these additions:

=over 4

=item norc => $boolean (default false)

If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
be consulted - all configuration options must be specified in the
C<configure> call.

=item force => $boolean (default false)

If true, then the values specified in the C<configure> call will take
precedence over any values configured via the rc file. The default is for
the rc file to override any options specified in the program.

=item secure => $pass->($nodeid)

In addition to specifying a boolean, you can specify a code reference that
is called for every remote execution attempt - the execution request is
granted iff the callback returns a true value.

See F<aemp setsecure> for more info.

=back

=over 4

=item step 1, gathering configuration from profiles

The function first looks up a profile in the aemp configuration (see the
L<aemp> commandline utility). The profile name can be specified via the
named C<profile> parameter or can simply be the first parameter. If it is
missing, then the nodename (F<uname -n>) will be used as profile name.

The profile data is then gathered as follows:

First, all remaining key => value pairs (all of which are conveniently
undocumented at the moment) will be interpreted as configuration
data. Then they will be overwritten by any values specified in the global
default configuration (see the F<aemp> utility), then the chain of
profiles chosen by the profile name (and any C<parent> attributes).

That means that the values specified in the profile have highest priority
and the values specified directly via C<configure> have lowest priority,
and can only be used to specify defaults.

If the profile specifies a node ID, then this will become the node ID of
this process. If not, then the profile name will be used as node ID, with
a unique random string (C</%u>) appended.

The node ID can contain some C<%> sequences that are expanded: C<%n>
is expanded to the local nodename, C<%u> is replaced by a random
string to make the node unique. For example, the F<aemp> commandline
utility uses C<aemp/%n/%u> as nodename, which might expand to
C<aemp/cerebro/ZQDGSIkRhEZQDGSIkRhE>.

=item step 2, bind listener sockets

The next step is to look up the binds in the profile, followed by binding
aemp protocol listeners on all binds specified (it is possible and valid
to have no binds, meaning that the node cannot be contacted from the
outside. This means the node cannot talk to other nodes that also have no
binds, but it can still talk to all "normal" nodes).

If the profile does not specify a binds list, then a default of C<*> is
used, meaning the node will bind on a dynamically-assigned port on every
local IP address it finds.

=item step 3, connect to seed nodes

As the last step, the seed ID list from the profile is passed to the
L<AnyEvent::MP::Global> module, which will then use it to keep
connectivity with at least one node at any point in time.

=back

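The precedence rules of step 1, combined with the C<force> option described earlier, can be modelled as successive hash merges in which later sources win. This is a simplified sketch, not the actual implementation: C<merge_config> and its argument layout are hypothetical, and the profile chain is assumed to be already resolved and ordered from the outermost C<parent> to the selected profile.

```perl
use strict;
use warnings;

# Hypothetical model of the precedence rules, not the real implementation.
# In a hash list assignment, later keys override earlier ones.
sub merge_config {
   my (%arg) = @_;
   my %direct = %{ $arg{direct}   || {} };   # key => value pairs passed to configure
   my %global = %{ $arg{global}   || {} };   # global default configuration
   my @chain  = @{ $arg{profiles} || [] };   # assumed order: parents first, selected profile last

   my %cfg = (%direct, %global);
   %cfg = (%cfg, %$_) for @chain;

   # force => 1: values passed to configure override the rc file
   %cfg = (%cfg, %direct) if $arg{force};

   \%cfg
}

my $cfg = merge_config(
   direct   => { binds => ["*:4041"], nodeid => "mynode" },
   global   => { binds => ["*"] },
   profiles => [ { seeds => ["seed:4040"] }, { binds => ["*:4040"] } ],
);
print "binds: @{ $cfg->{binds} }, nodeid: $cfg->{nodeid}\n";
```

Without C<force>, the profile's C<binds> wins and the direct C<nodeid> survives only because no other source sets it, which matches "can only be used to specify defaults".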
Example: become a distributed node using the local node name as profile.
This should be the most common form of invocation for "daemon"-type nodes.

   configure

Example: become a semi-anonymous node. This form is often used for
commandline clients.

   configure nodeid => "myscript/%n/%u";

Example: configure a node using a profile called seed, which is suitable
for a seed node as it binds on all local addresses on a fixed port (4040,
customary for aemp).

   # use the aemp commandline utility
   # aemp profile seed binds '*:4040'

   # then use it
   configure profile => "seed";

   # or simply use aemp from the shell again:
   # aemp run profile seed

   # or provide a nicer-to-remember nodeid
   # aemp run profile seed nodeid "$(hostname)"

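The C<%n>/C<%u> expansion from step 1 can be sketched with a single substitution. C<expand_nodeid> is a hypothetical helper; the real code may produce the unique string differently, so the sketch takes the nodename and the unique string as parameters to stay deterministic.

```perl
use strict;
use warnings;

# Hypothetical helper, not the module's own code: expands %n (local
# nodename) and %u (unique string) in a node ID template. The caller
# supplies both values so the result is reproducible.
sub expand_nodeid {
   my ($template, $nodename, $unique) = @_;
   my %value = (n => $nodename, u => $unique);

   (my $nodeid = $template) =~ s/%([nu])/$value{$1}/g;
   $nodeid
}

# nodeid => "myscript/%n/%u", as in the semi-anonymous example above
print expand_nodeid("myscript/%n/%u", "cerebro", "ZQDGSIkRhE"), "\n";

# no node ID in the profile: the profile name with "/%u" appended is used
print expand_nodeid("seed/%u", "cerebro", "ZQDGSIkRhE"), "\n";
```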
=item $SELF

Contains the current port id while executing C<rcv> callbacks or C<psub>
blocks.

=item *SELF, SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
just export C<$SELF>, so all the symbols named C<SELF> are exported by this
module, but only C<$SELF> is currently used.

=item snd $port, type => @data

=item snd $port, @msg

Sends the given message to the given port, which can identify either a
local or a remote port, and must be a port ID.

While the message can be almost anything, it is highly recommended to
use a string as first element (a port ID, or some word that indicates a
request type etc.) and to consist of only simple perl values (scalars,
arrays, hashes) - if you think you need to pass an object, think again.

The message data logically becomes read-only after a call to this
function: modifying any argument (or values referenced by them) is
forbidden, as there can be considerable time between the call to C<snd>
and the time the message is actually being serialised - in fact, it might
never be copied as within the same process it is simply handed to the
receiving port.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed. Best rely only on the common denominator of
these.

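To see what the JSON restriction means in practice, the core L<JSON::PP> module can stand in for the JSON transport (an assumption: the transport may use a different JSON encoder, but the data model is the same). Plain nested data survives a round trip, while a blessed object is rejected by default:

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new->canonical;    # canonical: sorted keys, stable output

# a message made of a tag plus plain perl data survives a JSON round trip
my @msg  = (job => { id => 7, args => ["a", "b"] });
my $wire = $json->encode(\@msg);
my $back = $json->decode($wire);
print "$wire\n";

# an object does not: JSON::PP refuses blessed references unless told
# otherwise (allow_blessed/convert_blessed), so keep messages object-free
my $ok = eval { $json->encode([job => bless {}, "My::Object"]); 1 };
print $ok ? "encoded\n" : "refused: object in message\n";
```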
=item $local_port = port

Creates a new local port object and returns its port ID. Initially it has
no callbacks set and will kill itself (with reason C<unhandled_message>)
when it receives a message.

=item $local_port = port { my @msg = @_ }

Creates a new local port, and returns its ID. Semantically the same as
creating a port and calling C<rcv $port, $callback> on it.

The block will be called for every message received on the port, with the
global variable C<$SELF> set to the port ID. Runtime errors will cause the
port to be C<kil>ed. The message will be passed as-is, no extra argument
(i.e. no port ID) will be passed to the callback.

If you want to stop/destroy the port, simply C<kil> it:

   my $port = port {
      my @msg = @_;
      ...
      kil $SELF;
   };

=cut

sub rcv($@);

my $KILME = sub {
   # replace non-printable characters in the first message element
   (my $tag = substr $_[0], 0, 30) =~ s/([^\x20-\x7e])/./g;
   kil $SELF, unhandled_message => "no callback set for message (first element $tag)";
};

sub port(;&) {
   my $id = $UNIQ . ++$ID;
   my $port = "$NODE#$id";

   rcv $port, shift || $KILME;

   $port
}

=item rcv $local_port, $callback->(@msg)

Replaces the default callback on the specified port. There is no way to
remove the default callback: use C<sub { }> to disable it, or better
C<kil> the port when it is no longer needed.

The global C<$SELF> (exported by this module) contains C<$port> while
executing the callback. Runtime errors during callback execution will
result in the port being C<kil>ed.

The default callback receives all messages not matched by a more specific
C<tag> match.

=item rcv $local_port, tag => $callback->(@msg_without_tag), ...

Register (or replace) callbacks to be called on messages starting with the
given tag on the given port (and return the port), or unregister it (when
C<$callback> is C<undef> or missing). There can only be one callback
registered for each tag.

The original message will be passed to the callback, after the first
element (the tag) has been removed. The callback will use the same
environment as the default callback (see above).

Example: create a port and bind receivers on it in one go.

   my $port = rcv port,
      msg1 => sub { ... },
      msg2 => sub { ... },
   ;

Example: create a port, bind receivers and send it in a message elsewhere
in one go:

   snd $otherport, reply =>
      rcv port,
         msg1 => sub { ... },
         ...
   ;

Example: temporarily register a rcv callback for a tag matching some port
(e.g. for an rpc reply) and unregister it after a message was received.

   rcv $port, $otherport => sub {
      my @reply = @_;

      rcv $SELF, $otherport;
   };

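The dispatch rule described above (a tag callback receives the message without its first element, everything else goes to the default callback) can be modelled in plain perl. This is an illustration only: C<make_dispatcher> is hypothetical, and it leaves out C<$SELF> handling and the C<kil>-on-error behaviour of real ports.

```perl
use strict;
use warnings;

# Hypothetical model of a message-matching port (not AnyEvent::MP code):
# a default callback plus a hash of tag => callback, dispatching on the
# first message element the way rcv does.
sub make_dispatcher {
   my ($default, %tag_cb) = @_;

   my $deliver = sub {
      my @msg = @_;
      if (defined $msg[0] && (my $cb = $tag_cb{$msg[0]})) {
         shift @msg;           # tag callbacks do not see the tag
         return $cb->(@msg);
      }
      $default->(@msg);        # the default callback sees the whole message
   };

   # rcv-style (un)registration: an undef callback removes the tag
   my $register = sub {
      my ($tag, $cb) = @_;
      if (defined $cb) { $tag_cb{$tag} = $cb } else { delete $tag_cb{$tag} }
   };

   ($deliver, $register)
}

my @log;
my ($deliver, $register) = make_dispatcher(
   sub { push @log, "default: @_" },
   ping => sub { push @log, "ping: @_" },
);

$deliver->(ping => 1, 2);      # matched by tag
$deliver->(pong => 3);         # falls through to the default callback
$register->(ping => undef);    # unregister the tag
$deliver->(ping => 4);         # now handled by the default callback
print "$_\n" for @log;
```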
=cut

sub rcv($@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   $NODE{$nodeid} == $NODE{""}
      or Carp::croak "$port: rcv can only be called on local ports, caught";

   while (@_) {
      if (ref $_[0]) {
         if (my $self = $PORT_DATA{$portid}) {
            "AnyEvent::MP::Port" eq ref $self
               or Carp::croak "$port: rcv can only be called on message matching ports, caught";

            $self->[0] = shift;
         } else {
            my $cb = shift;
            $PORT{$portid} = sub {
               local $SELF = $port;
               eval { &$cb }; _self_die if $@;
            };
         }
      } elsif (defined $_[0]) {
         my $self = $PORT_DATA{$portid} ||= do {
            my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";

            $PORT{$portid} = sub {
               local $SELF = $port;

               if (my $cb = $self->[1]{$_[0]}) {
                  shift;
                  eval { &$cb }; _self_die if $@;
               } else {
                  &{ $self->[0] };
               }
            };

            $self
         };

         "AnyEvent::MP::Port" eq ref $self
            or Carp::croak "$port: rcv can only be called on message matching ports, caught";

         my ($tag, $cb) = splice @_, 0, 2;

         if (defined $cb) {
            $self->[1]{$tag} = $cb;
         } else {
            delete $self->[1]{$tag};
         }
      }
   }

   $port
}

=item peval $port, $coderef[, @args]

Evaluates the given C<$coderef> within the context of C<$port>, that is,
when the code throws an exception the C<$port> will be killed.

Any remaining args will be passed to the callback. Any return values will
be returned to the caller.

This is useful when you temporarily want to execute code in the context of
a port.

Example: create a port and run some initialisation code in its context.

   my $port = port { ... };

   peval $port, sub {
      init
         or die "unable to init";
   };

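The error path of C<peval>, shared with C<rcv> callbacks and C<psub> closures through C<_self_die>, can be shown without a running node: the code runs inside C<eval>, and on failure the port in C<$SELF> is killed with reason C<< die => $msg >>, with trailing newlines removed from string errors. In this sketch C<my_peval> is hypothetical, it only handles scalar context, and C<kil> is replaced by a stub that records its arguments.

```perl
use strict;
use warnings;

our $SELF;
my @killed;    # records what the stub kil was called with

# stub standing in for AnyEvent::MP's kil: just remember port and reason
sub stub_kil { push @killed, [@_] }

# hypothetical model of peval: run $cb with $SELF set to $port, kill the
# port if $cb dies, otherwise pass the return value through
sub my_peval {
   local $SELF = shift;
   my $cb = shift;

   my $res = eval { $cb->(@_) };
   if ($@) {
      my $msg = $@;
      $msg =~ s/\n+$// unless ref $msg;    # same cleanup as _self_die
      stub_kil($SELF, die => $msg);
   }
   $res
}

my $sum = my_peval("node#p1", sub { $_[0] + $_[1] }, 2, 3);
print "sum: $sum\n";

my_peval("node#p2", sub { die "unable to init\n" });
print "killed $killed[0][0]: $killed[0][1] => $killed[0][2]\n";
```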
=cut

sub peval($$) {
   local $SELF = shift;
   my $cb = shift;

   if (wantarray) {
      my @res = eval { &$cb };
      _self_die if $@;
      @res
   } else {
      my $res = eval { &$cb };
      _self_die if $@;
      $res
   }
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv $port, delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer; $timer = AE::timer $delay, 0, psub {
         undef $timer;
         snd @reply, $SELF;
      };
   };

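The essential trick of C<psub> is that it copies C<$SELF> into a lexical when the closure is created and re-establishes it with C<local> when the closure runs later, for example from a timer. The stand-alone sketch below (C<my_psub> is hypothetical and omits the error handling) demonstrates only that capture:

```perl
use strict;
use warnings;

our $SELF;

# hypothetical model of psub: remember the current $SELF and restore it
# (dynamically, via local) whenever the returned closure is called
sub my_psub(&) {
   my $cb = shift;
   my $port = $SELF
      or die "my_psub can only be called while \$SELF is set";

   sub {
      local $SELF = $port;
      $cb->(@_)
   }
}

my $deferred;
{
   local $SELF = "node#worker";    # as inside an rcv callback
   $deferred = my_psub { "running in $SELF" };
}

# later, e.g. from a timer callback, $SELF is no longer set...
print defined $SELF ? "set\n" : "unset\n";
# ...but the closure still runs in the context of the original port
print $deferred->(), "\n";
```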
=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=item $guard = mon $port, $cb->(@reason) # call $cb when $port dies

=item $guard = mon $port, $rcvport # kill $rcvport when $port dies

=item $guard = mon $port # kill $SELF when $port dies

=item $guard = mon $port, $rcvport, @msg # send a message when $port dies

Monitor the given port and do something when the port is killed or
messages to it were lost, and optionally return a guard that can be used
to stop monitoring again.

In the first form (callback), the callback is simply called with any
number of C<@reason> elements (no @reason means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the second form (another port given), the other port (C<$rcvport>)
will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
"normal" kils nothing happens, while under all other conditions, the other
port is killed with the same reason.

The third form (kill self) is the same as the second form, except that
C<$rcvport> defaults to C<$SELF>.

In the last form (message), a message of the form C<@msg, @reason> will be
sent with C<snd>.

Monitoring-actions are one-shot: once messages are lost (and a monitoring
alert was raised), they are removed and will not trigger again.

As a rule of thumb, monitoring requests should always monitor a port from
a local port (or callback). The reason is that kill messages might get
lost, just like any other message. Another less obvious reason is that
even monitoring requests can get lost (for example, when the connection
to the other node goes down permanently). When monitoring a port locally
these problems do not exist.

C<mon> effectively guarantees that, in the absence of hardware failures,
after starting the monitor, either all messages sent to the port will
arrive, or the monitoring action will be invoked after possible message
loss has been detected. No messages will be lost "in between" (after
the first lost message no further messages will be received by the
port). After the monitoring action was invoked, further messages might get
delivered again.

Inter-host-connection timeouts and monitoring depend on the transport
used. The only transport currently implemented is TCP, and AnyEvent::MP
relies on TCP to detect node-downs (this can take 10-15 minutes on a
non-idle connection, and usually around two hours for idle connections).

This means that monitoring is good for program errors and cleaning up
stuff eventually, but it is no replacement for a timeout when you need
to ensure some maximum latency.

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $SELF => "restart";

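How the non-callback forms of C<mon> are reduced to a callback can be reproduced in isolation. The sketch mirrors the normalisation done in C<mon>, but C<normalise_mon_action> is a hypothetical name and C<kil>/C<snd> are replaced by stubs that record their arguments, so it shows the difference between "normal" and abnormal kills without any network:

```perl
use strict;
use warnings;

my @actions;    # what the stubs were asked to do
sub stub_kil { push @actions, join " ", kil => @_ }
sub stub_snd { push @actions, join " ", snd => @_ }

# hypothetical copy of mon's argument handling, turning every form into a
# callback that receives @reason
sub normalise_mon_action {
   my ($cb, @msg) = @_;
   return $cb if ref $cb;                 # form 1: callback

   if (@msg) {                            # form 4: send @msg, @reason
      my @full = ($cb, @msg);
      return sub { stub_snd(@full, @_) };
   }

   my $port = $cb;                        # forms 2/3: kill the linked port,
   sub { stub_kil($port, @_) if @_ }      # but only on abnormal death
}

my $link   = normalise_mon_action("node#local");
my $notify = normalise_mon_action("node#local", "restart");

$link->();                                # "normal" kil: nothing happens
$link->(die => "oops");                   # abnormal: linked port is killed
$notify->(die => "oops");                 # message form: @msg, @reason sent
print "$_\n" for @actions;
```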
=cut

sub mon {
   my ($nodeid, $port) = split /#/, shift, 2;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';

   unless (ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed (note the use of C<psub>):

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
         undef $timer if 0.9 < rand;
      };
   };

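C<mon_guard> keeps its references alive simply by closing over them: C<sub { 0 && @refs }> never uses C<@refs> at runtime, but the closure holds them until it is freed. This can be observed in plain perl with C<Scalar::Util::weaken>; C<keep_refs> and C<$monitor> below are hypothetical stand-ins for C<mon_guard> and the monitoring entry that is dropped when the port dies.

```perl
use strict;
use warnings;
use Scalar::Util qw(weaken);

# hypothetical stand-in for mon_guard's callback: a closure that mentions
# @refs (without ever using them) and therefore keeps them alive
sub keep_refs {
   my @refs = @_;
   sub { 0 && @refs }
}

my $watcher = { name => "timer" };    # stands in for e.g. an AE::timer watcher
my $monitor = keep_refs($watcher);    # stands in for the monitoring entry

my $weak = $watcher;                  # observe the watcher via a weak reference
weaken $weak;
undef $watcher;                       # drop our own strong reference

my $alive_while_monitored = defined $weak;

undef $monitor;                       # the port died, the monitor is dropped
my $alive_after = defined $weak;

print "while monitored: ", ($alive_while_monitored ? "alive" : "freed"), "\n";
print "after port death: ", ($alive_after ? "alive" : "freed"), "\n";
```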
711     =cut
712    
713     sub mon_guard {
714     my ($port, @refs) = @_;
715    
716 root 1.36 #TODO: mon-less form?
717    
718 root 1.32 mon $port, sub { 0 && @refs }
719     }
720    
721 root 1.33 =item kil $port[, @reason]
722 root 1.32
723     Kill the specified port with the given C<@reason>.
724    
725 root 1.107 If no C<@reason> is specified, then the port is killed "normally" -
726     monitor callback will be invoked, but the kil will not cause linked ports
727     (C<mon $mport, $lport> form) to get killed.
728 root 1.32
729 root 1.107 If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
730     form) get killed with the same reason.
731 root 1.32
732     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
733     will be reported as reason C<< die => $@ >>.
734    
Transport/communication errors are reported as C<< transport_error =>
$message >>.

Common idioms:

   # silently remove yourself, do not kill linked ports
   kil $SELF;

   # report a failure in some detail
   kil $SELF, failure_mode_1 => "it failed with too high temperature";

   # do not waste much time with killing, just die when something goes wrong
   open my $fh, "<file"
      or die "file: $!";

=item $port = spawn $node, $initfunc[, @initdata]

Creates a port on the node C<$node> (which can also be a port ID, in which
case it's the node where that port resides).

The port ID of the newly created port is returned immediately, and it is
possible to immediately start sending messages or to monitor the port.

After the port has been created, the init function is called on the remote
node, in the same context as a C<rcv> callback. This function must be a
fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
specify a function in the main program, use C<::name>.

If the function doesn't exist, then the node tries to C<require>
the package, then its parent package and so on (e.g.
C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
exists or it runs out of package names.

The init function is then called with the newly-created port as context
object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
the port might not get created.

A common idiom is to pass a local port, immediately monitor the spawned
port, and in the remote init function, immediately monitor the passed
local port. This two-way monitoring ensures that both ports get cleaned up
when there is a problem.

C<spawn> guarantees that the C<$initfunc> has no visible effects on the
caller before C<spawn> returns (by delaying invocation when C<spawn> is
called for the local node).

Example: spawn a chat server port on C<$othernode>.

   # this node, executed from within a port context:
   my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
   mon $server;

   # init function on $othernode
   package MyApp::Chat::Server;

   sub connect {
      my ($srcport) = @_;

      mon $srcport;

      rcv $SELF, sub {
         ...
      };
   }
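The package search described above can be pictured with a small pure-Perl sketch. It only computes the order in which packages would be tried for a given init function name and does not load anything; C<candidate_packages> is a hypothetical helper for illustration, not part of AnyEvent::MP.

```perl

   use strict;
   use warnings;

   # Hypothetical helper (not part of AnyEvent::MP): list the packages that
   # would be tried, in order, for a fully-qualified init function name.
   sub candidate_packages {
      my ($funcname) = @_;

      # split into package path and function name, ignoring the empty
      # component produced by a leading "::" (the main program form)
      my @parts = grep length, split /::/, $funcname;
      pop @parts; # drop the function name itself

      my @candidates;
      while (@parts) {
         push @candidates, join "::", @parts;
         pop @parts; # continue with the package above
      }

      @candidates
   }

   # prints MyApp::Chat::Server, MyApp::Chat and MyApp, one per line
   print "$_\n" for candidate_packages("MyApp::Chat::Server::init");

```

In this sketch the C<::name> form yields an empty list, as there is no package to load for a function in the main program.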

=cut

sub _spawn {
   my $port = shift;
   my $init = shift;

   # rcv will create the actual port
   local $SELF = "$NODE#$port";
   eval {
      &{ load_func $init }
   };
   _self_die if $@;
}

sub spawn(@) {
   my ($nodeid, undef) = split /#/, shift, 2;

   my $id = $RUNIQ . ++$ID;

   $_[0] =~ /::/
      or Carp::croak "spawn init function must be a fully-qualified name, caught";

   snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;

   "$nodeid#$id"
}


=item after $timeout, @msg

=item after $timeout, $callback

Either sends the given message or calls the given callback after the
specified number of seconds.

This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.

=cut

sub after($@) {
   my ($timeout, @action) = @_;

   my $t; $t = AE::timer $timeout, 0, sub {
      undef $t;
      ref $action[0]
         ? $action[0]()
         : snd @action;
   };
}

#=item $cb2 = timeout $seconds, $cb[, @args]

=item cal $port, @msg, $callback[, $timeout]

A simple form of RPC - sends a message to the given C<$port> with the
given contents (C<@msg>), but adds a reply port to the message.

The reply port is created temporarily just for the purpose of receiving
the reply, and will be C<kil>ed when no longer needed.

A reply message sent to the port is passed to the C<$callback> as-is.

If an optional time-out (in seconds) is given and it is not C<undef>,
then the callback will be called without any arguments after the time-out
has elapsed, and the port is C<kil>ed.

If no time-out is given (or it is C<undef>), then the local port will
monitor the remote port instead, so it eventually gets cleaned up.

Currently this function returns the temporary port, but this "feature"
might go in future versions unless you can make a convincing case that
this is indeed useful for something.
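Since C<@msg> can have any length, C<cal> has to tell the optional time-out apart from the callback by looking at the last argument: if it is a reference, it is taken to be the callback and no time-out was given. The sketch below isolates that parsing step (as done by the first two lines of the C<cal> source below) in a hypothetical C<split_cal_args> helper, which is not part of AnyEvent::MP.

```perl

   use strict;
   use warnings;

   # Hypothetical helper (not part of AnyEvent::MP) mirroring how cal
   # splits "$port, @msg, $callback[, $timeout]". Returns
   # ($port, $callback, $timeout, @msg), with $timeout undef if absent.
   sub split_cal_args {
      my @args = @_;

      # a trailing non-reference is the time-out, otherwise there is none
      my $timeout = ref $args[-1] ? undef : pop @args;
      my $cb      = pop @args;
      my $port    = shift @args;

      ($port, $cb, $timeout, @args)
   }

   my $cb = sub { print "reply: @_\n" };

   my ($port, $callback, $timeout, @msg) =
      split_cal_args "node#42", ping => 1, $cb, 5;

   # prints "port=node#42 timeout=5 msg=ping 1"
   print "port=$port timeout=$timeout msg=@msg\n";

```

An explicit C<undef> in the last position is also popped as the time-out, which matches the rule that an C<undef> time-out behaves like no time-out.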

=cut

sub cal(@) {
   my $timeout = ref $_[-1] ? undef : pop;
   my $cb = pop;

   my $port = port {
      undef $timeout;
      kil $SELF;
      &$cb;
   };

   if (defined $timeout) {
      $timeout = AE::timer $timeout, 0, sub {
         undef $timeout;
         kil $port;
         $cb->();
      };
   } else {
      mon $_[0], sub {
         kil $port;
         $cb->();
      };
   }

   push @_, $port;
   &snd;

   $port
}

=back

=head1 DISTRIBUTED DATABASE

AnyEvent::MP comes with a simple distributed database. The database will
be mirrored asynchronously on all global nodes. Other nodes bind to one
of the global nodes for their needs. Every node has a "local database"
which contains all the values that are set locally. All local databases
are merged together to form the global database, which can be queried.

The database structure is that of a two-level hash - the database hash
contains hashes which contain values, similarly to a Perl hash of hashes,
i.e.:

   $DATABASE{$family}{$subkey} = $value
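Conceptually, the global database is the union of all local databases. The sketch below models that merge with plain Perl hashes; the node names, the example families and the C<merge_local_dbs> helper are hypothetical, and the sketch ignores asynchronous mirroring entirely. In this model, when two nodes set the same family/subkey, whichever node is merged last wins.

```perl

   use strict;
   use warnings;

   # Hypothetical local databases of two nodes:
   #    $local{$node}{$family}{$subkey} = $value
   my %local = (
      node_a => {
         my_image_scalers => { "node_a#1" => undef },
      },
      node_b => {
         my_image_scalers => { "node_b#7" => undef },
         MyApp_config     => { loglevel   => "debug" },
      },
   );

   # Hypothetical helper: merge all local databases into one global view.
   # Nodes are merged in sorted order, so for a family/subkey set on
   # several nodes the last node in that order wins in this model.
   sub merge_local_dbs {
      my ($local) = @_;
      my %global;

      for my $node (sort keys %$local) {
         my $db = $local->{$node};
         for my $family (keys %$db) {
            my $entries = $db->{$family};
            $global{$family}{$_} = $entries->{$_} for keys %$entries;
         }
      }

      \%global
   }

   my $global = merge_local_dbs(\%local);

   # prints "node_a#1 node_b#7"
   print join(" ", sort keys %{ $global->{my_image_scalers} }), "\n";

```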

The top-level hash key is called "family", and the second-level hash key
is called "subkey" or simply "key".

The family must be alphanumeric, i.e. start with a letter and consist
of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>),
pretty much like Perl module names.
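A family name can be checked against the documented pattern before it is used. C<valid_family> below is a hypothetical helper, not an AnyEvent::MP function; it merely anchors the regular expression given above to the whole string.

```perl

   use strict;
   use warnings;

   # Hypothetical helper (not part of AnyEvent::MP): true if $family matches
   # the documented syntax [A-Za-z][A-Za-z0-9_:]* over the whole string.
   sub valid_family {
      my ($family) = @_;
      defined $family && $family =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/
   }

   for my $name ("My::Module::workers", "my_image_scalers", "9lives", "has space") {
      printf "%-20s %s\n", $name, valid_family($name) ? "ok" : "invalid";
   }

```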

As the family namespace is global, it is recommended to prefix family names
with the name of the application or module using it.

The subkeys must be non-empty strings, with no further restrictions.

The values should preferably be strings, but other Perl scalars should
work as well (such as C<undef>, array references and hash references).

Every database entry is owned by one node - adding the same family/subkey
combination on multiple nodes will not cause discomfort for AnyEvent::MP,
but the result might be nondeterministic, i.e. the key might have
different values on different nodes.

Different subkeys in the same family can be owned by different nodes
without problems, and in fact, this is the common method to create worker
pools. For example, a worker port for image scaling might do this:

   db_set my_image_scalers => $port;

And clients looking for an image scaler will want to get the
C<my_image_scalers> keys from time to time:

   db_keys my_image_scalers => sub {
      @ports = @{ $_[0] };
   };

Or better yet, they want to monitor the database family, so they always
have a reasonably up-to-date copy:

   db_mon my_image_scalers => sub {
      @ports = keys %{ $_[0] };
   };

In general, you can set or delete single subkeys, but query and monitor
whole families only.

If you feel the need to monitor or query a single subkey, try giving it
its own family.

=over

=item db_set $family => $subkey [=> $value]

Sets (or replaces) a key in the database - if C<$value> is omitted,
C<undef> is used instead.

=item db_del $family => $subkey...

Deletes one or more subkeys from the database family.

=item $guard = db_reg $family => $subkey [=> $value]

Sets the key in the database and returns a guard. When the guard is
destroyed, the key is deleted from the database. If C<$value> is missing,
then C<undef> is used.
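The guard returned by C<db_reg> ties the lifetime of a database entry to a Perl object. The sketch below models that idea locally: a plain hash stands in for the database, and a C<DESTROY> method deletes the key when the guard goes away. C<%DB>, C<fake_db_reg> and C<FakeGuard> are hypothetical names, and this is not AnyEvent::MP's implementation of C<db_reg>.

```perl

   use strict;
   use warnings;

   # Hypothetical stand-in for the database: $DB{$family}{$subkey} = $value
   our %DB;

   # Hypothetical guard class: runs its callback when the object is destroyed
   package FakeGuard;
   sub new     { my ($class, $cb) = @_; bless { cb => $cb }, $class }
   sub DESTROY { $_[0]{cb}->() }

   package main;

   # Hypothetical db_reg model: set the key, return a guard that deletes it
   sub fake_db_reg {
      my ($family, $subkey, $value) = @_;
      $DB{$family}{$subkey} = $value;
      FakeGuard->new(sub { delete $DB{$family}{$subkey} })
   }

   {
      my $guard = fake_db_reg(my_image_scalers => "node_a#1");
      print "key ", (exists $DB{my_image_scalers}{"node_a#1"} ? "registered" : "missing"), "\n";
   }

   # $guard went out of scope at the end of the block, so the key is gone
   print "key ", (exists $DB{my_image_scalers}{"node_a#1"} ? "still there" : "removed"), "\n";

```

Keeping the guard in a variable for as long as the entry should exist (for example, for the lifetime of a worker port) is what makes this pattern convenient.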

=item db_family $family => $cb->(\%familyhash)

Queries the named database C<$family> and calls the callback with the
family represented as a hash reference. You can keep and freely modify
the hash.

=item db_keys $family => $cb->(\@keys)

Same as C<db_family>, except it only queries the family I<subkeys> and passes
them as an array reference to the callback.

=item db_values $family => $cb->(\@values)

Same as C<db_family>, except it only queries the family I<values> and passes them
as an array reference to the callback.

=item $guard = db_mon $family => $cb->($familyhash, \@added, \@changed, \@deleted)

Creates a monitor on the given database family. Each time a key is set
or deleted, the callback is called with a hash reference containing the
database family and three array references listing the added, changed
and deleted subkeys, respectively. If no keys of a given kind have
changed, the corresponding array reference might be C<undef> or even
missing.

If not called in void context, a guard object is returned that, when
destroyed, stops the monitor.

The family hash reference and the key arrays belong to AnyEvent::MP and
B<must not be modified or stored> by the callback. When in doubt, make a
copy.

As soon as possible after the monitoring starts, the callback will be
called with the initial contents of the family, even if it is empty,
i.e. there will always be a timely call to the callback with the current
contents.

It is possible that the callback is called with a change event even though
the subkey is already present and the value has not changed.

The monitoring stops when the guard object is destroyed.

Example: on every change to the family "mygroup", print out all keys.

   my $guard = db_mon mygroup => sub {
      my ($family, $a, $c, $d) = @_;
      print "mygroup members: ", (join " ", keys %$family), "\n";
   };

Example: wait until the family "My::Module::workers" is non-empty.

   my $guard; $guard = db_mon "My::Module::workers" => sub {
      my ($family, $a, $c, $d) = @_;
      return unless %$family;
      undef $guard;
      print "My::Module::workers now nonempty\n";
   };

Example: print all changes to the family "AnyEvent::Fantasy::Module".

   my $guard = db_mon "AnyEvent::Fantasy::Module" => sub {
      my ($family, $a, $c, $d) = @_;

      print "+$_=$family->{$_}\n" for @$a;
      print "*$_=$family->{$_}\n" for @$c;
      print "-$_\n"               for @$d; # deleted keys have no value left
   };
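To make the three change lists concrete, the sketch below derives added, changed and deleted subkeys from two snapshots of a family hash. C<family_diff> is a hypothetical helper, not part of AnyEvent::MP; unlike a real monitor, which may report a subkey as changed even when its value is the same, it only reports actual value differences.

```perl

   use strict;
   use warnings;

   # Hypothetical helper (not part of AnyEvent::MP): compare two snapshots of
   # a family hash and return references to the added, changed and deleted
   # subkeys. Values are compared as strings, treating undef as "".
   sub family_diff {
      my ($old, $new) = @_;
      my (@added, @changed, @deleted);

      for my $key (sort keys %$new) {
         if (!exists $old->{$key}) {
            push @added, $key;
         } elsif (($old->{$key} // "") ne ($new->{$key} // "")) {
            push @changed, $key;
         }
      }

      for my $key (sort keys %$old) {
         push @deleted, $key unless exists $new->{$key};
      }

      (\@added, \@changed, \@deleted)
   }

   my %before = (a => 1, b => 2,  c => 3);
   my %after  = (a => 1, b => 20, d => 4);

   my ($added, $changed, $deleted) = family_diff(\%before, \%after);

   print "+$_\n" for @$added;   # prints +d
   print "*$_\n" for @$changed; # prints *b
   print "-$_\n" for @$deleted; # prints -c

```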

=cut

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
== aemp node, Erlang process == aemp port), so many of the documents and
programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node IDs are arbitrary strings in AEMP.

Erlang relies on special naming and DNS to work everywhere in the same
way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
configuration or DNS), and possibly the addresses of some seed nodes, but
will otherwise discover other nodes (and their IDs) itself.

=item * Erlang has a "remote ports are like local ports" philosophy, AEMP
uses "local ports are like remote ports".

The failure modes for local ports are quite different (runtime errors
only) than for remote ports: when a local port dies, you I<know> it dies,
but when a connection to another node dies, you know nothing about the
other port.

Erlang pretends remote ports are as reliable as local ports, even when
they are not.

AEMP encourages a "treat remote ports differently" philosophy, with local
ports being the special case/exception, where transport errors cannot
occur.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages out of order, and
therefore needs a queue. AEMP is event-based, so queuing messages would
serve no useful purpose. For the same reason the pattern-matching abilities
of AnyEvent::MP are more limited, as there is little need to be able to
filter messages without dequeuing them.

This is not a philosophical difference, but simply stems from AnyEvent::MP
being event-based, while Erlang is process-based.

You can have a look at L<Coro::MP> for a more Erlang-like process model on
top of AEMP and Coro threads.

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in Erlang is synchronous and blocks the process until
a connection has been established and the message sent (and so does not
need a queue that can overflow). AEMP sends return immediately; connection
establishment is handled in the background.

=item * Erlang suffers from silent message loss, AEMP does not.

Erlang implements few guarantees on message delivery - messages can get
lost without any of the processes realising it (i.e. you send messages a,
b, and c, and the other side only receives messages a and c).

AEMP guarantees (modulo hardware errors) correct ordering, and the
guarantee that after one message is lost, all following ones sent to the
same port are lost as well, until monitoring raises an error, so there are
no silent "holes" in the message sequence.

If you want your software to be very reliable, you have to cope with
corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
simply tries to work better in common error cases, such as when a network
link goes down.

=item * Erlang can send messages to the wrong port, AEMP does not.

In Erlang it is quite likely that a node that restarts reuses an Erlang
process ID known to other nodes for a completely different process,
causing messages destined for that process to end up in an unrelated
process.

AEMP does not reuse port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.
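The C<spawn> source above builds port IDs as C<"$nodeid#$id"> with C<$id = $RUNIQ . ++$ID>, i.e. a per-run prefix plus a counter. The sketch below models why such IDs are not handed out again after a restart, assuming a random prefix chosen at start-up as a stand-in for C<$RUNIQ>; C<new_run> and C<node_of> are hypothetical helpers, and how AnyEvent::MP actually generates C<$RUNIQ> is not shown here. C<node_of> mirrors the C<split /#/, ..., 2> that C<spawn> uses to find the node of a port ID.

```perl

   use strict;
   use warnings;

   # Hypothetical model of one run of a node: every start picks a fresh
   # random prefix (standing in for $RUNIQ), and port IDs are
   # "$nodeid#" . $prefix . $counter.
   sub new_run {
      my ($nodeid) = @_;
      my @chars   = ("a" .. "z", 0 .. 9);
      my $prefix  = join "", map { $chars[rand @chars] } 1 .. 8;
      my $counter = 0;

      return sub { "$nodeid#$prefix" . ++$counter };
   }

   # Hypothetical helper: the node part of a port ID is everything before
   # the first "#"
   sub node_of {
      my ($portid) = @_;
      (split /#/, $portid, 2)[0]
   }

   my $run1 = new_run("mynode");
   my @before_restart;
   push @before_restart, $run1->() for 1 .. 3;

   my $run2 = new_run("mynode"); # the same node, restarted
   my @after_restart;
   push @after_restart, $run2->() for 1 .. 3;

   print "$_\n" for @before_restart, @after_restart;
   print node_of($after_restart[0]), "\n"; # prints mynode

```

A random prefix only makes a collision between runs very unlikely, so this is an illustration of the idea rather than a guarantee.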

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol, TLS, to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the Erlang protocol, supports both
programming-language-independent text-only protocols (good for
debugging) and binary, language-specific serialisers (e.g. Storable). By
default, unless TLS is used, the protocol is actually completely
text-based.

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=item * AEMP has more flexible monitoring options than Erlang.

In Erlang, you can choose to receive I<all> exit signals as messages or
I<none>; there is no in-between, so monitoring single Erlang processes is
difficult to implement.

Monitoring in AEMP is more flexible than in Erlang, as one can choose
between automatic kill, exit message or callback on a per-port basis.

=item * Erlang tries to hide remote/local connections, AEMP does not.

Monitoring in Erlang is not an indicator of process death/crashes, in the
same way as linking is (except linking is unreliable in Erlang).

In AEMP, you don't "look up" registered port names or send to named ports
that might or might not be persistent. Instead, you normally spawn a port
on the remote node. The init function monitors you, and you monitor the
remote port. Since both monitors are local to the node, they are much more
reliable (no need for C<spawn_link>).

This also saves round-trips and avoids sending messages to the wrong port
(which is hard to avoid in Erlang).

=back

=head1 RATIONALE

=over 4

=item Why strings for port and node IDs, why not objects?

We considered "objects", but found that the actual number of methods
that can be called is quite low. Since port and node IDs travel over
the network frequently, the serialising/deserialising would add lots of
overhead, as well as having to keep a proxy object everywhere.

Strings can easily be printed, easily serialised etc. and need no special
procedures to be "valid".

And as a result, a port with just a default receiver consists of a single
code reference stored in a global hash - it can't become much cheaper.
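The last point can be illustrated with a small model in which a port that only has a default receiver is nothing but a code reference in a global hash keyed by its port ID string. C<%PORT>, C<new_port> and C<deliver> are hypothetical names for this sketch; it is not AnyEvent::MP's actual code and ignores remote delivery entirely.

```perl

   use strict;
   use warnings;

   # Hypothetical global port table: port ID string => receive callback
   my %PORT;
   my $next_id = 0;

   # Hypothetical: create a port whose only state is its default receiver
   sub new_port {
      my ($cb) = @_;
      my $id = "mynode#" . ++$next_id;
      $PORT{$id} = $cb;
      $id
   }

   # Hypothetical local delivery: look up the port ID and call its receiver
   sub deliver {
      my ($id, @msg) = @_;
      my $cb = $PORT{$id}
         or return 0; # unknown or already destroyed port
      $cb->(@msg);
      1
   }

   my $port = new_port(sub { print "got: @_\n" });
   deliver($port, ping => 42); # prints "got: ping 42"
   delete $PORT{$port};        # destroying the port frees its only resource
   deliver($port, ping => 43); # ignored, returns 0

```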

=item Why favour JSON, why not a real serialising format such as Storable?

In fact, any AnyEvent::MP node will happily accept Storable as a framing
format, but currently there is no way to make a node use Storable by
default (although all nodes will accept it).

The default framing protocol is JSON because a) JSON::XS is many times
faster for small messages and b) most importantly, after years of
experience we found that object serialisation is causing more problems
than it solves: Just like function calls, objects simply do not travel
easily over the network, mostly because they will always be a copy, so you
always have to re-think your design.

Keeping your messages simple, concentrating on data structures rather than
objects, will keep your messages clean, tidy and efficient.
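The difference is easy to see with L<JSON::PP>, a core Perl module that offers the same interface as L<JSON::XS>: a message made of plain data survives an encode/decode round trip, while a blessed object is rejected by the encoder with its default settings. The message contents and the C<My::Image> class name are made up for this sketch.

```perl

   use strict;
   use warnings;
   use JSON::PP;

   # canonical: sort hash keys so the encoded text is deterministic
   my $json = JSON::PP->new->canonical;

   # a message made of plain data survives the round trip unchanged
   my @msg  = (resize => { width => 640, height => 480, formats => ["png", "jpg"] });
   my $wire = $json->encode(\@msg);
   my $copy = $json->decode($wire);
   print "$wire\n";

   # an object does not: with default settings, encode() dies on it
   # ("My::Image" is a made-up class name)
   my $obj = bless { width => 640 }, "My::Image";
   my $ok  = eval { $json->encode([resize => $obj]); 1 };
   print "object ", ($ok ? "encoded" : "rejected"), "\n";

```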

=back

=head1 SEE ALSO

L<AnyEvent::MP::Intro> - a gentle introduction.

L<AnyEvent::MP::Kernel> - more, lower-level, stuff.

L<AnyEvent::MP::Global> - network maintenance and port groups, to find
your applications.

L<AnyEvent::MP::DataConn> - establish data connections between nodes.

L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
all nodes.

L<AnyEvent>.

=head1 AUTHOR

   Marc Lehmann <schmorp@schmorp.de>
   http://home.schmorp.de/

=cut

1