/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.44
Committed: Tue Feb 28 18:37:24 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.43: +24 -8 lines
Log Message:
*** empty log message ***

=head1 Message Passing for the Non-Blocked Mind

=head1 Introduction and Terminology

This is a tutorial about how to get into the swing of the new
L<AnyEvent::MP> module, which allows programs to transparently pass
messages within the process and to other processes on the same or a
different host.

What kind of messages? Basically a message here means a list of Perl
strings, numbers, hashes and arrays, anything that can be expressed as a
L<JSON> text (as JSON is the default serialiser in the protocol). Here are
two examples:

   write_log => 1251555874, "action was successful.\n"
   123, ["a", "b", "c"], { foo => "bar" }

When using L<AnyEvent::MP> it is customary to use a descriptive string as
the first element of a message to indicate the type of the message. This
element is called a I<tag> in L<AnyEvent::MP>, as some API functions
(C<rcv>) support matching it directly.

Suppose you want to send a ping message with your current time to
somewhere; this is what such a message might look like (in Perl syntax):

   ping => 1251381636
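You can see in practice what "anything that can be expressed as JSON" means by running the two example messages and the ping message through the core L<JSON::PP> module. This is only a sketch. It assumes that the serialiser AnyEvent::MP uses encodes a message as a plain JSON array like this, and it leaves out whatever framing the protocol adds on the wire.

```perl

   use strict;
   use warnings;
   use JSON::PP;

   # A message is just a flat Perl list; by convention the first
   # element is the tag.
   my @ping = (ping => 1251381636);
   my @data = (123, ["a", "b", "c"], { foo => "bar" });

   # canonical sorts hash keys, so the output is deterministic
   my $json = JSON::PP->new->canonical;

   my $ping_text = $json->encode (\@ping);
   my $data_text = $json->encode (\@data);

   print "$ping_text\n";   # ["ping",1251381636]
   print "$data_text\n";   # [123,["a","b","c"],{"foo":"bar"}]

   # Decoding gives back an equivalent list; the tag is element 0.
   my ($tag, @args) = @{ $json->decode ($ping_text) };

```

The tag is simply the first element of that list, which is the part C<rcv> can match on.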

Now that we know what a message is, to which entities are those
messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
a destination for messages but also a context to execute code: when
a runtime error occurs while executing code belonging to a port, the
exception will be raised on the port and can even travel to interested
parties on other nodes, which makes supervision of distributed processes
easy.

How do these ports relate to things you know? Each I<port> belongs
to a I<node>, and a I<node> is just the UNIX process that runs your
L<AnyEvent::MP> application.

Each I<node> is distinguished from other I<nodes> running on the same or
another host in a network by its I<node ID>. A I<node ID> is simply a
unique string chosen manually or assigned by L<AnyEvent::MP> in some way
(UNIX nodename, random string...).

Here is a diagram of how I<nodes>, I<ports> and UNIX processes relate
to each other. The setup consists of two nodes (more are of course
possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
C<BAR>.

   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|

The strings for the I<port IDs> here are just for illustrative
purposes: even though I<ports> in L<AnyEvent::MP> are also identified by
strings, they can't be chosen manually and are assigned by the system
dynamically. These I<port IDs> are unique within a network and can also be
used to identify senders or as message tags, for instance.

The next sections will explain the API of L<AnyEvent::MP> by going through
a few simple examples. Later some more complex idioms are introduced,
which are hopefully useful for solving some real-world problems.

=head2 Passing Your First Message

As a start, let's have a look at the messaging API. The following example
is just a demo to show the basic elements of message passing with
L<AnyEvent::MP>.

The example should print C<Ending with: 123>, in a rather complicated
way, by passing some message to a port.

   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";

It already uses most of the essential functions inside
L<AnyEvent::MP>. First there is the C<port> function, which creates a
I<port> and returns its I<port ID>, a simple string.

This I<port ID> can be used to send messages to the port and install
handlers to receive messages on the port. Since it is a simple string
it can be safely passed to other I<nodes> in the network when you want
to refer to that specific port (usually used for RPC, where you need
to tell the other end which I<port> to send the reply to - messages in
L<AnyEvent::MP> have a destination, but no source).
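Messages have no source, so a request that expects an answer has to carry the reply port ID as ordinary data. The sketch below models this in a single process, with a hash standing in for ports. C<toy_port>, C<toy_snd> and C<%PORTS> are hypothetical names and not part of AnyEvent::MP. Delivery in the model is also synchronous, whereas real delivery between nodes is asynchronous.

```perl

   use strict;
   use warnings;

   # Toy stand-in for ports: port ID string => handler code ref.
   # Hypothetical names; this is not the AnyEvent::MP API.
   my %PORTS;
   my $next_id = 0;

   sub toy_port {
      my ($cb) = @_;
      my $id = "toy." . $next_id++;
      $PORTS{$id} = $cb;
      return $id;               # a plain string, safe to put in a message
   }

   sub toy_snd {
      my ($id, @msg) = @_;
      my $cb = $PORTS{$id} or return;   # unknown port: message is dropped
      $cb->(@msg);
   }

   # "Server": adds two numbers and replies to whichever port the
   # request names, because the message itself has no sender.
   my $adder = toy_port (sub {
      my ($reply_to, $x, $y) = @_;
      toy_snd ($reply_to, result => $x + $y);
   });

   my @got;
   my $reply = toy_port (sub { @got = @_ });

   toy_snd ($adder, $reply, 2, 3);
   print "@got\n";   # result 5

```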

The next function is C<rcv>:

   rcv $port, test => sub { ... };

It installs a receiver callback on the I<port> specified as the first
argument (it only works for "local" ports, i.e. ports created on the same
node). The next argument, in this example C<test>, specifies a I<tag> to
match. This means that whenever a message with the first element being
the string C<test> is received, the callback is called with the remaining
parts of that message.
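One way to picture tag matching is as a lookup from the first element of the message to a callback, with the tag removed before the call. The toy model below shows only that idea. It uses hypothetical helpers C<toy_rcv> and C<toy_deliver> and a single implicit port. It is not how C<rcv> is implemented, and in this model an unmatched message is simply ignored. Consult the L<AnyEvent::MP> documentation for what the real C<rcv> does with messages that no handler matches.

```perl

   use strict;
   use warnings;

   # Toy model of tag matching on one port: tag => callback.
   my %by_tag;

   sub toy_rcv {
      my (%handlers) = @_;
      @by_tag{keys %handlers} = values %handlers;
   }

   sub toy_deliver {
      my ($tag, @rest) = @_;
      my $cb = $by_tag{$tag}
         or return 0;          # no matching tag: not handled in this model
      $cb->(@rest);            # callback sees the message minus the tag
      return 1;
   }

   my @seen;
   toy_rcv (test => sub { push @seen, [@_] });

   toy_deliver (test  => 123);   # handled: callback receives (123)
   toy_deliver (other => 456);   # no "other" handler, ignored

```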

Messages can be sent with the C<snd> function, which is used like this in
the example above:

   snd $port, test => 123;

This will send the message C<'test', 123> to the I<port> with the I<port
ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
on C<test>, it will call the callback with the first argument being the
number C<123>.

The callback is a typical AnyEvent idiom: it just passes
that number on to the I<condition variable> C<$end_cv>, which will then
pass the value to the print. Condition variables are out of the scope
of this tutorial and not often used with ports, so please consult
L<AnyEvent::Intro> about them.

Passing messages inside just one process is boring. Before we can move on
and do interprocess message passing we first have to make sure some things
have been set up correctly for our nodes to talk to each other.

=head2 System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on
your system.

First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to set up the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.

The easiest way to set this up is to use the F<aemp> utility:

   aemp gensecret

This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
random shared secret. You can copy this file to any other system and
then communicate over the network (via TCP) with it. You can also select
your own shared secret (F<aemp setsecret>), and for increased security
requirements you can even create (or configure) a TLS certificate (F<aemp
gencert>). Connections are then not just securely authenticated, but
also encrypted and protected against tampering.

Connections will only be successfully established when the I<nodes>
that want to connect to each other have the same I<shared secret> (or
successfully verify the TLS certificate of the other side, in which case
no shared secret is required).
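Why must both sides hold the same secret? A common way for two parties to prove that they know a shared secret without sending it is an HMAC computed over a random challenge. The sketch below uses the core L<Digest::SHA> module to show that general idea. It is B<not> the AEMP handshake: the algorithms and message layout AnyEvent::MP actually uses are not described here, and C<respond>/C<verify> are hypothetical helpers.

```perl

   use strict;
   use warnings;
   use Digest::SHA qw(hmac_sha256_hex);

   # Generic challenge-response: each side proves it knows the secret
   # without putting the secret itself on the wire.
   sub respond {
      my ($secret, $challenge) = @_;
      return hmac_sha256_hex ($challenge, $secret);   # (data, key)
   }

   sub verify {
      my ($secret, $challenge, $response) = @_;
      return respond ($secret, $challenge) eq $response;
   }

   # 16 random bytes as hex; rand is fine for a demo but is not a
   # cryptographically secure source for real challenges.
   my $challenge = join "", map { sprintf "%02x", int rand 256 } 1 .. 16;

   my $ok  = verify ("s3cret", $challenge, respond ("s3cret", $challenge));
   my $bad = verify ("s3cret", $challenge, respond ("other",  $challenge));

```

A node configured with a different secret produces a different response, so the check fails and the connection is refused.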

B<If something does not work as expected, and for example tcpdump shows
that the connections are closed almost immediately, you should make sure
that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
you try to connect with each other!>

That's all for now; you will find some more advanced fiddling with the
C<aemp> utility later.

=head2 Shooting the Trouble

Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.

To help troubleshoot any issues, there are two environment variables
that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets the
logging level. The default is C<5>, which means nothing much is
printed. You can increase it to C<8> or C<9> to get more verbose
output. This is example output when starting a node:

   2009-08-31 19:51:50 <8> node anon/5RloFvvYL8jfSScXNL8EpX starting up.
   2009-08-31 19:51:50 <7> starting global service.
   2009-08-31 19:51:50 <9> 10.0.0.17:4040 connected as ruth
   2009-08-31 19:51:50 <7> ruth is up ()
   2009-08-31 19:51:50 <9> ruth told us it knows about {"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}.
   2009-08-31 19:51:50 <9> connecting to doom with [10.0.0.5:45143]
   2009-08-31 19:51:50 <9> connecting to anon/4SYrtJ3ft5l1C16w2hto3t with [10.0.0.1:45920 [2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788 [fd00::a00:1]:37104]
   2009-08-31 19:51:50 <9> ruth told us its addresses (10.0.0.17:4040).

A lot of info, but at least you can see that it does something.
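When the verbose output gets too chatty, it can help to filter a captured log by level. This small sketch assumes the C<< DATE TIME <LEVEL> message >> layout of the sample above. C<filter_log> is a hypothetical post-processing helper for saved output and is not part of AnyEvent::MP.

```perl

   use strict;
   use warnings;

   # Keep only lines whose <LEVEL> is at or below $max_level,
   # assuming the "DATE TIME <LEVEL> message" layout shown above.
   sub filter_log {
      my ($max_level, @lines) = @_;
      return grep {
         /^\S+ \S+ <(\d+)> / && $1 <= $max_level
      } @lines;
   }

   my @log = (
      "2009-08-31 19:51:50 <8> node anon/5RloFvvYL8jfSScXNL8EpX starting up.",
      "2009-08-31 19:51:50 <7> starting global service.",
      "2009-08-31 19:51:50 <9> 10.0.0.17:4040 connected as ruth",
      "2009-08-31 19:51:50 <7> ruth is up ()",
   );

   print "$_\n" for filter_log (7, @log);   # only the two <7> lines

```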

The other environment variable that can be useful is
C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
most messages that are sent or received to be printed. In the above
example you would see something like:

   SND ruth <- ["addr",["10.0.0.1:49358","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:58884","[fd00::a00:1]:45006"]]
   RCV ruth -> ["","AnyEvent::MP::_spawn","20QA7cWubCLTWUhFgBKOx2.x","AnyEvent::MP::Global::connect",0,"ruth"]
   RCV ruth -> ["","mon1","20QA7cWubCLTWUhFgBKOx2.x"]
   RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","addr",["10.0.0.17:4040"]]
   RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","nodes",{"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}]

=head1 PART 1: Passing Messages Between Processes

=head2 The Receiver

Let's split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:

   use AnyEvent;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure nodeid => "eg_receiver", binds => ["*:4040"];

   my $port = port;

   grp_reg eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

=head3 AnyEvent::MP::Global

Now, that wasn't too bad, was it? OK, let's step through the new functions
and modules that have been used.

For starters, there is now an additional module being
used: L<AnyEvent::MP::Global>. This module provides us with a I<global
registry>, which lets us register ports in groups that are visible on all
I<nodes> in a network.

What is this useful for? Well, the I<port IDs> are random-looking strings,
assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance,
so we don't know which I<port ID> to send messages to, especially when the
message is to be passed between different I<nodes> (or UNIX processes). To
find the right I<port> of another I<node> in the network we will need
to communicate this somehow to the sender. And exactly that is what
L<AnyEvent::MP::Global> provides.

Especially in larger, more anonymous networks this is handy: imagine you
have a few database backends, a few web front-ends and some processing
distributed over a number of hosts: all of these would simply register
themselves in the appropriate group, and your web front-ends can start to
find some database backend.

=head3 C<configure> and Joining and Maintaining the Network

Now, let's have a look at the new function, C<configure>:

   configure nodeid => "eg_receiver", binds => ["*:4040"];

Before we are able to send messages to other nodes we have to initialise
ourselves to become a "distributed node". Initialising a node means naming
the node, optionally binding some TCP listeners so that other nodes can
contact it, and connecting to a predefined set of seed addresses so the
node can discover the existing network - and the existing network can
discover the node!

All of this (and more) can be passed to the C<configure> function - later
we will see how we can do all this without even passing anything to
C<configure>!

The first parameter, C<nodeid>, specifies the node ID (in this case
C<eg_receiver> - the default is to use the node name of the current host,
but for this example we want to be able to run many nodes on the same
machine). Node IDs need to be unique within the network and can be almost
any string - if you don't care, you can specify a node ID of C<anon/>,
which will then be replaced by a random node name.

The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so it should not
be used unless you know it works).

The reason to bind on a fixed TCP port is not just that other nodes can
connect to us: if no binds are specified, the node will still bind on a
dynamic port on all local addresses - but in this case we won't know the
port, and cannot tell other nodes to use it as a seed node.

A I<seed> is a (fixed) TCP address of some other node in the network. To
explain the need for seeds we have to look at the topology of a typical
L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>;
here is an example with 4 nodes:

   N1--N2
   | \/ |
   | /\ |
   N3--N4

Now imagine another node - C<N5> - wants to connect itself to that network:

   N1--N2
   | \/ |    N5
   | /\ |
   N3--N4

The new node needs to know the I<binds> of all nodes already
connected. Exactly this is what the I<seeds> are for: let's assume that
the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This
causes it to connect to C<N2>:

   N1--N2____
   | \/ |    N5
   | /\ |
   N3--N4

C<N2> then tells C<N5> about the I<binds> of the other nodes it is
connected to, and C<N5> creates the rest of the connections:

    /--------\
   N1--N2____|
   | \/ |    N5
   | /\ |   /|
   N3--N4--- |
    \________/

All done: C<N5> is now happily connected to the rest of the network.
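You can replay the join procedure from the diagrams with a few lines of plain Perl. This is a toy simulation with hypothetical helpers (C<link_nodes>, C<peers_of>). It models only who ends up connected to whom, not the actual protocol messages. It also makes the cost of a fully connected mesh visible: n nodes need n(n-1)/2 connections, so 6 for four nodes and 10 for five.

```perl

   use strict;
   use warnings;

   # Undirected connections, stored as "A-B" keys (names sorted).
   my %links;

   sub link_nodes {
      my ($x, $y) = sort @_;
      $links{"$x-$y"} = 1;
   }

   sub peers_of {
      my ($n) = @_;
      my @peers;
      for my $edge (keys %links) {
         my ($x, $y) = split /-/, $edge;
         push @peers, $y if $x eq $n;
         push @peers, $x if $y eq $n;
      }
      return sort @peers;
   }

   # Existing network: N1..N4, everyone connected to everyone.
   my @nodes = qw(N1 N2 N3 N4);
   for my $i (0 .. $#nodes) {
      link_nodes ($nodes[$i], $nodes[$_]) for $i + 1 .. $#nodes;
   }

   # N5 only knows its seed, N2: it connects there first, the seed
   # reports its peers, and N5 connects to each of them.
   my ($new, $seed) = ("N5", "N2");
   my @known = peers_of ($seed);   # what the seed "tells" N5
   link_nodes ($new, $seed);
   link_nodes ($new, $_) for @known;

   my $edges = keys %links;        # number of connections now
   print "N5 peers: ", join (" ", peers_of ($new)), "\n";

```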

Apart from the obvious function - joining the network - seed nodes fulfill
another very important function: the connections created by connecting
to seed nodes are used to keep the network together. By trying to keep
connections to all seed nodes active, the network ensures that it will not
split into multiple networks without connection to each other.

This means that the graph created by all seed node connections must span
the whole network, in some way.

There are many ways of doing this - the simplest is probably to use
a single set of one or more seed nodes as seeds for all nodes in the
network. This creates a "hub" of seed nodes that connect to each other,
and "leaf" nodes that connect to the nodes in the hub, keeping everything
together.
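Whether a given seed configuration keeps the network in one piece is a plain graph question. Treat every "node uses seed" relation as an edge and check that every node is reachable. The sketch below uses a hypothetical C<seed_graph_connected> helper with a breadth-first search. It compares the hub layout just described with a configuration that would split into two islands. The input format is an assumption made for illustration.

```perl

   use strict;
   use warnings;

   # %seeds maps each node name to the seeds it is configured with.
   # Returns true if the seed connections alone reach every node.
   sub seed_graph_connected {
      my (%seeds) = @_;
      my %adj;
      for my $n (keys %seeds) {
         $adj{$n} ||= [];
         for my $s (@{ $seeds{$n} }) {
            push @{ $adj{$n} }, $s;   # a seed connection works both ways
            push @{ $adj{$s} }, $n;
         }
      }
      my @nodes = keys %adj or return 1;
      my %seen  = ($nodes[0] => 1);
      my @queue = ($nodes[0]);
      while (defined (my $n = shift @queue)) {
         for my $m (@{ $adj{$n} }) {
            push @queue, $m unless $seen{$m}++;
         }
      }
      return keys %seen == @nodes;
   }

   # Hub: S1 and S2 seed each other, both leaves use the whole hub.
   my $hub = seed_graph_connected (
      S1 => ["S2"], S2 => ["S1"],
      L1 => ["S1", "S2"], L2 => ["S1", "S2"],
   );

   # Two islands: {S1, L1} and {S2, L2} never meet.
   my $split = seed_graph_connected (
      S1 => [], S2 => [], L1 => ["S1"], L2 => ["S2"],
   );

```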

The process of joining a network takes time, during which the node is
already running. This also means it takes time until the node is fully
connected, and global groups and other information are available. The best
way to deal with this is to either retry regularly until you find the
resource you were looking for, or to only start services on demand after a
node has become available.

=head3 Registering the Receiver

Coming back to our example, we have now introduced the basic purpose of
L<AnyEvent::MP::Global> and C<configure>, so we can finally continue
talking about the receiver.

Let's look at the next line(s):

   my $port = port;
   grp_reg eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<grp_reg> function, however, is
new: the first argument is the name of a I<global group>, and the second
argument is the I<port ID> to register in that group.

You can choose the name of such a I<global group> freely (prefixing your
package name is I<highly recommended>, however, and might be enforced in
future versions!). The purpose of such a group is to store a set of port
IDs. This set is made available throughout the L<AnyEvent::MP> network,
so that each node can see which ports belong to that group.
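In its simplest form, a global group is a named set of port IDs. The single-process model below uses hypothetical C<toy_grp_reg>/C<toy_grp_get> helpers and a package-style prefix chosen for illustration. It shows only the semantics described in this tutorial: registering adds an ID to the set, and looking up an empty group yields C<undef>. The real module replicates this state across nodes, which the model does not attempt.

```perl

   use strict;
   use warnings;

   # group name => { port ID => 1 }, all in this one process.
   my %groups;

   sub toy_grp_reg {
      my ($group, $port) = @_;
      $groups{$group}{$port} = 1;
   }

   # undef when the group is empty, otherwise an array reference of IDs
   sub toy_grp_get {
      my ($group) = @_;
      my @ports = sort keys %{ $groups{$group} || {} };
      return @ports ? \@ports : undef;
   }

   # A package-style prefix keeps different applications' groups apart.
   toy_grp_reg ("MyApp::eg_receivers" => "portA");
   toy_grp_reg ("MyApp::eg_receivers" => "portB");

   my $ports = toy_grp_get ("MyApp::eg_receivers");   # ["portA", "portB"]
   my $none  = toy_grp_get ("MyApp::nobody");         # undef

```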

Later we will see how the sender looks for the ports in this global
group to send messages to them.

The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.

=head2 The Sender

OK, now let's take a look at the sender code:

   use AnyEvent;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure nodeid => "eg_sender", seeds => ["*:4040"];

   my $find_timer =
      AnyEvent->timer (after => 0, interval => 1, cb => sub {
         my $ports = grp_get "eg_receivers"
            or return;

         snd $_, test => time
            for @$ports;
      });

   AnyEvent->condvar->recv;

It's even less code. The C<configure> call serves the same purpose as in
the receiver, but instead of specifying binds we specify a list of seeds -
which happens to be the same as the binds used by the receiver, which
becomes our seed node.

Next we set up a timer that repeatedly (every second) calls this chunk of
code:

   my $ports = grp_get "eg_receivers"
      or return;

   snd $_, test => time
      for @$ports;

The only new function here is the C<grp_get> function of
L<AnyEvent::MP::Global>. It searches in the global group named
C<eg_receivers> for ports. If none are found, it returns C<undef>, which
makes our code return instantly and wait for the next round, as nobody is
interested in our message.

As soon as the receiver application has connected and the information
about the newly added port in the receiver has propagated to the sender
node, C<grp_get> returns an array reference that contains the I<port IDs>
of the receiver I<port(s)>.

We then just send a message with a tag and the current time to every
I<port> in the global group.

=head3 Splitting Network Configuration and Application Code

OK, so far, this works. In the real world, however, the person configuring
your application to run on a specific network (the end user or network
administrator) is often different from the person coding the application.

Or to put it differently: the arguments passed to C<configure> are usually
provided not by the programmer, but by whoever is deploying the program.

To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering we mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.
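The lookup-with-fallback idea can be modelled as "start from the global defaults, then let the profile's own settings win". The sketch below is only an illustration. The keys, the C<effective_config> helper and the merge rule are assumptions, and the real F<aemp> database has its own format and inheritance rules, which this sketch does not reproduce.

```perl

   use strict;
   use warnings;

   # Hypothetical global defaults and one named profile, mirroring the
   # aemp commands used below (seeds/nodeid globally, binds for "seed").
   my %defaults = (seeds => ["*:4040"], nodeid => "anon/");
   my %profiles = (seed  => { binds => ["*:4040"] });

   # Defaults first, then the profile's settings override them.
   sub effective_config {
      my ($profile) = @_;
      my %cfg = %defaults;
      my $p = $profiles{$profile} || {};
      @cfg{keys %$p} = values %$p;
      return \%cfg;
   }

   my $seed_cfg = effective_config ("seed");          # defaults + binds
   my $other    = effective_config ("eg_receiver");   # no profile: defaults

```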

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as seed node (this only works when
running on the same host; for multiple machines you would provide the IP
address or hostname of the node running the seed), and use a random name
(because we want to start multiple nodes on the same host):

   aemp seeds "*:4040" nodeid anon/

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will all
use our generic seed node to discover each other.

In fact, starting many receivers nicely illustrates that the time sender
can have multiple receivers.

That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further, most bigger real-world applications even
want to ensure that a component that is missing, or has crashed, comes
back, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of this.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: receive callbacks are executed
in a special context, the so-called I<port-context>: code that throws an
otherwise uncaught exception will cause the port to be C<kil>led. Killed
ports are destroyed automatically (killing ports is the only way to free
ports, incidentally).

Ports can be monitored, even from a different host, and when a port is
killed any entity monitoring it will be notified.

Here is a simple example:

   use AnyEvent::MP;

   # create a port, it always dies
   my $port = port { die "oops" };

   # monitor it
   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   # now send it some message, causing it to die:
   snd $port;

It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die and call the monitoring callback:

   anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.

The callback was actually passed two arguments: C<die> (to indicate it did
throw an exception as opposed to, say, a network error) and the exception
message itself.
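The sequence "handler dies, port is killed, monitors get C<die> plus the message" can be modelled in plain Perl with C<eval>. This is a simplified model with hypothetical C<toy_*> helpers, not AnyEvent::MP's implementation. In particular, it does not model what happens when you monitor a port that is already gone, which is covered next.

```perl

   use strict;
   use warnings;

   # Handlers run inside eval; an uncaught exception kills the port and
   # notifies every monitor with ("die", $message).
   my (%handler, %monitors);

   sub toy_port { my ($id, $cb) = @_; $handler{$id} = $cb; $id }
   sub toy_mon  { my ($id, $cb) = @_; push @{ $monitors{$id} }, $cb }

   sub toy_kil {
      my ($id, @reason) = @_;
      delete $handler{$id};                       # killed ports are gone
      $_->(@reason) for @{ delete ($monitors{$id}) || [] };
   }

   sub toy_snd {
      my ($id, @msg) = @_;
      my $cb = $handler{$id} or return;
      eval { $cb->(@msg); 1 }
         or toy_kil ($id, die => "$@");
   }

   my @reason;
   my $port = toy_port ("anon/toy.a", sub { die "oops\n" });
   toy_mon ($port, sub { @reason = @_ });
   toy_snd ($port);                                # @reason: ("die", "oops\n")

```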

What happens when a port is killed before we have a chance to monitor
it? Granted, this is highly unlikely in our example, but when you program
in a network this can easily happen due to races between nodes.

   use AnyEvent::MP;

   my $port = port { die "oops" };

   snd $port;

   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

This time we will get something like:

   anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)

Since the port was already gone, the kill reason is now C<no_such_port>
with some descriptive (we hope) error message.

In fact, the kill reason is usually some identifier as first argument
and a human-readable error message as second argument, but can be about
anything (it's a list) or even nothing - which is called a "normal" kill.

You can kill ports manually using the C<kil> function, which will be
treated like an error when any reason is specified:

   kil $port, custom_error => "don't like your steenking face";

And a clean kill without any reason arguments:

   kil $port;

By now you are probably wondering what this "normal" kill business is: a
common idiom is to not specify a callback to C<mon>, but another port, such
as C<$SELF>:

   mon $port, $SELF;

This basically means "monitor $port and kill me when it crashes". And a
"normal" kill does not count as a crash. This way you can easily link
ports together and make them crash together on errors, while still being
able to remove a port silently.
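The linking rule can be pictured as follows: a kill with a reason spreads along the links, and a kill without a reason stops at the port itself. The toy model below shows just that rule. It uses hypothetical C<toy_link>/C<toy_kil> helpers and port names, and it does not reproduce how C<mon $port, $SELF> is implemented.

```perl

   use strict;
   use warnings;

   # %links: watched port => ports to kill when it crashes.
   my (%alive, %links);

   sub toy_link {
      my ($watched, $victim) = @_;
      push @{ $links{$watched} }, $victim;
   }

   sub toy_kil {
      my ($id, @reason) = @_;
      return unless delete $alive{$id};   # already dead: nothing to do
      return unless @reason;              # "normal" kill: no cascade
      toy_kil ($_, @reason) for @{ $links{$id} || [] };
   }

   %alive = map { ($_ => 1) } qw(worker helper other_worker other_helper);
   toy_link (worker       => "helper");        # helper dies with worker
   toy_link (other_worker => "other_helper");

   toy_kil ("worker", die => "oops");   # crash: helper is killed too
   toy_kil ("other_worker");            # normal kill: other_helper survives

```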

=head3 Port Context

When code runs in an environment where C<$SELF> contains its own port ID
and exceptions will be caught, it is said to run in a port context.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from C<rcv> handlers. As an example, assume that the port receive
handler wants to C<die> a second later, using C<after>:

   my $port = port {
      after 1, sub { die "oops" };
   };

Then you will find that it does not work: when the C<after> callback is
executed, it does not run in port context anymore, so exceptions will not
be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works just like Perl's built-in C<sub>:

   my $port = port {
      after 1, psub { die "oops" };
   };

C<psub> stores C<$SELF> and returns a code reference. When the code
reference is invoked, it will run the code block within the context of
that port, so exception handling once more works as expected.

There is also a way to temporarily execute code in the context of some
port, namely C<peval>:

   peval $port, sub {
      # die'ing here will kil $port
   };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.
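The difference between a plain C<sub> and C<psub> comes down to when C<$SELF> is read. The sketch below is a simplified model built from C<local>, a closure and C<eval>. C<in_context>, C<toy_psub>, C<toy_peval> and C<%killed> are hypothetical names, and this is not how AnyEvent::MP implements these functions. An array of deferred callbacks stands in for a timer such as C<after>.

```perl

   use strict;
   use warnings;

   our $SELF;
   my %killed;   # port => exception that "killed" it

   # Run $cb as if in the port context of $port.
   sub in_context {
      my ($port, $cb, @args) = @_;
      local $SELF = $port;                 # restored automatically on return
      eval { $cb->(@args); 1 }
         or $killed{$port} = $@;           # stands in for kil
   }

   # Like psub: capture $SELF now, re-establish it on every later call.
   sub toy_psub(&) {
      my ($cb) = @_;
      my $port = $SELF;
      return sub { in_context ($port, $cb, @_) };
   }

   # Like peval: run a block once in the context of an explicit port.
   sub toy_peval { my ($port, $cb) = @_; in_context ($port, $cb) }

   my @later;   # deferred callbacks, standing in for a timer queue

   in_context ("portA", sub {
      push @later, toy_psub { die "oops\n" };   # captures $SELF = portA
   });

   $_->() for @later;   # runs outside any context, yet portA is killed
   toy_peval ("portB", sub { die "also oops\n" });

```

With a plain C<sub> in place of C<toy_psub>, the deferred callback would run with C<$SELF> undefined and outside any C<eval>, so the exception would escape. That is the failure described above for C<after 1, sub { ... }>.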

=head3 Network Errors and the AEMP Guarantee

I mentioned another important source of monitoring failures: network
problems. When a node loses connection to another node, it will invoke all
monitoring actions as if the port was killed, even if it is possible that
the port still lives happily on another node (not being able to talk to a
node means we have no clue what's going on with it: it could have crashed,
or it could still be running without knowing we lost the connection).

So another way to view monitors is "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port: after starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane. Specifically, it means that there will
be no "holes" in the message sequence: all messages sent are delivered
in order, without any missing in between, and when some were lost, you
I<will> be notified of that, so you can take recovery action.
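Stated as a check, the guarantee says that whatever arrived before the monitor fired must be an in-order prefix of what was sent. The small checker below expresses exactly that. The C<no_holes> helper and the trace format (two array references of message labels) are hypothetical and used only for illustration.

```perl

   use strict;
   use warnings;

   # True if @$delivered is an in-order prefix of @$sent, i.e. no
   # message is missing in between.
   sub no_holes {
      my ($sent, $delivered) = @_;
      return 0 if @$delivered > @$sent;
      for my $i (0 .. $#$delivered) {
         return 0 unless $delivered->[$i] eq $sent->[$i];
      }
      return 1;
   }

   my @sent = qw(m1 m2 m3 m4);

   my $ok   = no_holes (\@sent, [qw(m1 m2)]);   # m3, m4 lost, then notified
   my $hole = no_holes (\@sent, [qw(m1 m3)]);   # m2 missing: not allowed

```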

=head3 Supervising

OK, so how is all this crashing-everything stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them more
stable, but to make them more resilient against actual errors and
crashes. And this is not done by crashing I<everything>, but by crashing
everything except a supervisor.

A supervisor is simply some code that ensures that an application (or a
part of it) is running, and if it crashes, is restarted properly.

To show how to do all this we will create a simple chat server that can
handle many chat clients. Both server and clients can be killed and
restarted, and even crash, to some extent.

=head2 Chatting, the Resilient Way

Without further ado, here is the chat server (to run it, we assume the
set-up explained earlier, with a separate F<aemp run> seed node):

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   our $server = port;

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };
      msg "$nick (joins)";
   };

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

   grp_reg eg_chat_server => $server;

   warn "server ready.\n";

   AnyEvent->condvar->recv;

Looks like a lot, but it is actually quite simple: after your usual
preamble (this time we use common sense), we define a helper function that
sends some message to every registered chat client:

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

The clients are stored in the hash C<%clients>. Then we define a server
port and install two receivers on it: C<join>, which clients send to join
the chat, and C<privmsg>, which clients use to send actual chat messages.

C<join> is the most complicated. It expects the client port and the
nickname to be passed in the message, and registers the client in
C<%clients>.

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

The next step is to monitor the client. The monitoring action removes the
client and sends a quit message with the error to all remaining clients.

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

And finally, it creates a join message and sends it to all clients.

      msg "$nick (joins)";
   };

The C<privmsg> callback simply broadcasts the message to all clients:

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

And finally, the server registers itself in the server group, so that
clients can find it:

   grp_reg eg_chat_server => $server;

Well, well... and where is this supervisor stuff? Well... we cheated,
it's not there. To not overcomplicate the example, we only put it into
the..... CLIENT!

=head3 The Client, and a Supervisor!

Again, here is the client, including supervisor, which makes it a bit
longer:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   my $nick = shift;

   configure;

   my ($client, $server);

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r  \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = $servernodes->[0];
      snd $server, join => $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, privmsg => $nick, $line
         if $server;
   });

   $| = 1;
   print "> ";
   AnyEvent->condvar->recv;

796 The first thing the client does is to store the nick name (which is
797 expected as the only command line argument) in C<$nick>, for further
798 usage.
799
800 The next relevant thing is... finally... the supervisor:
801
802 sub server_connect {
803 my $servernodes = grp_get "eg_chat_server"
804 or return after 1, \&server_connect;
805
806 This looks up the server in the C<eg_chat_server> global group. If it
807 cannot find it (which is likely when the node is just starting up),
808 it will wait a second and then retry. This "wait a bit and retry"
809 is an important pattern, as distributed programming means lots of
810 things are going on asynchronously. In practise, one should use a more
811 intelligent algorithm, to possibly warn after an excessive number of
812 retries. Hopefully future versions of AnyEvent::MP will offer some
813 predefined supervisors, for now you will have to code it on your own.
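
A slightly more intelligent retry policy, as suggested above, might back
off exponentially and warn after too many attempts. The following is a
minimal sketch in plain Perl, assuming a doubling delay capped at a
maximum; the names C<retry_delay>, C<$BASE_DELAY>, C<$MAX_DELAY> and
C<$WARN_AFTER> are hypothetical and not part of AnyEvent::MP. Only the
policy itself is shown, so it can be tried without a running node.

```perl
   use strict;
   use warnings;

   # Hypothetical policy parameters, not AnyEvent::MP settings.
   my $BASE_DELAY = 1;    # seconds to wait before the first retry
   my $MAX_DELAY  = 30;   # never wait longer than this
   my $WARN_AFTER = 5;    # start warning after this many retries

   # Returns the delay in seconds before retry number $attempt (counting
   # from 1), doubling each time up to $MAX_DELAY, and a flag telling the
   # caller whether it is time to warn the user.
   sub retry_delay {
      my ($attempt) = @_;
      my $delay = $BASE_DELAY * 2 ** ($attempt - 1);
      $delay = $MAX_DELAY if $delay > $MAX_DELAY;
      return ($delay, $attempt > $WARN_AFTER);
   }
```

In C<server_connect>, one could keep a (hypothetical) attempt counter,
pass the first value returned by C<retry_delay> to C<after> instead of
the fixed C<1>, print a warning when the second value is true, and reset
the counter once a server has been found.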

Next it creates a local port for the server to send messages to, and
monitors it. When the port is killed, it will print "disconnected" and
call the supervisor function again to retry.

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

Then everything is ready: the client sends a C<join> message with its
local port to the server, and starts monitoring the server:

      $server = $servernodes->[0];
      snd $server, join => $client, $nick;
      mon $server, $client;
   }

The monitor ensures that if the server crashes or goes away, the client
port will be killed as well. The client's own monitor then tells the user
that the client was disconnected, and starts connecting to the server
again.

The rest of the program deals with the boring details of actually invoking
the supervisor function to start the whole client process and handling
the actual terminal input, sending it to the server.

You should now try to start the server and one or more clients in different
terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server
   aemp run profile seed

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
which of course should not normally get killed (unless they, too, have a
supervisor).

If you like to think in object-oriented terms, treat a port as an
object: C<port> is the constructor, the receive callbacks set by C<rcv>
act as methods, the C<kil> function becomes the explicit destructor and
C<mon> installs a destructor hook. Unlike in conventional object-oriented
programming, it can make sense to exchange ports more freely (for example,
to monitor one port from another).
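
To make the object analogy and the symmetric monitoring described above
more concrete, here is a purely in-process toy model in plain Perl. It is
I<not> AnyEvent::MP and sends nothing over the network: the C<ToyPort>
class and its C<rcv>, C<snd>, C<mon> and C<kil> methods are hypothetical
stand-ins that only mimic the behaviour described in this section, where
killing a port runs its monitors, and a monitor can be another port that
is then killed with the same reason.

```perl
   use strict;
   use warnings;

   # Hypothetical in-process toy model of ports; not AnyEvent::MP.
   package ToyPort;

   # "Constructor", loosely corresponding to port.
   sub new {
      my ($class) = @_;
      return bless({ handlers => {}, monitors => [], alive => 1 }, $class);
   }

   # "Methods": one receive callback per message tag, like rcv.
   sub rcv {
      my ($self, $tag, $cb) = @_;
      $self->{handlers}{$tag} = $cb;
   }

   # Deliver a message to the callback for its tag; dead ports drop it.
   sub snd {
      my ($self, $tag, @args) = @_;
      return unless $self->{alive};
      my $cb = $self->{handlers}{$tag} or return;
      $cb->(@args);
   }

   # "Destructor hook", like mon: a code reference to call, or another
   # ToyPort to kill, when this port dies.
   sub mon {
      my ($self, $watcher) = @_;
      push @{ $self->{monitors} }, $watcher;
   }

   # "Explicit destructor", like kil: mark the port dead first (so mutual
   # monitors cannot recurse forever), then notify every monitor.
   sub kil {
      my ($self, @reason) = @_;
      return unless $self->{alive};
      $self->{alive} = 0;
      for my $m (@{ $self->{monitors} }) {
         ref $m eq 'CODE' ? $m->(@reason) : $m->kil(@reason);
      }
   }

   package main;

   # Symmetric monitoring: each side monitors the other.
   my $client = ToyPort->new;
   my $server = ToyPort->new;
   $server->mon($client);    # server dies => client is killed
   $client->mon($server);    # client dies => server is killed
```

Killing either side now takes the other one down too. Real AnyEvent::MP
ports additionally work across nodes and treat connection loss as a kill,
which this toy model does not attempt to imitate.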

There is ample room for improvement: the server should probably remember
the nickname in the C<join> handler instead of expecting it in every chat
message; it should probably monitor itself; and the client should not try
to send any messages unless a server is actually connected.

=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The
L<AnyEvent::MP::Global> service, for example, monitors nodes joining the
network and starts itself automatically on other nodes (if it isn't
running already).

A good way to design such applications is to put them into a module and
create "virtual connections" to other nodes - we call this the "bridge
head" method, because you start by creating a remote port (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let's redesign the chat server and
client using this design method.

Here is the server:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   grp_reg eg_chat_server2 => $NODE;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much different than the previous example, except that
this time, we register the node port in the global group and not any port
we created - the clients only want to know which node the server should be
running on. In fact, they could also use some kind of election mechanism,
to find the node with the lowest load or something like that.

The more interesting change is that indeed no server port is created -
the server consists only of code, and "does" nothing by itself. All it
does is define a function C<client_connect>, which expects a client port
and a nickname as arguments. It then monitors the client port and binds
a receive callback on C<$SELF>, which expects messages that in turn are
broadcast to all clients.

The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
for C<mon $client, $SELF>. The second does the normal "client has gone
away" clean-up action. Both could actually be rolled into one C<mon>
action.

C<$SELF> is a good hint that something interesting is going on. And
indeed, when looking at the client code, there is a new function,
C<spawn>:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   my $nick = shift;

   configure;

   $| = 1;

   my $port = port;

   my ($client, $server);

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server2"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
server I<port> on the server I<node>:

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;

And of course the first thing after creating it is monitoring it.

The C<spawn> function creates a new port on a remote node and returns
its port ID. After creating the port it calls a function on the remote
node, passing any remaining arguments to it, and - most importantly -
executes the function within the context of the new port, so it can be
manipulated by referring to C<$SELF>. The init function can reside in a
module (actually it normally I<should> reside in a module) - AnyEvent::MP
will automatically load the module if the function isn't defined.
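
The on-demand loading can be pictured with a few lines of plain Perl.
This is a hypothetical sketch, not AnyEvent::MP's actual code: the helper
names C<split_init_func>, C<pkg_to_file> and C<resolve_init_func> are
made up for illustration. It splits an init function name such as
C<mymod::chatserver::client_connect> into a package and a sub name, maps
the package to the file name C<require> would look for, and treats an
empty package part (as in C<::client_connect>) as package C<main>, which
is how Perl itself resolves such names.

```perl
   use strict;
   use warnings;

   # Split "Pkg::Name::func" into ("Pkg::Name", "func"); a name with an
   # empty package part, such as "::func", belongs to package main.
   sub split_init_func {
      my ($name) = @_;
      my ($pkg, $func) = $name =~ /^(.*)::([^:]+)\z/
         or die "not a qualified function name: $name\n";
      $pkg = "main" if $pkg eq "";
      return ($pkg, $func);
   }

   # Map a package name to the relative file name that require() searches
   # for in @INC, e.g. "mymod::chatserver" => "mymod/chatserver.pm".
   sub pkg_to_file {
      my ($pkg) = @_;
      (my $file = "$pkg.pm") =~ s{::}{/}g;
      return $file;
   }

   # Return a code reference for the init function, loading its module
   # first if the function is not defined yet (hypothetical helper).
   sub resolve_init_func {
      my ($name) = @_;
      my ($pkg, $func) = split_init_func($name);

      no strict 'refs';
      unless (defined &{"${pkg}::$func"}) {
         my $file = pkg_to_file($pkg);
         require $file;
      }
      return \&{"${pkg}::$func"};
   }
```

For the C<spawn> call in the client above, C<::client_connect> therefore
names C<main::client_connect>, which the server script defines itself, so
in this sketch nothing would need to be loaded.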

The C<spawn> function returns immediately, which means you can instantly
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.

If the spawn message gets delivered, but the monitoring message is not,
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in turn monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep in mind: after
creating a port, monitor it on the local node, and monitor "the other
side" from the remote node, and everything will be cleaned up just fine.

=head2 Services

Above it was mentioned that C<spawn> automatically loads modules, and this
can be exploited in various ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function.

Likewise, when you have some service that starts automatically (similar to
AnyEvent::MP::Global), then you can configure this service statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.

Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I want to run the
Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

That's plenty of possibilities you can use - it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty
C<< (data => $port) >> message.

The server only writes a single file at a time; other requests will stay
in the queue until the current file has been finished.
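
Seen from the sending side, this protocol is just a sequence of
messages. A hypothetical plain-Perl helper that builds that sequence,
with each message as an array reference, might look like this; the name
C<file_messages> and the default chunk size are illustrative assumptions,
not part of Coro::MP.

```perl
   use strict;
   use warnings;

   # Build the message sequence for one file transfer:
   #   [write_file => $src, $path], [data => $src, $chunk], ..., [data => $src]
   # The 4096-byte default chunk size is an arbitrary illustrative choice.
   sub file_messages {
      my ($src, $path, $content, $chunk_size) = @_;
      $chunk_size ||= 4096;

      my @msgs = ([write_file => $src, $path]);
      for (my $ofs = 0; $ofs < length($content); $ofs += $chunk_size) {
         push @msgs, [data => $src, substr($content, $ofs, $chunk_size)];
      }
      push @msgs, [data => $src];    # the empty data message ends the file
      return @msgs;
   }
```

A sender could then pass each message to C<snd>, for example with
C<< snd $ioserver, @$_ for file_messages ($SELF, $path, $content); >>.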

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

   use Coro::MP;
   use Coro::AIO;
   use Fcntl;    # O_WRONLY, O_CREAT, O_TRUNC

   my $ioserver = port_async {
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT|O_TRUNC, 0666
            or die "$path: $!";

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Let's go through it part by part.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or as here by calling C<port_async> with the code to execute
as a thread. The C<async> component comes from the fact that threads are
created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be C<kil>led I<normally>,
i.e. without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it executes
in a loop. The first thing it does is fetch the next message, using
C<get_cond>, the "conditional message get". Without a condition, it simply
fetches the next message from the queue, which I<must> be a C<write_file>
message.

The message contains the C<$path> to the file, which is then created (or
truncated, if it already exists):

         my $fh = aio_open $path, O_WRONLY|O_CREAT|O_TRUNC, 0666
            or die "$path: $!";

Then we enter a loop again, to serve as many C<data> messages as
necessary:

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

This time, the condition is not empty, but instead a code block: similarly
to grep, the code block will be called with C<@_> set to each message in
the queue, and it has to return whether it wants to receive the message or
not.

In this case we are interested in C<data> messages (C<< $_[0] eq "data" >>)
whose second element, the first one after the tag, is our source port
(C<< $_[1] eq $port >>).

The condition must be this strict, as it is possible to receive both
C<write_file> messages and C<data> messages from other ports while we
handle the file writing.
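
The queueing behaviour behind C<get_cond> can be modelled in a few lines
of plain Perl. The C<get_cond_from> function below is a hypothetical toy
that ignores threads and timeouts; it only illustrates the semantics
described above, under the assumption that the condition is tried on each
queued message in order, the first match is removed and returned, and
every other message stays queued in its original order.

```perl
   use strict;
   use warnings;

   # Toy model of selective receive (hypothetical, not Coro::MP's code):
   # remove and return the first message in @$queue for which $cond
   # returns true. Non-matching messages stay queued in their original
   # order. Returns an empty list if nothing matches, where the real
   # get_cond would instead wait for a matching message or a timeout.
   sub get_cond_from {
      my ($queue, $cond) = @_;
      for my $i (0 .. $#$queue) {
         if ($cond->(@{ $queue->[$i] })) {
            my ($msg) = splice @$queue, $i, 1;
            return @$msg;
         }
      }
      return;
   }
```

With a C<data> message from another port and a second C<write_file>
request queued ahead of our own data, the strict condition skips both and
leaves them for later. A condition that always returns true, such as
C<sub { 1 }>, degenerates into fetching the next message unconditionally.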

The lone C<5> at the end is a timeout - when no matching message is
received within C<5> seconds, we assume an error and C<die>.

When an empty C<data> message is received we are done and can close the
file (which is done automatically as C<$fh> goes out of scope):

            length $data or last;

Otherwise we need to write the data:

            aio_write $fh, undef, undef, $data, 0;

That's basically it. Note that every process should have some kind of
supervisor. In our case, the supervisor simply prints any error message:

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Here is a usage example:

   port_async {
      snd $ioserver, write_file => $SELF, "/tmp/unsafe";
      snd $ioserver, data => $SELF, "abc\n";
      snd $ioserver, data => $SELF, "def\n";
      snd $ioserver, data => $SELF;
   };

The messages are sent without any flow control or acknowledgement (feel
free to improve). Also, the source port does not actually need to be a
port - any unique ID will do - but port identifiers happen to be a simple
source of network-wide unique IDs.

Apart from C<get_cond> as seen above, there are other ways to receive
messages. The C<write_file> message above could also selectively be
received using a C<get> call:

   my ($port, $path) = get "write_file";

This is simpler, but when some other code part sends an unexpected message
to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
every threaded port should fetch the next message unconditionally
somewhere, to avoid filling up the queue.

It is also possible to write switch-like C<get_cond> calls:

   get_cond {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         ...;
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         ...;
      };

      die "unexpected message $_[0] received";
   };
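
The switch-like form relies on the condition returning a code reference,
which is then called with the matching message. Here is a hypothetical
plain-Perl model of that dispatch. It assumes those semantics, ignores
threads and timeouts, and simply applies the condition to the next queued
message; C<dispatch_next> is a made-up name, not a Coro::MP function.

```perl
   use strict;
   use warnings;

   # Toy model (hypothetical): take the next queued message, call the
   # condition with it and, if the condition returns a code reference,
   # call that with the same message and return its result.
   sub dispatch_next {
      my ($queue, $cond) = @_;
      my $msg = shift @$queue or return;
      my $handler = $cond->(@$msg);
      return ref $handler eq 'CODE' ? $handler->(@$msg) : @$msg;
   }

   # A switch-like condition in the style shown above.
   my $cond = sub {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         return "msg1 with @msg1_data";
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         return "msg2 with " . scalar(@msg2_data) . " values";
      };

      die "unexpected message $_[0] received\n";
   };
```

Because the condition dies on unrecognised messages instead of skipping
them, unexpected messages cannot silently pile up in the queue.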

=head1 THE END

This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial, that AnyEvent::MP is still in its infancy, and I hope
it will be useful for creating exciting new applications.

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<Coro::MP>

L<AnyEvent>

=head1 AUTHOR

Robin Redeker <elmex@ta-sa.org>
Marc Lehmann <schmorp@schmorp.de>
