/cvs/AnyEvent-MP/MP.pm
Revision: 1.145
Committed: Tue Oct 2 13:58:07 2012 UTC (11 years, 7 months ago) by root
Branch: MAIN
Changes since 1.144: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.89 AnyEvent::MP - erlang-style multi-processing/message-passing framework
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.75 $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11 root 1.2
12 root 1.38 $SELF # receiving/own port id in rcv callbacks
13    
14 root 1.48 # initialise the node so it can send/receive messages
15 root 1.72 configure;
16 root 1.48
17 root 1.75 # ports are message destinations
18 root 1.38
19     # sending messages
20 root 1.2 snd $port, type => data...;
21 root 1.38 snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23 root 1.2
24 root 1.50 # creating/using ports, the simple way
25 root 1.73 my $simple_port = port { my @msg = @_ };
26 root 1.22
27 root 1.52 # creating/using ports, tagged message matching
28 root 1.38 my $port = port;
29 root 1.73 rcv $port, ping => sub { snd $_[0], "pong" };
30     rcv $port, pong => sub { warn "pong received\n" };
31 root 1.2
32 root 1.48 # create a port on another node
33     my $port = spawn $node, $initfunc, @initdata;
34    
35 root 1.116 # destroy a port again
36 root 1.101 kil $port; # "normal" kill
37     kil $port, my_error => "everything is broken"; # error kill
38    
39 root 1.35 # monitoring
40 root 1.129 mon $port, $cb->(@msg) # callback is invoked on death
41     mon $port, $localport # kill localport on abnormal death
42     mon $port, $localport, @msg # send message on death
43 root 1.35
44 root 1.101 # temporarily execute code in port context
45     peval $port, sub { die "kill the port!" };
46    
47     # execute callbacks in $SELF port context
48     my $timer = AE::timer 1, 0, psub {
49     die "kill the port, delayed";
50     };
51    
52 root 1.143 # distributed database - modification
53     db_set $family => $subkey [=> $value] # add a subkey
54     db_del $family => $subkey... # delete one or more subkeys
55     db_reg $family => $port [=> $value] # register a port
56    
57     # distributed database - queries
58     db_family $family => $cb->(\%familyhash)
59     db_keys $family => $cb->(\@keys)
60     db_values $family => $cb->(\@values)
61    
62     # distributed database - monitoring a family
63     db_mon $family => $cb->(\%familyhash, \@added, \@changed, \@deleted)
64    
65 root 1.1 =head1 DESCRIPTION
66    
67 root 1.2 This module (-family) implements a simple message passing framework.
68    
69     Despite its simplicity, you can securely message other processes running
70 root 1.67 on the same or other hosts, and you can supervise entities remotely.
71 root 1.2
72 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
73 root 1.67 manual page and the examples under F<eg/>.
74 root 1.23
75 root 1.2 =head1 CONCEPTS
76    
77     =over 4
78    
79     =item port
80    
81 root 1.79 Not to be confused with a TCP port, a "port" is something you can send
82     messages to (with the C<snd> function).
83 root 1.29
84 root 1.53 Ports allow you to register C<rcv> handlers that can match all or just
85 root 1.64 some messages. Messages sent to ports will not be queued, regardless of
86     whether anything was listening for them or not.
87 root 1.2
88 root 1.119 Ports are represented by (printable) strings called "port IDs".
89    
90 root 1.67 =item port ID - C<nodeid#portname>
91 root 1.2
92 root 1.123 A port ID is the concatenation of a node ID, a hash-mark (C<#>)
93     as separator, and a port name (a printable string of unspecified
94     format created by AnyEvent::MP).
95 root 1.2
96     =item node
97    
98 root 1.53 A node is a single process containing at least one port - the node port,
99 root 1.67 which enables nodes to manage each other remotely, and to create new
100 root 1.53 ports.
101 root 1.2
102 root 1.67 Nodes are either public (have one or more listening ports) or private
103     (no listening ports). Private nodes cannot talk to other private nodes
104 root 1.119 currently, but all nodes can talk to public nodes.
105    
106     Nodes are represented by (printable) strings called "node IDs".
107 root 1.2
108 root 1.117 =item node ID - C<[A-Za-z0-9_\-.:]*>
109 root 1.2
110 root 1.64 A node ID is a string that uniquely identifies the node within a
111     network. Depending on the configuration used, node IDs can look like a
112     hostname, a hostname and a port, or a random string. AnyEvent::MP itself
113 root 1.119 doesn't interpret node IDs in any way except to uniquely identify a node.
114 root 1.64
115     =item binds - C<ip:port>
116    
117     Nodes can only talk to each other by creating some kind of connection to
118     each other. To do this, nodes should listen on one or more local transport
119 root 1.119 endpoints - binds.
120    
121     Currently, only standard C<ip:port> specifications can be used, which
122     specify TCP ports to listen on. So a bind is basically just a TCP socket
123     in listening mode that accepts connections from other nodes.
124 root 1.64
125 root 1.83 =item seed nodes
126 root 1.64
127 root 1.119 When a node starts, it knows nothing about the network it is in - it
128     needs to connect to at least one other node that is already in the
129     network. These other nodes are called "seed nodes".
130    
131     Seed nodes themselves are not special - they are seed nodes only because
132     some other node I<uses> them as such, but any node can be used as seed
133     node for other nodes, and each node can use a different set of seed nodes.
134 root 1.83
135     In addition to discovering the network, seed nodes are also used to
136 root 1.119 maintain the network - all nodes using the same seed node are part of
137     the same network. If a network is split into multiple subnets because e.g.
138     the network link between the parts goes down, then using the same seed
139     nodes for all nodes ensures that eventually the subnets get merged again.
140 root 1.83
141     Seed nodes are expected to be long-running, and at least one seed node
142 root 1.85 should always be available. They should also be relatively responsive - a
143     seed node that blocks for long periods will slow down everybody else.
144 root 1.83
145 root 1.119 For small networks, it's best if every node uses the same set of seed
146     nodes. For large networks, it can be useful to specify "regional" seed
147     nodes for most nodes in an area, and use all seed nodes as seed nodes for
148     each other. What's important is that all seed node connections form a
149     complete graph, so that the network cannot split into separate subnets
150     forever.
151    
152     Seed nodes are represented by seed IDs.
153    
154     =item seed IDs - C<host:port>
155 root 1.83
156 root 1.119 Seed IDs are transport endpoint(s) (usually a hostname/IP address and a
157 elmex 1.96 TCP port) of nodes that should be used as seed nodes.
158 root 1.29
159 root 1.119 =item global nodes
160    
161     An AEMP network needs a discovery service - nodes need to know how to
162     connect to other nodes they only know by name. In addition, AEMP offers a
163     distributed "group database", which maps group names to a list of strings
164     - for example, to register worker ports.
165    
166     A network needs at least one global node to work, and allows every node to
167     be a global node.
168    
169     Any node that loads the L<AnyEvent::MP::Global> module becomes a global
170     node and tries to keep connections to all other nodes. So while it can
171     make sense to make every node "global" in small networks, it usually makes
172     sense to only make seed nodes into global nodes in large networks (nodes
173     keep connections to seed nodes and global nodes, so making them the same
174     reduces overhead).
175 root 1.67
176 root 1.2 =back
177    
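The C<nodeid#portname> format described above can be taken apart with a plain C<split>. The following is a minimal standalone sketch, not part of the module's API; the helper name C<split_port_id> and the sample IDs are made up for illustration.

```perl
use strict;
use warnings;

# Split a port ID of the form "nodeid#portname" into its two parts.
# The limit of 2 keeps any further "#" characters inside the port name.
# For a bare node ID (no "#"), the port name comes back as undef.
sub split_port_id {
    my ($port_id) = @_;
    my ($node_id, $port_name) = split /#/, $port_id, 2;
    return ($node_id, $port_name);
}

# hypothetical port ID, used only for this example
my ($node, $name) = split_port_id("example.org:4040#a1b2");
print "node=$node port=$name\n";
```

Only the first C<#> is treated as the separator, so the node ID part is always recovered intact.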
178 root 1.3 =head1 VARIABLES/FUNCTIONS
179 root 1.2
180     =over 4
181    
182 root 1.1 =cut
183    
184     package AnyEvent::MP;
185    
186 root 1.121 use AnyEvent::MP::Config ();
187 root 1.44 use AnyEvent::MP::Kernel;
188 root 1.144 use AnyEvent::MP::Kernel qw(
189     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
190     add_node load_func
191    
192     NODE $NODE
193     configure
194     node_of port_is_local
195     snd kil
196     db_set db_del
197     db_mon db_family db_keys db_values
198     );
199 root 1.2
200 root 1.1 use common::sense;
201    
202 root 1.2 use Carp ();
203    
204 root 1.141 use AnyEvent ();
205 root 1.124 use Guard ();
206 root 1.1
207 root 1.2 use base "Exporter";
208    
209 root 1.121 our $VERSION = $AnyEvent::MP::Config::VERSION;
210 root 1.43
211 root 1.8 our @EXPORT = qw(
212 root 1.144 NODE $NODE
213 root 1.72 configure
214 root 1.144 node_of port_is_local
215     snd kil
216     db_set db_del
217     db_mon db_family db_keys db_values
218    
219     *SELF
220    
221     port rcv mon mon_guard psub peval spawn cal
222 root 1.124 db_set db_del db_reg
223 root 1.128 db_mon db_family db_keys db_values
224 root 1.144
225     after
226 root 1.8 );
227 root 1.2
228 root 1.22 our $SELF;
229    
230     sub _self_die() {
231     my $msg = $@;
232     $msg =~ s/\n+$// unless ref $msg;
233     kil $SELF, die => $msg;
234     }
235    
236     =item $thisnode = NODE / $NODE
237    
238 root 1.67 The C<NODE> function returns, and the C<$NODE> variable contains, the node
239 root 1.64 ID of the node running in the current process. This value is initialised by
240 root 1.72 a call to C<configure>.
241 root 1.22
242 root 1.63 =item $nodeid = node_of $port
243 root 1.22
244 root 1.67 Extracts and returns the node ID from a port ID or a node ID.
245 root 1.34
246 root 1.144 =item $is_local = port_is_local $port
247    
248     Returns true iff the port is a local port.
249    
250 root 1.78 =item configure $profile, key => value...
251    
252 root 1.72 =item configure key => value...
253 root 1.34
254 root 1.64 Before a node can talk to other nodes on the network (i.e. enter
255 root 1.72 "distributed mode") it has to configure itself - the minimum a node needs
256 root 1.64 to know is its own name, and optionally it should know the addresses of
257     some other nodes in the network to discover other nodes.
258 root 1.34
259 root 1.121 This function configures a node - it must be called exactly once (or
260     never) before calling other AnyEvent::MP functions.
261    
262 root 1.108 The key/value pairs are basically the same ones as documented for the
263 root 1.127 F<aemp> command line utility (sans the set/del prefix), with these additions:
264 root 1.121
265     =over 4
266    
267     =item norc => $boolean (default false)
268    
269     If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I<not>
270 root 1.145 be consulted - all configuration options must be specified in the
271 root 1.121 C<configure> call.
272 root 1.108
273 root 1.121 =item force => $boolean (default false)
274    
275     If true, then the values specified in the C<configure> call will take
276     precedence over any values configured via the rc file. The default is for
277     the rc file to override any options specified in the program.
278    
279     =back
280 root 1.34
281 root 1.72 =over 4
282    
283     =item step 1, gathering configuration from profiles
284    
285     The function first looks up a profile in the aemp configuration (see the
286     L<aemp> commandline utility). The profile name can be specified via the
287 root 1.78 named C<profile> parameter (or can simply be the first parameter). If it is
288     missing, then the nodename (F<uname -n>) will be used as profile name.
289 root 1.34
290 root 1.72 The profile data is then gathered as follows:
291 root 1.69
292 elmex 1.77 First, all remaining key => value pairs (all of which are conveniently
293 root 1.72 undocumented at the moment) will be interpreted as configuration
294     data. Then they will be overwritten by any values specified in the global
295     default configuration (see the F<aemp> utility), then the chain of
296     profiles chosen by the profile name (and any C<parent> attributes).
297    
298     That means that the values specified in the profile have highest priority
299     and the values specified directly via C<configure> have lowest priority,
300     and can only be used to specify defaults.
301 root 1.49
302 root 1.64 If the profile specifies a node ID, then this will become the node ID of
303 root 1.122 this process. If not, then the profile name will be used as node ID, with
304 root 1.126 a unique random string (C</%u>) appended.
305 root 1.122
306 root 1.126 The node ID can contain some C<%> sequences that are expanded: C<%n>
307     is expanded to the local nodename, C<%u> is replaced by a random
308     string to make the node unique. For example, the F<aemp> commandline
309     utility uses C<aemp/%n/%u> as nodename, which might expand to
310     C<aemp/cerebro/ZQDGSIkRhEZQDGSIkRhE>.
311 root 1.64
312 root 1.72 =item step 2, bind listener sockets
313    
314 root 1.64 The next step is to look up the binds in the profile, followed by binding
315     aemp protocol listeners on all binds specified (it is possible and valid
316     to have no binds, meaning that the node cannot be contacted from the
317     outside. This means the node cannot talk to other nodes that also have no
318     binds, but it can still talk to all "normal" nodes).
319    
320 root 1.70 If the profile does not specify a binds list, then a default of C<*> is
321 root 1.72 used, meaning the node will bind on a dynamically-assigned port on every
322     local IP address it finds.
323    
324     =item step 3, connect to seed nodes
325 root 1.64
326 root 1.119 As the last step, the seed ID list from the profile is passed to the
327 root 1.64 L<AnyEvent::MP::Global> module, which will then use it to keep
328 root 1.72 connectivity with at least one node at any point in time.
329 root 1.64
330 root 1.72 =back
331    
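The priority order from step 1 (values passed to C<configure> lowest, then the global default configuration, then the profile chain) behaves like a layered hash merge. The sketch below is a standalone model of that ordering only. The helper C<merge_layers>, the sample values, the C<seeds> key name and the parent-before-child order of the profile chain are assumptions for illustration, not taken from the module.

```perl
use strict;
use warnings;

# Merge configuration layers; later layers override earlier ones.
sub merge_layers {
    my @layers = @_;
    my %cfg;
    %cfg = (%cfg, %$_) for @layers;
    return \%cfg;
}

# hypothetical layers, lowest priority first
my $from_call = { binds => "*:4040", nodeid => "fallback/%u" };  # configure arguments
my $global    = { nodeid => "site/%n" };                         # global defaults
my $parent    = { binds => "10.0.0.1:4040" };                    # parent profile
my $profile   = { seeds => "seed.example.org:4040" };            # selected profile

my $cfg = merge_layers($from_call, $global, $parent, $profile);
print "$_ = $cfg->{$_}\n" for sort keys %$cfg;
```

In this model the C<configure> arguments only survive for keys that no other layer sets, which matches the statement that they can only supply defaults.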
332 root 1.87 Example: become a distributed node using the local node name as profile.
333 root 1.72 This should be the most common form of invocation for "daemon"-type nodes.
334 root 1.34
335 root 1.72 configure
336 root 1.34
337 root 1.126 Example: become a semi-anonymous node. This form is often used for
338     commandline clients.
339 root 1.34
340 root 1.126 configure nodeid => "myscript/%n/%u";
341 root 1.72
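The C<%n> and C<%u> sequences in a node ID can be pictured as a simple template substitution. This is a standalone sketch under assumptions: the helper C<expand_node_id> is hypothetical, and the length and alphabet of the random string are chosen arbitrarily here and are not necessarily what AnyEvent::MP generates.

```perl
use strict;
use warnings;
use POSIX ();

# Expand %n (node name) and %u (unique string) in a node ID template.
sub expand_node_id {
    my ($template, $nodename, $unique) = @_;
    my %subst = (n => $nodename, u => $unique);
    (my $id = $template) =~ s/%([nu])/$subst{$1}/g;
    return $id;
}

# uname -n equivalent: the second field returned by POSIX::uname
my $nodename = (POSIX::uname())[1];

# arbitrary 20-character random string, for illustration only
my @chars  = ("a" .. "z", "A" .. "Z", 0 .. 9);
my $unique = join "", map { $chars[rand @chars] } 1 .. 20;

print expand_node_id("myscript/%n/%u", $nodename, $unique), "\n";
```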
342 root 1.120 Example: configure a node using a profile called seed, which is suitable
343 root 1.72 for a seed node as it binds on all local addresses on a fixed port (4040,
344     customary for aemp).
345    
346     # use the aemp commandline utility
347 root 1.122 # aemp profile seed binds '*:4040'
348 root 1.72
349     # then use it
350     configure profile => "seed";
351 root 1.34
352 root 1.72 # or simply use aemp from the shell again:
353     # aemp run profile seed
354 root 1.34
355 root 1.72 # or provide a nicer-to-remember nodeid
356     # aemp run profile seed nodeid "$(hostname)"
357 root 1.34
358 root 1.22 =item $SELF
359    
360     Contains the current port id while executing C<rcv> callbacks or C<psub>
361     blocks.
362 root 1.3
363 root 1.67 =item *SELF, SELF, %SELF, @SELF...
364 root 1.22
365     Due to some quirks in how perl exports variables, it is impossible to
366 root 1.67 just export C<$SELF>, so all the symbols named C<SELF> are exported by this
367 root 1.22 module, but only C<$SELF> is currently used.
368 root 1.3
369 root 1.33 =item snd $port, type => @data
370 root 1.3
371 root 1.33 =item snd $port, @msg
372 root 1.3
373 root 1.67 Send the given message to the given port, which can identify either a
374     local or a remote port, and must be a port ID.
375 root 1.8
376 root 1.67 While the message can be almost anything, it is highly recommended to
377     use a string as first element (a port ID, or some word that indicates a
378     request type etc.) and to consist of only simple perl values (scalars,
379     arrays, hashes) - if you think you need to pass an object, think again.
380    
381     The message data logically becomes read-only after a call to this
382     function: modifying any argument (or values referenced by them) is
383     forbidden, as there can be considerable time between the call to C<snd>
384     and the time the message is actually being serialised - in fact, it might
385     never be copied as within the same process it is simply handed to the
386     receiving port.
387 root 1.3
388     The type of data you can transfer depends on the transport protocol: when
389     JSON is used, then only strings, numbers and arrays and hashes consisting
390     of those are allowed (no objects). When Storable is used, then anything
391     that Storable can serialise and deserialise is allowed, and for the local
392 root 1.67 node, anything can be passed. Best rely only on the common denominator of
393     these.
394 root 1.3
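One way to stay within the "common denominator" mentioned above is to check a message before sending it. The following standalone sketch accepts only plain scalars and unblessed array/hash references, recursively. The helper names C<is_portable> and C<message_is_portable> are hypothetical, and treating C<undef> as acceptable is an assumption.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed reftype);

# True if $value is a plain scalar, or an unblessed array/hash
# that itself contains only portable values.
sub is_portable {
    my ($value) = @_;
    return 1 unless ref $value;     # plain string or number
    return 0 if blessed $value;     # objects are rejected

    my $type = reftype $value;
    my @children;
    if    ($type eq "ARRAY") { @children = @$value }
    elsif ($type eq "HASH")  { @children = values %$value }
    else                     { return 0 }   # code refs, scalar refs, globs...

    for my $child (@children) {
        return 0 unless is_portable($child);
    }
    return 1;
}

# True if every element of the message is portable.
sub message_is_portable {
    for my $element (@_) {
        return 0 unless is_portable($element);
    }
    return 1;
}

print message_is_portable("ping", 1, [1, 2, { a => "b" }]) ? "ok\n" : "not portable\n";
```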
395 root 1.22 =item $local_port = port
396 root 1.2
397 root 1.50 Creates a new local port object and returns its port ID. Initially it has
398     no callbacks set and will throw an error when it receives messages.
399 root 1.10
400 root 1.50 =item $local_port = port { my @msg = @_ }
401 root 1.15
402 root 1.50 Creates a new local port, and returns its ID. Semantically the same as
403     creating a port and calling C<rcv $port, $callback> on it.
404 root 1.15
405 root 1.50 The block will be called for every message received on the port, with the
406     global variable C<$SELF> set to the port ID. Runtime errors will cause the
407     port to be C<kil>ed. The message will be passed as-is, no extra argument
408     (i.e. no port ID) will be passed to the callback.
409 root 1.15
410 root 1.50 If you want to stop/destroy the port, simply C<kil> it:
411 root 1.15
412 root 1.50 my $port = port {
413     my @msg = @_;
414     ...
415     kil $SELF;
416 root 1.15 };
417 root 1.10
418     =cut
419    
420 root 1.33 sub rcv($@);
421    
422 root 1.132 my $KILME = sub {
423 root 1.133 (my $tag = substr $_[0], 0, 30) =~ s/([^\x20-\x7e])/./g;
424 root 1.135 kil $SELF, unhandled_message => "no callback found for message '$tag'";
425 root 1.132 };
426 root 1.50
427 root 1.22 sub port(;&) {
428 root 1.123 my $id = $UNIQ . ++$ID;
429 root 1.22 my $port = "$NODE#$id";
430    
431 root 1.132 rcv $port, shift || $KILME;
432 root 1.10
433 root 1.22 $port
434 root 1.10 }
435    
436 root 1.50 =item rcv $local_port, $callback->(@msg)
437 root 1.31
438 root 1.50 Replaces the default callback on the specified port. There is no way to
439     remove the default callback: use C<sub { }> to disable it, or better
440     C<kil> the port when it is no longer needed.
441 root 1.3
442 root 1.33 The global C<$SELF> (exported by this module) contains C<$port> while
443 root 1.50 executing the callback. Runtime errors during callback execution will
444     result in the port being C<kil>ed.
445 root 1.22
446 root 1.133 The default callback receives all messages not matched by a more specific
447 root 1.50 C<tag> match.
448 root 1.22
449 root 1.50 =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
450 root 1.3
451 root 1.54 Register (or replace) callbacks to be called on messages starting with the
452     given tag on the given port (and return the port), or unregister it (when
453     C<$callback> is C<undef> or missing). There can only be one callback
454     registered for each tag.
455 root 1.3
456 root 1.50 The original message will be passed to the callback, after the first
457     element (the tag) has been removed. The callback will use the same
458     environment as the default callback (see above).
459 root 1.3
460 root 1.36 Example: create a port and bind receivers on it in one go.
461    
462     my $port = rcv port,
463 root 1.50 msg1 => sub { ... },
464     msg2 => sub { ... },
465 root 1.36 ;
466    
467     Example: create a port, bind receivers and send it in a message elsewhere
468     in one go:
469    
470     snd $otherport, reply =>
471     rcv port,
472 root 1.50 msg1 => sub { ... },
473 root 1.36 ...
474     ;
475    
476 root 1.54 Example: temporarily register a rcv callback for a tag matching some port
477 root 1.102 (e.g. for an rpc reply) and unregister it after a message was received.
478 root 1.54
479     rcv $port, $otherport => sub {
480     my @reply = @_;
481    
482     rcv $SELF, $otherport;
483     };
484    
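The matching rule described above (a tag callback receives the message without its tag, everything else goes to the default callback) can be modelled without any networking. This is a toy standalone sketch, not the module's implementation; C<make_port>, C<set_tag> and C<deliver> are hypothetical names.

```perl
use strict;
use warnings;

# Toy model of one port: a default callback plus per-tag callbacks.
sub make_port {
    my ($default) = @_;
    return { default => $default || sub { }, tags => {} };
}

# Register a tag callback, or remove it when $cb is undef or missing.
sub set_tag {
    my ($port, $tag, $cb) = @_;
    if (defined $cb) { $port->{tags}{$tag} = $cb }
    else             { delete $port->{tags}{$tag} }
    return $port;
}

# Deliver a message: a matching tag callback gets the message without
# its tag, otherwise the default callback gets the message unchanged.
sub deliver {
    my ($port, @msg) = @_;
    my $cb = (@msg && defined $msg[0]) ? $port->{tags}{ $msg[0] } : undef;
    if ($cb) {
        shift @msg;
        return $cb->(@msg);
    }
    return $port->{default}->(@msg);
}

my @log;
my $port = make_port(sub { push @log, "default: @_" });
set_tag($port, ping => sub { push @log, "ping with: @_" });

deliver($port, ping => 1, 2);    # handled by the tag callback
deliver($port, other => 3);      # falls through to the default callback
set_tag($port, "ping");          # unregister the tag
deliver($port, ping => 4);       # now also reaches the default callback

print "$_\n" for @log;
```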
485 root 1.3 =cut
486    
487     sub rcv($@) {
488 root 1.33 my $port = shift;
489 root 1.75 my ($nodeid, $portid) = split /#/, $port, 2;
490 root 1.3
491 root 1.144 $nodeid eq $NODE
492 root 1.33 or Carp::croak "$port: rcv can only be called on local ports, caught";
493 root 1.22
494 root 1.50 while (@_) {
495     if (ref $_[0]) {
496     if (my $self = $PORT_DATA{$portid}) {
497     "AnyEvent::MP::Port" eq ref $self
498     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
499 root 1.33
500 root 1.103 $self->[0] = shift;
501 root 1.50 } else {
502     my $cb = shift;
503     $PORT{$portid} = sub {
504     local $SELF = $port;
505     eval { &$cb }; _self_die if $@;
506     };
507     }
508     } elsif (defined $_[0]) {
509     my $self = $PORT_DATA{$portid} ||= do {
510 root 1.103 my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
511 root 1.50
512     $PORT{$portid} = sub {
513     local $SELF = $port;
514    
515     if (my $cb = $self->[1]{$_[0]}) {
516     shift;
517     eval { &$cb }; _self_die if $@;
518     } else {
519     &{ $self->[0] };
520 root 1.33 }
521     };
522 root 1.50
523     $self
524 root 1.33 };
525    
526 root 1.50 "AnyEvent::MP::Port" eq ref $self
527     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
528 root 1.22
529 root 1.50 my ($tag, $cb) = splice @_, 0, 2;
530 root 1.33
531 root 1.50 if (defined $cb) {
532     $self->[1]{$tag} = $cb;
533 root 1.33 } else {
534 root 1.50 delete $self->[1]{$tag};
535 root 1.33 }
536 root 1.22 }
537 root 1.3 }
538 root 1.31
539 root 1.33 $port
540 root 1.2 }
541    
542 root 1.101 =item peval $port, $coderef[, @args]
543    
544     Evaluates the given C<$coderef> within the context of C<$port>, that is,
545 root 1.145 when the code throws an exception the C<$port> will be killed.
546 root 1.101
547     Any remaining args will be passed to the callback. Any return values will
548     be returned to the caller.
549    
550     This is useful when you temporarily want to execute code in the context of
551     a port.
552    
553     Example: create a port and run some initialisation code in its context.
554    
555     my $port = port { ... };
556    
557     peval $port, sub {
558     init
559     or die "unable to init";
560     };
561    
562     =cut
563    
564     sub peval($$) {
565     local $SELF = shift;
566     my $cb = shift;
567    
568     if (wantarray) {
569     my @res = eval { &$cb };
570     _self_die if $@;
571     @res
572     } else {
573     my $res = eval { &$cb };
574     _self_die if $@;
575     $res
576     }
577     }
578    
579 root 1.22 =item $closure = psub { BLOCK }
580 root 1.2
581 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
582     closure is executed, sets up the environment in the same way as in C<rcv>
583     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
584    
585 root 1.101 The effect is basically as if it returned C<< sub { peval $SELF, sub {
586 root 1.114 BLOCK }, @_ } >>.
587 root 1.101
588 root 1.22 This is useful when you register callbacks from C<rcv> callbacks:
589    
590     rcv $port, delayed_reply => sub {
591     my ($delay, @reply) = @_;
592     my $timer = AE::timer $delay, 0, psub {
593     snd @reply, $SELF;
594     };
595     };
596 root 1.3
597 root 1.8 =cut
598 root 1.3
599 root 1.22 sub psub(&) {
600     my $cb = shift;
601 root 1.3
602 root 1.22 my $port = $SELF
603     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
604 root 1.1
605 root 1.22 sub {
606     local $SELF = $port;
607 root 1.2
608 root 1.22 if (wantarray) {
609     my @res = eval { &$cb };
610     _self_die if $@;
611     @res
612     } else {
613     my $res = eval { &$cb };
614     _self_die if $@;
615     $res
616     }
617     }
618 root 1.2 }
619    
620 root 1.67 =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
621 root 1.36
622 root 1.67 =item $guard = mon $port # kill $SELF when $port dies
623 root 1.32
624 root 1.139 =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
625    
626 root 1.67 =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
627 root 1.32
628 root 1.42 Monitor the given port and do something when the port is killed or
629     messages to it were lost, and optionally return a guard that can be used
630     to stop monitoring again.
631    
632 root 1.139 The first two forms distinguish between "normal" and "abnormal" kil's:
633 root 1.32
634 root 1.139 In the first form (another port given), if the C<$port> is C<kil>'ed with
635     a non-empty reason, the other port (C<$rcvport>) will be kil'ed with the
636     same reason. That is, on "normal" kil's nothing happens, while under all
637     other conditions, the other port is killed with the same reason.
638 root 1.32
639 root 1.139 The second form (kill self) is the same as the first form, except that
640 root 1.36 C<$rcvport> defaults to C<$SELF>.
641    
642 root 1.139 The remaining forms don't distinguish between "normal" and "abnormal" kil's
643     - it's up to the callback or receiver to check whether the C<@reason> is
644     empty and act accordingly.
645    
646     In the third form (callback), the callback is simply called with any
647     number of C<@reason> elements (empty @reason means that the port was deleted
648     "normally"). Note also that I<< the callback B<must> never die >>, so use
649     C<eval> if unsure.
650    
651     In the last form (message), a message of the form C<$rcvport, @msg,
652     @reason> will be C<snd>.
653 root 1.32
654 root 1.79 Monitoring-actions are one-shot: once messages are lost (and a monitoring
655 root 1.139 alert was raised), they are removed and will not trigger again, even if it
656     turns out that the port is still alive.
657 root 1.79
658 root 1.139 As a rule of thumb, monitoring requests should always monitor a remote
659     port locally (using a local C<$rcvport> or a callback). The reason is that
660     kill messages might get lost, just like any other message. Another less
661     obvious reason is that even monitoring requests can get lost (for example,
662     when the connection to the other node goes down permanently). When
663     monitoring a port locally these problems do not exist.
664 root 1.37
665 root 1.79 C<mon> effectively guarantees that, in the absence of hardware failures,
666     after starting the monitor, either all messages sent to the port will
667     arrive, or the monitoring action will be invoked after possible message
668     loss has been detected. No messages will be lost "in between" (after
669     the first lost message no further messages will be received by the
670     port). After the monitoring action was invoked, further messages might get
671     delivered again.
672    
673     Inter-host-connection timeouts and monitoring depend on the transport
674     used. The only transport currently implemented is TCP, and AnyEvent::MP
675     relies on TCP to detect node-downs (this can take 10-15 minutes on a
676 elmex 1.96 non-idle connection, and usually around two hours for idle connections).
677 root 1.79
678     This means that monitoring is good for program errors and cleaning up
679     stuff eventually, but it is no replacement for a timeout when you need
680     to ensure some maximum latency.
681    
682 root 1.32 Example: call a given callback when C<$port> is killed.
683    
684     mon $port, sub { warn "port died because of <@_>\n" };
685    
686     Example: kill ourselves when C<$port> is killed abnormally.
687    
688 root 1.36 mon $port;
689 root 1.32
690 root 1.36 Example: send us a restart message when another C<$port> is killed.
691 root 1.32
692     mon $port, $SELF => "restart";
693    
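The difference between "normal" and "abnormal" kills for linked ports, and the one-shot nature of monitors, can be illustrated with a small in-process model. This is a toy standalone sketch with hypothetical helpers (C<toy_port>, C<toy_mon>, C<toy_link>, C<toy_kil>). It ignores nodes, transports and message loss entirely.

```perl
use strict;
use warnings;

my %alive;      # port name => 1 while the toy port exists
my %monitors;   # port name => list of monitor callbacks

sub toy_port { my ($name) = @_; $alive{$name} = 1; return $name }

# Callback form: invoked with the kill reason (empty on a normal kill).
sub toy_mon {
    my ($port, $cb) = @_;
    push @{ $monitors{$port} }, $cb;
}

# Link form: kill $rcvport with the same reason, but only on abnormal kills.
sub toy_link {
    my ($port, $rcvport) = @_;
    toy_mon($port, sub { toy_kil($rcvport, @_) if @_ });
}

sub toy_kil {
    my ($port, @reason) = @_;
    delete($alive{$port}) or return;            # already gone
    my $cbs = delete($monitors{$port}) || [];   # monitors are one-shot
    $_->(@reason) for @$cbs;
}

toy_port($_) for qw(worker1 worker2 supervisor);
toy_link("worker1", "supervisor");
toy_link("worker2", "supervisor");

toy_kil("worker1");                                      # normal kill
print "after normal kill: ",
    (exists $alive{supervisor} ? "alive" : "killed"), "\n";

toy_kil("worker2", my_error => "everything is broken");  # abnormal kill
print "after error kill: ",
    (exists $alive{supervisor} ? "alive" : "killed"), "\n";
```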
694     =cut
695    
696     sub mon {
697 root 1.75 my ($nodeid, $port) = split /#/, shift, 2;
698 root 1.32
699 root 1.75 my $node = $NODE{$nodeid} || add_node $nodeid;
700 root 1.32
701 root 1.41 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
702 root 1.32
703     unless (ref $cb) {
704     if (@_) {
705     # send a kill info message
706 root 1.41 my (@msg) = ($cb, @_);
707 root 1.32 $cb = sub { snd @msg, @_ };
708     } else {
709     # simply kill other port
710     my $port = $cb;
711     $cb = sub { kil $port, @_ if @_ };
712     }
713     }
714    
715     $node->monitor ($port, $cb);
716    
717     defined wantarray
718 root 1.124 and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) })
719 root 1.32 }
720    
721     =item $guard = mon_guard $port, $ref, $ref...
722    
723     Monitors the given C<$port> and keeps the passed references. When the port
724     is killed, the references will be freed.
725    
726     Optionally returns a guard that will stop the monitoring.
727    
728     This function is useful when you create e.g. timers or other watchers and
729 root 1.67 want to free them when the port gets killed (note the use of C<psub>):
730 root 1.32
731     rcv $port, start => sub {
732 root 1.67 my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
733 root 1.32 undef $timer if 0.9 < rand;
734     };
735     };
736    
737     =cut
738    
739     sub mon_guard {
740     my ($port, @refs) = @_;
741    
742 root 1.36 #TODO: mon-less form?
743    
744 root 1.32 mon $port, sub { 0 && @refs }
745     }
746    
747 root 1.33 =item kil $port[, @reason]
748 root 1.32
749     Kill the specified port with the given C<@reason>.
750    
751 root 1.107 If no C<@reason> is specified, then the port is killed "normally" -
752     monitor callbacks will be invoked, but the kil will not cause linked ports
753     (C<mon $mport, $lport> form) to get killed.
754 root 1.32
755 root 1.107 If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
756     form) get killed with the same reason.
757 root 1.32
758     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
759     will be reported as reason C<< die => $@ >>.
760    
761     Transport/communication errors are reported as C<< transport_error =>
762     $message >>.
763    
764 root 1.133 Common idioms:
765    
766     # silently remove yourself, do not kill linked ports
767     kil $SELF;
768    
769     # report a failure in some detail
770     kil $SELF, failure_mode_1 => "it failed with too high temperature";
771    
772     # do not waste much time with killing, just die when something goes wrong
773     open my $fh, "<file"
774     or die "file: $!";
775 root 1.38
776     =item $port = spawn $node, $initfunc[, @initdata]
777    
778     Creates a port on the node C<$node> (which can also be a port ID, in which
779     case it's the node where that port resides).
780    
781 root 1.67 The port ID of the newly created port is returned immediately, and it is
782     possible to immediately start sending messages or to monitor the port.
783 root 1.38
784 root 1.67 After the port has been created, the init function is called on the remote
785     node, in the same context as a C<rcv> callback. This function must be a
786     fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
787     specify a function in the main program, use C<::name>.
788 root 1.38
789     If the function doesn't exist, then the node tries to C<require>
790     the package, then the package above the package and so on (e.g.
791     C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
792     exists or it runs out of package names.
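
The search order described above can be sketched in plain Perl. This only illustrates which package names would be candidates for C<require>; it is not the module's actual loader, and the helper name C<candidate_packages> is an assumption made for this example.

```perl
use strict;
use warnings;

# Hypothetical helper: list the packages that would be tried, innermost
# first, for a fully-qualified function name.
sub candidate_packages {
   my ($func) = @_;
   my @parts = grep length, split /::/, $func;
   pop @parts;                        # drop the function name itself
   my @pkgs;
   while (@parts) {
      push @pkgs, join "::", @parts;  # e.g. MyApp::Chat::Server
      pop @parts;                     # then move one package up
   }
   return @pkgs;
}

print join (", ", candidate_packages ("MyApp::Chat::Server::init")), "\n";
# prints: MyApp::Chat::Server, MyApp::Chat, MyApp
```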
793    
794     The init function is then called with the newly-created port as context
795 root 1.82 object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
796     call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
797     the port might not get created.
798 root 1.38
799 root 1.67 A common idiom is to pass a local port, immediately monitor the spawned
800     port, and in the remote init function, immediately monitor the passed
801     local port. This two-way monitoring ensures that both ports get cleaned up
802     when there is a problem.
803 root 1.38
804 root 1.80 C<spawn> guarantees that the C<$initfunc> has no visible effects on the
805     caller before C<spawn> returns (by delaying invocation when spawn is
806     called for the local node).
807    
808 root 1.38 Example: spawn a chat server port on C<$othernode>.
809    
810     # this node, executed from within a port context:
811     my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
812     mon $server;
813    
814     # init function on C<$othernode>
815     sub connect {
816     my ($srcport) = @_;
817    
818     mon $srcport;
819    
820     rcv $SELF, sub {
821     ...
822     };
823     }
824    
825     =cut
826    
827     sub _spawn {
828     my $port = shift;
829     my $init = shift;
830    
831 root 1.82 # rcv will create the actual port
832 root 1.38 local $SELF = "$NODE#$port";
833     eval {
834     &{ load_func $init }
835     };
836     _self_die if $@;
837     }
838    
839     sub spawn(@) {
840 root 1.75 my ($nodeid, undef) = split /#/, shift, 2;
841 root 1.38
842 root 1.123 my $id = $RUNIQ . ++$ID;
843 root 1.38
844 root 1.39 $_[0] =~ /::/
845     or Carp::croak "spawn init function must be a fully-qualified name, caught";
846    
847 root 1.75 snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
848 root 1.38
849 root 1.75 "$nodeid#$id"
850 root 1.38 }
851    
852 root 1.121
853 root 1.59 =item after $timeout, @msg
854    
855     =item after $timeout, $callback
856    
857     Either sends the given message, or calls the given callback, after the
858     specified number of seconds.
859    
860 root 1.67 This is simply a utility function that comes in handy at times - the
861     AnyEvent::MP author is not convinced of the wisdom of having it, though,
862     so it may go away in the future.
863 root 1.59
864     =cut
865    
866     sub after($@) {
867     my ($timeout, @action) = @_;
868    
869     my $t; $t = AE::timer $timeout, 0, sub {
870     undef $t;
871     ref $action[0]
872     ? $action[0]()
873     : snd @action;
874     };
875     }
876    
877 root 1.129 #=item $cb2 = timeout $seconds, $cb[, @args]
878    
879 root 1.87 =item cal $port, @msg, $callback[, $timeout]
880    
881     A simple form of RPC - sends a message to the given C<$port> with the
882     given contents (C<@msg>), but adds a reply port to the message.
883    
884     The reply port is created temporarily just for the purpose of receiving
885     the reply, and will be C<kil>ed when no longer needed.
886    
887     A reply message sent to the port is passed to the C<$callback> as-is.
888    
889     If an optional time-out (in seconds) is given and it is not C<undef>,
890     then the callback will be called without any arguments after the time-out
891     elapsed and the port is C<kil>ed.
892    
893 root 1.98 If no time-out is given (or it is C<undef>), then the local port will
894     monitor the remote port instead, so it eventually gets cleaned-up.
895 root 1.87
896     Currently this function returns the temporary port, but this "feature"
897     might go away in future versions unless you can make a convincing case that
898     this is indeed useful for something.
899    
900     =cut
901    
902     sub cal(@) {
903     my $timeout = ref $_[-1] ? undef : pop;
904     my $cb = pop;
905    
906     my $port = port {
907     undef $timeout;
908     kil $SELF;
909     &$cb;
910     };
911    
912     if (defined $timeout) {
913     $timeout = AE::timer $timeout, 0, sub {
914     undef $timeout;
915     kil $port;
916     $cb->();
917     };
918     } else {
919     mon $_[0], sub {
920     kil $port;
921     $cb->();
922     };
923     }
924    
925     push @_, $port;
926     &snd;
927    
928     $port
929     }
930    
931 root 1.8 =back
932    
933 root 1.124 =head1 DISTRIBUTED DATABASE
934    
935     AnyEvent::MP comes with a simple distributed database. The database will
936 root 1.131 be mirrored asynchronously on all global nodes. Other nodes bind to one
937     of the global nodes for their needs. Every node has a "local database"
938     which contains all the values that are set locally. All local databases
939     are merged together to form the global database, which can be queried.
940    
941     The database structure is that of a two-level hash - the database hash
942     contains hashes which contain values, similarly to a perl hash of hashes,
943     i.e.:
944 root 1.124
945 root 1.131 $DATABASE{$family}{$subkey} = $value
946 root 1.124
947     The top level hash key is called "family", and the second-level hash key
948 root 1.126 is called "subkey" or simply "key".
949 root 1.124
950 root 1.125 The family must be alphanumeric, i.e. start with a letter and consist
951     of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>),
952     pretty much like Perl module names.
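
If family names come from configuration or user input, it may be worth checking them against the pattern before use. The sketch below assumes nothing about whether AnyEvent::MP itself enforces the pattern; C<is_valid_family> is a hypothetical helper.

```perl
use strict;
use warnings;

# Hypothetical helper: true if $family matches the documented pattern,
# i.e. a letter followed by letters, digits, underscores and colons.
sub is_valid_family {
   my ($family) = @_;
   return $family =~ /\A[A-Za-z][A-Za-z0-9_:]*\z/ ? 1 : 0;
}

for my $name ("my_image_scalers", "My::Module::workers", "9lives", "bad-name") {
   printf "%-22s %s\n", $name, is_valid_family ($name) ? "ok" : "invalid";
}
```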
953 root 1.124
954 root 1.125 As the family namespace is global, it is recommended to prefix family names
955 root 1.124 with the name of the application or module using it.
956    
957 root 1.126 The subkeys must be non-empty strings, with no further restrictions.
958 root 1.125
959 root 1.124 The values should preferably be strings, but other perl scalars should
960 root 1.131 work as well (such as C<undef>, arrays and hashes).
961 root 1.124
962 root 1.126 Every database entry is owned by one node - adding the same family/subkey
963 root 1.124 combination on multiple nodes will not cause discomfort for AnyEvent::MP,
964     but the result might be nondeterministic, i.e. the key might have
965     different values on different nodes.
966    
967 root 1.126 Different subkeys in the same family can be owned by different nodes
968     without problems, and in fact, this is the common method to create worker
969     pools. For example, a worker port for image scaling might do this:
970 root 1.124
971 root 1.126 db_set my_image_scalers => $port;
972 root 1.124
973 root 1.126 And clients looking for an image scaler will want to get the
974 root 1.129 C<my_image_scalers> keys from time to time:
975    
976     db_keys my_image_scalers => sub {
977     @ports = @{ $_[0] };
978     };
979    
980     Or better yet, they want to monitor the database family, so they always
981     have a reasonable up-to-date copy:
982    
983     db_mon my_image_scalers => sub {
984     @ports = keys %{ $_[0] };
985     };
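
Once a client holds such a list of ports, it still has to decide which worker to use. One simple policy, shown here purely as an assumption and not as something AnyEvent::MP prescribes, is to pick a port at random. The helper name C<pick_worker> is hypothetical.

```perl
use strict;
use warnings;

# Hypothetical helper: choose one port ID at random from a list of
# worker ports, or return nothing if the pool is currently empty.
sub pick_worker {
   my @ports = @_;
   return unless @ports;
   return $ports[rand @ports];   # rand @ports is in [0, number of ports)
}

my @ports  = ("nodeA#scaler1", "nodeB#scaler1", "nodeB#scaler2");
my $worker = pick_worker (@ports);
print "would send the job to $worker\n";
```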
986    
987     In general, you can set or delete single subkeys, but query and monitor
988     whole families only.
989 root 1.126
990 root 1.129 If you feel the need to monitor or query a single subkey, try giving it
991     its own family.
992 root 1.126
993     =over
994    
995 root 1.137 =item $guard = db_set $family => $subkey [=> $value]
996 root 1.126
997     Sets (or replaces) a key in the database - if C<$value> is omitted,
998     C<undef> is used instead.
999    
1000 root 1.137 When called in non-void context, C<db_set> returns a guard that
1001     automatically calls C<db_del> when it is destroyed.
1002    
1003 root 1.130 =item db_del $family => $subkey...
1004 root 1.124
1005 root 1.130 Deletes one or more subkeys from the database family.
1006 root 1.124
1007 root 1.137 =item $guard = db_reg $family => $port => $value
1008    
1009     =item $guard = db_reg $family => $port
1010    
1011     =item $guard = db_reg $family
1012    
1013     Registers a port in the given family and optionally returns a guard to
1014     remove it.
1015    
1016     This function basically does the same as:
1017 root 1.124
1018 root 1.137 db_set $family => $port => $value
1019    
1020     Except that the port is monitored and automatically removed from the
1021     database family when it is kil'ed.
1022    
1023     If C<$value> is missing, C<undef> is used. If C<$port> is missing, then
1024     C<$SELF> is used.
1025    
1026     This function is most useful to register a port in some port group (which
1027     is just another name for a database family), and have it removed when the
1028     port is gone. This works best when the port is a local port.
1029    
1030     =cut
1031    
1032     sub db_reg($$;$) {
1033     my $family = shift;
1034     my $port = @_ ? shift : $SELF;
1035    
1036     my $clr = sub { db_del $family => $port };
1037     mon $port, $clr;
1038    
1039     db_set $family => $port => $_[0];
1040    
1041     defined wantarray
1042     and &Guard::guard ($clr)
1043     }
1044 root 1.124
1045 root 1.129 =item db_family $family => $cb->(\%familyhash)
1046    
1047     Queries the named database C<$family> and calls the callback with the
1048     family represented as a hash. You can keep and freely modify the hash.
1049    
1050     =item db_keys $family => $cb->(\@keys)
1051    
1052     Same as C<db_family>, except it only queries the family I<subkeys> and passes
1053     them as array reference to the callback.
1054    
1055     =item db_values $family => $cb->(\@values)
1056    
1057     Same as C<db_family>, except it only queries the family I<values> and passes them
1058     as array reference to the callback.
1059    
1060 root 1.143 =item $guard = db_mon $family => $cb->(\%familyhash, \@added, \@changed, \@deleted)
1061 root 1.128
1062 root 1.130 Creates a monitor on the given database family. Each time a key is set
1063     or is deleted, the callback is called with a hash containing the
1064     database family and three lists of added, changed and deleted subkeys,
1065     respectively. If no keys have changed then the array reference might be
1066     C<undef> or even missing.
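
To make the meaning of the three lists concrete, the following toy function compares two snapshots of a family and derives added, changed and deleted subkeys. It is only an illustration of the terminology, not how AnyEvent::MP computes these lists; C<diff_family> and C<same_value> are hypothetical names, and values are compared as plain strings.

```perl
use strict;
use warnings;

# Hypothetical helper: string comparison that treats undef as a value.
sub same_value {
   my ($x, $y) = @_;
   return !defined $y unless defined $x;
   return defined $y && $x eq $y;
}

# Hypothetical helper: return array references of added, changed and
# deleted subkeys when going from snapshot $old to snapshot $new.
sub diff_family {
   my ($old, $new) = @_;
   my (@added, @changed, @deleted);

   for my $key (sort keys %$new) {
      if (!exists $old->{$key}) {
         push @added, $key;
      } elsif (!same_value ($old->{$key}, $new->{$key})) {
         push @changed, $key;
      }
   }
   @deleted = grep !exists $new->{$_}, sort keys %$old;

   return (\@added, \@changed, \@deleted);
}

my %old = (a => 1, b => 2, c => 3);
my %new = (a => 1, b => 5, d => 4);
my ($add, $chg, $del) = diff_family (\%old, \%new);
print "added: @$add; changed: @$chg; deleted: @$del\n";
# prints: added: d; changed: b; deleted: c
```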
1067    
1068 root 1.132 If not called in void context, a guard object is returned that, when
1069     destroyed, stops the monitor.
1070    
1071 root 1.130 The family hash reference and the key arrays belong to AnyEvent::MP and
1072     B<must not be modified or stored> by the callback. When in doubt, make a
1073     copy.
1074    
1075     As soon as possible after the monitoring starts, the callback will be
1076     called with the initial contents of the family, even if it is empty,
1077     i.e. there will always be a timely call to the callback with the current
1078     contents.
1079 root 1.128
1080     It is possible that the callback is called with a change event even though
1081     the subkey is already present and the value has not changed.
1082    
1083     The monitoring stops when the guard object is destroyed.
1084    
1085     Example: on every change to the family "mygroup", print out all keys.
1086    
1087     my $guard = db_mon mygroup => sub {
1088 root 1.130 my ($family, $a, $c, $d) = @_;
1089 root 1.128 print "mygroup members: ", (join " ", keys %$family), "\n";
1090     };
1091    
1092     Example: wait until the family "My::Module::workers" is non-empty.
1093    
1094     my $guard; $guard = db_mon My::Module::workers => sub {
1095 root 1.130 my ($family, $a, $c, $d) = @_;
1096 root 1.128 return unless %$family;
1097     undef $guard;
1098     print "My::Module::workers now nonempty\n";
1099     };
1100    
1101     Example: print all changes to the family "AnyEvent::Fantasy::Module".
1102    
1103     my $guard = db_mon AnyEvent::Fantasy::Module => sub {
1104 root 1.130 my ($family, $a, $c, $d) = @_;
1105 root 1.128
1106 root 1.130 print "+$_=$family->{$_}\n" for @$a;
1107     print "*$_=$family->{$_}\n" for @$c;
1108     print "-$_=$family->{$_}\n" for @$d;
1109 root 1.128 };
1110    
1111 root 1.124 =cut
1112    
1113     =back
1114    
1115 root 1.26 =head1 AnyEvent::MP vs. Distributed Erlang
1116    
1117 root 1.35 AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
1118     == aemp node, Erlang process == aemp port), so many of the documents and
1119     programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
1120 root 1.27 sample:
1121    
1122 root 1.95 http://www.erlang.se/doc/programming_rules.shtml
1123     http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
1124     http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
1125     http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
1126 root 1.27
1127     Despite the similarities, there are also some important differences:
1128 root 1.26
1129     =over 4
1130    
1131 root 1.65 =item * Node IDs are arbitrary strings in AEMP.
1132 root 1.26
1133 root 1.65 Erlang relies on special naming and DNS to work everywhere in the same
1134     way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
1135 root 1.99 configuration or DNS), and possibly the addresses of some seed nodes, but
1136     will otherwise discover other nodes (and their IDs) itself.
1137 root 1.27
1138 root 1.54 =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
1139 root 1.51 uses "local ports are like remote ports".
1140    
1141     The failure modes for local ports are quite different (runtime errors
1142     only) than for remote ports - when a local port dies, you I<know> it dies,
1143     when a connection to another node dies, you know nothing about the other
1144     port.
1145    
1146     Erlang pretends remote ports are as reliable as local ports, even when
1147     they are not.
1148    
1149     AEMP encourages a "treat remote ports differently" philosophy, with local
1150     ports being the special case/exception, where transport errors cannot
1151     occur.
1152    
1153 root 1.26 =item * Erlang uses processes and a mailbox, AEMP does not queue.
1154    
1155 root 1.119 Erlang uses processes that selectively receive messages out of order, and
1156     therefore needs a queue. AEMP is event based, queuing messages would serve
1157     no useful purpose. For the same reason the pattern-matching abilities
1158     of AnyEvent::MP are more limited, as there is little need to be able to
1159 elmex 1.77 filter messages without dequeuing them.
1160 root 1.26
1161 root 1.119 This is not a philosophical difference, but simply stems from AnyEvent::MP
1162     being event-based, while Erlang is process-based.
1163    
1164     You can have a look at L<Coro::MP> for a more Erlang-like process model on
1165     top of AEMP and Coro threads.
1166 root 1.26
1167     =item * Erlang sends are synchronous, AEMP sends are asynchronous.
1168    
1169 root 1.119 Sending messages in Erlang is synchronous and blocks the process until
1170     a connection has been established and the message sent (and so does not
1171     need a queue that can overflow). AEMP sends return immediately, connection
1172     establishment is handled in the background.
1173 root 1.26
1174 root 1.51 =item * Erlang suffers from silent message loss, AEMP does not.
1175 root 1.26
1176 root 1.99 Erlang implements few guarantees on message delivery - messages can get
1177     lost without any of the processes realising it (i.e. you send messages a,
1178     b, and c, and the other side only receives messages a and c).
1179 root 1.26
1180 root 1.117 AEMP guarantees (modulo hardware errors) correct ordering, and the
1181     guarantee that after one message is lost, all following ones sent to the
1182     same port are lost as well, until monitoring raises an error, so there are
1183     no silent "holes" in the message sequence.
1184 root 1.26
1185 root 1.119 If you want your software to be very reliable, you have to cope with
1186     corrupted and even out-of-order messages in both Erlang and AEMP. AEMP
1187     simply tries to work better in common error cases, such as when a network
1188     link goes down.
1189    
1190 root 1.26 =item * Erlang can send messages to the wrong port, AEMP does not.
1191    
1192 root 1.119 In Erlang it is quite likely that a node that restarts reuses an Erlang
1193     process ID known to other nodes for a completely different process,
1194     causing messages destined for that process to end up in an unrelated
1195     process.
1196 root 1.26
1197 root 1.119 AEMP does not reuse port IDs, so old messages or old port IDs floating
1198 root 1.26 around in the network will not be sent to an unrelated port.
1199    
1200     =item * Erlang uses unprotected connections, AEMP uses secure
1201     authentication and can use TLS.
1202    
1203 root 1.66 AEMP can use a proven protocol - TLS - to protect connections and
1204 root 1.26 securely authenticate nodes.
1205    
1206 root 1.28 =item * The AEMP protocol is optimised for both text-based and binary
1207     communications.
1208    
1209 root 1.66 The AEMP protocol, unlike the Erlang protocol, supports both programming
1210 root 1.119 language independent text-only protocols (good for debugging), and binary,
1211 root 1.67 language-specific serialisers (e.g. Storable). By default, unless TLS is
1212     used, the protocol is actually completely text-based.
1213 root 1.28
1214     It has also been carefully designed to be implementable in other languages
1215 root 1.66 with a minimum of work while gracefully degrading functionality to make the
1216 root 1.28 protocol simple.
1217    
1218 root 1.35 =item * AEMP has more flexible monitoring options than Erlang.
1219    
1220 root 1.119 In Erlang, you can choose to receive I<all> exit signals as messages or
1221     I<none>, there is no in-between, so monitoring single Erlang processes is
1222     difficult to implement.
1223    
1224     Monitoring in AEMP is more flexible than in Erlang, as one can choose
1225     between automatic kill, exit message or callback on a per-port basis.
1226 root 1.35
1227 root 1.37 =item * Erlang tries to hide remote/local connections, AEMP does not.
1228 root 1.35
1229 root 1.67 Monitoring in Erlang is not an indicator of process death/crashes, in the
1230     same way as linking is (except linking is unreliable in Erlang).
1231 root 1.37
1232     In AEMP, you don't "look up" registered port names or send to named ports
1233     that might or might not be persistent. Instead, you normally spawn a port
1234 root 1.67 on the remote node. The init function monitors you, and you monitor the
1235     remote port. Since both monitors are local to the node, they are much more
1236     reliable (no need for C<spawn_link>).
1237 root 1.37
1238     This also saves round-trips and avoids sending messages to the wrong port
1239     (hard to do in Erlang).
1240 root 1.35
1241 root 1.26 =back
1242    
1243 root 1.46 =head1 RATIONALE
1244    
1245     =over 4
1246    
1247 root 1.67 =item Why strings for port and node IDs, why not objects?
1248 root 1.46
1249     We considered "objects", but found that the actual number of methods
1250 root 1.67 that can be called are quite low. Since port and node IDs travel over
1251 root 1.46 the network frequently, the serialising/deserialising would add lots of
1252 root 1.67 overhead, as well as having to keep a proxy object everywhere.
1253 root 1.46
1254     Strings can easily be printed, easily serialised etc. and need no special
1255     procedures to be "valid".
1256    
1257 root 1.110 And as a result, a port with just a default receiver consists of a single
1258 root 1.117 code reference stored in a global hash - it can't become much cheaper.
1259 root 1.47
1260 root 1.67 =item Why favour JSON, why not a real serialising format such as Storable?
1261 root 1.46
1262     In fact, any AnyEvent::MP node will happily accept Storable as framing
1263     format, but currently there is no way to make a node use Storable by
1264 root 1.67 default (although all nodes will accept it).
1265 root 1.46
1266     The default framing protocol is JSON because a) JSON::XS is many times
1267     faster for small messages and b) most importantly, after years of
1268     experience we found that object serialisation is causing more problems
1269 root 1.67 than it solves: Just like function calls, objects simply do not travel
1270 root 1.46 easily over the network, mostly because they will always be a copy, so you
1271     always have to re-think your design.
1272    
1273     Keeping your messages simple, concentrating on data structures rather than
1274     objects, will keep your messages clean, tidy and efficient.
1275    
1276     =back
1277    
1278 root 1.137 =head1 PORTING FROM AnyEvent::MP VERSION 1.X
1279    
1280 root 1.139 AEMP version 2 has a few major incompatible changes compared to version 1:
1281 root 1.137
1282     =over 4
1283    
1284     =item AnyEvent::MP::Global no longer has group management functions.
1285    
1286 root 1.140 At least not officially - the grp_* functions are still exported and might
1287     work, but they will be removed in some later release.
1288    
1289 root 1.137 AnyEvent::MP now comes with a distributed database that is more
1290 root 1.139 powerful. Its database families map closely to port groups, but the API
1291     has changed (the functions are also now exported by AnyEvent::MP). Here is
1292     a rough porting guide:
1293 root 1.137
1294     grp_reg $group, $port # old
1295     db_reg $group, $port # new
1296    
1297     $list = grp_get $group # old
1298     db_keys $group, sub { my $list = shift } # new
1299    
1300     grp_mon $group, $cb->(\@ports, $add, $del) # old
1301     db_mon $group, $cb->(\%ports, $add, $change, $del) # new
1302    
1303 root 1.139 C<grp_reg> is a no-brainer (just replace by C<db_reg>), but C<grp_get> is
1304     no longer instant, because the local node might not have a copy of the
1305     group. You can either modify your code to allow for a callback, or use
1306     C<db_mon> to keep an updated copy of the group:
1307 root 1.137
1308     my $local_group_copy;
1309 root 1.139 db_mon $group => sub { $local_group_copy = $_[0] };
1310 root 1.137
1311 root 1.139 # now "keys %$local_group_copy" always returns the most up-to-date
1312 root 1.137 # list of ports in the group.
1313    
1314 root 1.139 C<grp_mon> can be replaced by C<db_mon> with minor changes - C<db_mon>
1315     passes a hash as first argument, and an extra C<$chg> argument that can be
1316     ignored:
1317 root 1.137
1318     db_mon $group => sub {
1319     my ($ports, $add, $chg, $del) = @_;
1320     $ports = [keys %$ports];
1321    
1322     # now $ports, $add and $del are the same as
1323     # were originally passed by grp_mon.
1324     ...
1325     };
1326    
1327     =item Nodes no longer connect to all other nodes.
1328    
1329     In AEMP 1.x, every node automatically loads the L<AnyEvent::MP::Global>
1330     module, which in turn would create connections to all other nodes in the
1331     network (helped by the seed nodes).
1332    
1333     In version 2.x, global nodes still connect to all other global nodes, but
1334     other nodes don't - now every node either is a global node itself, or
1335     attaches itself to another global node.
1336    
1337     If a node isn't a global node itself, then it attaches itself to one
1338     of its seed nodes. If that seed node isn't a global node yet, it will
1339     automatically be upgraded to a global node.
1340    
1341     So in many cases, nothing needs to be changed - one just has to make sure
1342     that all seed nodes are meshed together with the other seed nodes (as with
1343 root 1.139 AEMP 1.x), and other nodes specify them as seed nodes. This is most easily
1344     achieved by specifying the same set of seed nodes for all nodes in the
1345     network.
1346 root 1.137
1347     Not opening a connection to every other node is usually an advantage,
1348     except when you need the lower latency of an already established
1349     connection. To ensure a node establishes a connection to another node,
1350     you can monitor the node port (C<mon $node, ...>), which will attempt to
1351 root 1.138 create the connection (and notify you when the connection fails).
1352 root 1.137
1353 root 1.138 =item Listener-less nodes (nodes without binds) are gone.
1354 root 1.137
1355 root 1.138 And are not coming back, at least not in their old form. If no C<binds>
1356 root 1.139 are specified for a node, AnyEvent::MP assumes a default of C<*:*>.
1357 root 1.137
1358     There are vague plans to implement some form of routing domains, which
1359     might or might not bring back listener-less nodes, but don't count on it.
1360    
1361     The fact that most connections are now optional somewhat mitigates this,
1362     as a node can be effectively unreachable from the outside without any
1363     problems, as long as it isn't a global node and only reaches out to other
1364     nodes (as opposed to being contacted from other nodes).
1365    
1366 root 1.138 =item $AnyEvent::MP::Kernel::WARN has gone.
1367    
1368     AnyEvent has acquired a logging framework (L<AnyEvent::Log>), and AEMP now
1369     uses this, and so should your programs.
1370    
1371     Every module now documents what kinds of messages it generates, with
1372     AnyEvent::MP acting as a catch all.
1373    
1374     On the positive side, this means that instead of setting
1375 root 1.139 C<PERL_ANYEVENT_MP_WARNLEVEL>, you can get away with setting C<AE_VERBOSE> -
1376 root 1.138 much less to type.
1377    
1378 root 1.137 =back
1379    
1380 root 1.139 =head1 LOGGING
1381    
1382     AnyEvent::MP does not normally log anything by itself, but since it is the
1383     root of the context hierarchy for AnyEvent::MP modules, it will receive
1384     all log messages by submodules.
1385    
1386 root 1.1 =head1 SEE ALSO
1387    
1388 root 1.68 L<AnyEvent::MP::Intro> - a gentle introduction.
1389    
1390     L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
1391    
1392 root 1.113 L<AnyEvent::MP::Global> - network maintenance and port groups, to find
1393 root 1.68 your applications.
1394    
1395 root 1.105 L<AnyEvent::MP::DataConn> - establish data connections between nodes.
1396    
1397 root 1.81 L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
1398     all nodes.
1399    
1400 root 1.1 L<AnyEvent>.
1401    
1402     =head1 AUTHOR
1403    
1404     Marc Lehmann <schmorp@schmorp.de>
1405     http://home.schmorp.de/
1406    
1407     =cut
1408    
1409     1
1410