/cvs/AnyEvent-MP/MP.pm
Revision: 1.75
Committed: Mon Aug 31 13:18:06 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.74: +10 -11 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP - multi-processing/message-passing framework
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP;
8    
9 root 1.75 $NODE # contains this node's node ID
10     NODE # returns this node's node ID
11 root 1.2
12 root 1.38 $SELF # receiving/own port id in rcv callbacks
13    
14 root 1.48 # initialise the node so it can send/receive messages
15 root 1.72 configure;
16 root 1.48
17 root 1.75 # ports are message destinations
18 root 1.38
19     # sending messages
20 root 1.2 snd $port, type => data...;
21 root 1.38 snd $port, @msg;
22     snd @msg_with_first_element_being_a_port;
23 root 1.2
24 root 1.50 # creating/using ports, the simple way
25 root 1.73 my $simple_port = port { my @msg = @_ };
26 root 1.22
27 root 1.52 # creating/using ports, tagged message matching
28 root 1.38 my $port = port;
29 root 1.73 rcv $port, ping => sub { snd $_[0], "pong" };
30     rcv $port, pong => sub { warn "pong received\n" };
31 root 1.2
32 root 1.48 # create a port on another node
33     my $port = spawn $node, $initfunc, @initdata;
34    
35 root 1.35 # monitoring
36     mon $port, $cb->(@msg) # callback is invoked on death
37     mon $port, $otherport # kill otherport on abnormal death
38     mon $port, $otherport, @msg # send message on death
39    
40 root 1.45 =head1 CURRENT STATUS
41    
42 root 1.71 bin/aemp - stable.
43     AnyEvent::MP - stable API, should work.
44     AnyEvent::MP::Intro - up to date, but incomplete.
45     AnyEvent::MP::Kernel - mostly stable.
46     AnyEvent::MP::Global - stable API, protocol not yet final.
47 root 1.45
48     stay tuned.
49    
50 root 1.1 =head1 DESCRIPTION
51    
52 root 1.2 This module (-family) implements a simple message passing framework.
53    
54     Despite its simplicity, you can securely message other processes running
55 root 1.67 on the same or other hosts, and you can supervise entities remotely.
56 root 1.2
57 root 1.23 For an introduction to this module family, see the L<AnyEvent::MP::Intro>
58 root 1.67 manual page and the examples under F<eg/>.
59 root 1.23
60 root 1.67 At the moment, this module family is a bit underdocumented.
61 root 1.6
62 root 1.2 =head1 CONCEPTS
63    
64     =over 4
65    
66     =item port
67    
68 root 1.29 A port is something you can send messages to (with the C<snd> function).
69    
70 root 1.53 Ports allow you to register C<rcv> handlers that can match all or just
71 root 1.64 some messages. Messages sent to ports will not be queued, regardless of
72     whether anything was listening for them or not.
73 root 1.2
74 root 1.67 =item port ID - C<nodeid#portname>
75 root 1.2
76 root 1.67 A port ID is the concatenation of a node ID, a hash-mark (C<#>) as
77     separator, and a port name (a printable string of unspecified format).
78 root 1.2
79     =item node
80    
81 root 1.53 A node is a single process containing at least one port - the node port,
82 root 1.67 which enables nodes to manage each other remotely, and to create new
83 root 1.53 ports.
84 root 1.2
85 root 1.67 Nodes are either public (have one or more listening ports) or private
86     (no listening ports). Private nodes cannot talk to other private nodes
87     currently.
88 root 1.2
89 root 1.63 =item node ID - C<[a-zA-Z0-9_\-.:]+>
90 root 1.2
91 root 1.64 A node ID is a string that uniquely identifies the node within a
92     network. Depending on the configuration used, node IDs can look like a
93     hostname, a hostname and a port, or a random string. AnyEvent::MP itself
94     doesn't interpret node IDs in any way.
95    
96     =item binds - C<ip:port>
97    
98     Nodes can only talk to each other by creating some kind of connection to
99     each other. To do this, nodes should listen on one or more local transport
100     endpoints - binds. Currently, only standard C<ip:port> specifications can
101     be used, which specify TCP ports to listen on.
102    
103     =item seeds - C<host:port>
104    
105     When a node starts, it knows nothing about the network. To teach the node
106     about the network it first has to contact some other node within the
107     network. This node is called a seed.
108    
109     Seeds are transport endpoint(s) of as many nodes as one wants. Those nodes
110     are expected to be long-running, and at least one of those should always
111     be available. When nodes run out of connections (e.g. due to a network
112     error), they try to re-establish connections to some seednodes again to
113     join the network.
114 root 1.29
115 root 1.67 Apart from being used for seeding, seednodes are not special in any way -
116     every public node can be a seednode.
117    
118 root 1.2 =back
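
The port ID and node ID formats above can be illustrated with a short stand-alone sketch. It does not use AnyEvent::MP itself: C<parse_port_id> and C<is_valid_node_id> are hypothetical helper names, and the node ID character set is assumed to be letters, digits and C<_-.:>.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::MP): split a port ID at the
# first hash-mark into node ID and port name. A bare node ID yields an
# undefined port name.
sub parse_port_id {
   my ($port_id) = @_;
   my ($node_id, $port_name) = split /#/, $port_id, 2;
   return ($node_id, $port_name);
}

# Hypothetical helper: check a string against the assumed node ID
# character set.
sub is_valid_node_id {
   my ($node_id) = @_;
   return $node_id =~ /\A[a-zA-Z0-9_\-.:]+\z/ ? 1 : 0;
}

my ($node_id, $port_name) = parse_port_id("host1:4040#chat.1");
print "node=$node_id port=$port_name\n";
```

Splitting with a limit of 2 keeps any further hash-marks inside the port name, which matches the "unspecified format" of port names.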
119    
120 root 1.3 =head1 VARIABLES/FUNCTIONS
121 root 1.2
122     =over 4
123    
124 root 1.1 =cut
125    
126     package AnyEvent::MP;
127    
128 root 1.44 use AnyEvent::MP::Kernel;
129 root 1.2
130 root 1.1 use common::sense;
131    
132 root 1.2 use Carp ();
133    
134 root 1.1 use AE ();
135    
136 root 1.2 use base "Exporter";
137    
138 root 1.44 our $VERSION = $AnyEvent::MP::Kernel::VERSION;
139 root 1.43
140 root 1.8 our @EXPORT = qw(
141 root 1.59 NODE $NODE *SELF node_of after
142 root 1.72 configure
143 root 1.61 snd rcv mon mon_guard kil reg psub spawn
144 root 1.22 port
145 root 1.8 );
146 root 1.2
147 root 1.22 our $SELF;
148    
149     sub _self_die() {
150     my $msg = $@;
151     $msg =~ s/\n+$// unless ref $msg;
152     kil $SELF, die => $msg;
153     }
154    
155     =item $thisnode = NODE / $NODE
156    
157 root 1.67 The C<NODE> function returns, and the C<$NODE> variable contains, the node
158 root 1.64 ID of the node running in the current process. This value is initialised by
159 root 1.72 a call to C<configure>.
160 root 1.22
161 root 1.63 =item $nodeid = node_of $port
162 root 1.22
163 root 1.67 Extracts and returns the node ID from a port ID or a node ID.
164 root 1.34
165 root 1.72 =item configure key => value...
166 root 1.34
167 root 1.64 Before a node can talk to other nodes on the network (i.e. enter
168 root 1.72 "distributed mode") it has to configure itself - the minimum a node needs
169 root 1.64 to know is its own name, and optionally it should know the addresses of
170     some other nodes in the network to discover other nodes.
171 root 1.34
172 root 1.72 This function configures a node - it must be called exactly once (or
173 root 1.34 never) before calling other AnyEvent::MP functions.
174    
175 root 1.72 =over 4
176    
177     =item step 1, gathering configuration from profiles
178    
179     The function first looks up a profile in the aemp configuration (see the
180     L<aemp> commandline utility). The profile name can be specified via the
181     named C<profile> parameter. If it is missing, then the nodename (F<uname
182     -n>) will be used as profile name.
183 root 1.34
184 root 1.72 The profile data is then gathered as follows:
185 root 1.69
186 root 1.70 First, all remaining key => value pairs (all of which are conveniently
187 root 1.72 undocumented at the moment) will be interpreted as configuration
188     data. Then they will be overwritten by any values specified in the global
189     default configuration (see the F<aemp> utility), then the chain of
190     profiles chosen by the profile name (and any C<parent> attributes).
191    
192     That means that the values specified in the profile have highest priority
193     and the values specified directly via C<configure> have lowest priority,
194     and can only be used to specify defaults.
195 root 1.49
196 root 1.64 If the profile specifies a node ID, then this will become the node ID of
197     this process. If not, then the profile name will be used as node ID. The
198     special node ID of C<anon/> will be replaced by a random node ID.
199    
200 root 1.72 =item step 2, bind listener sockets
201    
202 root 1.64 The next step is to look up the binds in the profile, followed by binding
203     aemp protocol listeners on all binds specified (it is possible and valid
204     to have no binds, meaning that the node cannot be contacted from the
205     outside. This means the node cannot talk to other nodes that also have no
206     binds, but it can still talk to all "normal" nodes).
207    
208 root 1.70 If the profile does not specify a binds list, then a default of C<*> is
209 root 1.72 used, meaning the node will bind on a dynamically-assigned port on every
210     local IP address it finds.
211    
212     =item step 3, connect to seed nodes
213 root 1.64
214 root 1.72 As the last step, the seeds list from the profile is passed to the
215 root 1.64 L<AnyEvent::MP::Global> module, which will then use it to keep
216 root 1.72 connectivity with at least one node at any point in time.
217 root 1.64
218 root 1.72 =back
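
The precedence rules of step 1 can be modelled as a series of hash merges. The following is only a sketch under stated assumptions: C<merge_config> and all keys and values are hypothetical, no aemp configuration is read, and parent profiles are assumed to be applied before the selected profile.

```perl
use strict;
use warnings;

# Stand-alone model of the documented precedence; all names and values
# here are hypothetical and no aemp configuration is read.
sub merge_config {
   my ($direct, $global, @profile_chain) = @_;

   my %config = %$direct;                        # configure arguments: lowest priority
   %config = (%config, %$global);                # global defaults override them
   %config = (%config, %$_) for @profile_chain;  # profiles override everything else

   return \%config;
}

my $config = merge_config(
   { binds => "*:4041", seeds => "seed1:4040" },   # passed to configure
   { seeds => "seed2:4040" },                      # global default configuration
   { binds => "*:4040" },                          # parent profile (assumed to come first)
   { nodeid => "worker1" },                        # the selected profile
);

print join(", ", map { "$_=$config->{$_}" } sort keys %$config), "\n";
```

In this model the values passed directly only survive for keys that neither the global defaults nor any profile define, which is why they can only act as defaults.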
219    
220     Example: become a distributed node using the local node name as profile.
221     This should be the most common form of invocation for "daemon"-type nodes.
222 root 1.34
223 root 1.72 configure
224 root 1.34
225 root 1.64 Example: become an anonymous node. This form is often used for commandline
226     clients.
227 root 1.34
228 root 1.72 configure nodeid => "anon/";
229    
230     Example: configure a node using a profile called seed, which is suitable
231     for a seed node as it binds on all local addresses on a fixed port (4040,
232     customary for aemp).
233    
234     # use the aemp commandline utility
235 root 1.74 # aemp profile seed nodeid anon/ binds '*:4040'
236 root 1.72
237     # then use it
238     configure profile => "seed";
239 root 1.34
240 root 1.72 # or simply use aemp from the shell again:
241     # aemp run profile seed
242 root 1.34
243 root 1.72 # or provide a nicer-to-remember nodeid
244     # aemp run profile seed nodeid "$(hostname)"
245 root 1.34
246 root 1.22 =item $SELF
247    
248     Contains the current port id while executing C<rcv> callbacks or C<psub>
249     blocks.
250 root 1.3
251 root 1.67 =item *SELF, SELF, %SELF, @SELF...
252 root 1.22
253     Due to some quirks in how perl exports variables, it is impossible to
254 root 1.67 just export C<$SELF>, so all the symbols named C<SELF> are exported by this
255 root 1.22 module, but only C<$SELF> is currently used.
256 root 1.3
257 root 1.33 =item snd $port, type => @data
258 root 1.3
259 root 1.33 =item snd $port, @msg
260 root 1.3
261 root 1.67 Send the given message to the given port, which can identify either a
262     local or a remote port, and must be a port ID.
263 root 1.8
264 root 1.67 While the message can be almost anything, it is highly recommended to
265     use a string as first element (a port ID, or some word that indicates a
266     request type etc.) and to consist of only simple perl values (scalars,
267     arrays, hashes) - if you think you need to pass an object, think again.
268    
269     The message data logically becomes read-only after a call to this
270     function: modifying any argument (or values referenced by them) is
271     forbidden, as there can be considerable time between the call to C<snd>
272     and the time the message is actually being serialised - in fact, it might
273     never be copied as within the same process it is simply handed to the
274     receiving port.
275 root 1.3
276     The type of data you can transfer depends on the transport protocol: when
277     JSON is used, then only strings, numbers and arrays and hashes consisting
278     of those are allowed (no objects). When Storable is used, then anything
279     that Storable can serialise and deserialise is allowed, and for the local
280 root 1.67 node, anything can be passed. Best rely only on the common denominator of
281     these.
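
One way to stay within the common denominator is to check messages before sending them. The sketch below is an assumption-laden illustration, not part of AnyEvent::MP: C<is_plain_value> and C<is_plain_message> are hypothetical helpers that accept only non-reference scalars and unblessed arrays and hashes.

```perl
use strict;
use warnings;
use Scalar::Util qw(blessed);

# Hypothetical helper (not part of AnyEvent::MP): true if the value
# consists only of non-reference scalars, arrays and hashes.
sub is_plain_value {
   my ($value) = @_;

   return 1 unless ref $value;      # string or number (undef is let through here)
   return 0 if blessed $value;      # objects are not portable across transports

   if (ref $value eq "ARRAY") {
      is_plain_value($_) or return 0 for @$value;
      return 1;
   }

   if (ref $value eq "HASH") {
      is_plain_value($_) or return 0 for values %$value;
      return 1;
   }

   return 0;                        # code refs, scalar refs, globs, ...
}

# A message is just a list, so check every element.
sub is_plain_message {
   is_plain_value($_) or return 0 for @_;
   return 1;
}

print is_plain_message(ping => { count => 1, tags => ["a", "b"] }) ? "ok\n" : "not ok\n";
```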
282 root 1.3
283 root 1.22 =item $local_port = port
284 root 1.2
285 root 1.50 Creates a new local port object and returns its port ID. Initially it has
286     no callbacks set and will throw an error when it receives messages.
287 root 1.10
288 root 1.50 =item $local_port = port { my @msg = @_ }
289 root 1.15
290 root 1.50 Creates a new local port, and returns its ID. Semantically the same as
291     creating a port and calling C<rcv $port, $callback> on it.
292 root 1.15
293 root 1.50 The block will be called for every message received on the port, with the
294     global variable C<$SELF> set to the port ID. Runtime errors will cause the
295     port to be C<kil>ed. The message will be passed as-is, no extra argument
296     (i.e. no port ID) will be passed to the callback.
297 root 1.15
298 root 1.50 If you want to stop/destroy the port, simply C<kil> it:
299 root 1.15
300 root 1.50 my $port = port {
301     my @msg = @_;
302     ...
303     kil $SELF;
304 root 1.15 };
305 root 1.10
306     =cut
307    
308 root 1.33 sub rcv($@);
309    
310 root 1.50 sub _kilme {
311     die "received message on port without callback";
312     }
313    
314 root 1.22 sub port(;&) {
315     my $id = "$UNIQ." . $ID++;
316     my $port = "$NODE#$id";
317    
318 root 1.50 rcv $port, shift || \&_kilme;
319 root 1.10
320 root 1.22 $port
321 root 1.10 }
322    
323 root 1.50 =item rcv $local_port, $callback->(@msg)
324 root 1.31
325 root 1.50 Replaces the default callback on the specified port. There is no way to
326     remove the default callback: use C<sub { }> to disable it, or better
327     C<kil> the port when it is no longer needed.
328 root 1.3
329 root 1.33 The global C<$SELF> (exported by this module) contains C<$port> while
330 root 1.50 executing the callback. Runtime errors during callback execution will
331     result in the port being C<kil>ed.
332 root 1.22
333 root 1.50 The default callback receives all messages not matched by a more specific
334     C<tag> match.
335 root 1.22
336 root 1.50 =item rcv $local_port, tag => $callback->(@msg_without_tag), ...
337 root 1.3
338 root 1.54 Register (or replace) callbacks to be called on messages starting with the
339     given tag on the given port (and return the port), or unregister it (when
340     C<$callback> is C<undef> or missing). There can only be one callback
341     registered for each tag.
342 root 1.3
343 root 1.50 The original message will be passed to the callback, after the first
344     element (the tag) has been removed. The callback will use the same
345     environment as the default callback (see above).
346 root 1.3
347 root 1.36 Example: create a port and bind receivers on it in one go.
348    
349     my $port = rcv port,
350 root 1.50 msg1 => sub { ... },
351     msg2 => sub { ... },
352 root 1.36 ;
353    
354     Example: create a port, bind receivers and send it in a message elsewhere
355     in one go:
356    
357     snd $otherport, reply =>
358     rcv port,
359 root 1.50 msg1 => sub { ... },
360 root 1.36 ...
361     ;
362    
363 root 1.54 Example: temporarily register a rcv callback for a tag matching some port
364     (e.g. for a rpc reply) and unregister it after a message was received.
365    
366     rcv $port, $otherport => sub {
367     my @reply = @_;
368    
369     rcv $SELF, $otherport;
370     };
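
The interplay between tagged callbacks and the default callback can be modelled without the module. This is a simplified stand-alone sketch: C<@log>, C<%tag_cb>, C<$default_cb> and C<dispatch> are hypothetical names, not AnyEvent::MP internals.

```perl
use strict;
use warnings;

# Stand-alone model of tagged matching; @log, %tag_cb, $default_cb and
# dispatch are hypothetical names, not AnyEvent::MP internals.
my @log;

my %tag_cb = (
   ping => sub { push @log, "ping(@_)" },
);

my $default_cb = sub { push @log, "default(@_)" };

sub dispatch {
   my @msg = @_;

   if (@msg && defined $msg[0] && !ref $msg[0] && (my $cb = $tag_cb{$msg[0]})) {
      shift @msg;             # the tag is removed before the callback runs
      $cb->(@msg);
   } else {
      $default_cb->(@msg);    # everything else reaches the default callback unchanged
   }
}

dispatch(ping  => 1, 2);      # handled by the "ping" callback
dispatch(other => 3);         # no tag matches: default callback

print "$_\n" for @log;
```

Since there is one hash slot per tag, registering a second callback for the same tag would simply replace the first, which mirrors the one-callback-per-tag rule.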
371    
372 root 1.3 =cut
373    
374     sub rcv($@) {
375 root 1.33 my $port = shift;
376 root 1.75 my ($nodeid, $portid) = split /#/, $port, 2;
377 root 1.3
378 root 1.75 $NODE{$nodeid} == $NODE{""}
379 root 1.33 or Carp::croak "$port: rcv can only be called on local ports, caught";
380 root 1.22
381 root 1.50 while (@_) {
382     if (ref $_[0]) {
383     if (my $self = $PORT_DATA{$portid}) {
384     "AnyEvent::MP::Port" eq ref $self
385     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
386 root 1.33
387 root 1.50 $self->[2] = shift;
388     } else {
389     my $cb = shift;
390     $PORT{$portid} = sub {
391     local $SELF = $port;
392     eval { &$cb }; _self_die if $@;
393     };
394     }
395     } elsif (defined $_[0]) {
396     my $self = $PORT_DATA{$portid} ||= do {
397     my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
398    
399     $PORT{$portid} = sub {
400     local $SELF = $port;
401    
402     if (my $cb = $self->[1]{$_[0]}) {
403     shift;
404     eval { &$cb }; _self_die if $@;
405     } else {
406     &{ $self->[0] };
407 root 1.33 }
408     };
409 root 1.50
410     $self
411 root 1.33 };
412    
413 root 1.50 "AnyEvent::MP::Port" eq ref $self
414     or Carp::croak "$port: rcv can only be called on message matching ports, caught";
415 root 1.22
416 root 1.50 my ($tag, $cb) = splice @_, 0, 2;
417 root 1.33
418 root 1.50 if (defined $cb) {
419     $self->[1]{$tag} = $cb;
420 root 1.33 } else {
421 root 1.50 delete $self->[1]{$tag};
422 root 1.33 }
423 root 1.22 }
424 root 1.3 }
425 root 1.31
426 root 1.33 $port
427 root 1.2 }
428    
429 root 1.22 =item $closure = psub { BLOCK }
430 root 1.2
431 root 1.22 Remembers C<$SELF> and creates a closure out of the BLOCK. When the
432     closure is executed, sets up the environment in the same way as in C<rcv>
433     callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
434    
435     This is useful when you register callbacks from C<rcv> callbacks:
436    
437     rcv $port, delayed_reply => sub {
438     my ($delay, @reply) = @_;
439     my $timer = AE::timer $delay, 0, psub {
440     snd @reply, $SELF;
441     };
442     };
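
How a closure can remember and later restore C<$SELF> is sketched below in plain Perl. This is a stand-alone model under stated assumptions: C<my_psub> and C<@killed> are hypothetical, and errors are merely recorded where the real C<psub> would kill the port.

```perl
use strict;
use warnings;

our $SELF;       # stands in for the $SELF exported by AnyEvent::MP
my @killed;      # records [port, reason] instead of really calling kil

# Hypothetical stand-in for psub: remember $SELF now, restore it later.
sub my_psub(&) {
   my $cb   = shift;
   my $port = $SELF
      or die "my_psub can only be called while \$SELF is set";

   return sub {
      local $SELF = $port;
      my @res = eval { $cb->(@_) };
      push @killed, [$port, $@] if $@;     # the real psub kills the port here
      return wantarray ? @res : $res[-1];
   };
}

my $closure = do {
   local $SELF = "node1#port.1";           # pretend we are inside a rcv callback
   my_psub { die "oops\n" if $_[0]; "ran as $SELF" };
};

print scalar $closure->(0), "\n";          # $SELF is restored inside the closure
$closure->(1);                             # the error is caught and recorded
```

The C<local> keeps the restored value confined to the callback, so C<$SELF> is unset again once the closure returns.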
443 root 1.3
444 root 1.8 =cut
445 root 1.3
446 root 1.22 sub psub(&) {
447     my $cb = shift;
448 root 1.3
449 root 1.22 my $port = $SELF
450     or Carp::croak "psub can only be called from within rcv or psub callbacks, not";
451 root 1.1
452 root 1.22 sub {
453     local $SELF = $port;
454 root 1.2
455 root 1.22 if (wantarray) {
456     my @res = eval { &$cb };
457     _self_die if $@;
458     @res
459     } else {
460     my $res = eval { &$cb };
461     _self_die if $@;
462     $res
463     }
464     }
465 root 1.2 }
466    
467 root 1.67 =item $guard = mon $port, $cb->(@reason) # call $cb when $port dies
468 root 1.32
469 root 1.67 =item $guard = mon $port, $rcvport # kill $rcvport when $port dies
470 root 1.36
471 root 1.67 =item $guard = mon $port # kill $SELF when $port dies
472 root 1.32
473 root 1.67 =item $guard = mon $port, $rcvport, @msg # send a message when $port dies
474 root 1.32
475 root 1.42 Monitor the given port and do something when the port is killed or
476     messages to it were lost, and optionally return a guard that can be used
477     to stop monitoring again.
478    
479     C<mon> effectively guarantees that, in the absence of hardware failures,
480 root 1.67 after starting the monitor, either all messages sent to the port will
481     arrive, or the monitoring action will be invoked after possible message
482     loss has been detected. No messages will be lost "in between" (after
483     the first lost message no further messages will be received by the
484 root 1.42 port). After the monitoring action was invoked, further messages might get
485     delivered again.
486 root 1.32
487 root 1.67 Note that monitoring-actions are one-shot: once messages are lost (and a
488     monitoring alert was raised), they are removed and will not trigger again.
489 root 1.58
490 root 1.36 In the first form (callback), the callback is simply called with any
491     number of C<@reason> elements (no @reason means that the port was deleted
492 root 1.32 "normally"). Note also that I<< the callback B<must> never die >>, so use
493     C<eval> if unsure.
494    
495 root 1.43 In the second form (another port given), the other port (C<$rcvport>)
496 root 1.36 will be C<kil>'ed with C<@reason>, iff a @reason was specified, i.e. on
497     "normal" kils nothing happens, while under all other conditions, the other
498     port is killed with the same reason.
499 root 1.32
500 root 1.36 The third form (kill self) is the same as the second form, except that
501     C<$rcvport> defaults to C<$SELF>.
502    
503     In the last form (message), a message of the form C<@msg, @reason> will be
504     C<snd>.
505 root 1.32
506 root 1.37 As a rule of thumb, monitoring requests should always monitor a port from
507     a local port (or callback). The reason is that kill messages might get
508     lost, just like any other message. Another less obvious reason is that
509     even monitoring requests can get lost (for example, when the connection
510     to the other node goes down permanently). When monitoring a port locally
511     these problems do not exist.
512    
513 root 1.32 Example: call a given callback when C<$port> is killed.
514    
515     mon $port, sub { warn "port died because of <@_>\n" };
516    
517     Example: kill ourselves when C<$port> is killed abnormally.
518    
519 root 1.36 mon $port;
520 root 1.32
521 root 1.36 Example: send us a restart message when another C<$port> is killed.
522 root 1.32
523     mon $port, $self => "restart";
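
The four calling forms all reduce to a single callback that receives the C<@reason>. The following stand-alone model shows that reduction: C<monitor_action>, C<fake_snd>, C<fake_kil> and C<@actions> are hypothetical names, and sending or killing is only recorded, not performed.

```perl
use strict;
use warnings;

our $SELF = "node1#me";          # pretend we run inside a port context
my @actions;                     # records what would have been sent or killed

sub fake_snd { push @actions, [snd => @_] }   # stand-in for snd
sub fake_kil { push @actions, [kil => @_] }   # stand-in for kil

# Turn the arguments that follow $port into one monitoring callback.
sub monitor_action {
   my $cb = @_ ? shift : $SELF;

   return $cb if ref $cb;                       # callback form: use it directly

   if (@_) {                                    # message form: append the reason
      my @msg = ($cb, @_);
      return sub { fake_snd(@msg, @_) };
   }

   my $port = $cb;                              # port form, or $SELF by default
   return sub { fake_kil($port, @_) if @_ };    # only act on abnormal death
}

monitor_action()->();                                               # normal death: nothing happens
monitor_action()->(die => "oops");                                  # kills $SELF with the reason
monitor_action("node1#log", "died")->(die => "oops");               # sends "died" plus the reason
monitor_action(sub { push @actions, [callback => @_] })->("gone");  # plain callback

print "@$_\n" for @actions;
```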
524    
525     =cut
526    
527     sub mon {
528 root 1.75 my ($nodeid, $port) = split /#/, shift, 2;
529 root 1.32
530 root 1.75 my $node = $NODE{$nodeid} || add_node $nodeid;
531 root 1.32
532 root 1.41 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
533 root 1.32
534     unless (ref $cb) {
535     if (@_) {
536     # send a kill info message
537 root 1.41 my (@msg) = ($cb, @_);
538 root 1.32 $cb = sub { snd @msg, @_ };
539     } else {
540     # simply kill other port
541     my $port = $cb;
542     $cb = sub { kil $port, @_ if @_ };
543     }
544     }
545    
546     $node->monitor ($port, $cb);
547    
548     defined wantarray
549     and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }
550     }
551    
552     =item $guard = mon_guard $port, $ref, $ref...
553    
554     Monitors the given C<$port> and keeps the passed references. When the port
555     is killed, the references will be freed.
556    
557     Optionally returns a guard that will stop the monitoring.
558    
559     This function is useful when you create e.g. timers or other watchers and
560 root 1.67 want to free them when the port gets killed (note the use of C<psub>):
561 root 1.32
562     rcv $port, start => sub {
563 root 1.67 my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub {
564 root 1.32 undef $timer if 0.9 < rand;
565     };
566     };
567    
568     =cut
569    
570     sub mon_guard {
571     my ($port, @refs) = @_;
572    
573 root 1.36 #TODO: mon-less form?
574    
575 root 1.32 mon $port, sub { 0 && @refs }
576     }
577    
578 root 1.33 =item kil $port[, @reason]
579 root 1.32
580     Kill the specified port with the given C<@reason>.
581    
582 root 1.67 If no C<@reason> is specified, then the port is killed "normally" (ports
583     monitoring other ports will not necessarily die because a port dies
584     "normally").
585 root 1.32
586     Otherwise, linked ports get killed with the same reason (second form of
587 root 1.67 C<mon>, see above).
588 root 1.32
589     Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
590     will be reported as reason C<< die => $@ >>.
591    
592     Transport/communication errors are reported as C<< transport_error =>
593     $message >>.
594    
595 root 1.38 =cut
596    
597     =item $port = spawn $node, $initfunc[, @initdata]
598    
599     Creates a port on the node C<$node> (which can also be a port ID, in which
600     case it's the node where that port resides).
601    
602 root 1.67 The port ID of the newly created port is returned immediately, and it is
603     possible to immediately start sending messages or to monitor the port.
604 root 1.38
605 root 1.67 After the port has been created, the init function is called on the remote
606     node, in the same context as a C<rcv> callback. This function must be a
607     fully-qualified function name (e.g. C<MyApp::Chat::Server::init>). To
608     specify a function in the main program, use C<::name>.
609 root 1.38
610     If the function doesn't exist, then the node tries to C<require>
611     the package, then the package above the package and so on (e.g.
612     C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
613     exists or it runs out of package names.
614    
615     The init function is then called with the newly-created port as context
616     object (C<$SELF>) and the C<@initdata> values as arguments.
617    
618 root 1.67 A common idiom is to pass a local port, immediately monitor the spawned
619     port, and in the remote init function, immediately monitor the passed
620     local port. This two-way monitoring ensures that both ports get cleaned up
621     when there is a problem.
622 root 1.38
623     Example: spawn a chat server port on C<$othernode>.
624    
625     # this node, executed from within a port context:
626     my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
627     mon $server;
628    
629     # init function on C<$othernode>
630     sub connect {
631     my ($srcport) = @_;
632    
633     mon $srcport;
634    
635     rcv $SELF, sub {
636     ...
637     };
638     }
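
The package search order described above can be made concrete with a small sketch. It only computes the list of candidate packages and loads nothing: C<candidate_packages> is a hypothetical helper, not the function AnyEvent::MP uses internally.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::MP): list the packages that
# would be tried, innermost first, for a fully-qualified function name.
sub candidate_packages {
   my ($func) = @_;

   my @parts = grep { length } split /::/, $func;
   pop @parts;                           # drop the function name itself

   my @packages;
   while (@parts) {
      push @packages, join "::", @parts;
      pop @parts;                        # move to the enclosing package
   }

   return @packages;
}

print "$_\n" for candidate_packages("MyApp::Chat::Server::init");
```

For a name such as C<::name> this sketch yields no packages at all, since a function in the main program has no enclosing package to load.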
639    
640     =cut
641    
642     sub _spawn {
643     my $port = shift;
644     my $init = shift;
645    
646     local $SELF = "$NODE#$port";
647     eval {
648     &{ load_func $init }
649     };
650     _self_die if $@;
651     }
652    
653     sub spawn(@) {
654 root 1.75 my ($nodeid, undef) = split /#/, shift, 2;
655 root 1.38
656     my $id = "$RUNIQ." . $ID++;
657    
658 root 1.39 $_[0] =~ /::/
659     or Carp::croak "spawn init function must be a fully-qualified name, caught";
660    
661 root 1.75 snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_;
662 root 1.38
663 root 1.75 "$nodeid#$id"
664 root 1.38 }
665    
666 root 1.59 =item after $timeout, @msg
667    
668     =item after $timeout, $callback
669    
670     Either sends the given message, or calls the given callback, after the
671     specified number of seconds.
672    
673 root 1.67 This is simply a utility function that comes in handy at times - the
674     AnyEvent::MP author is not convinced of the wisdom of having it, though,
675     so it may go away in the future.
676 root 1.59
677     =cut
678    
679     sub after($@) {
680     my ($timeout, @action) = @_;
681    
682     my $t; $t = AE::timer $timeout, 0, sub {
683     undef $t;
684     ref $action[0]
685     ? $action[0]()
686     : snd @action;
687     };
688     }
689    
690 root 1.8 =back
691    
692 root 1.26 =head1 AnyEvent::MP vs. Distributed Erlang
693    
694 root 1.35 AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
695     == aemp node, Erlang process == aemp port), so many of the documents and
696     programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
697 root 1.27 sample:
698    
699 root 1.35 http://www.Erlang.se/doc/programming_rules.shtml
700     http://Erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
701     http://Erlang.org/download/Erlang-book-part1.pdf # chapters 5 and 6
702     http://Erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
703 root 1.27
704     Despite the similarities, there are also some important differences:
705 root 1.26
706     =over 4
707    
708 root 1.65 =item * Node IDs are arbitrary strings in AEMP.
709 root 1.26
710 root 1.65 Erlang relies on special naming and DNS to work everywhere in the same
711     way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
712     configuration or DNS), but will otherwise discover other nodes itself.
713 root 1.27
714 root 1.54 =item * Erlang has a "remote ports are like local ports" philosophy, AEMP
715 root 1.51 uses "local ports are like remote ports".
716    
717     The failure modes for local ports are quite different (runtime errors
718     only) than for remote ports - when a local port dies, you I<know> it dies,
719     when a connection to another node dies, you know nothing about the other
720     port.
721    
722     Erlang pretends remote ports are as reliable as local ports, even when
723     they are not.
724    
725     AEMP encourages a "treat remote ports differently" philosophy, with local
726     ports being the special case/exception, where transport errors cannot
727     occur.
728    
729 root 1.26 =item * Erlang uses processes and a mailbox, AEMP does not queue.
730    
731 root 1.51 Erlang uses processes that selectively receive messages, and therefore
732     needs a queue. AEMP is event based, queuing messages would serve no
733     useful purpose. For the same reason the pattern-matching abilities of
734     AnyEvent::MP are more limited, as there is little need to be able to
735     filter messages without dequeuing them.
736 root 1.26
737 root 1.35 (But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
738 root 1.26
739     =item * Erlang sends are synchronous, AEMP sends are asynchronous.
740    
741 root 1.51 Sending messages in Erlang is synchronous and blocks the process (and
742     so does not need a queue that can overflow). AEMP sends are immediate,
743     connection establishment is handled in the background.
744 root 1.26
745 root 1.51 =item * Erlang suffers from silent message loss, AEMP does not.
746 root 1.26
747     Erlang makes few guarantees on message delivery - messages can get lost
748     without any of the processes realising it (i.e. you send messages a, b,
749     and c, and the other side only receives messages a and c).
750    
751 root 1.66 AEMP guarantees correct ordering, and guarantees that after one message
752     is lost, all following ones sent to the same port are lost as well, until
753     monitoring raises an error, so there are no silent "holes" in the message
754     sequence.
755 root 1.26
756     =item * Erlang can send messages to the wrong port, AEMP does not.
757    
758 root 1.51 In Erlang it is quite likely that a node that restarts reuses a process ID
759     known to other nodes for a completely different process, causing messages
760     destined for that process to end up in an unrelated process.
761 root 1.26
762     AEMP never reuses port IDs, so old messages or old port IDs floating
763     around in the network will not be sent to an unrelated port.
764    
765     =item * Erlang uses unprotected connections, AEMP uses secure
766     authentication and can use TLS.
767    
768 root 1.66 AEMP can use a proven protocol - TLS - to protect connections and
769 root 1.26 securely authenticate nodes.
770    
771 root 1.28 =item * The AEMP protocol is optimised for both text-based and binary
772     communications.
773    
774 root 1.66 The AEMP protocol, unlike the Erlang protocol, supports both programming
775     language independent text-only protocols (good for debugging) and binary,
776 root 1.67 language-specific serialisers (e.g. Storable). By default, unless TLS is
777     used, the protocol is actually completely text-based.
778 root 1.28
779     It has also been carefully designed to be implementable in other languages
780 root 1.66 with a minimum of work while gracefully degrading functionality to make the
781 root 1.28 protocol simple.
782    
783 root 1.35 =item * AEMP has more flexible monitoring options than Erlang.
784    
785     In Erlang, you can choose to receive I<all> exit signals as messages
786     or I<none>, there is no in-between, so monitoring single processes is
787     difficult to implement. Monitoring in AEMP is more flexible than in
788     Erlang, as one can choose between automatic kill, exit message or callback
789     on a per-process basis.
790    
791 root 1.37 =item * Erlang tries to hide remote/local connections, AEMP does not.
792 root 1.35
793 root 1.67 Monitoring in Erlang is not an indicator of process death/crashes, in the
794     same way as linking is (except linking is unreliable in Erlang).
795 root 1.37
796     In AEMP, you don't "look up" registered port names or send to named ports
797     that might or might not be persistent. Instead, you normally spawn a port
798 root 1.67 on the remote node. The init function monitors you, and you monitor the
799     remote port. Since both monitors are local to the node, they are much more
800     reliable (no need for C<spawn_link>).
801 root 1.37
802     This also saves round-trips and avoids sending messages to the wrong port
803     (hard to do in Erlang).
804 root 1.35
805 root 1.26 =back
806    
807 root 1.46 =head1 RATIONALE
808    
809     =over 4
810    
811 root 1.67 =item Why strings for port and node IDs, why not objects?
812 root 1.46
813     We considered "objects", but found that the actual number of methods
814 root 1.67 that can be called is quite low. Since port and node IDs travel over
815 root 1.46 the network frequently, the serialising/deserialising would add lots of
816 root 1.67 overhead, as well as having to keep a proxy object everywhere.
817 root 1.46
818     Strings can easily be printed, easily serialised etc. and need no special
819     procedures to be "valid".
820    
821 root 1.67 And as a result, a miniport consists of a single closure stored in a
822     global hash - it can't become much cheaper.
823 root 1.47
824 root 1.67 =item Why favour JSON, why not a real serialising format such as Storable?
825 root 1.46
826     In fact, any AnyEvent::MP node will happily accept Storable as framing
827     format, but currently there is no way to make a node use Storable by
828 root 1.67 default (although all nodes will accept it).
829 root 1.46
830     The default framing protocol is JSON because a) JSON::XS is many times
831     faster for small messages and b) most importantly, after years of
832     experience we found that object serialisation is causing more problems
833 root 1.67 than it solves: Just like function calls, objects simply do not travel
834 root 1.46 easily over the network, mostly because they will always be a copy, so you
835     always have to re-think your design.
836    
837     Keeping your messages simple, concentrating on data structures rather than
838     objects, will keep your messages clean, tidy and efficient.
839    
840     =back
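
The difference between simple data and objects is easy to see with a JSON round trip. This sketch uses the core module JSON::PP as a stand-in for JSON::XS, which is an assumption made for portability; the message contents and the C<MyApp::Object> class name are hypothetical.

```perl
use strict;
use warnings;
use JSON::PP;                      # core module, used here instead of JSON::XS

my $json = JSON::PP->new->utf8;

# A message made of simple values survives the round trip unchanged.
my $text = $json->encode(["ping", { count => 1 }]);
my $msg  = $json->decode($text);
print "$text\n";

# A blessed object is rejected by default rather than silently copied.
my $ok = eval { $json->encode(["ping", bless({}, "MyApp::Object")]); 1 };
print "object rejected\n" unless $ok;
```

The encoder refusing objects outright is arguably helpful here: it surfaces the design problem at the sender instead of delivering a detached copy to the receiver.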
841    
842 root 1.1 =head1 SEE ALSO
843    
844 root 1.68 L<AnyEvent::MP::Intro> - a gentle introduction.
845    
846     L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
847    
848     L<AnyEvent::MP::Global> - network maintenance and port groups, to find
849     your applications.
850    
851 root 1.1 L<AnyEvent>.
852    
853     =head1 AUTHOR
854    
855     Marc Lehmann <schmorp@schmorp.de>
856     http://home.schmorp.de/
857    
858     =cut
859    
860     1
861