1 =head1 Message Passing for the Non-Blocked Mind
2
3 =head1 Introduction and Terminology
4
5 This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
6 module, which allows programs to transparently pass messages within the
7 process and to other processes on the same or a different host.
8
9 What kind of messages? Basically a message here means a list of Perl
10 strings, numbers, hashes and arrays, anything that can be expressed as a
11 L<JSON> text (as JSON is the default serialiser in the protocol). Here are
12 two examples:
13
14 write_log => 1251555874, "action was successful.\n"
15 123, ["a", "b", "c"], { foo => "bar" }
16
When using L<AnyEvent::MP> it is customary to use a descriptive string as
the first element of a message, indicating the type of the message. This
19 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20 (C<rcv>) support matching it directly.
21
Suppose you want to send a ping message with your current time to
somewhere; this is what such a message might look like (in Perl syntax):
24
25 ping => 1251381636
26
27 Now that we know what a message is, to which entities are those
28 messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29 a destination for messages but also a context to execute code: when
30 a runtime error occurs while executing code belonging to a port, the
31 exception will be raised on the port and can even travel to interested
32 parties on other nodes, which makes supervision of distributed processes
33 easy.
34
35 How do these ports relate to things you know? Each I<port> belongs
36 to a I<node>, and a I<node> is just the UNIX process that runs your
37 L<AnyEvent::MP> application.
38
39 Each I<node> is distinguished from other I<nodes> running on the same or
40 another host in a network by its I<node ID>. A I<node ID> is simply a
41 unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42 (UNIX nodename, random string...).
43
Here is a diagram showing how I<nodes>, I<ports> and UNIX processes relate
45 to each other. The setup consists of two nodes (more are of course
46 possible): Node C<A> (in UNIX process 7066) with the ports C<ABC> and
47 C<DEF>. And the node C<B> (in UNIX process 8321) with the ports C<FOO> and
48 C<BAR>.
49
50
51 |- PID: 7066 -| |- PID: 8321 -|
52 | | | |
53 | Node ID: A | | Node ID: B |
54 | | | |
55 | Port ABC =|= <----\ /-----> =|= Port FOO |
56 | | X | |
57 | Port DEF =|= <----/ \-----> =|= Port BAR |
58 | | | |
59 |-------------| |-------------|
60
61 The strings for the I<port IDs> here are just for illustrative
62 purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 strings, they can't be chosen manually and are assigned by the system
64 dynamically. These I<port IDs> are unique within a network and can also be
65 used to identify senders or as message tags for instance.
66
67 The next sections will explain the API of L<AnyEvent::MP> by going through
68 a few simple examples. Later some more complex idioms are introduced,
69 which are hopefully useful to solve some real world problems.
70
71 =head2 Passing Your First Message
72
As a start, let's have a look at the messaging API. The following example
74 is just a demo to show the basic elements of message passing with
75 L<AnyEvent::MP>.
76
The example should print C<Ending with: 123>, in a rather complicated
way, by passing a message to a port.
79
80 use AnyEvent;
81 use AnyEvent::MP;
82
83 my $end_cv = AnyEvent->condvar;
84
85 my $port = port;
86
87 rcv $port, test => sub {
88 my ($data) = @_;
89 $end_cv->send ($data);
90 };
91
92 snd $port, test => 123;
93
94 print "Ending with: " . $end_cv->recv . "\n";
95
96 It already uses most of the essential functions inside
L<AnyEvent::MP>: first there is the C<port> function, which will create a
I<port> and return its I<port ID>, a simple string.
99
100 This I<port ID> can be used to send messages to the port and install
101 handlers to receive messages on the port. Since it is a simple string
102 it can be safely passed to other I<nodes> in the network when you want
103 to refer to that specific port (usually used for RPC, where you need
104 to tell the other end which I<port> to send the reply to - messages in
105 L<AnyEvent::MP> have a destination, but no source).
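
Here is a small sketch of that reply-port idiom, building on the functions
from the example above (C<$service_port> is a hypothetical I<port ID> we
received from somewhere else):

    # create a port to receive the answer on, and pass its ID along
    # with the request - messages carry no implicit sender
    my $reply_port = port;

    rcv $reply_port, pong => sub {
       print "pong received: @_\n";
    };

    snd $service_port, ping => $reply_port, time;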
106
107 The next function is C<rcv>:
108
109 rcv $port, test => sub { ... };
110
It installs a receiver callback on the I<port> that is specified as the first
112 argument (it only works for "local" ports, i.e. ports created on the same
113 node). The next argument, in this example C<test>, specifies a I<tag> to
114 match. This means that whenever a message with the first element being
115 the string C<test> is received, the callback is called with the remaining
116 parts of that message.
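
Several such handlers can be installed on the same port, one per tag - a
small sketch:

    # handle two different message types on the same port
    rcv $port, ping => sub { print "ping: @_\n" };
    rcv $port, pong => sub { print "pong: @_\n" };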
117
118 Messages can be sent with the C<snd> function, which is used like this in
119 the example above:
120
121 snd $port, test => 123;
122
123 This will send the message C<'test', 123> to the I<port> with the I<port
124 ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125 on C<test> it will call the callback with the first argument being the
126 number C<123>.
127
128 The callback is a typical AnyEvent idiom: the callback just passes
129 that number on to the I<condition variable> C<$end_cv> which will then
pass the value to C<print>. Condition variables are outside the scope
of this tutorial and not often used with ports, so please consult
L<AnyEvent::Intro> about them.
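
For reference, the condvar pattern used above boils down to this (a minimal
sketch - see L<AnyEvent::Intro> for the details):

    use AnyEvent;

    my $cv = AnyEvent->condvar;

    # somewhere in a callback: deliver a result
    $cv->send (123);

    # somewhere else: block until the result has been delivered
    my $result = $cv->recv;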
133
134 Passing messages inside just one process is boring. Before we can move on
135 and do interprocess message passing we first have to make sure some things
136 have been set up correctly for our nodes to talk to each other.
137
138 =head2 System Requirements and System Setup
139
140 Before we can start with real IPC we have to make sure some things work on
141 your system.
142
First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to set up the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.
147
The easiest way to set this up is to use the F<aemp> utility:
149
150 aemp gensecret
151
152 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153 random shared secret. You can copy this file to any other system and
154 then communicate over the network (via TCP) with it. You can also select
155 your own shared secret (F<aemp setsecret>) and for increased security
156 requirements you can even create (or configure) a TLS certificate (F<aemp
157 gencert>), causing connections to not just be securely authenticated, but
158 also to be encrypted and protected against tinkering.
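
For example (a sketch - assuming F<aemp setsecret> takes the secret as its
argument):

    # choose your own shared secret instead of a random one
    aemp setsecret "my-cluster-secret"

    # or create a TLS certificate for authenticated, encrypted links
    aemp gencert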
159
160 Connections will only be successfully established when the I<nodes>
161 that want to connect to each other have the same I<shared secret> (or
162 successfully verify the TLS certificate of the other side, in which case
163 no shared secret is required).
164
165 B<If something does not work as expected, and for example tcpdump shows
166 that the connections are closed almost immediately, you should make sure
167 that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168 you try to connect with each other!>
169
That's all for now - you will find some more advanced fiddling with the
171 C<aemp> utility later.
172
173 =head2 Shooting the Trouble
174
175 Sometimes things go wrong, and AnyEvent::MP, being a professional module,
176 does not gratuitously spill out messages to your screen.
177
To help troubleshoot any issues, there are two environment variables
that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets the
180 logging level. The default is C<5>, which means nothing much is
181 printed. You can increase it to C<8> or C<9> to get more verbose
182 output. This is example output when starting a node:
183
184 2009-08-31 19:51:50 <8> node anon/5RloFvvYL8jfSScXNL8EpX starting up.
185 2009-08-31 19:51:50 <7> starting global service.
186 2009-08-31 19:51:50 <9> 10.0.0.17:4040 connected as ruth
187 2009-08-31 19:51:50 <7> ruth is up ()
188 2009-08-31 19:51:50 <9> ruth told us it knows about {"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}.
189 2009-08-31 19:51:50 <9> connecting to doom with [10.0.0.5:45143]
190 2009-08-31 19:51:50 <9> connecting to anon/4SYrtJ3ft5l1C16w2hto3t with [10.0.0.1:45920 [2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788 [fd00::a00:1]:37104]
191 2009-08-31 19:51:50 <9> ruth told us its addresses (10.0.0.17:4040).
192
193 A lot of info, but at least you can see that it does something.
194
195 The other environment variable that can be useful is
196 C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
197 most messages that are sent or received to be printed. In the above
198 example you would see something like:
199
200 SND ruth <- ["addr",["10.0.0.1:49358","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:58884","[fd00::a00:1]:45006"]]
201 RCV ruth -> ["","AnyEvent::MP::_spawn","20QA7cWubCLTWUhFgBKOx2.x","AnyEvent::MP::Global::connect",0,"ruth"]
202 RCV ruth -> ["","mon1","20QA7cWubCLTWUhFgBKOx2.x"]
203 RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","addr",["10.0.0.17:4040"]]
204 RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","nodes",{"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}]
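
Both are ordinary environment variables, so one way to enable them for a
single run is (F<myapp.pl> being whatever program starts your node):

    PERL_ANYEVENT_MP_WARNLEVEL=9 PERL_ANYEVENT_MP_TRACE=1 perl myapp.pl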
205
206 =head1 PART 1: Passing Messages Between Processes
207
208 =head2 The Receiver
209
210 Lets split the previous example up into two programs: one that contains
211 the sender and one for the receiver. First the receiver application, in
212 full:
213
    use AnyEvent;
    use AnyEvent::MP;
    use AnyEvent::MP::Global;

    configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

    my $port = port;
    my $guard = grp_reg eg_receivers => $port;

    rcv $port, test => sub {
       my ($data) = @_;

       print "Received data: " . $data . "\n";
    };

    AnyEvent->condvar->recv;
229
230 =head3 AnyEvent::MP::Global
231
232 Now, that wasn't too bad, was it? OK, let's step through the new functions
233 and modules that have been used.
234
235 For starters, there is now an additional module being
236 used: L<AnyEvent::MP::Global>. This module provides us with a I<global
237 registry>, which lets us register ports in groups that are visible on all
238 I<nodes> in a network.
239
240 What is this useful for? Well, the I<port IDs> are random-looking strings,
241 assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance,
242 so we don't know which I<port ID> to send messages to, especially when the
243 message is to be passed between different I<nodes> (or UNIX processes). To
244 find the right I<port> of another I<node> in the network we will need
245 to communicate this somehow to the sender. And exactly that is what
246 L<AnyEvent::MP::Global> provides.
247
248 Especially in larger, more anonymous networks this is handy: imagine you
249 have a few database backends, a few web front-ends and some processing
250 distributed over a number of hosts: all of these would simply register
251 themselves in the appropriate group, and your web front-ends can start to
252 find some database backend.
253
254 =head3 C<configure> and Joining and Maintaining the Network
255
256 Now, let's have a look at the new function, C<configure>:
257
    configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
259
260 Before we are able to send messages to other nodes we have to initialise
ourselves to become a "distributed node". Initialising a node means naming
262 the node, optionally binding some TCP listeners so that other nodes can
263 contact it and connecting to a predefined set of seed addresses so the
264 node can discover the existing network - and the existing network can
265 discover the node!
266
267 All of this (and more) can be passed to the C<configure> function - later
268 we will see how we can do all this without even passing anything to
269 C<configure>!
270
The first parameter, C<nodeid>, specifies the node ID (in this case
C<eg_receiver/%u> - the default is to use the node name of the current host,
273 but for this example we want to be able to run many nodes on the same
274 machine). Node IDs need to be unique within the network and can be almost
275 any string - if you don't care, you can specify a node ID of C<anon/>
276 which will then be replaced by a random node name.
277
278 The second parameter, C<binds>, specifies a list of C<address:port> pairs
279 to bind TCP listeners on. The special "address" of C<*> means to bind on
280 every local IP address (this might not work on every OS, so it should not
281 be used unless you know it works).
282
283 The reason to bind on a TCP port is not just that other nodes can connect
284 to us: if no binds are specified, the node will still bind on a dynamic
285 port on all local addresses - but in this case we won't know the port, and
286 cannot tell other nodes to connect to it as seed node.
287
288 A I<seed> is a (fixed) TCP address of some other node in the network. To
289 explain the need for seeds we have to look at the topology of a typical
290 L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>,
291 here an example with 4 nodes:
292
293 N1--N2
294 | \/ |
295 | /\ |
296 N3--N4
297
298 Now imagine another node - C<N5> - wants to connect itself to that network:
299
300 N1--N2
301 | \/ | N5
302 | /\ |
303 N3--N4
304
305 The new node needs to know the I<binds> of all nodes already
306 connected. Exactly this is what the I<seeds> are for: Let's assume that
307 the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This
308 causes it to connect to C<N2>:
309
310 N1--N2____
311 | \/ | N5
312 | /\ |
313 N3--N4
314
315 C<N2> then tells C<N5> about the I<binds> of the other nodes it is
316 connected to, and C<N5> creates the rest of the connections:
317
318 /--------\
319 N1--N2____|
320 | \/ | N5
321 | /\ | /|
322 N3--N4--- |
323 \________/
324
325 All done: C<N5> is now happily connected to the rest of the network.
326
Apart from the obvious function - joining the network - seed nodes fulfill
328 another very important function: the connections created by connecting
329 to seed nodes are used to keep the network together - by trying to keep
330 connections to all seed nodes active, the network ensures that it will not
331 split into multiple networks without connection to each other.
332
333 This means that the graph created by all seed node connections must span
334 the whole network, in some way.
335
There are many ways of doing this - the simplest is probably to use
a single set of one or more seed nodes as seeds for all nodes in the
network. This creates a "hub" of seed nodes that connect to each other,
and "leaf" nodes that connect to the nodes in the hub, keeping everything
together.
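
In code, such a setup could look like this for the leaf nodes (a sketch -
the seed addresses are made up):

    # every leaf node is given the same seed list, pointing at the hub
    configure seeds => ["10.0.0.1:4040", "10.0.0.2:4040"];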
341
342 The process of joining a network takes time, during which the node is
343 already running. This also means it takes time until the node is fully
344 connected, and global groups and other information is available. The best
way to deal with this is to either retry regularly until you find the
346 resource you were looking for, or to only start services on demand after a
347 node has become available.
348
349 =head3 Registering the Receiver
350
Coming back to our example, we have now introduced the basic purpose of
L<AnyEvent::MP::Global> and C<configure> (profiles will be covered later),
so we can finally continue talking about the receiver.
355
356 Let's look at the next line(s):
357
    my $port = port;
    my $guard = grp_reg eg_receivers => $port;
360
361 The C<port> function has already been discussed. It simply creates a new
362 I<port> and returns the I<port ID>. The C<grp_reg> function, however, is
363 new: The first argument is the name of a I<global group>, and the second
364 argument is the I<port ID> to register in that group.
365
366 You can choose the name of such a I<global group> freely (prefixing your
package name is I<highly recommended> however and might be enforced in
368 future versions!). The purpose of such a group is to store a set of port
369 IDs. This set is made available throughout the L<AnyEvent::MP> network,
370 so that each node can see which ports belong to that group.
371
372 Later we will see how the sender looks for the ports in this global
373 group to send messages to them.
374
375 The last step in the example is to set up a receiver callback for those
376 messages, just as was discussed in the first example. We again match
377 for the tag C<test>. The difference is that this time we don't exit the
378 application after receiving the first message. Instead we continue to wait
379 for new messages indefinitely.
380
381 =head2 The Sender
382
383 Ok, now let's take a look at the sender code:
384
    use AnyEvent;
    use AnyEvent::MP;
    use AnyEvent::MP::Global;
387
388 configure nodeid => "eg_sender/%u", seeds => ["*:4040"];
389
390 my $find_timer =
391 AnyEvent->timer (after => 0, interval => 1, cb => sub {
392 my $ports = grp_get "eg_receivers"
393 or return;
394
395 snd $_, test => time
396 for @$ports;
397 });
398
399 AnyEvent->condvar->recv;
400
It's even less code. The C<configure> call serves the same purpose as in the
402 receiver, but instead of specifying binds we specify a list of seeds -
403 which happens to be the same as the binds used by the receiver, which
404 becomes our seed node.
405
406 Next we set up a timer that repeatedly (every second) calls this chunk of
407 code:
408
409 my $ports = grp_get "eg_receivers"
410 or return;
411
412 snd $_, test => time
413 for @$ports;
414
415 The only new function here is the C<grp_get> function of
416 L<AnyEvent::MP::Global>. It searches in the global group named
417 C<eg_receivers> for ports. If none are found, it returns C<undef>, which
418 makes our code return instantly and wait for the next round, as nobody is
419 interested in our message.
420
421 As soon as the receiver application has connected and the information
422 about the newly added port in the receiver has propagated to the sender
423 node, C<grp_get> returns an array reference that contains the I<port ID> of
424 the receiver I<port(s)>.
425
426 We then just send a message with a tag and the current time to every
427 I<port> in the global group.
428
429 =head3 Splitting Network Configuration and Application Code
430
431 OK, so far, this works. In the real world, however, the person configuring
432 your application to run on a specific network (the end user or network
administrator) is often different from the person coding the application.
434
435 Or to put it differently: the arguments passed to configure are usually
436 provided not by the programmer, but by whoever is deploying the program.
437
438 To make this easy, AnyEvent::MP supports a simple configuration database,
439 using profiles, which can be managed using the F<aemp> command-line
440 utility (yes, this section is about the advanced tinkering we mentioned
441 before).
442
443 When you change both programs above to simply call
444
445 configure;
446
447 then AnyEvent::MP tries to look up a profile using the current node name
448 in its configuration database, falling back to some global default.
449
450 You can run "generic" nodes using the F<aemp> utility as well, and we will
451 exploit this in the following way: we configure a profile "seed" and run
452 a node using it, whose sole purpose is to be a seed node for our example
453 programs.
454
455 We bind the seed node to port 4040 on all interfaces:
456
457 aemp profile seed binds "*:4040"
458
And we configure all nodes to use this as their seed node (this only works
when running on the same host; for multiple machines you would provide the IP
461 address or hostname of the node running the seed), and use a random name
462 (because we want to start multiple nodes on the same host):
463
464 aemp seeds "*:4040" nodeid anon/
465
466 Then we run the seed node:
467
468 aemp run profile seed
469
470 After that, we can start as many other nodes as we want, and they will all
471 use our generic seed node to discover each other.
472
473 In fact, starting many receivers nicely illustrates that the time sender
474 can have multiple receivers.
475
476 That's all for now - next we will teach you about monitoring by writing a
477 simple chat client and server :)
478
479 =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
480
481 That's a mouthful, so what does it mean? Our previous example is what one
482 could call "very loosely coupled" - the sender doesn't care about whether
483 there are any receivers, and the receivers do not care if there is any
484 sender.
485
486 This can work fine for simple services, but most real-world applications
487 want to ensure that the side they are expecting to be there is actually
488 there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing or has crashed, it comes
back, by recovering and restarting the service.
491
492 AnyEvent::MP supports this by catching exceptions and network problems,
493 and notifying interested parties of this.
494
495 =head2 Exceptions, Port Context, Network Errors and Monitors
496
497 =head3 Exceptions
498
499 Exceptions are handled on a per-port basis: receive callbacks are executed
500 in a special context, the so-called I<port-context>: code that throws an
501 otherwise uncaught exception will cause the port to be C<kil>led. Killed
502 ports are destroyed automatically (killing ports is the only way to free
503 ports, incidentally).
504
505 Ports can be monitored, even from a different host, and when a port is
506 killed any entity monitoring it will be notified.
507
508 Here is a simple example:
509
510 use AnyEvent::MP;
511
512 # create a port, it always dies
513 my $port = port { die "oops" };
514
515 # monitor it
516 mon $port, sub {
517 warn "$port was killed (with reason @_)";
518 };
519
520 # now send it some message, causing it to die:
521 snd $port;
522
523 It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
525 message, causing it to die and call the monitoring callback:
526
527 anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
528
529 The callback was actually passed two arguments: C<die> (to indicate it did
530 throw an exception as opposed to, say, a network error) and the exception
531 message itself.
532
533 What happens when a port is killed before we have a chance to monitor
534 it? Granted, this is highly unlikely in our example, but when you program
535 in a network this can easily happen due to races between nodes.
536
537 use AnyEvent::MP;
538
539 my $port = port { die "oops" };
540
541 snd $port;
542
543 mon $port, sub {
544 warn "$port was killed (with reason @_)";
545 };
546
547 This time we will get something like:
548
549 anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)
550
551 Since the port was already gone, the kill reason is now C<no_such_port>
552 with some descriptive (we hope) error message.
553
554 In fact, the kill reason is usually some identifier as first argument
555 and a human-readable error message as second argument, but can be about
556 anything (it's a list) or even nothing - which is called a "normal" kill.
557
558 You can kill ports manually using the C<kil> function, which will be
559 treated like an error when any reason is specified:
560
561 kil $port, custom_error => "don't like your steenking face";
562
563 And a clean kill without any reason arguments:
564
565 kil $port;
566
567 By now you probably wonder what this "normal" kill business is: A common
568 idiom is to not specify a callback to C<mon>, but another port, such as
569 C<$SELF>:
570
571 mon $port, $SELF;
572
573 This basically means "monitor $port and kill me when it crashes". And a
574 "normal" kill does not count as a crash. This way you can easily link
ports together and make them crash together on errors (while still allowing
you to remove a port silently).
577
578 =head3 Port Context
579
580 When code runs in an environment where C<$SELF> contains its own port ID
581 and exceptions will be caught, it is said to run in a port context.
582
583 Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from C<rcv> handlers. As an example, assume that the port receive
585 handler wants to C<die> a second later, using C<after>:
586
587 my $port = port {
588 after 1, sub { die "oops" };
589 };
590
591 Then you will find it does not work - when the after callback is executed,
592 it does not run in port context anymore, so exceptions will not be caught.
593
594 For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works just like Perl's built-in C<sub>:
596
597 my $port = port {
598 after 1, psub { die "oops" };
599 };
600
601 C<psub> stores C<$SELF> and returns a code reference. When the code
602 reference is invoked, it will run the code block within the context of
603 that port, so exception handling once more works as expected.
604
605 There is also a way to temporarily execute code in the context of some
606 port, namely C<peval>:
607
608 peval $port, sub {
609 # die'ing here will kil $port
610 };
611
612 The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
613 and then executes the given sub in a port context.
614
615 =head3 Network Errors and the AEMP Guarantee
616
617 I mentioned another important source of monitoring failures: network
618 problems. When a node loses connection to another node, it will invoke all
619 monitoring actions as if the port was killed, even if it is possible that
620 the port still lives happily on another node (not being able to talk to a
node means we have no clue what's going on with it: it could have crashed,
or it could still be running without knowing that we lost the connection).
623
624 So another way to view monitors is "notify me when some of my messages
625 couldn't be delivered". AEMP has a guarantee about message delivery to a
626 port: After starting a monitor, any message sent to a port will either
627 be delivered, or, when it is lost, any further messages will also be lost
628 until the monitoring action is invoked. After that, further messages
629 I<might> get delivered again.
630
631 This doesn't sound like a very big guarantee, but it is kind of the best
632 you can get while staying sane: Specifically, it means that there will
633 be no "holes" in the message sequence: all messages sent are delivered
634 in order, without any missing in between, and when some were lost, you
635 I<will> be notified of that, so you can take recovery action.
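
In code, that boils down to "monitor first, then send" - a sketch, where
C<resend_pending> stands for whatever recovery logic your application needs:

    # start monitoring before sending anything we care about
    mon $port, sub {
       warn "lost contact with $port (@_), recovering\n";
       resend_pending ();   # hypothetical application-level recovery
    };

    snd $port, request => 1;
    snd $port, request => 2;   # delivered in order, or we get notified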
636
637 =head3 Supervising
638
OK, so how is all this crash-everything stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them more
641 stable, but to make them more resilient against actual errors and
642 crashes. And this is not done by crashing I<everything>, but by crashing
643 everything except a supervisor.
644
645 A supervisor is simply some code that ensures that an application (or a
646 part of it) is running, and if it crashes, is restarted properly.
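
In its simplest form that is just a monitor that restarts whatever it
watches - a sketch, where C<start_worker> is a hypothetical function that
creates and returns a port:

    sub supervise {
       my $worker = start_worker ();

       mon $worker, sub {
          warn "worker died (@_), restarting\n";
          after 1, \&supervise;   # back off briefly, then restart
       };
    }

    supervise;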
647
648 To show how to do all this we will create a simple chat server that can
649 handle many chat clients. Both server and clients can be killed and
650 restarted, and even crash, to some extent.
651
652 =head2 Chatting, the Resilient Way
653
654 Without further ado, here is the chat server (to run it, we assume the
655 set-up explained earlier, with a separate F<aemp run> seed node):
656
657 use common::sense;
658 use AnyEvent::MP;
659 use AnyEvent::MP::Global;
660
661 configure;
662
663 my %clients;
664
665 sub msg {
666 print "relaying: $_[0]\n";
667 snd $_, $_[0]
668 for values %clients;
669 }
670
671 our $server = port;
672
673 rcv $server, join => sub {
674 my ($client, $nick) = @_;
675
676 $clients{$client} = $client;
677
678 mon $client, sub {
679 delete $clients{$client};
680 msg "$nick (quits, @_)";
681 };
682 msg "$nick (joins)";
683 };
684
685 rcv $server, privmsg => sub {
686 my ($nick, $msg) = @_;
687 msg "$nick: $msg";
688 };
689
690 grp_reg eg_chat_server => $server;
691
692 warn "server ready.\n";
693
694 AnyEvent->condvar->recv;
695
696 Looks like a lot, but it is actually quite simple: after your usual
697 preamble (this time we use common sense), we define a helper function that
698 sends some message to every registered chat client:
699
700 sub msg {
701 print "relaying: $_[0]\n";
702 snd $_, $_[0]
703 for values %clients;
704 }
705
The clients are stored in the hash C<%clients>. Then we define a server
707 port and install two receivers on it, C<join>, which is sent by clients
708 to join the chat, and C<privmsg>, that clients use to send actual chat
709 messages.
710
C<join> is the most complicated. It expects the client port and the nickname
712 to be passed in the message, and registers the client in C<%clients>.
713
714 rcv $server, join => sub {
715 my ($client, $nick) = @_;
716
717 $clients{$client} = $client;
718
719 The next step is to monitor the client. The monitoring action removes the
720 client and sends a quit message with the error to all remaining clients.
721
722 mon $client, sub {
723 delete $clients{$client};
724 msg "$nick (quits, @_)";
725 };
726
727 And finally, it creates a join message and sends it to all clients.
728
729 msg "$nick (joins)";
730 };
731
732 The C<privmsg> callback simply broadcasts the message to all clients:
733
734 rcv $server, privmsg => sub {
735 my ($nick, $msg) = @_;
736 msg "$nick: $msg";
737 };
738
739 And finally, the server registers itself in the server group, so that
740 clients can find it:
741
742 grp_reg eg_chat_server => $server;
743
744 Well, well... and where is this supervisor stuff? Well... we cheated,
745 it's not there. To not overcomplicate the example, we only put it into
746 the..... CLIENT!
747
748 =head3 The Client, and a Supervisor!
749
750 Again, here is the client, including supervisor, which makes it a bit
751 longer:
752
753 use common::sense;
754 use AnyEvent::MP;
755 use AnyEvent::MP::Global;
756
757 my $nick = shift;
758
759 configure;
760
761 my ($client, $server);
762
763 sub server_connect {
764 my $servernodes = grp_get "eg_chat_server"
765 or return after 1, \&server_connect;
766
767 print "\rconnecting...\n";
768
769 $client = port { print "\r \r@_\n> " };
770 mon $client, sub {
771 print "\rdisconnected @_\n";
772 &server_connect;
773 };
774
775 $server = $servernodes->[0];
776 snd $server, join => $client, $nick;
777 mon $server, $client;
778 }
779
780 server_connect;
781
782 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
783 chomp (my $line = <STDIN>);
784 print "> ";
785 snd $server, privmsg => $nick, $line
786 if $server;
787 });
788
789 $| = 1;
790 print "> ";
791 AnyEvent->condvar->recv;
792
793 The first thing the client does is to store the nick name (which is
794 expected as the only command line argument) in C<$nick>, for further
795 usage.
796
797 The next relevant thing is... finally... the supervisor:
798
799 sub server_connect {
800 my $servernodes = grp_get "eg_chat_server"
801 or return after 1, \&server_connect;
802
803 This looks up the server in the C<eg_chat_server> global group. If it
804 cannot find it (which is likely when the node is just starting up),
805 it will wait a second and then retry. This "wait a bit and retry"
806 is an important pattern, as distributed programming means lots of
things are going on asynchronously. In practice, one should use a more
808 intelligent algorithm, to possibly warn after an excessive number of
809 retries. Hopefully future versions of AnyEvent::MP will offer some
810 predefined supervisors, for now you will have to code it on your own.
811
812 Next it creates a local port for the server to send messages to, and
813 monitors it. When the port is killed, it will print "disconnected" and
814 tell the supervisor function to retry again.
815
816 $client = port { print "\r \r@_\n> " };
817 mon $client, sub {
818 print "\rdisconnected @_\n";
819 &server_connect;
820 };
821
Then everything is ready: the client will send a C<join> message with its
823 local port to the server, and start monitoring it:
824
825 $server = $servernodes->[0];
826 snd $server, join => $client, $nick;
827 mon $server, $client;
828 }
829
830 The monitor will ensure that if the server crashes or goes away, the
client will be killed as well. This tells the user that the client was
disconnected, and the client will then try to connect to the server again.
833
834 The rest of the program deals with the boring details of actually invoking
835 the supervisor function to start the whole client process and handle the
836 actual terminal input, sending it to the server.
837
838 You should now try to start the server and one or more clients in different
839 terminal windows (and the seed node):
840
841 perl eg/chat_client nick1
842 perl eg/chat_client nick2
843 perl eg/chat_server
844 aemp run profile seed
845
846 And then you can experiment with chatting, killing one or more clients, or
847 stopping and restarting the server, to see the monitoring in action.
848
849 The crucial point you should understand from this example is that
850 monitoring is usually symmetric: when you monitor some other port,
851 potentially on another node, that other port usually should monitor you,
852 too, so when the connection dies, both ports get killed, or at least both
853 sides can take corrective action. Exceptions are "servers" that serve
854 multiple clients at once and might only wish to clean up, and supervisors,
855 who of course should not normally get killed (unless they, too, have a
856 supervisor).
857
858 If you often think in object-oriented terms, then treat a port as an
859 object, C<port> is the constructor, the receive callbacks set by C<rcv>
860 act as methods, the C<kil> function becomes the explicit destructor and
861 C<mon> installs a destructor hook. Unlike conventional object oriented
862 programming, it can make sense to exchange ports more freely (for example,
863 to monitor one port from another).
864
865 There is ample room for improvement: the server should probably remember
866 the nickname in the C<join> handler instead of expecting it in every chat
867 message, it should probably monitor itself, and the client should not try
868 to send any messages unless a server is actually connected.
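
For example, remembering the nickname could look roughly like this (a
sketch - note that C<privmsg> would then carry the client port instead of
the nick):

    my %nicks;

    rcv $server, join => sub {
       my ($client, $nick) = @_;

       $nicks{$client}   = $nick;
       $clients{$client} = $client;

       mon $client, sub {
          delete $clients{$client};
          delete $nicks{$client};
          msg "$nick (quits, @_)";
       };
       msg "$nick (joins)";
    };

    rcv $server, privmsg => sub {
       my ($client, $msg) = @_;
       msg "$nicks{$client}: $msg";
    };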
869
870 =head1 PART 3: TIMTOWTDI: Virtual Connections
871
872 The chat system developed in the previous sections is very "traditional"
873 in a way: you start some server(s) and some clients statically and they
874 start talking to each other.
875
Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The L<AnyEvent::MP::Global>
878 service for example monitors nodes joining the network and starts itself
879 automatically on other nodes (if it isn't running already).
880
881 A good way to design such applications is to put them into a module and
882 create "virtual connections" to other nodes - we call this the "bridge
883 head" method, because you start by creating a remote port (the bridge
884 head) and from that you start to bootstrap your application.
885
886 Since that sounds rather theoretical, let's redesign the chat server and
887 client using this design method.
888
889 Here is the server:
890
891 use common::sense;
892 use AnyEvent::MP;
893 use AnyEvent::MP::Global;
894
895 configure;
896
897 grp_reg eg_chat_server2 => $NODE;
898
899 my %clients;
900
901 sub msg {
902 print "relaying: $_[0]\n";
903 snd $_, $_[0]
904 for values %clients;
905 }
906
907 sub client_connect {
908 my ($client, $nick) = @_;
909
910 mon $client;
911 mon $client, sub {
912 delete $clients{$client};
913 msg "$nick (quits, @_)";
914 };
915
916 $clients{$client} = $client;
917
918 msg "$nick (joins)";
919
920 rcv $SELF, sub { msg "$nick: $_[0]" };
921 }
922
923 warn "server ready.\n";
924
925 AnyEvent->condvar->recv;
926
It starts out not much different from the previous example, except that
928 this time, we register the node port in the global group and not any port
929 we created - the clients only want to know which node the server should be
930 running on. In fact, they could also use some kind of election mechanism,
to find the node with the lowest load or something like that.
932
933 The more interesting change is that indeed no server port is created -
934 the server consists only of code, and "does" nothing by itself. All it
935 does is define a function C<client_connect>, which expects a client port
936 and a nick name as arguments. It then monitors the client port and binds
937 a receive callback on C<$SELF>, which expects messages that in turn are
938 broadcast to all clients.
939
940 The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
941 for C<mon $client, $SELF>. The second does the normal "client has gone
942 away" clean-up action. Both could actually be rolled into one C<mon>
943 action.
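
Rolled into one, it might look like this (a sketch - the C<if @_> check
preserves the "normal kills are not crashes" behaviour of the shorthand
form):

    mon $client, sub {
       delete $clients{$client};
       msg "$nick (quits, @_)";
       kil $SELF, @_ if @_;   # propagate abnormal kills, like mon $client, $SELF
    };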
944
945 C<$SELF> is a good hint that something interesting is going on. And
946 indeed, when looking at the client code, there is a new function,
947 C<spawn>:
948
949 use common::sense;
950 use AnyEvent::MP;
951 use AnyEvent::MP::Global;
952
953 my $nick = shift;
954
955 configure;
956
957 $| = 1;
958
959 my $port = port;
960
961 my ($client, $server);
962
963 sub server_connect {
964 my $servernodes = grp_get "eg_chat_server2"
965 or return after 1, \&server_connect;
966
967 print "\rconnecting...\n";
968
969 $client = port { print "\r \r@_\n> " };
970 mon $client, sub {
971 print "\rdisconnected @_\n";
972 &server_connect;
973 };
974
975 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
976 mon $server, $client;
977 }
978
979 server_connect;
980
981 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
982 chomp (my $line = <STDIN>);
983 print "> ";
984 snd $server, $line
985 if $server;
986 });
987
988 print "> ";
989 AnyEvent->condvar->recv;
990
991 The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
server I<port> on the server I<node>:
994
995 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
996 mon $server, $client;
997
998 And of course the first thing after creating it is monitoring it.
999
1000 The C<spawn> function creates a new port on a remote node and returns
1001 its port ID. After creating the port it calls a function on the remote
1002 node, passing any remaining arguments to it, and - most importantly -
1003 executes the function within the context of the new port, so it can be
1004 manipulated by referring to C<$SELF>. The init function can reside in a
1005 module (actually it normally I<should> reside in a module) - AnyEvent::MP
1006 will automatically load the module if the function isn't defined.
1007
1008 The C<spawn> function returns immediately, which means you can instantly
1009 send messages to the port, long before the remote node has even heard
1010 of our request to create a port on it. In fact, the remote node might
1011 not even be running. Despite these troubling facts, everything should
1012 work just fine: if the node isn't running (or the init function throws an
1013 exception), then the monitor will trigger because the port doesn't exist.
1014
1015 If the spawn message gets delivered, but the monitoring message is not
1016 because of network problems (extremely unlikely, but monitoring, after
1017 all, is implemented by passing a message, and messages can get lost), then
1018 this connection loss will eventually trigger the monitoring action. On the
1019 remote node (which in return monitors the client) the port will also be
1020 cleaned up on connection loss. When the remote node comes up again and our
1021 monitoring message can be delivered, it will instantly fail because the
1022 port has been cleaned up in the meantime.
1023
1024 If your head is spinning by now, that's fine - just keep in mind, after
1025 creating a port, monitor it on the local node, and monitor "the other
1026 side" from the remote node, and all will be cleaned up just fine.
1027
1028 =head2 Services
1029
1030 Above it was mentioned that C<spawn> automatically loads modules, and this
1031 can be exploited in various ways.
1032
1033 Assume for a moment you put the server into a file called
1034 F<mymod/chatserver.pm> reachable from the current directory. Then you
1035 could run a node there with:
1036
1037 aemp run
1038
1039 The other nodes could C<spawn> the server by using
1040 C<mymod::chatserver::client_connect> as init function.
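
For example (a sketch - C<$servernode> stands for the node ID of the node
started with F<aemp run> above):

    my $server = spawn $servernode, "mymod::chatserver::client_connect",
                       $client, $nick;
    mon $server, $client;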
1041
1042 Likewise, when you have some service that starts automatically (similar to
1043 AnyEvent::MP::Global), then you can configure this service statically:
1044
1045 aemp profile mysrvnode services mymod::service::
1046 aemp run profile mysrvnode
1047
1048 And the module will automatically be loaded in the node, as specifying a
1049 module name (with C<::>-suffix) will simply load the module, which is then
1050 free to do whatever it wants.
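
A hypothetical F<mymod/service.pm> could then be as simple as setting up a
port at load time (a sketch):

    package mymod::service;

    use AnyEvent::MP;
    use AnyEvent::MP::Global;

    # runs when the node loads the service module: create a port
    # and make it visible to the rest of the network
    my $port = port {
       warn "mymod::service received: @_\n";
    };

    grp_reg mymod_service => $port;

    1;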
1051
1052 Of course, you can also do it in the much more standard way by writing
1053 a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes to use it. For example, if I want
to run the Bummskraut IRC backend on a machine named "ruth", I could do this:
1056
1057 aemp profile ruth addservice BK::Backend::IRC::
1058
1059 And any F<aemp run> on that host will automatically have the Bummskraut
1060 IRC backend running.
1061
There are plenty of possibilities you can use - it's all up to you how you
1063 structure your application.
1064
1065 =head1 PART 4: Coro::MP - selective receive
1066
1067 Not all problems lend themselves naturally to an event-based solution:
1068 sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.
1070
1071 In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
1073 (coro-) thread to a port. The thread can then opt to selectively receive
1074 messages it is interested in. Other messages are not lost, but queued, and
1075 can be received at a later time.
1076
1077 The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
1078 module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.
1080
1081 In fact, C<Coro::MP> is more of an extension than a separate module: all
1082 functions exported by C<AnyEvent::MP> are exported by it as well.
1083
To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
1087 port and a filename, followed by as many C<< (data => $port, $data) >>
1088 messages as required to fill the file, followed by an empty C<< (data =>
1089 $port) >> message.
1090
The server only writes a single file at a time; other requests will stay
1092 in the queue until the current file has been finished.
1093
1094 Here is an example implementation that uses L<Coro::AIO> and largely
1095 ignores error handling:
1096
1097 my $ioserver = port_async {
1098 while () {
1099 my ($tag, $port, $path) = get_cond;
1100
1101 $tag eq "write_file"
1102 or die "only write_file messages expected";
1103
1104 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1105 or die "$path: $!";
1106
1107 while () {
1108 my (undef, undef, $data) = get_cond {
1109 $_[0] eq "data" && $_[1] eq $port
1110 } 5
1111 or die "timeout waiting for data message from $port\n";
1112
1113 length $data or last;
1114
1115 aio_write $fh, undef, undef, $data, 0;
1116 };
1117 }
1118 };
1119
1120 mon $ioserver, sub {
1121 warn "ioserver was killed: @_\n";
1122 };
1123
1124 Let's go through it part by part.
1125
1126 my $ioserver = port_async {
1127
1128 Ports can be created by attaching a thread to an existing port via
1129 C<rcv_async>, or as here by calling C<port_async> with the code to execute
1130 as a thread. The C<async> component comes from the fact that threads are
1131 created using the C<Coro::async> function.
1132
1133 The thread runs in a normal port context (so C<$SELF> is set). In
1134 addition, when the thread returns, it will be C<kil> I<normally>, i.e.
1135 without a reason argument.
1136
    while () {
       my ($tag, $port, $path) = get_cond;

       $tag eq "write_file"
          or die "only write_file messages expected";
1140
1141 The thread is supposed to serve many file writes, which is why it executes
1142 in a loop. The first thing it does is fetch the next message, using
1143 C<get_cond>, the "conditional message get". Without a condition, it simply
1144 fetches the next message from the queue, which I<must> be a C<write_file>
1145 message.
1146
1147 The message contains the C<$path> to the file, which is then created:
1148
1149 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1150 or die "$path: $!";
1151
1152 Then we enter a loop again, to serve as many C<data> messages as
1153 necessary:
1154
1155 while () {
1156 my (undef, undef, $data) = get_cond {
1157 $_[0] eq "data" && $_[1] eq $port
1158 } 5
1159 or die "timeout waiting for data message from $port\n";
1160
1161 This time, the condition is not empty, but instead a code block: similarly
1162 to grep, the code block will be called with C<@_> set to each message in
1163 the queue, and it has to return whether it wants to receive the message or
1164 not.
1165
1166 In this case we are interested in C<data> messages (C<< $_[0] eq "data"
1167 >>), whose first element is the source port (C<< $_[1] eq $port >>).
1168
1169 The condition must be this strict, as it is possible to receive both
1170 C<write_file> messages and C<data> messages from other ports while we
1171 handle the file writing.
1172
1173 The lone C<5> at the end is a timeout - when no matching message is
1174 received within C<5> seconds, we assume an error and C<die>.
1175
1176 When an empty C<data> message is received we are done and can close the
1177 file (which is done automatically as C<$fh> goes out of scope):
1178
1179 length $data or last;
1180
1181 Otherwise we need to write the data:
1182
1183 aio_write $fh, undef, undef, $data, 0;
1184
1185 That's basically it. Note that every process should have some kind of
1186 supervisor. In our case, the supervisor simply prints any error message:
1187
1188 mon $ioserver, sub {
1189 warn "ioserver was killed: @_\n";
1190 };
1191
1192 Here is a usage example:
1193
1194 port_async {
1195 snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1196 snd $ioserver, data => $SELF, "abc\n";
1197 snd $ioserver, data => $SELF, "def\n";
1198 snd $ioserver, data => $SELF;
1199 };
1200
1201 The messages are sent without any flow control or acknowledgement (feel
1202 free to improve). Also, the source port does not actually need to be a
1203 port - any unique ID will do - but port identifiers happen to be a simple
1204 source of network-wide unique IDs.
1205
1206 Apart from C<get_cond> as seen above, there are other ways to receive
1207 messages. The C<write_file> message above could also selectively be
1208 received using a C<get> call:
1209
1210 my ($port, $path) = get "write_file";
1211
1212 This is simpler, but when some other code part sends an unexpected message
to the C<$ioserver>, that message will stay in the queue forever. As a rule of thumb,
1214 every threaded port should have a "fetch next message unconditionally"
1215 somewhere, to avoid filling up the queue.
1216
It is also possible to use C<get_cond> in a switch-like fashion:
1218
1219 get_cond {
1220 $_[0] eq "msg1" and return sub {
1221 my (undef, @msg1_data) = @_;
1222 ...;
1223 };
1224
1225 $_[0] eq "msg2" and return sub {
1226 my (undef, @msg2_data) = @_;
1227 ...;
1228 };
1229
1230 die "unexpected message $_[0] received";
1231 };
1232
1233 =head1 THE END
1234
1235 This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial, that AnyEvent::MP is still in its infancy, and I hope
1239 it will be useful to create exciting new applications.
1240
1241 =head1 SEE ALSO
1242
1243 L<AnyEvent::MP>
1244
1245 L<AnyEvent::MP::Global>
1246
1247 L<Coro::MP>
1248
1249 L<AnyEvent>
1250
1251 =head1 AUTHOR
1252
1253 Robin Redeker <elmex@ta-sa.org>
1254 Marc Lehmann <schmorp@schmorp.de>
1255