/cvs/AnyEvent-MP/MP.pm
Revision: 1.131
Committed: Fri Mar 9 19:07:53 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.130: +10 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.89 AnyEvent::MP - erlang-style multi-processing/message-passing framework
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.75 $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11 root 1.2
12 root 1.38 $SELF # receiving/own port id in rcv callbacks
13    
14 root 1.48 # initialise the node so it can send/receive messages
15 root 1.72 configure;
16 root 1.48
17 root 1.75 # ports are message destinations
18 root 1.38
19     # sending messages
20 root 1.2 snd $port, type => data...;
21 root 1.38 snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23 root 1.2
24 root 1.50 # creating/using ports, the simple way
25 root 1.73 my $simple_port = port { my @msg = @_ };
26 root 1.22
27 root 1.52 # creating/using ports, tagged message matching
28 root 1.38 my $port = port;
29 root 1.73 rcv $port, ping => sub { snd $_[0], "pong" };
30     rcv $port, pong => sub { warn "pong received\n" };
31 root 1.2
32 root 1.48 # create a port on another node
33     my $port = spawn $node, $initfunc, @initdata;
34    
35 root 1.116 # destroy a port again
36 root 1.101 kil $port; # "normal" kill
37     kil $port, my_error => "everything is broken"; # error kill
38    
39 root 1.35 # monitoring
40 root 1.129 mon $port, $cb->(@msg) # callback is invoked on death
41     mon $port, $localport # kill localport on abnormal death
42     mon $port, $localport, @msg # send message on death
43 root 1.35
44 root 1.101 # temporarily execute code in port context
45     peval $port, sub { die "kill the port!" };
46    
47     # execute callbacks in $SELF port context
48     my $timer = AE::timer 1, 0, psub {
49     die "kill the port, delayed";
50     };
51    
52 root 1.45 =head1 CURRENT STATUS
53    
54 root 1.71 bin/aemp - stable.
55     AnyEvent::MP - stable API, should work.
56 elmex 1.77 AnyEvent::MP::Intro - explains most concepts.
57 root 1.88 AnyEvent::MP::Kernel - mostly stable API.
58     AnyEvent::MP::Global - stable API.
59 root 1.45
60 root 1.1 =head1 DESCRIPTION
61    
62 root 1.2 This module (-family) implements a simple message passing framework.
63    
64     Despite its simplicity, you can securely message other processes running
65 root 1.67 on the same or other hosts, and you can supervise entities remotely.
66 root 1.2
67 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
68 root 1.67 manual page and the examples under F<eg/>.
69 root 1.23
70 root 1.2 =head1 CONCEPTS
71    
72     =over 4
73    
74     =item port
75    
76 root 1.79 Not to be confused with a TCP port, a "port" is something you can send
77     messages to (with the C<snd> function).
78 root 1.29
79 root 1.53 Ports allow you to register C<rcv> handlers that can match all or just
80 root 1.64 some messages. Messages sent to ports will not be queued, regardless of
81     whether anything was listening for them or not.
82 root 1.2
83 root 1.119 Ports are represented by (printable) strings called "port IDs".
84    
85 root 1.67 =item port ID - C<nodeid#portname>
86 root 1.2
87 root 1.123 A port ID is the concatenation of a node ID, a hash-mark (C<#>)
88     as separator, and a port name (a printable string of unspecified
89     format created by AnyEvent::MP).
90 root 1.2
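As a small illustration of the C<nodeid#portname> layout described above, a port ID can be taken apart at the first hash-mark. This is only a sketch: C<split_port_id> is a hypothetical helper that is not part of AnyEvent::MP, and the port ID used is made up.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::MP: split a port ID
# into its node ID and port name at the first "#".
sub split_port_id {
   my ($port) = @_;

   my ($nodeid, $portname) = split /#/, $port, 2;

   ($nodeid, $portname)
}

# made-up port ID, for illustration only
my ($nodeid, $portname) = split_port_id "seed1.example.net:4040#abc123";

print "node ID: $nodeid, port name: $portname\n";
```

When a plain node ID is passed, the second return value is simply undefined.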
91     =item node
92    
93 root 1.53 A node is a single process containing at least one port - the node port,
94 root 1.67 which enables nodes to manage each other remotely, and to create new
95 root 1.53 ports.
96 root 1.2
97 root 1.67 Nodes are either public (have one or more listening ports) or private
98     (no listening ports). Private nodes cannot talk to other private nodes
99 root 1.119 currently, but all nodes can talk to public nodes.
100    
101     Nodes are represented by (printable) strings called "node IDs".
102 root 1.2
103 root 1.117 =item node ID - C<[A-Za-z0-9_\-.:]*>
104 root 1.2
105 root 1.64 A node ID is a string that uniquely identifies the node within a
106     network. Depending on the configuration used, node IDs can look like a
107     hostname, a hostname and a port, or a random string. AnyEvent::MP itself
108 root 1.119 doesn't interpret node IDs in any way except to uniquely identify a node.
109 root 1.64
110     =item binds - C<ip:port>
111    
112     Nodes can only talk to each other by creating some kind of connection to
113     each other. To do this, nodes should listen on one or more local transport
114 root 1.119 endpoints - binds.
115    
116     Currently, only standard C<ip:port> specifications can be used, which
117     specify TCP ports to listen on. So a bind is basically just a TCP socket
118     in listening mode that accepts connections from other nodes.
119 root 1.64
120 root 1.83 =item seed nodes
121 root 1.64
122 root 1.119 When a node starts, it knows nothing about the network it is in - it
123     needs to connect to at least one other node that is already in the
124     network. These other nodes are called "seed nodes".
125    
126     Seed nodes themselves are not special - they are seed nodes only because
127     some other node I<uses> them as such, but any node can be used as seed
128     node for other nodes, and each node can use a different set of seed nodes.
129 root 1.83
130     In addition to discovering the network, seed nodes are also used to
131 root 1.119 maintain the network - all nodes using the same seed node are part of
132     the same network. If a network is split into multiple subnets because e.g.
133     the network link between the parts goes down, then using the same seed
134     nodes for all nodes ensures that eventually the subnets get merged again.
135 root 1.83
136     Seed nodes are expected to be long-running, and at least one seed node
137 root 1.85 should always be available. They should also be relatively responsive - a
138     seed node that blocks for long periods will slow down everybody else.
139 root 1.83
140 root 1.119 For small networks, it's best if every node uses the same set of seed
141     nodes. For large networks, it can be useful to specify "regional" seed
142     nodes for most nodes in an area, and use all seed nodes as seed nodes for
143     each other. What's important is that all seed node connections form a
144     complete graph, so that the network cannot split into separate subnets
145     forever.
146    
147     Seed nodes are represented by seed IDs.
148    
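The "complete graph" requirement for seed nodes can be checked mechanically on a planned configuration. The following is a minimal sketch under stated assumptions: C<seeds_form_complete_graph> is a hypothetical helper (not part of AnyEvent::MP), the seed IDs are made up, and the check is deliberately conservative in that it requires every seed node to list every other seed node as one of its seeds.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::MP.
# Takes a hash mapping each seed ID to an array ref of the seed IDs
# that this seed node itself uses, and returns true when every seed
# node uses every other seed node.
sub seeds_form_complete_graph {
   my (%seeds_of) = @_;

   for my $from (keys %seeds_of) {
      my %uses = map { $_ => 1 } @{ $seeds_of{$from} };

      for my $to (keys %seeds_of) {
         next if $from eq $to;
         return 0 unless $uses{$to};
      }
   }

   1
}

# made-up seed IDs, for illustration only
my %plan = (
   "eu.example.net:4040" => ["us.example.net:4040", "as.example.net:4040"],
   "us.example.net:4040" => ["eu.example.net:4040", "as.example.net:4040"],
   "as.example.net:4040" => ["eu.example.net:4040", "us.example.net:4040"],
);

my $ok = seeds_form_complete_graph %plan;
print $ok ? "seed plan is complete\n" : "seed plan can split permanently\n";
```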
149     =item seed IDs - C<host:port>
150 root 1.83
151 root 1.119 Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
152 elmex 1.96 TCP port) of nodes that should be used as seed nodes.
153 root 1.29
154 root 1.119 =item global nodes
155    
156     An AEMP network needs a discovery service - nodes need to know how to
157     connect to other nodes they only know by name. In addition, AEMP offers a
158     distributed "group database", which maps group names to a list of strings
159     - for example, to register worker ports.
160    
161     A network needs at least one global node to work, and allows every node to
162     be a global node.
163    
164     Any node that loads the L<AnyEvent::MP::Global> module becomes a global
165     node and tries to keep connections to all other nodes. So while it can
166     make sense to make every node "global" in small networks, it usually makes
167     sense to only make seed nodes into global nodes in large networks (nodes
168     keep connections to seed nodes and global nodes, so making them the same
169     reduces overhead).
170 root 1.67
171 root 1.2 =back
172    
173 root 1.3 =head1 VARIABLES/FUNCTIONS
174 root 1.2
175     =over 4
176    
177 root 1.1 =cut
178    
179     package AnyEvent::MP;
180    
181 root 1.121 use AnyEvent::MP::Config ();
182 root 1.44 use AnyEvent::MP::Kernel;
183 root 1.121 use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);
184 root 1.2
185 root 1.1 use common::sense;
186    
187 root 1.2 use Carp ();
188    
189 root 1.1 use AE ();
190 root 1.124 use Guard ();
191 root 1.1
192 root 1.2 use base "Exporter";
193    
194 root 1.121 our $VERSION = $AnyEvent::MP::Config::VERSION;
195 root 1.43
196 root 1.8 our @EXPORT = qw(
197 root 1.59 NODE $NODE *SELF node_of after
198 root 1.72 configure
199 root 1.101 snd rcv mon mon_guard kil psub peval spawn cal
200 root 1.22 port
201 root 1.124 db_set db_del db_reg
202 root 1.128 db_mon db_family db_keys db_values
203 root 1.8 );
204 root 1.2
205 root 1.22 our $SELF;
206    
207     sub _self_die() {
208     my $msg = $@;
209     $msg =~ s/\n+$// unless ref $msg;
210     kil $SELF, die => $msg;
211     }
212    
213     =item $thisnode = NODE / $NODE
214    
215 root 1.67 The C<NODE> function returns, and the C<$NODE> variable contains, the node
216 root 1.64 ID of the node running in the current process. This value is initialised by
217 root 1.72 a call to C<configure>.
218 root 1.22
219 root 1.63 =item $nodeid = node_of $port
220 root 1.22
221 root 1.67 Extracts and returns the node ID from a port ID or a node ID.
222 root 1.34
223 root 1.78 =item configure $profile, key => value...
224    
225 root 1.72 =item configure key => value...
226 root 1.34
227 root 1.64 Before a node can talk to other nodes on the network (i.e. enter
228 root 1.72 "distributed mode") it has to configure itself - the minimum a node needs
229 root 1.64 to know is its own name, and optionally it should know the addresses of
230     some other nodes in the network to discover other nodes.
231 root 1.34
232 root 1.121 This function configures a node - it must be called exactly once (or
233     never) before calling other AnyEvent::MP functions.
234    
235 root 1.108 The key/value pairs are basically the same ones as documented for the
236 root 1.127 F<aemp> command line utility (sans the set/del prefix), with these additions:
237 root 1.121
238     =over 4
239    
240     =item norc => $boolean (default false)
241    
242     If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
243     be consulted - all configuration options must be specified in the
244     C<configure> call.
245 root 1.108
246 root 1.121 =item force => $boolean (default false)
247    
248     If true, then the values specified in the C<configure> call will take
249     precedence over any values configured via the rc file. The default is for
250     the rc file to override any options specified in the program.
251    
252 root 1.127 =item secure => $pass->($nodeid)
253    
254     In addition to specifying a boolean, you can specify a code reference that
255     is called for every remote execution attempt - the execution request is
256     granted iff the callback returns a true value.
257    
258     See F<aemp setsecure> for more info.
259    
260 root 1.121 =back
261 root 1.34
262 root 1.72 =over 4
263    
264     =item step 1, gathering configuration from profiles
265    
266     The function first looks up a profile in the aemp configuration (see the
267     L<aemp> commandline utility). The profile name can be specified via the
268 root 1.78 named C<profile> parameter (or can simply be the first parameter). If it is
269     missing, then the nodename (F<uname -n>) will be used as profile name.
270 root 1.34
271 root 1.72 The profile data is then gathered as follows:
272 root 1.69
273 elmex 1.77 First, all remaining key => value pairs (all of which are conveniently
274 root 1.72 undocumented at the moment) will be interpreted as configuration
275     data. Then they will be overwritten by any values specified in the global
276     default configuration (see the F<aemp> utility), then the chain of
277     profiles chosen by the profile name (and any C<parent> attributes).
278    
279     That means that the values specified in the profile have highest priority
280     and the values specified directly via C<configure> have lowest priority,
281     and can only be used to specify defaults.
282 root 1.49
283 root 1.64 If the profile specifies a node ID, then this will become the node ID of
284 root 1.122 this process. If not, then the profile name will be used as node ID, with
285 root 1.126 a unique random string (C</%u>) appended.
286 root 1.122
287 root 1.126 The node ID can contain some C<%> sequences that are expanded: C<%n>
288     is expanded to the local nodename, C<%u> is replaced by a random
289     string to make the node unique. For example, the F<aemp> commandline
290     utility uses C<aemp/%n/%u> as nodename, which might expand to
291     C<aemp/cerebro/ZQDGSIkRhEZQDGSIkRhE>.
292 root 1.64
293 root 1.72 =item step 2, bind listener sockets
294    
295 root 1.64 The next step is to look up the binds in the profile, followed by binding
296     aemp protocol listeners on all binds specified (it is possible and valid
297     to have no binds, meaning that the node cannot be contacted from the
298     outside. This means the node cannot talk to other nodes that also have no
299     binds, but it can still talk to all "normal" nodes).
300    
301 root 1.70 If the profile does not specify a binds list, then a default of C<*> is
302 root 1.72 used, meaning the node will bind on a dynamically-assigned port on every
303     local IP address it finds.
304    
305     =item step 3, connect to seed nodes
306 root 1.64
307 root 1.119 As the last step, the seed ID list from the profile is passed to the
308 root 1.64 L<AnyEvent::MP::Global> module, which will then use it to keep
309 root 1.72 connectivity with at least one node at any point in time.
310 root 1.64
311 root 1.72 =back
312    
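The priority order described in step 1 (values passed to C<configure> lowest, then the global default configuration, then the profile chain) can be modelled as a sequence of hash merges in which later sources win. This is only a sketch of that ordering, not the actual implementation: C<merge_config> is a hypothetical helper, the keys and values are made up, and the profile chain is assumed to be passed in order of increasing priority.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::MP: merge configuration
# sources given in order of increasing priority; later sources
# overwrite earlier ones.
sub merge_config {
   my (@sources) = @_;

   my %cfg;
   %cfg = (%cfg, %$_) for @sources;

   \%cfg
}

# made-up configuration data, for illustration only
my %configure_args = (binds => "*:4040", nodeid => "myscript/%n/%u");
my %global_default = ();
my %seed_profile   = (binds => "*:4041");

# lowest priority first: configure arguments, global default, profile
my $cfg = merge_config \%configure_args, \%global_default, \%seed_profile;

print "$_ = $cfg->{$_}\n" for sort keys %$cfg;
```

In this model, the C<force> option described above would correspond to moving the C<configure> arguments to the end of the list.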
313 root 1.87 Example: become a distributed node using the local node name as profile.
314 root 1.72 This should be the most common form of invocation for "daemon"-type nodes.
315 root 1.34
316 root 1.72 configure
317 root 1.34
318 root 1.126 Example: become a semi-anonymous node. This form is often used for
319     commandline clients.
320 root 1.34
321 root 1.126 configure nodeid => "myscript/%n/%u";
322 root 1.72
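To make the C<%n> and C<%u> sequences concrete, here is a minimal sketch of such an expansion. It is not the code AnyEvent::MP uses internally: C<expand_node_id> is a hypothetical helper, and the nodename and unique string are passed in explicitly (rather than derived from the system) so that the result is predictable.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::MP: expand the %n and %u
# sequences in a node ID template.
sub expand_node_id {
   my ($template, $nodename, $unique) = @_;

   my %subst = (n => $nodename, u => $unique);
   $template =~ s/%([nu])/$subst{$1}/g;

   $template
}

# made-up nodename and unique string, for illustration only
print expand_node_id ("myscript/%n/%u", "cerebro", "ZQDGSIkRhE"), "\n";
```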
323 root 1.120 Example: configure a node using a profile called seed, which is suitable
324 root 1.72 for a seed node as it binds on all local addresses on a fixed port (4040,
325     customary for aemp).
326    
327     # use the aemp commandline utility
328 root 1.122 # aemp profile seed binds '*:4040'
329 root 1.72
330     # then use it
331     configure profile => "seed";
332 root 1.34
333 root 1.72 # or simply use aemp from the shell again:
334     # aemp run profile seed
335 root 1.34
336 root 1.72 # or provide a nicer-to-remember nodeid
337     # aemp run profile seed nodeid "$(hostname)"
338 root 1.34
339 root 1.22 =item $SELF
340    
341     Contains the current port id while executing C<rcv> callbacks or C<psub>
342     blocks.
343 root 1.3
344 root 1.67 =item *SELF, SELF, %SELF, @SELF...
345 root 1.22
346     Due to some quirks in how perl exports variables, it is impossible to
347 root 1.67 just export C<$SELF>; instead, all the symbols named C<SELF> are exported by this
348 root 1.22 module, but only C<$SELF> is currently used.
349 root 1.3
350 root 1.33 =item snd $port, type => @data
351 root 1.3
352 root 1.33 =item snd $port, @msg
353 root 1.3
354 root 1.67 Send the given message to the given port, which can identify either a
355     local or a remote port, and must be a port ID.
356 root 1.8
357 root 1.67 While the message can be almost anything, it is highly recommended to
358     use a string as first element (a port ID, or some word that indicates a
359     request type etc.) and to consist of only simple perl values (scalars,
360     arrays, hashes) - if you think you need to pass an object, think again.
361    
362     The message data logically becomes read-only after a call to this
363     function: modifying any argument (or values referenced by them) is
364     forbidden, as there can be considerable time between the call to C<snd>
365     and the time the message is actually being serialised - in fact, it might
366     never be copied as within the same process it is simply handed to the
367     receiving port.
368 root 1.3
369     The type of data you can transfer depends on the transport protocol: when
370     JSON is used, then only strings, numbers and arrays and hashes consisting
371     of those are allowed (no objects). When Storable is used, then anything
372     that Storable can serialise and deserialise is allowed, and for the local
373 root 1.67 node, anything can be passed. Best rely only on the common denominator of
374     these.
375 root 1.3
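One way to stay within the "common denominator" is to check messages before sending them. The following is a conservative sketch, not something AnyEvent::MP provides: C<is_plain_data> is a hypothetical helper that accepts only unblessed scalars, arrays and hashes (recursively). Letting C<undef> through is an assumption of this sketch.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical helper, not part of AnyEvent::MP: returns true when the
# value consists only of plain scalars, arrays and hashes.
sub is_plain_data {
   my ($value) = @_;

   return 1 unless ref $value;   # strings, numbers (and undef, by assumption)
   return 0 if blessed $value;   # objects are rejected

   if (ref $value eq "ARRAY") {
      is_plain_data ($_) or return 0 for @$value;
      return 1;
   }

   if (ref $value eq "HASH") {
      is_plain_data ($_) or return 0 for values %$value;
      return 1;
   }

   0   # code refs, scalar refs, globs and so on
}

my @msg = (ping => { seq => 1, tags => ["a", "b"] });
my $ok  = is_plain_data \@msg;

print $ok ? "message is plain data\n" : "message contains non-plain data\n";
```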
376 root 1.22 =item $local_port = port
377 root 1.2
378 root 1.50 Creates a new local port object and returns its port ID. Initially it has
379     no callbacks set and will throw an error when it receives messages.
380 root 1.10
381 root 1.50 =item $local_port = port { my @msg = @_ }
382 root 1.15
383 root 1.50 Creates a new local port, and returns its ID. Semantically the same as
384     creating a port and calling C<rcv $port, $callback> on it.
385 root 1.15
386 root 1.50 The block will be called for every message received on the port, with the
387     global variable C<$SELF> set to the port ID. Runtime errors will cause the
388     port to be C<kil>ed. The message will be passed as-is, no extra argument
389     (i.e. no port ID) will be passed to the callback.
390 root 1.15
391 root 1.50 If you want to stop/destroy the port, simply C<kil> it:
392 root 1.15
393 root 1.50 my $port = port {
394     my @msg = @_;
395     ...
396     kil $SELF;
397 root 1.15 };
398 root 1.10
399     =cut
400    
401 root 1.33 sub rcv($@);
402    
403 root 1.50 sub _kilme {
404     die "received message on port without callback";
405     }
406    
407 root 1.22 sub port(;&) {
408 root 1.123 my $id = $UNIQ . ++$ID;
409 root 1.22 my $port = "$NODE#$id";
410    
411 root 1.50 rcv $port, shift || \&_kilme;
412 root 1.10
413 root 1.22 $port
414 root 1.10 }
415    
416 root 1.50 =item rcv $local_port, $callback->(@msg)
417 root 1.31
418 root 1.50 Replaces the default callback on the specified port. There is no way to
419     remove the default callback: use C<sub { }> to disable it, or better
420     C<kil> the port when it is no longer needed.
421 root 1.3
422 root 1.33 The global C<$SELF> (exported by this module) contains C<$port> while
423 root 1.50 executing the callback. Runtime errors during callback execution will
424     result in the port being C<kil>ed.
425 root 1.22
426 root 1.50 The default callback receives all messages not matched by a more specific
427     C<tag> match.
428 root 1.22
429 root 1.50 =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
430 root 1.3
431 root 1.54 Register (or replace) callbacks to be called on messages starting with the
432     given tag on the given port (and return the port), or unregister it (when
433     C<$callback> is C<undef> or missing). There can only be one callback
434     registered for each tag.
435 root 1.3
436 root 1.50 The original message will be passed to the callback, after the first
437     element (the tag) has been removed. The callback will use the same
438     environment as the default callback (see above).
439 root 1.3
440 root 1.36 Example: create a port and bind receivers on it in one go.
441    
442     my $port = rcv port,
443 root 1.50 msg1 => sub { ... },
444     msg2 => sub { ... },
445 root 1.36 ;
446    
447     Example: create a port, bind receivers and send it in a message elsewhere
448     in one go:
449    
450     snd $otherport, reply =>
451     rcv port,
452 root 1.50 msg1 => sub { ... },
453 root 1.36 ...
454     ;
455    
456 root 1.54 Example: temporarily register a rcv callback for a tag matching some port
457 root 1.102 (e.g. for an rpc reply) and unregister it after a message was received.
458 root 1.54
459     rcv $port, $otherport => sub {
460     my @reply = @_;
461    
462     rcv $SELF, $otherport;
463     };
464    
465 root 1.3 =cut
466    
467     sub rcv($@) {
468 root 1.33 my $port = shift;
469 root 1.75 my ($nodeid, $portid) = split /#/, $port, 2;
470 root 1.3
471 root 1.75 $NODE{$nodeid} == $NODE{""}
472 root 1.33 or Carp::croak "$port: rcv can only be called on local ports, caught";
473 root 1.22
474 root 1.50 while (@_) {
475     if (ref $_[0]) {
476     if (my $self = $PORT_DATA{$portid}) {
477     "AnyEvent::MP::Port" eq ref $self
478     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
479 root 1.33
480 root 1.103 $self->[0] = shift;
481 root 1.50 } else {
482     my $cb = shift;
483     $PORT{$portid} = sub {
484     local $SELF = $port;
485     eval { &$cb }; _self_die if $@;
486     };
487     }
488     } elsif (defined $_[0]) {
489     my $self = $PORT_DATA{$portid} ||= do {
490 root 1.103 my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
491 root 1.50
492     $PORT{$portid} = sub {
493     local $SELF = $port;
494    
495     if (my $cb = $self->[1]{$_[0]}) {
496     shift;
497     eval { &$cb }; _self_die if $@;
498     } else {
499     &{ $self->[0] };
500 root 1.33 }
501     };
502 root 1.50
503     $self
504 root 1.33 };
505    
506 root 1.50 "AnyEvent::MP::Port" eq ref $self
507     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
508 root 1.22
509 root 1.50 my ($tag, $cb) = splice @_, 0, 2;
510 root 1.33
511 root 1.50 if (defined $cb) {
512     $self->[1]{$tag} = $cb;
513 root 1.33 } else {
514 root 1.50 delete $self->[1]{$tag};
515 root 1.33 }
516 root 1.22 }
517 root 1.3 }
518 root 1.31
519 root 1.33 $port
520 root 1.2 }
521    
522 root 1.101 =item peval $port, $coderef[, @args]
523    
524     Evaluates the given C<$coderef> within the context of C<$port>, that is,
525     when the code throws an exception the C<$port> will be killed.
526    
527     Any remaining args will be passed to the callback. Any return values will
528     be returned to the caller.
529    
530     This is useful when you temporarily want to execute code in the context of
531     a port.
532    
533     Example: create a port and run some initialisation code in its context.
534    
535     my $port = port { ... };
536    
537     peval $port, sub {
538     init
539     or die "unable to init";
540     };
541    
542     =cut
543    
544     sub peval($$) {
545     local $SELF = shift;
546     my $cb = shift;
547    
548     if (wantarray) {
549     my @res = eval { &$cb };
550     _self_die if $@;
551     @res
552     } else {
553     my $res = eval { &$cb };
554     _self_die if $@;
555     $res
556     }
557     }
558    
559 root 1.22 =item $closure = psub { BLOCK }
560 root 1.2
561 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
562     closure is executed, sets up the environment in the same way as in C<rcv>
563     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
564    
565 root 1.101 The effect is basically as if it returned C<< sub { peval $SELF, sub {
566 root 1.114 BLOCK }, @_ } >>.
567 root 1.101
568 root 1.22 This is useful when you register callbacks from C<rcv> callbacks:
569    
570     rcv $port, delayed_reply => sub {
571     my ($delay, @reply) = @_;
572     my $timer = AE::timer $delay, 0, psub {
573     snd @reply, $SELF;
574     };
575     };
576 root 1.3
577 root 1.8 =cut
578 root 1.3
579 root 1.22 sub psub(&) {
580     my $cb = shift;
581 root 1.3
582 root 1.22 my $port = $SELF
583     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
584 root 1.1
585 root 1.22 sub {
586     local $SELF = $port;
587 root 1.2
588 root 1.22 if (wantarray) {
589     my @res = eval { &$cb };
590     _self_die if $@;
591     @res
592     } else {
593     my $res = eval { &$cb };
594     _self_die if $@;
595     $res
596     }
597     }
598 root 1.2 }
599    
600 root 1.67 =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
601 root 1.32
602 root 1.67 =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
603 root 1.36
604 root 1.67 =item $guard = mon $port # kill $SELF when $port dies
605 root 1.32
606 root 1.67 =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
607 root 1.32
608 root 1.42 Monitor the given port and do something when the port is killed or
609     messages to it were lost, and optionally return a guard that can be used
610     to stop monitoring again.
611    
612 root 1.36 In the first form (callback), the callback is simply called with any
613     number of C<@reason> elements (no @reason means that the port was deleted
614 root 1.32 "normally"). Note also that I<< the callback B<must> never die >>, so use
615     C<eval> if unsure.
616    
617 root 1.43 In the second form (another port given), the other port (C<$rcvport>)
618 elmex 1.77 will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
619 root 1.36 "normal" kils nothing happens, while under all other conditions, the other
620     port is killed with the same reason.
621 root 1.32
622 root 1.36 The third form (kill self) is the same as the second form, except that
623     C<$rcvport> defaults to C<$SELF>.
624    
625     In the last form (message), a message of the form C<@msg, @reason> will be
626     C<snd>.
627 root 1.32
628 root 1.79 Monitoring-actions are one-shot: once messages are lost (and a monitoring
629     alert was raised), they are removed and will not trigger again.
630    
631 root 1.37 As a rule of thumb, monitoring requests should always monitor a port from
632     a local port (or callback). The reason is that kill messages might get
633     lost, just like any other message. Another less obvious reason is that
634 elmex 1.77 even monitoring requests can get lost (for example, when the connection
635 root 1.37 to the other node goes down permanently). When monitoring a port locally
636     these problems do not exist.
637    
638 root 1.79 C<mon> effectively guarantees that, in the absence of hardware failures,
639     after starting the monitor, either all messages sent to the port will
640     arrive, or the monitoring action will be invoked after possible message
641     loss has been detected. No messages will be lost "in between" (after
642     the first lost message no further messages will be received by the
643     port). After the monitoring action was invoked, further messages might get
644     delivered again.
645    
646     Inter-host-connection timeouts and monitoring depend on the transport
647     used. The only transport currently implemented is TCP, and AnyEvent::MP
648     relies on TCP to detect node-downs (this can take 10-15 minutes on a
649 elmex 1.96 non-idle connection, and usually around two hours for idle connections).
650 root 1.79
651     This means that monitoring is good for program errors and cleaning up
652     stuff eventually, but it is no replacement for a timeout when you need
653     to ensure some maximum latency.
654    
655 root 1.32 Example: call a given callback when C<$port> is killed.
656    
657     mon $port, sub { warn "port died because of <@_>\n" };
658    
659     Example: kill ourselves when C<$port> is killed abnormally.
660    
661 root 1.36 mon $port;
662 root 1.32
663 root 1.36 Example: send us a restart message when another C<$port> is killed.
664 root 1.32
665     mon $port, $self => "restart";
666    
667     =cut
668    
669     sub mon {
670 root 1.75 my ($nodeid, $port) = split /#/, shift, 2;
671 root 1.32
672 root 1.75 my $node = $NODE{$nodeid} || add_node $nodeid;
673 root 1.32
674 root 1.41 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
675 root 1.32
676     unless (ref $cb) {
677     if (@_) {
678     # send a kill info message
679 root 1.41 my (@msg) = ($cb, @_);
680 root 1.32 $cb = sub { snd @msg, @_ };
681     } else {
682     # simply kill other port
683     my $port = $cb;
684     $cb = sub { kil $port, @_ if @_ };
685     }
686     }
687    
688     $node->monitor ($port, $cb);
689    
690     defined wantarray
691 root 1.124 and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
692 root 1.32 }
693    
694     =item $guard = mon_guard $port, $ref, $ref...
695    
696     Monitors the given C<$port> and keeps the passed references. When the port
697     is killed, the references will be freed.
698    
699     Optionally returns a guard that will stop the monitoring.
700    
701     This function is useful when you create e.g. timers or other watchers and
702 root 1.67 want to free them when the port gets killed (note the use of C<psub>):
703 root 1.32
704     rcv $port, start => sub {
705 root 1.67 my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
706 root 1.32 undef $timer if 0.9 < rand;
707     };
708     };
709    
710     =cut
711    
712     sub mon_guard {
713     my ($port, @refs) = @_;
714    
715 root 1.36 #TODO: mon-less form?
716    
717 root 1.32 mon $port, sub { 0 && @refs }
718     }
719    
720 root 1.33 =item kil $port[, @reason]
721 root 1.32
722     Kill the specified port with the given C<@reason>.
723    
724 root 1.107 If no C<@reason> is specified, then the port is killed "normally" -
725     monitor callbacks will be invoked, but the kil will not cause linked ports
726     (C<mon $mport, $lport> form) to get killed.
727 root 1.32
728 root 1.107 If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
729     form) get killed with the same reason.
730 root 1.32
731     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
732     will be reported as reason C<< die => $@ >>.
733    
734     Transport/communication errors are reported as C<< transport_error =>
735     $message >>.
736    
737 root 1.38 =cut
738    
739     =item $port = spawn $node, $initfunc[, @initdata]
740    
741     Creates a port on the node C<$node> (which can also be a port ID, in which
742     case it's the node where that port resides).
743    
744 root 1.67 The port ID of the newly created port is returned immediately, and it is
745     possible to immediately start sending messages or to monitor the port.
746 root 1.38
747 root 1.67 After the port has been created, the init function is called on the remote
748     node, in the same context as a C<rcv> callback. This function must be a
749     fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
750     specify a function in the main program, use C<::name>.
751 root 1.38
752     If the function doesn't exist, then the node tries to C<require>
753     the package, then the package above the package and so on (e.g.
754     C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
755     exists or it runs out of package names.
756    
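The package lookup order described above can be derived from the function name alone. The following sketch only computes the list of candidate packages and does not load anything; C<candidate_packages> is a hypothetical helper, not the code AnyEvent::MP uses, and returning no candidates for a C<::name> function in the main program is an assumption of this sketch.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::MP: list the packages that
# would be tried, innermost first, for a fully-qualified function name.
sub candidate_packages {
   my ($func) = @_;

   my @parts = split /::/, $func;
   pop @parts;   # drop the function name itself

   my @candidates;
   while (@parts) {
      push @candidates, join "::", @parts;
      pop @parts;
   }

   grep length, @candidates   # "::name" leaves no package to load
}

print "$_\n" for candidate_packages "MyApp::Chat::Server::init";
```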
757     The init function is then called with the newly-created port as context
758 root 1.82 object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
759     call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
760     the port might not get created.
761 root 1.38
762 root 1.67 A common idiom is to pass a local port, immediately monitor the spawned
763     port, and in the remote init function, immediately monitor the passed
764     local port. This two-way monitoring ensures that both ports get cleaned up
765     when there is a problem.
766 root 1.38
767 root 1.80 C<spawn> guarantees that the C<$initfunc> has no visible effects on the
768     caller before C<spawn> returns (by delaying invocation when spawn is
769     called for the local node).
770    
771 root 1.38 Example: spawn a chat server port on C<$othernode>.
772    
773     # this node, executed from within a port context:
774     my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
775     mon $server;
776    
777     # init function on $othernode
778     sub connect {
779     my ($srcport) = @_;
780    
781     mon $srcport;
782    
783     rcv $SELF, sub {
784     ...
785     };
786     }
787    
788     =cut
789    
790     sub _spawn {
791     my $port = shift;
792     my $init = shift;
793    
794 root 1.82 # rcv will create the actual port
795 root 1.38 local $SELF = "$NODE#$port";
796     eval {
797     &{ load_func $init }
798     };
799     _self_die if $@;
800     }
801    
802     sub spawn(@) {
803 root 1.75 my ($nodeid, undef) = split /#/, shift, 2;
804 root 1.38
805 root 1.123 my $id = $RUNIQ . ++$ID;
806 root 1.38
807 root 1.39 $_[0] =~ /::/
808     or Carp::croak "spawn init function must be a fully-qualified name, caught";
809    
810 root 1.75 snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
811 root 1.38
812 root 1.75 "$nodeid#$id"
813 root 1.38 }
814    
815 root 1.121
816 root 1.59 =item after $timeout, @msg
817    
818     =item after $timeout, $callback
819    
820     Either sends the given message, or calls the given callback, after the
821     specified number of seconds.
822    
823 root 1.67 This is simply a utility function that comes in handy at times - the
824     AnyEvent::MP author is not convinced of the wisdom of having it, though,
825     so it may go away in the future.
826 root 1.59
827     =cut
828    
829     sub after($@) {
830     my ($timeout, @action) = @_;
831    
832     my $t; $t = AE::timer $timeout, 0, sub {
833     undef $t;
834     ref $action[0]
835     ? $action[0]()
836     : snd @action;
837     };
838     }
839    
840 root 1.129 #=item $cb2 = timeout $seconds, $cb[, @args]
841    
842 root 1.87 =item cal $port, @msg, $callback[, $timeout]
843    
844     A simple form of RPC - sends a message to the given C<$port> with the
845     given contents (C<@msg>), but adds a reply port to the message.
846    
847     The reply port is created temporarily just for the purpose of receiving
848     the reply, and will be C<kil>ed when no longer needed.
849    
850     A reply message sent to the port is passed to the C<$callback> as-is.
851    
852     If an optional time-out (in seconds) is given and it is not C<undef>,
853     then the callback will be called without any arguments after the time-out
854     has elapsed and the port is C<kil>ed.
855    
856 root 1.98 If no time-out is given (or it is C<undef>), then the local port will
857     monitor the remote port instead, so it eventually gets cleaned-up.
858 root 1.87
859     Currently this function returns the temporary port, but this "feature"
860     might go away in future versions unless you can make a convincing case that
861     this is indeed useful for something.
862    
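The calling convention above (message, then callback, then an optional time-out) can be told apart by checking whether the last argument is a reference. The following is a minimal sketch of that idea; C<split_cal_args> is a hypothetical helper for illustration and not part of the module's API.

```perl
use strict;
use warnings;

# Hypothetical helper: split a cal-style argument list into its parts.
sub split_cal_args {
    my @args = @_;
    # A trailing non-reference (including undef) is taken as the time-out,
    # otherwise the last argument is the callback and there is no time-out.
    my $timeout = ref $args[-1] ? undef : pop @args;
    my $cb      = pop @args;
    my ($port, @msg) = @args;
    return ($port, \@msg, $cb, $timeout);
}

my $cb = sub { };
my ($port, $msg, $got_cb, $timeout) = split_cal_args("node#port", "ping", 1, $cb, 30);
print "port=$port msg=@$msg timeout=$timeout\n";
```
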
863     =cut
864    
865     sub cal(@) {
866     my $timeout = ref $_[-1] ? undef : pop;
867     my $cb = pop;
868    
869     my $port = port {
870     undef $timeout;
871     kil $SELF;
872     &$cb;
873     };
874    
875     if (defined $timeout) {
876     $timeout = AE::timer $timeout, 0, sub {
877     undef $timeout;
878     kil $port;
879     $cb->();
880     };
881     } else {
882     mon $_[0], sub {
883     kil $port;
884     $cb->();
885     };
886     }
887    
888     push @_, $port;
889     &snd;
890    
891     $port
892     }
893    
894 root 1.8 =back
895    
896 root 1.124 =head1 DISTRIBUTED DATABASE
897    
898     AnyEvent::MP comes with a simple distributed database. The database will
899 root 1.131 be mirrored asynchronously on all global nodes. Other nodes bind to one
900     of the global nodes for their needs. Every node has a "local database"
901     which contains all the values that are set locally. All local databases
902     are merged together to form the global database, which can be queried.
903    
904     The database structure is that of a two-level hash - the database hash
905     contains hashes which contain values, similarly to a perl hash of hashes,
906     i.e.:
907 root 1.124
908 root 1.131 $DATABASE{$family}{$subkey} = $value
909 root 1.124
910     The top level hash key is called "family", and the second-level hash key
911 root 1.126 is called "subkey" or simply "key".
912 root 1.124
913 root 1.125 The family must be alphanumeric, i.e. start with a letter and consist
914     of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>,
915     pretty much like Perl module names).
916 root 1.124
917 root 1.125 As the family namespace is global, it is recommended to prefix family names
918 root 1.124 with the name of the application or module using it.
919    
920 root 1.126 The subkeys must be non-empty strings, with no further restrictions.
921 root 1.125
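The naming rules above can be checked with a few lines of plain Perl. This is only a sketch: C<valid_family> and C<valid_subkey> are hypothetical helpers that mirror the stated rules, and AnyEvent::MP itself is not known to export such functions.

```perl
use strict;
use warnings;

# Hypothetical validator: a family starts with a letter and continues
# with letters, digits, underscores and colons.
sub valid_family {
    my ($family) = @_;
    return (defined($family) && $family =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/) ? 1 : 0;
}

# Hypothetical validator: a subkey is any non-empty string.
sub valid_subkey {
    my ($subkey) = @_;
    return (defined($subkey) && length($subkey)) ? 1 : 0;
}

for my $name ("my_image_scalers", "My::Module::workers", "9lives", "") {
    printf "%-22s %s\n", "'$name'", valid_family($name) ? "ok" : "invalid";
}
```
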
922 root 1.124 The values should preferably be strings, but other perl scalars should
923 root 1.131 work as well (such as C<undef>, arrays and hashes).
924 root 1.124
925 root 1.126 Every database entry is owned by one node - adding the same family/subkey
926 root 1.124 combination on multiple nodes will not cause discomfort for AnyEvent::MP,
927     but the result might be nondeterministic, i.e. the key might have
928     different values on different nodes.
929    
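To make the ownership rule concrete, here is a small model of merging local databases into one global view. It is a sketch under simplifying assumptions, not the module's actual replication logic: the node names, the C<myapp_mode> family and the C<merge_databases> helper are all made up for the illustration.

```perl
use strict;
use warnings;

# Hypothetical local databases of two nodes: family => { subkey => value }.
my %local_db = (
    node_a => {
        my_image_scalers => { "node_a#p1" => undef },
        myapp_mode       => { current => "fast" },
    },
    node_b => {
        my_image_scalers => { "node_b#p7" => undef },
        myapp_mode       => { current => "safe" },
    },
);

# Merge the local databases, in the given node order, into one global view.
sub merge_databases {
    my (@node_order) = @_;
    my %global;
    for my $node (@node_order) {
        my $db = $local_db{$node};
        for my $family (keys %$db) {
            $global{$family}{$_} = $db->{$family}{$_} for keys %{ $db->{$family} };
        }
    }
    return \%global;
}

my $ab = merge_databases("node_a", "node_b");
my $ba = merge_databases("node_b", "node_a");

# Distinct subkeys in one family coexist regardless of merge order.
print join(" ", sort keys %{ $ab->{my_image_scalers} }), "\n";

# The same family/subkey set on two nodes: the value depends on merge order.
print "$ab->{myapp_mode}{current} vs $ba->{myapp_mode}{current}\n";
```

In this model the two merge orders agree on the worker pool but disagree on the doubly-owned key, which is the kind of nondeterminism the paragraph above warns about.
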
930 root 1.126 Different subkeys in the same family can be owned by different nodes
931     without problems, and in fact, this is the common method to create worker
932     pools. For example, a worker port for image scaling might do this:
933 root 1.124
934 root 1.126 db_set my_image_scalers => $port;
935 root 1.124
936 root 1.126 And clients looking for an image scaler will want to get the
937 root 1.129 C<my_image_scalers> keys from time to time:
938    
939     db_keys my_image_scalers => sub {
940     @ports = @{ $_[0] };
941     };
942    
943     Or better yet, they want to monitor the database family, so they always
944     have a reasonably up-to-date copy:
945    
946     db_mon my_image_scalers => sub {
947     @ports = keys %{ $_[0] };
948     };
949    
950     In general, you can set or delete single subkeys, but query and monitor
951     whole families only.
952 root 1.126
953 root 1.129 If you feel the need to monitor or query a single subkey, try giving it
954     its own family.
955 root 1.126
956     =over
957    
958     =item db_set $family => $subkey [=> $value]
959    
960     Sets (or replaces) a key in the database - if C<$value> is omitted,
961     C<undef> is used instead.
962    
963 root 1.130 =item db_del $family => $subkey...
964 root 1.124
965 root 1.130 Deletes one or more subkeys from the database family.
966 root 1.124
967 root 1.126 =item $guard = db_reg $family => $subkey [=> $value]
968 root 1.124
969     Sets the key in the database and returns a guard. When the guard is
970     destroyed, the key is deleted from the database. If C<$value> is missing,
971     then C<undef> is used.
972    
973 root 1.129 =item db_family $family => $cb->(\%familyhash)
974    
975     Queries the named database C<$family> and calls the callback with the
976     family represented as a hash. You can keep and freely modify the hash.
977    
978     =item db_keys $family => $cb->(\@keys)
979    
980     Same as C<db_family>, except it only queries the family I<subkeys> and passes
981     them as an array reference to the callback.
982    
983     =item db_values $family => $cb->(\@values)
984    
985     Same as C<db_family>, except it only queries the family I<values> and passes them
986     as an array reference to the callback.
987    
988 root 1.130 =item $guard = db_mon $family => $cb->($familyhash, \@added, \@changed, \@deleted)
989 root 1.128
990 root 1.130 Creates a monitor on the given database family. Each time a key is set
991     or is deleted, the callback is called with a hash containing the
992     database family and three lists of added, changed and deleted subkeys,
993     respectively. If no keys have changed then the array reference might be
994     C<undef> or even missing.
995    
996     The family hash reference and the key arrays belong to AnyEvent::MP and
997     B<must not be modified or stored> by the callback. When in doubt, make a
998     copy.
999    
1000     As soon as possible after the monitoring starts, the callback will be
1001     called with the initial contents of the family, even if it is empty,
1002     i.e. there will always be a timely call to the callback with the current
1003     contents.
1004 root 1.128
1005     It is possible that the callback is called with a change event even though
1006     the subkey is already present and the value has not changed.
1007    
1008     The monitoring stops when the guard object is destroyed.
1009    
1010     Example: on every change to the family "mygroup", print out all keys.
1011    
1012     my $guard = db_mon mygroup => sub {
1013 root 1.130 my ($family, $a, $c, $d) = @_;
1014 root 1.128 print "mygroup members: ", (join " ", keys %$family), "\n";
1015     };
1016    
1017     Example: wait until the family "My::Module::workers" is non-empty.
1018    
1019     my $guard; $guard = db_mon My::Module::workers => sub {
1020 root 1.130 my ($family, $a, $c, $d) = @_;
1021 root 1.128 return unless %$family;
1022     undef $guard;
1023     print "My::Module::workers now nonempty\n";
1024     };
1025    
1026     Example: print all changes to the family "AnyEvent::Fantasy::Module".
1027    
1028     my $guard = db_mon AnyEvent::Fantasy::Module => sub {
1029 root 1.130 my ($family, $a, $c, $d) = @_;
1030 root 1.128
1031 root 1.130 print "+$_=$family->{$_}\n" for @$a;
1032     print "*$_=$family->{$_}\n" for @$c;
1033     print "-$_\n" for @$d; # deleted subkeys are no longer in the family hash
1034 root 1.128 };
1035    
1036 root 1.124 =cut
1037    
1038     =back
1039    
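The three subkey lists passed to a C<db_mon> callback can be thought of as the difference between two snapshots of a family. The following sketch models that idea in plain Perl; C<family_diff> and C<same_value> are hypothetical helpers, the sample data is invented, and the module may well compute its lists differently (for instance, it may report a change even when the value is the same).

```perl
use strict;
use warnings;

# Compare two values that may each be a string or undef.
sub same_value {
    my ($x, $y) = @_;
    return 1 if !defined($x) && !defined($y);
    return 0 if !defined($x) || !defined($y);
    return $x eq $y ? 1 : 0;
}

# Hypothetical helper: derive the added, changed and deleted subkeys
# between an old and a new snapshot of one family.
sub family_diff {
    my ($old, $new) = @_;
    my (@added, @changed, @deleted);
    for my $key (sort keys %$new) {
        if (!exists $old->{$key}) {
            push @added, $key;
        } elsif (!same_value($old->{$key}, $new->{$key})) {
            push @changed, $key;
        }
    }
    @deleted = grep { !exists $new->{$_} } sort keys %$old;
    return (\@added, \@changed, \@deleted);
}

my %before = (alice => "idle", bob => "busy");
my %after  = (alice => "busy", carol => "idle");
my ($added, $changed, $deleted) = family_diff(\%before, \%after);
print "added: @$added; changed: @$changed; deleted: @$deleted\n";
```
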
1040 root 1.26 =head1 AnyEvent::MP vs. Distributed Erlang
1041    
1042 root 1.35 AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
1043     == aemp node, Erlang process == aemp port), so many of the documents and
1044     programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
1045 root 1.27 sample:
1046    
1047 root 1.95 http://www.erlang.se/doc/programming_rules.shtml
1048     http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
1049     http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
1050     http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
1051 root 1.27
1052     Despite the similarities, there are also some important differences:
1053 root 1.26
1054     =over 4
1055    
1056 root 1.65 =item * Node IDs are arbitrary strings in AEMP.
1057 root 1.26
1058 root 1.65 Erlang relies on special naming and DNS to work everywhere in the same
1059     way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
1060 root 1.99 configuration or DNS), and possibly the addresses of some seed nodes, but
1061     will otherwise discover other nodes (and their IDs) itself.
1062 root 1.27
1063 root 1.54 =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
1064 root 1.51 uses "local ports are like remote ports".
1065    
1066     The failure modes for local ports are quite different (runtime errors
1067     only) than for remote ports - when a local port dies, you I<know> it dies,
1068     when a connection to another node dies, you know nothing about the other
1069     port.
1070    
1071     Erlang pretends remote ports are as reliable as local ports, even when
1072     they are not.
1073    
1074     AEMP encourages a "treat remote ports differently" philosophy, with local
1075     ports being the special case/exception, where transport errors cannot
1076     occur.
1077    
1078 root 1.26 =item * Erlang uses processes and a mailbox, AEMP does not queue.
1079    
1080 root 1.119 Erlang uses processes that selectively receive messages out of order, and
1081     therefore needs a queue. AEMP is event based, queuing messages would serve
1082     no useful purpose. For the same reason the pattern-matching abilities
1083     of AnyEvent::MP are more limited, as there is little need to be able to
1084 elmex 1.77 filter messages without dequeuing them.
1085 root 1.26
1086 root 1.119 This is not a philosophical difference, but simply stems from AnyEvent::MP
1087     being event-based, while Erlang is process-based.
1088    
1089     You can have a look at L<Coro::MP> for a more Erlang-like process model on
1090     top of AEMP and Coro threads.
1091 root 1.26
1092     =item * Erlang sends are synchronous, AEMP sends are asynchronous.
1093    
1094 root 1.119 Sending messages in Erlang is synchronous and blocks the process until
1095     a connection has been established and the message sent (and so does not
1096     need a queue that can overflow). AEMP sends return immediately, connection
1097     establishment is handled in the background.
1098 root 1.26
1099 root 1.51 =item * Erlang suffers from silent message loss, AEMP does not.
1100 root 1.26
1101 root 1.99 Erlang implements few guarantees on message delivery - messages can get
1102     lost without any of the processes realising it (i.e. you send messages a,
1103     b, and c, and the other side only receives messages a and c).
1104 root 1.26
1105 root 1.117 AEMP guarantees (modulo hardware errors) correct ordering, and the
1106     guarantee that after one message is lost, all following ones sent to the
1107     same port are lost as well, until monitoring raises an error, so there are
1108     no silent "holes" in the message sequence.
1109 root 1.26
1110 root 1.119 If you want your software to be very reliable, you have to cope with
1111     corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
1112     simply tries to work better in common error cases, such as when a network
1113     link goes down.
1114    
1115 root 1.26 =item * Erlang can send messages to the wrong port, AEMP does not.
1116    
1117 root 1.119 In Erlang it is quite likely that a node that restarts reuses an Erlang
1118     process ID known to other nodes for a completely different process,
1119     causing messages destined for that process to end up in an unrelated
1120     process.
1121 root 1.26
1122 root 1.119 AEMP does not reuse port IDs, so old messages or old port IDs floating
1123 root 1.26 around in the network will not be sent to an unrelated port.
1124    
1125     =item * Erlang uses unprotected connections, AEMP uses secure
1126     authentication and can use TLS.
1127    
1128 root 1.66 AEMP can use a proven protocol - TLS - to protect connections and
1129 root 1.26 securely authenticate nodes.
1130    
1131 root 1.28 =item * The AEMP protocol is optimised for both text-based and binary
1132     communications.
1133    
1134 root 1.66 The AEMP protocol, unlike the Erlang protocol, supports both programming
1135 root 1.119 language independent text-only protocols (good for debugging), and binary,
1136 root 1.67 language-specific serialisers (e.g. Storable). By default, unless TLS is
1137     used, the protocol is actually completely text-based.
1138 root 1.28
1139     It has also been carefully designed to be implementable in other languages
1140 root 1.66 with a minimum of work while gracefully degrading functionality to make the
1141 root 1.28 protocol simple.
1142    
1143 root 1.35 =item * AEMP has more flexible monitoring options than Erlang.
1144    
1145 root 1.119 In Erlang, you can choose to receive I<all> exit signals as messages or
1146     I<none>, there is no in-between, so monitoring single Erlang processes is
1147     difficult to implement.
1148    
1149     Monitoring in AEMP is more flexible than in Erlang, as one can choose
1150     between automatic kill, exit message or callback on a per-port basis.
1151 root 1.35
1152 root 1.37 =item * Erlang tries to hide remote/local connections, AEMP does not.
1153 root 1.35
1154 root 1.67 Monitoring in Erlang is not an indicator of process death/crashes, in the
1155     same way as linking is (except linking is unreliable in Erlang).
1156 root 1.37
1157     In AEMP, you don't "look up" registered port names or send to named ports
1158     that might or might not be persistent. Instead, you normally spawn a port
1159 root 1.67 on the remote node. The init function monitors you, and you monitor the
1160     remote port. Since both monitors are local to the node, they are much more
1161     reliable (no need for C<spawn_link>).
1162 root 1.37
1163     This also saves round-trips and avoids sending messages to the wrong port
1164     (hard to do in Erlang).
1165 root 1.35
1166 root 1.26 =back
1167    
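The "no silent holes" delivery guarantee from the list above can be modelled in a few lines. This is a toy model only, assuming a single sender and a single destination port; C<deliver_all> is a hypothetical function and says nothing about how the transport is actually implemented.

```perl
use strict;
use warnings;

# Toy model of sending @msgs to one port. If $lost_index is defined, the
# message at that position is lost, and so is every message after it.
# Returns a flag modelling "monitoring raised an error", followed by the
# messages that were delivered, in order.
sub deliver_all {
    my ($lost_index, @msgs) = @_;
    my (@received, $broken);
    for my $i (0 .. $#msgs) {
        $broken ||= (defined($lost_index) && $i == $lost_index);
        push @received, $msgs[$i] unless $broken;
    }
    return (($broken ? 1 : 0), @received);
}

my ($error, @got) = deliver_all(1, qw(a b c));
print "error=$error received=@got\n";
```

In this model losing message b also drops c, so the receiver never sees a followed directly by c, and the sender learns about the failure through the error flag.
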
1168 root 1.46 =head1 RATIONALE
1169    
1170     =over 4
1171    
1172 root 1.67 =item Why strings for port and node IDs, why not objects?
1173 root 1.46
1174     We considered "objects", but found that the actual number of methods
1175 root 1.67 that can be called is quite low. Since port and node IDs travel over
1176 root 1.46 the network frequently, the serialising/deserialising would add lots of
1177 root 1.67 overhead, as well as having to keep a proxy object everywhere.
1178 root 1.46
1179     Strings can easily be printed, easily serialised etc. and need no special
1180     procedures to be "valid".
1181    
1182 root 1.110 And as a result, a port with just a default receiver consists of a single
1183 root 1.117 code reference stored in a global hash - it can't become much cheaper.
1184 root 1.47
1185 root 1.67 =item Why favour JSON, why not a real serialising format such as Storable?
1186 root 1.46
1187     In fact, any AnyEvent::MP node will happily accept Storable as framing
1188     format, but currently there is no way to make a node use Storable by
1189 root 1.67 default (although all nodes will accept it).
1190 root 1.46
1191     The default framing protocol is JSON because a) JSON::XS is many times
1192     faster for small messages and b) most importantly, after years of
1193     experience we found that object serialisation is causing more problems
1194 root 1.67 than it solves: Just like function calls, objects simply do not travel
1195 root 1.46 easily over the network, mostly because they will always be a copy, so you
1196     always have to re-think your design.
1197    
1198     Keeping your messages simple, concentrating on data structures rather than
1199     objects, will keep your messages clean, tidy and efficient.
1200    
1201     =back
1202    
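The point about data structures versus objects can be tried out with any JSON encoder. The sketch below uses JSON::PP purely because it ships with Perl; that choice, the message contents and the C<Hypothetical::Image> class are assumptions made for this illustration, and nothing here touches AnyEvent::MP itself.

```perl
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->canonical;

# A message made of plain data: a tag followed by a hash of parameters.
my $msg  = [ scale => { width => 640, height => 480 } ];
my $wire = $json->encode($msg);
my $copy = $json->decode($wire);

# The receiver gets an equal but independent copy of the data.
$copy->[1]{width} = 320;
print "original width: $msg->[1]{width}, copy width: $copy->[1]{width}\n";

# With default settings, a blessed object is refused by the encoder.
my $object = bless { width => 640 }, "Hypothetical::Image";
my $ok = eval { $json->encode([ scale => $object ]); 1 };
print "object encoded: ", ($ok ? "yes" : "no"), "\n";
```

Changing the copy leaves the original untouched, which is why a design that relies on shared object identity has to be re-thought once messages cross node boundaries.
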
1203 root 1.1 =head1 SEE ALSO
1204    
1205 root 1.68 L<AnyEvent::MP::Intro> - a gentle introduction.
1206    
1207     L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
1208    
1209 root 1.113 L<AnyEvent::MP::Global> - network maintenance and port groups, to find
1210 root 1.68 your applications.
1211    
1212 root 1.105 L<AnyEvent::MP::DataConn> - establish data connections between nodes.
1213    
1214 root 1.81 L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
1215     all nodes.
1216    
1217 root 1.1 L<AnyEvent>.
1218    
1219     =head1 AUTHOR
1220    
1221     Marc Lehmann <schmorp@schmorp.de>
1222     http://home.schmorp.de/
1223    
1224     =cut
1225    
1226     1
1227