/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.51
Committed: Wed Mar 21 00:14:25 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.50: +52 -44 lines

1 =head1 Message Passing for the Non-Blocked Mind
2
3 =head1 Introduction and Terminology
4
5 This is a tutorial about how to get into the swing of the new L<AnyEvent::MP>
6 module, which allows programs to transparently pass messages within the
7 process and to other processes on the same or a different host.
8
9 What kind of messages? Basically a message here means a list of Perl
10 strings, numbers, hashes and arrays, anything that can be expressed as a
11 L<JSON> text (as JSON is the default serialiser in the protocol). Here are
12 two examples:
13
14 write_log => 1251555874, "action was successful.\n"
15 123, ["a", "b", "c"], { foo => "bar" }
16
17 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 first element of a message that indicates the type of the message. This
19 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20 (C<rcv>) support matching it directly.
21
22 Suppose you want to send a ping message with your current time
23 somewhere. This is what such a message might look like (in Perl syntax):
24
25 ping => 1251381636
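
As a rough illustration of "anything that can be expressed as a JSON text", the two example messages shown earlier can be serialised with the core L<JSON::PP> module. This is only a sketch: the array wrapper and the variable names are assumptions made for this example, and whatever framing AnyEvent::MP adds on the wire is not shown.

```perl
use strict;
use warnings;
use JSON::PP;   # core module, used here only for illustration

# a message is just a Perl list; by convention the first element is the tag
my @log_msg  = (write_log => 1251555874, "action was successful.\n");
my @data_msg = (123, ["a", "b", "c"], { foo => "bar" });

# wrap each list in an array reference to turn it into one JSON text
my $log_json  = encode_json \@log_msg;
my $data_json = encode_json \@data_msg;

print "$log_json\n$data_json\n";

# decoding gives back an equivalent list, tag first
my ($tag, @args) = @{ decode_json $log_json };
print "tag: $tag\n";
```

Anything that does not survive such a round trip (code references, file handles and so on) is a poor candidate for a message.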
26
27 Now that we know what a message is, to which entities are those
28 messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29 a destination for messages but also a context to execute code: when
30 a runtime error occurs while executing code belonging to a port, the
31 exception will be raised on the port and can even travel to interested
32 parties on other nodes, which makes supervision of distributed processes
33 easy.
34
35 How do these ports relate to things you know? Each I<port> belongs
36 to a I<node>, and a I<node> is just the UNIX process that runs your
37 L<AnyEvent::MP> application.
38
39 Each I<node> is distinguished from other I<nodes> running on the same or
40 another host in a network by its I<node ID>. A I<node ID> is simply a
41 unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42 (UNIX nodename, random string...).
43
44 Here is a diagram of how I<nodes>, I<ports> and UNIX processes relate
45 to each other. The setup consists of two nodes (more are of course
46 possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
47 C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
48 C<BAR>.
49
50
51    |- PID: 7066 -|                  |- PID: 8321 -|
52    |             |                  |             |
53    | Node ID: A  |                  | Node ID: B  |
54    |             |                  |             |
55    |   Port ABC =|= <----\ /-----> =|= Port FOO   |
56    |             |        X         |             |
57    |   Port DEF =|= <----/ \-----> =|= Port BAR   |
58    |             |                  |             |
59    |-------------|                  |-------------|
60
61 The strings for the I<port IDs> here are just for illustrative
62 purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 strings, they can't be chosen manually and are assigned by the system
64 dynamically. These I<port IDs> are unique within a network and can also be
65 used to identify senders, or even as message tags for instance.
66
67 The next sections will explain the API of L<AnyEvent::MP> by going through
68 a few simple examples. Later some more complex idioms are introduced,
69 which are hopefully useful to solve some real world problems.
70
71 =head2 Passing Your First Message
72
73 For starters, let's have a look at the messaging API. The following
74 example is just a demo to show the basic elements of message passing with
75 L<AnyEvent::MP>.
76
77 The example should print: C<Ending with: 123>, in a rather complicated
78 way, by passing some message to a port.
79
80    use AnyEvent;
81    use AnyEvent::MP;
82
83    my $end_cv = AnyEvent->condvar;
84
85    my $port = port;
86
87    rcv $port, test => sub {
88       my ($data) = @_;
89       $end_cv->send ($data);
90    };
91
92    snd $port, test => 123;
93
94    print "Ending with: " . $end_cv->recv . "\n";
95
96 It already uses most of the essential functions inside
97 L<AnyEvent::MP>: First there is the C<port> function which creates a
98 I<port> and returns its I<port ID>, a simple string.
99
100 This I<port ID> can be used to send messages to the port and install
101 handlers to receive messages on the port. Since it is a simple string
102 it can be safely passed to other I<nodes> in the network when you want
103 to refer to that specific port (usually used for RPC, where you need
104 to tell the other end which I<port> to send the reply to - messages in
105 L<AnyEvent::MP> have a destination, but no source).
106
107 The next function is C<rcv>:
108
109 rcv $port, test => sub { ... };
110
111 It installs a receiver callback on the I<port> that is specified as the first
112 argument (it only works for "local" ports, i.e. ports created on the same
113 node). The next argument, in this example C<test>, specifies a I<tag> to
114 match. This means that whenever a message with the first element being
115 the string C<test> is received, the callback is called with the remaining
116 parts of that message.
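
To make the tag matching more tangible, here is a toy model of a single port in plain Perl. The names C<toy_rcv> and C<toy_deliver> are made up for this sketch and are not part of AnyEvent::MP, and what the real module does with messages that match no tag is deliberately not modelled.

```perl
use strict;
use warnings;

# toy model of one port: tag => callback (NOT the AnyEvent::MP implementation)
my %handler;

# hypothetical stand-in for "rcv $port, $tag => $cb"
sub toy_rcv { my ($tag, $cb) = @_; $handler{$tag} = $cb }

# hypothetical stand-in for delivering one message to that port
sub toy_deliver {
   my ($tag, @rest) = @_;
   my $cb = $handler{$tag}
      or return 0;        # no receiver matches this tag
   $cb->(@rest);          # the tag itself is stripped before the call
   1
}

my @seen;
toy_rcv test => sub { push @seen, [@_] };

toy_deliver test  => 123;   # matches: the callback is called with (123)
toy_deliver other => 456;   # no match: the callback is not called

print "callback saw: @{ $seen[0] }\n";
```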
117
118 Messages can be sent with the C<snd> function, which is used like this in
119 the example above:
120
121 snd $port, test => 123;
122
123 This will send the message C<'test', 123> to the I<port> with the I<port
124 ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125 on C<test> it will call the callback with the first argument being the
126 number C<123>.
127
128 The callback is a typical AnyEvent idiom: the callback just passes
129 that number on to the I<condition variable> C<$end_cv> which will then
130 pass the value to the print. Condition variables are out of the scope
131 of this tutorial and not often used with ports, so please consult the
132 L<AnyEvent::Intro> about them.
133
134 Passing messages inside just one process is boring. Before we can move on
135 and do interprocess message passing we first have to make sure some things
136 have been set up correctly for our nodes to talk to each other.
137
138 =head2 System Requirements and System Setup
139
140 Before we can start with real IPC we have to make sure some things work on
141 your system.
142
143 First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
144 I<nodes> to be able to communicate with each other over the network it is
145 necessary to set up the same I<shared secret> for both of them, so they can
146 prove their trustworthiness to each other.
147
148 The easiest way to set this up is to use the F<aemp> utility:
149
150 aemp gensecret
151
152 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153 random shared secret. You can copy this file to any other system and
154 then communicate over the network (via TCP) with it. You can also select
155 your own shared secret (F<aemp setsecret>) and for increased security
156 requirements you can even create (or configure) a TLS certificate (F<aemp
157 gencert>), causing connections to not just be securely authenticated, but
158 also to be encrypted and protected against tinkering.
159
160 Connections will only be successfully established when the I<nodes>
161 that want to connect to each other have the same I<shared secret> (or
162 successfully verify the TLS certificate of the other side, in which case
163 no shared secret is required).
164
165 B<If something does not work as expected, and for example tcpdump shows
166 that the connections are closed almost immediately, you should make sure
167 that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168 you try to connect with each other!>
169
170 That is all for now; you will find some more advanced fiddling with the
171 C<aemp> utility later.
172
173 =head2 Shooting the Trouble
174
175 Sometimes things go wrong, and AnyEvent::MP, being a professional module,
176 does not gratuitously spill out messages to your screen.
177
178 To help troubleshoot any issues, there are two environment variables
179 that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets the
180 logging level. The default is C<5>, which means nothing much is
181 printed. You can increase it to C<8> or C<9> to get more verbose
182 output. This is example output when starting a node:
183
184 2012-03-04 19:41:10 <8> node cerebro starting up.
185 2012-03-04 19:41:10 <8> node listens on [10.0.0.1:4040].
186 2012-03-04 19:41:10 <9> trying connect to seed node 10.0.0.19:4040.
187 2012-03-04 19:41:10 <9> 10.0.0.19:4040 connected as rain
188 2012-03-04 19:41:10 <7> rain is up ()
189
190 A lot of info, but at least you can see that it does something.
191
192 The other environment variable that can be useful is
193 C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
194 most messages that are sent or received to be printed. For example, F<aemp
195 restart rijk> might output these message exchanges:
196
197 SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
198 SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
199 SND rain <- [null,"g_find","rijk"]
200 RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
201 RCV rijk -> ["b",""]
202
203 =head1 PART 1: Passing Messages Between Processes
204
205 =head2 The Receiver
206
207 Let's split the previous example up into two programs: one that contains
208 the sender and one for the receiver. First the receiver application, in
209 full:
210
211    use AnyEvent;
212    use AnyEvent::MP;
213
214    configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
215
216    my $port = port;
217    db_set eg_receivers => $port;
218
219    rcv $port, test => sub {
220       my ($data, $reply_port) = @_;
221
222       print "Received data: " . $data . "\n";
223    };
224
225    AnyEvent->condvar->recv;
226
227 Now, that wasn't too bad, was it? OK, let's go through the new functions
228 that have been used.
229
230 =head3 C<configure> and Joining and Maintaining the Network
231
232 First let's have a look at C<configure>:
233
234 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
235
236 Before we are able to send messages to other nodes we have to initialise
237 ourselves to become a "distributed node". Initialising a node means naming
238 the node and binding some TCP listeners so that other nodes can
239 contact it.
240
241 Additionally, to actually link all nodes in a network together, you can
242 specify a number of seed addresses, which will be used by the node to
243 connect itself into an existing network, as we will see shortly.
244
245 All of this (and more) can be passed to the C<configure> function - later
246 we will see how we can do all this without even passing anything to
247 C<configure>!
248
249 The first parameter, C<nodeid>, specifies the node ID (in this case
250 C<eg_receiver/%u>). The default is to use the node name of the current
251 host plus C</%u>, which gives the node a name with a random suffix to
252 make it unique, but for this example we want the node to have a bit more
253 personality, and name it C<eg_receiver> with a random suffix.
254
255 Why the random suffix? Node IDs need to be unique within the network and
256 appending a random suffix is the easiest way to do that.
257
258 The second parameter, C<binds>, specifies a list of C<address:port> pairs
259 to bind TCP listeners on. The special "address" of C<*> means to bind on
260 every local IP address (this might not work on every OS, so explicit IP
261 addresses are best).
262
263 The reason to bind on a TCP port is not just that other nodes can connect
264 to us: if no binds are specified, the node will still bind on a dynamic
265 port on all local addresses - but in this case we won't know the port, and
266 cannot tell other nodes to connect to it as seed node.
267
268 Now, a I<seed> is simply the TCP address of some other node in the
269 network, often the same string as used for the C<binds> parameter of the
270 other node. The need for seeds is easy to explain: I<somehow> the nodes
271 of an aemp network have to find each other, and often this means over the
272 internet. So broadcasts are out.
273
274 Instead, a node usually specifies the addresses of a few (for redundancy)
275 other nodes, some of which should be up. Two nodes can set each other as
276 seeds without any issues. You could even specify all nodes as seeds for
277 all nodes, for total redundancy. But the common case is to have some more
278 or less central, stable servers running seed services for other nodes.
279
280 All you need to do to ensure that an AnyEvent::MP network connects
281 together is to make sure that all connections from nodes to their seed
282 nodes I<somehow> span the whole network. The simplest way to do that would
283 be for all nodes to specify a single node as seed node, and you would get
284 a star topology. If you specify all nodes as seed nodes, you get a fully
285 meshed network (that's what previous releases of AnyEvent::MP actually
286 did).
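
Whether a given choice of seeds "somehow spans the whole network" can be checked on paper, or with a few lines of plain Perl. The following sketch is not part of AnyEvent::MP: the function name and the node names are invented, and for simplicity seeds are given as node names rather than TCP addresses.

```perl
use strict;
use warnings;

# true when the seed links, read as undirected edges, connect all nodes
sub seeds_span_network {
   my (%seeds) = @_;          # node name => array ref of its seed nodes

   my %adj;
   for my $node (keys %seeds) {
      $adj{$node} ||= {};
      $adj{$node}{$_} = $adj{$_}{$node} = 1
         for @{ $seeds{$node} };
   }

   return 1 unless %adj;      # an empty network is trivially connected

   # breadth-first walk over the seed links, starting at any node
   my ($start) = sort keys %adj;
   my %seen    = ($start => 1);
   my @queue   = ($start);

   while (@queue) {
      my $node = shift @queue;
      for my $peer (keys %{ $adj{$node} }) {
         push @queue, $peer unless $seen{$peer}++;
      }
   }

   return keys (%seen) == keys (%adj);
}

# star topology: every node names the same single seed
my %star  = (seed => [], recv_a => ["seed"], send_a => ["seed"]);

# two seeds that do not know each other: the network falls into two halves
my %split = (seed1 => [], seed2 => [], recv_a => ["seed1"], send_a => ["seed2"]);

printf "star:  %s\n", seeds_span_network (%star)  ? "one network" : "partitioned";
printf "split: %s\n", seeds_span_network (%split) ? "one network" : "partitioned";
```

In the second set-up, letting C<seed2> use C<seed1> as its seed would be enough to join the two halves.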
287
288 A node tries to keep connections open to all of its seed nodes at all
289 times, while other connections are made on demand only.
290
291 All of this ensures that the network stays one network - even if all the
292 nodes in one half of the net are separated from the nodes in the other
293 half by some network problem, once that is over, they will eventually
294 become a single network again.
295
296 In addition to creating the network, a node also expects the seed nodes to
297 run the shared database service - if need be, by automatically starting it,
298 so you don't normally need to configure this explicitly.
299
300 #TODO# later?#d#
301 The process of joining a network takes time, during which the node
302 is already running. This means it takes time until the node is
303 fully connected, and information about services in the network is
304 available. This is why most AnyEvent::MP programs start by waiting a while
305 until the information they need is available.
306
307 We will see how this is done later, in the sender program.
308
309 =head3 Registering the Receiver
310
311 Coming back to our example, after the node has been configured for network
312 access, it is time to publish some service, namely the receive service.
313
314 For that, let's look at the next lines:
315
316 my $port = port;
317 db_set eg_receivers => $port;
318
319 The C<port> function has already been discussed. It simply creates a new
320 I<port> and returns the I<port ID>. The C<db_set> function, however, is
321 new: The first argument is the name of a I<database family> and the second
322 argument is the name of a I<subkey> within that family. The third argument
323 would be the I<value> to be associated with the family and subkey, but,
324 since it is missing, it will simply be C<undef>.
325
326 What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
327 database. This database runs on so-called "global" nodes, which usually
328 are the seed nodes of your network. The database structure is "simply" a
329 hash of hashes of values.
330
331 To illustrate this with Perl syntax, assume the database was stored in
332 C<%DB>, then the C<db_set> function more or less would do this:
333
334 $DB{eg_receivers}{$port} = undef;
335
336 So the ominous "family" selects a hash in the database, and the "subkey"
337 is simply the key in this hash - C<db_set> very much works like this
338 assignment.
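
The hash-of-hashes picture can be played with locally. In this sketch C<toy_db_set> is a made-up, purely local stand-in (nothing is distributed), and the port IDs are invented strings, since real port IDs are assigned by AnyEvent::MP.

```perl
use strict;
use warnings;

my %DB;   # toy stand-in for the distributed database

# hypothetical local model of "db_set $family => $subkey [=> $value]"
sub toy_db_set {
   my ($family, $subkey, $value) = @_;
   $DB{$family}{$subkey} = $value;   # $value is undef when omitted
}

# the port IDs below are made up for illustration
toy_db_set eg_receivers   => "eg_receiver/aaaa#1";
toy_db_set eg_receivers   => "eg_receiver/bbbb#1";
toy_db_set eg_chat_server => "chat/cccc#1", "optional value";

# a reader only needs the subkeys of a family to find all receivers
my @receivers = sort keys %{ $DB{eg_receivers} };
print "receivers: @receivers\n";
```

Storing the port ID as the subkey and leaving the value undefined means that the set of keys of a family is the list of registered ports.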
339
340 The family namespace is shared by all nodes in a network, so the names
341 should be reasonably unique, for example, they could start with the name
342 of your module, or the name of the program, using your port name or node
343 name as subkey.
344
345 The purpose behind adding this key to the database is that the sender can
346 look it up and find our port. We will shortly see how.
347
348 The last step in the example is to set up a receiver callback for those
349 messages, just as was discussed in the first example. We again match
350 for the tag C<test>. The difference is that this time we don't exit the
351 application after receiving the first message. Instead we continue to wait
352 for new messages indefinitely.
353
354 =head2 The Sender
355
356 OK, now let's take a look at the sender code:
357
358    use AnyEvent;
359    use AnyEvent::MP;
360
361    configure nodeid => "eg_sender/%u", seeds => ["*:4040"];
362
363    my $guard = db_mon eg_receivers => sub {
364       my ($family, $a, $c, $d) = @_;
365       return unless %$family;
366
367       # now there are some receivers, send them a message
368       snd $_ => test => time
369          for keys %$family;
370    };
371
372    AnyEvent->condvar->recv;
373
374 It's even less code. The C<configure> serves the same purpose as in the
375 receiver, but instead of specifying binds we specify a list of seeds - the
376 only seed happens to be the same as the bind used by the receiver, which
377 therefore becomes our seed node.
378
379 Remember the part about having to wait till things become available? Well,
380 after configure returns, nothing has been done yet - the node is not
381 connected to the network, knows nothing about the database contents, and
382 it can take ages (for a computer :) for this situation to change.
383
384 Therefore, the sender waits, in this case by using the C<db_mon>
385 function. This function registers an interest in a specific database
386 family (in this case C<eg_receivers>). Each time something inside the
387 family changes (a key is added, changed or deleted), it will call our
388 callback with the family hash as first argument, followed by array
389 references listing the added, changed and deleted keys.
390
391 The callback only checks whether the C<%$family> hash is empty - if it is,
392 then it doesn't do anything. But eventually the family will contain the
393 port subkey we set in the receiver. Then it will send a message to it (and
394 any other receiver in the same family). Likewise, should the receiver go
395 away and come back, or should another receiver come up, it will again send
396 a message to all of them.
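
The following plain-Perl toy shows the shape of such change notifications for one family. C<toy_db_mon> and C<toy_update> are invented for this sketch, and the callback signature (family hash, then added, changed and deleted keys) is an assumption modelled on the four variables unpacked in the sender above.

```perl
use strict;
use warnings;

my (%family, @watchers);   # one toy family plus its registered callbacks

# hypothetical stand-in for db_mon on that one family
sub toy_db_mon { push @watchers, $_[0] }

# apply one change, then notify: (\%family, \@added, \@changed, \@deleted)
sub toy_update {
   my ($op, $key, $value) = @_;
   my (@added, @changed, @deleted);

   if ($op eq "del") {
      return unless exists $family{$key};
      delete $family{$key};
      push @deleted, $key;
   } else {
      push @{ exists $family{$key} ? \@changed : \@added }, $key;
      $family{$key} = $value;
   }

   $_->(\%family, \@added, \@changed, \@deleted) for @watchers;
}

my @log;
toy_db_mon sub {
   my ($family, $added, $changed, $deleted) = @_;
   return unless %$family;             # same guard as in the sender
   push @log, join ",", sort keys %$family;
};

toy_update set => "port1";
toy_update set => "port2";
toy_update del => "port1";
toy_update del => "port2";             # family is empty again: guard returns

print "$_\n" for @log;
```

Note that the callback sees the whole family on every change, which is why the sender messages all receivers again whenever one of them comes or goes.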
397
398 You can experiment by having multiple receivers - you have to change the
399 "binds" parameter in the receiver to the seeds used in the sender to start
400 up additional receivers, but then you can start as many as you like. If
401 you specify proper IP addresses for the seeds, you can even run them on
402 different computers.
403
404 Each time you start the sender, it will send a message to all receivers it
405 finds (you have to interrupt it manually afterwards).
406
407 Additional experiments you could try include using
408 C<PERL_ANYEVENT_MP_TRACE=1> to see which messages are exchanged, or
409 starting the sender before the receiver and see how long it then takes to
410 find the receiver.
411
412 =head3 Splitting Network Configuration and Application Code
413
414 OK, so far, this works reasonably. In the real world, however, the person
415 configuring your application to run on a specific network (the end user
416 or network administrator) is often different to the person coding the
417 application.
418
419 Or to put it differently: the arguments passed to configure are usually
420 provided not by the programmer, but by whoever is deploying the program -
421 even in the example above, we would like to be able to just start senders
422 and receivers without having to patch the programs.
423
424 To make this easy, AnyEvent::MP supports a simple configuration database,
425 using profiles, which can be managed using the F<aemp> command-line
426 utility (yes, this section is about the advanced tinkering mentioned
427 before).
428
429 When you change both programs above to simply call
430
431 configure;
432
433 then AnyEvent::MP tries to look up a profile using the current node name
434 in its configuration database, falling back to some global default.
435
436 You can run "generic" nodes using the F<aemp> utility as well, and we will
437 exploit this in the following way: we configure a profile "seed" and run
438 a node using it, whose sole purpose is to be a seed node for our example
439 programs.
440
441 We bind the seed node to port 4040 on all interfaces:
442
443 aemp profile seed binds "*:4040"
444
445 And we configure all nodes to use this as seed node (this only works when
446 running on the same host, for multiple machines you would replace the C<*>
447 by the IP address or hostname of the node running the seed), by changing
448 the global settings shared between all profiles:
449
450 aemp seeds "*:4040"
451
452 Then we run the seed node:
453
454 aemp run profile seed
455
456 After that, we can start as many other nodes as we want, and they will
457 all use our generic seed node to discover each other. The reason we can
458 start our existing programs even though they specify "incompatible"
459 parameters to C<configure> is that the configuration file (by default)
460 takes precedence over any arguments passed to C<configure>.
461
462 That's all for now - next we will teach you about monitoring by writing a
463 simple chat client and server :)
464
465 =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
466
467 That's a mouthful, so what does it mean? Our previous example is what one
468 could call "very loosely coupled" - the sender doesn't care about whether
469 there are any receivers, and the receivers do not care if there is any
470 sender.
471
472 This can work fine for simple services, but most real-world applications
473 want to ensure that the side they are expecting to be there is actually
474 there. Going one step further: most bigger real-world applications even
475 want to ensure that if some component is missing, or has crashed, it will
476 still be there, by recovering and restarting the service.
477
478 AnyEvent::MP supports this by catching exceptions and network problems,
479 and notifying interested parties of these.
480
481 =head2 Exceptions, Port Context, Network Errors and Monitors
482
483 =head3 Exceptions
484
485 Exceptions are handled on a per-port basis: all receive callbacks are
486 executed in a special context, the so-called I<port-context>: code
487 that throws an otherwise uncaught exception will cause the port to be
488 C<kil>led. Killed ports are destroyed automatically (killing ports is
489 actually the only way to free ports).
490
491 Ports can be monitored, even from a different node and host, and when a
492 port is killed, any entity monitoring it will be notified.
493
494 Here is a simple example:
495
496    use AnyEvent::MP;
497
498    # create a port, it always dies
499    my $port = port { die "oops" };
500
501    # monitor it
502    mon $port, sub {
503       warn "$port was killed (with reason @_)";
504    };
505
506    # now send it some message, causing it to die:
507    snd $port;
508
509    AnyEvent->condvar->recv;
510
511 It first creates a port whose only action is to throw an exception,
512 and then monitors it with the C<mon> function. Afterwards it sends it a
513 message, causing it to die and call the monitoring callback:
514
515 anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
516
517 The callback was actually passed two arguments: C<die>, to indicate it
518 did throw an I<exception> as opposed to, say, a network error, and the
519 exception message itself.
520
521 What happens when a port is killed before we have a chance to monitor
522 it? Granted, this is highly unlikely in our example, but when you program
523 in a network this can easily happen due to races between nodes.
524
525    use AnyEvent::MP;
526
527    my $port = port { die "oops" };
528
529    snd $port;
530
531    mon $port, sub {
532       warn "$port was killed (with reason @_)";
533    };
534
535    AnyEvent->condvar->recv;
536
537 This time we will get something else:
538
539 2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
540 anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)
541
542 The first line is a warning that is printed when a port dies that isn't
543 being monitored, because that is normally a bug. When a C<mon> is
544 attempted later, its callback is invoked immediately, because the port is
545 already gone. The kill reason is now C<no_such_port> with some descriptive
546 (we hope) error message.
547
548 As you probably suspect from these examples, the kill reason is usually
549 some identifier as first argument and a human-readable error message as
550 second argument - all kill reasons by AnyEvent::MP itself follow this
551 pattern. But the kill reason can be anything: it is simply a list of
552 values you can choose yourself. It can even be nothing (an empty list) -
553 this is called a "normal" kill.
554
555 Apart from die'ing, you can kill ports manually using the C<kil>
556 function. Using the C<kil> function will be treated like an error when a
557 non-empty reason is specified:
558
559 kil $port, custom_error => "don't like your steenking face";
560
561 And a I<normal> kill without any reason arguments:
562
563 kil $port;
564
565 By now you probably wonder what this "normal" kill business is: A common
566 idiom is to not specify a callback to C<mon>, but another port, such as
567 C<$SELF>:
568
569 mon $port, $SELF;
570
571 This basically means "monitor $port and kill me when it crashes" - and
572 the thing is, a "normal" kill does not count as a crash. This way you can
573 easily link ports together and make them crash together on errors, while
574 allowing you to remove a port silently when it has done its job properly.
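
The rule for linked ports boils down to "empty reason list: stay alive, anything else: die too". The following toy function spells that out in plain Perl. It is only a model of the behaviour described above, with an invented name, and not code taken from the module.

```perl
use strict;
use warnings;

# what a port linked via "mon $port, $SELF" does when $port goes away;
# toy model of the rule described in the text, not the module's own code
sub linked_reaction {
   my (@reason) = @_;
   return "stay alive" unless @reason;          # "normal" kill: no reason
   "die too (" . join (": ", @reason) . ")"     # customary: identifier, text
}

my $normal = linked_reaction ();
my $custom = linked_reaction (custom_error => "don't like your steenking face");
my $thrown = linked_reaction (die => "oops");

print "$_\n" for $normal, $custom, $thrown;
```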
575
576 =head3 Port Context
577
578 Code runs in the so-called "port context". That means C<$SELF> contains
579 its own port ID and exceptions that the code throws will be caught.
580
581 Since AnyEvent::MP is event-based, it is not uncommon to register
582 callbacks from within C<rcv> handlers. As an example, assume that the
583 following port receive handler wants to C<die> a second later, using
584 C<after>:
585
586    my $port = port {
587       after 1, sub { die "oops" };
588    };
589
590 If you try this out, you would find it does not work - when the C<after>
591 callback is executed, it does not run in the port context anymore, so
592 exceptions will not be caught.
593
594 For these cases, AnyEvent::MP exports a special "closure constructor"
595 called C<psub>, which works mostly like Perl's built-in C<sub>:
596
597    my $port = port {
598       after 1, psub { die "oops" };
599    };
600
601 C<psub> remembers the port context and returns a code reference. When the
602 code reference is invoked, it will run the code block within the context
603 that it was created in, so exception handling once more works as expected.
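
How a closure can "remember" a context is easy to model in plain Perl with C<local>. The sketch below uses invented names (C<toy_context>, C<toy_psub>) and a simple hash to record kills, so it only illustrates the idea and says nothing about how AnyEvent::MP implements C<psub> internally.

```perl
use strict;
use warnings;

our $SELF;      # toy version of the current-port variable
my %killed;     # port => kill reason, recorded by the toy "port context"

# run $cb with $SELF set to $port, turning exceptions into a recorded kill
sub toy_context {
   my ($port, $cb, @args) = @_;
   local $SELF = $port;
   eval { $cb->(@args); 1 }
      or $killed{$port} = [die => "$@"];
}

# hypothetical model of psub: capture $SELF now, restore it on invocation
sub toy_psub(&) {
   my ($cb) = @_;
   my $port = $SELF;
   return sub { toy_context ($port, $cb, @_) };
}

my @timers;     # stands in for callbacks queued via "after"

toy_context "portA", sub {
   push @timers, toy_psub { die "oops\n" };
};

# later, outside of any port context, the "timer" fires
$_->() for @timers;

print "portA killed: @{ $killed{portA} }";
```

With a plain C<sub> instead of C<toy_psub>, the C<die> would escape into whatever code happens to run the timer, which is the problem described above.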
604
605 There is even a way to temporarily execute code in the context of some
606 port, namely C<peval>:
607
608    peval $port, sub {
609       # die'ing here will kil $port
610    };
611
612 The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
613 and then executes the given sub in a port context.
614
615 =head3 Network Errors and the AEMP Guarantee
616
617 I mentioned another important source of monitoring failures: network
618 problems. When a node loses connection to another node, it will invoke all
619 monitoring actions as if the port was killed, even if it is possible that
620 the port is still happily alive on another node (not being able to talk to
621 a node means we have no clue what's going on with it, it could be crashed,
622 but also still running without knowing we lost the connection).
623
624 So another way to view monitors is "notify me when some of my messages
625 couldn't be delivered". AEMP has a guarantee about message delivery to a
626 port: After starting a monitor, any message sent to a port will either
627 be delivered, or, when it is lost, any further messages will also be lost
628 until the monitoring action is invoked. After that, further messages
629 I<might> get delivered again.
630
631 This doesn't sound like a very big guarantee, but it is kind of the best
632 you can get while staying sane: Specifically, it means that there will
633 be no "holes" in the message sequence: all messages sent are delivered
634 in order, without any missing in between, and when some were lost, you
635 I<will> be notified of that, so you can take recovery action.
636
637 And, obviously, the guarantee only works in the presence of
638 correctly-working hardware, and no relevant bugs inside AEMP itself.
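
One way to read the "no holes" guarantee: between starting a monitor and its monitoring action being invoked, what the receiver has seen is always an unbroken prefix of what was sent. A test harness could check that with a helper like the following. The function name and the message strings are made up for this sketch.

```perl
use strict;
use warnings;

# true when @$received equals the first N elements of @$sent, i.e. the
# receiver saw an unbroken run of messages starting with the first one sent
sub is_gapless_prefix {
   my ($sent, $received) = @_;
   return 0 if @$received > @$sent;
   for my $i (0 .. $#$received) {
      return 0 unless $sent->[$i] eq $received->[$i];
   }
   1
}

my @sent = map "msg$_", 1 .. 5;

# first case: the link died after msg3, which is allowed
# second case: msg2 is missing but msg3 arrived, which must not happen
for my $received ([qw(msg1 msg2 msg3)], [qw(msg1 msg3)]) {
   printf "%-15s %s\n", "@$received",
      is_gapless_prefix (\@sent, $received) ? "ok" : "hole";
}
```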
639
640 =head3 Supervising
641
642 OK, so how is this crashing-everything-stuff going to make applications
643 I<more> stable? Well, in fact, the goal is not really to make them more
644 stable, but to make them more resilient against actual errors and
645 crashes. And this is not done by crashing I<everything>, but by crashing
646 everything except a I<supervisor>.
647
648 A supervisor is simply some code that ensures that an application (or a
649 part of it) is running, and if it crashes, is restarted properly. That is,
650 it supervises a service by starting and restarting it, as necessary.
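
Stripped of all networking, the core of a supervisor is a restart loop. The sketch below is synchronous plain Perl with invented names, whereas a real AnyEvent::MP supervisor would react to monitor callbacks instead of looping, so take it as an illustration of the idea only.

```perl
use strict;
use warnings;

# start $service, and start it again whenever it dies, at most $max_starts
# times; returns the number of the start that finally succeeded
sub supervise {
   my ($service, $max_starts) = @_;
   for my $start (1 .. $max_starts) {
      return $start if eval { $service->($start); 1 };
      chomp (my $reason = $@);
      warn "start $start crashed ($reason)\n";
   }
   die "giving up after $max_starts starts\n";
}

# a made-up service that crashes on its first two starts
my $starts = supervise sub {
   my ($start) = @_;
   die "oops\n" if $start < 3;
}, 5;

print "service ran to completion on start $starts\n";
```

The upper bound on restarts is a design choice: a supervisor that restarts forever can hide a permanently broken service.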
651
652 To show how to do all this we will create a simple chat server that can
653 handle many chat clients. Both server and clients can be killed and
654 restarted, and even crash, to some extent, without disturbing the chat
655 functionality.
656
657 =head2 Chatting, the Resilient Way
658
659 Without further ado, here is the chat server (to run it, we assume the
660 set-up explained earlier, with a separate F<aemp run profile seed> node):
661
662    use common::sense;
663    use AnyEvent::MP;
664    use AnyEvent::MP::Global;
665
666    configure;
667
668    my %clients;
669
670    sub msg {
671       print "relaying: $_[0]\n";
672       snd $_, $_[0]
673          for values %clients;
674    }
675
676    our $server = port;
677
678    rcv $server, join => sub {
679       my ($client, $nick) = @_;
680
681       $clients{$client} = $client;
682
683       mon $client, sub {
684          delete $clients{$client};
685          msg "$nick (quits, @_)";
686       };
687       msg "$nick (joins)";
688    };
689
690    rcv $server, privmsg => sub {
691       my ($nick, $msg) = @_;
692       msg "$nick: $msg";
693    };
694
695    db_set eg_chat_server => $server;
696
697    warn "server ready.\n";
698
699    AnyEvent->condvar->recv;
700
701 Looks like a lot, but it is actually quite simple: after your usual
702 preamble (this time we use common sense), we define a helper function that
703 sends some message to every registered chat client:
704
705    sub msg {
706       print "relaying: $_[0]\n";
707       snd $_, $_[0]
708          for values %clients;
709    }
710
711 The clients are stored in the hash C<%clients>. Then we define a server
712 port and install two receivers on it, C<join>, which is sent by clients
713 to join the chat, and C<privmsg>, that clients use to send actual chat
714 messages.
715
716 C<join> is the most complicated. It expects the client port and the nickname
717 to be passed in the message, and registers the client in C<%clients>.
718
719    rcv $server, join => sub {
720       my ($client, $nick) = @_;
721
722       $clients{$client} = $client;
723
724 The next step is to monitor the client. The monitoring action removes the
725 client and sends a quit message with the error to all remaining clients.
726
727       mon $client, sub {
728          delete $clients{$client};
729          msg "$nick (quits, @_)";
730       };
731
732 And finally, it creates a join message and sends it to all clients.
733
734       msg "$nick (joins)";
735    };
736
737 The C<privmsg> callback simply broadcasts the message to all clients:
738
739    rcv $server, privmsg => sub {
740       my ($nick, $msg) = @_;
741       msg "$nick: $msg";
742    };
743
744 And finally, the server registers itself in the C<eg_chat_server>
745 database family, so that clients can find it:
746
747    db_set eg_chat_server => $server;
748
749 Well, well... and where is this supervisor stuff? Well... we cheated,
750 it's not there. To not overcomplicate the example, we only put it into
751 the..... CLIENT!
752
753 =head3 The Client, and a Supervisor!
754
755 Again, here is the client, including supervisor, which makes it a bit
756 longer:
757
758    use common::sense;
759    use AnyEvent::MP;
760
761    my $nick = shift || "anonymous";
762
763    configure;
764
765    my ($client, $server);
766
767    sub server_connect {
768       my $db_mon;
769       $db_mon = db_mon eg_chat_server => sub {
770          return unless %{ $_[0] };
771          undef $db_mon;
772
773          print "\rconnecting...\n";
774
775          $client = port { print "\r \r@_\n> " };
776          mon $client, sub {
777             print "\rdisconnected @_\n";
778             &server_connect;
779          };
780
781          $server = (keys %{ $_[0] })[0];
782
783          snd $server, join => $client, $nick;
784          mon $server, $client;
785       };
786    }
787
788    server_connect;
789
790    my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
791       chomp (my $line = <STDIN>);
792       print "> ";
793       snd $server, privmsg => $nick, $line
794          if $server;
795    });
796
797    $| = 1;
798    print "> ";
799    AnyEvent->condvar->recv;
800
801 The first thing the client does is to store the nick name (which is
802 expected as the only command line argument) in C<$nick>, for further
803 usage.
804
805 The next relevant thing is... finally... the supervisor:
806
807 sub server_connect {
808 my $db_mon;
809 $db_mon = db_mon eg_chat_server => sub {
810 return unless %{ $_[0] };
811 undef $db_mon;
812
813 This monitors the C<eg_chat_server> database family. While the family is
814 empty (likely when the node is just starting up), the callback returns and
815 waits for the next change; once a server has registered, it stops monitoring
816 by destroying C<$db_mon>. Waiting like this is an important pattern, as a lot
817 happens asynchronously in distributed programs. Hopefully future versions of
818 AnyEvent::MP will offer predefined supervisors; for now, code your own.
819
820 Next it creates a local port for the server to send messages to, and
821 monitors it. When the port is killed, it will print "disconnected" and
822 tell the supervisor function to retry.
823
824 $client = port { print "\r \r@_\n> " };
825 mon $client, sub {
826 print "\rdisconnected @_\n";
827 &server_connect;
828 };
829
830 Then everything is ready: the client picks a server port from the family,
831 sends it a C<join> message with its local port, and starts monitoring it:
832
833 $server = (keys %{ $_[0] })[0];
834 snd $server, join => $client, $nick;
835 mon $server, $client;
836 };
837 }
838
839 The monitor will ensure that if the server crashes or goes away, the
840 client will be killed as well. This tells the user that the client was
841 disconnected, and will then start to connect to the server again.
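
Reconnecting immediately after every disconnect can turn into a busy loop
when the server keeps failing. One common refinement, which is not part of
the example client, is to space out attempts with a capped exponential
backoff. The following is a minimal sketch in plain Perl; C<retry_delay>,
C<$base> and C<$max> are hypothetical names chosen for illustration and are
not part of AnyEvent::MP:

```perl
use strict;
use warnings;

# Hypothetical helper: delay in seconds before retry number $attempt
# (0-based), doubling each time and capped at $max.
sub retry_delay {
   my ($attempt, $base, $max) = @_;

   $base = 1  unless defined $base;
   $max  = 30 unless defined $max;

   my $delay = $base * 2 ** $attempt;

   return $delay > $max ? $max : $delay;
}

# Print the schedule for the first few attempts.
printf "attempt %d: wait %ds\n", $_, retry_delay ($_)
   for 0 .. 6;
```

In the client, such a delay could be handed to C<after> instead of calling
C<server_connect> directly, with the attempt counter reset once a connection
has been established.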
842
843 The rest of the program deals with the boring details of actually invoking
844 the supervisor function to start the whole client process and handle the
845 actual terminal input, sending it to the server.
846
847 You should now try to start the server and one or more clients in different
848 terminal windows (and the seed node):
849
850 perl eg/chat_client nick1
851 perl eg/chat_client nick2
852 perl eg/chat_server
853 aemp run profile seed
854
855 And then you can experiment with chatting, killing one or more clients, or
856 stopping and restarting the server, to see the monitoring in action.
857
858 The crucial point you should understand from this example is that
859 monitoring is usually symmetric: when you monitor some other port,
860 potentially on another node, that other port usually should monitor you,
861 too, so when the connection dies, both ports get killed, or at least both
862 sides can take corrective action. Exceptions are "servers" that serve
863 multiple clients at once and might only wish to clean up, and supervisors,
864 who of course should not normally get killed (unless they, too, have a
865 supervisor).
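
The symmetric arrangement can be tried out within a single process. The
following sketch assumes two local ports standing in for the two ends of a
connection; the names C<$left> and C<$right> and the C<simulated_error>
reason are made up for this illustration:

```perl
use strict;
use warnings;
use AnyEvent::MP;

# Two local ports standing in for the two ends of a connection.
my $left  = port { print "left got: @_\n" };
my $right = port { print "right got: @_\n" };

# Symmetric monitoring: if either port is killed with an error,
# the other one is killed with the same reason.
mon $left,  $right;
mon $right, $left;

# Additional monitors that only report what happened.
mon $left,  sub { print "left is gone: @_\n" };
mon $right, sub { print "right is gone: @_\n" };

# Simulate a failure on one side.
kil $left, simulated_error => "connection lost";
```

A "server" port that serves many clients would leave out its own half of
the pair and install a clean-up callback instead, as the chat server does.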
866
867 If you often think in object-oriented terms, then treat a port as an
868 object: C<port> is the constructor, the receive callbacks set by C<rcv>
869 act as methods, the C<kil> function becomes the explicit destructor and
870 C<mon> installs a destructor hook. Unlike in conventional object-oriented
871 programming, it can make sense to exchange ports more freely (for example,
872 to monitor one port from another).
873
874 There is ample room for improvement: the server should probably remember
875 the nickname in the C<join> handler instead of expecting it in every chat
876 message, it should probably monitor itself, and the client should not try
877 to send any messages unless a server is actually connected.
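
As a sketch of the first suggestion, the server could store the nickname
at join time and let clients identify themselves by their port. The C<%nick>
hash and the changed C<privmsg> layout (client port instead of nickname) are
assumptions made for this illustration; they are not what the example files
do:

```perl
use strict;
use warnings;
use AnyEvent::MP;

my %clients;   # client port => client port
my %nick;      # client port => nickname (hypothetical addition)

# Relay a line of text to every registered client.
sub msg {
   snd $_, $_[0]
      for values %clients;
}

my $server = port;

rcv $server,
   join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;
      $nick{$client}    = $nick;

      # Forget the client and its nickname when it goes away.
      mon $client, sub {
         delete $clients{$client};
         my $name = delete $nick{$client};
         msg "$name (quits, @_)";
      };

      msg "$nick (joins)";
   },
   privmsg => sub {
      my ($client, $text) = @_;

      # Ignore chat lines from ports that never joined.
      defined (my $name = $nick{$client})
         or return;

      msg "$name: $text";
   };
```

With this layout the client would send C<< privmsg => $client, $line >>
rather than its nickname.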
878
879 =head1 PART 3: TIMTOWTDI: Virtual Connections
880
881 The chat system developed in the previous sections is very "traditional"
882 in a way: you start some server(s) and some clients statically and they
883 start talking to each other.
884
885 Sometimes applications work more like "services": they can run on almost
886 any node and talk to themselves on other nodes. The L<AnyEvent::MP::Global>
887 service for example monitors nodes joining the network and starts itself
888 automatically on other nodes (if it isn't running already).
889
890 A good way to design such applications is to put them into a module and
891 create "virtual connections" to other nodes - we call this the "bridge
892 head" method, because you start by creating a remote port (the bridge
893 head) and from that you start to bootstrap your application.
894
895 Since that sounds rather theoretical, let's redesign the chat server and
896 client using this design method.
897
898 Here is the server:
899
900 use common::sense;
901 use AnyEvent::MP;
902 use AnyEvent::MP::Global;
903
904 configure;
905
906 grp_reg eg_chat_server2 => $NODE;
907
908 my %clients;
909
910 sub msg {
911 print "relaying: $_[0]\n";
912 snd $_, $_[0]
913 for values %clients;
914 }
915
916 sub client_connect {
917 my ($client, $nick) = @_;
918
919 mon $client;
920 mon $client, sub {
921 delete $clients{$client};
922 msg "$nick (quits, @_)";
923 };
924
925 $clients{$client} = $client;
926
927 msg "$nick (joins)";
928
929 rcv $SELF, sub { msg "$nick: $_[0]" };
930 }
931
932 warn "server ready.\n";
933
934 AnyEvent->condvar->recv;
935
936 It starts out not much different than the previous example, except that
937 this time, we register the node port in the global group and not any port
938 we created - the clients only want to know which node the server should be
939 running on. In fact, they could also use some kind of election mechanism,
940 to find the node with lowest load or something like that.
941
942 The more interesting change is that indeed no server port is created -
943 the server consists only of code, and "does" nothing by itself. All it
944 does is define a function C<client_connect>, which expects a client port
945 and a nick name as arguments. It then monitors the client port and binds
946 a receive callback on C<$SELF>, which expects messages that in turn are
947 broadcast to all clients.
948
949 The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
950 for C<mon $client, $SELF>. The second does the normal "client has gone
951 away" clean-up action. Both could actually be rolled into one C<mon>
952 action.
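
Rolled into one, the two C<mon> calls might look like the sketch below. It
assumes the same C<%clients> hash and C<msg> helper as the server above. The
copy of C<$SELF> is there because the monitor callback is not guaranteed to
run in the context of the port that installed it:

```perl
use strict;
use warnings;
use AnyEvent::MP;

my %clients;

sub msg {
   snd $_, $_[0]
      for values %clients;
}

sub client_connect {
   my ($client, $nick) = @_;

   # Remember our own port for use inside the monitor callback.
   my $self = $SELF;

   mon $client, sub {
      # clean-up action, as in the second mon call
      delete $clients{$client};
      msg "$nick (quits, @_)";

      # what the one-argument "mon $client" does:
      # pass errors on to our own port
      kil $self, @_ if @_;
   };

   $clients{$client} = $client;

   msg "$nick (joins)";

   rcv $SELF, sub { msg "$nick: $_[0]" };
}
```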
953
954 C<$SELF> is a good hint that something interesting is going on. And
955 indeed, when looking at the client code, there is a new function,
956 C<spawn>:
957
958 use common::sense;
959 use AnyEvent::MP;
960 use AnyEvent::MP::Global;
961
962 my $nick = shift;
963
964 configure;
965
966 $| = 1;
967
968 my $port = port;
969
970 my ($client, $server);
971
972 sub server_connect {
973 my $servernodes = grp_get "eg_chat_server2"
974 or return after 1, \&server_connect;
975
976 print "\rconnecting...\n";
977
978 $client = port { print "\r \r@_\n> " };
979 mon $client, sub {
980 print "\rdisconnected @_\n";
981 &server_connect;
982 };
983
984 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
985 mon $server, $client;
986 }
987
988 server_connect;
989
990 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
991 chomp (my $line = <STDIN>);
992 print "> ";
993 snd $server, $line
994 if $server;
995 });
996
997 print "> ";
998 AnyEvent->condvar->recv;
999
1000 The client is quite similar to the previous one, but instead of contacting
1001 the server I<port> (which no longer exists), it C<spawn>s (creates) a new
1002 server I<port on the server node>:
1003
1004 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
1005 mon $server, $client;
1006
1007 And of course the first thing after creating it is monitoring it.
1008
1009 The C<spawn> function creates a new port on a remote node and returns
1010 its port ID. After creating the port it calls a function on the remote
1011 node, passing any remaining arguments to it, and - most importantly -
1012 executes the function within the context of the new port, so it can be
1013 manipulated by referring to C<$SELF>. The init function can reside in a
1014 module (actually it normally I<should> reside in a module) - AnyEvent::MP
1015 will automatically load the module if the function isn't defined.
1016
1017 The C<spawn> function returns immediately, which means you can instantly
1018 send messages to the port, long before the remote node has even heard
1019 of our request to create a port on it. In fact, the remote node might
1020 not even be running. Despite these troubling facts, everything should
1021 work just fine: if the node isn't running (or the init function throws an
1022 exception), then the monitor will trigger because the port doesn't exist.
1023
1024 If the spawn message gets delivered, but the monitoring message is not
1025 because of network problems (extremely unlikely, but monitoring, after
1026 all, is implemented by passing a message, and messages can get lost), then
1027 this connection loss will eventually trigger the monitoring action. On the
1028 remote node (which in return monitors the client) the port will also be
1029 cleaned up on connection loss. When the remote node comes up again and our
1030 monitoring message can be delivered, it will instantly fail because the
1031 port has been cleaned up in the meantime.
1032
1033 If your head is spinning by now, that's fine - just keep in mind, after
1034 creating a port, monitor it on the local node, and monitor "the other
1035 side" from the remote node, and all will be cleaned up just fine.
1036
1037 =head2 Services
1038
1039 Above it was mentioned that C<spawn> automatically loads modules, and this
1040 can be exploited in various ways.
1041
1042 Assume for a moment you put the server into a file called
1043 F<mymod/chatserver.pm> reachable from the current directory. Then you
1044 could run a node there with:
1045
1046 aemp run
1047
1048 The other nodes could C<spawn> the server by using
1049 C<mymod::chatserver::client_connect> as init function.
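
For illustration, such a file might look roughly like this. The sketch
simply wraps the server functions shown earlier in a package; apart from
the package name and C<client_connect>, nothing here is prescribed by
AnyEvent::MP:

```perl
# mymod/chatserver.pm - hypothetical layout of the module file
package mymod::chatserver;

use strict;
use warnings;
use AnyEvent::MP;

my %clients;

sub msg {
   snd $_, $_[0]
      for values %clients;
}

# init function, executed in the context of the newly spawned port
sub client_connect {
   my ($client, $nick) = @_;

   mon $client;
   mon $client, sub {
      delete $clients{$client};
      msg "$nick (quits, @_)";
   };

   $clients{$client} = $client;

   msg "$nick (joins)";

   rcv $SELF, sub { msg "$nick: $_[0]" };
}

1;
```

A client would then pass C<"mymod::chatserver::client_connect"> instead of
C<"::client_connect"> as the second argument to C<spawn>.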
1050
1051 Likewise, when you have some service that starts automatically (similar to
1052 AnyEvent::MP::Global), then you can configure this service statically:
1053
1054 aemp profile mysrvnode services mymod::service::
1055 aemp run profile mysrvnode
1056
1057 And the module will automatically be loaded in the node, as specifying a
1058 module name (with C<::>-suffix) will simply load the module, which is then
1059 free to do whatever it wants.
1060
1061 Of course, you can also do it in the much more standard way by writing
1062 a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
1063 distribution and then configuring nodes. For example, if I want to run the
1064 Bummskraut IRC backend on a machine named "ruth", I could do this:
1065
1066 aemp profile ruth addservice BK::Backend::IRC::
1067
1068 And any F<aemp run> on that host will automatically have the Bummskraut
1069 IRC backend running.
1070
1071 That's plenty of possibilities you can use - it's all up to you how you
1072 structure your application.
1073
1074 =head1 PART 4: Coro::MP - selective receive
1075
1076 Not all problems lend themselves naturally to an event-based solution:
1077 sometimes things are easier if you can decide in what order you want to
1078 receive messages, regardless of the order in which they were sent.
1079
1080 In these cases, L<Coro::MP> can provide a nice solution: instead of
1081 registering callbacks for each message type, C<Coro::MP> attaches a
1082 (coro-) thread to a port. The thread can then opt to selectively receive
1083 messages it is interested in. Other messages are not lost, but queued, and
1084 can be received at a later time.
1085
1086 The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
1087 module. It is, however, tightly integrated into C<AnyEvent::MP> - the
1088 ports it creates are fully compatible with C<AnyEvent::MP> ports.
1089
1090 In fact, C<Coro::MP> is more of an extension than a separate module: all
1091 functions exported by C<AnyEvent::MP> are exported by it as well.
1092
1093 To illustrate what programming with C<Coro::MP> looks like, consider the
1094 following (slightly contrived) example: Let's implement a server that
1095 accepts a C<< (write_file => $port, $path) >> message with a (source)
1096 port and a filename, followed by as many C<< (data => $port, $data) >>
1097 messages as required to fill the file, followed by an empty C<< (data =>
1098 $port) >> message.
1099
1100 The server only writes a single file at a time; other requests will stay
1101 in the queue until the current file has been finished.
1102
1103 Here is an example implementation that uses L<Coro::AIO> and largely
1104 ignores error handling:
1105
1106 my $ioserver = port_async {
1107 while () {
1108 my ($tag, $port, $path) = get_cond;
1109
1110 $tag eq "write_file"
1111 or die "only write_file messages expected";
1112
1113 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1114 or die "$path: $!";
1115
1116 while () {
1117 my (undef, undef, $data) = get_cond {
1118 $_[0] eq "data" && $_[1] eq $port
1119 } 5
1120 or die "timeout waiting for data message from $port\n";
1121
1122 length $data or last;
1123
1124 aio_write $fh, undef, undef, $data, 0;
1125 };
1126 }
1127 };
1128
1129 mon $ioserver, sub {
1130 warn "ioserver was killed: @_\n";
1131 };
1132
1133 Let's go through it part by part.
1134
1135 my $ioserver = port_async {
1136
1137 Ports can be created by attaching a thread to an existing port via
1138 C<rcv_async>, or as here by calling C<port_async> with the code to execute
1139 as a thread. The C<async> component comes from the fact that threads are
1140 created using the C<Coro::async> function.
1141
1142 The thread runs in a normal port context (so C<$SELF> is set). In
1143 addition, when the thread returns, the port will be C<kil>ed I<normally>,
1144 i.e. without a reason argument.
1145
1146 while () {
1147 my ($tag, $port, $path) = get_cond;
1148 $tag eq "write_file" or die "only write_file messages expected";
1149
1150 The thread is supposed to serve many file writes, which is why it executes
1151 in a loop. The first thing it does is fetch the next message, using
1152 C<get_cond>, the "conditional message get". Without a condition, it simply
1153 fetches the next message from the queue, which I<must> be a C<write_file>
1154 message.
1155
1156 The message contains the C<$path> to the file, which is then created:
1157
1158 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1159 or die "$path: $!";
1160
1161 Then we enter a loop again, to serve as many C<data> messages as
1162 necessary:
1163
1164 while () {
1165 my (undef, undef, $data) = get_cond {
1166 $_[0] eq "data" && $_[1] eq $port
1167 } 5
1168 or die "timeout waiting for data message from $port\n";
1169
1170 This time, the condition is not empty, but instead a code block: similarly
1171 to grep, the code block will be called with C<@_> set to each message in
1172 the queue, and it has to return whether it wants to receive the message or
1173 not.
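
The matching rule itself can be modelled in a few lines of plain Perl. The
sketch below is only a model of the idea, not how C<Coro::MP> is
implemented: the queue is an ordinary array, C<take_first> is a hypothetical
helper, and blocking and timeouts are left out entirely:

```perl
use strict;
use warnings;

# A plain array stands in for the port's message queue.
my @queue = (
   [write_file => "port_b", "/tmp/b"],
   [data       => "port_b", "xyz"],
   [data       => "port_a", "abc"],
);

# Hypothetical helper: remove and return the first queued message for
# which $cond returns true, leaving all other messages in place.
sub take_first {
   my ($cond) = @_;

   for my $i (0 .. $#queue) {
      if ($cond->(@{ $queue[$i] })) {
         my ($msg) = splice @queue, $i, 1;
         return @$msg;
      }
   }

   return; # nothing matched
}

my $port = "port_a";

my (undef, undef, $data) = take_first (sub {
   $_[0] eq "data" && $_[1] eq $port
});

print "got: $data\n";                                # got: abc
print scalar (@queue), " messages still queued\n";   # 2 messages still queued
```

The two messages from C<port_b> stay queued in their original order, which
is what allows a later receive to pick them up.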
1174
1175 In this case we are interested in C<data> messages (C<< $_[0] eq "data"
1176 >>), whose first element is the source port (C<< $_[1] eq $port >>).
1177
1178 The condition must be this strict, as it is possible to receive both
1179 C<write_file> messages and C<data> messages from other ports while we
1180 handle the file writing.
1181
1182 The lone C<5> at the end is a timeout - when no matching message is
1183 received within C<5> seconds, we assume an error and C<die>.
1184
1185 When an empty C<data> message is received we are done and can close the
1186 file (which is done automatically as C<$fh> goes out of scope):
1187
1188 length $data or last;
1189
1190 Otherwise we need to write the data:
1191
1192 aio_write $fh, undef, undef, $data, 0;
1193
1194 That's basically it. Note that every process should have some kind of
1195 supervisor. In our case, the supervisor simply prints any error message:
1196
1197 mon $ioserver, sub {
1198 warn "ioserver was killed: @_\n";
1199 };
1200
1201 Here is a usage example:
1202
1203 port_async {
1204 snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1205 snd $ioserver, data => $SELF, "abc\n";
1206 snd $ioserver, data => $SELF, "def\n";
1207 snd $ioserver, data => $SELF;
1208 };
1209
1210 The messages are sent without any flow control or acknowledgement (feel
1211 free to improve). Also, the source port does not actually need to be a
1212 port - any unique ID will do - but port identifiers happen to be a simple
1213 source of network-wide unique IDs.
1214
1215 Apart from C<get_cond> as seen above, there are other ways to receive
1216 messages. The C<write_file> message above could also selectively be
1217 received using a C<get> call:
1218
1219 my ($port, $path) = get "write_file";
1220
1221 This is simpler, but when some other part of the code sends an unexpected
1222 message to the C<$ioserver> it will stay in the queue forever. As a rule of
1223 thumb, every threaded port should have a "fetch next message unconditionally"
1224 somewhere, to avoid filling up the queue.
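
One way to follow this rule of thumb is a main loop that takes whatever
comes next and dispatches on the tag. In the sketch below, the C<ping> and
C<pong> messages are invented for the example, and simply warning about
unknown messages is just one possible policy:

```perl
use strict;
use warnings;
use Coro::MP;

my $worker = port_async {
   while () {
      # No condition: take whatever message is next in the queue.
      my ($tag, @args) = get_cond;

      if ($tag eq "ping") {
         my ($reply_to) = @args;
         snd $reply_to, "pong";
      } else {
         # Unexpected messages are consumed and reported instead of
         # piling up in the queue.
         warn "ignoring unexpected message: $tag\n";
      }
   }
};
```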
1225
1226 It is also possible to write switch-like C<get_cond> blocks:
1227
1228 get_cond {
1229 $_[0] eq "msg1" and return sub {
1230 my (undef, @msg1_data) = @_;
1231 ...;
1232 };
1233
1234 $_[0] eq "msg2" and return sub {
1235 my (undef, @msg2_data) = @_;
1236 ...;
1237 };
1238
1239 die "unexpected message $_[0] received";
1240 };
1241
1242 =head1 THE END
1243
1244 This is the end of this introduction, but hopefully not the end of
1245 your career as an AEMP user. I hope the tutorial was enough to make the
1246 basic concepts clear. Keep in mind that distributed programming is not
1247 completely trivial, that AnyEvent::MP is still in its infancy, and I hope
1248 it will be useful to create exciting new applications.
1249
1250 =head1 SEE ALSO
1251
1252 L<AnyEvent::MP>
1253
1254 L<AnyEvent::MP::Global>
1255
1256 L<Coro::MP>
1257
1258 L<AnyEvent>
1259
1260 =head1 AUTHOR
1261
1262 Robin Redeker <elmex@ta-sa.org>
1263 Marc Lehmann <schmorp@schmorp.de>
1264