/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.54
Committed: Wed Mar 21 23:48:39 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.53: +16 -11 lines

1 =head1 Message Passing for the Non-Blocked Mind
2
3 =head1 Introduction and Terminology
4
5 This is a tutorial about how to get into the swing of the new L<AnyEvent::MP>
6 module, which allows programs to transparently pass messages within the
7 process and to other processes on the same or a different host.
8
9 What kind of messages? Basically a message here means a list of Perl
10 strings, numbers, hashes and arrays, anything that can be expressed as a
11 L<JSON> text (as JSON is the default serialiser in the protocol). Here are
12 two examples:
13
14 write_log => 1251555874, "action was successful.\n"
15 123, ["a", "b", "c"], { foo => "bar" }
16
17 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 first element of a message that indicates the type of the message. This
19 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20 (C<rcv>) support matching it directly.
21
22 Suppose you want to send a ping message with your current time to
23 somewhere; this is how such a message might look (in Perl syntax):
24
25 ping => 1251381636
26
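To make "anything that can be expressed as a JSON text" more concrete, here is a minimal sketch that encodes the example messages as JSON arrays with the core L<JSON::PP> module. It only illustrates the idea that a message is a list whose first element is the tag; it does not claim to reproduce the exact wire format used by L<AnyEvent::MP>, and the variable names are chosen for this example only.

```perl
use strict;
use warnings;
use JSON::PP;

# A message is just a Perl list; by convention the first element is the tag.
my @ping = (ping => 1251381636);
my @misc = (123, ["a", "b", "c"], { foo => "bar" });

# canonical() sorts hash keys, so the encoded text is reproducible.
my $json = JSON::PP->new->canonical;

# Encode each message as a JSON array (illustration only, not the wire format).
my $ping_text = $json->encode (\@ping);
my $misc_text = $json->encode (\@misc);

print "$ping_text\n";
print "$misc_text\n";

# Decoding gives the list back, so a receiver can inspect the tag first.
my ($tag, @args) = @{ $json->decode ($ping_text) };
print "tag=$tag args=@args\n";
```

Anything that survives such an encode/decode round trip (strings, numbers, arrays, hashes) is a safe message element; code references or file handles are not.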
27 Now that we know what a message is, to which entities are those
28 messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29 a destination for messages but also a context to execute code: when
30 a runtime error occurs while executing code belonging to a port, the
31 exception will be raised on the port and can even travel to interested
32 parties on other nodes, which makes supervision of distributed processes
33 easy.
34
35 How do these ports relate to things you know? Each I<port> belongs
36 to a I<node>, and a I<node> is just the UNIX process that runs your
37 L<AnyEvent::MP> application.
38
39 Each I<node> is distinguished from other I<nodes> running on the same or
40 another host in a network by its I<node ID>. A I<node ID> is simply a
41 unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42 (UNIX nodename, random string...).
43
44 Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
45 to each other. The setup consists of two nodes (more are of course
46 possible): Node C<A> (in UNIX process 7066) with the ports C<ABC> and
47 C<DEF>. And the node C<B> (in UNIX process 8321) with the ports C<FOO> and
48 C<BAR>.
49
50
51 |- PID: 7066 -|                  |- PID: 8321 -|
52 |             |                  |             |
53 | Node ID: A  |                  | Node ID: B  |
54 |             |                  |             |
55 | Port ABC   =|= <----\ /-----> =|= Port FOO   |
56 |             |        X         |             |
57 | Port DEF   =|= <----/ \-----> =|= Port BAR   |
58 |             |                  |             |
59 |-------------|                  |-------------|
60
61 The strings for the I<port IDs> here are just for illustrative
62 purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 strings, they can't be chosen manually and are assigned by the system
64 dynamically. These I<port IDs> are unique within a network and can also be
65 used to identify senders, or even as message tags for instance.
66
67 The next sections will explain the API of L<AnyEvent::MP> by going through
68 a few simple examples. Later some more complex idioms are introduced,
69 which are hopefully useful to solve some real world problems.
70
71 =head2 Passing Your First Message
72
73 For starters, let's have a look at the messaging API. The following
74 example is just a demo to show the basic elements of message passing with
75 L<AnyEvent::MP>.
76
77 The example should print: C<Ending with: 123>, in a rather complicated
78 way, by passing some message to a port.
79
80 use AnyEvent;
81 use AnyEvent::MP;
82
83 my $end_cv = AnyEvent->condvar;
84
85 my $port = port;
86
87 rcv $port, test => sub {
88    my ($data) = @_;
89    $end_cv->send ($data);
90 };
91
92 snd $port, test => 123;
93
94 print "Ending with: " . $end_cv->recv . "\n";
95
96 It already uses most of the essential functions inside
97 L<AnyEvent::MP>: First there is the C<port> function which creates a
98 I<port> and returns its I<port ID>, a simple string.
99
100 This I<port ID> can be used to send messages to the port and install
101 handlers to receive messages on the port. Since it is a simple string
102 it can be safely passed to other I<nodes> in the network when you want
103 to refer to that specific port (usually used for RPC, where you need
104 to tell the other end which I<port> to send the reply to - messages in
105 L<AnyEvent::MP> have a destination, but no source).
106
107 The next function is C<rcv>:
108
109 rcv $port, test => sub { ... };
110
111 It installs a receiver callback on the I<port> that is specified as the first
112 argument (it only works for "local" ports, i.e. ports created on the same
113 node). The next argument, in this example C<test>, specifies a I<tag> to
114 match. This means that whenever a message with the first element being
115 the string C<test> is received, the callback is called with the remaining
116 parts of that message.
117
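The tag matching described above can be pictured with a small plain-Perl model. This is a simplified sketch and not how L<AnyEvent::MP> is implemented: C<model_rcv> and C<model_deliver> are hypothetical helpers that stand in for C<rcv> and for message delivery on a single port. What the real module does with messages that match no tag is not covered here; this model simply ignores them.

```perl
use strict;
use warnings;

# Hypothetical stand-in for a port: a table mapping tags to callbacks.
my %handlers;

# Model of "rcv $port, TAG => CALLBACK".
sub model_rcv {
   my ($tag, $cb) = @_;
   $handlers{$tag} = $cb;
}

# Model of message delivery: compare the first element with the registered
# tags and call the matching callback with the remaining elements.
sub model_deliver {
   my ($tag, @rest) = @_;

   my $cb = $handlers{$tag}
      or return; # no matching tag: this model ignores the message

   $cb->(@rest);
}

my @seen;
model_rcv test => sub { push @seen, "test got @_" };

model_deliver test  => 123; # matches the "test" tag
model_deliver other => 456; # no handler registered for "other"

print "$_\n" for @seen;
```

The point to take away is that the callback never sees the tag itself, only the rest of the message.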
118 Messages can be sent with the C<snd> function, which is used like this in
119 the example above:
120
121 snd $port, test => 123;
122
123 This will send the message C<'test', 123> to the I<port> with the I<port
124 ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125 on C<test> it will call the callback with the first argument being the
126 number C<123>.
127
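Because messages have a destination but no source, a request that expects an answer has to carry the port to reply to. The following sketch shows that idea inside a single process, using only C<port>, C<rcv> and C<snd> as introduced above. The tags C<double> and C<result> and the variable names are made up for this example.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;

# The "server" port doubles a number and replies to the port named in
# the request.
my $server = port;
rcv $server, double => sub {
   my ($n, $reply_port) = @_;
   snd $reply_port, result => $n * 2;
};

# The "client" port only waits for the reply.
my $client = port;
rcv $client, result => sub {
   my ($value) = @_;
   $done->send ($value);
};

# The request carries the client's port ID as the last message element.
snd $server, double => 21, $client;

my $result = $done->recv;
print "Result: $result\n";
```

Since a port ID is a plain string, the same pattern should carry over unchanged once the two ports live on different nodes.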
128 The callback is a typical AnyEvent idiom: the callback just passes
129 that number on to the I<condition variable> C<$end_cv> which will then
130 pass the value to the print. Condition variables are out of the scope
131 of this tutorial and not often used with ports, so please consult the
132 L<AnyEvent::Intro> about them.
133
134 Passing messages inside just one process is boring. Before we can move on
135 and do interprocess message passing we first have to make sure some things
136 have been set up correctly for our nodes to talk to each other.
137
138 =head2 System Requirements and System Setup
139
140 Before we can start with real IPC we have to make sure some things work on
141 your system.
142
143 First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
144 I<nodes> to be able to communicate with each other over the network it is
145 necessary to set up the same I<shared secret> for both of them, so they can
146 prove their trustworthiness to each other.
147
148 The easiest way to set this up is to use the F<aemp> utility:
149
150 aemp gensecret
151
152 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153 random shared secret. You can copy this file to any other system and
154 then communicate over the network (via TCP) with it. You can also select
155 your own shared secret (F<aemp setsecret>) and for increased security
156 requirements you can even create (or configure) a TLS certificate (F<aemp
157 gencert>), causing connections to not just be securely authenticated, but
158 also to be encrypted and protected against tinkering.
159
160 Connections will only be successfully established when the I<nodes>
161 that want to connect to each other have the same I<shared secret> (or
162 successfully verify the TLS certificate of the other side, in which case
163 no shared secret is required).
164
165 B<If something does not work as expected, and for example tcpdump shows
166 that the connections are closed almost immediately, you should make sure
167 that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168 you try to connect with each other!>
169
170 That is all for now; you will find some more advanced fiddling with the
171 C<aemp> utility later.
172
173 =head2 Shooting the Trouble
174
175 Sometimes things go wrong, and AnyEvent::MP, being a professional module,
176 does not gratuitously spill out messages to your screen.
177
178 To help with troubleshooting, there are two environment variables
179 that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL> sets the
180 logging level. The default is C<5>, which means nothing much is
181 printed. You can increase it to C<8> or C<9> to get more verbose
182 output. This is example output when starting a node:
183
184 2012-03-04 19:41:10 <8> node cerebro starting up.
185 2012-03-04 19:41:10 <8> node listens on [10.0.0.1:4040].
186 2012-03-04 19:41:10 <9> trying connect to seed node 10.0.0.19:4040.
187 2012-03-04 19:41:10 <9> 10.0.0.19:4040 connected as rain
188 2012-03-04 19:41:10 <7> rain is up ()
189
190 A lot of info, but at least you can see that it does something.
191
192 The other environment variable that can be useful is
193 C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
194 most messages that are sent or received to be printed. For example, F<aemp
195 restart rijk> might output these message exchanges:
196
197 SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
198 SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
199 SND rain <- [null,"g_find","rijk"]
200 RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
201 RCV rijk -> ["b",""]
202
203 =head1 PART 1: Passing Messages Between Processes
204
205 =head2 The Receiver
206
207 Let's split the previous example up into two programs: one that contains
208 the sender and one for the receiver. First the receiver application, in
209 full:
210
211 use AnyEvent;
212 use AnyEvent::MP;
213
214 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
215
216 my $port = port;
217 db_set eg_receivers => $port;
218
219 rcv $port, test => sub {
220    my ($data, $reply_port) = @_;
221
222    print "Received data: " . $data . "\n";
223 };
224
225 AnyEvent->condvar->recv;
226
227 Now, that wasn't too bad, was it? OK, let's go through the new functions
228 that have been used.
229
230 =head3 C<configure> and Joining and Maintaining the Network
231
232 First let's have a look at C<configure>:
233
234 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
235
236 Before we are able to send messages to other nodes we have to initialise
237 ourselves to become a "distributed node". Initialising a node means naming
238 the node and binding some TCP listeners so that other nodes can
239 contact it.
240
241 Additionally, to actually link all nodes in a network together, you can
242 specify a number of seed addresses, which will be used by the node to
243 connect itself into an existing network, as we will see shortly.
244
245 All of this (and more) can be passed to the C<configure> function - later
246 we will see how we can do all this without even passing anything to
247 C<configure>!
248
249 The first parameter, C<nodeid>, specifies the node ID (in this case
250 C<eg_receiver/%u>). The default is to use the node name of the current
251 host plus C</%u>, which gives the node a name with a random suffix to
252 make it unique, but for this example we want the node to have a bit more
253 personality, so we name it C<eg_receiver> with a random suffix.
254
255 Why the random suffix? Node IDs need to be unique within the network and
256 appending a random suffix is the easiest way to do that.
257
258 The second parameter, C<binds>, specifies a list of C<address:port> pairs
259 to bind TCP listeners on. The special "address" of C<*> means to bind on
260 every local IP address (this might not work on every OS, so explicit IP
261 addresses are best).
262
263 The reason to bind on a TCP port is not just that other nodes can connect
264 to us: if no binds are specified, the node will still bind on a dynamic
265 port on all local addresses - but in this case we won't know the port, and
266 cannot tell other nodes to connect to it as seed node.
267
268 Now, a I<seed> is simply the TCP address of some other node in the
269 network, often the same string as used for the C<binds> parameter of the
270 other node. The need for seeds is easy to explain: I<somehow> the nodes
271 of an aemp network have to find each other, and often this means over the
272 internet. So broadcasts are out.
273
274 Instead, a node usually specifies the addresses of a few (for redundancy)
275 other nodes, some of which should be up. Two nodes can set each other as
276 seeds without any issues. You could even specify all nodes as seeds for
277 all nodes, for total redundancy. But the common case is to have some more
278 or less central, stable servers running seed services for other nodes.
279
280 All you need to do to ensure that an AnyEvent::MP network connects
281 together is to make sure that all seed nodes are connected together via
282 their seed connections, i.e., all connections from seed nodes to I<their>
283 seed nodes form a connected graph. It's not necessary (but common) for a
284 seed node to list all other seed nodes as seeds. The rest of the nodes in
285 the network simply specify one or more of the seed nodes in their seed
286 list.
287
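The "connected graph" requirement can be checked mechanically. The sketch below is plain Perl and does not use L<AnyEvent::MP> at all: it takes a hypothetical table of seed nodes and the seeds each of them lists, treats every seed link as usable in both directions (an assumption made for this example), and runs a breadth-first search to see whether all seed nodes are reachable from one of them. The addresses and the function name C<seeds_connected> are invented for illustration.

```perl
use strict;
use warnings;

# Hypothetical seed configuration: each seed node lists the seeds it uses.
my %seeds_of = (
   "10.0.0.1:4040" => ["10.0.0.2:4040"],
   "10.0.0.2:4040" => ["10.0.0.3:4040"],
   "10.0.0.3:4040" => [],
);

# Returns true if the seed links, treated as undirected edges, connect
# every configured seed node to every other one.
sub seeds_connected {
   my ($cfg) = @_;

   my @nodes = keys %$cfg
      or return 1; # an empty configuration is trivially connected

   # build an undirected adjacency list
   my %adj;
   for my $node (@nodes) {
      for my $seed (@{ $cfg->{$node} }) {
         push @{ $adj{$node} }, $seed;
         push @{ $adj{$seed} }, $node;
      }
   }

   # breadth-first search from an arbitrary node
   my %seen  = ($nodes[0] => 1);
   my @queue = ($nodes[0]);
   while (defined (my $node = shift @queue)) {
      for my $next (@{ $adj{$node} || [] }) {
         push @queue, $next unless $seen{$next}++;
      }
   }

   # connected if no configured node was left unreached
   my $unreached = grep { !$seen{$_} } @nodes;
   return $unreached == 0;
}

my $state = seeds_connected (\%seeds_of) ? "connected" : "partitioned";
print "seed graph is $state\n";
```

In this example the chain 1 - 2 - 3 is enough: no seed lists all the others, yet the graph is connected.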
288 The simplest way to do that would be for all nodes to specify a single
289 node as seed node, and you would get a star topology. If you specify all
290 nodes as seed nodes, you get a fully meshed network (that's what previous
291 releases of AnyEvent::MP actually did).
292
293 A node tries to keep connections open to all of its seed nodes at all
294 times, while other connections are made on demand only.
295
296 All of this ensures that the network stays one network - even if all the
297 nodes in one half of the net are separated from the nodes in the other
298 half by some network problem, once that is over, they will eventually
299 become a single network again.
300
301 In addition to creating the network, a node also expects the seed nodes to
302 run the shared database service - if need be, by automatically starting
303 it, so you don't normally need to configure this explicitly.
304
305 The process of joining a network takes time, during which the node
306 is already running. This means it takes time until the node is
307 fully connected, and information about services in the network is
308 available. This is why most AnyEvent::MP programs either just register
309 themselves in the database and wait to be "found" by others, or they start
310 to monitor the database until some nodes of the required type show up.
311
312 We will see how this is done later, in the sender program.
313
314 =head3 Registering the Receiver
315
316 Coming back to our example, after the node has been configured for network
317 access, it is time to publish some service, namely the receive service.
318
319 For that, let's look at the next lines:
320
321 my $port = port;
322 db_set eg_receivers => $port;
323
324 The C<port> function has already been discussed. It simply creates a new
325 I<port> and returns the I<port ID>. The C<db_set> function, however, is
326 new: The first argument is the name of a I<database family> and the second
327 argument is the name of a I<subkey> within that family. The third argument
328 would be the I<value> to be associated with the family and subkey, but,
329 since it is missing, it will simply be C<undef>.
330
331 What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
332 database. This database runs on so-called "global" nodes, which usually
333 are the seed nodes of your network. The database structure is "simply" a
334 hash of hashes of values.
335
336 To illustrate this with Perl syntax, assume the database was stored in
337 C<%DB>, then the C<db_set> function more or less would do this:
338
339 $DB{eg_receivers}{$port} = undef;
340
341 So the ominous "family" selects a hash in the database, and the "subkey"
342 is simply the key in this hash - C<db_set> very much works like this
343 assignment.
344
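To play with the "hash of hashes" picture without a running network, here is a plain-Perl model of it. C<model_db_set> is a hypothetical stand-in for C<db_set> that only writes to a local C<%DB> hash, and the port IDs are placeholder strings, not real IDs assigned by L<AnyEvent::MP>.

```perl
use strict;
use warnings;

# Plain-Perl model of the database: a hash of hashes of values.
my %DB;

# Model of "db_set FAMILY => SUBKEY [=> VALUE]".
sub model_db_set {
   my ($family, $subkey, $value) = @_;
   $DB{$family}{$subkey} = $value; # value is undef when omitted
}

# Placeholder port IDs, for illustration only.
model_db_set eg_receivers => "PORT_ID_1";
model_db_set eg_receivers => "PORT_ID_2";

# The subkeys of a family are what a reader of the database iterates over.
my @receivers = sort keys %{ $DB{eg_receivers} };
print "$_\n" for @receivers;
```

The values are all C<undef> here; only the presence of the subkey matters, which is exactly how the receiver example uses the database.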
345 The family namespace is shared by all nodes in a network, so the names
346 should be reasonably unique, for example, they could start with the name
347 of your module, or the name of the program, using your port name or node
348 name as subkey.
349
350 The purpose behind adding this key to the database is that the sender can
351 look it up and find our port. We will shortly see how.
352
353 The last step in the example is to set up a receiver callback for those
354 messages, just as was discussed in the first example. We again match
355 for the tag C<test>. The difference is that this time we don't exit the
356 application after receiving the first message. Instead we continue to wait
357 for new messages indefinitely.
358
359 =head2 The Sender
360
361 OK, now let's take a look at the sender code:
362
363 use AnyEvent;
364 use AnyEvent::MP;
365
366 configure nodeid => "eg_sender/%u", seeds => ["*:4040"];
367
368 my $guard = db_mon eg_receivers => sub {
369    my ($family, $added, $changed, $deleted) = @_;
370    return unless %$family;
371
372    # now there are some receivers, send them a message
373    snd $_ => test => time
374       for keys %$family;
375 };
376
377 AnyEvent->condvar->recv;
378
379 It's even less code. The C<configure> serves the same purpose as in the
380 receiver, but instead of specifying binds we specify a list of seeds - the
381 only seed happens to be the same as the bind used by the receiver, which
382 therefore becomes our seed node.
383
384 Remember the part about having to wait till things become available? Well,
385 after configure returns, nothing has been done yet - the node is not
386 connected to the network, knows nothing about the database contents, and
387 it can take ages (for a computer :) for this situation to change.
388
389 Therefore, the sender waits, in this case by using the C<db_mon>
390 function. This function registers an interest in a specific database
391 family (in this case C<eg_receivers>). Each time something inside the
392 family changes (a key is added, changed or deleted), it will call our
393 callback with the family hash as first argument, followed by arrays of
394 the added, changed and deleted keys.
395
396 The callback only checks whether the C<%$family> hash is empty - if it is,
397 then it doesn't do anything. But eventually the family will contain the
398 port subkey we set in the receiver. Then it will send a message to it (and
399 any other receiver in the same family). Likewise, should the receiver go
400 away and come back, or should another receiver come up, it will again send
401 a message to all of them.
402
403 You can experiment by having multiple receivers - to start up additional
404 receivers, you have to replace the C<binds> parameter in the receiver by the
405 C<seeds> parameter used in the sender, but then you can start as many as you like. If
406 you specify proper IP addresses for the seeds, you can even run them on
407 different computers.
408
409 Each time you start the sender, it will send a message to all receivers it
410 finds (you have to interrupt it manually afterwards).
411
412 Additional experiments you could try include using
413 C<PERL_ANYEVENT_MP_TRACE=1> to see which messages are exchanged, or
414 starting the sender before the receiver and see how long it then takes to
415 find the receiver.
416
417 =head3 Splitting Network Configuration and Application Code
418
419 OK, so far, this works reasonably. In the real world, however, the person
420 configuring your application to run on a specific network (the end user
421 or network administrator) is often different to the person coding the
422 application.
423
424 Or to put it differently: the arguments passed to configure are usually
425 provided not by the programmer, but by whoever is deploying the program -
426 even in the example above, we would like to be able to just start senders
427 and receivers without having to patch the programs.
428
429 To make this easy, AnyEvent::MP supports a simple configuration database,
430 using profiles, which can be managed using the F<aemp> command-line
431 utility (yes, this section is about the advanced tinkering mentioned
432 before).
433
434 When you change both programs above to simply call
435
436 configure;
437
438 then AnyEvent::MP tries to look up a profile using the current node name
439 in its configuration database, falling back to some global default.
440
441 You can run "generic" nodes using the F<aemp> utility as well, and we will
442 exploit this in the following way: we configure a profile "seed" and run
443 a node using it, whose sole purpose is to be a seed node for our example
444 programs.
445
446 We bind the seed node to port 4040 on all interfaces:
447
448 aemp profile seed binds "*:4040"
449
450 And we configure all nodes to use this as seed node (this only works when
451 running on the same host, for multiple machines you would replace the C<*>
452 by the IP address or hostname of the node running the seed), by changing
453 the global settings shared between all profiles:
454
455 aemp seeds "*:4040"
456
457 Then we run the seed node:
458
459 aemp run profile seed
460
461 After that, we can start as many other nodes as we want, and they will
462 all use our generic seed node to discover each other. The reason we can
463 start our existing programs even though they specify "incompatible"
464 parameters to C<configure> is that the configuration file (by default)
465 takes precedence over any arguments passed to C<configure>.
466
467 That's all for now - next we will teach you about monitoring by writing a
468 simple chat client and server :)
469
470 =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
471
472 That's a mouthful, so what does it mean? Our previous example is what one
473 could call "very loosely coupled" - the sender doesn't care about whether
474 there are any receivers, and the receivers do not care if there is any
475 sender.
476
477 This can work fine for simple services, but most real-world applications
478 want to ensure that the side they are expecting to be there is actually
479 there. Going one step further: most bigger real-world applications even
480 want to ensure that if some component is missing, or has crashed, it will
481 still be there, by recovering and restarting the service.
482
483 AnyEvent::MP supports this by catching exceptions and network problems,
484 and notifying interested parties of these.
485
486 =head2 Exceptions, Port Context, Network Errors and Monitors
487
488 =head3 Exceptions
489
490 Exceptions are handled on a per-port basis: all receive callbacks are
491 executed in a special context, the so-called I<port-context>: code
492 that throws an otherwise uncaught exception will cause the port to be
493 C<kil>led. Killed ports are destroyed automatically (killing ports is
494 actually the only way to free ports).
495
496 Ports can be monitored, even from a different node and host, and when a
497 port is killed, any entity monitoring it will be notified.
498
499 Here is a simple example:
500
501 use AnyEvent::MP;
502
503 # create a port, it always dies
504 my $port = port { die "oops" };
505
506 # monitor it
507 mon $port, sub {
508    warn "$port was killed (with reason @_)";
509 };
510
511 # now send it some message, causing it to die:
512 snd $port;
513
514 AnyEvent->condvar->recv;
515
516 It first creates a port whose only action is to throw an exception,
517 and then monitors it with the C<mon> function. Afterwards it sends it a
518 message, causing it to die and call the monitoring callback:
519
520 anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
521
522 The callback was actually passed two arguments: C<die>, to indicate it
523 did throw an I<exception> as opposed to, say, a network error, and the
524 exception message itself.
525
526 What happens when a port is killed before we have a chance to monitor
527 it? Granted, this is highly unlikely in our example, but when you program
528 in a network this can easily happen due to races between nodes.
529
530 use AnyEvent::MP;
531
532 my $port = port { die "oops" };
533
534 snd $port;
535
536 mon $port, sub {
537    warn "$port was killed (with reason @_)";
538 };
539
540 AnyEvent->condvar->recv;
541
542 This time we will get something else:
543
544 2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
545 anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)
546
547 The first line is a warning that is printed when a port dies that isn't
548 being monitored, because that is normally a bug. When a C<mon> is
549 attempted later, its callback is invoked immediately, because the port is already gone. The
550 kill reason is now C<no_such_port> with some descriptive (we hope) error
551 message.
552
553 As you probably suspect from these examples, the kill reason is usually
554 some identifier as first argument and a human-readable error message as
555 second argument - all kill reasons by AnyEvent::MP itself follow this
556 pattern. But the kill reason can be anything: it is simply a list of
557 values you can choose yourself. It can even be nothing (an empty list) -
558 this is called a "normal" kill.
559
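A monitoring callback can tell these cases apart just by looking at its argument list. The following sketch is plain Perl so that it can be tried without any ports. C<describe_kill> is a hypothetical helper for this example; the identifiers C<die> and C<custom_error> are the ones that appear in this tutorial.

```perl
use strict;
use warnings;

# Classify the argument list that a monitoring callback receives.
sub describe_kill {
   my (@reason) = @_;

   # an empty reason list is a "normal" kill
   return "normal kill" unless @reason;

   # otherwise: an identifier first, then (usually) a readable message
   my ($id, @details) = @reason;
   return "error kill: $id" . (@details ? " (@details)" : "");
}

my @examples = (
   [],
   ["die", "oops at xxx line 5."],
   ["custom_error", "don't like your steenking face"],
);

for my $reason (@examples) {
   my $text = describe_kill (@$reason);
   print "$text\n";
}
```

In a real program the same helper could be called from a monitor, for example C<mon $port, sub { warn describe_kill (@_), "\n" }>.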
560 Apart from die'ing, you can kill ports manually using the C<kil>
561 function. Using the C<kil> function will be treated like an error when a
562 non-empty reason is specified:
563
564 kil $port, custom_error => "don't like your steenking face";
565
566 And a I<normal> kill without any reason arguments:
567
568 kil $port;
569
570 By now you probably wonder what this "normal" kill business is: A common
571 idiom is to not specify a callback to C<mon>, but another port, such as
572 C<$SELF>:
573
574 mon $port, $SELF;
575
576 This basically means "monitor $port and kill me when it crashes" - and
577 the thing is, a "normal" kill does not count as a crash. This way you can
578 easily link ports together and make them crash together on errors, while
579 allowing you to remove a port silently when it has done its job properly.
580
581 =head3 Port Context
582
583 Code runs in the so-called "port context". That means C<$SELF> contains
584 its own port ID and exceptions that the code throws will be caught.
585
586 Since AnyEvent::MP is event-based, it is not uncommon to register
587 callbacks from within C<rcv> handlers. As an example, assume that the
588 following port receive handler wants to C<die> a second later, using
589 C<after>:
590
591 my $port = port {
592    after 1, sub { die "oops" };
593 };
594
595 If you try this out, you would find it does not work - when the C<after>
596 callback is executed, it does not run in the port context anymore, so
597 exceptions will not be caught.
598
599 For these cases, AnyEvent::MP exports a special "closure constructor"
600 called C<psub>, which works mostly like perl's built-in C<sub>:
601
602 my $port = port {
603    after 1, psub { die "oops" };
604 };
605
606 C<psub> remembers the port context and returns a code reference. When the
607 code reference is invoked, it will run the code block within the context
608 that it was created in, so exception handling once more works as expected.
609
610 There is even a way to temporarily execute code in the context of some
611 port, namely C<peval>:
612
613 peval $port, sub {
614    # die'ing here will kil $port
615 };
616
617 The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
618 and then executes the given sub in a port context.
619
620 =head3 Network Errors and the AEMP Guarantee
621
622 Earlier we mentioned another important source of monitoring failures:
623 network problems. When a node loses connection to another node, it will
624 invoke all monitoring actions, just as if the port was killed, I<even if
625 it is possible that the port is still happily alive on another node> (not
626 being able to talk to a node means we have no clue what's going on with
627 it, it could be crashed, but also still running without knowing we lost
628 the connection).
629
630 So another way to view monitors is: "notify me when some of my messages
631 couldn't be delivered". AEMP has a guarantee about message delivery to a
632 port: After starting a monitor, any message sent to a port will either
633 be delivered, or, when it is lost, any further messages will also be lost
634 until the monitoring action is invoked. After that, further messages
635 I<might> get delivered again.
636
637 This doesn't sound like a very big guarantee, but it is kind of the best
638 you can get while staying sane: Specifically, it means that there will be
639 no "holes" in the message sequence: all messages sent are delivered in
640 order, without any of them missing in between, and when some were lost,
641 you I<will> be notified of that, so you can take recovery action.
642
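One way to read the "no holes" guarantee is: until the monitoring action fires, whatever the receiver has seen is a gapless prefix of what was sent. The plain-Perl sketch below checks exactly that property for two made-up message sequences. It is a model of the statement above, not a test of L<AnyEvent::MP> itself, and C<is_gapless_prefix> is a name invented for this example.

```perl
use strict;
use warnings;

# Returns true if @$received equals the first N messages of @$sent, in
# order, i.e. nothing is missing in the middle and nothing is reordered.
sub is_gapless_prefix {
   my ($sent, $received) = @_;

   return 0 if @$received > @$sent;

   for my $i (0 .. $#$received) {
      return 0 if $received->[$i] ne $sent->[$i];
   }

   return 1;
}

my @sent = qw(m1 m2 m3 m4);

# the first sequence lost its tail, the second one has a hole
for my $received ([qw(m1 m2)], [qw(m1 m3 m4)]) {
   my $verdict = is_gapless_prefix (\@sent, $received) ? "ok" : "violation";
   print "@$received: $verdict\n";
}
```

Losing C<m3> and C<m4> together is allowed (and reported through the monitor); receiving C<m3> after C<m2> went missing is what the guarantee rules out.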
643 And, obviously, the guarantee only works in the presence of
644 correctly-working hardware, and no relevant bugs inside AEMP itself.
645
646 =head3 Supervising
647
648 OK, so how is this crashing-everything-stuff going to make applications
649 I<more> stable? Well, in fact, the goal is not really to make them
650 more stable, but to make them more resilient against actual errors
651 and crashes. And this is not done by crashing I<everything>, but by
652 crashing everything except a I<supervisor> that then cleans up and starts
653 everything again.
654
655 A supervisor is simply some code that ensures that an application (or a
656 part of it) is running, and if it crashes, is restarted properly. That is,
657 it supervises a service by starting and restarting it, as necessary.
658
659 To show how to do all this we will create a simple chat server that can
660 handle many chat clients. Both server and clients can be killed and
661 restarted, and even crash, to some extent, without disturbing the chat
662 functionality.
663
664 =head2 Chatting, the Resilient Way
665
666 Without further ado, here is the chat server (to run it, we assume the
667 set-up explained earlier, with a separate F<aemp run profile seed> node):
668
669 use common::sense;
670 use AnyEvent::MP;
671 use AnyEvent::MP::Global;
672
673 configure;
674
675 my %clients;
676
677 sub msg {
678    print "relaying: $_[0]\n";
679    snd $_, $_[0]
680       for values %clients;
681 }
682
683 our $server = port;
684
685 rcv $server, join => sub {
686    my ($client, $nick) = @_;
687
688    $clients{$client} = $client;
689
690    mon $client, sub {
691       delete $clients{$client};
692       msg "$nick (quits, @_)";
693    };
694    msg "$nick (joins)";
695 };
696
697 rcv $server, privmsg => sub {
698    my ($nick, $msg) = @_;
699    msg "$nick: $msg";
700 };
701
702 db_set eg_chat_server => $server;
703
704 warn "server ready.\n";
705
706 AnyEvent->condvar->recv;
707
708 Looks like a lot, but it is actually quite simple: after your usual
709 preamble (this time we use common sense), we define a helper function that
710 sends some message to every registered chat client:
711
712 sub msg {
713    print "relaying: $_[0]\n";
714    snd $_, $_[0]
715       for values %clients;
716 }
717
718 The clients are stored in the hash C<%clients>. Then we define a server
719 port and install two receivers on it, C<join>, which is sent by clients
720 to join the chat, and C<privmsg>, that clients use to send actual chat
721 messages.
722
723 C<join> is the most complicated. It expects the client port and the nickname
724 to be passed in the message, and registers the client in C<%clients>.
725
726 rcv $server, join => sub {
727    my ($client, $nick) = @_;
728
729    $clients{$client} = $client;
730
731 The next step is to monitor the client. The monitoring action removes the
732 client and sends a quit message with the error to all remaining clients.
733
734 mon $client, sub {
735    delete $clients{$client};
736    msg "$nick (quits, @_)";
737 };
738
739 And finally, it creates a join message and sends it to all clients.
740
741    msg "$nick (joins)";
742 };
743
744 The C<privmsg> callback simply broadcasts the message to all clients:
745
746 rcv $server, privmsg => sub {
747    my ($nick, $msg) = @_;
748    msg "$nick: $msg";
749 };
750
751 And finally, the server registers itself in the C<eg_chat_server> database family, so that
752 clients can find it:
753
754 db_set eg_chat_server => $server;
755
756 Well, well... and where is this supervisor stuff? Well... we cheated,
757 it's not there. To not overcomplicate the example, we only put it into
758 the..... CLIENT!
759
760 =head3 The Client, and a Supervisor!
761
762 Again, here is the client, including supervisor, which makes it a bit
763 longer:
764
765 use common::sense;
766 use AnyEvent::MP;
767
768 my $nick = shift || "anonymous";
769
770 configure;
771
772 my ($client, $server);
773
774 sub server_connect {
775 my $db_mon;
776 $db_mon = db_mon eg_chat_server => sub {
777 return unless %{ $_[0] };
778 undef $db_mon;
779
780 print "\rconnecting...\n";
781
782 $client = port { print "\r \r@_\n> " };
783 mon $client, sub {
784 print "\rdisconnected @_\n";
785 &server_connect;
786 };
787
788 $server = (keys %{ $_[0] })[0];
789
790 snd $server, join => $client, $nick;
791 mon $server, $client;
792 };
793 }
794
795 server_connect;
796
797 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
798 chomp (my $line = <STDIN>);
799 print "> ";
800 snd $server, privmsg => $nick, $line
801 if $server;
802 });
803
804 $| = 1;
805 print "> ";
806 AnyEvent->condvar->recv;
807
The first thing the client does is to store the nickname (which is
expected as the only command line argument, defaulting to C<anonymous>)
in C<$nick>, for later use.
811
812 The next relevant thing is... finally... the supervisor:
813
814 sub server_connect {
815 my $db_mon;
816 $db_mon = db_mon eg_chat_server => sub {
817 return unless %{ $_[0] };
818 undef $db_mon; # stop monitoring
819
820 This monitors the C<eg_chat_server> database family. It waits until a
821 chat server becomes available. When that happens, it "connects" to it
822 by creating a client port that receives and prints chat messages, and
823 monitoring it:
824
825 $client = port { print "\r \r@_\n> " };
826 mon $client, sub {
827 print "\rdisconnected @_\n";
828 &server_connect;
829 };
830
831 If the client port dies (for whatever reason), the "supervisor" will start
832 looking for a server again - the semantics of C<db_mon> ensure that it
833 will immediately find it if there is a server port.
834
835 After this, everything is ready: the client will send a C<join> message
836 with its local port to the server, and start monitoring it:
837
         $server = (keys %{ $_[0] })[0];

         snd $server, join => $client, $nick;
         mon $server, $client;
      };
   }
843
This second monitor ensures that, when the server port crashes or goes
away (e.g. due to network problems), the client port is killed as well.
The client's own monitor then tells the user that the client was
disconnected, and starts looking for a server again.
848
849 The rest of the program deals with the boring details of actually invoking
850 the supervisor function to start the whole client process and handle the
851 actual terminal input, sending it to the server.
852
853 Now... the "supervisor" in this example is a bit of a cheat - it doesn't
854 really clean up much (because the cleanup done by AnyEvent::MP suffices),
855 and there isn't much of a restarting action either - if the server isn't
856 there because it crashed, well, it isn't there.
857
In the real world, one would often add a timeout that triggers when
the server cannot be found within some time limit, and then complain,
or even try to start a new server. Or the supervisor would have to do
some real cleanups, such as rolling back database transactions when the
database thread crashes. For this simple chat server, however, this simple
supervisor works fine. Hopefully, future versions of AnyEvent::MP will
offer some predefined supervisors. For now, you will have to code them on
your own.
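
Such a timeout could be sketched as follows. This is only an illustration:
C<find_server> is a hypothetical helper and not part of AnyEvent::MP, and
it assumes that C<db_mon> returns a guard that stops the monitor when it
is released, which is what the client above relies on as well. The timer
comes from plain AnyEvent (C<AE::timer>).

```perl
use common::sense;
use AnyEvent;
use AnyEvent::MP;

# hypothetical helper: look for a server in the database family $family
# and call $cb with the first key found, or with no arguments if nothing
# shows up within $timeout seconds.
sub find_server {
   my ($family, $timeout, $cb) = @_;

   my ($db_mon, $timer, $done);

   # runs exactly once, whichever event happens first
   my $finish = sub {
      return if $done++;
      undef $db_mon; # stop monitoring the family
      undef $timer;  # cancel the timeout
      $cb->(@_);
   };

   $timer = AE::timer ($timeout, 0, sub { $finish->() });

   $db_mon = db_mon $family => sub {
      my ($family_hash) = @_;
      $finish->((keys %$family_hash)[0]) if %$family_hash;
   };

   # the monitor may have reported a server before the guard was stored
   undef $db_mon if $done;

   ()
}
```

A client could call C<< find_server eg_chat_server => 10, $cb >> after
C<configure> and let C<$cb> complain, or start a server, when it is
invoked without arguments.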
866
867 You should now try to start the server and one or more clients in different
868 terminal windows (and the seed node):
869
870 perl eg/chat_client nick1
871 perl eg/chat_client nick2
872 perl eg/chat_server
873 aemp run profile seed
874
875 And then you can experiment with chatting, killing one or more clients, or
876 stopping and restarting the server, to see the monitoring in action.
877
878 The crucial point you should understand from this example is that
879 monitoring is usually symmetric: when you monitor some other port,
880 potentially on another node, that other port usually should monitor you,
881 too, so when the connection dies, both ports get killed, or at least both
882 sides can take corrective action. Exceptions are "servers" that serve
883 multiple clients at once and might only wish to clean up, and supervisors,
884 who of course should not normally get killed (unless they, too, have a
885 supervisor).
886
887 If you often think in object-oriented terms, then you can think of a port
888 as an object: C<port> is the constructor, the receive callbacks set by
889 C<rcv> act as methods, the C<kil> function becomes the explicit destructor
890 and C<mon> installs a destructor hook. Unlike conventional object oriented
891 programming, it can make sense to exchange port IDs more freely (for
892 example, to monitor one port from another), because it is cheap to send
893 port IDs over the network, and AnyEvent::MP blurs the distinction between
894 local and remote ports.
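
The analogy can be made concrete with a small example that stays inside
a single process. It is only a sketch: the C<add> and C<report> tags and
the C<$total> variable are invented for the illustration.

```perl
use common::sense;
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;
my @log;

# "constructor": create the port, its state lives in a closure
my $total   = 0;
my $counter = port;

# "methods": one receive callback per message tag
rcv $counter, add => sub {
   my ($n) = @_;
   $total += $n;
};

rcv $counter, report => sub {
   push @log, "total is $total";
   kil $SELF; # "explicit destructor", called from within the port
};

# "destructor hook": runs when the port is killed
mon $counter, sub {
   push @log, "counter gone";
   $done->send;
};

snd $counter, add => 2;
snd $counter, add => 40;
snd $counter, "report";

$done->recv;
print "$_\n" for @log;
```

The port is killed from inside its own C<report> callback, so the example
does not depend on when exactly the messages are delivered.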
895
896 Lastly, there is ample room for improvement in this example: the server
897 should probably remember the nickname in the C<join> handler instead of
898 expecting it in every chat message, it should probably monitor itself, and
899 the client should not try to send any messages unless a server is actually
900 connected.
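
The first of these improvements could look roughly like this. It is a
sketch that assumes a changed protocol, in which C<privmsg> carries the
client port instead of the nickname. The C<%nick> hash and the local test
client C<$alice> are additions for the illustration.

```perl
use common::sense;
use AnyEvent;
use AnyEvent::MP;

my %nick; # client port ID => nickname, filled in by "join"

sub msg {
   snd $_, $_[0]
      for keys %nick;
}

my $server = port;

rcv $server, join => sub {
   my ($client, $nick) = @_;

   $nick{$client} = $nick;

   mon $client, sub {
      delete $nick{$client};
      msg "$nick (quits, @_)";
   };
   msg "$nick (joins)";
};

# assumed protocol change: privmsg now carries the client port
rcv $server, privmsg => sub {
   my ($client, $msg) = @_;

   my $nick = $nick{$client}
      or return; # ignore clients that never joined

   msg "$nick: $msg";
};

# local demonstration with a single client port
my $done = AnyEvent->condvar;
my @seen;

my $alice = port {
   push @seen, $_[0];
   $done->send if @seen == 2;
};

snd $server, join    => $alice, "alice";
snd $server, privmsg => $alice, "hello";

$done->recv;
print "$_\n" for @seen;
```

As a side effect, a client can no longer post under a nickname other than
the one it joined with.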
901
902 =head1 PART 3: TIMTOWTDI: Virtual Connections
903
904 The chat system developed in the previous sections is very "traditional"
905 in a way: you start some server(s) and some clients statically and they
906 start talking to each other.
907
908 Sometimes applications work more like "services": They can run on almost
909 any node and even talk to copies of themselves on other nodes in case they
910 are distributed. The L<AnyEvent::MP::Global> service for example monitors
911 nodes joining the network and sometimes even starts itself on other nodes.
912
913 One good way to design such services is to put them into a module and
914 create "virtual connections" to other nodes. We call this the "bridge
915 head" method, because you start by I<creating a remote port> (the bridge
916 head) and from that you start to bootstrap your application.
917
918 Since that sounds rather theoretical, let us redesign the chat server and
919 client using this design method.
920
921 As usual, we start with the full program - here is the server:
922
923 use common::sense;
924 use AnyEvent::MP;
925
926 configure;
927
928 db_set eg_chat_server2 => $NODE;
929
930 my %clients;
931
932 sub msg {
933 print "relaying: $_[0]\n";
934 snd $_, $_[0]
935 for values %clients;
936 }
937
938 sub client_connect {
939 my ($client, $nick) = @_;
940
941 mon $client;
942 mon $client, psub {
943 delete $clients{$client};
944 msg "$nick (quits, @_)";
945 };
946
947 $clients{$client} = $client;
948
949 msg "$nick (joins)";
950
951 rcv $SELF, sub { msg "$nick: $_[0]" };
952 }
953
954 warn "server ready.\n";
955
956 AnyEvent->condvar->recv;
957
It starts out not much different than the previous example, except that
this time, we register the node port in the database and not a port we
created - the clients only want to know which node the server should
be running on, and there can only be one such server (or service) per
node. In fact, the clients could also use some kind of election mechanism,
to find the node with the lowest node ID, or the lowest load, or something
like that.
965
966 The much more interesting difference to the previous server is that
967 indeed no server port is created - the server consists only of code,
968 and "does" nothing by itself. All it "does" is to define a function
969 named C<client_connect>, which expects a client port and a nick name as
970 arguments. It then monitors the client port and binds a receive callback
971 on C<$SELF>, which expects messages that in turn are broadcast to all
972 clients.
973
974 The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
975 for C<mon $client, $SELF>. The second does the normal "client has gone
976 away" clean-up action.
977
978 The last line, the C<rcv $SELF>, is a good hint that something interesting
979 is going on. And indeed, when looking at the client code, you can see a
980 new function, C<spawn>:
982
   use common::sense;
   use AnyEvent::MP;

   my $nick = shift;

   configure;

   $| = 1;

   my ($client, $server);

   sub server_connect {
      my $db_mon;
      # wait until a node has registered itself in eg_chat_server2
      $db_mon = db_mon eg_chat_server2 => sub {
         return unless %{ $_[0] };
         undef $db_mon; # stop monitoring

         print "\rconnecting...\n";

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         # the keys are node IDs, as the server stored its $NODE
         my $servernode = (keys %{ $_[0] })[0];

         $server = spawn $servernode, "::client_connect", $client, $nick;
         mon $server, $client;
      };
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
server I<port> on the server I<node>:

   $server = spawn $servernode, "::client_connect", $client, $nick;
   mon $server, $client;
1031
1032 And of course the first thing after creating it is monitoring it.
1033
1034 Phew, let's go through this in slow motion: the C<spawn> function creates
1035 a new port on a remote node and returns its port ID. After creating
1036 the port it calls a function on the remote node, passing any remaining
1037 arguments to it, and - most importantly - executes the function within
1038 the context of the new port, so it can be manipulated by referring to
1039 C<$SELF>. The init function can reside in a module (actually it normally
1040 I<should> reside in a module) - AnyEvent::MP will automatically load the
1041 module if the function isn't defined.
1042
1043 The C<spawn> function returns immediately, which means you can instantly
1044 send messages to the port, long before the remote node has even heard
1045 of our request to create a port on it. In fact, the remote node might
1046 not even be running. Despite these troubling facts, everything should
1047 work just fine: if the node isn't running (or the init function throws an
1048 exception), then the monitor will trigger because the port doesn't exist.
1049
1050 If the spawn message gets delivered, but the monitoring message is not
1051 because of network problems (extremely unlikely, but monitoring, after
1052 all, is implemented by passing a message, and messages can get lost), then
1053 this connection loss will eventually trigger the monitoring action. On the
1054 remote node (which in return monitors the client) the port will also be
1055 cleaned up on connection loss. When the remote node comes up again and our
1056 monitoring message can be delivered, it will instantly fail because the
1057 port has been cleaned up in the meantime.
1058
1059 If your head is spinning by now, that's fine - just keep in mind, after
1060 creating a port using C<spawn>, monitor it on the local node, and monitor
1061 "the other side" from the remote node, and all will be cleaned up just
1062 fine.
1063
1064 =head2 Services
1065
1066 Above it was mentioned that C<spawn> automatically loads modules. This can
1067 be exploited in various useful ways.
1068
1069 Assume for a moment you put the server into a file called
1070 F<mymod/chatserver.pm> reachable from the current directory. Then you
1071 could run a node there with:
1072
1073 aemp run
1074
1075 The other nodes could C<spawn> the server by using
1076 C<mymod::chatserver::client_connect> as init function - without any other
1077 configuration.
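
As a sketch, F<mymod/chatserver.pm> could look like this. It is simply the
server from above wrapped in a package: only the C<package> line, the
comments and the trailing C<1;> are new, and the file and package names
are the ones assumed in the text.

```perl
package mymod::chatserver;

use common::sense;
use AnyEvent::MP;

my %clients;

sub msg {
   snd $_, $_[0]
      for values %clients;
}

# init function, named "mymod::chatserver::client_connect" in spawn
sub client_connect {
   my ($client, $nick) = @_;

   mon $client; # shorthand for mon $client, $SELF
   mon $client, psub {
      delete $clients{$client};
      msg "$nick (quits, @_)";
   };

   $clients{$client} = $client;

   msg "$nick (joins)";

   rcv $SELF, sub { msg "$nick: $_[0]" };
}

1;
```

A client would then use
C<< spawn $servernode, "mymod::chatserver::client_connect", $client, $nick >>
instead of the C<::client_connect> init function used before.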
1078
1079 Likewise, when you have some service that starts automatically when loaded
1080 (similar to AnyEvent::MP::Global), then you can configure this service
1081 statically:
1082
1083 aemp profile mysrvnode services mymod::service::
1084 aemp run profile mysrvnode
1085
1086 And the module will automatically be loaded in the node, as specifying a
1087 module name (with C<::>-suffix) will simply load the module, which is then
1088 free to do whatever it wants.
1089
Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes to load it. For example, if I
wanted to run the Bummskraut IRC backend on a machine named "ruth", I
could do this:
1094
1095 aemp profile ruth addservice BK::Backend::IRC::
1096
1097 And any F<aemp run> on that host will automatically have the Bummskraut
1098 IRC backend running.
1099
1100 There are plenty of possibilities you can use - it's all up to you how you
1101 structure your application.
1102
1103 =head1 PART 4: Coro::MP - selective receive
1104
1105 Not all problems lend themselves naturally to an event-based solution:
1106 sometimes things are easier if you can decide in what order you want to
1107 receive messages, regardless of the order in which they were sent.
1108
1109 In these cases, L<Coro::MP> can provide a nice solution: instead of
1110 registering callbacks for each message type, C<Coro::MP> attaches a
1111 (coro-) thread to a port. The thread can then opt to selectively receive
1112 messages it is interested in. Other messages are not lost, but queued, and
1113 can be received at a later time.
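
The queueing behaviour can be pictured with a small model in plain Perl.
This is not how C<Coro::MP> is implemented (there are no threads, no
timeouts and no blocking here), and C<take_first> is a made-up helper. It
only illustrates that a selective receive removes the first matching
message and leaves everything else queued, in order.

```perl
use strict;
use warnings;

# hypothetical model of a port's message queue
my @queue = (
   [data       => "port1", "abc"],
   [write_file => "port2", "/tmp/b"],
   [data       => "port2", "xyz"],
);

# remove and return the first queued message accepted by $cond,
# or an empty list if no message matches
sub take_first {
   my ($cond) = @_;

   for my $i (0 .. $#queue) {
      if ($cond->(@{ $queue[$i] })) {
         my ($msg) = splice @queue, $i, 1;
         return @$msg;
      }
   }

   ()
}

# selectively receive the data message from port2, skipping the others
my @msg = take_first (sub { $_[0] eq "data" && $_[1] eq "port2" });
print "got: @msg\n";

# the skipped messages are still queued, in their original order
print "queued: @$_\n" for @queue;
```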
1114
The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.
1118
1119 In fact, C<Coro::MP> is more of an extension than a separate module: all
1120 functions exported by C<AnyEvent::MP> are exported by it as well.
1121
To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.
1128
1129 The server only writes a single file at a time, other requests will stay
1130 in the queue until the current file has been finished.
1131
1132 Here is an example implementation that uses L<Coro::AIO> and largely
1133 ignores error handling:
1134
   use Fcntl;     # O_WRONLY, O_CREAT
   use Coro::AIO; # aio_open, aio_write
   use Coro::MP;  # port_async, get_cond, mon, ...

   my $ioserver = port_async {
      while () {
         # fetch the next message, whatever it is
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            # wait up to 5 seconds for the next data message from $port
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         }
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };
1161
1162 Let's go through it, section by section.
1163
1164 my $ioserver = port_async {
1165
1166 Ports can be created by attaching a thread to an existing port via
1167 C<rcv_async>, or as in this example, by calling C<port_async> with the
1168 code to execute as a thread. The C<async> component comes from the fact
1169 that threads are created using the C<Coro::async> function.
1170
The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be C<kil>ed I<normally>,
i.e. without a reason argument.
1174
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";
1178
1179 The thread is supposed to serve many file writes, which is why it
1180 executes in a loop. The first thing it does is fetch the next message,
1181 using C<get_cond>, the "conditional message get". Without arguments, it
1182 merely fetches the I<next> message from the queue, which I<must> be a
1183 C<write_file> message.
1184
1185 The message contains the C<$path> to the file, which is then created:
1186
1187 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1188 or die "$path: $!";
1189
1190 Then we enter a loop again, to serve as many C<data> messages as
1191 necessary:
1192
1193 while () {
1194 my (undef, undef, $data) = get_cond {
1195 $_[0] eq "data" && $_[1] eq $port
1196 } 5
1197 or die "timeout waiting for data message from $port\n";
1198
1199 This time, the condition is not empty, but instead a code block: similarly
1200 to grep, the code block will be called with C<@_> set to each message in
1201 the queue, and it has to return whether it wants to receive the message or
1202 not.
1203
In this case we are interested in C<data> messages (C<< $_[0] eq "data"
>>) whose first element after the tag is the source port (C<< $_[1] eq
$port >>).
1206
1207 The condition must be this strict, as it is possible to receive both
1208 C<write_file> messages and C<data> messages from other ports while we
1209 handle the file writing.
1210
1211 The lone C<5> argument at the end is a timeout - when no matching message
1212 is received within C<5> seconds, we assume an error and C<die>.
1213
1214 When an empty C<data> message is received we are done and can close the
1215 file (which is done automatically as C<$fh> goes out of scope):
1216
1217 length $data or last;
1218
1219 Otherwise we need to write the data:
1220
1221 aio_write $fh, undef, undef, $data, 0;
1222
1223 And that's basically it. Note that every port thread should have some
1224 kind of supervisor. In our case, the supervisor simply prints any error
1225 message:
1226
1227 mon $ioserver, sub {
1228 warn "ioserver was killed: @_\n";
1229 };
1230
1231 Here is a usage example:
1232
1233 port_async {
1234 snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1235 snd $ioserver, data => $SELF, "abc\n";
1236 snd $ioserver, data => $SELF, "def\n";
1237 snd $ioserver, data => $SELF;
1238 };
1239
1240 The messages are sent without any flow control or acknowledgement (feel
1241 free to improve). Also, the source port does not actually need to be a
1242 port - any unique ID will do - but port identifiers happen to be a simple
1243 source of network-wide unique IDs.
1244
1245 Apart from C<get_cond> as seen above, there are other ways to receive
1246 messages. The C<write_file> message above could also selectively be
1247 received using a C<get> call:
1248
1249 my ($port, $path) = get "write_file";
1250
1251 This is simpler, but when some other code part sends an unexpected message
1252 to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
1253 every threaded port should have a "fetch next message unconditionally"
1254 somewhere, to avoid filling up the queue.
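
One way to follow this rule of thumb is a dispatch loop that always
fetches the next message unconditionally and only then decides what to do
with it. A minimal sketch, in which the C<work> and C<quit> tags and the
C<$worker> port are invented for the example:

```perl
use common::sense;
use Coro::MP;

my $worker = port_async {
   while () {
      # no condition: take whatever message is next in the queue
      my ($tag, @args) = get_cond;

      if ($tag eq "work") {
         print "working on: @args\n";
      } elsif ($tag eq "quit") {
         last; # leaving the thread kills the port normally
      } else {
         # unexpected messages are consumed here instead of piling up
         warn "ignoring unexpected message: $tag\n";
      }
   }
};

mon $worker, sub {
   warn "worker was killed: @_\n";
};
```

The price is that the loop has to handle, or at least discard, every kind
of message itself, instead of leaving uninteresting ones in the queue.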
1255
Finally, it is also possible to use C<get_cond> in a more switch-like way:
1257
1258 get_cond {
1259 $_[0] eq "msg1" and return sub {
1260 my (undef, @msg1_data) = @_;
1261 ...;
1262 };
1263
1264 $_[0] eq "msg2" and return sub {
1265 my (undef, @msg2_data) = @_;
1266 ...;
1267 };
1268
1269 die "unexpected message $_[0] received";
1270 };
1271
1272 =head1 THE END
1273
This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial - in fact, it's pretty complicated. We hope AEMP makes
it simpler and will be useful for creating exciting new applications.
1279
1280 =head1 SEE ALSO
1281
1282 L<AnyEvent::MP>
1283
1284 L<AnyEvent::MP::Global>
1285
1286 L<Coro::MP>
1287
1288 L<AnyEvent>
1289
1290 =head1 AUTHOR
1291
1292 Robin Redeker <elmex@ta-sa.org>
1293 Marc Lehmann <schmorp@schmorp.de>
1294