/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.62
Committed: Sun Aug 28 15:45:31 2016 UTC (7 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-2_02, rel-2_01, rel-2_0, HEAD
Changes since 1.61: +4 -4 lines

=head1 Message Passing for the Non-Blocked Mind

=head1 Introduction and Terminology

This is a tutorial about how to get the swing of the L<AnyEvent::MP>
module family, which allows processes to transparently pass messages to
themselves and to other processes on the same or a different host.

What kind of messages? Basically a message here means a list of Perl
strings, numbers, hashes and arrays, anything that can be expressed as a
L<JSON> text (as JSON is the default serialiser in the protocol). Here are
two examples:

   write_log => 1251555874, "action was successful.\n"
   123, ["a", "b", "c"], { foo => "bar" }

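Every message has to survive a trip through the serialiser, so one quick way to check whether a message is acceptable is to encode it yourself. The sketch below does this with the core L<JSON::PP> module, which is chosen only for illustration and is not necessarily the encoder AnyEvent::MP uses internally. It also shows that the fat comma C<< => >> is just a comma that quotes the word on its left. The first example is therefore a three-element list whose first element is the string C<write_log>.

```perl
use strict;
use warnings;
use JSON::PP;

# "=>" only quotes the bareword on its left, so this is a plain 3-element list
my @log_msg   = (write_log => 1251555874, "action was successful.\n");
my @mixed_msg = (123, ["a", "b", "c"], { foo => "bar" });

# canonical() sorts hash keys so the output is stable
my $json = JSON::PP->new->canonical;

# serialise each message as a JSON array, then decode one of them again
my $log_text   = $json->encode(\@log_msg);
my $mixed_text = $json->encode(\@mixed_msg);
my $decoded    = $json->decode($mixed_text);

print "$log_text\n";
print "$mixed_text\n";
```

With the JSON serialiser, anything that cannot survive such a round trip (for example, a code reference or a file handle) is not a suitable message element.
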
When using L<AnyEvent::MP> it is customary to use a descriptive string as
the first element of a message to indicate the type of the message. This
element is called a I<tag> in L<AnyEvent::MP>, as some API functions
(C<rcv>) support matching it directly.

Suppose you want to send some kind of ping message with your current
time to somewhere. This is how such a message might look (in Perl
syntax):

   ping => 1251381636

Now that we know what a message is, to which entities are those
messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
a destination for messages but also a context to execute code: when
a runtime error occurs while executing code belonging to a port, the
exception will be raised on the port and can even travel to interested
parties on other nodes, which makes supervision of distributed processes
easy.

How do these ports relate to things you know? Each I<port> belongs
to a I<node>, and a I<node> is just the UNIX process that runs your
L<AnyEvent::MP> application.

Each I<node> is distinguished from other I<nodes> running on the same or
another host in a network by its I<node ID>. A I<node ID> is simply a
unique string chosen manually or assigned by L<AnyEvent::MP> in some way
(UNIX nodename, random string...).

Here is a diagram of how I<nodes>, I<ports> and UNIX processes relate
to each other. The setup consists of two nodes (more are of course
possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
C<BAR>.


   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|

The strings for the I<port IDs> here are just for illustrative
purposes: even though I<ports> in L<AnyEvent::MP> are also identified by
strings, they can't be chosen manually and are assigned by the system
dynamically. These I<port IDs> are unique within a network and can also be
used to identify senders, or even as message tags, for instance.

The next sections will explain the API of L<AnyEvent::MP> by going through
a few simple examples. Later some more complex idioms are introduced,
which are hopefully useful to solve some real world problems.

=head2 Passing Your First Message

For starters, let's have a look at the messaging API. The following
example is just a demo to show the basic elements of message passing with
L<AnyEvent::MP>.

The example should print: C<Ending with: 123>, in a rather complicated
way, by passing some message to a port.

   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";

It already uses most of the essential functions inside
L<AnyEvent::MP>. First there is the C<port> function, which creates a
I<port> and returns its I<port ID>, a simple string.

This I<port ID> can be used to send messages to the port and install
handlers to receive messages on the port. Since it is a simple string
it can be safely passed to other I<nodes> in the network when you want
to refer to that specific port (usually used for RPC, where you need
to tell the other end which I<port> to send the reply to - messages in
L<AnyEvent::MP> have a destination, but no source).

The next function is C<rcv>:

   rcv $port, test => sub { ... };

It installs a receiver callback on the I<port> specified as the first
argument (it only works for "local" ports, i.e. ports created on the same
node). The next argument, in this example C<test>, specifies a I<tag> to
match. This means that whenever a message with the first element being
the string C<test> is received, the callback is called with the remaining
parts of that message.

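To make tag matching concrete, here is a toy, single-process model of what a tag-matching receiver does with an incoming message: it looks up the first element and calls the matching callback with the rest. It is purely illustrative. C<toy_rcv> and C<toy_deliver> are hypothetical names, and this is not how AnyEvent::MP is implemented internally. In particular, the toy simply drops messages whose tag has no receiver.

```perl
use strict;
use warnings;

# hypothetical registry for a single port: tag => callback
my %receivers;

sub toy_rcv {
    my ($tag, $cb) = @_;
    $receivers{$tag} = $cb;
}

# deliver one message (a list whose first element is the tag);
# returns true if a receiver matched
sub toy_deliver {
    my ($tag, @rest) = @_;
    my $cb = $receivers{$tag}
        or return 0;    # no match: this toy model simply drops the message
    $cb->(@rest);       # the callback only sees the remaining elements
    return 1;
}

my @seen;
toy_rcv(test => sub { my ($data) = @_; push @seen, $data });

toy_deliver(test  => 123);    # matches: the callback receives 123
toy_deliver(other => 456);    # no receiver for "other"
```
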
Messages can be sent with the C<snd> function, which is used like this in
the example above:

   snd $port, test => 123;

This will send the message C<'test', 123> to the I<port> with the I<port
ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
on C<test>, it will call the callback with the first argument being the
number C<123>.

The callback is a typical AnyEvent idiom: it just passes
that number on to the I<condition variable> C<$end_cv>, which will then
pass the value to the print. Condition variables are out of the scope
of this tutorial and not often used with ports, so please consult
L<AnyEvent::Intro> about them.

Passing messages inside just one process is boring. Before we can move on
and do interprocess message passing we first have to make sure some things
have been set up correctly for our nodes to talk to each other.

=head2 System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on
your system.

First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to set up the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.

The easiest way to set this up is to use the F<aemp> utility:

   aemp gensecret

This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
random shared secret. You can copy this file to any other system and
then communicate over the network (via TCP) with it. You can also select
your own shared secret (F<aemp setsecret>), and for increased security
requirements you can even create (or configure) a TLS certificate (F<aemp
gencert>). This causes connections not just to be securely authenticated,
but also to be encrypted and protected against tampering.

Connections will only be successfully established when the I<nodes>
that want to connect to each other have the same I<shared secret> (or
successfully verify the TLS certificate of the other side, in which case
no shared secret is required).

B<If something does not work as expected, and for example tcpdump shows
that the connections are closed almost immediately, you should make sure
that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
you try to connect with each other!>

That is all for now; you will find some more advanced fiddling with the
C<aemp> utility later.

=head2 Shooting the Trouble

Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.

To help troubleshoot any issues, there are two environment variables
that you can set. The first, C<AE_VERBOSE>, sets the logging level of
L<AnyEvent::Log>, which AnyEvent::MP uses. The default is C<4>, which
means nothing much is printed. You can increase it to C<8> or C<9> to get
more verbose output. This is some example output when starting a node
(somewhat abridged to get shorter lines):

   2012-03-22 01:41:43.59 debug AE::Util: using Guard module to implement guards.
   2012-03-22 01:41:43.62 debug AE::MP::Kernel: node cerebro/slwK2LEq7O starting up.
   2012-03-22 01:41:43.62 debug AE::MP::Kernel: node listens on [10.0.0.1:52110].
   2012-03-22 01:41:43.62 trace AE::MP::Kernel: trying connect to seed node 10.0.0.19:4040.
   2012-03-22 01:41:43.66 trace AE::MP::Transport: 10.0.0.19:4040 connected as rain.
   2012-03-22 01:41:43.66 info AE::MP::Kernel: rain is up.

A lot of info, but at least you can see that it does something. To only
get info about AnyEvent::MP, you can use C<AE_LOG=AnyEvent::MP=+log> in
your environment.

The other environment variable that can be useful is C<AE_MP_TRACE>.
When it is set to a true value, most messages that are sent or received
are printed. For example, F<aemp restart rijk> might output these message
exchanges:

   SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
   SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
   SND rain <- [null,"g_find","rijk"]
   RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
   RCV rijk -> ["b",""]

=head1 PART 1: Passing Messages Between Processes

=head2 The Receiver

Let's split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

   my $port = port;
   db_set eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

Now, that wasn't too bad, was it? OK, let's go through the new functions
that have been used.

=head3 C<configure> and Joining and Maintaining the Network

First let's have a look at C<configure>:

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

Before we are able to send messages to other nodes we have to configure
the node to become a "networked node". Configuring a node means naming
the node and binding some TCP listeners so that other nodes can contact
it. The decision whether a process becomes a networked node or not must
be made before doing anything else with AnyEvent::MP.

Additionally, to actually link all nodes in a network together, you should
specify a number of seed addresses, which will be used by the node to
connect itself into an existing network, as we will see shortly.

All of this info (and more) can be passed to the C<configure> function -
later we will see how we can do all this without even passing anything to
C<configure>!

Back to the function call in the program: the first parameter, C<nodeid>,
specifies the node ID, in this case C<eg_receiver/%u>. The default is to
use the node name of the current host plus C</%u>, which gives the node a
name with a random suffix to make it unique. For this example, however, we
want the node to have a bit more personality, so we name it C<eg_receiver>
with a random suffix.

Why the random suffix? Node IDs need to be unique within the network and
appending a random suffix is the easiest way to do that.

The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so explicit IP
addresses are best).

We don't bind a fixed TCP port merely so that other nodes can connect to
us: even if no binds are specified, the node will still bind on a dynamic
port on all local addresses. But in that case we won't know the port, and
so cannot tell other nodes to connect to it as a seed node.

Now, a I<seed> is simply the TCP address of some other node in the
network, often the same string as used for the C<binds> parameter of the
other node. The need for seeds is easy to explain: I<somehow> the nodes
of an aemp network have to find each other, and often this means over the
internet. So broadcasts are out.

Instead, a node usually specifies the addresses of one or a few (for
redundancy) other nodes, some of which should be up. Two nodes can set
each other as seeds without any issues. You could even specify all nodes
as seeds for all nodes, for total redundancy. But the common case is to
have some more or less central, stable servers running seed services for
other nodes.

All you need to do to ensure that an AnyEvent::MP network connects
together is to make sure that all seed nodes are connected together via
their seed connections, i.e., all connections from seed nodes to I<their>
seed nodes form a connected graph.

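The "connected graph" requirement can be checked mechanically. The sketch below takes a hypothetical map from each seed node to the seeds it is configured with. It treats every seed connection as a two-way link, which is an assumption of this sketch, and runs a breadth-first search to see whether all seed nodes end up in one piece. The node names are made up, and C<seeds_connected> is not part of AnyEvent::MP.

```perl
use strict;
use warnings;

# hypothetical seed configuration: node => [ seeds it connects to ]
my %seeds_of = (
    seed_a => ['seed_b'],
    seed_b => ['seed_a'],
    seed_c => ['seed_b'],    # joins the others through seed_b
);

# returns true if the seed connections form a single connected graph
sub seeds_connected {
    my ($map) = @_;
    my %adj;
    for my $node (keys %$map) {
        $adj{$node} ||= [];
        for my $seed (@{ $map->{$node} }) {
            push @{ $adj{$node} }, $seed;    # treat each link as undirected
            push @{ $adj{$seed} }, $node;
        }
    }
    my @nodes = keys %adj;
    return 1 unless @nodes;

    # breadth-first search from an arbitrary node
    my %seen  = ($nodes[0] => 1);
    my @queue = ($nodes[0]);
    while (defined(my $node = shift @queue)) {
        for my $next (@{ $adj{$node} }) {
            next if $seen{$next}++;
            push @queue, $next;
        }
    }
    return keys(%seen) == @nodes;    # every node reached?
}

print seeds_connected(\%seeds_of) ? "one network\n" : "split network\n";
```

If C<seeds_connected> returns false, some group of seed nodes only ever talks among itself, and the network can stay split.
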
A node tries to keep connections open to all of its seed nodes at all
times, while other connections are made on demand only.

The simplest way to do that would be for all nodes to use the same seed
nodes: seed nodes would seed each other, and all other nodes would connect
to the seed nodes.

All of this ensures that the network stays one network - even if all the
nodes in one half of the net are separated from the nodes in the other
half by some network problem, once that is over, they will eventually
become a single network again.

In addition to creating the network, a node also expects the seed nodes to
run the shared database service - if need be, by automatically starting
it, so you don't normally need to configure this explicitly.

The process of joining a network takes time, during which the node
is already running. This means it takes time until the node is
fully connected, and until information about services in the network is
available. This is why most AnyEvent::MP programs either just register
themselves in the database and wait to be "found" by others, or they start
to monitor the database until some nodes of the required type show up.

We will see how this is done later, in the sender program.

=head3 Registering the Receiver

Coming back to our example, after the node has been configured for network
access, it is time to publish some service, namely the receive service.

For that, let's look at the next lines:

   my $port = port;
   db_set eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<db_set> function, however, is
new: The first argument is the name of a I<database family> and the second
argument is the name of a I<subkey> within that family. The third argument
would be the I<value> to be associated with the family and subkey, but,
since it is missing, it will simply be C<undef>.

What is a "family", you wonder? Well, AnyEvent::MP comes with a distributed
database. This database runs on so-called "global" nodes, which usually
are the seed nodes of your network. The database structure is "simply" a
hash of hashes of values.

To illustrate this with Perl syntax, assume the database was stored in
C<%DB>. Then the C<db_set> function would do more or less this:

   $DB{eg_receivers}{$port} = undef;

So the ominous "family" selects a hash in the database, and the "subkey"
is simply the key in this hash - C<db_set> very much works like this
assignment.

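Continuing the analogy, the following sketch models the family/subkey structure with a plain Perl hash of hashes. C<toy_db_set> and the deletion helper C<toy_db_del> are hypothetical stand-ins written for this illustration, and the port IDs are invented. The real database is distributed over the global nodes and is only reached through the AnyEvent::MP functions.

```perl
use strict;
use warnings;

# family => { subkey => value }, a local stand-in for the distributed database
my %DB;

sub toy_db_set {
    my ($family, $subkey, $value) = @_;    # $value is undef when omitted
    $DB{$family}{$subkey} = $value;
}

sub toy_db_del {
    my ($family, $subkey) = @_;
    delete $DB{$family}{$subkey};
}

# two hypothetical receiver ports register, then one goes away again
toy_db_set(eg_receivers => 'node1#port1');
toy_db_set(eg_receivers => 'node2#port7');
toy_db_del(eg_receivers => 'node1#port1');

my @receivers = sort keys %{ $DB{eg_receivers} };
print "receivers: @receivers\n";
```
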
The family namespace is shared by all nodes in a network, so the names
should be reasonably unique. For example, they could start with the name
of your module or the name of the program, using your port name or node
name as subkey.

The purpose behind adding this key to the database is that the sender can
look it up and find our port. We will shortly see how.

The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.

=head2 The Sender

OK, now let's take a look at the sender code:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_sender/%u", seeds => ["*:4040"];

   my $guard = db_mon eg_receivers => sub {
      my ($family, $added, $changed, $deleted) = @_;
      return unless %$family;

      # now there are some receivers, send them a message
      snd $_ => test => time
         for keys %$family;
   };

   AnyEvent->condvar->recv;

It's even less code. The C<configure> serves the same purpose as in the
receiver, but instead of specifying binds we specify a list of seeds - the
only seed happens to be the same as the bind used by the receiver, which
therefore becomes our seed node.

Remember the part about having to wait till things become available? Well,
after configure returns, nothing has been done yet - the node is not
connected to the network, knows nothing about the database contents, and
it can take ages (for a computer :) for this situation to change.

Therefore, the sender waits, in this case by using the C<db_mon>
function. This function registers an interest in a specific database
family (in this case C<eg_receivers>). Each time something inside the
family changes (a key is added, changed or deleted), it will call our
callback with the family hash as the first argument, followed by
references to the lists of keys that were added, changed and deleted.

The callback only checks whether the C<%$family> hash is empty - if it is,
then it doesn't do anything. But eventually the family will contain the
port subkey we set in the receiver. Then it will send a message to it (and
any other receiver in the same family). Likewise, should the receiver go
away and come back, or should another receiver come up, it will again send
a message to all of them.

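Here is a toy, synchronous model of that notification flow, useful for reasoning about what the sender's callback sees. It assumes the argument order used in the sender code above: the family hash, then the added, changed and deleted keys. C<toy_db_mon> and C<toy_db_set> are hypothetical. The real C<db_mon> is driven by the event loop and by updates arriving over the network.

```perl
use strict;
use warnings;

my %family;      # local copy of one database family
my @monitors;    # callbacks registered for this family

sub toy_db_mon { push @monitors, $_[0] }

sub toy_db_set {
    my ($subkey, $value) = @_;
    my $is_new = !exists $family{$subkey};
    $family{$subkey} = $value;
    # report the key as added or changed; nothing was deleted
    my ($added, $changed) = $is_new ? ([$subkey], []) : ([], [$subkey]);
    $_->(\%family, $added, $changed, []) for @monitors;
}

my @sent;    # records what the sender would snd
toy_db_mon(sub {
    my ($fam, $added, $changed, $deleted) = @_;
    return unless %$fam;               # no receivers yet
    push @sent, [$_, test => time]     # stand-in for: snd $_ => test => time
        for sort keys %$fam;
});

toy_db_set('recv#1');    # first receiver appears: one message
toy_db_set('recv#2');    # second receiver: both get a message again
```

The second update causes the first receiver to get another message, because the callback always sends to every key in the family, not only to the newly added ones. One way to message only newcomers would be to loop over C<@$added> instead.
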
You can experiment by having multiple receivers. To start up additional
receivers on the same host, you have to change the C<binds> parameter in
the receiver to the C<seeds> parameter used in the sender (only one process
can listen on port 4040), but then you can start as many as you like. If
you specify proper IP addresses for the seeds, you can even run them on
different computers.

Each time you start the sender, it will send a message to all receivers it
finds (you have to interrupt it manually afterwards).

Additional experiments you could try include using C<AE_MP_TRACE=1> to see
which messages are exchanged, or starting the sender before the receiver
and seeing how long it then takes to find the receiver.

=head3 Splitting Network Configuration and Application Code

OK, so far, this works reasonably well. In the real world, however, the
person configuring your application to run on a specific network (the end
user or network administrator) is often different from the person coding
the application.

Or to put it differently: the arguments passed to configure are usually
provided not by the programmer, but by whoever is deploying the program -
even in the example above, we would like to be able to just start senders
and receivers without having to patch the programs.

To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as seed node by changing the global
settings shared between all profiles (this only works when running on the
same host; for multiple machines you would replace the C<*> by the IP
address or hostname of the node running the seed):

   aemp seeds "*:4040"

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
parameters to C<configure> is that the configuration file (by default)
takes precedence over any arguments passed to C<configure>.

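You can picture the precedence rule as a hash merge in which the later source wins. This is only a mental model with made-up values (the address C<10.0.0.19:4040> is just an example). It is not the code AnyEvent::MP uses to combine its settings, which also involves profile lookup and the global defaults.

```perl
use strict;
use warnings;

# what the program passes to configure (from the sender example)
my %from_program = (nodeid => "eg_sender/%u", seeds => ["*:4040"]);

# hypothetical values coming from the aemp configuration database
my %from_profile = (seeds => ["10.0.0.19:4040"]);

# later pairs overwrite earlier ones, so the profile wins on conflicts
my %effective = (%from_program, %from_profile);

print "nodeid: $effective{nodeid}\n";
print "seeds:  @{ $effective{seeds} }\n";
```
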
That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most larger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>. Code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).

Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.

Here is a simple example:

   use AnyEvent::MP;

   # create a port, it always dies
   my $port = port { die "oops" };

   # monitor it
   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   # now send it some message, causing it to die:
   snd $port;

   AnyEvent->condvar->recv;

It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die and call the monitoring callback:

   anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.

The callback was actually passed two arguments: C<die>, to indicate it
did throw an I<exception> as opposed to, say, a network error, and the
exception message itself.

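To see the mechanics in isolation, here is a toy model of a port context in plain Perl. The receive code runs inside C<eval>, and an uncaught exception turns into a kill whose reason is C<die> followed by the error message. That reason is then handed to every monitor. C<toy_port>, C<toy_mon>, C<toy_kil> and C<toy_snd> are hypothetical names. The real implementation also deals with remote monitors, network errors and monitoring ports that are already gone.

```perl
use strict;
use warnings;

my %ports;    # port id => { cb => receive code, monitors => [callbacks] }
my $next_id = 0;

sub toy_port {
    my ($cb) = @_;
    my $id = "anon/" . $next_id++;
    $ports{$id} = { cb => $cb, monitors => [] };
    return $id;
}

sub toy_mon {
    my ($id, $cb) = @_;
    push @{ $ports{$id}{monitors} }, $cb;
}

sub toy_kil {
    my ($id, @reason) = @_;
    my $port = delete $ports{$id} or return;    # killing frees the port
    $_->(@reason) for @{ $port->{monitors} };
}

sub toy_snd {
    my ($id, @msg) = @_;
    my $port = $ports{$id} or return;
    # run the receive code in a "port context": an exception kills the port
    eval { $port->{cb}->(@msg); 1 }
        or toy_kil($id, die => $@);
}

my @reason;
my $port = toy_port(sub { die "oops\n" });
toy_mon($port, sub { @reason = @_ });
toy_snd($port);
print "$port was killed (with reason @reason)";
```
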
What happens when a port is killed before we have a chance to monitor
it? Granted, this is highly unlikely in our example, but when you program
in a network this can easily happen due to races between nodes.

   use AnyEvent::MP;

   my $port = port { die "oops" };

   snd $port;

   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   AnyEvent->condvar->recv;

This time we will get something else:

   2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
   anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)

The first line is an error message that is printed when a port dies that
isn't being monitored, because that is normally a bug. When a C<mon> is
attempted later, the monitoring callback is invoked immediately, because
the port is already gone. The kill reason is now C<no_such_port> with some
descriptive (we hope) error message.

As you probably suspect from these examples, the kill reason is usually
some identifier as the first argument and a human-readable error message
as the second argument - all kill reasons by AnyEvent::MP itself follow
this pattern. But the kill reason can be anything: it is simply a list of
values you can choose yourself. It can even be nothing (an empty list) -
this is called a "normal" kill.

Apart from die'ing, you can kill ports manually using the C<kil>
function. Using the C<kil> function will be treated like an error when a
non-empty reason is specified:

   kil $port, custom_error => "don't like your steenking face";

And a I<normal> kill without any reason arguments:

   kil $port;

By now you probably wonder what this "normal" kill business is: A common
idiom is to not specify a callback to C<mon>, but another port, such as
C<$SELF>:

   mon $port, $SELF;

This basically means "monitor $port and kill me when it crashes" - and
the thing is, a "normal" kill does not count as a crash. This way you can
easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done its job properly.

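The rule that a "normal" kill does not count as a crash can be sketched as follows. In this toy model (all names are hypothetical), C<toy_link> plays the role of C<mon $child, $parent>. A kill with a non-empty reason is passed on to the linked port with the same reason. A kill with an empty reason list ends only the port itself.

```perl
use strict;
use warnings;

my %alive = (parent => 1, child => 1, other => 1);
my %linked_to;      # port => port to kill along with it (like mon $port, $parent)
my %kill_reason;    # port => reason it was killed with

sub toy_link { my ($port, $to) = @_; $linked_to{$port} = $to }

sub toy_kil {
    my ($port, @reason) = @_;
    return unless delete $alive{$port};
    $kill_reason{$port} = [@reason];
    my $to = delete $linked_to{$port};
    # only a non-empty reason counts as a crash and is passed on
    toy_kil($to, @reason) if defined $to && @reason;
}

toy_link(child => 'parent');
toy_link(other => 'parent');

toy_kil('child');    # normal kill: the parent survives
my $parent_survived_normal_kill = $alive{parent};

toy_kil(other => custom_error => "don't like your steenking face");    # a crash
```
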
=head3 Port Context

Receive callbacks run in the so-called "port context". That means C<$SELF>
contains the port's own ID and exceptions that the code throws will be
caught.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As an example, assume that the
following port receive handler wants to C<die> a second later, using
C<after>:

   my $port = port {
      after 1, sub { die "oops" };
   };

If you try this out, you will find it does not work - when the C<after>
callback is executed, it does not run in the port context anymore, so
exceptions will not be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works mostly like Perl's built-in C<sub>:

   my $port = port {
      after 1, psub { die "oops" };
   };

C<psub> remembers the port context and returns a code reference. When the
code reference is invoked, it will run the code block within the context
that it was created in, so exception handling once more works as expected.

There is even a way to temporarily execute code in the context of some
port, namely C<peval>:

   peval $port, sub {
      # die'ing here will kil $port
   };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

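It helps to think of the port context as I<dynamic> to see why C<psub> and C<peval> are needed: the context exists only while the port's code is running. The plain-Perl sketch below models this with a C<local>ized package variable standing in for C<$SELF>. C<toy_peval> and C<toy_psub> are hypothetical analogues of C<peval> and C<psub>, not their real implementations, and killing a port is reduced to recording the reason.

```perl
use strict;
use warnings;

our $SELF;       # stands in for AnyEvent::MP's $SELF
my %killed;      # port => reason it died with
my @deferred;    # callbacks "scheduled" for later, like after 1, sub {...}

# run code in the context of $port: set $SELF, turn exceptions into kills
sub toy_peval {
    my ($port, $cb, @args) = @_;
    local $SELF = $port;
    eval { $cb->(@args); 1 }
        or $killed{$port} = [die => $@];
}

# capture the current context now, restore it when the closure runs later
sub toy_psub {
    my ($cb) = @_;
    my $port = $SELF;
    return sub { toy_peval($port, $cb, @_) };
}

# a receive callback (running in port1's context) schedules work for later
toy_peval(port1 => sub {
    push @deferred, toy_psub(sub { die "oops\n" });
});

# later, the "event loop" runs the deferred callback outside any context
$_->() for @deferred;
```

If the deferred callback had been a plain C<sub>, it would have run with C<$SELF> undefined, and its exception would have escaped to the caller. That is exactly the problem described above.
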
=head3 Network Errors and the AEMP Guarantee

Earlier we mentioned another important source of monitoring failures:
network problems. When a node loses connection to another node, it will
invoke all monitoring actions, just as if the port was killed, I<even if
it is possible that the port is still happily alive on another node> (not
being able to talk to a node means we have no clue what's going on with
it: it could have crashed, but it could also still be running without
knowing we lost the connection).

So another way to view monitors is: "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port: After starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane: Specifically, it means that there will be
no "holes" in the message sequence: all messages sent are delivered in
order, without any of them missing in between, and when some were lost,
you I<will> be notified of that, so you can take recovery action.

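One way to picture the "no holes" guarantee is to number the messages sent to a port. Up to the point where the monitor fires, the numbers that actually arrive must always form an in-order prefix of the numbers sent. The checker below only illustrates that property, and the sequence numbers are invented for the illustration.

```perl
use strict;
use warnings;

# true if @$delivered is an in-order prefix of @$sent, i.e. nothing was
# skipped: any losses happened only at the end
sub gapless_prefix {
    my ($sent, $delivered) = @_;
    return 0 if @$delivered > @$sent;
    for my $i (0 .. $#$delivered) {
        return 0 unless $delivered->[$i] == $sent->[$i];
    }
    return 1;
}

my @sent = (1 .. 5);

my @allowed   = (1, 2, 3);    # 4 and 5 lost, then the monitor fires
my @forbidden = (1, 2, 4);    # a "hole": 3 is missing but 4 arrived
```
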
And, obviously, the guarantee only works in the presence of
correctly-working hardware, and no relevant bugs inside AEMP itself.

=head3 Supervising

OK, so how is this crashing-everything-stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them
more stable, but to make them more resilient against actual errors
and crashes. And this is not done by crashing I<everything>, but by
crashing everything except a I<supervisor> that then cleans up and starts
everything again.

A supervisor is simply some code that ensures that an application (or a
part of it) is running, and if it crashes, is restarted properly. That is,
it supervises a service by starting and restarting it, as necessary.

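Stripped of the networking, the heart of a supervisor is a loop that starts a service and restarts it when it crashes. This synchronous sketch uses a hypothetical C<supervise> helper with a restart limit. The limit is a design choice of this example, so that a permanently broken service cannot loop forever. The chat client in this tutorial implements the same idea with C<mon> and C<db_mon> instead of a loop.

```perl
use strict;
use warnings;

# run $start until it returns normally, restarting after each crash;
# returns the number of attempts, or dies after $max_attempts failures
sub supervise {
    my ($start, $max_attempts) = @_;
    for my $attempt (1 .. $max_attempts) {
        return $attempt if eval { $start->(); 1 };
        warn "service crashed (attempt $attempt): $@";
    }
    die "giving up after $max_attempts attempts\n";
}

# a hypothetical flaky service: crashes twice, then runs
my $crashes_left = 2;
my $attempts = supervise(sub {
    die "simulated crash\n" if $crashes_left-- > 0;
}, 5);

print "service up after $attempts attempts\n";
```
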
662 To show how to do all this we will create a simple chat server that can
663 handle many chat clients. Both server and clients can be killed and
664 restarted, and even crash, to some extent, without disturbing the chat
665 functionality.
666
667 =head2 Chatting, the Resilient Way
668
669 Without further ado, here is the chat server (to run it, we assume the
670 set-up explained earlier, with a separate F<aemp run seed> node):
671
672 use common::sense;
673 use AnyEvent::MP;
674
675 configure;
676
677 my %clients;
678
679 sub msg {
680 print "relaying: $_[0]\n";
681 snd $_, $_[0]
682 for values %clients;
683 }
684
685 our $server = port;
686
687 rcv $server, join => sub {
688 my ($client, $nick) = @_;
689
690 $clients{$client} = $client;
691
692 mon $client, sub {
693 delete $clients{$client};
694 msg "$nick (quits, @_)";
695 };
696 msg "$nick (joins)";
697 };
698
699 rcv $server, privmsg => sub {
700 my ($nick, $msg) = @_;
701 msg "$nick: $msg";
702 };
703
704 db_set eg_chat_server => $server;
705
706 warn "server ready.\n";
707
708 AnyEvent->condvar->recv;
709
710 Looks like a lot, but it is actually quite simple: after your usual
711 preamble (this time we use common sense), we define a helper function that
712 sends some message to every registered chat client:
713
714 sub msg {
715 print "relaying: $_[0]\n";
716 snd $_, $_[0]
717 for values %clients;
718 }
719
720 The clients are stored in the hash C<%client>. Then we define a server
721 port and install two receivers on it, C<join>, which is sent by clients
722 to join the chat, and C<privmsg>, that clients use to send actual chat
723 messages.
724
C<join> is the most complicated one. It expects the client port and the
nickname to be passed in the message, and registers the client in
C<%clients>.

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

The next step is to monitor the client. The monitoring action removes the
client and sends a quit message with the error to all remaining clients.

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

And finally, it creates a join message and sends it to all clients.

      msg "$nick (joins)";
   };

The C<privmsg> callback simply broadcasts the message to all clients:

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

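To make the server's bookkeeping easier to follow, here is a hypothetical, single-process model of it in plain Perl. It is not AnyEvent::MP: ports are plain strings, and the stand-in C<snd> merely appends each message to a per-port inbox, so the logic runs without any network. The names C<on_join>, C<on_client_gone>, C<on_privmsg> and C<deliver> are invented for this sketch; C<deliver> only imitates how C<rcv> picks a callback by the message tag.

```perl
use strict;
use warnings;

# Hypothetical single-process model of the chat server's bookkeeping.
# This is NOT AnyEvent::MP: ports are plain strings and snd() only appends
# the message to a per-port inbox, so everything runs without a network.
my (%clients, %inbox);

sub snd { my ($port, @msg) = @_; push @{ $inbox{$port} }, [@msg] }

# the same broadcast helper as in the server: relay to every client
sub msg { snd $_, $_[0] for values %clients }

# stand-in for the "join" receiver
sub on_join {
   my ($client, $nick) = @_;
   $clients{$client} = $client;
   msg "$nick (joins)";
}

# stand-in for the monitor callback installed by the "join" receiver
sub on_client_gone {
   my ($client, $nick, @reason) = @_;
   delete $clients{$client};    # remove first, so the quitter is skipped
   msg "$nick (quits, @reason)";
}

# stand-in for the "privmsg" receiver
sub on_privmsg {
   my ($nick, $text) = @_;
   msg "$nick: $text";
}

# dispatch on the first message element (the tag), as rcv does with tags
my %handler = (join => \&on_join, privmsg => \&on_privmsg);
sub deliver { my ($tag, @args) = @_; $handler{$tag}->(@args) }

deliver join    => "port#a", "nick1";
deliver join    => "port#b", "nick2";
deliver privmsg => "nick1", "hello";
on_client_gone "port#b", "nick2", "connection lost";

print "port#a got: $_->[0]\n" for @{ $inbox{"port#a"} };
```

Because the client is deleted from C<%clients> before the quit message is broadcast, C<port#b> never receives its own quit notice, while C<port#a> sees all four messages in order.
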
And finally, the server registers its port in the C<eg_chat_server>
database family, so that clients can find it:

   db_set eg_chat_server => $server;

Well, well... and where is this supervisor stuff? Well... we cheated,
it's not there. To avoid overcomplicating the example, we only put it into
the... CLIENT!

=head3 The Client, and a Supervisor!

Again, here is the full client, including the supervisor, which makes it
a bit longer:

   use common::sense;
   use AnyEvent::MP;

   my $nick = shift || "anonymous";

   configure;

   my ($client, $server);

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server => sub {
         return unless %{ $_[0] };
         undef $db_mon;

         print "\rconnecting...\n";

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         $server = (keys %{ $_[0] })[0];

         snd $server, join => $client, $nick;
         mon $server, $client;
      };
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, privmsg => $nick, $line
         if $server;
   });

   $| = 1;
   print "> ";
   AnyEvent->condvar->recv;

The first thing the client does is to store the nickname (which is
expected as the only command line argument, defaulting to C<anonymous>)
in C<$nick>, for later use.

The next relevant thing is... finally... the supervisor:

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server => sub {
         return unless %{ $_[0] };
         undef $db_mon; # stop monitoring

This monitors the C<eg_chat_server> database family and waits until a
chat server becomes available (the callback simply returns while the
family is still empty). When that happens, it "connects" to the server by
creating a client port that receives and prints chat messages, and
monitoring it:

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

If the client port dies (for whatever reason), the "supervisor" will start
looking for a server again - the semantics of C<db_mon> ensure that it
will immediately find it if there is a server port.

After this, everything is ready: the client will send a C<join> message
with its local port to the server, and start monitoring it:

         $server = (keys %{ $_[0] })[0];

         snd $server, join => $client, $nick;
         mon $server, $client;
      };
   }

This second monitor ensures that, when the server port crashes or goes
away (e.g. due to network problems), the client port will be killed as
well. The client's own monitor then tells the user that the client was
disconnected and starts connecting to the server again.

The rest of the program deals with the boring details of actually invoking
the supervisor function to start the whole client process and handling the
actual terminal input, sending it to the server.

Now... the "supervisor" in this example is a bit of a cheat - it doesn't
really clean up much (because the cleanup done by AnyEvent::MP suffices),
and there isn't much of a restarting action either - if the server isn't
there because it crashed, well, it isn't there.

In the real world, one would often add a timeout that triggers when the
server cannot be found within some time limit, and then complain, or even
try to start a new server. Or the supervisor would have to do some real
cleanup, such as rolling back database transactions when the database
thread crashes. For this simple chat server, however, this simple
supervisor works fine. Hopefully future versions of AnyEvent::MP will
offer some predefined supervisors; for now, you will have to code them
yourself.

You should now try to start the server and one or more clients in different
terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server
   aemp run profile seed

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
which of course should not normally get killed (unless they, too, have a
supervisor).

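Symmetric monitoring can be illustrated with another hypothetical, in-memory model (again not AnyEvent::MP's implementation; the subs merely borrow the names C<port>, C<mon> and C<kil>). In this model, C<mon $port, $other> kills C<$other> with the same reason when C<$port> dies, C<mon $port, $code> calls C<$code> with the reason, and a port that is already dead is never killed twice, which keeps two ports that monitor each other from looping forever.

```perl
use strict;
use warnings;

# Hypothetical in-memory model of port monitoring; NOT AnyEvent::MP.
#   mon $port, $other  - kill $other (with the same reason) when $port dies
#   mon $port, $code   - call $code with the kill reason when $port dies
my (%alive, %monitors);

sub port { my ($id) = @_; $alive{$id} = 1; return $id }

sub mon { my ($port, $action) = @_; push @{ $monitors{$port} }, $action }

sub kil {
   my ($port, @reason) = @_;
   return unless delete $alive{$port};    # dead ports are not killed twice
   for my $action (@{ delete($monitors{$port}) || [] }) {
      if (ref $action) { $action->(@reason) }
      else             { kil($action, @reason) }
   }
}

my @log;
my $server = port "server-side";
my $client = port "client";

mon $client, sub { push @log, "client: disconnected @_" };  # supervisor hook
mon $server, $client;    # client side: server dies => client port dies
mon $client, $server;    # server side: client dies => server-side port dies

kil $server, "transport_error";    # e.g. the connection was lost

print "$_\n" for @log;
print "still alive: ", join(", ", sort keys %alive) || "none", "\n";
```

Killing either side takes down both ports, and the supervisor hook on the client runs exactly once with the original reason.
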
If you often think in object-oriented terms, then you can think of a port
as an object: C<port> is the constructor, the receive callbacks set by
C<rcv> act as methods, the C<kil> function becomes the explicit destructor
and C<mon> installs a destructor hook. Unlike conventional object-oriented
programming, it can make sense to exchange port IDs more freely (for
example, to monitor one port from another), because it is cheap to send
port IDs over the network, and AnyEvent::MP blurs the distinction between
local and remote ports.

Lastly, there is ample room for improvement in this example: the server
should probably remember the nickname in the C<join> handler instead of
expecting it in every chat message, it should probably monitor itself, and
the client should not try to send any messages unless a server is actually
connected.

=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": they can run on almost
any node and even talk to copies of themselves on other nodes in case they
are distributed. The L<AnyEvent::MP::Global> service, for example, monitors
nodes joining the network and sometimes even starts itself on other nodes.

One good way to design such services is to put them into a module and
create "virtual connections" to other nodes. We call this the "bridge
head" method, because you start by I<creating a remote port> (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let us redesign the chat server and
client using this design method.

As usual, we start with the full program - here is the server:

   use common::sense;
   use AnyEvent::MP;

   configure;

   db_set eg_chat_server2 => $NODE;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, psub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much different than the previous example, except that
this time, we register the node ID (C<$NODE>) in the database rather than
a port we created - the clients only want to know which node the server
should be running on, and there can only be one such server (or service)
per node. In fact, the clients could also use some kind of election
mechanism to find the node with the lowest node ID, or the lowest load, or
something like that.

The much more interesting difference from the previous server is that
indeed no server port is created - the server consists only of code,
and "does" nothing by itself. All it "does" is define a function
named C<client_connect>, which expects a client port and a nickname as
arguments. It then monitors the client port and binds a receive callback
on C<$SELF>, which expects messages that in turn are broadcast to all
clients.

The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
for C<mon $client, $SELF>. The second does the normal "client has gone
away" clean-up action.

The last line, the C<rcv $SELF>, is a good hint that something interesting
is going on. And indeed, when looking at the client code, you can see a
new function, C<spawn>:

   use common::sense;
   use AnyEvent::MP;

   my $nick = shift || "anonymous";

   configure;

   $| = 1;

   my ($client, $server);

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server2 => sub {
         return unless %{ $_[0] };
         undef $db_mon; # stop monitoring

         print "\rconnecting...\n";

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         # the keys of the family are the node IDs registered by servers
         my $servernode = (keys %{ $_[0] })[0];

         $server = spawn $servernode, "::client_connect", $client, $nick;
         mon $server, $client;
      };
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it looks up the server
I<node> in the C<eg_chat_server2> database family and C<spawn>s (creates)
a new port on that node:

         $server = spawn $servernode, "::client_connect", $client, $nick;
         mon $server, $client;

And of course the first thing after creating it is monitoring it.

Phew, let's go through this in slow motion: the C<spawn> function creates
a new port on a remote node and returns its port ID. After creating the
port, it calls a function on the remote node, passing any remaining
arguments to it, and - most importantly - executes the function within
the context of the new port, so it can be manipulated by referring to
C<$SELF>. The init function can reside in a module (actually it normally
I<should> reside in a module) - AnyEvent::MP will automatically load the
module if the function isn't defined.

The C<spawn> function returns immediately, which means you can instantly
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.

If the spawn message gets delivered, but the monitoring message is not,
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in turn monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep in mind: after
creating a port using C<spawn>, monitor it on the local node, and monitor
"the other side" from the remote node, and all will be cleaned up just
fine.

=head2 Services

As mentioned above, C<spawn> automatically loads modules. This can be
exploited in various useful ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as the init function - without any
other configuration.

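The link between an init function name such as C<mymod::chatserver::client_connect> and the file F<mymod/chatserver.pm> is ordinary Perl convention: the package part of the name maps to a module path, and the file has to declare C<package mymod::chatserver>. The sketch below shows how a "load the module if the function isn't defined" step can work in plain Perl. C<module_file_for> and C<resolve_init_function> are hypothetical helpers, not AnyEvent::MP functions, and AnyEvent::MP's own loading code may well differ.

```perl
use strict;
use warnings;
use File::Path qw(make_path);
use File::Temp qw(tempdir);

# Map a function name to the module file that should define it, e.g.
# "mymod::chatserver::client_connect" => "mymod/chatserver.pm". A name with
# an empty sub part ("mymod::service::") names just the module. Returns
# nothing for functions in package main ("::client_connect").
sub module_file_for {
   my ($func) = @_;
   my ($pkg) = $func =~ /^(.*)::[^:]*\z/ or return;
   return if $pkg eq "" || $pkg eq "main";
   (my $file = "$pkg.pm") =~ s{::}{/}g;
   return $file;
}

# Return a code reference for $func, loading its module first if the
# function is not defined yet (hypothetical helper, not AnyEvent::MP's).
sub resolve_init_function {
   my ($func) = @_;
   no strict 'refs';                  # we look the sub up by name
   unless (defined &$func) {
      my $file = module_file_for($func)
         or die "$func: not defined and no module to load\n";
      require $file;
   }
   defined &$func or die "$func: not defined by its module\n";
   return \&$func;
}

# demo: create a tiny (hypothetical) mymod/chatserver.pm in a temp dir
my $dir = tempdir(CLEANUP => 1);
make_path("$dir/mymod");
open my $fh, ">", "$dir/mymod/chatserver.pm" or die "open: $!";
print $fh qq{package mymod::chatserver;\n},
          qq{sub client_connect { "connected \@_" }\n},
          qq{1;\n};
close $fh or die "close: $!";
unshift @INC, $dir;

my $init   = resolve_init_function("mymod::chatserver::client_connect");
my $result = $init->("port#1", "nick1");
print "$result\n";
```

In this model, a name ending in C<::> has an empty function part, so it only identifies a module to load.
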
Likewise, when you have some service that starts automatically when loaded
(similar to AnyEvent::MP::Global), then you can configure this service
statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with a C<::> suffix) will simply load the module, which is
then free to do whatever it wants.

Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I wanted to run
the Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

There are plenty of possibilities - it's all up to you how you structure
your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty
C<< (data => $port) >> message.

The server only writes a single file at a time; other requests will stay
in the queue until the current file has been finished.

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

   use Coro::MP;    # also exports everything AnyEvent::MP exports
   use Coro::AIO;   # aio_open, aio_write
   use Fcntl;       # O_WRONLY, O_CREAT

   my $ioserver = port_async {
      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Let's go through it, section by section.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or, as in this example, by calling C<port_async> with the
code to execute as a thread. The C<async> component comes from the fact
that threads are created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be C<kil>ed I<normally>,
i.e. without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it
executes in a loop. The first thing it does is fetch the next message,
using C<get_cond>, the "conditional message get". Without arguments, it
merely fetches the I<next> message from the queue, which I<must> be a
C<write_file> message.

The message contains the C<$path> to the file, which is then created:

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

Then we enter a loop again, to serve as many C<data> messages as
necessary:

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

This time, the condition is not empty, but instead a code block: similarly
to C<grep>, the code block will be called with C<@_> set to each message in
the queue, and it has to return whether it wants to receive the message or
not.

In this case we are interested in C<data> messages (C<< $_[0] eq "data" >>)
whose second element, the one right after the tag, is the source port we
are currently serving (C<< $_[1] eq $port >>).

The condition must be this strict, as it is possible to receive both
C<write_file> messages and C<data> messages from other ports while we
handle the file writing.

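The grep-like behaviour of C<get_cond> can be modelled in a few lines of plain Perl. This is a hypothetical, single-threaded sketch, not Coro::MP's code: the queue is an array of messages, the condition is called with C<@_> set to each message in arrival order, the first accepted message is removed and returned, and everything else stays queued. Where the real C<get_cond> would block (or time out), this model simply returns an empty list. The interleaved messages reproduce the situation described above: another port starts a C<write_file> while data from C<port#1> is still arriving.

```perl
use strict;
use warnings;

# Hypothetical single-threaded model of selective receive; NOT Coro::MP.
my @queue;    # the port's message queue, oldest message first

sub get_cond(&) {
   my ($cond) = @_;
   for my $i (0 .. $#queue) {
      next unless $cond->(@{ $queue[$i] });   # condition sees @_ = message
      my ($msg) = splice @queue, $i, 1;      # take it out of the queue...
      return @$msg;                           # ...and hand it over
   }
   return;    # nothing matched: the real get_cond would block here
}

# interleaved traffic: port#2 starts a new file while port#1 is writing
@queue = (
   [data       => "port#1", "abc\n"],
   [write_file => "port#2", "/tmp/other"],
   [data       => "port#2", "xyz\n"],
   [data       => "port#1", "def\n"],
   [data       => "port#1"],               # end of port#1's file
);

my $port = "port#1";
my @written;
while (my (undef, undef, $data) = get_cond { $_[0] eq "data" && $_[1] eq $port }) {
   last unless length $data;    # empty data message: file is complete
   push @written, $data;
}

print "written for $port: ", join("", @written);
print scalar(@queue), " message(s) left for later, first: $queue[0][0]\n";
```

With a looser condition (say, only C<< $_[0] eq "data" >>), the model would append C<port#2>'s C<xyz> to C<port#1>'s file, which is exactly what the strict condition prevents.
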
The lone C<5> argument at the end is a timeout - when no matching message
is received within C<5> seconds, we assume an error and C<die>.

When an empty C<data> message is received, we are done and can close the
file (which is done automatically as C<$fh> goes out of scope):

            length $data or last;

Otherwise we need to write the data:

            aio_write $fh, undef, undef, $data, 0;

And that's basically it. Note that every port thread should have some
kind of supervisor. In our case, the supervisor simply prints any error
message:

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Here is a usage example:

   port_async {
      snd $ioserver, write_file => $SELF, "/tmp/unsafe";
      snd $ioserver, data => $SELF, "abc\n";
      snd $ioserver, data => $SELF, "def\n";
      snd $ioserver, data => $SELF;
   };

The messages are sent without any flow control or acknowledgement (feel
free to improve). Also, the source port does not actually need to be a
port - any unique ID will do - but port identifiers happen to be a simple
source of network-wide unique IDs.

Apart from C<get_cond> as seen above, there are other ways to receive
messages. The C<write_file> message above could also selectively be
received using a C<get> call:

   my ($port, $path) = get "write_file";

This is simpler, but when some other code part sends an unexpected message
to the C<$ioserver>, it will stay in the queue forever. As a rule of thumb,
every threaded port should have a "fetch next message unconditionally"
somewhere, to avoid filling up the queue.

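The effect of that rule of thumb shows up in the same kind of hypothetical queue model (again not Coro::MP's code; C<get_tag> and C<get_next> are invented names standing in for a tag-only C<get> and an unconditional fetch). The tag-based loop never touches the unexpected C<bogus> message, while the unconditional fetch drains it.

```perl
use strict;
use warnings;

# Hypothetical queue model; NOT Coro::MP. get_tag imitates a tag-only get
# (the message is returned without its tag), get_next fetches whatever
# message is next.
my @queue = (
   [write_file => "port#1", "/tmp/a"],
   [bogus      => 42],                      # nobody asked for this
   [write_file => "port#2", "/tmp/b"],
);

sub get_tag {
   my ($tag) = @_;
   for my $i (0 .. $#queue) {
      next unless $queue[$i][0] eq $tag;
      my ($msg) = splice @queue, $i, 1;
      return @$msg[1 .. $#$msg];           # strip the tag
   }
   return;    # the real get would block here
}

sub get_next { return @queue ? @{ shift @queue } : () }

my @paths;
while (my ($port, $path) = get_tag("write_file")) {
   push @paths, $path;
}
print "handled: @paths\n";
print "stuck in the queue: ", join(", ", map $_->[0], @queue), "\n";

# an unconditional fetch somewhere in the loop gets rid of such messages
my @discarded;
while (my @msg = get_next()) {
   push @discarded, $msg[0];
}
print "discarded: @discarded\n";
```
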
Finally, it is also possible to use C<get_cond> in a more switch-like way:

   get_cond {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         ...;
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         ...;
      };

      die "unexpected message $_[0] received";
   };

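The switch-like form can also be sketched with the hypothetical queue model. This sketch assumes, as the example above suggests, that when the condition returns a code reference, the matched message is removed from the queue and that code is called with it; the L<Coro::MP> documentation is the authority on the exact semantics. In this model, a C<die> inside the condition aborts the receive and leaves the offending message queued. C<handle_next> is an invented name.

```perl
use strict;
use warnings;

# Hypothetical model of a switch-like get_cond; NOT Coro::MP's code. It
# ASSUMES that a code reference returned by the condition is called with
# the matched message (as the tutorial's example implies).
my @queue = ([msg2 => "b", "c"], [msg1 => "a"]);
my @log;

sub get_cond(&) {
   my ($cond) = @_;
   for my $i (0 .. $#queue) {
      my $res = $cond->(@{ $queue[$i] }) or next;
      my ($msg) = splice @queue, $i, 1;
      return ref $res eq "CODE" ? $res->(@$msg) : @$msg;
   }
   return;
}

sub handle_next {
   get_cond {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         push @log, "msg1: @msg1_data";
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         push @log, "msg2: @msg2_data";
      };

      die "unexpected message $_[0] received\n";
   };
}

handle_next() for 1 .. 2;    # handles msg2, then msg1, in queue order

push @queue, [oops => 1];
eval { handle_next(); 1 } or do {
   chomp(my $err = $@);
   push @log, "error: $err";
};

print "$_\n" for @log;
```
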
=head1 THE END

This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
trivial; in fact, it's pretty complicated. We hope AEMP makes it simpler
and will be useful to create exciting new applications.

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<Coro::MP>

L<AnyEvent>

=head1 AUTHOR

Robin Redeker <elmex@ta-sa.org>
Marc Lehmann <schmorp@schmorp.de>
