/cvs/AnyEvent-MP/MP.pm
Revision: 1.142
Committed: Fri Mar 23 13:44:01 2012 UTC (12 years, 1 month ago) by root
Branch: MAIN
Changes since 1.141: +0 -13 lines
Log Message:
*** empty log message ***

=head1 NAME

AnyEvent::MP - erlang-style multi-processing/message-passing framework

=head1 SYNOPSIS

   use AnyEvent::MP;

   $NODE      # contains this node's node ID
   NODE       # returns this node's node ID

   $SELF      # receiving/own port id in rcv callbacks

   # initialise the node so it can send/receive messages
   configure;

   # ports are message destinations

   # sending messages
   snd $port, type => data...;
   snd $port, @msg;
   snd @msg_with_first_element_being_a_port;

   # creating/using ports, the simple way
   my $simple_port = port { my @msg = @_ };

   # creating/using ports, tagged message matching
   my $port = port;
   rcv $port, ping => sub { snd $_[0], "pong" };
   rcv $port, pong => sub { warn "pong received\n" };

   # create a port on another node
   my $port = spawn $node, $initfunc, @initdata;

   # destroy a port again
   kil $port;  # "normal" kill
   kil $port, my_error => "everything is broken"; # error kill

   # monitoring
   mon $port, $cb->(@msg)      # callback is invoked on death
   mon $port, $localport       # kill localport on abnormal death
   mon $port, $localport, @msg # send message on death

   # temporarily execute code in port context
   peval $port, sub { die "kill the port!" };

   # execute callbacks in $SELF port context
   my $timer = AE::timer 1, 0, psub {
      die "kill the port, delayed";
   };

=head1 DESCRIPTION

This module (-family) implements a simple message passing framework.

Despite its simplicity, you can securely message other processes running
on the same or other hosts, and you can supervise entities remotely.

For an introduction to this module family, see the L<AnyEvent::MP::Intro>
manual page and the examples under F<eg/>.

=head1 CONCEPTS

=over 4

=item port

Not to be confused with a TCP port, a "port" is something you can send
messages to (with the C<snd> function).

Ports allow you to register C<rcv> handlers that can match all or just
some messages. Messages sent to ports will not be queued, regardless of
whether anything was listening for them or not.

Ports are represented by (printable) strings called "port IDs".

=item port ID - C<nodeid#portname>

A port ID is the concatenation of a node ID, a hash-mark (C<#>)
as separator, and a port name (a printable string of unspecified
format created by AnyEvent::MP).

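As a minimal sketch, a port ID can be taken apart by splitting on the first C<#> only, the same way the module's own C<rcv> and C<mon> code does it. The helper C<split_port_id> is hypothetical and not part of AnyEvent::MP, which offers C<node_of> for the node part.

```perl
use strict;
use warnings;

# Hypothetical helper, not exported by AnyEvent::MP: split a port ID
# "nodeid#portname" on the first "#" only. A bare node ID contains no
# "#", so its port name comes back as undef.
sub split_port_id {
   my ($id) = @_;
   my ($nodeid, $portname) = split /#/, $id, 2;
   return ($nodeid, $portname);
}

my ($node, $name) = split_port_id("mynode#abc123");
print "node=$node port=$name\n";    # node=mynode port=abc123
```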
=item node

A node is a single process containing at least one port - the node port,
which enables nodes to manage each other remotely, and to create new
ports.

Nodes are either public (have one or more listening ports) or private
(no listening ports). Private nodes cannot talk to other private nodes
currently, but all nodes can talk to public nodes.

Nodes are represented by (printable) strings called "node IDs".

=item node ID - C<[A-Za-z0-9_\-.:]*>

A node ID is a string that uniquely identifies the node within a
network. Depending on the configuration used, node IDs can look like a
hostname, a hostname and a port, or a random string. AnyEvent::MP itself
doesn't interpret node IDs in any way except to uniquely identify a node.

=item binds - C<ip:port>

Nodes can only talk to each other by creating some kind of connection to
each other. To do this, nodes should listen on one or more local transport
endpoints - binds.

Currently, only standard C<ip:port> specifications can be used, which
specify TCP ports to listen on. So a bind is basically just a TCP socket
in listening mode that accepts connections from other nodes.

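The bind syntax can be illustrated with a small parser sketch. C<parse_bind> is a hypothetical name, not the module's own parser; treating a bind without a port as "let the system pick a port" follows the description of the default C<*> bind under C<configure>, and IPv6 literals are deliberately left out.

```perl
use strict;
use warnings;

# Hypothetical helper, not AnyEvent::MP's own parser: split a bind such
# as "*:4040" or "10.0.0.1:4040" into host and TCP port. The port is
# taken to be the digits after the last colon; a bind without one (such
# as the default "*") returns an undefined port, i.e. a dynamic one.
# IPv6 literals are out of scope for this sketch.
sub parse_bind {
   my ($bind) = @_;
   return ($1, $2) if $bind =~ /^(.+):(\d+)\z/;
   return ($bind, undef);
}

for my $bind ("*:4040", "10.0.0.1:4040", "*") {
   my ($host, $port) = parse_bind($bind);
   printf "%-14s host=%s port=%s\n", $bind, $host, $port // "dynamic";
}
```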
=item seed nodes

When a node starts, it knows nothing about the network it is in - it
needs to connect to at least one other node that is already in the
network. These other nodes are called "seed nodes".

Seed nodes themselves are not special - they are seed nodes only because
some other node I<uses> them as such, but any node can be used as seed
node for other nodes, and each node can use a different set of seed nodes.

In addition to discovering the network, seed nodes are also used to
maintain the network - all nodes using the same seed node are part of
the same network. If a network is split into multiple subnets because e.g.
the network link between the parts goes down, then using the same seed
nodes for all nodes ensures that eventually the subnets get merged again.

Seed nodes are expected to be long-running, and at least one seed node
should always be available. They should also be relatively responsive - a
seed node that blocks for long periods will slow down everybody else.

For small networks, it's best if every node uses the same set of seed
nodes. For large networks, it can be useful to specify "regional" seed
nodes for most nodes in an area, and use all seed nodes as seed nodes for
each other. What's important is that all seed node connections form a
complete graph, so that the network cannot split into separate subnets
forever.

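The complete-graph requirement can be checked mechanically. This sketch uses a hypothetical C<seeds_form_complete_graph> function over a hand-written map of which seed IDs each seed node is configured with; it assumes that a connection counts in both directions once either side is configured to use the other.

```perl
use strict;
use warnings;

# Hypothetical check, not part of AnyEvent::MP. Input: seed ID => list of
# seed IDs that this seed node itself uses as seeds. A pair of seed nodes
# counts as connected if either one lists the other.
sub seeds_form_complete_graph {
   my (%uses) = @_;

   my %edge;
   for my $from (keys %uses) {
      for my $to (@{ $uses{$from} }) {
         $edge{$from}{$to} = $edge{$to}{$from} = 1;
      }
   }

   # every unordered pair of distinct seed nodes must be connected
   my @seeds = sort keys %uses;
   for my $i (0 .. $#seeds) {
      for my $j ($i + 1 .. $#seeds) {
         return 0 unless $edge{ $seeds[$i] }{ $seeds[$j] };
      }
   }
   return 1;
}

# "regional" seeds a, b and c all use each other: complete
print seeds_form_complete_graph(
   "a:4040" => ["b:4040", "c:4040"],
   "b:4040" => ["c:4040"],
   "c:4040" => [],
), "\n";    # 1
```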
Seed nodes are represented by seed IDs.

=item seed IDs - C<host:port>

Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
TCP port) of nodes that should be used as seed nodes.

=item global nodes

An AEMP network needs a discovery service - nodes need to know how to
connect to other nodes they only know by name. In addition, AEMP offers a
distributed "group database", which maps group names to a list of strings,
for example to register worker ports.

A network needs at least one global node to work, and allows every node to
be a global node.

Any node that loads the L<AnyEvent::MP::Global> module becomes a global
node and tries to keep connections to all other nodes. So while it can
make sense to make every node "global" in small networks, it usually makes
sense to only make seed nodes into global nodes in large networks (nodes
keep connections to seed nodes and global nodes, so making them the same
reduces overhead).

=back

=head1 VARIABLES/FUNCTIONS

=over 4

=cut

package AnyEvent::MP;

use AnyEvent::MP::Config ();
use AnyEvent::MP::Kernel;
use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);

use common::sense;

use Carp ();

use AnyEvent ();
use Guard ();

use base "Exporter";

our $VERSION = $AnyEvent::MP::Config::VERSION;

our @EXPORT = qw(
   NODE $NODE *SELF node_of after
   configure
   snd rcv mon mon_guard kil psub peval spawn cal
   port
   db_set db_del db_reg
   db_mon db_family db_keys db_values
);

our $SELF;

sub _self_die() {
   my $msg = $@;
   $msg =~ s/\n+$// unless ref $msg;
   kil $SELF, die => $msg;
}

=item $thisnode = NODE / $NODE

The C<NODE> function returns, and the C<$NODE> variable contains, the node
ID of the node running in the current process. This value is initialised by
a call to C<configure>.

=item $nodeid = node_of $port

Extracts and returns the node ID from a port ID or a node ID.

=item configure $profile, key => value...

=item configure key => value...

Before a node can talk to other nodes on the network (i.e. enter
"distributed mode") it has to configure itself - the minimum a node needs
to know is its own name, and optionally it should know the addresses of
some other nodes in the network to discover other nodes.

This function configures a node - it must be called exactly once (or
never) before calling other AnyEvent::MP functions.

The key/value pairs are basically the same ones as documented for the
F<aemp> command line utility (without the set/del prefix), with these additions:

=over 4

=item norc => $boolean (default false)

If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
be consulted - all configuration options must be specified in the
C<configure> call.

=item force => $boolean (default false)

If true, then the values specified in the C<configure> call will take
precedence over any values configured via the rc file. The default is for
the rc file to override any options specified in the program.

=back

=over 4

=item step 1, gathering configuration from profiles

The function first looks up a profile in the aemp configuration (see the
L<aemp> commandline utility). The profile name can be specified via the
named C<profile> parameter, or can simply be the first parameter. If it is
missing, then the nodename (F<uname -n>) will be used as profile name.

The profile data is then gathered as follows:

First, all remaining key => value pairs (all of which are conveniently
undocumented at the moment) will be interpreted as configuration
data. Then they will be overwritten by any values specified in the global
default configuration (see the F<aemp> utility), then the chain of
profiles chosen by the profile name (and any C<parent> attributes).

That means that the values specified in the profile have highest priority
and the values specified directly via C<configure> have lowest priority
(unless C<force> is true), and can only be used to specify defaults.

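The priority order can be modelled as a sequence of hash merges in which later sources override earlier ones. This is a sketch under simplifying assumptions: C<resolve_config> is a hypothetical function, profiles are plain hashes linked by a C<parent> key, and the C<norc> and C<force> options are ignored.

```perl
use strict;
use warnings;

# Hypothetical model of the lookup order described above, not the
# module's own code. Later sources override earlier ones.
sub resolve_config {
   my ($args, $global, $profiles, $name) = @_;

   # the named profile, then its parent, grandparent, ... (cycle-safe)
   my (@chain, %seen);
   while (defined $name && $profiles->{$name} && !$seen{$name}++) {
      push @chain, $profiles->{$name};
      $name = $profiles->{$name}{parent};
   }

   # lowest priority first: configure arguments, global defaults, then
   # the profile chain from the most distant parent to the named profile
   my %cfg = (%$args, %$global);
   %cfg = (%cfg, %$_) for reverse @chain;
   delete $cfg{parent};
   return \%cfg;
}

my $cfg = resolve_config(
   { nodeid => "from-configure", extra => 1 },   # configure key => value
   { binds  => ["*"] },                           # global defaults
   {
      base => { nodeid => "base-node" },
      seed => { parent => "base", binds => ["*:4040"] },
   },
   "seed",
);

print "$cfg->{nodeid} @{ $cfg->{binds} } $cfg->{extra}\n";   # base-node *:4040 1
```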
If the profile specifies a node ID, then this will become the node ID of
this process. If not, then the profile name will be used as node ID, with
a unique random string (C</%u>) appended.

The node ID can contain some C<%> sequences that are expanded: C<%n>
is expanded to the local nodename, C<%u> is replaced by a random
string to make the node unique. For example, the F<aemp> commandline
utility uses C<aemp/%n/%u> as node ID, which might expand to
C<aemp/cerebro/ZQDGSIkRhEZQDGSIkRhE>.

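A minimal sketch of the C<%> expansion, assuming only the two sequences described here. C<expand_nodeid> and C<random_string> are hypothetical helpers; the nodename is taken from C<POSIX::uname>, which reports the same value as F<uname -n>, while the module's actual random-string generator is not described in this document.

```perl
use strict;
use warnings;
use POSIX ();

# Hypothetical helpers, not AnyEvent::MP's implementation.
# Expand %n (local nodename) and %u (random string) in a node ID template.
sub expand_nodeid {
   my ($template, $nodename, $uniq) = @_;
   my %value = (n => $nodename, u => $uniq);
   $template =~ s/%([nu])/$value{$1}/g;
   return $template;
}

# A random alphanumeric string. rand() is not cryptographically strong;
# it only makes name collisions unlikely.
sub random_string {
   my ($length) = @_;
   my @chars = ("A" .. "Z", "a" .. "z", "0" .. "9");
   return join "", map { $chars[rand @chars] } 1 .. $length;
}

my $nodename = (POSIX::uname())[1];    # same as `uname -n`
print expand_nodeid("aemp/%n/%u", $nodename, random_string(20)), "\n";
```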
=item step 2, bind listener sockets

The next step is to look up the binds in the profile, followed by binding
aemp protocol listeners on all binds specified. It is possible and valid
to have no binds, meaning that the node cannot be contacted from the
outside. Such a node cannot talk to other nodes that also have no
binds, but it can still talk to all "normal" nodes.

If the profile does not specify a binds list, then a default of C<*> is
used, meaning the node will bind on a dynamically-assigned port on every
local IP address it finds.

=item step 3, connect to seed nodes

As the last step, the seed ID list from the profile is passed to the
L<AnyEvent::MP::Global> module, which will then use it to keep
connectivity with at least one node at any point in time.

=back

Example: become a distributed node using the local node name as profile.
This should be the most common form of invocation for "daemon"-type nodes.

   configure

Example: become a semi-anonymous node. This form is often used for
commandline clients.

   configure nodeid => "myscript/%n/%u";

Example: configure a node using a profile called seed, which is suitable
for a seed node as it binds on all local addresses on a fixed port (4040,
customary for aemp).

   # use the aemp commandline utility
   # aemp profile seed binds '*:4040'

   # then use it
   configure profile => "seed";

   # or simply use aemp from the shell again:
   # aemp run profile seed

   # or provide a nicer-to-remember nodeid
   # aemp run profile seed nodeid "$(hostname)"

=item $SELF

Contains the current port ID while executing C<rcv> callbacks or C<psub>
blocks.

=item *SELF, SELF, %SELF, @SELF...

Due to some quirks in how perl exports variables, it is impossible to
export just C<$SELF>, so all the symbols named C<SELF> are exported by this
module, but only C<$SELF> is currently used.

=item snd $port, type => @data

=item snd $port, @msg

Sends the given message to the given port, which can identify either a
local or a remote port, and must be a port ID.

While the message can be almost anything, it is highly recommended to
use a string as first element (a port ID, or some word that indicates a
request type etc.) and for the message to consist only of simple perl
values (scalars, arrays, hashes) - if you think you need to pass an
object, think again.

The message data logically becomes read-only after a call to this
function: modifying any argument (or values referenced by them) is
forbidden, as there can be considerable time between the call to C<snd>
and the time the message is actually being serialised - in fact, it might
never be copied as within the same process it is simply handed to the
receiving port.

The type of data you can transfer depends on the transport protocol: when
JSON is used, then only strings, numbers and arrays and hashes consisting
of those are allowed (no objects). When Storable is used, then anything
that Storable can serialise and deserialise is allowed, and for the local
node, anything can be passed. Best rely only on the common denominator of
these.

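To see the JSON restriction in practice, the core C<JSON::PP> module can stand in for the JSON transport (which encoder settings AnyEvent::MP actually uses is an assumption not covered here): plain data encodes, while a blessed object is refused by default.

```perl
use strict;
use warnings;
use JSON::PP ();

# JSON::PP stands in for the JSON transport here; canonical sorts hash
# keys so the output is stable.
my $json = JSON::PP->new->canonical;

# strings, numbers, arrays and hashes: fine
my $plain = $json->encode(["request", { id => 1, args => [1, 2] }]);
print "$plain\n";    # ["request",{"args":[1,2],"id":1}]

# a blessed object: encode croaks unless allow_blessed/convert_blessed is set
my $ok = eval { $json->encode(["request", bless {}, "My::Object"]); 1 };
print $ok ? "object encoded\n" : "object rejected\n";    # object rejected
```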
=item $local_port = port

Creates a new local port object and returns its port ID. Initially it has
no callbacks set, and it will be killed (with reason C<unhandled_message>)
when it receives a message.

=item $local_port = port { my @msg = @_ }

Creates a new local port, and returns its ID. Semantically the same as
creating a port and calling C<rcv $port, $callback> on it.

The block will be called for every message received on the port, with the
global variable C<$SELF> set to the port ID. Runtime errors will cause the
port to be C<kil>ed. The message will be passed as-is, no extra argument
(i.e. no port ID) will be passed to the callback.

If you want to stop/destroy the port, simply C<kil> it:

   my $port = port {
      my @msg = @_;
      ...
      kil $SELF;
   };

=cut

sub rcv($@);

my $KILME = sub {
   # replace non-printable characters so the tag is safe to put into the reason
   (my $tag = substr $_[0], 0, 30) =~ s/([^\x20-\x7e])/./g;
   kil $SELF, unhandled_message => "no callback found for message '$tag'";
};

sub port(;&) {
   my $id = $UNIQ . ++$ID;
   my $port = "$NODE#$id";

   rcv $port, shift || $KILME;

   $port
}

=item rcv $local_port, $callback->(@msg)

Replaces the default callback on the specified port. There is no way to
remove the default callback: use C<sub { }> to disable it, or better
C<kil> the port when it is no longer needed.

The global C<$SELF> (exported by this module) contains C<$port> while
executing the callback. Runtime errors during callback execution will
result in the port being C<kil>ed.

The default callback receives all messages not matched by a more specific
C<tag> match.

=item rcv $local_port, tag => $callback->(@msg_without_tag), ...

Registers (or replaces) callbacks to be called on messages starting with
the given tag on the given port (and returns the port), or unregisters
them (when C<$callback> is C<undef> or missing). There can only be one
callback registered for each tag.

The original message will be passed to the callback, after the first
element (the tag) has been removed. The callback will use the same
environment as the default callback (see above).

Example: create a port and bind receivers on it in one go.

   my $port = rcv port,
      msg1 => sub { ... },
      msg2 => sub { ... },
   ;

Example: create a port, bind receivers and send it in a message elsewhere
in one go:

   snd $otherport, reply =>
      rcv port,
         msg1 => sub { ... },
         ...
   ;

Example: temporarily register a rcv callback for a tag matching some port
(e.g. for an rpc reply) and unregister it after a message was received.

   rcv $port, $otherport => sub {
      my @reply = @_;

      rcv $SELF, $otherport;
   };

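The matching rules (a tagged callback first, the default callback otherwise, and the tag removed before the call) can be modelled without AnyEvent::MP. This is a simplified, hypothetical model of a single port: C<dispatch>, C<%tagged> and C<$default> are illustration names, and error handling and C<$SELF> are left out.

```perl
use strict;
use warnings;

my @log;                                   # what the callbacks saw
my %tagged;                                # tag => callback
my $default = sub { push @log, "default: @_" };

# Hypothetical dispatcher for one port: a registered tag wins and is
# stripped from the message; anything else goes to the default callback.
sub dispatch {
   my @msg = @_;
   if (defined $msg[0] and my $cb = $tagged{ $msg[0] }) {
      shift @msg;
      $cb->(@msg);
   } else {
      $default->(@msg);
   }
}

$tagged{ping} = sub { push @log, "ping from $_[0]" };   # like rcv $port, ping => ...

dispatch(ping  => "node#1");
dispatch(hello => "world");

delete $tagged{ping};                                    # like rcv $port, "ping"
dispatch(ping  => "node#1");

print "$_\n" for @log;
# ping from node#1
# default: hello world
# default: ping node#1
```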
=cut

sub rcv($@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   $NODE{$nodeid} == $NODE{""}
      or Carp::croak "$port: rcv can only be called on local ports, caught";

   while (@_) {
      if (ref $_[0]) {
         if (my $self = $PORT_DATA{$portid}) {
            "AnyEvent::MP::Port" eq ref $self
               or Carp::croak "$port: rcv can only be called on message matching ports, caught";

            $self->[0] = shift;
         } else {
            my $cb = shift;
            $PORT{$portid} = sub {
               local $SELF = $port;
               eval { &$cb }; _self_die if $@;
            };
         }
      } elsif (defined $_[0]) {
         my $self = $PORT_DATA{$portid} ||= do {
            my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";

            $PORT{$portid} = sub {
               local $SELF = $port;

               if (my $cb = $self->[1]{$_[0]}) {
                  shift;
                  eval { &$cb }; _self_die if $@;
               } else {
                  &{ $self->[0] };
               }
            };

            $self
         };

         "AnyEvent::MP::Port" eq ref $self
            or Carp::croak "$port: rcv can only be called on message matching ports, caught";

         my ($tag, $cb) = splice @_, 0, 2;

         if (defined $cb) {
            $self->[1]{$tag} = $cb;
         } else {
            delete $self->[1]{$tag};
         }
      }
   }

   $port
}

=item peval $port, $coderef[, @args]

Evaluates the given C<$coderef> within the context of C<$port>, that is,
when the code throws an exception the C<$port> will be killed.

Any remaining args will be passed to the callback. Any return values will
be returned to the caller.

This is useful when you temporarily want to execute code in the context of
a port.

Example: create a port and run some initialisation code in its context.

   my $port = port { ... };

   peval $port, sub {
      init
         or die "unable to init";
   };

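The behaviour of C<peval> can be modelled in plain Perl: run the code with a current port set, and turn an exception into a kill of that port instead of letting it propagate. C<peval_model> and C<kill_port> are hypothetical stand-ins (the latter only records what C<kil> would have been called with); the message cleanup mirrors the module's internal C<_self_die> helper.

```perl
use strict;
use warnings;

our $SELF;                       # current port, like AnyEvent::MP's $SELF
my %killed;                      # port => reason, recorded instead of kil

sub kill_port { my ($port, @reason) = @_; $killed{$port} = \@reason }

# Hypothetical model of peval: evaluate in the caller's context, pass the
# remaining arguments on, and convert an exception into (die => $msg).
sub peval_model {
   local $SELF = shift;
   my $cb = shift;

   my @res = wantarray ? eval { $cb->(@_) } : scalar eval { $cb->(@_) };
   if ($@) {
      my $msg = $@;
      $msg =~ s/\n+\z// unless ref $msg;
      kill_port($SELF, die => $msg);
      return;
   }
   return wantarray ? @res : $res[0];
}

my $sum = peval_model("node#p1", sub { $_[0] + $_[1] }, 2, 3);
peval_model("node#p2", sub { die "unable to init\n" });

print "sum=$sum\n";                           # sum=5
print "p2 killed: @{ $killed{'node#p2'} }\n"; # p2 killed: die unable to init
```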
=cut

sub peval($$@) {
   local $SELF = shift;
   my $cb = shift;

   # &$cb without parentheses passes the remaining @_ (the @args) along
   if (wantarray) {
      my @res = eval { &$cb };
      _self_die if $@;
      @res
   } else {
      my $res = eval { &$cb };
      _self_die if $@;
      $res
   }
}

=item $closure = psub { BLOCK }

Remembers C<$SELF> and creates a closure out of the BLOCK. When the
closure is executed, sets up the environment in the same way as in C<rcv>
callbacks, i.e. runtime errors will cause the port to get C<kil>ed.

The effect is basically as if it returned C<< sub { peval $SELF, sub {
BLOCK }, @_ } >>.

This is useful when you register callbacks from C<rcv> callbacks:

   rcv $port, delayed_reply => sub {
      my ($delay, @reply) = @_;
      my $timer; $timer = AE::timer $delay, 0, psub {
         undef $timer; # keep the timer alive until it has fired
         snd @reply, $SELF;
      };
   };

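The core of C<psub> is capturing C<$SELF> when the closure is created and restoring it with C<local> whenever the closure runs later. A hypothetical C<psub_model> shows just that part; the C<eval>/kill handling is the same as for C<peval> and is left out here.

```perl
use strict;
use warnings;

our $SELF;

# Hypothetical model of psub, without the error handling: remember the
# current $SELF and reinstate it each time the closure is called.
sub psub_model(&) {
   my $cb = shift;
   my $port = $SELF
      or die "psub_model called outside of a port context\n";
   return sub {
      local $SELF = $port;
      return $cb->(@_);
   };
}

my $closure;
{
   local $SELF = "node#port1";               # as if inside an rcv callback
   $closure = psub_model { "running in $SELF" };
}

# later, e.g. from a timer: $SELF is no longer set, but the closure
# restores the port it was created in
print defined $SELF ? "SELF set\n" : "SELF unset\n";   # SELF unset
print $closure->(), "\n";                                # running in node#port1
```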
=cut

sub psub(&) {
   my $cb = shift;

   my $port = $SELF
      or Carp::croak "psub can only be called from within rcv or psub callbacks, not";

   sub {
      local $SELF = $port;

      if (wantarray) {
         my @res = eval { &$cb };
         _self_die if $@;
         @res
      } else {
         my $res = eval { &$cb };
         _self_die if $@;
         $res
      }
   }
}

=item $guard = mon $port, $rcvport # kill $rcvport when $port dies

=item $guard = mon $port # kill $SELF when $port dies

=item $guard = mon $port, $cb->(@reason) # call $cb when $port dies

=item $guard = mon $port, $rcvport, @msg # send a message when $port dies

Monitors the given port and does something when the port is killed or
messages to it were lost, and optionally returns a guard that can be used
to stop monitoring again.

The first two forms distinguish between "normal" and "abnormal" kil's:

In the first form (another port given), if the C<$port> is C<kil>'ed with
a non-empty reason, the other port (C<$rcvport>) will be kil'ed with the
same reason. That is, on "normal" kil's nothing happens, while under all
other conditions, the other port is killed with the same reason.

The second form (kill self) is the same as the first form, except that
C<$rcvport> defaults to C<$SELF>.

The remaining forms don't distinguish between "normal" and "abnormal"
kil's; it's up to the callback or receiver to check whether the
C<@reason> is empty and act accordingly.

In the third form (callback), the callback is simply called with any
number of C<@reason> elements (empty @reason means that the port was deleted
"normally"). Note also that I<< the callback B<must> never die >>, so use
C<eval> if unsure.

In the last form (message), a message of the form C<$rcvport, @msg,
@reason> will be sent with C<snd>.

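How the non-callback forms differ can be shown with a small model. C<monitor_action>, C<kil_stub> and C<snd_stub> are hypothetical; the stubs record calls instead of invoking the real C<kil> and C<snd>. The logic mirrors the description above: the link form acts only on a non-empty reason, while the message form always sends.

```perl
use strict;
use warnings;

# recording stubs instead of the real kil and snd
my @events;
sub kil_stub { push @events, "kil @_" }
sub snd_stub { push @events, "snd @_" }

# Hypothetical: turn "mon $port, $rcvport[, @msg]" arguments into the
# callback that runs when the monitored port dies with @reason.
sub monitor_action {
   my ($rcvport, @msg) = @_;
   return @msg
      ? sub { snd_stub($rcvport, @msg, @_) }    # message form: always sent
      : sub { kil_stub($rcvport, @_) if @_ };   # link form: abnormal kil only
}

my $link = monitor_action("n#local");
$link->();                                   # normal kil: nothing happens
$link->(die => "oops");                      # abnormal kil: link is killed

my $note = monitor_action("n#local", "restart");
$note->();                                   # message sent even on normal kil

print "$_\n" for @events;
# kil n#local die oops
# snd n#local restart
```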
Monitoring-actions are one-shot: once messages are lost (and a monitoring
alert was raised), they are removed and will not trigger again, even if it
turns out that the port is still alive.

As a rule of thumb, monitoring requests should always monitor a remote
port locally (using a local C<$rcvport> or a callback). The reason is that
kill messages might get lost, just like any other message. Another less
obvious reason is that even monitoring requests can get lost (for example,
when the connection to the other node goes down permanently). When
monitoring a port locally these problems do not exist.

C<mon> effectively guarantees that, in the absence of hardware failures,
after starting the monitor, either all messages sent to the port will
arrive, or the monitoring action will be invoked after possible message
loss has been detected. No messages will be lost "in between" (after
the first lost message no further messages will be received by the
port). After the monitoring action was invoked, further messages might get
delivered again.

Inter-host-connection timeouts and monitoring depend on the transport
used. The only transport currently implemented is TCP, and AnyEvent::MP
relies on TCP to detect node-downs (this can take 10-15 minutes on a
non-idle connection, and usually around two hours for idle connections).

This means that monitoring is good for program errors and cleaning up
stuff eventually, but it is no replacement for a timeout when you need
to ensure some maximum latency.

Example: call a given callback when C<$port> is killed.

   mon $port, sub { warn "port died because of <@_>\n" };

Example: kill ourselves when C<$port> is killed abnormally.

   mon $port;

Example: send us a restart message when another C<$port> is killed.

   mon $port, $SELF => "restart";

=cut

sub mon {
   my ($nodeid, $port) = split /#/, shift, 2;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';

   unless (ref $cb) {
      if (@_) {
         # send a kill info message
         my (@msg) = ($cb, @_);
         $cb = sub { snd @msg, @_ };
      } else {
         # simply kill other port
         my $port = $cb;
         $cb = sub { kil $port, @_ if @_ };
      }
   }

   $node->monitor ($port, $cb);

   defined wantarray
      and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
}

=item $guard = mon_guard $port, $ref, $ref...

Monitors the given C<$port> and keeps the passed references. When the port
is killed, the references will be freed.

Optionally returns a guard that will stop the monitoring.

This function is useful when you create e.g. timers or other watchers and
want to free them when the port gets killed (note the use of C<psub>):

   rcv $port, start => sub {
      my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
         undef $timer if 0.9 < rand;
      };
   };

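C<mon_guard> keeps the references alive simply by storing them in a closure that the monitor holds on to. The following sketch isolates that trick without AnyEvent::MP: the closure never uses C<@refs>, but as long as it exists the object cannot be freed. The C<Watcher> class is a hypothetical stand-in for a timer or other watcher.

```perl
use strict;
use warnings;

my $destroyed = 0;

# Hypothetical stand-in for a watcher object; counts its destruction.
package Watcher;
sub new     { return bless {}, shift }
sub DESTROY { $destroyed++ }

package main;

my $keeper = do {
   my @refs = (Watcher->new);
   sub { 0 && @refs };               # same trick as mon_guard: captures @refs
};

print "destroyed: $destroyed\n";     # destroyed: 0
undef $keeper;                       # like the monitor being removed
print "destroyed: $destroyed\n";     # destroyed: 1
```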
=cut

sub mon_guard {
   my ($port, @refs) = @_;

   #TODO: mon-less form?

   mon $port, sub { 0 && @refs }
}

=item kil $port[, @reason]

Kills the specified port with the given C<@reason>.

If no C<@reason> is specified, then the port is killed "normally":
monitor callbacks will be invoked, but the kil will not cause linked ports
(C<mon $mport, $lport> form) to get killed.

If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
form) get killed with the same reason.

Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
will be reported as reason C<< die => $@ >>.

Transport/communication errors are reported as C<< transport_error =>
$message >>.

Common idioms:

   # silently remove yourself, do not kill linked ports
   kil $SELF;

   # report a failure in some detail
   kil $SELF, failure_mode_1 => "it failed with too high temperature";

   # do not waste much time with killing, just die when something goes wrong
   open my $fh, "<file"
      or die "file: $!";

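The C<< die => $@ >> reason can be sketched as a small conversion function. C<reason_from_error> is hypothetical; it mirrors what the module's internal C<_self_die> helper does before calling C<kil>: trailing newlines are stripped from string exceptions, and exception objects are passed through unchanged.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring the module's _self_die: turn an exception
# into the @reason list that kil receives.
sub reason_from_error {
   my ($err) = @_;
   $err =~ s/\n+\z// unless ref $err;    # "too hot\n" -> "too hot"
   return (die => $err);
}

eval { die "it failed with too high temperature\n" };
my @reason = reason_from_error($@);
print join(" => ", @reason), "\n";    # die => it failed with too high temperature
```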
=item $port = spawn $node, $initfunc[, @initdata]

Creates a port on the node C<$node> (which can also be a port ID, in which
case it's the node where that port resides).

The port ID of the newly created port is returned immediately, and it is
possible to immediately start sending messages or to monitor the port.

After the port has been created, the init function is called on the remote
node, in the same context as a C<rcv> callback. This function must be a
fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
specify a function in the main program, use C<::name>.

If the function doesn't exist, then the node tries to C<require>
the package, then the package above the package and so on (e.g.
C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
exists or it runs out of package names.

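The package search order can be sketched as follows. C<candidate_packages> is a hypothetical helper that only lists the package names in the order they would be tried; the actual loading (the module's own C<load_func>) is not part of this section.

```perl
use strict;
use warnings;

# Hypothetical helper: for a fully-qualified function name, list the
# packages that would be tried with require, innermost first.
sub candidate_packages {
   my ($funcname) = @_;
   my @parts = split /::/, $funcname;
   pop @parts;                              # drop the function name
   my @packages;
   while (@parts) {
      push @packages, join "::", @parts;
      pop @parts;
   }
   return grep { length } @packages;        # "::name" (main) yields nothing
}

print join(", ", candidate_packages("MyApp::Chat::Server::init")), "\n";
# MyApp::Chat::Server, MyApp::Chat, MyApp
```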
The init function is then called with the newly-created port as context
object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
the port might not get created.

A common idiom is to pass a local port, immediately monitor the spawned
port, and in the remote init function, immediately monitor the passed
local port. This two-way monitoring ensures that both ports get cleaned up
when there is a problem.

C<spawn> guarantees that the C<$initfunc> has no visible effects on the
caller before C<spawn> returns (by delaying invocation when spawn is
called for the local node).

Example: spawn a chat server port on C<$othernode>.

   # this node, executed from within a port context:
   my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
   mon $server;

   # init function on $othernode
   sub connect {
      my ($srcport) = @_;

      mon $srcport;

      rcv $SELF, sub {
         ...
      };
   }

=cut

sub _spawn {
   my $port = shift;
   my $init = shift;

   # rcv will create the actual port
   local $SELF = "$NODE#$port";
   eval {
      &{ load_func $init }
   };
   _self_die if $@;
}

sub spawn(@) {
   my ($nodeid, undef) = split /#/, shift, 2;

   my $id = $RUNIQ . ++$ID;

   $_[0] =~ /::/
      or Carp::croak "spawn init function must be a fully-qualified name, caught";

   snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;

   "$nodeid#$id"
}

=item after $timeout, @msg

=item after $timeout, $callback

Either sends the given message, or calls the given callback, after the
specified number of seconds.

This is simply a utility function that comes in handy at times - the
AnyEvent::MP author is not convinced of the wisdom of having it, though,
so it may go away in the future.

=cut

sub after($@) {
   my ($timeout, @action) = @_;

   my $t; $t = AE::timer $timeout, 0, sub {
      undef $t;
      ref $action[0]
         ? $action[0]()
         : snd @action;
   };
}

#=item $cb2 = timeout $seconds, $cb[, @args]

=item cal $port, @msg, $callback[, $timeout]

A simple form of RPC - sends a message to the given C<$port> with the
given contents (C<@msg>), but adds a reply port to the message.

The reply port is created temporarily just for the purpose of receiving
the reply, and will be C<kil>ed when no longer needed.

A reply message sent to the port is passed to the C<$callback> as-is.

If an optional time-out (in seconds) is given and it is not C<undef>,
then the callback will be called without any arguments after the time-out
has elapsed, and the port is C<kil>ed.

If no time-out is given (or it is C<undef>), then the local port will
monitor the remote port instead, so it eventually gets cleaned up. If the
remote port dies before replying, the callback is likewise called without
any arguments.
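C<@msg> can have any length, so C<cal> separates an optional time-out from the callback by inspecting the last argument. If that argument is a reference, it is taken to be the callback, and there is no time-out. The self-contained sketch below mirrors that rule as it appears in the implementation of C<cal> that follows this documentation. C<split_cal_args> is a hypothetical name used only for illustration.

```perl

   use strict;
   use warnings;

   # Hypothetical helper mirroring the argument handling of
   #   cal $port, @msg, $callback[, $timeout]
   sub split_cal_args {
      my @args = @_;
      # a non-reference last argument (a number, or undef) is the time-out
      my $timeout = ref $args[-1] ? undef : pop @args;
      my $cb = pop @args;          # the callback comes right before it
      my ($port, @msg) = @args;    # everything else is port + message
      return ($port, \@msg, $cb, $timeout);
   }

   my ($port, $msg, $cb, $timeout) =
      split_cal_args("node#port1", "lookup", "key", sub { }, 5);
   print "port=$port msg=@$msg timeout=$timeout\n";   # port=node#port1 msg=lookup key timeout=5

   ($port, $msg, $cb, $timeout) =
      split_cal_args("node#port1", "lookup", "key", sub { });
   print defined $timeout ? "timeout set\n" : "no timeout\n";   # no timeout

```

One consequence of this rule is that passing C<undef> as the time-out behaves exactly like passing no time-out at all.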

Currently this function returns the temporary port, but this "feature"
might go in future versions unless you can make a convincing case that
this is indeed useful for something.

=cut

sub cal(@) {
   my $timeout = ref $_[-1] ? undef : pop;
   my $cb = pop;

   my $port = port {
      undef $timeout;
      kil $SELF;
      &$cb;
   };

   if (defined $timeout) {
      $timeout = AE::timer $timeout, 0, sub {
         undef $timeout;
         kil $port;
         $cb->();
      };
   } else {
      mon $_[0], sub {
         kil $port;
         $cb->();
      };
   }

   push @_, $port;
   &snd;

   $port
}

=back

=head1 DISTRIBUTED DATABASE

AnyEvent::MP comes with a simple distributed database. The database will
be mirrored asynchronously on all global nodes. Other nodes bind to one
of the global nodes for their needs. Every node has a "local database"
which contains all the values that are set locally. All local databases
are merged together to form the global database, which can be queried.

The database structure is that of a two-level hash - the database hash
contains hashes which contain values, similarly to a perl hash of hashes,
i.e.:

   $DATABASE{$family}{$subkey} = $value

The top level hash key is called "family", and the second-level hash key
is called "subkey" or simply "key".

The family must be alphanumeric, i.e. start with a letter and consist
of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>),
pretty much like Perl module names.
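A family name can be checked against that rule with one anchored regular expression. The following sketch assumes nothing beyond the character class quoted above. C<is_valid_family> is a hypothetical helper, not an AnyEvent::MP function, and the module's own checks (if any) may behave differently.

```perl

   use strict;
   use warnings;

   # Hypothetical check using the rule above: a letter first, then
   # letters, digits, underscores or colons.
   sub is_valid_family {
      my ($family) = @_;
      return defined $family && $family =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/;
   }

   for my $name ("my_image_scalers", "My::Module::workers", "9lives", "has space", "") {
      printf "%-20s %s\n", $name, is_valid_family($name) ? "ok" : "invalid";
   }

```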

As the family namespace is global, it is recommended to prefix family names
with the name of the application or module using it.

The subkeys must be non-empty strings, with no further restrictions.

The values should preferably be strings, but other perl scalars should
work as well (such as C<undef>, array and hash references).

Every database entry is owned by one node - adding the same family/subkey
combination on multiple nodes will not cause discomfort for AnyEvent::MP,
but the result might be nondeterministic, i.e. the key might have
different values on different nodes.
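The following self-contained sketch makes the ownership rule concrete. It merges per-node local databases into one global hash of hashes. It is only a model of the semantics described above, and it assumes plain in-memory hashes. The real database is replicated asynchronously between global nodes. C<merge_local_dbs> and the node names are hypothetical. The text above only says that the result might be nondeterministic. Here that nondeterminism is modelled by varying the order in which the local databases are merged.

```perl

   use strict;
   use warnings;

   # Hypothetical model, not AnyEvent::MP code:
   #   $local->{$node}{$family}{$subkey} = $value
   sub merge_local_dbs {
      my ($local, @node_order) = @_;
      my %global;
      for my $node (@node_order) {
         my $db = $local->{$node};
         for my $family (keys %$db) {
            for my $subkey (keys %{ $db->{$family} }) {
               # a later node silently overwrites an earlier one on a clash
               $global{$family}{$subkey} = $db->{$family}{$subkey};
            }
         }
      }
      return \%global;
   }

   my %local = (
      node_a => {
         my_image_scalers => { "node_a#p1" => undef },
         my_app_config    => { mode => "fast" },
      },
      node_b => {
         my_image_scalers => { "node_b#p7" => undef },
         my_app_config    => { mode => "safe" },   # same family/subkey as node_a
      },
   );

   my $g1 = merge_local_dbs(\%local, qw(node_a node_b));
   my $g2 = merge_local_dbs(\%local, qw(node_b node_a));

   # different subkeys of one family coexist (e.g. a worker pool)
   print join(" ", sort keys %{ $g1->{my_image_scalers} }), "\n";   # node_a#p1 node_b#p7

   # the doubly-owned entry depends on the merge order
   print "$g1->{my_app_config}{mode} vs $g2->{my_app_config}{mode}\n";   # safe vs fast

```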

Different subkeys in the same family can be owned by different nodes
without problems, and in fact, this is the common method to create worker
pools. For example, a worker port for image scaling might do this:

   db_set my_image_scalers => $port;

And clients looking for an image scaler will want to get the
C<my_image_scalers> keys from time to time:

   db_keys my_image_scalers => sub {
      @ports = @{ $_[0] };
   };

Or better yet, they want to monitor the database family, so they always
have a reasonably up-to-date copy:

   db_mon my_image_scalers => sub {
      @ports = keys %{ $_[0] };
   };

In general, you can set or delete single subkeys, but query and monitor
whole families only.

If you feel the need to monitor or query a single subkey, try giving it
its own family.

=over

=item $guard = db_set $family => $subkey [=> $value]

Sets (or replaces) a key in the database - if C<$value> is omitted,
C<undef> is used instead.

When called in non-void context, C<db_set> returns a guard that
automatically calls C<db_del> when it is destroyed.
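The guard follows the common Perl idiom of an object whose destructor performs the clean-up, so the key exists exactly as long as the guard does. The self-contained sketch below models this, using a plain hash in place of the database. C<%DB>, C<fake_db_set>, C<fake_db_del> and C<My::Guard> are all hypothetical. They only illustrate the behaviour described above and do not reproduce AnyEvent::MP's implementation. For comparison, the C<db_reg> code further down obtains its guard from C<Guard::guard>.

```perl

   use strict;
   use warnings;

   # Hypothetical stand-in for the database: $DB{$family}{$subkey} = $value
   my %DB;

   # Minimal guard class: runs a stored code reference when destroyed.
   package My::Guard;
   sub new     { my ($class, $cb) = @_; return bless { cb => $cb }, $class }
   sub DESTROY { my ($self) = @_; $self->{cb}->() }

   package main;

   sub fake_db_del {
      my ($family, @subkeys) = @_;
      delete $DB{$family}{$_} for @subkeys;
   }

   # Like db_set as described above: $value defaults to undef, and a guard
   # is only created when the caller actually uses the return value.
   sub fake_db_set {
      my ($family, $subkey, $value) = @_;
      $DB{$family}{$subkey} = $value;
      return unless defined wantarray;   # void context: no guard
      return My::Guard->new(sub { fake_db_del($family, $subkey) });
   }

   sub has_key {
      my ($family, $subkey) = @_;
      return exists $DB{$family}{$subkey} ? "present" : "gone";
   }

   {
      my $guard = fake_db_set(my_image_scalers => "node#port1");
      print has_key(my_image_scalers => "node#port1"), "\n";   # present
   }   # $guard is destroyed here, which deletes the key again
   print has_key(my_image_scalers => "node#port1"), "\n";      # gone

```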

=item db_del $family => $subkey...

Deletes one or more subkeys from the database family.

=item $guard = db_reg $family => $port => $value

=item $guard = db_reg $family => $port

=item $guard = db_reg $family

Registers a port in the given family and optionally returns a guard to
remove it.

This function basically does the same as:

   db_set $family => $port => $value

except that the port is monitored and automatically removed from the
database family when it is C<kil>ed.

If C<$value> is missing, C<undef> is used. If C<$port> is missing, then
C<$SELF> is used.

This function is most useful to register a port in some port group (which
is just another name for a database family), and have it removed when the
port is gone. This works best when the port is a local port.

=cut

sub db_reg($;$$) {
   my $family = shift;
   my $port = @_ ? shift : $SELF;

   my $clr = sub { db_del $family => $port };
   mon $port, $clr;

   db_set $family => $port => $_[0];

   defined wantarray
      and &Guard::guard ($clr)
}

=item db_family $family => $cb->(\%familyhash)

Queries the named database C<$family> and calls the callback with the
family represented as a hash reference. You can keep and freely modify the
hash.

=item db_keys $family => $cb->(\@keys)

Same as C<db_family>, except it only queries the family I<subkeys> and passes
them as an array reference to the callback.

=item db_values $family => $cb->(\@values)

Same as C<db_family>, except it only queries the family I<values> and passes them
as an array reference to the callback.

=item $guard = db_mon $family => $cb->($familyhash, \@added, \@changed, \@deleted)

Creates a monitor on the given database family. Each time a key is set
or deleted, the callback is called with a hash reference containing the
database family and three lists of added, changed and deleted subkeys,
respectively. If one of these lists would be empty, the corresponding
array reference might be C<undef> or even missing.

If not called in void context, a guard object is returned that, when
destroyed, stops the monitor.

The family hash reference and the key arrays belong to AnyEvent::MP and
B<must not be modified or stored> by the callback. When in doubt, make a
copy.
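A callback that needs to keep the data must copy it. Because the key lists may be C<undef> or missing, it is convenient to normalise them at the same time. The following minimal sketch assumes only what is described above. C<copy_mon_args> is a hypothetical helper. The copy is shallow, so values that are themselves references are still shared with AnyEvent::MP.

```perl

   use strict;
   use warnings;

   # Hypothetical helper: turn the arguments of a db_mon callback into
   # private copies that may be stored and modified freely.
   sub copy_mon_args {
      my ($family, $added, $changed, $deleted) = @_;
      return (
         { %$family },             # shallow copy of the family hash
         [ @{ $added   || [] } ],  # undef or missing lists become []
         [ @{ $changed || [] } ],
         [ @{ $deleted || [] } ],
      );
   }

   # A callback could then start like this:
   #
   #    db_mon my_family => sub {
   #       my ($family, $add, $chg, $del) = copy_mon_args(@_);
   #       ...   # safe to keep or modify all four
   #    };

```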

As soon as possible after the monitoring starts, the callback will be
called with the initial contents of the family, even if it is empty,
i.e. there will always be a timely call to the callback with the current
contents.

It is possible that the callback is called with a change event even though
the subkey is already present and the value has not changed.
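The three lists classify how the family changed between two calls. The following self-contained sketch shows that classification, assuming two plain snapshots of a family hash. C<diff_family> and C<same_value> are hypothetical helpers and not how AnyEvent::MP computes its notifications. This sketch compares values as strings and only reports real changes. As noted above, the module itself may also report a subkey as changed when its value is the same.

```perl

   use strict;
   use warnings;

   # Hypothetical helper: classify subkeys as added, changed or deleted
   # between an old and a new snapshot of one database family.
   sub diff_family {
      my ($old, $new) = @_;
      my (@added, @changed);
      for my $key (sort keys %$new) {
         if (!exists $old->{$key}) {
            push @added, $key;
         } elsif (!same_value($old->{$key}, $new->{$key})) {
            push @changed, $key;
         }
      }
      my @deleted = grep !exists $new->{$_}, sort keys %$old;
      return (\@added, \@changed, \@deleted);
   }

   # Values may be undef, and undef only equals undef.
   sub same_value {
      my ($x, $y) = @_;
      return !defined $y if !defined $x;
      return 0           if !defined $y;
      return $x eq $y;
   }

   my %before = (w1 => "idle", w2 => "busy");
   my %after  = (w2 => "idle", w3 => "busy");
   my ($add, $chg, $del) = diff_family(\%before, \%after);
   print "added: @$add, changed: @$chg, deleted: @$del\n";
   # added: w3, changed: w2, deleted: w1

```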

The monitoring stops when the guard object is destroyed.

Example: on every change to the family "mygroup", print out all keys.

   my $guard = db_mon mygroup => sub {
      my ($family, $a, $c, $d) = @_;
      print "mygroup members: ", (join " ", keys %$family), "\n";
   };

Example: wait until the family "My::Module::workers" is non-empty.

   my $guard; $guard = db_mon "My::Module::workers" => sub {
      my ($family, $a, $c, $d) = @_;
      return unless %$family;
      undef $guard;
      print "My::Module::workers now nonempty\n";
   };

Example: print all changes to the family "AnyEvent::Fantasy::Module".

   my $guard = db_mon "AnyEvent::Fantasy::Module" => sub {
      my ($family, $a, $c, $d) = @_;

      print "+$_=$family->{$_}\n" for @$a;
      print "*$_=$family->{$_}\n" for @$c;
      print "-$_\n"               for @$d; # deleted keys have no value any more
   };

=cut

=back

=head1 AnyEvent::MP vs. Distributed Erlang

AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
== aemp node, Erlang process == aemp port), so many of the documents and
programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
sample:

   http://www.erlang.se/doc/programming_rules.shtml
   http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
   http://erlang.org/download/erlang-book-part1.pdf      # chapters 5 and 6
   http://erlang.org/download/armstrong_thesis_2003.pdf  # chapters 4 and 5

Despite the similarities, there are also some important differences:

=over 4

=item * Node IDs are arbitrary strings in AEMP.

Erlang relies on special naming and DNS to work everywhere in the same
way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
configuration or DNS), and possibly the addresses of some seed nodes, but
will otherwise discover other nodes (and their IDs) itself.

=item * Erlang has a "remote ports are like local ports" philosophy, AEMP
uses "local ports are like remote ports".

The failure modes for local ports are quite different (runtime errors
only) than for remote ports - when a local port dies, you I<know> it dies,
when a connection to another node dies, you know nothing about the other
port.

Erlang pretends remote ports are as reliable as local ports, even when
they are not.

AEMP encourages a "treat remote ports differently" philosophy, with local
ports being the special case/exception, where transport errors cannot
occur.

=item * Erlang uses processes and a mailbox, AEMP does not queue.

Erlang uses processes that selectively receive messages out of order, and
therefore needs a queue. AEMP is event-based, so queuing messages would
serve no useful purpose. For the same reason the pattern-matching abilities
of AnyEvent::MP are more limited, as there is little need to be able to
filter messages without dequeuing them.

This is not a philosophical difference, but simply stems from AnyEvent::MP
being event-based, while Erlang is process-based.

You can have a look at L<Coro::MP> for a more Erlang-like process model on
top of AEMP and Coro threads.

=item * Erlang sends are synchronous, AEMP sends are asynchronous.

Sending messages in Erlang is synchronous and blocks the process until
a connection has been established and the message sent (and so does not
need a queue that can overflow). AEMP sends return immediately; connection
establishment is handled in the background.

=item * Erlang suffers from silent message loss, AEMP does not.

Erlang implements few guarantees on message delivery - messages can get
lost without any of the processes realising it (i.e. you send messages a,
b, and c, and the other side only receives messages a and c).

AEMP guarantees (modulo hardware errors) correct ordering, and the
guarantee that after one message is lost, all following ones sent to the
same port are lost as well, until monitoring raises an error, so there are
no silent "holes" in the message sequence.

If you want your software to be very reliable, you have to cope with
corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
simply tries to work better in common error cases, such as when a network
link goes down.

=item * Erlang can send messages to the wrong port, AEMP does not.

In Erlang it is quite likely that a node that restarts reuses an Erlang
process ID known to other nodes for a completely different process,
causing messages destined for that process to end up in an unrelated
process.

AEMP does not reuse port IDs, so old messages or old port IDs floating
around in the network will not be sent to an unrelated port.

=item * Erlang uses unprotected connections, AEMP uses secure
authentication and can use TLS.

AEMP can use a proven protocol - TLS - to protect connections and
securely authenticate nodes.

=item * The AEMP protocol is optimised for both text-based and binary
communications.

The AEMP protocol, unlike the Erlang protocol, supports both
programming-language-independent text-only protocols (good for debugging),
and binary, language-specific serialisers (e.g. Storable). By default,
unless TLS is used, the protocol is actually completely text-based.

It has also been carefully designed to be implementable in other languages
with a minimum of work while gracefully degrading functionality to make the
protocol simple.

=item * AEMP has more flexible monitoring options than Erlang.

In Erlang, you can choose to receive I<all> exit signals as messages or
I<none>, there is no in-between, so monitoring single Erlang processes is
difficult to implement.

Monitoring in AEMP is more flexible than in Erlang, as one can choose
between automatic kill, exit message or callback on a per-port basis.

=item * Erlang tries to hide remote/local connections, AEMP does not.

Monitoring in Erlang is not an indicator of process death/crashes, in the
same way as linking is (except linking is unreliable in Erlang).

In AEMP, you don't "look up" registered port names or send to named ports
that might or might not be persistent. Instead, you normally spawn a port
on the remote node. The init function monitors you, and you monitor the
remote port. Since both monitors are local to the node, they are much more
reliable (no need for C<spawn_link>).

This also saves round-trips and avoids sending messages to the wrong port
(which is hard to avoid in Erlang).

=back

=head1 RATIONALE

=over 4

=item Why strings for port and node IDs, why not objects?

We considered "objects", but found that the actual number of methods
that can be called is quite low. Since port and node IDs travel over
the network frequently, the serialising/deserialising would add lots of
overhead, as well as having to keep a proxy object everywhere.

Strings can easily be printed, easily serialised etc. and need no special
procedures to be "valid".

And as a result, a port with just a default receiver consists of a single
code reference stored in a global hash - it can't become much cheaper.

=item Why favour JSON, why not a real serialising format such as Storable?

In fact, any AnyEvent::MP node will happily accept Storable as framing
format, but currently there is no way to make a node use Storable by
default (although all nodes will accept it).

The default framing protocol is JSON because a) JSON::XS is many times
faster for small messages and b) most importantly, after years of
experience we found that object serialisation is causing more problems
than it solves: Just like function calls, objects simply do not travel
easily over the network, mostly because they will always be a copy, so you
always have to re-think your design.

Keeping your messages simple, concentrating on data structures rather than
objects, will keep your messages clean, tidy and efficient.
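The following self-contained sketch illustrates the point. It uses L<JSON::PP>, the pure-Perl JSON module shipped with Perl, as a stand-in for L<JSON::XS>. It is not AnyEvent::MP's actual framing code. Plain data structures survive a round trip. A blessed object is either rejected or silently reduced to C<null>, so the receiver could never get the object back.

```perl

   use strict;
   use warnings;
   use JSON::PP ();

   my $json = JSON::PP->new->canonical;   # canonical: sorted hash keys

   # A message made of plain data travels fine.
   my $wire = $json->encode(["node#port", "temperature", { celsius => 42 }]);
   print "$wire\n";                   # ["node#port","temperature",{"celsius":42}]
   my $msg = $json->decode($wire);
   print $msg->[2]{celsius}, "\n";    # 42

   # An object does not: by default, encoding it is an error ...
   my $sensor = bless { celsius => 42 }, "My::Sensor";
   my $ok = eval { $json->encode([$sensor]); 1 };
   my $status = $ok ? "encoded" : "refused";
   print "$status\n";                 # refused

   # ... and with allow_blessed it silently turns into null.
   print JSON::PP->new->allow_blessed->encode([$sensor]), "\n";   # [null]

```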

=back

=head1 PORTING FROM AnyEvent::MP VERSION 1.X

AEMP version 2 has a few major incompatible changes compared to version 1:

=over 4

=item AnyEvent::MP::Global no longer has group management functions.

At least not officially - the C<grp_*> functions are still exported and might
work, but they will be removed in some later release.

AnyEvent::MP now comes with a distributed database that is more
powerful. Its database families map closely to port groups, but the API
has changed (the functions are also now exported by AnyEvent::MP). Here is
a rough porting guide:

   grp_reg $group, $port                      # old
   db_reg $group, $port                       # new

   $list = grp_get $group                     # old
   db_keys $group, sub { my $list = shift }   # new

   grp_mon $group, $cb->(\@ports, $add, $del) # old
   db_mon $group, $cb->(\%ports, $add, $change, $del) # new

C<grp_reg> is a no-brainer (just replace by C<db_reg>), but C<grp_get> is
no longer instant, because the local node might not have a copy of the
group. You can either modify your code to allow for a callback, or use
C<db_mon> to keep an updated copy of the group:

   my $local_group_copy;
   # copy the hash, as db_mon's family hash must not be stored
   db_mon $group => sub { $local_group_copy = { %{ $_[0] } } };

   # now "keys %$local_group_copy" always returns the most up-to-date
   # list of ports in the group.

C<grp_mon> can be replaced by C<db_mon> with minor changes - C<db_mon>
passes a hash reference as first argument, and an extra C<$chg> argument
that can be ignored:

   db_mon $group => sub {
      my ($ports, $add, $chg, $del) = @_;
      $ports = [keys %$ports];

      # now $ports, $add and $del are the same as
      # were originally passed by grp_mon.
      ...
   };

=item Nodes no longer connect to all other nodes.

In AEMP 1.x, every node automatically loaded the L<AnyEvent::MP::Global>
module, which in turn would create connections to all other nodes in the
network (helped by the seed nodes).

In version 2.x, global nodes still connect to all other global nodes, but
other nodes don't - now every node either is a global node itself, or
attaches itself to another global node.

If a node isn't a global node itself, then it attaches itself to one
of its seed nodes. If that seed node isn't a global node yet, it will
automatically be upgraded to a global node.

So in many cases, nothing needs to be changed - one just has to make sure
that all seed nodes are meshed together with the other seed nodes (as with
AEMP 1.x), and other nodes specify them as seed nodes. This is most easily
achieved by specifying the same set of seed nodes for all nodes in the
network.

Not opening a connection to every other node is usually an advantage,
except when you need the lower latency of an already established
connection. To ensure a node establishes a connection to another node,
you can monitor the node port (C<mon $node, ...>), which will attempt to
create the connection (and notify you when the connection fails).

=item Listener-less nodes (nodes without binds) are gone.

And are not coming back, at least not in their old form. If no C<binds>
are specified for a node, AnyEvent::MP assumes a default of C<*:*>.

There are vague plans to implement some form of routing domains, which
might or might not bring back listener-less nodes, but don't count on it.

The fact that most connections are now optional somewhat mitigates this,
as a node can be effectively unreachable from the outside without any
problems, as long as it isn't a global node and only reaches out to other
nodes (as opposed to being contacted from other nodes).

=item $AnyEvent::MP::Kernel::WARN has gone.

AnyEvent has acquired a logging framework (L<AnyEvent::Log>), and AEMP now
uses this, and so should your programs.

Every module now documents what kinds of messages it generates, with
AnyEvent::MP acting as a catch-all.

On the positive side, this means that instead of setting
C<PERL_ANYEVENT_MP_WARNLEVEL>, you can get away with setting C<AE_VERBOSE> -
much less to type.

=back

=head1 LOGGING

AnyEvent::MP does not normally log anything by itself, but since it is the
root of the context hierarchy for AnyEvent::MP modules, it will receive
all log messages from submodules.

=head1 SEE ALSO

L<AnyEvent::MP::Intro> - a gentle introduction.

L<AnyEvent::MP::Kernel> - more, lower-level, stuff.

L<AnyEvent::MP::Global> - network maintenance and port groups, to find
your applications.

L<AnyEvent::MP::DataConn> - establish data connections between nodes.

L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
all nodes.

L<AnyEvent>.

=head1 AUTHOR

   Marc Lehmann <schmorp@schmorp.de>
   http://home.schmorp.de/

=cut

1