/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.46
Committed: Sun Mar 4 18:48:27 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.45: +17 -20 lines
Log Message:
*** empty log message ***

=head1 Message Passing for the Non-Blocked Mind

=head1 Introduction and Terminology

This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
module, which allows programs to transparently pass messages within the
process and to other processes on the same or a different host.

What kind of messages? Basically a message here means a list of Perl
strings, numbers, hashes and arrays, anything that can be expressed as a
L<JSON> text (as JSON is the default serialiser in the protocol). Here are
two examples:

   write_log => 1251555874, "action was successful.\n"
   123, ["a", "b", "c"], { foo => "bar" }
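
To make "anything that can be expressed as JSON" concrete, the sketch below round-trips both example messages through L<JSON::PP>. It rests on one assumption: JSON::PP, a core Perl module, stands in for the protocol's serialiser, and AnyEvent::MP itself is not loaded. Plain strings, numbers, arrays and hashes come back unchanged. A code reference cannot be encoded at all.

```perl
use strict;
use warnings;
use JSON::PP;

# JSON::PP stands in for the protocol's JSON serialiser (an assumption for
# illustration only; AnyEvent::MP is not used here).
my $json = JSON::PP->new->canonical;

# Encode a message (a Perl list) as a JSON array, then decode it again.
sub roundtrip {
   my @msg = @_;
   return @{ $json->decode ($json->encode (\@msg)) };
}

my @log_msg  = roundtrip (write_log => 1251555874, "action was successful.\n");
my @data_msg = roundtrip (123, ["a", "b", "c"], { foo => "bar" });

print $json->encode (\@data_msg), "\n";

# Code references have no JSON representation, so encoding them fails.
my $ok = eval { $json->encode ([ sub { 1 } ]); 1 };
my $result = $ok ? "encoded" : "cannot encode a code reference";
print "$result\n";
```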

When using L<AnyEvent::MP> it is customary to use a descriptive string as
the first element of a message that indicates the type of the message. This
element is called a I<tag> in L<AnyEvent::MP>, as some API functions
(C<rcv>) support matching it directly.

Suppose you want to send a ping message with your current time to
somewhere; this is what such a message might look like (in Perl syntax):

   ping => 1251381636

Now that we know what a message is, to which entities are those
messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
a destination for messages but also a context to execute code: when
a runtime error occurs while executing code belonging to a port, the
exception will be raised on the port and can even travel to interested
parties on other nodes, which makes supervision of distributed processes
easy.

How do these ports relate to things you know? Each I<port> belongs
to a I<node>, and a I<node> is just the UNIX process that runs your
L<AnyEvent::MP> application.

Each I<node> is distinguished from other I<nodes> running on the same or
another host in a network by its I<node ID>. A I<node ID> is simply a
unique string chosen manually or assigned by L<AnyEvent::MP> in some way
(UNIX nodename, random string...).

Here is a diagram of how I<nodes>, I<ports> and UNIX processes relate
to each other. The setup consists of two nodes (more are of course
possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
C<BAR>.

   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|

The strings for the I<port IDs> here are just for illustrative
purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
strings, they can't be chosen manually and are assigned by the system
dynamically. These I<port IDs> are unique within a network and can also be
used to identify senders, or even as message tags for instance.

The next sections will explain the API of L<AnyEvent::MP> by going through
a few simple examples. Later some more complex idioms are introduced,
which are hopefully useful to solve some real-world problems.

=head2 Passing Your First Message

For starters, let's have a look at the messaging API. The following
example is just a demo to show the basic elements of message passing with
L<AnyEvent::MP>.

The example should print C<Ending with: 123>, in a rather complicated
way, by passing some message to a port.

   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";

It already uses most of the essential functions inside
L<AnyEvent::MP>: First there is the C<port> function, which creates a
I<port> and returns its I<port ID>, a simple string.

This I<port ID> can be used to send messages to the port and install
handlers to receive messages on the port. Since it is a simple string
it can be safely passed to other I<nodes> in the network when you want
to refer to that specific port (usually used for RPC, where you need
to tell the other end which I<port> to send the reply to - messages in
L<AnyEvent::MP> have a destination, but no source).

The next function is C<rcv>:

   rcv $port, test => sub { ... };

It installs a receiver callback on the I<port> that is specified as the
first argument (it only works for "local" ports, i.e. ports created on the
same node). The next argument, in this example C<test>, specifies a I<tag>
to match. This means that whenever a message with the first element being
the string C<test> is received, the callback is called with the remaining
parts of that message.

Messages can be sent with the C<snd> function, which is used like this in
the example above:

   snd $port, test => 123;

This will send the message C<'test', 123> to the I<port> with the I<port
ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
on C<test> it will call the callback with the first argument being the
number C<123>.
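
The tag matching that C<rcv> and C<snd> perform can be pictured with a tiny in-process model. The helpers C<toy_rcv> and C<toy_snd> and the C<%handlers> table are hypothetical; L<AnyEvent::MP> is not implemented this way. The sketch only shows two things: the tag selects the callback, and the callback receives the remaining message elements. Dropping messages whose tag has no callback is a simplification of this toy. It is not a statement about what AnyEvent::MP does with them.

```perl
use strict;
use warnings;

# Hypothetical toy model of tag matching; not AnyEvent::MP's API.
my %handlers;   # port ID => { tag => callback }

sub toy_rcv {
   my ($port, $tag, $cb) = @_;
   $handlers{$port}{$tag} = $cb;
}

sub toy_snd {
   my ($port, $tag, @rest) = @_;
   my $cb = $handlers{$port}{$tag}
      or return 0;   # toy behaviour: no callback for this tag, message dropped
   $cb->(@rest);     # the tag itself is not passed to the callback
   return 1;
}

my @received;
toy_rcv "port1", test => sub { push @received, @_ };

my $matched   = toy_snd "port1", test  => 123;   # callback gets (123)
my $unmatched = toy_snd "port1", other => 456;   # no callback for "other"

print "received: @received\n";
```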

The callback is a typical AnyEvent idiom: the callback just passes
that number on to the I<condition variable> C<$end_cv>, which will then
pass the value to the print. Condition variables are out of the scope
of this tutorial and not often used with ports, so please consult
L<AnyEvent::Intro> about them.

Passing messages inside just one process is boring. Before we can move on
and do interprocess message passing we first have to make sure some things
have been set up correctly for our nodes to talk to each other.

=head2 System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on
your system.

First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to set up the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.

The easiest way to set this up is to use the F<aemp> utility:

   aemp gensecret

This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
random shared secret. You can copy this file to any other system and
then communicate over the network (via TCP) with it. You can also select
your own shared secret (F<aemp setsecret>) and for increased security
requirements you can even create (or configure) a TLS certificate (F<aemp
gencert>), causing connections to not just be securely authenticated, but
also to be encrypted and protected against tampering.

Connections will only be successfully established when the I<nodes>
that want to connect to each other have the same I<shared secret> (or
successfully verify the TLS certificate of the other side, in which case
no shared secret is required).

B<If something does not work as expected, and for example tcpdump shows
that the connections are closed almost immediately, you should make sure
that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
you try to connect with each other!>

That's all for now; you will find some more advanced fiddling with the
C<aemp> utility later.

=head2 Shooting the Trouble

Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.

To help with troubleshooting, there are two environment variables
that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets the
logging level. The default is C<5>, which means nothing much is
printed. You can increase it to C<8> or C<9> to get more verbose
output. This is example output when starting a node:

   2012-03-04 19:41:10 <8> node cerebro starting up.
   2012-03-04 19:41:10 <8> node listens on [10.0.0.1:4040].
   2012-03-04 19:41:10 <9> trying connect to seed node 10.0.0.19:4040.
   2012-03-04 19:41:10 <9> 10.0.0.19:4040 connected as rain
   2012-03-04 19:41:10 <7> rain is up ()

A lot of info, but at least you can see that it does something.

The other environment variable that can be useful is
C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
most messages that are sent or received to be printed. For example, F<aemp
restart rijk> might output these message exchanges:

   SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
   SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
   SND rain <- [null,"g_find","rijk"]
   RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
   RCV rijk -> ["b",""]

=head1 PART 1: Passing Messages Between Processes

=head2 The Receiver

Let's split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:

   use AnyEvent;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

   my $port = port;
   grp_reg eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

=head3 AnyEvent::MP::Global

Now, that wasn't too bad, was it? OK, let's step through the new functions
and modules that have been used.

For starters, there is now an additional module being
used: L<AnyEvent::MP::Global>. This module provides us with a I<global
registry>, which lets us register ports in groups that are visible on all
I<nodes> in a network.

What is this useful for? Well, the I<port IDs> are random-looking strings,
assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance,
so we don't know which I<port ID> to send messages to, especially when the
message is to be passed between different I<nodes> (or UNIX processes). To
find the right I<port> of another I<node> in the network we will need
to communicate this somehow to the sender. And exactly that is what
L<AnyEvent::MP::Global> provides.

Especially in larger, more anonymous networks this is handy: imagine you
have a few database backends, a few web front-ends and some processing
distributed over a number of hosts: all of these would simply register
themselves in the appropriate group, and your web front-ends can start to
find some database backend.

=head3 C<configure> and Joining and Maintaining the Network

Now, let's have a look at the new function, C<configure>:

   configure nodeid => "eg_receiver", binds => ["*:4040"];

Before we are able to send messages to other nodes we have to initialise
ourselves to become a "distributed node". Initialising a node means naming
the node, optionally binding some TCP listeners so that other nodes can
contact it, and connecting to a predefined set of seed addresses so the
node can discover the existing network - and the existing network can
discover the node!

All of this (and more) can be passed to the C<configure> function - later
we will see how we can do all this without even passing anything to
C<configure>!

The first parameter, C<nodeid>, specifies the node ID (in this case
C<eg_receiver> - the default is to use the node name of the current host,
but for this example we want to be able to run many nodes on the same
machine). Node IDs need to be unique within the network and can be almost
any string - if you don't care, you can specify a node ID of C<anon/>
which will then be replaced by a random node name.

The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so it should not
be used unless you know it works).

The reason to bind on a TCP port is not just that other nodes can connect
to us: if no binds are specified, the node will still bind on a dynamic
port on all local addresses - but in this case we won't know the port, and
cannot tell other nodes to connect to it as seed node.

A I<seed> is a (fixed) TCP address of some other node in the network. To
explain the need for seeds we have to look at the topology of a typical
L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>;
here is an example with 4 nodes:

   N1--N2
   | \/ |
   | /\ |
   N3--N4

Now imagine another node - C<N5> - wants to connect itself to that network:

   N1--N2
   | \/ |   N5
   | /\ |
   N3--N4

The new node needs to know the I<binds> of all nodes already
connected. Exactly this is what the I<seeds> are for: Let's assume that
the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This
causes it to connect to C<N2>:

   N1--N2____
   | \/ |   N5
   | /\ |
   N3--N4

C<N2> then tells C<N5> about the I<binds> of the other nodes it is
connected to, and C<N5> creates the rest of the connections:

    /--------\
   N1--N2____|
   | \/ |   N5
   | /\ |   /|
   N3--N4--- |
    \________/

All done: C<N5> is now happily connected to the rest of the network.
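
The join procedure can be simulated in a few lines of plain Perl. This is a toy model built on hypothetical helpers (C<connect_nodes>, C<join_via_seed>). Real nodes exchange I<binds> over TCP. Here the new node simply reads the seed's peer list, which stands in for C<N2> telling C<N5> about the other nodes.

```perl
use strict;
use warnings;

# Toy model of joining a fully connected mesh through one seed.
# %links maps each node to the set of its peers; helper names are hypothetical.
my %links;

sub connect_nodes {
   my ($x, $y) = @_;
   $links{$x}{$y} = $links{$y}{$x} = 1;
}

# start with the fully connected mesh N1..N4 from the diagrams
for my $x (qw(N1 N2 N3 N4)) {
   for my $y (qw(N1 N2 N3 N4)) {
      connect_nodes $x, $y if $x lt $y;
   }
}

sub join_via_seed {
   my ($new, $seed) = @_;
   connect_nodes $new, $seed;                               # 1. contact the seed
   my @peers = grep { $_ ne $new } keys %{ $links{$seed} }; # seed's known peers
   connect_nodes $new, $_ for @peers;                       # 2. connect to them
}

join_via_seed "N5", "N2";

print join (",", sort keys %{ $links{N5} }), "\n";
```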

Apart from the obvious function - joining the network - seed nodes fulfill
another very important function: the connections created by connecting
to seed nodes are used to keep the network together - by trying to keep
connections to all seed nodes active, the network ensures that it will not
split into multiple networks without connection to each other.

This means that the graph created by all seed node connections must span
the whole network, in some way.
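
Whether a given seed configuration spans the network is a plain graph-connectivity question. The hypothetical helper C<seeds_span_network> treats every "node uses seed" relation as an undirected edge and runs a breadth-first search. It compares two layouts. In the first, leaf nodes use a small hub of seed nodes. In the second, two halves never share a seed. All node names are made up for the illustration.

```perl
use strict;
use warnings;

# Hypothetical helper: given each node's list of seeds, check whether the
# graph of seed connections reaches every node (i.e. spans the network).
sub seeds_span_network {
   my (%seeds_of) = @_;
   my %adj;
   for my $node (keys %seeds_of) {
      $adj{$node} ||= {};
      for my $seed (@{ $seeds_of{$node} }) {
         $adj{$node}{$seed} = $adj{$seed}{$node} = 1;
      }
   }

   # breadth-first search from an arbitrary start node
   my ($start) = sort keys %adj;
   my %seen = ($start => 1);
   my @todo = ($start);
   while (defined (my $n = shift @todo)) {
      for my $m (keys %{ $adj{$n} }) {
         push @todo, $m unless $seen{$m}++;
      }
   }

   return scalar (keys %seen) == scalar (keys %adj);
}

# hub layout: two seed nodes seeding each other, leaves using both
my %hub = (
   seedA => ["seedB"], seedB => ["seedA"],
   leaf1 => ["seedA", "seedB"], leaf2 => ["seedA", "seedB"],
);

# two groups that never share a seed: the network can split in two
my %split = (
   a1 => ["a2"], a2 => ["a1"],
   b1 => ["b2"], b2 => ["b1"],
);

my $hub_ok   = seeds_span_network (%hub);
my $split_ok = seeds_span_network (%split);
print "hub: ",   ($hub_ok   ? "spans" : "split"), "\n";
print "split: ", ($split_ok ? "spans" : "split"), "\n";
```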

There are many ways of doing this - the simplest is probably to use
a single set of one or more seed nodes as seeds for all nodes in the
network - this creates a "hub" of seed nodes that connect to each other,
and "leaf" nodes that connect to the nodes in the hub, keeping everything
together.

The process of joining a network takes time, during which the node is
already running. This also means it takes time until the node is fully
connected, and global groups and other information are available. The best
way to deal with this is to either retry regularly until you find the
resource you were looking for, or to only start services on demand after a
node has become available.

=head3 Registering the Receiver

Coming back to our example, we have now introduced the basic purpose of
L<AnyEvent::MP::Global> and C<configure>, so we will finally continue
talking about the receiver.

Let's look at the next line(s):

   my $port = port;
   grp_reg eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<grp_reg> function, however, is
new: The first argument is the name of a I<global group>, and the second
argument is the I<port ID> to register in that group.

You can choose the name of such a I<global group> freely (prefixing your
package name is I<highly recommended> however and might be enforced in
future versions!). The purpose of such a group is to store a set of port
IDs. This set is made available throughout the L<AnyEvent::MP> network,
so that each node can see which ports belong to that group.

Later we will see how the sender looks for the ports in this global
group to send messages to them.

The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.

=head2 The Sender

OK, now let's take a look at the sender code:

   use AnyEvent;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure nodeid => "eg_sender/%u", seeds => ["*:4040"];

   my $find_timer =
      AnyEvent->timer (after => 0, interval => 1, cb => sub {
         my $ports = grp_get "eg_receivers"
            or return;

         snd $_, test => time
            for @$ports;
      });

   AnyEvent->condvar->recv;

It's even less code. The C<configure> call serves the same purpose as in
the receiver, but instead of specifying binds we specify a list of seeds -
which happens to be the same as the binds used by the receiver, which
becomes our seed node.

Next we set up a timer that repeatedly (every second) calls this chunk of
code:

   my $ports = grp_get "eg_receivers"
      or return;

   snd $_, test => time
      for @$ports;

The only new function here is the C<grp_get> function of
L<AnyEvent::MP::Global>. It searches in the global group named
C<eg_receivers> for ports. If none are found, it returns C<undef>, which
makes our code return instantly and wait for the next round, as nobody is
interested in our message.

As soon as the receiver application has connected and the information
about the newly added port in the receiver has propagated to the sender
node, C<grp_get> returns an array reference that contains the I<port ID> of
the receiver I<port(s)>.

We then just send a message with a tag and the current time to every
I<port> in the global group.

=head3 Splitting Network Configuration and Application Code

OK, so far, this works. In the real world, however, the person configuring
your application to run on a specific network (the end user or network
administrator) is often different from the person coding the application.

Or to put it differently: the arguments passed to C<configure> are usually
provided not by the programmer, but by whoever is deploying the program.

To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering we mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as seed node (this only works when
running on the same host; for multiple machines you would provide the IP
address or hostname of the node running the seed), and use a random name
(because we want to start multiple nodes on the same host):

   aemp seeds "*:4040" nodeid anon/

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will all
use our generic seed node to discover each other.

In fact, starting many receivers nicely illustrates that the time sender
can have multiple receivers.

That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of this.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: receive callbacks are executed
in a special context, the so-called I<port-context>: code that throws an
otherwise uncaught exception will cause the port to be C<kil>led. Killed
ports are destroyed automatically (killing ports is the only way to free
ports, incidentally).

Ports can be monitored, even from a different host, and when a port is
killed any entity monitoring it will be notified.

Here is a simple example:

   use AnyEvent::MP;

   # create a port, it always dies
   my $port = port { die "oops" };

   # monitor it
   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   # now send it some message, causing it to die:
   snd $port;

It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die and call the monitoring callback:

   anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.

The callback was actually passed two arguments: C<die> (to indicate it did
throw an exception as opposed to, say, a network error) and the exception
message itself.

What happens when a port is killed before we have a chance to monitor
it? Granted, this is highly unlikely in our example, but when you program
in a network this can easily happen due to races between nodes.

   use AnyEvent::MP;

   my $port = port { die "oops" };

   snd $port;

   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

This time we will get something like:

   anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)

Since the port was already gone, the kill reason is now C<no_such_port>
with some descriptive (we hope) error message.

In fact, the kill reason is usually some identifier as first argument
and a human-readable error message as second argument, but can be about
anything (it's a list) or even nothing - which is called a "normal" kill.
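
A monitor callback can therefore tell a "normal" kill from an error just by checking whether it received any arguments. The helper C<describe_kill> is hypothetical and not part of AnyEvent::MP. It only applies that convention to the reason lists from the examples above.

```perl
use strict;
use warnings;

# Hypothetical helper (not part of AnyEvent::MP): describe the reason
# list that a monitor callback receives.
sub describe_kill {
   my @reason = @_;
   return "normal kill" unless @reason;   # no arguments: a "normal" kill
   my ($type, @details) = @reason;        # identifier first, then message
   return @details ? "error $type: @details" : "error $type";
}

my @lines = (
   describe_kill (),
   describe_kill (die => "oops at xxx line 5."),
   describe_kill (no_such_port => "cannot monitor nonexistent port"),
);
print "$_\n" for @lines;
```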

You can kill ports manually using the C<kil> function, which will be
treated like an error when any reason is specified:

   kil $port, custom_error => "don't like your steenking face";

And a clean kill without any reason arguments:

   kil $port;

By now you probably wonder what this "normal" kill business is: A common
idiom is to not specify a callback to C<mon>, but another port, such as
C<$SELF>:

   mon $port, $SELF;

This basically means "monitor $port and kill me when it crashes". And a
"normal" kill does not count as a crash. This way you can easily link
ports together and make them crash together on errors, while still being
able to remove a port silently.
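
The propagation rule can be modelled without AnyEvent::MP. In the toy below, C<toy_mon> and C<toy_kil> are hypothetical stand-ins for C<mon $port, $SELF> and C<kil>. A kill with a reason is passed on to every linked port. A "normal" kill is not.

```perl
use strict;
use warnings;

# Toy model (hypothetical names, not AnyEvent::MP's implementation) of
# linking ports: a kill with a reason propagates, a normal kill does not.
my %alive = map { $_ => 1 } qw(a b c d);
my %linked;   # port => [ ports that should die when it crashes ]

sub toy_mon {
   my ($watched, $watcher) = @_;
   push @{ $linked{$watched} }, $watcher;
}

sub toy_kil {
   my ($port, @reason) = @_;
   return unless delete $alive{$port};   # already dead: nothing to do
   return unless @reason;                # normal kill: do not propagate
   toy_kil ($_, @reason) for @{ $linked{$port} || [] };
}

toy_mon a => "b";   # b is linked to a
toy_mon c => "d";   # d is linked to c

toy_kil "a";                                # normal kill: b survives
toy_kil "c", custom_error => "bad input";   # crash: d dies too

print "alive: ", join (" ", sort keys %alive), "\n";
```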

=head3 Port Context

When code runs in an environment where C<$SELF> contains its own port ID
and exceptions will be caught, it is said to run in a port context.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from C<rcv> handlers. As an example, assume that the port receive
handler wants to C<die> a second later, using C<after>:

   my $port = port {
      after 1, sub { die "oops" };
   };

Then you will find it does not work - when the C<after> callback is
executed, it does not run in port context anymore, so exceptions will not
be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works just like Perl's built-in C<sub>:

   my $port = port {
      after 1, psub { die "oops" };
   };

C<psub> stores C<$SELF> and returns a code reference. When the code
reference is invoked, it will run the code block within the context of
that port, so exception handling once more works as expected.
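
The mechanism can be modelled in plain Perl by treating port context as a dynamically scoped C<$SELF> plus an exception handler, as described above. That dynamic scope ends when the C<rcv> callback returns. In this toy model, all names (C<run_in_port>, C<toy_psub>, C<@deferred>, C<@killed>) are hypothetical, and AnyEvent::MP is not loaded. A plain closure scheduled for later runs without context. The C<psub>-like wrapper captures the port when it is created and re-enters the context when called.

```perl
use strict;
use warnings;

# Toy model of port context; names are hypothetical stand-ins and
# $SELF is just a package variable in main.
our $SELF;
my @deferred;   # callbacks "scheduled" to run later, like after 1, sub {...}
my @killed;     # ports killed by an uncaught exception

sub run_in_port {
   my ($port, $code) = @_;
   local $SELF = $port;         # port context: $SELF is set ...
   eval { $code->(); 1 }        # ... and exceptions are caught
      or push @killed, "$port: $@";
}

sub toy_psub {
   my ($code) = @_;
   my $port = $SELF;            # remember the port at creation time
   return sub { run_in_port ($port, $code) };
}

run_in_port port1 => sub {
   push @deferred, sub { die "plain sub\n" };       # context is lost
   push @deferred, toy_psub sub { die "psub\n" };   # context is kept
};

# later, outside any port context (the "event loop"):
for my $cb (@deferred) {
   eval { $cb->(); 1 } or print "escaped to the event loop: $@";
}
print "killed $_" for @killed;
```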

There is also a way to temporarily execute code in the context of some
port, namely C<peval>:

   peval $port, sub {
      # die'ing here will kil $port
   };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

=head3 Network Errors and the AEMP Guarantee

Another important source of monitoring failures is network problems.
When a node loses connection to another node, it will invoke all
monitoring actions as if the port was killed, even if it is possible that
the port still lives happily on another node (not being able to talk to a
node means we have no clue what's going on with it: it could have crashed,
but it could also still be running without knowing we lost the connection).

So another way to view monitors is "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port: After starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane: Specifically, it means that there will
be no "holes" in the message sequence: all messages sent are delivered
in order, without any missing in between, and when some were lost, you
I<will> be notified of that, so you can take recovery action.
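
The "no holes" property can be stated as a simple check. Up to the point where the monitor fires, the delivered messages must be an unbroken, in-order prefix of the sent messages. The checker C<no_holes> is hypothetical and not part of AnyEvent::MP. It only makes the guarantee concrete.

```perl
use strict;
use warnings;

# Hypothetical checker for the "no holes" property: messages delivered
# before the monitor fires must be an in-order prefix of those sent.
sub no_holes {
   my ($sent, $delivered) = @_;
   return 0 if @$delivered > @$sent;
   for my $i (0 .. $#$delivered) {
      return 0 unless $delivered->[$i] eq $sent->[$i];
   }
   return 1;
}

my @sent = qw(m1 m2 m3 m4);

my $prefix = no_holes (\@sent, [qw(m1 m2)]);      # m3, m4 lost: allowed
my $hole   = no_holes (\@sent, [qw(m1 m3 m4)]);   # m2 missing: not allowed
my $order  = no_holes (\@sent, [qw(m2 m1)]);      # reordered: not allowed

print "prefix: $prefix, hole: $hole, reordered: $order\n";
```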
633    
634     =head3 Supervising
635    
636     Ok, so what is this crashing-everything-stuff going to make applications
637     I<more> stable? Well in fact, the goal is not really to make them more
638     stable, but to make them more resilient against actual errors and
639     crashes. And this is not done by crashing I<everything>, but by crashing
640     everything except a supervisor.
641    
642 elmex 1.31 A supervisor is simply some code that ensures that an application (or a
643 root 1.30 part of it) is running, and if it crashes, is restarted properly.
644    
645     To show how to do all this we will create a simple chat server that can
646     handle many chat clients. Both server and clients can be killed and
647     restarted, and even crash, to some extent.

=head2 Chatting, the Resilient Way

Without further ado, here is the chat server (to run it, we assume the
set-up explained earlier, with a separate F<aemp run> seed node):

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   our $server = port;

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };
      msg "$nick (joins)";
   };

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

   grp_reg eg_chat_server => $server;

   warn "server ready.\n";

   AnyEvent->condvar->recv;

Looks like a lot, but it is actually quite simple: after your usual
preamble (this time we use common sense), we define a helper function that
sends a message to every registered chat client:

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

The clients are stored in the hash C<%clients>. Then we define a server
port and install two receivers on it: C<join>, which clients send to join
the chat, and C<privmsg>, which clients use to send actual chat messages.

C<join> is the most complicated one. It expects the client port and the
nickname to be passed in the message, and registers the client in
C<%clients>.

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

The next step is to monitor the client. The monitoring action removes the
client and sends a quit message with the error to all remaining clients.

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

And finally, it creates a join message and sends it to all clients.

      msg "$nick (joins)";
   };

The C<privmsg> callback simply broadcasts the message to all clients:

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

Finally, the server registers its port in the C<eg_chat_server> global
group, so that clients can find it:

   grp_reg eg_chat_server => $server;

Well, well... and where is this supervisor stuff? Well... we cheated,
it's not there. To avoid overcomplicating the example, we only put it
into the..... CLIENT!

=head3 The Client, and a Supervisor!

Again, here is the client, including supervisor, which makes it a bit
longer:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   my $nick = shift;

   configure;

   my ($client, $server);

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = $servernodes->[0];
      snd $server, join => $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, privmsg => $nick, $line
         if $server;
   });

   $| = 1;
   print "> ";
   AnyEvent->condvar->recv;

The first thing the client does is to store the nickname (which is
expected as the only command line argument) in C<$nick>, for later use.

The next relevant thing is... finally... the supervisor:

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server"
         or return after 1, \&server_connect;

This looks up the server in the C<eg_chat_server> global group. If it
cannot find it (which is likely when the node is just starting up),
it will wait a second and then retry. This "wait a bit and retry"
is an important pattern, as distributed programming means lots of
things are going on asynchronously. In practice, one should use a more
intelligent algorithm, which could, for example, warn after an excessive
number of retries. Hopefully future versions of AnyEvent::MP will offer
some predefined supervisors; for now you will have to code it on your own.
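
One possible "more intelligent" retry policy is capped exponential backoff that warns once after too many failed attempts. The sketch below is plain Perl and only computes the schedule; the helper names C<retry_delay> and C<should_warn> and the constants (1 second initial delay, 30 second cap, warning at the tenth attempt) are hypothetical choices, not part of AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical retry policy (not part of AnyEvent::MP): capped exponential
# backoff, starting at 1 second and never waiting longer than 30 seconds.
my $INITIAL   = 1;
my $MAX_DELAY = 30;
my $WARN_AT   = 10;   # complain once this many attempts have failed

sub retry_delay {
   my ($attempt) = @_;   # 0 for the first retry, 1 for the second, ...
   my $delay = $INITIAL * 2 ** $attempt;
   return $delay > $MAX_DELAY ? $MAX_DELAY : $delay;
}

sub should_warn {
   my ($attempt) = @_;
   return $attempt + 1 == $WARN_AT;   # warn exactly once, not on every retry
}

my @schedule = map { retry_delay ($_) } 0 .. 6;   # 1 2 4 8 16 30 30
```

In the client, C<server_connect> could keep an attempt counter (reset after a successful lookup) and use C<< after retry_delay ($attempt++), \&server_connect >> instead of the fixed C<after 1, ...>, under the assumptions above.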

Next, it creates a local port for the server to send messages to, and
monitors it. When the port is killed, it prints "disconnected" and calls
the supervisor function again to retry.

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

Then everything is ready: the client sends a C<join> message with its
local port to the server, and starts monitoring the server:

      $server = $servernodes->[0];
      snd $server, join => $client, $nick;
      mon $server, $client;
   }

The monitor ensures that if the server crashes or goes away, the client
port will be killed as well. This tells the user that the client was
disconnected, and then starts connecting to the server again.

The rest of the program deals with the boring details of actually invoking
the supervisor function to start the whole client process and handling the
actual terminal input, sending it to the server.

You should now try to start the seed node, the server and one or more
clients in different terminal windows:

   aemp run profile seed
   perl eg/chat_server
   perl eg/chat_client nick1
   perl eg/chat_client nick2

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
which of course should not normally get killed (unless they, too, have a
supervisor).

If you often think in object-oriented terms, then treat a port as an
object: C<port> is the constructor, the receive callbacks set by C<rcv>
act as methods, the C<kil> function becomes the explicit destructor and
C<mon> installs a destructor hook. Unlike in conventional object-oriented
programming, it can make sense to exchange ports more freely (for example,
to monitor one port from another).
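
To make the analogy concrete, here is a tiny in-process toy class in plain Perl. It is not AnyEvent::MP (no networking, no queue, messages are dispatched synchronously), and the class name C<ToyPort> and its methods are hypothetical. It only mirrors the roles described above: C<new> as C<port>, C<rcv> handlers as methods, C<kil> as destructor and C<mon> as destructor hook. Passing another port to C<mon> kills that port with the same reason, as in C<mon $server, $client>.

```perl
use strict;
use warnings;

# Hypothetical toy model of the port-as-object analogy; not AnyEvent::MP.
package ToyPort;

sub new {                       # plays the role of "port"
   my ($class, $name) = @_;
   return bless { name => $name, rcv => {}, mon => [], dead => 0 }, $class;
}

sub rcv {                       # install a "method" for a message tag
   my ($self, $tag, $cb) = @_;
   $self->{rcv}{$tag} = $cb;
}

sub snd {                       # deliver a message synchronously
   my ($self, $tag, @args) = @_;
   return if $self->{dead};     # messages to dead ports are dropped
   my $cb = $self->{rcv}{$tag} or return;
   $cb->(@args);
}

sub mon {                       # install a destructor hook
   my ($self, $action) = @_;
   push @{ $self->{mon} }, $action;
}

sub kil {                       # explicit destructor, with optional reason
   my ($self, @reason) = @_;
   return if $self->{dead};
   $self->{dead} = 1;
   for my $action (@{ $self->{mon} }) {
      # a port as monitoring action is killed with the same reason,
      # a code reference is called with the reason
      ref ($action) eq "ToyPort" ? $action->kil (@reason) : $action->(@reason);
   }
}

package main;

my $server = ToyPort->new ("server");
my $client = ToyPort->new ("client");
my @log;

$client->rcv (line => sub { push @log, "line: @_" });
$client->mon (sub { push @log, "client killed: @_" });

# symmetric monitoring: if the server dies, the client dies too
$server->mon ($client);

$client->snd (line => "hello");
$server->kil (die => "connection lost");
$client->snd (line => "ignored");   # the client is dead by now
```

Killing C<$server> kills C<$client> with the same reason, which in turn runs the client's own hook; afterwards, messages to the dead client are silently dropped.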

There is ample room for improvement: the server should probably remember
the nickname in the C<join> handler instead of expecting it in every chat
message, it should probably monitor itself, and the client should not try
to send any messages unless a server is actually connected.
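
As a sketch of the first improvement, the C<join> handler can store the nickname as the hash value, so that C<privmsg> only needs the client port. To keep it runnable without a network, the snippet replaces C<snd> with a hypothetical stand-in that records messages in C<%outbox>, and calls the handlers (hypothetical names C<on_join>, C<on_privmsg>, C<on_quit>) directly instead of registering them with C<rcv> and C<mon>. With AnyEvent::MP, their bodies would go into the C<rcv> and C<mon> callbacks, and clients would send C<< privmsg => $client, $line >>.

```perl
use strict;
use warnings;

# Hypothetical stand-in for AnyEvent::MP's snd: records messages per port.
my %outbox;
sub snd { my ($port, @msg) = @_; push @{ $outbox{$port} }, "@msg" }

my %clients;   # client port => nickname

sub msg {
   my ($text) = @_;
   snd $_, $text for keys %clients;
}

sub on_join {
   my ($client, $nick) = @_;
   $clients{$client} = $nick;      # remember the nick once, at join time
   msg "$nick (joins)";
}

sub on_privmsg {
   my ($client, $text) = @_;
   my $nick = $clients{$client}
      // return;                   # ignore messages from unknown clients
   msg "$nick: $text";
}

sub on_quit {                      # what the mon callback would do
   my ($client, @reason) = @_;
   my $nick = $clients{$client} // return;
   delete $clients{$client};
   msg "$nick (quits, @reason)";
}

on_join    ("port1", "nick1");
on_join    ("port2", "nick2");
on_privmsg ("port1", "hello");
on_privmsg ("port3", "spoofed");   # never joined: dropped
on_quit    ("port2", "transport_error");
```

A side benefit of this design is that a client can no longer claim an arbitrary nickname per message, since the server only trusts the name given at join time.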

=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The
L<AnyEvent::MP::Global> service, for example, monitors nodes joining the
network and starts itself automatically on other nodes (if it isn't
running already).

A good way to design such applications is to put them into a module and
create "virtual connections" to other nodes. We call this the "bridge
head" method, because you start by creating a remote port (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let's redesign the chat server and
client using this design method.

Here is the server:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   grp_reg eg_chat_server2 => $NODE;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much different than the previous example, except that
this time, we register the node port (C<$NODE>) in the global group
instead of a port we created: the clients only want to know which node
the server is running on. In fact, they could also use some kind of
election mechanism, for example to find the node with the lowest load.

The more interesting change is that no server port is created at all: the
server consists only of code, and "does" nothing by itself. All it does is
define a function C<client_connect>, which expects a client port and a
nickname as arguments. It then monitors the client port and binds a
receive callback on C<$SELF>, which expects messages that in turn are
broadcast to all clients.

The two C<mon> calls are a bit tricky: the first C<mon> is a shorthand
for C<mon $client, $SELF>. The second does the normal "client has gone
away" clean-up action. Both could actually be rolled into one C<mon>
action.

C<$SELF> is a good hint that something interesting is going on. And
indeed, when looking at the client code, there is a new function,
C<spawn>:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   my $nick = shift;

   configure;

   $| = 1;

   my $port = port;

   my ($client, $server);

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server2"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
server I<port on the server node>:

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;

And of course the first thing after creating it is monitoring it.

The C<spawn> function creates a new port on a remote node and returns
its port ID. After creating the port it calls a function on the remote
node, passing any remaining arguments to it, and, most importantly,
executes the function within the context of the new port, so it can be
manipulated by referring to C<$SELF>. The init function can reside in a
module (actually it normally I<should> reside in a module): AnyEvent::MP
will automatically load the module if the function isn't defined.
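
The "load the module if the function isn't defined" step can be approximated in plain Perl. This is a sketch of the idea, not AnyEvent::MP's actual code: the helper name C<resolve_init> is hypothetical, and it assumes that a leading C<::> means the C<main> package and that the module name is everything before the last C<::>.

```perl
use strict;
use warnings;

# Hypothetical helper: turn "Some::Module::func" (or "::func") into a code
# reference, loading Some::Module first if the function is not yet defined.
sub resolve_init {
   my ($name) = @_;

   $name = "main$name" if $name =~ /^::/;      # "::client_connect"
   my ($package) = $name =~ /^(.*)::[^:]+$/
      or die "$name: not a fully qualified function name\n";

   no strict 'refs';
   unless (defined &$name) {
      (my $file = "$package.pm") =~ s{::}{/}g;
      require $file;                            # dies if it cannot be found
   }

   defined &$name
      or die "$name: function not defined after loading $package\n";

   return \&$name;
}

sub client_connect { "connected: @_" }         # stands in for the chat server
```

With this, C<resolve_init ("::client_connect")> finds the already defined function, while C<resolve_init ("List::Util::sum")> first loads F<List/Util.pm>; an unknown module makes it C<die>, which in the real system would surface as the port failing and the monitor triggering.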

The C<spawn> function returns immediately, which means you can instantly
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.

If the spawn message gets delivered, but the monitoring message does not,
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in turn monitors the client), the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine. Just keep this in mind:
after creating a port, monitor it on the local node, and monitor "the other
side" from the remote node, and all will be cleaned up just fine.

=head2 Services

Above it was mentioned that C<spawn> automatically loads modules, and this
can be exploited in various ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function.

Likewise, when you have some service that starts automatically (similar to
AnyEvent::MP::Global), then you can configure this service statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.

Of course, you can also do it in the much more standard way: write a
module (e.g. C<BK::Backend::IRC>), install it as part of a module
distribution, and then configure nodes accordingly. For example, if I want
to run the Bummskraut IRC backend on a machine named "ruth", I could do
this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

That gives you plenty of possibilities; it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP>: the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty
C<< (data => $port) >> message.

The server only writes a single file at a time; other requests will stay
in the queue until the current file has been finished.
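
For reference, the full message sequence for one file can be built by a small plain-Perl helper. The name C<write_file_messages> and the chunk size (4 bytes, only to keep the example short) are hypothetical; each message is an array reference with the tag first, exactly as the protocol above describes.

```perl
use strict;
use warnings;

# Hypothetical helper: the messages a sender would pass to snd for one file,
# following the write_file/data protocol described above.
sub write_file_messages {
   my ($port, $path, $content, $chunk_size) = @_;

   my @msgs = [write_file => $port, $path];

   for (my $ofs = 0; $ofs < length $content; $ofs += $chunk_size) {
      push @msgs, [data => $port, substr ($content, $ofs, $chunk_size)];
   }

   push @msgs, [data => $port];   # an empty data message ends the file
   return @msgs;
}

my @msgs = write_file_messages ("port1", "/tmp/unsafe", "abc\ndef\n", 4);
```

With AnyEvent::MP, a sender would then do C<snd $ioserver, @$_ for @msgs>.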

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

   use Coro::MP;
   use Coro::AIO;
   use Fcntl;

   my $ioserver = port_async {
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Let's go through it part by part.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or, as here, by calling C<port_async> with the code to
execute as a thread. The C<async> component comes from the fact that
threads are created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be killed I<normally>,
i.e. as if C<kil> had been called without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it executes
in a loop. The first thing it does is fetch the next message, using
C<get_cond>, the "conditional message get". Without a condition, it simply
fetches the next message from the queue, which I<must> be a C<write_file>
message.

The message contains the C<$path> to the file, which is then created:

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

Then we enter a loop again, to serve as many C<data> messages as
necessary:

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

This time, the condition is not empty, but instead a code block: similarly
to C<grep>, the code block will be called with C<@_> set to each message in
the queue, and it has to return whether it wants to receive the message or
not.
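
These queue semantics can be modelled in a few lines of plain Perl. This is not Coro::MP's implementation (there is no thread, no blocking and no timeout): the hypothetical C<toy_get_cond> scans a queue in arrival order, removes and returns the first message for which the condition returns true, and leaves all other messages queued in their original order. The queue contents are made up to show an interleaved second request.

```perl
use strict;
use warnings;

# Hypothetical model of a port's message queue; each message is an array ref.
my @queue = (
   [write_file => "portA", "/tmp/a"],
   [write_file => "portB", "/tmp/b"],   # a second request, arriving early
   [data       => "portB", "xyz"],      # data for the *other* request
   [data       => "portA", "abc"],
   [data       => "portA"],             # end of portA's file
);

# Return the first queued message matching $cond (called with @_ set to the
# message, like get_cond's block) and remove it; return () if none matches.
sub toy_get_cond {
   my ($cond) = @_;
   for my $i (0 .. $#queue) {
      if ($cond->(@{ $queue[$i] })) {
         my ($msg) = splice @queue, $i, 1;
         return @$msg;
      }
   }
   return;
}

my ($tag, $port, $path) = toy_get_cond (sub { 1 });   # next message, any kind

my $content = "";
while (my (undef, undef, $data) = toy_get_cond (sub { $_[0] eq "data" && $_[1] eq $port })) {
   length ($data // "") or last;   # empty data message: file complete
   $content .= $data;
}
```

After the loop, C<$content> is C<abc>, and portB's C<write_file> request and its data are still queued, in arrival order, for the next iteration of the outer loop.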

In this case we are interested in C<data> messages (C<< $_[0] eq "data" >>)
whose source port, the element right after the tag, is the port that sent
the C<write_file> request (C<< $_[1] eq $port >>).

The condition must be this strict, as it is possible to receive both
C<write_file> messages and C<data> messages from other ports while we
handle the file writing.

The lone C<5> at the end is a timeout: when no matching message is
received within C<5> seconds, we assume an error and C<die>.

When an empty C<data> message is received, we are done and can close the
file (which is done automatically as C<$fh> goes out of scope):

            length $data or last;

Otherwise we need to write the data:

            aio_write $fh, undef, undef, $data, 0;

That's basically it. Note that every process should have some kind of
supervisor. In our case, the supervisor simply prints any error message:

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Here is a usage example:

   port_async {
      snd $ioserver, write_file => $SELF, "/tmp/unsafe";
      snd $ioserver, data => $SELF, "abc\n";
      snd $ioserver, data => $SELF, "def\n";
      snd $ioserver, data => $SELF;
   };

The messages are sent without any flow control or acknowledgement (feel
free to improve this). Also, the source port does not actually need to be
a port: any unique ID will do, but port identifiers happen to be a simple
source of network-wide unique IDs.

Apart from C<get_cond> as seen above, there are other ways to receive
messages. The C<write_file> message above could also selectively be
received using a C<get> call:

   my ($port, $path) = get "write_file";

This is simpler, but when some other code part sends an unexpected message
to the C<$ioserver>, it will stay in the queue forever. As a rule of thumb,
every threaded port should have a "fetch next message unconditionally"
somewhere, to avoid filling up the queue.

It is also possible to write switch-like C<get_cond> calls:

   get_cond {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         ...;
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         ...;
      };

      die "unexpected message $_[0] received";
   };

=head1 THE END

This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial and that AnyEvent::MP is still in its infancy. I hope
it will be useful for creating exciting new applications.

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<Coro::MP>

L<AnyEvent>

=head1 AUTHOR

Robin Redeker <elmex@ta-sa.org>
Marc Lehmann <schmorp@schmorp.de>