/cvs/AnyEvent-MP/MP.pm
Revision: 1.115
Committed: Fri May 7 18:14:21 2010 UTC (14 years ago) by root
Branch: MAIN
CVS Tags: rel-1_29
Changes since 1.114: +1 -1 lines
Log Message:
1.29

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.89 AnyEvent::MP - erlang-style multi-processing/message-passing framework
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.75 $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11 root 1.2
12 root 1.38 $SELF # receiving/own port id in rcv callbacks
13    
14 root 1.48 # initialise the node so it can send/receive messages
15 root 1.72 configure;
16 root 1.48
17 root 1.75 # ports are message destinations
18 root 1.38
19     # sending messages
20 root 1.2 snd $port, type => data...;
21 root 1.38 snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23 root 1.2
24 root 1.50 # creating/using ports, the simple way
25 root 1.73 my $simple_port = port { my @msg = @_ };
26 root 1.22
27 root 1.52 # creating/using ports, tagged message matching
28 root 1.38 my $port = port;
29 root 1.73 rcv $port, ping => sub { snd $_[0], "pong" };
30     rcv $port, pong => sub { warn "pong received\n" };
31 root 1.2
32 root 1.48 # create a port on another node
33     my $port = spawn $node, $initfunc, @initdata;
34    
35 root 1.101 # destroy a port again
36     kil $port; # "normal" kill
37     kil $port, my_error => "everything is broken"; # error kill
38    
39 root 1.35 # monitoring
40 root 1.90 mon $localport, $cb->(@msg) # callback is invoked on death
41     mon $localport, $otherport # kill otherport on abnormal death
42     mon $localport, $otherport, @msg # send message on death
43 root 1.35
44 root 1.101 # temporarily execute code in port context
45     peval $port, sub { die "kill the port!" };
46    
47     # execute callbacks in $SELF port context
48     my $timer = AE::timer 1, 0, psub {
49     die "kill the port, delayed";
50     };
51    
52 root 1.45 =head1 CURRENT STATUS
53    
54 root 1.71 bin/aemp - stable.
55     AnyEvent::MP - stable API, should work.
56 elmex 1.77 AnyEvent::MP::Intro - explains most concepts.
57 root 1.88 AnyEvent::MP::Kernel - mostly stable API.
58     AnyEvent::MP::Global - stable API.
59 root 1.45
60 root 1.1 =head1 DESCRIPTION
61    
62 root 1.2 This module (-family) implements a simple message passing framework.
63    
64     Despite its simplicity, you can securely message other processes running
65 root 1.67 on the same or other hosts, and you can supervise entities remotely.
66 root 1.2
67 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
68 root 1.67 manual page and the examples under F<eg/>.
69 root 1.23
70 root 1.2 =head1 CONCEPTS
71    
72     =over 4
73    
74     =item port
75    
76 root 1.79 Not to be confused with a TCP port, a "port" is something you can send
77     messages to (with the C<snd> function).
78 root 1.29
79 root 1.53 Ports allow you to register C<rcv> handlers that can match all or just
80 root 1.64 some messages. Messages sent to ports will not be queued, regardless of
81     whether anything was listening for them or not.
82 root 1.2
83 root 1.67 =item port ID - C<nodeid#portname>
84 root 1.2
85 root 1.67 A port ID is the concatenation of a node ID, a hash-mark (C<#>) as
86     separator, and a port name (a printable string of unspecified format).
87 root 1.2
88     =item node
89    
90 root 1.53 A node is a single process containing at least one port - the node port,
91 root 1.67 which enables nodes to manage each other remotely, and to create new
92 root 1.53 ports.
93 root 1.2
94 root 1.67 Nodes are either public (have one or more listening ports) or private
95     (no listening ports). Private nodes cannot talk to other private nodes
96     currently.
97 root 1.2
98 root 1.84 =item node ID - C<[A-Z_][a-zA-Z0-9_\-.:]*>
99 root 1.2
100 root 1.64 A node ID is a string that uniquely identifies the node within a
101     network. Depending on the configuration used, node IDs can look like a
102     hostname, a hostname and a port, or a random string. AnyEvent::MP itself
103     doesn't interpret node IDs in any way.
104    
105     =item binds - C<ip:port>
106    
107     Nodes can only talk to each other by creating some kind of connection to
108     each other. To do this, nodes should listen on one or more local transport
109     endpoints - binds. Currently, only standard C<ip:port> specifications can
110     be used, which specify TCP ports to listen on.
111    
112 root 1.83 =item seed nodes
113 root 1.64
114     When a node starts, it knows nothing about the network. To teach the node
115     about the network it first has to contact some other node within the
116     network. This node is called a seed.
117    
118 root 1.83 Apart from the fact that other nodes know them as seed nodes and they have
119     to have fixed listening addresses, seed nodes are perfectly normal nodes -
120     any node can function as a seed node for others.
121    
122     In addition to discovering the network, seed nodes are also used to
123     maintain the network and to connect nodes that otherwise would have
124 root 1.86 trouble connecting. They form the backbone of an AnyEvent::MP network.
125 root 1.83
126     Seed nodes are expected to be long-running, and at least one seed node
127 root 1.85 should always be available. They should also be relatively responsive - a
128     seed node that blocks for long periods will slow down everybody else.
129 root 1.83
130     =item seeds - C<host:port>
131    
132     Seeds are transport endpoint(s) (usually a hostname/IP address and a
133 elmex 1.96 TCP port) of nodes that should be used as seed nodes.
134 root 1.29
135 root 1.83 The nodes listening on those endpoints are expected to be long-running,
136     and at least one of those should always be available. When nodes run out
137     of connections (e.g. due to a network error), they try to re-establish
138     connections to some seed nodes again to join the network.
139 root 1.67
140 root 1.2 =back
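
As a small illustration of the port ID format described above, the following sketch splits a port ID at the first hash-mark into its node ID and port name. The helper name split_port_id and the sample IDs are made up for this example and are not part of AnyEvent::MP; within the module, the exported node_of function is the documented way to extract the node ID.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::MP): split a port ID of the
# form "nodeid#portname" at the first hash-mark only.
sub split_port_id {
   my ($port_id) = @_;
   my ($nodeid, $portname) = split /#/, $port_id, 2;
   return ($nodeid, $portname);
}

my ($nodeid, $portname) = split_port_id "Example_Node#worker.1";
print "node ID:   $nodeid\n";
print "port name: $portname\n";

# A bare node ID has no port name part.
my ($only_node, $none) = split_port_id "Example_Node";
print "no port name\n" unless defined $none;
```

The limit of 2 passed to split keeps any further hash-marks inside the port name, which matters because the port name format is unspecified.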
141    
142 root 1.3 =head1 VARIABLES/FUNCTIONS
143 root 1.2
144     =over 4
145    
146 root 1.1 =cut
147    
148     package AnyEvent::MP;
149    
150 root 1.44 use AnyEvent::MP::Kernel;
151 root 1.2
152 root 1.1 use common::sense;
153    
154 root 1.2 use Carp ();
155    
156 root 1.1 use AE ();
157    
158 root 1.2 use base "Exporter";
159    
160 root 1.115 our $VERSION = 1.29;
161 root 1.43
162 root 1.8 our @EXPORT = qw(
163 root 1.59 NODE $NODE *SELF node_of after
164 root 1.72 configure
165 root 1.101 snd rcv mon mon_guard kil psub peval spawn cal
166 root 1.22 port
167 root 1.8 );
168 root 1.2
169 root 1.22 our $SELF;
170    
171     sub _self_die() {
172     my $msg = $@;
173     $msg =~ s/\n+$// unless ref $msg;
174     kil $SELF, die => $msg;
175     }
176    
177     =item $thisnode = NODE / $NODE
178    
179 root 1.67 The C<NODE> function returns, and the C<$NODE> variable contains, the node
180 root 1.64 ID of the node running in the current process. This value is initialised by
181 root 1.72 a call to C<configure>.
182 root 1.22
183 root 1.63 =item $nodeid = node_of $port
184 root 1.22
185 root 1.67 Extracts and returns the node ID from a port ID or a node ID.
186 root 1.34
187 root 1.78 =item configure $profile, key => value...
188    
189 root 1.72 =item configure key => value...
190 root 1.34
191 root 1.64 Before a node can talk to other nodes on the network (i.e. enter
192 root 1.72 "distributed mode") it has to configure itself - the minimum a node needs
193 root 1.64 to know is its own name, and optionally it should know the addresses of
194     some other nodes in the network to discover other nodes.
195 root 1.34
196 root 1.108 The key/value pairs are basically the same ones as documented for the
197     F<aemp> command line utility (sans the set/del prefix).
198    
199 root 1.72 This function configures a node - it must be called exactly once (or
200 root 1.34 never) before calling other AnyEvent::MP functions.
201    
202 root 1.72 =over 4
203    
204     =item step 1, gathering configuration from profiles
205    
206     The function first looks up a profile in the aemp configuration (see the
207     L<aemp> commandline utility). The profile name can be specified via the
208 root 1.78 named C<profile> parameter (or can simply be the first parameter). If it is
209     missing, then the nodename (F<uname -n>) will be used as profile name.
210 root 1.34
211 root 1.72 The profile data is then gathered as follows:
212 root 1.69
213 elmex 1.77 First, all remaining key => value pairs (all of which are conveniently
214 root 1.72 undocumented at the moment) will be interpreted as configuration
215     data. Then they will be overwritten by any values specified in the global
216     default configuration (see the F<aemp> utility), then the chain of
217     profiles chosen by the profile name (and any C<parent> attributes).
218    
219     That means that the values specified in the profile have highest priority
220     and the values specified directly via C<configure> have lowest priority,
221     and can only be used to specify defaults.
222 root 1.49
223 root 1.64 If the profile specifies a node ID, then this will become the node ID of
224     this process. If not, then the profile name will be used as node ID. The
225     special node ID of C<anon/> will be replaced by a random node ID.
226    
227 root 1.72 =item step 2, bind listener sockets
228    
229 root 1.64 The next step is to look up the binds in the profile, followed by binding
230     aemp protocol listeners on all binds specified (it is possible and valid
231     to have no binds, meaning that the node cannot be contacted from the
232     outside. This means the node cannot talk to other nodes that also have no
233     binds, but it can still talk to all "normal" nodes).
234    
235 root 1.70 If the profile does not specify a binds list, then a default of C<*> is
236 root 1.72 used, meaning the node will bind on a dynamically-assigned port on every
237     local IP address it finds.
238    
239     =item step 3, connect to seed nodes
240 root 1.64
241 root 1.72 As the last step, the seeds list from the profile is passed to the
242 root 1.64 L<AnyEvent::MP::Global> module, which will then use it to keep
243 root 1.72 connectivity with at least one node at any point in time.
244 root 1.64
245 root 1.72 =back
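
The precedence rule from step 1 can be sketched with plain hashes: values passed to configure act only as defaults, the global default configuration overwrites them, and the chosen profile overwrites both. The data below is invented for illustration, and the sketch ignores parent profiles; it is not the module's actual lookup code.

```perl
use strict;
use warnings;

# Hypothetical configuration data, for illustration only.
my %configure_args  = (binds => ["*:4040"], seeds => ["10.0.0.1:4040"]);
my %global_defaults = (seeds => ["seed.example.net:4040"]);
my %profile         = (binds => ["192.0.2.5:4041"]);

# In a list assignment, later pairs overwrite earlier ones, so the
# profile has the highest priority and the configure arguments the lowest.
my %effective = (%configure_args, %global_defaults, %profile);

for my $key (sort keys %effective) {
   print "$key = @{ $effective{$key} }\n";
}
```

Here neither value given to configure survives, because both keys are overridden by a source with higher priority.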
246    
247 root 1.87 Example: become a distributed node using the local node name as profile.
248 root 1.72 This should be the most common form of invocation for "daemon"-type nodes.
249 root 1.34
250 root 1.72 configure
251 root 1.34
252 root 1.64 Example: become an anonymous node. This form is often used for commandline
253     clients.
254 root 1.34
255 root 1.72 configure nodeid => "anon/";
256    
257     Example: configure a node using a profile called seed, which is suitable
258     for a seed node as it binds on all local addresses on a fixed port (4040,
259     customary for aemp).
260    
261     # use the aemp commandline utility
262 root 1.74 # aemp profile seed nodeid anon/ binds '*:4040'
263 root 1.72
264     # then use it
265     configure profile => "seed";
266 root 1.34
267 root 1.72 # or simply use aemp from the shell again:
268     # aemp run profile seed
269 root 1.34
270 root 1.72 # or provide a nicer-to-remember nodeid
271     # aemp run profile seed nodeid "$(hostname)"
272 root 1.34
273 root 1.22 =item $SELF
274    
275     Contains the current port id while executing C<rcv> callbacks or C<psub>
276     blocks.
277 root 1.3
278 root 1.67 =item *SELF, SELF, %SELF, @SELF...
279 root 1.22
280     Due to some quirks in how perl exports variables, it is impossible to
281 root 1.67 just export C<$SELF>, so all the symbols named C<SELF> are exported by this
282 root 1.22 module, but only C<$SELF> is currently used.
283 root 1.3
284 root 1.33 =item snd $port, type => @data
285 root 1.3
286 root 1.33 =item snd $port, @msg
287 root 1.3
288 root 1.67 Send the given message to the given port, which can identify either a
289     local or a remote port, and must be a port ID.
290 root 1.8
291 root 1.67 While the message can be almost anything, it is highly recommended to
292     use a string as first element (a port ID, or some word that indicates a
293     request type etc.) and to consist of only simple perl values (scalars,
294     arrays, hashes) - if you think you need to pass an object, think again.
295    
296     The message data logically becomes read-only after a call to this
297     function: modifying any argument (or values referenced by them) is
298     forbidden, as there can be considerable time between the call to C<snd>
299     and the time the message is actually being serialised - in fact, it might
300     never be copied as within the same process it is simply handed to the
301     receiving port.
302 root 1.3
303     The type of data you can transfer depends on the transport protocol: when
304     JSON is used, then only strings, numbers and arrays and hashes consisting
305     of those are allowed (no objects). When Storable is used, then anything
306     that Storable can serialise and deserialise is allowed, and for the local
307 root 1.67 node, anything can be passed. Best rely only on the common denominator of
308     these.
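
One way to stay within the common denominator mentioned above is to check a message before sending it. The sketch below accepts only plain scalars, array references and hash references; the helper name is_plain_data is hypothetical and not provided by AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical helper: true if $value consists only of plain scalars,
# array references and hash references (no objects, code refs, etc.).
sub is_plain_data {
   my ($value) = @_;
   my $type = ref $value;

   return 1 unless $type;                 # plain scalar (or undef)

   my @children;
   if    ($type eq "ARRAY") { @children = @$value }
   elsif ($type eq "HASH")  { @children = values %$value }
   else                     { return 0 }  # blessed object, code ref, ...

   for my $child (@children) {
      return 0 unless is_plain_data ($child);
   }
   return 1;
}

my @ok  = (ping => { id => 1, tags => ["a", "b"] });
my @bad = (ping => bless({}, "Some::Class"));

print "first message is plain data\n"      if is_plain_data (\@ok);
print "second message contains an object\n" unless is_plain_data (\@bad);
```

A check like this is only a development aid; it does not change what the transport can actually serialise.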
309 root 1.3
310 root 1.22 =item $local_port = port
311 root 1.2
312 root 1.50 Creates a new local port object and returns its port ID. Initially it has
313     no callbacks set and will throw an error when it receives messages.
314 root 1.10
315 root 1.50 =item $local_port = port { my @msg = @_ }
316 root 1.15
317 root 1.50 Creates a new local port, and returns its ID. Semantically the same as
318     creating a port and calling C<rcv $port, $callback> on it.
319 root 1.15
320 root 1.50 The block will be called for every message received on the port, with the
321     global variable C<$SELF> set to the port ID. Runtime errors will cause the
322     port to be C<kil>ed. The message will be passed as-is, no extra argument
323     (i.e. no port ID) will be passed to the callback.
324 root 1.15
325 root 1.50 If you want to stop/destroy the port, simply C<kil> it:
326 root 1.15
327 root 1.50 my $port = port {
328     my @msg = @_;
329     ...
330     kil $SELF;
331 root 1.15 };
332 root 1.10
333     =cut
334    
335 root 1.33 sub rcv($@);
336    
337 root 1.50 sub _kilme {
338     die "received message on port without callback";
339     }
340    
341 root 1.22 sub port(;&) {
342     my $id = "$UNIQ." . $ID++;
343     my $port = "$NODE#$id";
344    
345 root 1.50 rcv $port, shift || \&_kilme;
346 root 1.10
347 root 1.22 $port
348 root 1.10 }
349    
350 root 1.50 =item rcv $local_port, $callback->(@msg)
351 root 1.31
352 root 1.50 Replaces the default callback on the specified port. There is no way to
353     remove the default callback: use C<sub { }> to disable it, or better
354     C<kil> the port when it is no longer needed.
355 root 1.3
356 root 1.33 The global C<$SELF> (exported by this module) contains C<$port> while
357 root 1.50 executing the callback. Runtime errors during callback execution will
358     result in the port being C<kil>ed.
359 root 1.22
360 root 1.50 The default callback receives all messages not matched by a more specific
361     C<tag> match.
362 root 1.22
363 root 1.50 =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
364 root 1.3
365 root 1.54 Register (or replace) callbacks to be called on messages starting with the
366     given tag on the given port (and return the port), or unregister it (when
367     C<$callback> is C<undef> or missing). There can only be one callback
368     registered for each tag.
369 root 1.3
370 root 1.50 The original message will be passed to the callback, after the first
371     element (the tag) has been removed. The callback will use the same
372     environment as the default callback (see above).
373 root 1.3
374 root 1.36 Example: create a port and bind receivers on it in one go.
375    
376     my $port = rcv port,
377 root 1.50 msg1 => sub { ... },
378     msg2 => sub { ... },
379 root 1.36 ;
380    
381     Example: create a port, bind receivers and send it in a message elsewhere
382     in one go:
383    
384     snd $otherport, reply =>
385     rcv port,
386 root 1.50 msg1 => sub { ... },
387 root 1.36 ...
388     ;
389    
390 root 1.54 Example: temporarily register a rcv callback for a tag matching some port
391 root 1.102 (e.g. for an rpc reply) and unregister it after a message was received.
392 root 1.54
393     rcv $port, $otherport => sub {
394     my @reply = @_;
395    
396     rcv $SELF, $otherport;
397     };
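
The tag matching described above can be pictured as a dispatch table with a fallback. The following standalone sketch uses invented names (%by_tag, $default, deliver) and does not use AnyEvent::MP; it only mirrors the documented behaviour that a tag callback receives the message without its tag, while unmatched messages reach the default callback unchanged.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a port: a default callback plus per-tag callbacks.
my @log;
my $default = sub { push @log, "default: @_" };
my %by_tag  = (
   ping => sub { push @log, "ping with args: @_" },
);

# Deliver one message: a matching tag callback gets the message with the
# tag removed, everything else goes to the default callback as-is.
sub deliver {
   my @msg = @_;
   my $cb  = defined $msg[0] ? $by_tag{$msg[0]} : undef;

   if ($cb) {
      shift @msg;
      $cb->(@msg);
   } else {
      $default->(@msg);
   }
}

deliver (ping  => 1, 2);
deliver (other => 3);
print "$_\n" for @log;
```

Because there is one hash slot per tag, registering a second callback for the same tag replaces the first, which matches the "only one callback per tag" rule.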
398    
399 root 1.3 =cut
400    
401     sub rcv($@) {
402 root 1.33 my $port = shift;
403 root 1.75 my ($nodeid, $portid) = split /#/, $port, 2;
404 root 1.3
405 root 1.75 $NODE{$nodeid} == $NODE{""}
406 root 1.33 or Carp::croak "$port: rcv can only be called on local ports, caught";
407 root 1.22
408 root 1.50 while (@_) {
409     if (ref $_[0]) {
410     if (my $self = $PORT_DATA{$portid}) {
411     "AnyEvent::MP::Port" eq ref $self
412     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
413 root 1.33
414 root 1.103 $self->[0] = shift;
415 root 1.50 } else {
416     my $cb = shift;
417     $PORT{$portid} = sub {
418     local $SELF = $port;
419     eval { &$cb }; _self_die if $@;
420     };
421     }
422     } elsif (defined $_[0]) {
423     my $self = $PORT_DATA{$portid} ||= do {
424 root 1.103 my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
425 root 1.50
426     $PORT{$portid} = sub {
427     local $SELF = $port;
428    
429     if (my $cb = $self->[1]{$_[0]}) {
430     shift;
431     eval { &$cb }; _self_die if $@;
432     } else {
433     &{ $self->[0] };
434 root 1.33 }
435     };
436 root 1.50
437     $self
438 root 1.33 };
439    
440 root 1.50 "AnyEvent::MP::Port" eq ref $self
441     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
442 root 1.22
443 root 1.50 my ($tag, $cb) = splice @_, 0, 2;
444 root 1.33
445 root 1.50 if (defined $cb) {
446     $self->[1]{$tag} = $cb;
447 root 1.33 } else {
448 root 1.50 delete $self->[1]{$tag};
449 root 1.33 }
450 root 1.22 }
451 root 1.3 }
452 root 1.31
453 root 1.33 $port
454 root 1.2 }
455    
456 root 1.101 =item peval $port, $coderef[, @args]
457    
458     Evaluates the given C<$coderef> within the context of C<$port>, that is,
459     when the code throws an exception the C<$port> will be killed.
460    
461     Any remaining args will be passed to the callback. Any return values will
462     be returned to the caller.
463    
464     This is useful when you temporarily want to execute code in the context of
465     a port.
466    
467     Example: create a port and run some initialisation code in its context.
468    
469     my $port = port { ... };
470    
471     peval $port, sub {
472     init
473     or die "unable to init";
474     };
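
The "run code in a port context" pattern can be sketched without the module: localise a context variable, run the code inside eval, and turn an exception into a kill reason. In this sketch $CURRENT plays the role of $SELF and @killed merely records what would be passed to kil; both names, and run_in_context itself, are assumptions made for illustration.

```perl
use strict;
use warnings;

our $CURRENT;   # hypothetical stand-in for $SELF
my @killed;     # records [port, die => reason] instead of calling kil

sub run_in_context {
   local $CURRENT = shift;
   my $code = shift;
   my @args = @_;

   # Preserve the caller's context (list or scalar) for the callback.
   my $want = wantarray;
   my @res  = eval { $want ? $code->(@args) : scalar $code->(@args) };

   if ($@) {
      my $reason = $@;
      $reason =~ s/\n+$// unless ref $reason;
      push @killed, [$CURRENT, die => $reason];
      return;
   }

   return $want ? @res : $res[0];
}

my $sum = run_in_context "node#port1", sub { $_[0] + $_[1] }, 2, 3;
print "sum: $sum\n";

run_in_context "node#port1", sub { die "unable to init\n" };
print "would kill $killed[0][0] with reason: @{ $killed[0] }[1, 2]\n";
```

Using local means the previous context is restored automatically when the call returns, even if the code dies.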
475    
476     =cut
477    
478     sub peval($$) {
479     local $SELF = shift;
480     my $cb = shift;
481    
482     if (wantarray) {
483     my @res = eval { &$cb };
484     _self_die if $@;
485     @res
486     } else {
487     my $res = eval { &$cb };
488     _self_die if $@;
489     $res
490     }
491     }
492    
493 root 1.22 =item $closure = psub { BLOCK }
494 root 1.2
495 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
496     closure is executed, sets up the environment in the same way as in C<rcv>
497     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
498    
499 root 1.101 The effect is basically as if it returned C<< sub { peval $SELF, sub {
500 root 1.114 BLOCK }, @_ } >>.
501 root 1.101
502 root 1.22 This is useful when you register callbacks from C<rcv> callbacks:
503    
504     rcv $port, delayed_reply => sub {
505     my ($delay, @reply) = @_;
506     my $timer = AE::timer $delay, 0, psub {
507     snd @reply, $SELF;
508     };
509     };
510 root 1.3
511 root 1.8 =cut
512 root 1.3
513 root 1.22 sub psub(&) {
514     my $cb = shift;
515 root 1.3
516 root 1.22 my $port = $SELF
517     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
518 root 1.1
519 root 1.22 sub {
520     local $SELF = $port;
521 root 1.2
522 root 1.22 if (wantarray) {
523     my @res = eval { &$cb };
524     _self_die if $@;
525     @res
526     } else {
527     my $res = eval { &$cb };
528     _self_die if $@;
529     $res
530     }
531     }
532 root 1.2 }
533    
534 root 1.67 =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
535 root 1.32
536 root 1.67 =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
537 root 1.36
538 root 1.67 =item $guard = mon $port # kill $SELF when $port dies
539 root 1.32
540 root 1.67 =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
541 root 1.32
542 root 1.42 Monitor the given port and do something when the port is killed or
543     messages to it were lost, and optionally return a guard that can be used
544     to stop monitoring again.
545    
546 root 1.36 In the first form (callback), the callback is simply called with any
547     number of C<@reason> elements (no @reason means that the port was deleted
548 root 1.32 "normally"). Note also that I<< the callback B<must> never die >>, so use
549     C<eval> if unsure.
550    
551 root 1.43 In the second form (another port given), the other port (C<$rcvport>)
552 elmex 1.77 will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
553 root 1.36 "normal" kils nothing happens, while under all other conditions, the other
554     port is killed with the same reason.
555 root 1.32
556 root 1.36 The third form (kill self) is the same as the second form, except that
557     C<$rcvport> defaults to C<$SELF>.
558    
559     In the last form (message), a message of the form C<@msg, @reason> will be
560     C<snd>.
561 root 1.32
562 root 1.79 Monitoring-actions are one-shot: once messages are lost (and a monitoring
563     alert was raised), they are removed and will not trigger again.
564    
565 root 1.37 As a rule of thumb, monitoring requests should always monitor a port from
566     a local port (or callback). The reason is that kill messages might get
567     lost, just like any other message. Another less obvious reason is that
568 elmex 1.77 even monitoring requests can get lost (for example, when the connection
569 root 1.37 to the other node goes down permanently). When monitoring a port locally
570     these problems do not exist.
571    
572 root 1.79 C<mon> effectively guarantees that, in the absence of hardware failures,
573     after starting the monitor, either all messages sent to the port will
574     arrive, or the monitoring action will be invoked after possible message
575     loss has been detected. No messages will be lost "in between" (after
576     the first lost message no further messages will be received by the
577     port). After the monitoring action was invoked, further messages might get
578     delivered again.
579    
580     Inter-host-connection timeouts and monitoring depend on the transport
581     used. The only transport currently implemented is TCP, and AnyEvent::MP
582     relies on TCP to detect node-downs (this can take 10-15 minutes on a
583 elmex 1.96 non-idle connection, and usually around two hours for idle connections).
584 root 1.79
585     This means that monitoring is good for program errors and cleaning up
586     stuff eventually, but it is no replacement for a timeout when you need
587     to ensure some maximum latency.
588    
589 root 1.32 Example: call a given callback when C<$port> is killed.
590    
591     mon $port, sub { warn "port died because of <@_>\n" };
592    
593     Example: kill ourselves when C<$port> is killed abnormally.
594    
595 root 1.36 mon $port;
596 root 1.32
597 root 1.36 Example: send us a restart message when another C<$port> is killed.
598 root 1.32
599     mon $port, $SELF => "restart";
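
To make the difference between the forms more concrete, here is a standalone sketch of how the arguments after the monitored port could be turned into a callback that receives the kill reason. It does not call AnyEvent::MP; the helper monitor_action and the @actions log are hypothetical, and the form that defaults to $SELF is left out.

```perl
use strict;
use warnings;

my @actions;   # records what would be done instead of calling kil or snd

# Hypothetical helper: map the arguments after the monitored port to a
# callback that is invoked with the kill reason.
sub monitor_action {
   my ($target, @msg) = @_;

   return $target if ref $target;              # callback form: used as-is

   return sub { push @actions, [snd => $target, @msg, @_] }
      if @msg;                                  # message form

   # Linked port form: only propagate when a reason was given.
   return sub { push @actions, [kil => $target, @_] if @_ };
}

my $link = monitor_action "node#b";
$link->();                                      # normal kill: nothing happens
$link->(die => "oops");                         # abnormal kill: propagated

my $notify = monitor_action "node#b", "restart";
$notify->(transport_error => "timeout");

print "@$_\n" for @actions;
```

The linked port form is the only one that distinguishes normal from abnormal kills; the message form reports every death, with the reason appended to the message.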
600    
601     =cut
602    
603     sub mon {
604 root 1.75 my ($nodeid, $port) = split /#/, shift, 2;
605 root 1.32
606 root 1.75 my $node = $NODE{$nodeid} || add_node $nodeid;
607 root 1.32
608 root 1.41 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
609 root 1.32
610     unless (ref $cb) {
611     if (@_) {
612     # send a kill info message
613 root 1.41 my (@msg) = ($cb, @_);
614 root 1.32 $cb = sub { snd @msg, @_ };
615     } else {
616     # simply kill other port
617     my $port = $cb;
618     $cb = sub { kil $port, @_ if @_ };
619     }
620     }
621    
622     $node->monitor ($port, $cb);
623    
624     defined wantarray
625 root 1.94 and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) })
626 root 1.32 }
627    
628     =item $guard = mon_guard $port, $ref, $ref...
629    
630     Monitors the given C<$port> and keeps the passed references. When the port
631     is killed, the references will be freed.
632    
633     Optionally returns a guard that will stop the monitoring.
634    
635     This function is useful when you create e.g. timers or other watchers and
636 root 1.67 want to free them when the port gets killed (note the use of C<psub>):
637 root 1.32
638     rcv $port, start => sub {
639 root 1.67 my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
640 root 1.32 undef $timer if 0.9 < rand;
641     };
642     };
643    
644     =cut
645    
646     sub mon_guard {
647     my ($port, @refs) = @_;
648    
649 root 1.36 #TODO: mon-less form?
650    
651 root 1.32 mon $port, sub { 0 && @refs }
652     }
653    
654 root 1.33 =item kil $port[, @reason]
655 root 1.32
656     Kill the specified port with the given C<@reason>.
657    
658 root 1.107 If no C<@reason> is specified, then the port is killed "normally" -
659     monitor callback will be invoked, but the kil will not cause linked ports
660     (C<mon $mport, $lport> form) to get killed.
661 root 1.32
662 root 1.107 If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
663     form) get killed with the same reason.
664 root 1.32
665     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
666     will be reported as reason C<< die => $@ >>.
667    
668     Transport/communication errors are reported as C<< transport_error =>
669     $message >>.
670    
671 root 1.38 =cut
672    
673     =item $port = spawn $node, $initfunc[, @initdata]
674    
675     Creates a port on the node C<$node> (which can also be a port ID, in which
676     case it's the node where that port resides).
677    
678 root 1.67 The port ID of the newly created port is returned immediately, and it is
679     possible to immediately start sending messages or to monitor the port.
680 root 1.38
681 root 1.67 After the port has been created, the init function is called on the remote
682     node, in the same context as a C<rcv> callback. This function must be a
683     fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
684     specify a function in the main program, use C<::name>.
685 root 1.38
686     If the function doesn't exist, then the node tries to C<require>
687     the package, then the package above the package and so on (e.g.
688     C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
689     exists or it runs out of package names.
690    
691     The init function is then called with the newly-created port as context
692 root 1.82 object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
693     call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
694     the port might not get created.
695 root 1.38
696 root 1.67 A common idiom is to pass a local port, immediately monitor the spawned
697     port, and in the remote init function, immediately monitor the passed
698     local port. This two-way monitoring ensures that both ports get cleaned up
699     when there is a problem.
700 root 1.38
701 root 1.80 C<spawn> guarantees that the C<$initfunc> has no visible effects on the
702     caller before C<spawn> returns (by delaying invocation when spawn is
703     called for the local node).
704    
705 root 1.38 Example: spawn a chat server port on C<$othernode>.
706    
707     # this node, executed from within a port context:
708     my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
709     mon $server;
710    
711     # init function on C<$othernode>
712     sub connect {
713     my ($srcport) = @_;
714    
715     mon $srcport;
716    
717     rcv $SELF, sub {
718     ...
719     };
720     }
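
The package lookup order described above (most specific package first, then each enclosing package) can be sketched as follows. The helper candidate_packages is hypothetical and only computes the names; it does not require anything, and it assumes a name with at least one package component rather than the ::name form for the main program.

```perl
use strict;
use warnings;

# Hypothetical helper: list the packages that would be tried, most
# specific first, for a fully-qualified function name.
sub candidate_packages {
   my ($func) = @_;
   my @parts = split /::/, $func;
   pop @parts;                        # drop the function name itself

   my @candidates;
   while (@parts) {
      push @candidates, join "::", @parts;
      pop @parts;                     # move up to the enclosing package
   }
   return @candidates;
}

print "$_\n" for candidate_packages "MyApp::Chat::Server::init";
```

For the name used in the text this lists MyApp::Chat::Server, MyApp::Chat and MyApp, in that order.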
721    
722     =cut
723    
724     sub _spawn {
725     my $port = shift;
726     my $init = shift;
727    
728 root 1.82 # rcv will create the actual port
729 root 1.38 local $SELF = "$NODE#$port";
730     eval {
731     &{ load_func $init }
732     };
733     _self_die if $@;
734     }
735    
736     sub spawn(@) {
737 root 1.75 my ($nodeid, undef) = split /#/, shift, 2;
738 root 1.38
739     my $id = "$RUNIQ." . $ID++;
740    
741 root 1.39 $_[0] =~ /::/
742     or Carp::croak "spawn init function must be a fully-qualified name, caught";
743    
744 root 1.75 snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
745 root 1.38
746 root 1.75 "$nodeid#$id"
747 root 1.38 }
748    
749 root 1.59 =item after $timeout, @msg
750    
751     =item after $timeout, $callback
752    
753     Either sends the given message, or calls the given callback, after the
754     specified number of seconds.
755    
756 root 1.67 This is simply a utility function that comes in handy at times - the
757     AnyEvent::MP author is not convinced of the wisdom of having it, though,
758     so it may go away in the future.
759 root 1.59
760     =cut
761    
762     sub after($@) {
763     my ($timeout, @action) = @_;
764    
765     my $t; $t = AE::timer $timeout, 0, sub {
766     undef $t;
767     ref $action[0]
768     ? $action[0]()
769     : snd @action;
770     };
771     }
772    
773 root 1.87 =item cal $port, @msg, $callback[, $timeout]
774    
775     A simple form of RPC - sends a message to the given C<$port> with the
776     given contents (C<@msg>), but adds a reply port to the message.
777    
778     The reply port is created temporarily just for the purpose of receiving
779     the reply, and will be C<kil>ed when no longer needed.
780    
781     A reply message sent to the port is passed to the C<$callback> as-is.
782    
783     If an optional time-out (in seconds) is given and it is not C<undef>,
784     then the callback will be called without any arguments after the time-out
785     elapsed and the port is C<kil>ed.
786    
787 root 1.98 If no time-out is given (or it is C<undef>), then the local port will
788     monitor the remote port instead, so it eventually gets cleaned-up.
789 root 1.87
790     Currently this function returns the temporary port, but this "feature"
791     might go away in future versions unless you can make a convincing case that
792     this is indeed useful for something.
793    
794     =cut
795    
796     sub cal(@) {
797     my $timeout = ref $_[-1] ? undef : pop;
798     my $cb = pop;
799    
800     my $port = port {
801     undef $timeout;
802     kil $SELF;
803     &$cb;
804     };
805    
806     if (defined $timeout) {
807     $timeout = AE::timer $timeout, 0, sub {
808     undef $timeout;
809     kil $port;
810     $cb->();
811     };
812     } else {
813     mon $_[0], sub {
814     kil $port;
815     $cb->();
816     };
817     }
818    
819     push @_, $port;
820     &snd;
821    
822     $port
823     }
824    
825 root 1.8 =back
826    
827 root 1.26 =head1 AnyEvent::MP vs. Distributed Erlang
828    
829 root 1.35 AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
830     == aemp node, Erlang process == aemp port), so many of the documents and
831     programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
832 root 1.27 sample:
833    
834 root 1.95 http://www.erlang.se/doc/programming_rules.shtml
835     http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
836     http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
837     http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
838 root 1.27
839     Despite the similarities, there are also some important differences:
840 root 1.26
841     =over 4
842    
843 root 1.65 =item * Node IDs are arbitrary strings in AEMP.
844 root 1.26
845 root 1.65 Erlang relies on special naming and DNS to work everywhere in the same
846     way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
847 root 1.99 configuration or DNS), and possibly the addresses of some seed nodes, but
848     will otherwise discover other nodes (and their IDs) itself.
849 root 1.27
850 root 1.54 =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
851 root 1.51 uses "local ports are like remote ports".
852    
853     The failure modes for local ports are quite different (runtime errors
854     only) than for remote ports - when a local port dies, you I<know> it dies,
855     when a connection to another node dies, you know nothing about the other
856     port.
857    
858     Erlang pretends remote ports are as reliable as local ports, even when
859     they are not.
860    
861     AEMP encourages a "treat remote ports differently" philosophy, with local
862     ports being the special case/exception, where transport errors cannot
863     occur.
864    
865 root 1.26 =item * Erlang uses processes and a mailbox, AEMP does not queue.
866    
867 root 1.51 Erlang uses processes that selectively receive messages, and therefore
868     needs a queue. AEMP is event based, so queuing messages would serve no
869     useful purpose. For the same reason the pattern-matching abilities of
870     AnyEvent::MP are more limited, as there is little need to be able to
871 elmex 1.77 filter messages without dequeuing them.
872 root 1.26
873 root 1.35 (But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
874 root 1.26
875     =item * Erlang sends are synchronous, AEMP sends are asynchronous.
876    
877 root 1.51 Sending messages in Erlang is synchronous and blocks the process (and
878     so does not need a queue that can overflow). AEMP sends are immediate;
879     connection establishment is handled in the background.
880 root 1.26
881 root 1.51 =item * Erlang suffers from silent message loss, AEMP does not.
882 root 1.26
883 root 1.99 Erlang implements few guarantees on message delivery - messages can get
884     lost without any of the processes realising it (i.e. you send messages a,
885     b, and c, and the other side only receives messages a and c).
886 root 1.26
887 root 1.66 AEMP guarantees correct ordering, and also guarantees that after one message
888     is lost, all following ones sent to the same port are lost as well, until
889     monitoring raises an error, so there are no silent "holes" in the message
890     sequence.
891 root 1.26
892     =item * Erlang can send messages to the wrong port, AEMP does not.
893    
894 root 1.51 In Erlang it is quite likely that a node that restarts reuses a process ID
895     known to other nodes for a completely different process, causing messages
896     destined for that process to end up in an unrelated process.
897 root 1.26
898     AEMP never reuses port IDs, so old messages or old port IDs floating
899     around in the network will not be sent to an unrelated port.
900    
901     =item * Erlang uses unprotected connections, AEMP uses secure
902     authentication and can use TLS.
903    
904 root 1.66 AEMP can use a proven protocol - TLS - to protect connections and
905 root 1.26 securely authenticate nodes.
906    
907 root 1.28 =item * The AEMP protocol is optimised for both text-based and binary
908     communications.
909    
910 root 1.66 The AEMP protocol, unlike the Erlang protocol, supports both programming
911     language independent text-only protocols (good for debugging) and binary,
912 root 1.67 language-specific serialisers (e.g. Storable). By default, unless TLS is
913     used, the protocol is actually completely text-based.
914 root 1.28
915     It has also been carefully designed to be implementable in other languages
916 root 1.66 with a minimum of work while gracefully degrading functionality to make the
917 root 1.28 protocol simple.
918    
919 root 1.35 =item * AEMP has more flexible monitoring options than Erlang.
920    
921     In Erlang, you can choose to receive I<all> exit signals as messages
922     or I<none>, there is no in-between, so monitoring single processes is
923     difficult to implement. Monitoring in AEMP is more flexible than in
924     Erlang, as one can choose between automatic kill, exit message or callback
925     on a per-process basis.
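
A minimal sketch of the callback form of monitoring, using only C<port>, C<mon> and C<kil> as shown in the SYNOPSIS. The C<$worker> port and the kill reason are made up for this example, and it assumes that a bare C<configure> is sufficient for local ports.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::MP;

configure;   # initialise the node, as in the SYNOPSIS

my $worker = port;   # hypothetical worker port without any receivers
my $died   = AE::cv;

# callback form: invoked with the kill reason when $worker dies
# (an empty reason would mean a "normal" kill)
mon $worker, sub { $died->send (@_) };

# error kill, so the monitor sees a non-empty reason
kil $worker, my_error => "everything is broken";

my @reason = $died->recv;
print "worker died: @reason\n";
```

The other two forms (kill another port, or send it a message) take a port instead of the callback, so the choice can be made separately for every monitored port.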
926    
927 root 1.37 =item * Erlang tries to hide remote/local connections, AEMP does not.
928 root 1.35
929 root 1.67 Monitoring in Erlang is not an indicator of process death/crashes, in the
930     same way as linking is (except linking is unreliable in Erlang).
931 root 1.37
932     In AEMP, you don't "look up" registered port names or send to named ports
933     that might or might not be persistent. Instead, you normally spawn a port
934 root 1.67 on the remote node. The init function monitors you, and you monitor the
935     remote port. Since both monitors are local to the node, they are much more
936     reliable (no need for C<spawn_link>).
937 root 1.37
938     This also saves round-trips and avoids sending messages to the wrong port
939     (hard to do in Erlang).
940 root 1.35
941 root 1.26 =back
942    
943 root 1.46 =head1 RATIONALE
944    
945     =over 4
946    
947 root 1.67 =item Why strings for port and node IDs, why not objects?
948 root 1.46
949     We considered "objects", but found that the actual number of methods
950 root 1.67 that can be called is quite low. Since port and node IDs travel over
951 root 1.46 the network frequently, the serialising/deserialising would add lots of
952 root 1.67 overhead, as well as having to keep a proxy object everywhere.
953 root 1.46
954     Strings can easily be printed, easily serialised etc. and need no special
955     procedures to be "valid".
956    
957 root 1.110 And as a result, a port with just a default receiver consists of a single
958     closure stored in a global hash - it can't become much cheaper.
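
The idea can be sketched in plain Perl. This is not the actual AnyEvent::MP implementation: the C<%PORT> registry, the C<make_port> and C<deliver> helpers and the C<node#number> ID format are assumptions made purely for illustration.

```perl
use strict;
use warnings;

my %PORT;          # hypothetical registry: port ID string => receiver closure
my $next_id = 0;   # only ever incremented, so IDs are never reused

# create a port on the (made-up) node $node and return its ID, a plain string
sub make_port {
   my ($node, $cb) = @_;
   my $id = "$node#" . ++$next_id;
   $PORT{$id} = $cb;
   $id
}

# deliver a message; returns false if no such port exists (any more)
sub deliver {
   my ($id, @msg) = @_;
   my $cb = $PORT{$id} or return 0;
   $cb->(@msg);
   1
}

my @got;
my $id = make_port "node1", sub { push @got, [@_] };

print "port id: $id\n";           # a printable string, no proxy object needed
deliver $id, ping => 42;
delete $PORT{$id};                # "killing" the port is just a hash delete
my $alive = deliver $id, "lost";  # false: nothing is listening any more
```

Since the ID is an ordinary string, it can be embedded in any message and serialised without special handling.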
959 root 1.47
960 root 1.67 =item Why favour JSON, why not a real serialising format such as Storable?
961 root 1.46
962     In fact, any AnyEvent::MP node will happily accept Storable as framing
963     format, but currently there is no way to make a node use Storable by
964 root 1.67 default (although all nodes will accept it).
965 root 1.46
966     The default framing protocol is JSON because a) JSON::XS is many times
967     faster for small messages and b) most importantly, after years of
968     experience we found that object serialisation is causing more problems
969 root 1.67 than it solves: Just like function calls, objects simply do not travel
970 root 1.46 easily over the network, mostly because they will always be a copy, so you
971     always have to re-think your design.
972    
973     Keeping your messages simple, concentrating on data structures rather than
974     objects, will keep your messages clean, tidy and efficient.
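
The difference can be seen in a small sketch. It uses the core module JSON::PP as a stand-in for JSON::XS, and the message contents and the C<My::Job> class are made up for this example.

```perl
use strict;
use warnings;
use JSON::PP;   # core module; stands in for JSON::XS here

my $json = JSON::PP->new->canonical;   # canonical: sorted hash keys

# a plain data structure survives the round trip unchanged
my $msg  = ["job_done", { id => 7, files => ["a.txt", "b.txt"] }];
my $wire = $json->encode ($msg);
my $copy = $json->decode ($wire);
print "$wire\n";

# an object does not: with default settings the encoder refuses it
my $obj = bless { id => 7 }, "My::Job";   # hypothetical class
my $ok  = eval { $json->encode ([$obj]); 1 };
print "object rejected: $@" unless $ok;
```

The receiver of C<$wire> only ever gets a copy of the data, which is why designing messages as plain data structures avoids surprises.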
975    
976     =back
977    
978 root 1.1 =head1 SEE ALSO
979    
980 root 1.68 L<AnyEvent::MP::Intro> - a gentle introduction.
981    
982     L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
983    
984 root 1.113 L<AnyEvent::MP::Global> - network maintenance and port groups, to find
985 root 1.68 your applications.
986    
987 root 1.105 L<AnyEvent::MP::DataConn> - establish data connections between nodes.
988    
989 root 1.81 L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
990     all nodes.
991    
992 root 1.1 L<AnyEvent>.
993    
994     =head1 AUTHOR
995    
996     Marc Lehmann <schmorp@schmorp.de>
997     http://home.schmorp.de/
998    
999     =cut
1000    
1001     1
1002