/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.55
Committed: Thu Mar 22 00:48:29 2012 UTC by root
Branch: MAIN
Changes since 1.54: +16 -12 lines

=head1 Message Passing for the Non-Blocked Mind

=head1 Introduction and Terminology

This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
module, which allows programs to transparently pass messages within the
process and to other processes on the same or a different host.

What kind of messages? Basically a message here means a list of Perl
strings, numbers, hashes and arrays, anything that can be expressed as a
L<JSON> text (as JSON is the default serialiser in the protocol). Here are
two examples:

   write_log => 1251555874, "action was successful.\n"
   123, ["a", "b", "c"], { foo => "bar" }

When using L<AnyEvent::MP> it is customary to use a descriptive string as
the first element of a message to indicate the type of the message. This
element is called a I<tag> in L<AnyEvent::MP>, as some API functions
(C<rcv>) support matching it directly.

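To make the JSON restriction concrete, the following sketch uses the core module L<JSON::PP> (not AnyEvent::MP) to encode the two example messages above as JSON arrays and decode them again. The helper C<roundtrip> is a hypothetical name, and the JSON text shown is simply what JSON::PP produces, not necessarily AnyEvent::MP's exact wire format.

```perl
use strict;
use warnings;
use JSON::PP;

# A canonical encoder gives a stable key order for hashes.
my $json = JSON::PP->new->canonical;

# Hypothetical helper: serialise a message (a list) as a JSON array,
# then decode it back into a Perl list.
sub roundtrip {
   my @msg  = @_;
   my $text = $json->encode (\@msg);
   return ($text, @{ $json->decode ($text) });
}

my ($text1, @msg1) = roundtrip (write_log => 1251555874, "action was successful.\n");
my ($text2, @msg2) = roundtrip (123, ["a", "b", "c"], { foo => "bar" });

print "$text1\n";   # the tag "write_log" is just the first array element
print "$text2\n";
```

Note that the tag is nothing special in the serialised form: it is simply the first element of the array.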
Suppose you want to send a ping message with your current time to
somewhere; this is what such a message might look like (in Perl syntax):

   ping => 1251381636

Now that we know what a message is, to which entities are those
messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
a destination for messages but also a context to execute code: when
a runtime error occurs while executing code belonging to a port, the
exception will be raised on the port and can even travel to interested
parties on other nodes, which makes supervision of distributed processes
easy.

How do these ports relate to things you know? Each I<port> belongs
to a I<node>, and a I<node> is just the UNIX process that runs your
L<AnyEvent::MP> application.

Each I<node> is distinguished from other I<nodes> running on the same or
another host in a network by its I<node ID>. A I<node ID> is simply a
unique string chosen manually or assigned by L<AnyEvent::MP> in some way
(UNIX nodename, random string...).

Here is a diagram of how I<nodes>, I<ports> and UNIX processes relate
to each other. The setup consists of two nodes (more are of course
possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
C<BAR>.

   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|

The strings for the I<port IDs> here are just for illustrative
purposes: even though I<ports> in L<AnyEvent::MP> are also identified by
strings, they can't be chosen manually and are assigned by the system
dynamically. These I<port IDs> are unique within a network and can also be
used to identify senders, or even as message tags, for instance.

The next sections will explain the API of L<AnyEvent::MP> by going through
a few simple examples. Later some more complex idioms are introduced,
which are hopefully useful to solve some real world problems.

=head2 Passing Your First Message

For starters, let's have a look at the messaging API. The following
example is just a demo to show the basic elements of message passing with
L<AnyEvent::MP>.

The example should print C<Ending with: 123>, in a rather complicated
way, by passing some message to a port.

   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";

It already uses most of the essential functions inside
L<AnyEvent::MP>: first there is the C<port> function, which creates a
I<port> and returns its I<port ID>, a simple string.

This I<port ID> can be used to send messages to the port and to install
handlers that receive messages on the port. Since it is a simple string
it can be safely passed to other I<nodes> in the network when you want
to refer to that specific port (usually used for RPC, where you need
to tell the other end which I<port> to send the reply to - messages in
L<AnyEvent::MP> have a destination, but no source).

The next function is C<rcv>:

   rcv $port, test => sub { ... };

It installs a receiver callback on the I<port> specified as the first
argument (it only works for "local" ports, i.e. ports created on the same
node). The next argument, in this example C<test>, specifies a I<tag> to
match. This means that whenever a message with the first element being
the string C<test> is received, the callback is called with the remaining
parts of that message.

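Tag matching can be pictured as a lookup table from the first message element to a callback. The following core-Perl toy mimics that behaviour for a single port; C<toy_rcv> and C<toy_deliver> are hypothetical names, and this is a model of the idea, not how AnyEvent::MP implements C<rcv>.

```perl
use strict;
use warnings;

# Toy model of one port's receivers: tag => callback.
my %receivers;

# Hypothetical stand-in for installing a tag receiver with rcv.
sub toy_rcv {
   my ($tag, $cb) = @_;
   $receivers{$tag} = $cb;
}

# Hypothetical stand-in for delivering a message to the port:
# the first element selects the callback, the rest are its arguments.
sub toy_deliver {
   my ($tag, @args) = @_;
   my $cb = $receivers{$tag}
      or return 0;            # in this toy, unmatched tags are ignored
   $cb->(@args);
   return 1;
}

my @seen;
toy_rcv test => sub { push @seen, @_ };

toy_deliver (test  => 123);   # callback receives (123)
toy_deliver (other => 456);   # no receiver for "other"
```

As in the example above, the callback never sees the tag itself, only the remaining message elements.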
Messages can be sent with the C<snd> function, which is used like this in
the example above:

   snd $port, test => 123;

This will send the message C<'test', 123> to the I<port> with the I<port
ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
on C<test>, it will call the callback with the first argument being the
number C<123>.

The callback is a typical AnyEvent idiom: the callback just passes
that number on to the I<condition variable> C<$end_cv>, which will then
pass the value to the print. Condition variables are out of the scope
of this tutorial and not often used with ports, so please consult
L<AnyEvent::Intro> about them.

Passing messages inside just one process is boring. Before we can move on
and do interprocess message passing we first have to make sure some things
have been set up correctly for our nodes to talk to each other.

=head2 System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on
your system.

First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to set up the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.

The easiest way to set this up is to use the F<aemp> utility:

   aemp gensecret

This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
random shared secret. You can copy this file to any other system and
then communicate over the network (via TCP) with it. You can also select
your own shared secret (F<aemp setsecret>) and for increased security
requirements you can even create (or configure) a TLS certificate (F<aemp
gencert>), causing connections to not just be securely authenticated, but
also to be encrypted and protected against tampering.

Connections will only be successfully established when the I<nodes>
that want to connect to each other have the same I<shared secret> (or
successfully verify the TLS certificate of the other side, in which case
no shared secret is required).

B<If something does not work as expected, and for example tcpdump shows
that the connections are closed almost immediately, you should make sure
that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
you try to connect with each other!>

That is all for now; you will find some more advanced fiddling with the
C<aemp> utility later.

=head2 Shooting the Trouble

Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.

To help troubleshoot any issues, there are two environment variables
that you can set. The first, C<AE_VERBOSE>, sets the logging level of
L<AnyEvent::Log>, which AnyEvent::MP uses. The default is C<3>, which
means nothing much is printed. You can increase it to C<8> or C<9> to get
more verbose output. This is example output when starting a node (somewhat
abridged to get shorter lines):

   2012-03-22 01:41:43.59 debug AE::Util: using Guard module to implement guards.
   2012-03-22 01:41:43.62 debug AE::MP::Kernel: node cerebro/slwK2LEq7O starting up.
   2012-03-22 01:41:43.62 debug AE::MP::Kernel: node listens on [10.0.0.1:52110].
   2012-03-22 01:41:43.62 trace AE::MP::Kernel: trying connect to seed node 10.0.0.19:4040.
   2012-03-22 01:41:43.66 trace AE::MP::Transport: 10.0.0.19:4040 connected as rain
   2012-03-22 01:41:43.66 info AE::MP::Kernel: rain is up ()

A lot of info, but at least you can see that it does something. To only
get info about AnyEvent::MP, you can use C<AE_LOG=AnyEvent::MP=+log> in
your environment.

The other environment variable that can be useful is
C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
most messages that are sent or received to be printed. For example, F<aemp
restart rijk> might output these message exchanges:

   SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
   SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
   SND rain <- [null,"g_find","rijk"]
   RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
   RCV rijk -> ["b",""]

=head1 PART 1: Passing Messages Between Processes

=head2 The Receiver

Let's split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

   my $port = port;
   db_set eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

Now, that wasn't too bad, was it? OK, let's go through the new functions
that have been used.

=head3 C<configure> and Joining and Maintaining the Network

First let's have a look at C<configure>:

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

Before we are able to send messages to other nodes we have to initialise
ourselves to become a "distributed node". Initialising a node means naming
the node and binding some TCP listeners so that other nodes can
contact it.

Additionally, to actually link all nodes in a network together, you can
specify a number of seed addresses, which will be used by the node to
connect itself into an existing network, as we will see shortly.

All of this (and more) can be passed to the C<configure> function - later
we will see how we can do all this without even passing anything to
C<configure>!

The first parameter, C<nodeid>, specifies the node ID (in this case
C<eg_receiver/%u>). The default is to use the node name of the current
host plus C</%u>, which gives the node a name with a random suffix to
make it unique, but for this example we want the node to have a bit more
personality, and name it C<eg_receiver> with a random suffix.

Why the random suffix? Node IDs need to be unique within the network and
appending a random suffix is the easiest way to do that.

The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so explicit IP
addresses are best).

Specifying binds is not just about letting other nodes connect to us: if
no binds are specified, the node will still bind on a dynamic port on all
local addresses - but in this case we won't know the port, and cannot tell
other nodes to use it as a seed node.

Now, a I<seed> is simply the TCP address of some other node in the
network, often the same string as used for the C<binds> parameter of the
other node. The need for seeds is easy to explain: I<somehow> the nodes
of an aemp network have to find each other, and often this means over the
internet. So broadcasts are out.

Instead, a node usually specifies the addresses of a few (for redundancy)
other nodes, some of which should be up. Two nodes can set each other as
seeds without any issues. You could even specify all nodes as seeds for
all nodes, for total redundancy. But the common case is to have some more
or less central, stable servers running seed services for other nodes.

All you need to do to ensure that an AnyEvent::MP network connects
together is to make sure that all seed nodes are connected together via
their seed connections, i.e., all connections from seed nodes to I<their>
seed nodes form a connected graph. It's not necessary (but common) for a
seed node to list all other seed nodes as seeds. The rest of the nodes in
the network simply specify one or more of the seed nodes in their seed
list.

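The connectivity requirement can be checked mechanically. The sketch below is a stand-alone helper, not part of AnyEvent::MP: C<seeds_connected> (a hypothetical name) treats every seed connection as an undirected edge, since a TCP connection works in both directions, and uses a breadth-first search to test whether all seed nodes end up in one connected graph. The node names are made up.

```perl
use strict;
use warnings;

# Hypothetical helper: given { node => [seed nodes it connects to] },
# return true if the seed connections form one connected graph.
sub seeds_connected {
   my ($seeds) = @_;

   # Build an undirected adjacency list: a connection works both ways.
   my %adj;
   for my $node (keys %$seeds) {
      $adj{$node} ||= [];
      for my $seed (@{ $seeds->{$node} }) {
         push @{ $adj{$node} }, $seed;
         push @{ $adj{$seed} }, $node;
      }
   }

   my @nodes = keys %adj
      or return 1;   # an empty network is trivially connected

   # Breadth-first search from an arbitrary node.
   my %seen  = ($nodes[0] => 1);
   my @queue = ($nodes[0]);
   while (defined (my $node = shift @queue)) {
      for my $next (@{ $adj{$node} }) {
         push @queue, $next unless $seen{$next}++;
      }
   }

   return scalar (keys %seen) == scalar (@nodes);
}

# Star topology: every other seed node uses "hub" as its seed.
my %star  = (hub => [], a => ["hub"], b => ["hub"]);

# Two islands: a <-> b and c <-> d never meet.
my %split = (a => ["b"], b => ["a"], c => ["d"], d => ["c"]);

print "star: ",  (seeds_connected (\%star)  ? "connected" : "split"), "\n";
print "split: ", (seeds_connected (\%split) ? "connected" : "split"), "\n";
```

The second case is exactly the situation to avoid: each island works on its own, but the two halves never join into one network.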
The simplest way to do that would be for all nodes to specify a single
node as seed node, and you would get a star topology. If you specify all
nodes as seed nodes, you get a fully meshed network (that's what previous
releases of AnyEvent::MP actually did).

A node tries to keep connections open to all of its seed nodes at all
times, while other connections are made on demand only.

All of this ensures that the network stays one network - even if all the
nodes in one half of the net are separated from the nodes in the other
half by some network problem, once that is over, they will eventually
become a single network again.

In addition to creating the network, a node also expects the seed nodes to
run the shared database service - if need be, by automatically starting
it, so you don't normally need to configure this explicitly.

The process of joining a network takes time, during which the node
is already running. This means it takes time until the node is
fully connected, and until information about services in the network is
available. This is why most AnyEvent::MP programs either just register
themselves in the database and wait to be "found" by others, or they start
to monitor the database until some nodes of the required type show up.

We will see how this is done later, in the sender program.

=head3 Registering the Receiver

Coming back to our example, after the node has been configured for network
access, it is time to publish some service, namely the receive service.

For that, let's look at the next lines:

   my $port = port;
   db_set eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<db_set> function, however, is
new: the first argument is the name of a I<database family> and the second
argument is the name of a I<subkey> within that family. The third argument
would be the I<value> to be associated with the family and subkey, but,
since it is missing, it will simply be C<undef>.

What is a "family", you wonder? Well, AnyEvent::MP comes with a distributed
database. This database runs on so-called "global" nodes, which usually
are the seed nodes of your network. The database structure is "simply" a
hash of hashes of values.

To illustrate this with Perl syntax, assume the database was stored in
C<%DB>; then the C<db_set> function would more or less do this:

   $DB{eg_receivers}{$port} = undef;

So the ominous "family" selects a hash in the database, and the "subkey"
is simply the key in this hash - C<db_set> very much works like this
assignment.

The family namespace is shared by all nodes in a network, so the names
should be reasonably unique; for example, they could start with the name
of your module or the name of the program, using your port ID or node
ID as subkey.

The purpose behind adding this key to the database is that the sender can
look it up and find our port. We will shortly see how.

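Continuing the C<%DB> analogy, here is a single-process model of a family/subkey store in plain Perl. C<toy_db_set> and C<toy_db_family> are hypothetical names and the port IDs are made up; the real database is distributed over the global nodes and is only reachable through functions such as C<db_set>, not through a hash you can touch directly.

```perl
use strict;
use warnings;

# Toy, in-process stand-in for the distributed database:
# a hash of families, each family a hash of subkey => value.
my %DB;

# Hypothetical analogue of db_set with family, subkey and optional value:
# a missing value is stored as undef.
sub toy_db_set {
   my ($family, $subkey, $value) = @_;
   $DB{$family}{$subkey} = $value;
}

# Hypothetical lookup: all subkeys of a family, i.e. what a sender
# would scan to find registered receiver ports.
sub toy_db_family {
   my ($family) = @_;
   return sort keys %{ $DB{$family} || {} };
}

# Two receivers register their (made-up) port IDs under one family.
toy_db_set eg_receivers => "eg_receiver/abc#1";
toy_db_set eg_receivers => "eg_receiver/def#2";

print "$_\n" for toy_db_family ("eg_receivers");
```

Because the subkeys are the port IDs themselves, a sender needs nothing but the family name to find every receiver.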
The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.

=head2 The Sender

OK, now let's take a look at the sender code:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_sender/%u", seeds => ["*:4040"];

   my $guard = db_mon eg_receivers => sub {
      my ($family, $added, $changed, $deleted) = @_;
      return unless %$family;

      # now there are some receivers, send them a message
      snd $_ => test => time
         for keys %$family;
   };

   AnyEvent->condvar->recv;

It's even less code. The C<configure> call serves the same purpose as in the
receiver, but instead of specifying binds we specify a list of seeds - the
only seed happens to be the same as the bind used by the receiver, which
therefore becomes our seed node.

Remember the part about having to wait till things become available? Well,
after configure returns, nothing has been done yet - the node is not
connected to the network, knows nothing about the database contents, and
it can take ages (for a computer :) for this situation to change.

Therefore, the sender waits, in this case by using the C<db_mon>
function. This function registers an interest in a specific database
family (in this case C<eg_receivers>). Each time something inside the
family changes (a key is added, changed or deleted), it will call our
callback with the family hash as first argument, followed by the lists
of added, changed and deleted keys.

The callback only checks whether the C<%$family> hash is empty - if it is,
then it doesn't do anything. But eventually the family will contain the
port subkey we set in the receiver. Then it will send a message to it (and
any other receiver in the same family). Likewise, should the receiver go
away and come back, or should another receiver come up, it will again send
a message to all of them.

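The notification protocol can be simulated locally as well. In this core-Perl sketch, C<toy_db_mon>, C<toy_db_set> and C<toy_db_del> are hypothetical stand-ins: every change calls each monitor of the family with the family hash plus array references of added, changed and deleted subkeys, the argument order the sender's callback unpacks. The real C<db_mon> also returns a guard (kept in C<$guard> above) and receives its updates over the network; neither is modelled here.

```perl
use strict;
use warnings;

my %DB;    # family => { subkey => value }
my %MON;   # family => [ callbacks ]

# Hypothetical analogue of db_mon: register a callback for one family.
sub toy_db_mon {
   my ($family, $cb) = @_;
   push @{ $MON{$family} }, $cb;
}

# Call every monitor of a family with (hash, added, changed, deleted).
sub notify {
   my ($family, $added, $changed, $deleted) = @_;
   my $hash = $DB{$family} ||= {};
   $_->($hash, $added, $changed, $deleted) for @{ $MON{$family} || [] };
}

# Hypothetical analogue of db_set: report the subkey as added or changed.
sub toy_db_set {
   my ($family, $subkey, $value) = @_;
   my $is_new = !exists $DB{$family}{$subkey};
   $DB{$family}{$subkey} = $value;
   notify ($family, $is_new ? ([$subkey], [], []) : ([], [$subkey], []));
}

# Hypothetical analogue of a delete: report the subkey as deleted.
sub toy_db_del {
   my ($family, $subkey) = @_;
   delete $DB{$family}{$subkey};
   notify ($family, [], [], [$subkey]);
}

# A sender-style monitor: do nothing while the family is empty,
# otherwise "send" to every receiver currently registered.
my @sent;
toy_db_mon eg_receivers => sub {
   my ($family, $added, $changed, $deleted) = @_;
   return unless %$family;
   push @sent, sort keys %$family;
};

toy_db_set eg_receivers => "rcv#1";   # one receiver appears
toy_db_set eg_receivers => "rcv#2";   # a second one appears
toy_db_del eg_receivers => "rcv#1";   # the first goes away
toy_db_del eg_receivers => "rcv#2";   # family empty: callback returns early
```

Just like the sender, the monitor resends to all current receivers on every change, which is why a receiver that comes back later still gets a message.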
You can experiment by having multiple receivers - to start up additional
receivers, you have to replace the C<binds> parameter in their C<configure>
call by the C<seeds> used in the sender (only one node on a host can bind
to port 4040), but then you can start as many as you like. If you specify
proper IP addresses for the seeds, you can even run them on different
computers.

Each time you start the sender, it will send a message to all receivers it
finds (you have to interrupt it manually afterwards).

Additional experiments you could try include using
C<PERL_ANYEVENT_MP_TRACE=1> to see which messages are exchanged, or
starting the sender before the receiver and seeing how long it then takes
to find the receiver.

=head3 Splitting Network Configuration and Application Code

OK, so far, this works reasonably. In the real world, however, the person
configuring your application to run on a specific network (the end user
or network administrator) is often different from the person coding the
application.

Or to put it differently: the arguments passed to configure are usually
provided not by the programmer, but by whoever is deploying the program -
even in the example above, we would like to be able to just start senders
and receivers without having to patch the programs.

To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as seed node (this only works when
running on the same host; for multiple machines you would replace the C<*>
by the IP address or hostname of the node running the seed), by changing
the global settings shared between all profiles:

   aemp seeds "*:4040"

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
parameters to C<configure> is that the configuration file (by default)
takes precedence over any arguments passed to C<configure>.

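The precedence rule can be pictured as a hash merge in which profile settings overwrite the program's arguments. This is a simplified sketch: C<effective_config> is a hypothetical name, merging key by key is an assumption, and the real lookup also involves profile names and global defaults that are not modelled. The seed address is borrowed from the log output shown earlier.

```perl
use strict;
use warnings;

# Hypothetical sketch: settings from the configuration database win
# over the arguments a program passes to configure.
sub effective_config {
   my ($args, $profile) = @_;
   return { %$args, %$profile };   # later pairs overwrite earlier ones
}

# What the sender program passes to configure ...
my %args    = (nodeid => "eg_sender/%u", seeds => ["*:4040"]);

# ... and what an administrator configured for the network.
my %profile = (seeds => ["10.0.0.19:4040"]);

my $cfg = effective_config (\%args, \%profile);
print "$_ = @{[ ref $cfg->{$_} ? @{ $cfg->{$_} } : $cfg->{$_} ]}\n"
   for sort keys %$cfg;
```

Keys the profile does not mention (here the node ID) keep the value the program asked for.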
That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).

Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.

Here is a simple example:

   use AnyEvent;
   use AnyEvent::MP;

   # create a port, it always dies
   my $port = port { die "oops" };

   # monitor it
   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   # now send it some message, causing it to die:
   snd $port;

   AnyEvent->condvar->recv;

It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die and call the monitoring callback:

   anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.

The callback was actually passed two arguments: C<die>, to indicate it
did throw an I<exception> as opposed to, say, a network error, and the
exception message itself.

What happens when a port is killed before we have a chance to monitor
it? Granted, this is highly unlikely in our example, but when you program
in a network this can easily happen due to races between nodes.

   use AnyEvent::MP;

   my $port = port { die "oops" };

   snd $port;

   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   AnyEvent->condvar->recv;

This time we will get something else:

   2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
   anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)

The first line is a warning that is printed when a port dies that isn't
being monitored, because that is normally a bug. When a C<mon> is
attempted later, the monitoring action is invoked immediately, because
the port is already gone. The kill reason is now C<no_such_port> with
some descriptive (we hope) error message.

As you probably suspect from these examples, the kill reason is usually
some identifier as first argument and a human-readable error message as
second argument - all kill reasons by AnyEvent::MP itself follow this
pattern. But the kill reason can be anything: it is simply a list of
values you can choose yourself. It can even be nothing (an empty list) -
this is called a "normal" kill.

Apart from die'ing, you can kill ports manually using the C<kil>
function. Using the C<kil> function will be treated like an error when a
non-empty reason is specified:

   kil $port, custom_error => "don't like your steenking face";

And a I<normal> kill without any reason arguments:

   kil $port;

By now you probably wonder what this "normal" kill business is: a common
idiom is to not specify a callback to C<mon>, but another port, such as
C<$SELF>:

   mon $port, $SELF;

This basically means "monitor $port and kill me when it crashes" - and
the thing is, a "normal" kill does not count as a crash. This way you can
easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done its job properly.

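The difference between a crash and a normal kill when ports are linked can be modelled in a few lines. In this hypothetical core-Perl sketch (C<toy_mon_port> and C<toy_kil> are not AnyEvent::MP functions), killing a port with a non-empty reason also kills every port linked to it with the same reason, while an empty reason only removes the port itself.

```perl
use strict;
use warnings;

my %alive  = (w1 => 1, m1 => 1, w2 => 1, m2 => 1);
my %links;    # monitored port => [ ports to kill along with it ]
my %reason;   # killed port => kill reason (array ref)

# Hypothetical analogue of monitoring $port with another port $other.
sub toy_mon_port {
   my ($port, $other) = @_;
   push @{ $links{$port} }, $other;
}

# Hypothetical analogue of killing $port with an optional reason list.
sub toy_kil {
   my ($port, @why) = @_;
   return unless delete $alive{$port};   # already dead
   $reason{$port} = \@why;

   # An empty reason is a "normal" kill and does not propagate.
   return unless @why;
   toy_kil ($_, @why) for @{ delete $links{$port} || [] };
}

# Two worker/manager pairs, each manager linked to its worker.
toy_mon_port (w1 => "m1");
toy_mon_port (w2 => "m2");

toy_kil (w1 => custom_error => "don't like your steenking face");   # crash
toy_kil ("w2");                                                     # normal kill
```

After this runs, C<m1> has died along with its crashed worker, while C<m2> is still alive because C<w2> finished normally.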
=head3 Port Context

Receive callbacks run in the so-called "port context". That means C<$SELF>
contains the ID of their port, and exceptions that the code throws will be
caught.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As an example, assume that the
following port receive handler wants to C<die> a second later, using
C<after>:

   my $port = port {
      after 1, sub { die "oops" };
   };

If you try this out, you will find it does not work - when the C<after>
callback is executed, it does not run in the port context anymore, so
exceptions will not be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works mostly like perl's built-in C<sub>:

   my $port = port {
      after 1, psub { die "oops" };
   };

C<psub> remembers the port context and returns a code reference. When the
code reference is invoked, it will run the code block within the context
that it was created in, so exception handling once more works as expected.

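Why a plain C<sub> loses the port context, and how a closure constructor can keep it, can be shown without an event loop. In this sketch C<$SELF> is an ordinary package variable set with C<local>, C<run_in_port> plays the role of the code that invokes receive callbacks, and C<toy_psub> captures the current C<$SELF>; all three are hypothetical simplifications, and deferred callbacks are collected in C<@pending> instead of being scheduled with C<after>.

```perl
use strict;
use warnings;

our $SELF;        # current port context, undef outside of it
my  %killed;      # port => exception that killed it
my  @pending;     # deferred callbacks, standing in for timers

# Run code "in the context" of a port: $SELF is set, and exceptions
# are caught and recorded as a kill of that port.
sub run_in_port {
   my ($port, $code, @args) = @_;
   local $SELF = $port;
   eval { $code->(@args); 1 }
      or $killed{$port} = $@;
}

# Hypothetical analogue of psub: remember $SELF now, restore it later.
sub toy_psub(&) {
   my ($code) = @_;
   my $port = $SELF;
   return sub { run_in_port ($port, $code, @_) };
}

# Inside a "receive handler" of port p1, defer two callbacks.
run_in_port (p1 => sub {
   push @pending, sub { die "plain oops\n" };      # loses the context
   push @pending, toy_psub { die "psub oops\n" };  # keeps the context
});

# Later, outside any port context, the "event loop" runs them.
for my $cb (@pending) {
   eval { $cb->(); 1 } or print "uncaught: $@";
}
```

The plain closure's exception escapes to the "event loop" and is printed as uncaught, while the C<toy_psub> closure's exception is attributed to C<p1>.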
There is even a way to temporarily execute code in the context of some
port, namely C<peval>:

   peval $port, sub {
      # die'ing here will kil $port
   };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

=head3 Network Errors and the AEMP Guarantee

Earlier we mentioned another important source of monitoring failures:
network problems. When a node loses connection to another node, it will
invoke all monitoring actions, just as if the port was killed, I<even if
it is possible that the port is still happily alive on another node> (not
being able to talk to a node means we have no clue what's going on with
it: it could have crashed, but it could also still be running without
knowing we lost the connection).

So another way to view monitors is: "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port: after starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane: specifically, it means that there will be
no "holes" in the message sequence: all messages sent are delivered in
order, without any of them missing in between, and when some were lost,
you I<will> be notified of that, so you can take recovery action.

And, obviously, the guarantee only works in the presence of
correctly-working hardware, and no relevant bugs inside AEMP itself.

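The "no holes" property can be illustrated with a small simulation. This core-Perl model is hypothetical and is not AEMP's transport code: once one message is lost, every later message is lost too until the monitoring action has run, after which delivery may resume. What the receiver sees before the notification is therefore always a prefix of what was sent.

```perl
use strict;
use warnings;

# Hypothetical model of one monitored link. Once a message is lost,
# every later message is lost too, until the monitor action has run.
my @delivered;
my $broken        = 0;
my $monitor_fired = 0;

sub send_msg {
   my ($msg, $lose) = @_;
   $broken = 1 if $lose;          # e.g. the connection just went down
   push @delivered, $msg unless $broken;
}

sub fire_monitor {
   $monitor_fired = 1;            # the application is told about the loss
   $broken        = 0;            # later messages *might* get through again
}

send_msg ($_) for 1 .. 3;         # delivered
send_msg (4, 1);                  # lost: the link breaks here
send_msg ($_) for 5 .. 6;         # also lost, so there is no hole
fire_monitor ();
send_msg ($_) for 7 .. 8;         # delivered again after notification

print "@delivered\n";
```

The receiver never sees 5 without 4: the gap between 3 and 7 is exactly the stretch the monitor reported.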
=head3 Supervising

OK, so how is this crashing-everything-stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them
more stable, but to make them more resilient against actual errors
and crashes. And this is not done by crashing I<everything>, but by
crashing everything except a I<supervisor> that then cleans up and starts
everything again.

A supervisor is simply some code that ensures that an application (or a
part of it) is running, and if it crashes, is restarted properly. That is,
it supervises a service by starting and restarting it, as necessary.

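Stripped of all networking, the heart of a supervisor is a loop that starts a service, notices when it dies and starts it again. The sketch below is a hypothetical, synchronous illustration in core Perl: C<supervise> and the flaky service are made up, and an AnyEvent::MP supervisor would react to C<mon> notifications rather than catch exceptions in a loop, as the chat client below does.

```perl
use strict;
use warnings;

# Hypothetical supervisor: run $service, restart it when it dies,
# and give up after $max_restarts restarts.
sub supervise {
   my ($service, $max_restarts) = @_;
   my @log;

   for my $attempt (0 .. $max_restarts) {
      if (eval { $service->(); 1 }) {
         push @log, "attempt $attempt: finished normally";
         return @log;
      }
      chomp (my $err = $@);
      push @log, "attempt $attempt: died ($err)";
   }

   push @log, "giving up";
   return @log;
}

# A made-up service that crashes twice before it works.
my $runs = 0;
my @log  = supervise (sub { die "crash $runs\n" if ++$runs <= 2 }, 5);

print "$_\n" for @log;
```

The restart limit is a design choice: without it, a service that always fails would be restarted forever.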
To show how to do all this we will create a simple chat server that can
handle many chat clients. Both server and clients can be killed and
restarted, and even crash, to some extent, without disturbing the chat
functionality.

=head2 Chatting, the Resilient Way

Without further ado, here is the chat server (to run it, we assume the
set-up explained earlier, with a separate F<aemp run profile seed> node):

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   our $server = port;

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };
      msg "$nick (joins)";
   };

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

   db_set eg_chat_server => $server;

   warn "server ready.\n";

   AnyEvent->condvar->recv;

Looks like a lot, but it is actually quite simple: after your usual
preamble (this time we use common sense), we define a helper function that
sends some message to every registered chat client:

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

The clients are stored in the hash C<%clients>. Then we define a server
port and install two receivers on it: C<join>, which is sent by clients
to join the chat, and C<privmsg>, which clients use to send actual chat
messages.

C<join> is the most complicated. It expects the client port and the
nickname to be passed in the message, and registers the client in
C<%clients>.

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

The next step is to monitor the client. The monitoring action removes the
client and sends a quit message with the error to all remaining clients.

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

And finally, it creates a join message and sends it to all clients.

      msg "$nick (joins)";
   };

748 The C<privmsg> callback simply broadcasts the message to all clients:
749
750 rcv $server, privmsg => sub {
751 my ($nick, $msg) = @_;
752 msg "$nick: $msg";
753 };
754
755 And finally, the server registers itself in the server group, so that
756 clients can find it:
757
758 db_set eg_chat_server => $server;
759
760 Well, well... and where is this supervisor stuff? Well... we cheated,
761 it's not there. To not overcomplicate the example, we only put it into
762 the..... CLIENT!
763
=head3 The Client, and a Supervisor!

Again, here is the client, including supervisor, which makes it a bit
longer:

   use common::sense;
   use AnyEvent::MP;

   my $nick = shift || "anonymous";

   configure;

   my ($client, $server);

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server => sub {
         return unless %{ $_[0] };
         undef $db_mon;

         print "\rconnecting...\n";

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         $server = (keys %{ $_[0] })[0];

         snd $server, join => $client, $nick;
         mon $server, $client;
      };
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, privmsg => $nick, $line
         if $server;
   });

   $| = 1;
   print "> ";
   AnyEvent->condvar->recv;

The first thing the client does is to store the nickname (which is
expected as the only command line argument, and defaults to
C<anonymous>) in C<$nick>, for further usage.

The next relevant thing is... finally... the supervisor:

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server => sub {
         return unless %{ $_[0] };
         undef $db_mon; # stop monitoring

This monitors the C<eg_chat_server> database family. It waits until a
chat server becomes available. When that happens, it "connects" to it
by creating a client port that receives and prints chat messages, and
monitoring it:

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

If the client port dies (for whatever reason), the "supervisor" will start
looking for a server again - the semantics of C<db_mon> ensure that it
will immediately find it if there is a server port.

After this, everything is ready. The C<db_mon> callback receives the
contents of the C<eg_chat_server> family as a hash reference whose keys
are the registered server ports, so the client simply picks the first
one, sends it a C<join> message with its local port, and starts
monitoring it:

         $server = (keys %{ $_[0] })[0];

         snd $server, join => $client, $nick;
         mon $server, $client;
      };
   }

This second monitor will ensure that, when the server port crashes or goes
away (e.g. due to network problems), the client port will be killed as
well. This tells the user that the client was disconnected, and the client
will then try to connect to the server again.

The rest of the program deals with the boring details of actually invoking
the supervisor function to start the whole client process and handle the
actual terminal input, sending it to the server.

Now... the "supervisor" in this example is a bit of a cheat - it doesn't
really clean up much (because the cleanup done by AnyEvent::MP suffices),
and there isn't much of a restarting action either - if the server isn't
there because it crashed, well, it isn't there.

In the real world, one would often add a timeout that would trigger when
the server couldn't be found within some time limit, and then complain,
or even try to start a new server. Or the supervisor would have to do
some real cleanups, such as rolling back database transactions when the
database thread crashes. For this simple chat server, however, this simple
supervisor works fine. Hopefully future versions of AnyEvent::MP will
offer some predefined supervisors; for now, you will have to code them
yourself.

You should now try to start the server and one or more clients in different
terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server
   aemp run profile seed

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
who of course should not normally get killed (unless they, too, have a
supervisor).

If you often think in object-oriented terms, then you can think of a port
as an object: C<port> is the constructor, the receive callbacks set by
C<rcv> act as methods, the C<kil> function becomes the explicit destructor
and C<mon> installs a destructor hook. Unlike conventional object-oriented
programming, it can make sense to exchange port IDs more freely (for
example, to monitor one port from another), because it is cheap to send
port IDs over the network, and AnyEvent::MP blurs the distinction between
local and remote ports.
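To make the analogy concrete, here is a toy, single-process model in plain
Perl. The C<ToyPort> package and its methods are hypothetical: they only
borrow the names C<rcv>, C<snd>, C<mon> and C<kil> to mirror the analogy,
and this is in no way how AnyEvent::MP implements ports (there is no
networking, no serialisation and no event loop). The last lines also show
the symmetric monitoring described above.

```perl
   use strict;
   use warnings;

   # Toy model of the "port as object" analogy (hypothetical, NOT how
   # AnyEvent::MP works internally): a port is a blessed hash with per-tag
   # callbacks (its "methods") and a list of monitor hooks.
   package ToyPort;

   my $next_id = 0;

   # constructor, like "port": returns a new port with a unique ID
   sub new {
      my ($class) = @_;
      return bless { id => "toy#" . $next_id++, rcv => {}, mon => [], alive => 1 }, $class;
   }

   # method definition, like "rcv $port, tag => sub { ... }"
   sub rcv {
      my ($self, $tag, $cb) = @_;
      $self->{rcv}{$tag} = $cb;
   }

   # method call, like "snd $port, tag => @args": dispatch on the tag
   sub snd {
      my ($self, $tag, @args) = @_;
      return unless $self->{alive};          # dead ports drop messages
      my $cb = $self->{rcv}{$tag} or return; # no handler for this tag
      $cb->(@args);
   }

   # destructor hook, like "mon $port, sub { ... }"
   sub mon {
      my ($self, $cb) = @_;
      push @{ $self->{mon} }, $cb;
   }

   # explicit destructor, like "kil $port, @reason": runs each hook once
   sub kil {
      my ($self, @reason) = @_;
      return unless $self->{alive};
      $self->{alive} = 0;
      $_->(@reason) for splice @{ $self->{mon} };
   }

   package main;

   # symmetric monitoring: whichever side dies takes the other one with it
   my $client = ToyPort->new;
   my $server = ToyPort->new;
   my @log;

   $server->mon (sub { push @log, "server gone: @_"; $client->kil (@_) });
   $client->mon (sub { push @log, "client gone: @_"; $server->kil (@_) });

   $server->kil ("network error");
```

After the last line both toy ports are dead: the server's hook runs first
and kills the client, whose hook then finds the server already gone.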

Lastly, there is ample room for improvement in this example: the server
should probably remember the nickname in the C<join> handler instead of
expecting it in every chat message, it should probably monitor itself, and
the client should not try to send any messages unless a server is actually
connected.
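The first suggestion could look roughly like the following. This is a toy
model, not a drop-in server: client port IDs are plain strings, and the
hypothetical C<broadcast>, C<on_join>, C<on_privmsg> and C<on_quit>
functions append to in-memory inboxes instead of calling C<snd>, so the
bookkeeping can be run without any nodes. In a real server, C<on_quit>
would be called from the C<mon> callback, and C<privmsg> messages would
carry the client port instead of the nickname.

```perl
   use strict;
   use warnings;

   # Toy model of a server that remembers nicknames (hypothetical helpers).
   # Client "ports" are plain strings, and sending is simulated by pushing
   # onto an in-memory inbox per client instead of calling snd.
   my %nick_of;   # client port ID => nickname, recorded at join time
   my %inbox;     # client port ID => lines that would have been sent to it

   sub broadcast {
      my ($line) = @_;
      push @{ $inbox{$_} }, $line for keys %nick_of;
   }

   sub on_join {
      my ($client, $nick) = @_;
      $nick_of{$client} = $nick;
      broadcast "$nick (joins)";
   }

   # chat messages carry only the client port, not the nickname
   sub on_privmsg {
      my ($client, $msg) = @_;
      my $nick = $nick_of{$client};
      return unless defined $nick;   # ignore clients that never joined
      broadcast "$nick: $msg";
   }

   # what the mon callback for a client port would do
   sub on_quit {
      my ($client, @reason) = @_;
      my $nick = delete $nick_of{$client};
      return unless defined $nick;
      delete $inbox{$client};
      broadcast "$nick (quits, @reason)";
   }

   on_join "port-a", "alice";
   on_join "port-b", "bob";
   on_privmsg "port-a", "hello";
   on_privmsg "port-x", "spoofed";   # unknown client: dropped
   on_quit "port-b", "timeout";
```

A side benefit of this design is that a client can no longer speak under
somebody else's nickname, since the nickname is looked up by port.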

=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": they can run on almost
any node and even talk to copies of themselves on other nodes in case they
are distributed. The L<AnyEvent::MP::Global> service for example monitors
nodes joining the network and sometimes even starts itself on other nodes.

One good way to design such services is to put them into a module and
create "virtual connections" to other nodes. We call this the "bridge
head" method, because you start by I<creating a remote port> (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let us redesign the chat server and
client using this design method.

As usual, we start with the full program - here is the server:

   use common::sense;
   use AnyEvent::MP;

   configure;

   db_set eg_chat_server2 => $NODE;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, psub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much different from the previous example, except that
this time, we register the node ID (C<$NODE>) in the database and not a
port we created - the clients only want to know which node the server
should be running on, and there can only be one such server (or service)
per node. In fact, the clients could also use some kind of election
mechanism to find the node with the lowest node ID, or the lowest load, or
something like that.

The much more interesting difference from the previous server is that
indeed no server port is created - the server consists only of code,
and "does" nothing by itself. All it "does" is to define a function
named C<client_connect>, which expects a client port and a nickname as
arguments. It then monitors the client port and binds a receive callback
on C<$SELF>, which expects messages that in turn are broadcast to all
clients.

The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
for C<mon $client, $SELF>, so the port running C<client_connect> gets
killed when the client port goes away. The second does the normal "client
has gone away" clean-up action.

The last line, the C<rcv $SELF>, is a good hint that something interesting
is going on. And indeed, when looking at the client code, you can see a
new function, C<spawn>:

   use common::sense;
   use AnyEvent::MP;

   my $nick = shift;

   configure;

   $| = 1;

   my ($client, $server);

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server2 => sub {
         return unless %{ $_[0] };
         undef $db_mon; # stop monitoring

         print "\rconnecting...\n";

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         my $servernode = (keys %{ $_[0] })[0];

         $server = spawn $servernode, "::client_connect", $client, $nick;
         mon $server, $client;
      };
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
port on the server I<node>:

         $server = spawn $servernode, "::client_connect", $client, $nick;
         mon $server, $client;

And of course the first thing after creating it is monitoring it.

Phew, let's go through this in slow motion: the C<spawn> function creates
a new port on a remote node and returns its port ID. After creating
the port it calls a function on the remote node, passing any remaining
arguments to it, and - most importantly - executes the function within
the context of the new port, so it can be manipulated by referring to
C<$SELF>. The init function can reside in a module (actually it normally
I<should> reside in a module) - AnyEvent::MP will automatically load the
module if the function isn't defined.
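The following toy model, in plain Perl, sketches this: the caller gets a
port ID back, and the init function is later looked up by name and called
with C<$SELF> set to the new port. The call is deferred on purpose, so the
ID is handed back before the function has run, as with the real C<spawn>.
C<toy_spawn>, C<deliver_pending> and the C<node1#N> ID format are
hypothetical; the real C<spawn> sends the request to another node and can
load missing modules, neither of which is modelled here.

```perl
   use strict;
   use warnings;

   # Toy model of spawn (hypothetical names, no networking): toy_spawn only
   # records the request and returns the new port ID at once; the init
   # function runs later, when the "remote node" processes its requests.
   our $SELF;            # the current port, as seen by code running in it
   my $counter = 0;
   my @pending;          # spawn requests not yet processed

   sub toy_spawn {
      my ($func, @args) = @_;
      my $port = "node1#" . ++$counter;   # hypothetical port ID format
      push @pending, [$port, $func, @args];
      return $port;
   }

   # simulates the remote node working through the spawn requests
   sub deliver_pending {
      while (my $req = shift @pending) {
         my ($port, $func, @args) = @$req;

         # "::client_connect" is Perl's way of writing "main::client_connect"
         my $code = do { no strict 'refs'; \&{$func} };
         defined &$code or die "$func: init function not defined\n";

         local $SELF = $port;   # run the init function in the new port's context
         $code->(@args);
      }
   }

   my %seen;
   sub client_connect {
      my ($client, $nick) = @_;
      $seen{$SELF} = "$nick via $client";
   }

   my $server = toy_spawn "::client_connect", "client#7", "alice";
   my $ran_early = exists $seen{$server};   # false: nothing has run yet
   deliver_pending;
```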

The C<spawn> function returns immediately, which means you can instantly
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.

If the spawn message gets delivered, but the monitoring message is not,
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in turn monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep this in mind:
after creating a port using C<spawn>, monitor it on the local node, and
monitor "the other side" from the remote node, and all will be cleaned up
just fine.

=head2 Services

Above it was mentioned that C<spawn> automatically loads modules. This can
be exploited in various useful ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function - without any other
configuration.

Likewise, when you have some service that starts automatically when loaded
(similar to AnyEvent::MP::Global), then you can configure this service
statically:

   aemp profile mysrvnode addservice mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.

Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I wanted to run
the Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

There are plenty of possibilities you can use - it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty
C<< (data => $port) >> message.

The server only writes a single file at a time; other requests will stay
in the queue until the current file has been finished.

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

   use Coro::MP;    # also exports all AnyEvent::MP functions
   use Coro::AIO;
   use Fcntl qw(O_WRONLY O_CREAT);

   my $ioserver = port_async {
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Let's go through it, section by section.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or as in this example, by calling C<port_async> with the
code to execute as a thread. The C<async> component comes from the fact
that threads are created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be killed I<normally>,
i.e. as if by C<kil> without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it
executes in a loop. The first thing it does is fetch the next message,
using C<get_cond>, the "conditional message get". Without arguments, it
merely fetches the I<next> message from the queue, which I<must> be a
C<write_file> message.

The message contains the C<$path> to the file, which is then created:

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

Then we enter a loop again, to serve as many C<data> messages as
necessary:

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

This time, the condition is not empty, but instead a code block: similarly
to C<grep>, the code block will be called with C<@_> set to each message in
the queue, and it has to return whether it wants to receive the message or
not.

In this case we are interested in C<data> messages (C<< $_[0] eq "data" >>)
whose second element is the source port (C<< $_[1] eq $port >>).

The condition must be this strict, as it is possible to receive both
C<write_file> messages and C<data> messages from other ports while we
handle the file writing.

The lone C<5> argument at the end is a timeout - when no matching message
is received within C<5> seconds, we assume an error and C<die>.
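Outside of Coro, the matching rule can be modelled with a plain array used
as the port's queue. C<toy_get_cond> below is a hypothetical stand-in, not
Coro::MP's implementation: it has no threads and no timeout, and returns
the empty list where the real C<get_cond> would block. It shows why the
strict condition matters: the C<write_file> request and the C<data>
message from the other port both stay queued for later.

```perl
   use strict;
   use warnings;

   # Toy model of selective receive (hypothetical; not Coro::MP's code):
   # call the condition with @_ set to each queued message, oldest first,
   # remove and return the first match, and leave everything else queued.
   sub toy_get_cond {
      my ($queue, $cond) = @_;
      for my $i (0 .. $#$queue) {
         next unless $cond->(@{ $queue->[$i] });
         my ($msg) = splice @$queue, $i, 1;
         return @$msg;
      }
      return;   # no match: the real get_cond would wait (up to its timeout)
   }

   my @queue = (
      [write_file => "port-b", "/tmp/other"],
      [data       => "port-b", "xyz"],
      [data       => "port-a", "abc\n"],
   );

   # we are writing the file requested by port-a, so only its data will do
   my $port = "port-a";
   my (undef, undef, $data) = toy_get_cond \@queue, sub {
      $_[0] eq "data" && $_[1] eq $port
   };
```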

When an empty C<data> message is received, we are done and can close the
file (which is done automatically as C<$fh> goes out of scope):

            length $data or last;

Otherwise we need to write the data:

            aio_write $fh, undef, undef, $data, 0;

And that's basically it. Note that every port thread should have some
kind of supervisor. In our case, the supervisor simply prints any error
message:

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Here is a usage example:

   port_async {
      snd $ioserver, write_file => $SELF, "/tmp/unsafe";
      snd $ioserver, data => $SELF, "abc\n";
      snd $ioserver, data => $SELF, "def\n";
      snd $ioserver, data => $SELF;
   };

The messages are sent without any flow control or acknowledgement (feel
free to improve on this). Also, the source port does not actually need to
be a port - any unique ID will do - but port identifiers happen to be a
simple source of network-wide unique IDs.

Apart from C<get_cond> as seen above, there are other ways to receive
messages. The C<write_file> message above could also selectively be
received using a C<get> call, which returns the message without its tag:

   my ($port, $path) = get "write_file";

This is simpler, but when some other code part sends an unexpected message
to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
every threaded port should have a "fetch next message unconditionally"
somewhere, to avoid filling up the queue.
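A toy model, again with a plain array as the queue and hypothetical helper
names, shows both sides of this rule of thumb: C<toy_get> (which, like
C<get> in the example above, drops the tag) leaves the unexpected message
sitting in the queue, and only an unconditional fetch like C<toy_get_next>
gets rid of it. Neither helper is part of Coro::MP.

```perl
   use strict;
   use warnings;

   # Toy helpers (hypothetical; not Coro::MP's code) over a plain array
   # of [tag, @data] messages.

   # toy_get removes the first message with the given tag and returns it
   # without the tag
   sub toy_get {
      my ($queue, $tag) = @_;
      for my $i (0 .. $#$queue) {
         next unless $queue->[$i][0] eq $tag;
         my ($msg) = splice @$queue, $i, 1;
         return @$msg[1 .. $#$msg];
      }
      return;
   }

   # toy_get_next removes and returns whatever message is first in line
   sub toy_get_next {
      my ($queue) = @_;
      my $msg = shift @$queue or return;
      return @$msg;
   }

   my @queue = (
      [bogus      => "unexpected"],
      [write_file => "port-a", "/tmp/one"],
   );

   # fetching by tag only: works, but the bogus message stays behind
   my ($port, $path) = toy_get \@queue, "write_file";
   my $left_behind = @queue;   # number of messages still queued

   # an unconditional fetch drains the queue; the caller decides what to do
   my @skipped;
   while (my ($tag) = toy_get_next \@queue) {
      push @skipped, $tag;
   }
```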

Finally, C<get_cond> can also be used in a more switch-like fashion:

   get_cond {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         ...;
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         ...;
      };

      die "unexpected message $_[0] received";
   };
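The example suggests that the code reference returned by the condition is
then invoked with the message itself. The following toy dispatcher models
exactly that assumption in plain Perl; C<toy_get_cond_dispatch> is
hypothetical and not Coro::MP's implementation. Unlike the example above,
its condition returns nothing for unknown messages instead of dying, so
they simply stay queued.

```perl
   use strict;
   use warnings;

   # Toy dispatcher (hypothetical; not Coro::MP's code). It assumes that a
   # code reference returned by the condition is called with the message.
   sub toy_get_cond_dispatch {
      my ($queue, $cond) = @_;
      for my $i (0 .. $#$queue) {
         my $handler = $cond->(@{ $queue->[$i] })
            or next;                      # condition not interested
         my ($msg) = splice @$queue, $i, 1;
         return $handler->(@$msg);        # run the matching "case"
      }
      return;
   }

   my @handled;

   my $cond = sub {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         push @handled, "msg1: @msg1_data";
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         push @handled, "msg2: @msg2_data";
      };

      return;   # anything else stays queued
   };

   my @queue = ([noise => 1], [msg2 => "x", "y"], [msg1 => 42]);

   toy_get_cond_dispatch(\@queue, $cond) for 1 .. 2;
```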

=head1 THE END

This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial; in fact, it's pretty complicated. We hope AEMP makes
it simpler and will be useful to create exciting new applications.

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<Coro::MP>

L<AnyEvent>

=head1 AUTHOR

   Robin Redeker <elmex@ta-sa.org>
   Marc Lehmann <schmorp@schmorp.de>
