/cvs/AnyEvent-MP/MP.pm
Revision: 1.129
Committed: Thu Mar 8 21:37:51 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.128: +40 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.89 AnyEvent::MP - erlang-style multi-processing/message-passing framework
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.75 $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11 root 1.2
12 root 1.38 $SELF # receiving/own port id in rcv callbacks
13    
14 root 1.48 # initialise the node so it can send/receive messages
15 root 1.72 configure;
16 root 1.48
17 root 1.75 # ports are message destinations
18 root 1.38
19     # sending messages
20 root 1.2 snd $port, type => data...;
21 root 1.38 snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23 root 1.2
24 root 1.50 # creating/using ports, the simple way
25 root 1.73 my $simple_port = port { my @msg = @_ };
26 root 1.22
27 root 1.52 # creating/using ports, tagged message matching
28 root 1.38 my $port = port;
29 root 1.73 rcv $port, ping => sub { snd $_[0], "pong" };
30     rcv $port, pong => sub { warn "pong received\n" };
31 root 1.2
32 root 1.48 # create a port on another node
33     my $port = spawn $node, $initfunc, @initdata;
34    
35 root 1.116 # destroy a port again
36 root 1.101 kil $port; # "normal" kill
37     kil $port, my_error => "everything is broken"; # error kill
38    
39 root 1.35 # monitoring
40 root 1.129 mon $port, $cb->(@msg) # callback is invoked on death
41     mon $port, $localport # kill localport on abnormal death
42     mon $port, $localport, @msg # send message on death
43 root 1.35
44 root 1.101 # temporarily execute code in port context
45     peval $port, sub { die "kill the port!" };
46    
47     # execute callbacks in $SELF port context
48     my $timer = AE::timer 1, 0, psub {
49     die "kill the port, delayed";
50     };
51    
52 root 1.45 =head1 CURRENT STATUS
53    
54 root 1.71 bin/aemp - stable.
55     AnyEvent::MP - stable API, should work.
56 elmex 1.77 AnyEvent::MP::Intro - explains most concepts.
57 root 1.88 AnyEvent::MP::Kernel - mostly stable API.
58     AnyEvent::MP::Global - stable API.
59 root 1.45
60 root 1.1 =head1 DESCRIPTION
61    
62 root 1.2 This module (-family) implements a simple message passing framework.
63    
64     Despite its simplicity, you can securely message other processes running
65 root 1.67 on the same or other hosts, and you can supervise entities remotely.
66 root 1.2
67 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
68 root 1.67 manual page and the examples under F<eg/>.
69 root 1.23
70 root 1.2 =head1 CONCEPTS
71    
72     =over 4
73    
74     =item port
75    
76 root 1.79 Not to be confused with a TCP port, a "port" is something you can send
77     messages to (with the C<snd> function).
78 root 1.29
79 root 1.53 Ports allow you to register C<rcv> handlers that can match all or just
80 root 1.64 some messages. Messages sent to ports will not be queued, regardless of
81     whether anything was listening for them or not.
82 root 1.2
83 root 1.119 Ports are represented by (printable) strings called "port IDs".
84    
85 root 1.67 =item port ID - C<nodeid#portname>
86 root 1.2
87 root 1.123 A port ID is the concatenation of a node ID, a hash-mark (C<#>)
88     as separator, and a port name (a printable string of unspecified
89     format created by AnyEvent::MP).
90 root 1.2
91     =item node
92    
93 root 1.53 A node is a single process containing at least one port - the node port,
94 root 1.67 which enables nodes to manage each other remotely, and to create new
95 root 1.53 ports.
96 root 1.2
97 root 1.67 Nodes are either public (have one or more listening ports) or private
98     (no listening ports). Private nodes cannot talk to other private nodes
99 root 1.119 currently, but all nodes can talk to public nodes.
100    
101     Nodes are represented by (printable) strings called "node IDs".
102 root 1.2
103 root 1.117 =item node ID - C<[A-Za-z0-9_\-.:]*>
104 root 1.2
105 root 1.64 A node ID is a string that uniquely identifies the node within a
106     network. Depending on the configuration used, node IDs can look like a
107     hostname, a hostname and a port, or a random string. AnyEvent::MP itself
108 root 1.119 doesn't interpret node IDs in any way except to uniquely identify a node.
109 root 1.64
110     =item binds - C<ip:port>
111    
112     Nodes can only talk to each other by creating some kind of connection to
113     each other. To do this, nodes should listen on one or more local transport
114 root 1.119 endpoints - binds.
115    
116     Currently, only standard C<ip:port> specifications can be used, which
117     specify TCP ports to listen on. So a bind is basically just a TCP socket
118     in listening mode that accepts connections from other nodes.
119 root 1.64
120 root 1.83 =item seed nodes
121 root 1.64
122 root 1.119 When a node starts, it knows nothing about the network it is in - it
123     needs to connect to at least one other node that is already in the
124     network. These other nodes are called "seed nodes".
125    
126     Seed nodes themselves are not special - they are seed nodes only because
127     some other node I<uses> them as such, but any node can be used as seed
128     node for other nodes, and each node can use a different set of seed nodes.
129 root 1.83
130     In addition to discovering the network, seed nodes are also used to
131 root 1.119 maintain the network - all nodes using the same seed node are part of
132     the same network. If a network is split into multiple subnets because e.g.
133     the network link between the parts goes down, then using the same seed
134     nodes for all nodes ensures that eventually the subnets get merged again.
135 root 1.83
136     Seed nodes are expected to be long-running, and at least one seed node
137 root 1.85 should always be available. They should also be relatively responsive - a
138     seed node that blocks for long periods will slow down everybody else.
139 root 1.83
140 root 1.119 For small networks, it's best if every node uses the same set of seed
141     nodes. For large networks, it can be useful to specify "regional" seed
142     nodes for most nodes in an area, and use all seed nodes as seed nodes for
143     each other. What's important is that the connections between all seed nodes form a
144     complete graph, so that the network cannot split into separate subnets
145     forever.
146    
147     Seed nodes are represented by seed IDs.
148    
149     =item seed IDs - C<host:port>
150 root 1.83
151 root 1.119 Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
152 elmex 1.96 TCP port) of nodes that should be used as seed nodes.
153 root 1.29
154 root 1.119 =item global nodes
155    
156     An AEMP network needs a discovery service - nodes need to know how to
157     connect to other nodes they only know by name. In addition, AEMP offers a
158     distributed "group database", which maps group names to a list of strings
159     - for example, to register worker ports.
160    
161     A network needs at least one global node to work, and allows every node to
162     be a global node.
163    
164     Any node that loads the L<AnyEvent::MP::Global> module becomes a global
165     node and tries to keep connections to all other nodes. So while it can
166     make sense to make every node "global" in small networks, it usually makes
167     sense to only make seed nodes into global nodes in large networks (nodes
168     keep connections to seed nodes and global nodes, so making them the same
169     reduces overhead).
170 root 1.67
171 root 1.2 =back
172    
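The port ID format described above (C<nodeid#portname>) can be taken apart
with a plain C<split>. The following is only a minimal sketch:
C<split_port_id> is a hypothetical helper that is not exported by this
module, and the example ID is made up for illustration.

```perl
use strict;
use warnings;

# split_port_id is a hypothetical helper for illustration only;
# it is not exported by AnyEvent::MP.
sub split_port_id {
   my ($port_id) = @_;

   # limit of 2: only the first "#" separates node ID and port name
   my ($node_id, $port_name) = split /#/, $port_id, 2;

   ($node_id, $port_name)
}

# made-up port ID, assuming a node ID of the "host:port" kind
my ($node_id, $port_name) = split_port_id "10.0.0.1:4040#a1b2c3";
print "node ID: $node_id, port name: $port_name\n";
```

Since port names have an unspecified format, code outside the module
should treat them as opaque and use only the node ID part.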
173 root 1.3 =head1 VARIABLES/FUNCTIONS
174 root 1.2
175     =over 4
176    
177 root 1.1 =cut
178    
179     package AnyEvent::MP;
180    
181 root 1.121 use AnyEvent::MP::Config ();
182 root 1.44 use AnyEvent::MP::Kernel;
183 root 1.121 use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);
184 root 1.2
185 root 1.1 use common::sense;
186    
187 root 1.2 use Carp ();
188    
189 root 1.1 use AE ();
190 root 1.124 use Guard ();
191 root 1.1
192 root 1.2 use base "Exporter";
193    
194 root 1.121 our $VERSION = $AnyEvent::MP::Config::VERSION;
195 root 1.43
196 root 1.8 our @EXPORT = qw(
197 root 1.59 NODE $NODE *SELF node_of after
198 root 1.72 configure
199 root 1.101 snd rcv mon mon_guard kil psub peval spawn cal
200 root 1.22 port
201 root 1.124 db_set db_del db_reg
202 root 1.128 db_mon db_family db_keys db_values
203 root 1.8 );
204 root 1.2
205 root 1.22 our $SELF;
206    
207     sub _self_die() {
208     my $msg = $@;
209     $msg =~ s/\n+$// unless ref $msg;
210     kil $SELF, die => $msg;
211     }
212    
213     =item $thisnode = NODE / $NODE
214    
215 root 1.67 The C<NODE> function returns, and the C<$NODE> variable contains, the node
216 root 1.64 ID of the node running in the current process. This value is initialised by
217 root 1.72 a call to C<configure>.
218 root 1.22
219 root 1.63 =item $nodeid = node_of $port
220 root 1.22
221 root 1.67 Extracts and returns the node ID from a port ID or a node ID.
222 root 1.34
223 root 1.78 =item configure $profile, key => value...
224    
225 root 1.72 =item configure key => value...
226 root 1.34
227 root 1.64 Before a node can talk to other nodes on the network (i.e. enter
228 root 1.72 "distributed mode") it has to configure itself - the minimum a node needs
229 root 1.64 to know is its own name, and optionally it should know the addresses of
230     some other nodes in the network to discover other nodes.
231 root 1.34
232 root 1.121 This function configures a node - it must be called exactly once (or
233     never) before calling other AnyEvent::MP functions.
234    
235 root 1.108 The key/value pairs are basically the same ones as documented for the
236 root 1.127 F<aemp> command line utility (sans the set/del prefix), with these additions:
237 root 1.121
238     =over 4
239    
240     =item norc => $boolean (default false)
241    
242     If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
243     be consulted - all configuration options must be specified in the
244     C<configure> call.
245 root 1.108
246 root 1.121 =item force => $boolean (default false)
247    
248     If true, then the values specified in the C<configure> call will take
249     precedence over any values configured via the rc file. The default is for
250     the rc file to override any options specified in the program.
251    
252 root 1.127 =item secure => $pass->($nodeid)
253    
254     In addition to specifying a boolean, you can specify a code reference that
255     is called for every remote execution attempt - the execution request is
256     granted iff the callback returns a true value.
257    
258     See F<aemp setsecure> for more info.
259    
260 root 1.121 =back
261 root 1.34
262 root 1.72 =over 4
263    
264     =item step 1, gathering configuration from profiles
265    
266     The function first looks up a profile in the aemp configuration (see the
267     L<aemp> commandline utility). The profile name can be specified via the
268 root 1.78 named C<profile> parameter or can simply be the first parameter. If it is
269     missing, then the nodename (F<uname -n>) will be used as profile name.
270 root 1.34
271 root 1.72 The profile data is then gathered as follows:
272 root 1.69
273 elmex 1.77 First, all remaining key => value pairs (all of which are conveniently
274 root 1.72 undocumented at the moment) will be interpreted as configuration
275     data. Then they will be overwritten by any values specified in the global
276     default configuration (see the F<aemp> utility), then the chain of
277     profiles chosen by the profile name (and any C<parent> attributes).
278    
279     That means that the values specified in the profile have highest priority
280     and the values specified directly via C<configure> have lowest priority,
281     and can only be used to specify defaults.
282 root 1.49
283 root 1.64 If the profile specifies a node ID, then this will become the node ID of
284 root 1.122 this process. If not, then the profile name will be used as node ID, with
285 root 1.126 a unique random string (C</%u>) appended.
286 root 1.122
287 root 1.126 The node ID can contain some C<%> sequences that are expanded: C<%n>
288     is expanded to the local nodename, C<%u> is replaced by a random
289     string to make the node unique. For example, the F<aemp> commandline
290     utility uses C<aemp/%n/%u> as nodename, which might expand to
291     C<aemp/cerebro/ZQDGSIkRhEZQDGSIkRhE>.
292 root 1.64
293 root 1.72 =item step 2, bind listener sockets
294    
295 root 1.64 The next step is to look up the binds in the profile, followed by binding
296     aemp protocol listeners on all binds specified (it is possible and valid
297     to have no binds, meaning that the node cannot be contacted from the
298     outside. This means the node cannot talk to other nodes that also have no
299     binds, but it can still talk to all "normal" nodes).
300    
301 root 1.70 If the profile does not specify a binds list, then a default of C<*> is
302 root 1.72 used, meaning the node will bind on a dynamically-assigned port on every
303     local IP address it finds.
304    
305     =item step 3, connect to seed nodes
306 root 1.64
307 root 1.119 As the last step, the seed ID list from the profile is passed to the
308 root 1.64 L<AnyEvent::MP::Global> module, which will then use it to keep
309 root 1.72 connectivity with at least one node at any point in time.
310 root 1.64
311 root 1.72 =back
312    
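The priority rules from step 1 (together with the C<force> option) can be
pictured as a simple hash merge in which later sources win. This is only a
sketch under stated assumptions: C<resolve_config> is a hypothetical helper,
not the module's implementation, and it assumes the profile chain is ordered
from the most distant parent to the chosen profile.

```perl
use strict;
use warnings;

# resolve_config is a hypothetical helper, not part of AnyEvent::MP.
#   $args   - key/value pairs passed to configure
#   $global - global default configuration from the rc file
#   @chain  - profile hashes, assumed to be ordered from the most
#             distant parent to the chosen profile
sub resolve_config {
   my ($args, $global, @chain) = @_;

   # within the rc file data, later entries override earlier ones
   my %rc = (%$global, map %$_, @chain);

   $args->{force}
      ? (%rc, %$args)    # force: values given in the program win
      : (%$args, %rc)    # default: rc file values win
}

my %cfg = resolve_config (
   { binds => "*:4040", nodeid => "fallback" },   # configure arguments
   { binds => "*:4041" },                         # global defaults
   { nodeid => "seed" },                          # chosen profile
);

print "$_ = $cfg{$_}\n" for sort keys %cfg;
```

Without C<force>, the arguments given to C<configure> survive only for keys
that neither the global defaults nor the profiles set, which is why they
act as defaults.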
313 root 1.87 Example: become a distributed node using the local node name as profile.
314 root 1.72 This should be the most common form of invocation for "daemon"-type nodes.
315 root 1.34
316 root 1.72 configure
317 root 1.34
318 root 1.126 Example: become a semi-anonymous node. This form is often used for
319     commandline clients.
320 root 1.34
321 root 1.126 configure nodeid => "myscript/%n/%u";
322 root 1.72
323 root 1.120 Example: configure a node using a profile called seed, which is suitable
324 root 1.72 for a seed node as it binds on all local addresses on a fixed port (4040,
325     customary for aemp).
326    
327     # use the aemp commandline utility
328 root 1.122 # aemp profile seed binds '*:4040'
329 root 1.72
330     # then use it
331     configure profile => "seed";
332 root 1.34
333 root 1.72 # or simply use aemp from the shell again:
334     # aemp run profile seed
335 root 1.34
336 root 1.72 # or provide a nicer-to-remember nodeid
337     # aemp run profile seed nodeid "$(hostname)"
338 root 1.34
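The C<%n> and C<%u> sequences used in node IDs such as C<myscript/%n/%u>
can be illustrated with a small substitution. This is a sketch only:
C<expand_node_id> is a hypothetical helper, the real expansion happens
inside the module, and both the use of C<Sys::Hostname> for the nodename
and the way the random string is generated are assumptions.

```perl
use strict;
use warnings;
use Sys::Hostname ();

my @ALPHABET = ("A" .. "Z", "a" .. "z", 0 .. 9);

# expand_node_id is a hypothetical helper; the real expansion is done
# by AnyEvent::MP and may differ in its details.
sub expand_node_id {
   my ($template, %opt) = @_;

   # assumption: hostname () is close enough to "uname -n" here
   my $nodename = defined $opt{nodename}
      ? $opt{nodename}
      : Sys::Hostname::hostname ();

   # assumption: 20 random alphanumeric characters make the ID unique
   my $unique = defined $opt{unique}
      ? $opt{unique}
      : join "", map { $ALPHABET[rand @ALPHABET] } 1 .. 20;

   # replace each %n and %u, leave everything else untouched
   (my $id = $template) =~ s/%([nu])/$1 eq "n" ? $nodename : $unique/ge;

   $id
}

my $expanded = expand_node_id ("myscript/%n/%u");
print "$expanded\n";
```

The C<nodename> and C<unique> options exist only to make the sketch
deterministic when needed, for example in tests.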
339 root 1.22 =item $SELF
340    
341     Contains the current port id while executing C<rcv> callbacks or C<psub>
342     blocks.
343 root 1.3
344 root 1.67 =item *SELF, SELF, %SELF, @SELF...
345 root 1.22
346     Due to some quirks in how perl exports variables, it is impossible to
347 root 1.67 just export C<$SELF>, so all the symbols named C<SELF> are exported by this
348 root 1.22 module, but only C<$SELF> is currently used.
349 root 1.3
350 root 1.33 =item snd $port, type => @data
351 root 1.3
352 root 1.33 =item snd $port, @msg
353 root 1.3
354 root 1.67 Send the given message to the given port, which can identify either a
355     local or a remote port, and must be a port ID.
356 root 1.8
357 root 1.67 While the message can be almost anything, it is highly recommended to
358     use a string as first element (a port ID, or some word that indicates a
359     request type etc.) and to consist of only simple perl values (scalars,
360     arrays, hashes) - if you think you need to pass an object, think again.
361    
362     The message data logically becomes read-only after a call to this
363     function: modifying any argument (or values referenced by them) is
364     forbidden, as there can be considerable time between the call to C<snd>
365     and the time the message is actually being serialised - in fact, it might
366     never be copied as within the same process it is simply handed to the
367     receiving port.
368 root 1.3
369     The type of data you can transfer depends on the transport protocol: when
370     JSON is used, then only strings, numbers and arrays and hashes consisting
371     of those are allowed (no objects). When Storable is used, then anything
372     that Storable can serialise and deserialise is allowed, and for the local
373 root 1.67 node, anything can be passed. Best rely only on the common denominator of
374     these.
375 root 1.3
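One way to stay within the common denominator is to check whether a
message can be encoded as JSON before sending it. The sketch below uses
C<JSON::PP> for this; C<is_json_safe> is a hypothetical helper, and which
JSON encoder and settings the transport actually uses is not stated here,
so treat the check as an approximation.

```perl
use strict;
use warnings;
use JSON::PP ();

my $json = JSON::PP->new;

# is_json_safe is a hypothetical helper, not part of AnyEvent::MP.
# It reports whether the message consists only of data that JSON::PP
# can encode with its default settings (no objects, no code refs).
sub is_json_safe {
   my @msg = @_;

   my $ok = eval { $json->encode (\@msg); 1 };

   $ok ? 1 : 0
}

for my $msg (
   [ping => { ids => [1, 2, 3] }],   # plain data
   [job  => sub { 1 }],              # code reference
) {
   printf "%s: %s\n", $msg->[0], is_json_safe (@$msg) ? "portable" : "not portable";
}
```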
376 root 1.22 =item $local_port = port
377 root 1.2
378 root 1.50 Creates a new local port object and returns its port ID. Initially it has
379     no callbacks set and will throw an error when it receives messages.
380 root 1.10
381 root 1.50 =item $local_port = port { my @msg = @_ }
382 root 1.15
383 root 1.50 Creates a new local port, and returns its ID. Semantically the same as
384     creating a port and calling C<rcv $port, $callback> on it.
385 root 1.15
386 root 1.50 The block will be called for every message received on the port, with the
387     global variable C<$SELF> set to the port ID. Runtime errors will cause the
388     port to be C<kil>ed. The message will be passed as-is, no extra argument
389     (i.e. no port ID) will be passed to the callback.
390 root 1.15
391 root 1.50 If you want to stop/destroy the port, simply C<kil> it:
392 root 1.15
393 root 1.50 my $port = port {
394     my @msg = @_;
395     ...
396     kil $SELF;
397 root 1.15 };
398 root 1.10
399     =cut
400    
401 root 1.33 sub rcv($@);
402    
403 root 1.50 sub _kilme {
404     die "received message on port without callback";
405     }
406    
407 root 1.22 sub port(;&) {
408 root 1.123 my $id = $UNIQ . ++$ID;
409 root 1.22 my $port = "$NODE#$id";
410    
411 root 1.50 rcv $port, shift || \&_kilme;
412 root 1.10
413 root 1.22 $port
414 root 1.10 }
415    
416 root 1.50 =item rcv $local_port, $callback->(@msg)
417 root 1.31
418 root 1.50 Replaces the default callback on the specified port. There is no way to
419     remove the default callback: use C<sub { }> to disable it, or better
420     C<kil> the port when it is no longer needed.
421 root 1.3
422 root 1.33 The global C<$SELF> (exported by this module) contains C<$port> while
423 root 1.50 executing the callback. Runtime errors during callback execution will
424     result in the port being C<kil>ed.
425 root 1.22
426 root 1.50 The default callback receives all messages not matched by a more specific
427     C<tag> match.
428 root 1.22
429 root 1.50 =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
430 root 1.3
431 root 1.54 Register (or replace) callbacks to be called on messages starting with the
432     given tag on the given port (and return the port), or unregister it (when
433     C<$callback> is C<undef> or missing). There can only be one callback
434     registered for each tag.
435 root 1.3
436 root 1.50 The original message will be passed to the callback, after the first
437     element (the tag) has been removed. The callback will use the same
438     environment as the default callback (see above).
439 root 1.3
440 root 1.36 Example: create a port and bind receivers on it in one go.
441    
442     my $port = rcv port,
443 root 1.50 msg1 => sub { ... },
444     msg2 => sub { ... },
445 root 1.36 ;
446    
447     Example: create a port, bind receivers and send it in a message elsewhere
448     in one go:
449    
450     snd $otherport, reply =>
451     rcv port,
452 root 1.50 msg1 => sub { ... },
453 root 1.36 ...
454     ;
455    
456 root 1.54 Example: temporarily register a rcv callback for a tag matching some port
457 root 1.102 (e.g. for an rpc reply) and unregister it after a message was received.
458 root 1.54
459     rcv $port, $otherport => sub {
460     my @reply = @_;
461    
462     rcv $SELF, $otherport;
463     };
464    
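The matching rules described for C<rcv> (a tagged callback receives the
message without its tag, anything else goes to the default callback
unchanged) can be modelled without any networking. The following is a
simplified sketch: C<make_dispatcher> is a hypothetical stand-in for a
port and omits C<$SELF> handling and error trapping.

```perl
use strict;
use warnings;

# make_dispatcher is a hypothetical stand-in for a port: a default
# callback plus a table of per-tag callbacks.
sub make_dispatcher {
   my ($default, %by_tag) = @_;

   sub {
      if (defined $_[0] and my $cb = $by_tag{$_[0]}) {
         shift;                # remove the tag, pass the rest
         return $cb->(@_);
      }

      $default->(@_)           # no tag matched: pass the full message
   }
}

my $deliver = make_dispatcher (
   sub { "default(@_)" },
   ping => sub { "ping(@_)" },
);

print $deliver->(ping => 1, 2), "\n";   # prints "ping(1 2)"
print $deliver->(other => 3), "\n";     # prints "default(other 3)"
```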
465 root 1.3 =cut
466    
467     sub rcv($@) {
468 root 1.33 my $port = shift;
469 root 1.75 my ($nodeid, $portid) = split /#/, $port, 2;
470 root 1.3
471 root 1.75 $NODE{$nodeid} == $NODE{""}
472 root 1.33 or Carp::croak "$port: rcv can only be called on local ports, caught";
473 root 1.22
474 root 1.50 while (@_) {
475     if (ref $_[0]) {
476     if (my $self = $PORT_DATA{$portid}) {
477     "AnyEvent::MP::Port" eq ref $self
478     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
479 root 1.33
480 root 1.103 $self->[0] = shift;
481 root 1.50 } else {
482     my $cb = shift;
483     $PORT{$portid} = sub {
484     local $SELF = $port;
485     eval { &$cb }; _self_die if $@;
486     };
487     }
488     } elsif (defined $_[0]) {
489     my $self = $PORT_DATA{$portid} ||= do {
490 root 1.103 my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
491 root 1.50
492     $PORT{$portid} = sub {
493     local $SELF = $port;
494    
495     if (my $cb = $self->[1]{$_[0]}) {
496     shift;
497     eval { &$cb }; _self_die if $@;
498     } else {
499     &{ $self->[0] };
500 root 1.33 }
501     };
502 root 1.50
503     $self
504 root 1.33 };
505    
506 root 1.50 "AnyEvent::MP::Port" eq ref $self
507     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
508 root 1.22
509 root 1.50 my ($tag, $cb) = splice @_, 0, 2;
510 root 1.33
511 root 1.50 if (defined $cb) {
512     $self->[1]{$tag} = $cb;
513 root 1.33 } else {
514 root 1.50 delete $self->[1]{$tag};
515 root 1.33 }
516 root 1.22 }
517 root 1.3 }
518 root 1.31
519 root 1.33 $port
520 root 1.2 }
521    
522 root 1.101 =item peval $port, $coderef[, @args]
523    
524     Evaluates the given C<$coderef> within the context of C<$port>, that is,
525     when the code throws an exception the C<$port> will be killed.
526    
527     Any remaining args will be passed to the callback. Any return values will
528     be returned to the caller.
529    
530     This is useful when you temporarily want to execute code in the context of
531     a port.
532    
533     Example: create a port and run some initialisation code in its context.
534    
535     my $port = port { ... };
536    
537     peval $port, sub {
538     init
539     or die "unable to init";
540     };
541    
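The behaviour described for C<peval> (set the context, trap exceptions,
pass return values through in the caller's context) can be sketched
without a running node. In this sketch, C<$CONTEXT>, C<record_failure> and
C<run_in_context> are hypothetical names, and failures are recorded in an
array instead of killing a port.

```perl
use strict;
use warnings;

our $CONTEXT;   # hypothetical stand-in for $SELF
my @killed;     # records [context, reason] instead of killing a port

sub record_failure {
   my $reason = $@;
   $reason =~ s/\n+$// unless ref $reason;   # trim trailing newlines
   push @killed, [$CONTEXT, $reason];
}

sub run_in_context {
   local $CONTEXT = shift;   # restored automatically on return
   my $cb = shift;

   # call the code in the same (list or scalar) context as the caller
   if (wantarray) {
      my @res = eval { $cb->(@_) };
      record_failure if $@;
      @res
   } else {
      my $res = eval { $cb->(@_) };
      record_failure if $@;
      $res
   }
}

my @list = run_in_context "port1", sub { map $_ * 2, @_ }, 1, 2, 3;
run_in_context "port2", sub { die "broken\n" };

print "result: @list\n";                           # prints "result: 2 4 6"
print "failed: $killed[0][0] ($killed[0][1])\n";   # prints "failed: port2 (broken)"
```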
542     =cut
543    
544     sub peval($$) {
545     local $SELF = shift;
546     my $cb = shift;
547    
548     if (wantarray) {
549     my @res = eval { &$cb };
550     _self_die if $@;
551     @res
552     } else {
553     my $res = eval { &$cb };
554     _self_die if $@;
555     $res
556     }
557     }
558    
559 root 1.22 =item $closure = psub { BLOCK }
560 root 1.2
561 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
562     closure is executed, sets up the environment in the same way as in C<rcv>
563     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
564    
565 root 1.101 The effect is basically as if it returned C<< sub { peval $SELF, sub {
566 root 1.114 BLOCK }, @_ } >>.
567 root 1.101
568 root 1.22 This is useful when you register callbacks from C<rcv> callbacks:
569    
570     rcv $port, delayed_reply => sub {
571     my ($delay, @reply) = @_;
572     my $timer = AE::timer $delay, 0, psub {
573     snd @reply, $SELF;
574     };
575     };
576 root 1.3
577 root 1.8 =cut
578 root 1.3
579 root 1.22 sub psub(&) {
580     my $cb = shift;
581 root 1.3
582 root 1.22 my $port = $SELF
583     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
584 root 1.1
585 root 1.22 sub {
586     local $SELF = $port;
587 root 1.2
588 root 1.22 if (wantarray) {
589     my @res = eval { &$cb };
590     _self_die if $@;
591     @res
592     } else {
593     my $res = eval { &$cb };
594     _self_die if $@;
595     $res
596     }
597     }
598 root 1.2 }
599    
600 root 1.67 =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
601 root 1.32
602 root 1.67 =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
603 root 1.36
604 root 1.67 =item $guard = mon $port # kill $SELF when $port dies
605 root 1.32
606 root 1.67 =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
607 root 1.32
608 root 1.42 Monitor the given port and do something when the port is killed or
609     messages to it were lost, and optionally return a guard that can be used
610     to stop monitoring again.
611    
612 root 1.36 In the first form (callback), the callback is simply called with any
613     number of C<@reason> elements (no @reason means that the port was deleted
614 root 1.32 "normally"). Note also that I<< the callback B<must> never die >>, so use
615     C<eval> if unsure.
616    
617 root 1.43 In the second form (another port given), the other port (C<$rcvport>)
618 elmex 1.77 will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
619 root 1.36 "normal" kils nothing happens, while under all other conditions, the other
620     port is killed with the same reason.
621 root 1.32
622 root 1.36 The third form (kill self) is the same as the second form, except that
623     C<$rcvport> defaults to C<$SELF>.
624    
625     In the last form (message), a message of the form C<@msg, @reason> will be
626     C<snd>.
627 root 1.32
628 root 1.79 Monitoring-actions are one-shot: once messages are lost (and a monitoring
629     alert was raised), they are removed and will not trigger again.
630    
631 root 1.37 As a rule of thumb, monitoring requests should always monitor a port from
632     a local port (or callback). The reason is that kill messages might get
633     lost, just like any other message. Another less obvious reason is that
634 elmex 1.77 even monitoring requests can get lost (for example, when the connection
635 root 1.37 to the other node goes down permanently). When monitoring a port locally
636     these problems do not exist.
637    
638 root 1.79 C<mon> effectively guarantees that, in the absence of hardware failures,
639     after starting the monitor, either all messages sent to the port will
640     arrive, or the monitoring action will be invoked after possible message
641     loss has been detected. No messages will be lost "in between" (after
642     the first lost message no further messages will be received by the
643     port). After the monitoring action was invoked, further messages might get
644     delivered again.
645    
646     Inter-host-connection timeouts and monitoring depend on the transport
647     used. The only transport currently implemented is TCP, and AnyEvent::MP
648     relies on TCP to detect node-downs (this can take 10-15 minutes on a
649 elmex 1.96 non-idle connection, and usually around two hours for idle connections).
650 root 1.79
651     This means that monitoring is good for program errors and cleaning up
652     stuff eventually, but it is no replacement for a timeout when you need
653     to ensure some maximum latency.
654    
655 root 1.32 Example: call a given callback when C<$port> is killed.
656    
657     mon $port, sub { warn "port died because of <@_>\n" };
658    
659     Example: kill ourselves when C<$port> is killed abnormally.
660    
661 root 1.36 mon $port;
662 root 1.32
663 root 1.36 Example: send us a restart message when another C<$port> is killed.
664 root 1.32
665     mon $port, $SELF => "restart";
666    
667     =cut
668    
669     sub mon {
670 root 1.75 my ($nodeid, $port) = split /#/, shift, 2;
671 root 1.32
672 root 1.75 my $node = $NODE{$nodeid} || add_node $nodeid;
673 root 1.32
674 root 1.41 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
675 root 1.32
676     unless (ref $cb) {
677     if (@_) {
678     # send a kill info message
679 root 1.41 my (@msg) = ($cb, @_);
680 root 1.32 $cb = sub { snd @msg, @_ };
681     } else {
682     # simply kill other port
683     my $port = $cb;
684     $cb = sub { kil $port, @_ if @_ };
685     }
686     }
687    
688     $node->monitor ($port, $cb);
689    
690     defined wantarray
691 root 1.124 and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
692 root 1.32 }
693    
694     =item $guard = mon_guard $port, $ref, $ref...
695    
696     Monitors the given C<$port> and keeps the passed references. When the port
697     is killed, the references will be freed.
698    
699     Optionally returns a guard that will stop the monitoring.
700    
701     This function is useful when you create e.g. timers or other watchers and
702 root 1.67 want to free them when the port gets killed (note the use of C<psub>):
703 root 1.32
704     rcv $port, start => sub {
705 root 1.67 my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
706 root 1.32 undef $timer if 0.9 < rand;
707     };
708     };
709    
710     =cut
711    
712     sub mon_guard {
713     my ($port, @refs) = @_;
714    
715 root 1.36 #TODO: mon-less form?
716    
717 root 1.32 mon $port, sub { 0 && @refs }
718     }
719    
720 root 1.33 =item kil $port[, @reason]
721 root 1.32
722     Kill the specified port with the given C<@reason>.
723    
724 root 1.107 If no C<@reason> is specified, then the port is killed "normally" -
725     monitor callbacks will be invoked, but the kil will not cause linked ports
726     (C<mon $mport, $lport> form) to get killed.
727 root 1.32
728 root 1.107 If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
729     form) get killed with the same reason.
730 root 1.32
731     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
732     will be reported as reason C<< die => $@ >>.
733    
734     Transport/communication errors are reported as C<< transport_error =>
735     $message >>.
736    
737 root 1.38 =cut
738    
739     =item $port = spawn $node, $initfunc[, @initdata]
740    
741     Creates a port on the node C<$node> (which can also be a port ID, in which
742     case it's the node where that port resides).
743    
744 root 1.67 The port ID of the newly created port is returned immediately, and it is
745     possible to immediately start sending messages or to monitor the port.
746 root 1.38
747 root 1.67 After the port has been created, the init function is called on the remote
748     node, in the same context as a C<rcv> callback. This function must be a
749     fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
750     specify a function in the main program, use C<::name>.
751 root 1.38
752     If the function doesn't exist, then the node tries to C<require>
753     the package, then the package above the package and so on (e.g.
754     C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
755     exists or it runs out of package names.
756    
757     The init function is then called with the newly-created port as context
758 root 1.82 object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
759     call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
760     the port might not get created.
761 root 1.38
762 root 1.67 A common idiom is to pass a local port, immediately monitor the spawned
763     port, and in the remote init function, immediately monitor the passed
764     local port. This two-way monitoring ensures that both ports get cleaned up
765     when there is a problem.
766 root 1.38
767 root 1.80 C<spawn> guarantees that the C<$initfunc> has no visible effects on the
768     caller before C<spawn> returns (by delaying invocation when spawn is
769     called for the local node).
770    
771 root 1.38 Example: spawn a chat server port on C<$othernode>.
772    
773     # this node, executed from within a port context:
774     my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
775     mon $server;
776    
777     # init function on $othernode
778     sub connect {
779     my ($srcport) = @_;
780    
781     mon $srcport;
782    
783     rcv $SELF, sub {
784     ...
785     };
786     }
787    
788     =cut
789    
790     sub _spawn {
791     my $port = shift;
792     my $init = shift;
793    
794 root 1.82 # rcv will create the actual port
795 root 1.38 local $SELF = "$NODE#$port";
796     eval {
797     &{ load_func $init }
798     };
799     _self_die if $@;
800     }
801    
802     sub spawn(@) {
803 root 1.75 my ($nodeid, undef) = split /#/, shift, 2;
804 root 1.38
805 root 1.123 my $id = $RUNIQ . ++$ID;
806 root 1.38
807 root 1.39 $_[0] =~ /::/
808     or Carp::croak "spawn init function must be a fully-qualified name, caught";
809    
810 root 1.75 snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
811 root 1.38
812 root 1.75 "$nodeid#$id"
813 root 1.38 }
814    
815 root 1.121
816 root 1.59 =item after $timeout, @msg
817    
818     =item after $timeout, $callback
819    
820     Either sends the given message, or calls the given callback, after the
821     specified number of seconds.
822    
823 root 1.67 This is simply a utility function that comes in handy at times - the
824     AnyEvent::MP author is not convinced of the wisdom of having it, though,
825     so it may go away in the future.
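
Example: a minimal sketch of both forms, assuming the node has already been
configured. The port C<$port> and the C<wake_up> message are made up for
illustration.

   use AnyEvent::MP;

   # a local port that simply logs whatever it receives
   my $port = port { warn "got: @_\n" };

   # send the message "wake_up" to $port in 5 seconds
   after 5, $port, "wake_up";

   # or invoke a callback after 10 seconds
   after 10, sub { warn "10 seconds passed\n" };

The first argument after the timeout decides which form is used: a code
reference is called, anything else is passed on to C<snd>.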
826 root 1.59
827     =cut
828    
829     sub after($@) {
830     my ($timeout, @action) = @_;
831    
832     my $t; $t = AE::timer $timeout, 0, sub {
833     undef $t;
834     ref $action[0]
835     ? $action[0]()
836     : snd @action;
837     };
838     }
839    
840 root 1.129 #=item $cb2 = timeout $seconds, $cb[, @args]
841    
842 root 1.87 =item cal $port, @msg, $callback[, $timeout]
843    
844     A simple form of RPC - sends a message to the given C<$port> with the
845     given contents (C<@msg>), but adds a reply port to the message.
846    
847     The reply port is created temporarily just for the purpose of receiving
848     the reply, and will be C<kil>ed when no longer needed.
849    
850     A reply message sent to the port is passed to the C<$callback> as-is.
851    
852     If an optional time-out (in seconds) is given and it is not C<undef>,
853     then the callback will be called without any arguments after the time-out
854     has elapsed, and the port is C<kil>ed.
855    
856 root 1.98 If no time-out is given (or it is C<undef>), then the local port will
857     monitor the remote port instead, so it eventually gets cleaned up.
858 root 1.87
859     Currently this function returns the temporary port, but this "feature"
860     might go away in future versions unless you can make a convincing case that
861     this is indeed useful for something.
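
Example: a minimal sketch of a request/reply round trip with a time-out,
assuming the node has already been configured. The C<add> tag and the
C<$server> port are made up for illustration.

   use AnyEvent::MP;

   # server side: the reply port arrives as the last message element
   my $server = port;
   rcv $server, add => sub {
      my ($x, $y, $reply) = @_;
      snd $reply, $x + $y;
   };

   # client side: wait at most 5 seconds for the reply
   cal $server, add => 1, 2, sub {
      if (@_) {
         print "sum: $_[0]\n";
      } else {
         warn "no reply received\n";
      }
   }, 5;

Since the callback is called without arguments both on time-out and when the
monitored port dies, checking C<@_> is the only way to tell a reply from a
failure in this sketch.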
862    
863     =cut
864    
865     sub cal(@) {
866     my $timeout = ref $_[-1] ? undef : pop;
867     my $cb = pop;
868    
869     my $port = port {
870     undef $timeout;
871     kil $SELF;
872     &$cb;
873     };
874    
875     if (defined $timeout) {
876     $timeout = AE::timer $timeout, 0, sub {
877     undef $timeout;
878     kil $port;
879     $cb->();
880     };
881     } else {
882     mon $_[0], sub {
883     kil $port;
884     $cb->();
885     };
886     }
887    
888     push @_, $port;
889     &snd;
890    
891     $port
892     }
893    
894 root 1.8 =back
895    
896 root 1.124 =head1 DISTRIBUTED DATABASE
897    
898     AnyEvent::MP comes with a simple distributed database. The database will
899     be mirrored asynchronously at all global nodes. Other nodes bind to one of
900     the global nodes for their needs.
901    
902     The database consists of a two-level hash - a hash contains a hash which
903     contains values.
904    
905     The top level hash key is called "family", and the second-level hash key
906 root 1.126 is called "subkey" or simply "key".
907 root 1.124
908 root 1.125 The family must be alphanumeric, i.e. start with a letter and consist
909     of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>,
910     pretty much like Perl module names).
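
Example: a minimal sketch of a check against this naming rule. The helper
C<valid_family> is made up for illustration and is not part of AnyEvent::MP.

   # true if the name starts with a letter and otherwise contains
   # only letters, digits, underscores and colons
   sub valid_family {
      $_[0] =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/
   }

   for my $name ("MyApp::workers", "my_image_scalers", "9lives", "") {
      print "'$name' is ", (valid_family ($name) ? "valid" : "invalid"), "\n";
   }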
911 root 1.124
912 root 1.125 As the family namespace is global, it is recommended to prefix family names
913 root 1.124 with the name of the application or module using it.
914    
915 root 1.126 The subkeys must be non-empty strings, with no further restrictions.
916 root 1.125
917 root 1.124 The values should preferably be strings, but other perl scalars should
918 root 1.125 work as well (such as undef, arrays and hashes).
919 root 1.124
920 root 1.126 Every database entry is owned by one node - adding the same family/subkey
921 root 1.124 combination on multiple nodes will not cause discomfort for AnyEvent::MP,
922     but the result might be nondeterministic, i.e. the key might have
923     different values on different nodes.
924    
925 root 1.126 Different subkeys in the same family can be owned by different nodes
926     without problems, and in fact, this is the common method to create worker
927     pools. For example, a worker port for image scaling might do this:
928 root 1.124
929 root 1.126 db_set my_image_scalers => $port;
930 root 1.124
931 root 1.126 And clients looking for an image scaler will want to get the
932 root 1.129 C<my_image_scalers> keys from time to time:
933    
934     db_keys my_image_scalers => sub {
935     @ports = @{ $_[0] };
936     };
937    
938     Or better yet, they want to monitor the database family, so they always
939     have a reasonably up-to-date copy:
940    
941     db_mon my_image_scalers => sub {
942     @ports = keys %{ $_[0] };
943     };
944    
945     In general, you can set or delete single subkeys, but query and monitor
946     whole families only.
947 root 1.126
948 root 1.129 If you feel the need to monitor or query a single subkey, try giving it
949     its own family.
950 root 1.126
951     =over
952    
953     =item db_set $family => $subkey [=> $value]
954    
955     Sets (or replaces) a key in the database - if C<$value> is omitted,
956     C<undef> is used instead.
957    
958     =item db_del $family => $subkey
959 root 1.124
960     Deletes a key from the database.
961    
962 root 1.126 =item $guard = db_reg $family => $subkey [=> $value]
963 root 1.124
964     Sets the key on the database and returns a guard. When the guard is
965     destroyed, the key is deleted from the database. If C<$value> is missing,
966     then C<undef> is used.
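
Example: a minimal sketch that registers a worker port only for as long as
the guard exists, assuming a configured node. The family name
C<my_image_scalers> and the port body are made up for illustration.

   use AnyEvent::MP;

   my $port = port { warn "scale request: @_\n" };

   # the subkey is the port ID, the value defaults to undef
   my $guard = db_reg my_image_scalers => $port;

   # ... later, when the worker shuts down:
   undef $guard; # deletes the key from the database again

Compared to a C<db_set>/C<db_del> pair, the guard ties the lifetime of the
database entry to a Perl variable, so the entry cannot be forgotten when the
variable goes out of scope.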
967    
968 root 1.129 =item db_family $family => $cb->(\%familyhash)
969    
970     Queries the named database C<$family> and calls the callback with the
971     family represented as a hash. You can keep and freely modify the hash.
972    
973     =item db_keys $family => $cb->(\@keys)
974    
975     Same as C<db_family>, except it only queries the family I<subkeys> and passes
976     them as an array reference to the callback.
977    
978     =item db_values $family => $cb->(\@values)
979    
980     Same as C<db_family>, except it only queries the family I<values> and passes them
981     as an array reference to the callback.
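
Example: a minimal sketch of the three one-shot queries, assuming a configured
node. The family name C<my_image_scalers> is made up for illustration.

   use AnyEvent::MP;

   # whole family: subkey => value
   db_family my_image_scalers => sub {
      my ($family) = @_;
      for my $subkey (sort keys %$family) {
         my $value = $family->{$subkey};
         print "$subkey => ", (defined $value ? $value : "(undef)"), "\n";
      }
   };

   # subkeys only
   db_keys my_image_scalers => sub {
      my ($keys) = @_;
      print scalar @$keys, " scalers registered\n";
   };

   # values only
   db_values my_image_scalers => sub {
      my ($values) = @_;
      print scalar @$values, " values stored\n";
   };

All three callbacks are invoked asynchronously, so the results are only
available inside the callbacks, not after the calls return.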
982    
983 root 1.128 =item $guard = db_mon $family => $cb->($familyhash, \@subkeys...)
984    
985     Creates a monitor on the given database family. Each time a key is set
986     or deleted, the callback is called with a hash containing the database
987     family and an arrayref with subkeys that have changed.
988    
989     Specifically, if one of the passed subkeys exists in the $familyhash, then
990     it is currently set to the value in the $familyhash. Otherwise, it has
991     been deleted.
992    
993 root 1.129 The family hash reference belongs to AnyEvent::MP and B<must not be
994     modified or stored> by the callback. When in doubt, make a copy.
995    
996 root 1.128 The first call will be with the current contents of the family and all
997     keys, as if they were just added.
998    
999     It is possible that the callback is called with a change event even though
1000     the subkey is already present and the value has not changed.
1001    
1002     The monitoring stops when the guard object is destroyed.
1003    
1004     Example: on every change to the family "mygroup", print out all keys.
1005    
1006     my $guard = db_mon mygroup => sub {
1007     my ($family, $keys) = @_;
1008     print "mygroup members: ", (join " ", keys %$family), "\n";
1009     };
1010    
1011     Example: wait until the family "My::Module::workers" is non-empty.
1012    
1013     my $guard; $guard = db_mon My::Module::workers => sub {
1014     my ($family, $keys) = @_;
1015     return unless %$family;
1016     undef $guard;
1017     print "My::Module::workers now nonempty\n";
1018     };
1019    
1020     Example: print all changes to the family "AnyEvent::Fantasy::Module".
1021    
1022     my $guard = db_mon AnyEvent::Fantasy::Module => sub {
1023     my ($family, $keys) = @_;
1024    
1025     for (@$keys) {
1026     print "$_: ",
1027     (exists $family->{$_}
1028     ? $family->{$_}
1029     : "(deleted)"),
1030     "\n";
1031     }
1032     };
1033    
1034 root 1.124 =cut
1035    
1036     =back
1037    
1038 root 1.26 =head1 AnyEvent::MP vs. Distributed Erlang
1039    
1040 root 1.35 AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
1041     == aemp node, Erlang process == aemp port), so many of the documents and
1042     programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
1043 root 1.27 sample:
1044    
1045 root 1.95 http://www.erlang.se/doc/programming_rules.shtml
1046     http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
1047     http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
1048     http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
1049 root 1.27
1050     Despite the similarities, there are also some important differences:
1051 root 1.26
1052     =over 4
1053    
1054 root 1.65 =item * Node IDs are arbitrary strings in AEMP.
1055 root 1.26
1056 root 1.65 Erlang relies on special naming and DNS to work everywhere in the same
1057     way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
1058 root 1.99 configuration or DNS), and possibly the addresses of some seed nodes, but
1059     will otherwise discover other nodes (and their IDs) itself.
1060 root 1.27
1061 root 1.54 =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
1062 root 1.51 uses "local ports are like remote ports".
1063    
1064     The failure modes for local ports are quite different (runtime errors
1065     only) than for remote ports - when a local port dies, you I<know> it dies,
1066     when a connection to another node dies, you know nothing about the other
1067     port.
1068    
1069     Erlang pretends remote ports are as reliable as local ports, even when
1070     they are not.
1071    
1072     AEMP encourages a "treat remote ports differently" philosophy, with local
1073     ports being the special case/exception, where transport errors cannot
1074     occur.
1075    
1076 root 1.26 =item * Erlang uses processes and a mailbox, AEMP does not queue.
1077    
1078 root 1.119 Erlang uses processes that selectively receive messages out of order, and
1079     therefore needs a queue. AEMP is event based, queuing messages would serve
1080     no useful purpose. For the same reason the pattern-matching abilities
1081     of AnyEvent::MP are more limited, as there is little need to be able to
1082 elmex 1.77 filter messages without dequeuing them.
1083 root 1.26
1084 root 1.119 This is not a philosophical difference, but simply stems from AnyEvent::MP
1085     being event-based, while Erlang is process-based.
1086    
1087     You can have a look at L<Coro::MP> for a more Erlang-like process model on
1088     top of AEMP and Coro threads.
1089 root 1.26
1090     =item * Erlang sends are synchronous, AEMP sends are asynchronous.
1091    
1092 root 1.119 Sending messages in Erlang is synchronous and blocks the process until
1093     a connection has been established and the message sent (and so does not
1094     need a queue that can overflow). AEMP sends return immediately, connection
1095     establishment is handled in the background.
1096 root 1.26
1097 root 1.51 =item * Erlang suffers from silent message loss, AEMP does not.
1098 root 1.26
1099 root 1.99 Erlang implements few guarantees on message delivery - messages can get
1100     lost without any of the processes realising it (i.e. you send messages a,
1101     b, and c, and the other side only receives messages a and c).
1102 root 1.26
1103 root 1.117 AEMP guarantees (modulo hardware errors) correct ordering, and the
1104     guarantee that after one message is lost, all following ones sent to the
1105     same port are lost as well, until monitoring raises an error, so there are
1106     no silent "holes" in the message sequence.
1107 root 1.26
1108 root 1.119 If you want your software to be very reliable, you have to cope with
1109     corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
1110     simply tries to work better in common error cases, such as when a network
1111     link goes down.
1112    
1113 root 1.26 =item * Erlang can send messages to the wrong port, AEMP does not.
1114    
1115 root 1.119 In Erlang it is quite likely that a node that restarts reuses an Erlang
1116     process ID known to other nodes for a completely different process,
1117     causing messages destined for that process to end up in an unrelated
1118     process.
1119 root 1.26
1120 root 1.119 AEMP does not reuse port IDs, so old messages or old port IDs floating
1121 root 1.26 around in the network will not be sent to an unrelated port.
1122    
1123     =item * Erlang uses unprotected connections, AEMP uses secure
1124     authentication and can use TLS.
1125    
1126 root 1.66 AEMP can use a proven protocol - TLS - to protect connections and
1127 root 1.26 securely authenticate nodes.
1128    
1129 root 1.28 =item * The AEMP protocol is optimised for both text-based and binary
1130     communications.
1131    
1132 root 1.66 The AEMP protocol, unlike the Erlang protocol, supports both programming
1133 root 1.119 language independent text-only protocols (good for debugging), and binary,
1134 root 1.67 language-specific serialisers (e.g. Storable). By default, unless TLS is
1135     used, the protocol is actually completely text-based.
1136 root 1.28
1137     It has also been carefully designed to be implementable in other languages
1138 root 1.66 with a minimum of work while gracefully degrading functionality to make the
1139 root 1.28 protocol simple.
1140    
1141 root 1.35 =item * AEMP has more flexible monitoring options than Erlang.
1142    
1143 root 1.119 In Erlang, you can choose to receive I<all> exit signals as messages or
1144     I<none>, there is no in-between, so monitoring single Erlang processes is
1145     difficult to implement.
1146    
1147     Monitoring in AEMP is more flexible than in Erlang, as one can choose
1148     between automatic kill, exit message or callback on a per-port basis.
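
Example: the three per-port monitoring styles, as a minimal sketch.
C<$port> and C<$localport> stand for any two ports, and the C<worker_died>
tag is made up for illustration.

   use AnyEvent::MP;

   my $port      = port;
   my $localport = port;

   # callback: invoked when $port dies
   mon $port, sub { warn "port died: @_\n" };

   # automatic kill: $localport is killed when $port dies abnormally
   mon $port, $localport;

   # exit message: a message is sent to $localport when $port dies
   mon $port, $localport, worker_died => $port;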
1149 root 1.35
1150 root 1.37 =item * Erlang tries to hide remote/local connections, AEMP does not.
1151 root 1.35
1152 root 1.67 Monitoring in Erlang is not an indicator of process death/crashes, in the
1153     same way as linking is (except linking is unreliable in Erlang).
1154 root 1.37
1155     In AEMP, you don't "look up" registered port names or send to named ports
1156     that might or might not be persistent. Instead, you normally spawn a port
1157 root 1.67 on the remote node. The init function monitors you, and you monitor the
1158     remote port. Since both monitors are local to the node, they are much more
1159     reliable (no need for C<spawn_link>).
1160 root 1.37
1161     This also saves round-trips and avoids sending messages to the wrong port
1162     (hard to do in Erlang).
1163 root 1.35
1164 root 1.26 =back
1165    
1166 root 1.46 =head1 RATIONALE
1167    
1168     =over 4
1169    
1170 root 1.67 =item Why strings for port and node IDs, why not objects?
1171 root 1.46
1172     We considered "objects", but found that the actual number of methods
1173 root 1.67 that can be called is quite low. Since port and node IDs travel over
1174 root 1.46 the network frequently, the serialising/deserialising would add lots of
1175 root 1.67 overhead, as well as having to keep a proxy object everywhere.
1176 root 1.46
1177     Strings can easily be printed, easily serialised etc. and need no special
1178     procedures to be "valid".
1179    
1180 root 1.110 And as a result, a port with just a default receiver consists of a single
1181 root 1.117 code reference stored in a global hash - it can't become much cheaper.
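
Example: a minimal sketch showing that a port ID is an ordinary string,
assuming a configured node. The printed values depend on the node and are not
shown here.

   use AnyEvent::MP;

   my $port = port;

   # a port ID is the node ID and a port name, joined by "#"
   print "port ID: $port\n";
   print "node ID: ", node_of ($port), "\n";

   # being a plain string, it can be put into messages as-is
   snd $port, hello => $port;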
1182 root 1.47
1183 root 1.67 =item Why favour JSON, why not a real serialising format such as Storable?
1184 root 1.46
1185     In fact, any AnyEvent::MP node will happily accept Storable as framing
1186     format, but currently there is no way to make a node use Storable by
1187 root 1.67 default (although all nodes will accept it).
1188 root 1.46
1189     The default framing protocol is JSON because a) JSON::XS is many times
1190     faster for small messages and b) most importantly, after years of
1191     experience we found that object serialisation is causing more problems
1192 root 1.67 than it solves: Just like function calls, objects simply do not travel
1193 root 1.46 easily over the network, mostly because they will always be a copy, so you
1194     always have to re-think your design.
1195    
1196     Keeping your messages simple, concentrating on data structures rather than
1197     objects, will keep your messages clean, tidy and efficient.
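
Example: a minimal sketch of a message built from plain data structures
only. The C<resize> tag, the field names and C<$port> are made up for
illustration.

   use AnyEvent::MP;

   my $port = port { warn "job received\n" };

   # strings, numbers, arrays and hashes travel well over the network
   snd $port, resize => {
      file => "cat.jpg",
      size => [640, 480],
   };

Instead of sending an object, send the data the receiver needs and let it
construct its own object if it wants one.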
1198    
1199     =back
1200    
1201 root 1.1 =head1 SEE ALSO
1202    
1203 root 1.68 L<AnyEvent::MP::Intro> - a gentle introduction.
1204    
1205     L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
1206    
1207 root 1.113 L<AnyEvent::MP::Global> - network maintenance and port groups, to find
1208 root 1.68 your applications.
1209    
1210 root 1.105 L<AnyEvent::MP::DataConn> - establish data connections between nodes.
1211    
1212 root 1.81 L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
1213     all nodes.
1214    
1215 root 1.1 L<AnyEvent>.
1216    
1217     =head1 AUTHOR
1218    
1219     Marc Lehmann <schmorp@schmorp.de>
1220     http://home.schmorp.de/
1221    
1222     =cut
1223    
1224     1
1225