/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.49
Committed: Tue Mar 6 16:54:49 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.48: +69 -53 lines

=head1 Message Passing for the Non-Blocked Mind

=head1 Introduction and Terminology

This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
module, which allows programs to transparently pass messages within the
process and to other processes on the same or a different host.

What kind of messages? Basically a message here means a list of Perl
strings, numbers, hashes and arrays, anything that can be expressed as a
L<JSON> text (as JSON is the default serialiser in the protocol). Here are
two examples:

   write_log => 1251555874, "action was successful.\n"
   123, ["a", "b", "c"], { foo => "bar" }

When using L<AnyEvent::MP> it is customary to use a descriptive string as
first element of a message that indicates the type of the message. This
element is called a I<tag> in L<AnyEvent::MP>, as some API functions
(C<rcv>) support matching it directly.
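
To make the "anything that can be expressed as JSON" rule a bit more
tangible, here is a small sketch that serialises the two example messages
and splits off the tag. It uses the core module C<JSON::PP> purely as a
stand-in for the serialiser (an assumption made for this illustration,
L<AnyEvent::MP> does the real encoding internally), and the variable names
are made up.

```perl
use strict;
use warnings;
use JSON::PP;   # core module, assumed here as a stand-in for the serialiser

my $json = JSON::PP->new->canonical;

# the two example messages from above, as plain Perl lists
my @log_msg  = (write_log => 1251555874, "action was successful.\n");
my @data_msg = (123, ["a", "b", "c"], { foo => "bar" });

# a message is a list, so it maps naturally onto a JSON array
print $json->encode (\@log_msg),  "\n";
print $json->encode (\@data_msg), "\n";

# by convention, the first element is the tag, the rest is the payload
my ($tag, @args) = @log_msg;
print "tag: $tag, number of arguments: ", scalar @args, "\n";
```

Anything that cannot be represented this way (code references or file
handles, for example) is not a good candidate for a message element.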

Suppose you want to send a ping message with your current time to
somewhere. This is what such a message might look like (in Perl syntax):

   ping => 1251381636

Now that we know what a message is, to which entities are those
messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
a destination for messages but also a context to execute code: when
a runtime error occurs while executing code belonging to a port, the
exception will be raised on the port and can even travel to interested
parties on other nodes, which makes supervision of distributed processes
easy.

How do these ports relate to things you know? Each I<port> belongs
to a I<node>, and a I<node> is just the UNIX process that runs your
L<AnyEvent::MP> application.

Each I<node> is distinguished from other I<nodes> running on the same or
another host in a network by its I<node ID>. A I<node ID> is simply a
unique string chosen manually or assigned by L<AnyEvent::MP> in some way
(UNIX nodename, random string...).

Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
to each other. The setup consists of two nodes (more are of course
possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
C<BAR>.


   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|

The strings for the I<port IDs> here are just for illustrative
purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
strings, they can't be chosen manually and are assigned by the system
dynamically. These I<port IDs> are unique within a network and can also be
used to identify senders, or even as message tags for instance.

The next sections will explain the API of L<AnyEvent::MP> by going through
a few simple examples. Later some more complex idioms are introduced,
which are hopefully useful to solve some real world problems.

=head2 Passing Your First Message

For starters, let's have a look at the messaging API. The following
example is just a demo to show the basic elements of message passing with
L<AnyEvent::MP>.

The example should print: C<Ending with: 123>, in a rather complicated
way, by passing some message to a port.

   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";

It already uses most of the essential functions inside
L<AnyEvent::MP>: First there is the C<port> function which creates a
I<port> and will return its I<port ID>, a simple string.

This I<port ID> can be used to send messages to the port and install
handlers to receive messages on the port. Since it is a simple string
it can be safely passed to other I<nodes> in the network when you want
to refer to that specific port (usually used for RPC, where you need
to tell the other end which I<port> to send the reply to - messages in
L<AnyEvent::MP> have a destination, but no source).

The next function is C<rcv>:

   rcv $port, test => sub { ... };

It installs a receiver callback on the I<port> that is specified as the
first argument (it only works for "local" ports, i.e. ports created on the
same node). The next argument, in this example C<test>, specifies a I<tag>
to match. This means that whenever a message with the first element being
the string C<test> is received, the callback is called with the remaining
parts of that message.

Messages can be sent with the C<snd> function, which is used like this in
the example above:

   snd $port, test => 123;

This will send the message C<'test', 123> to the I<port> with the I<port
ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
on C<test> it will call the callback with the first argument being the
number C<123>.
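
Since messages have a destination but no source, a request that expects an
answer has to carry the port to reply to. The following sketch shows one
way this could look within a single process, using only C<port>, C<rcv>
and C<snd>. The tag names C<square> and C<result> and the variable names
are invented for this illustration.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;

# the "server" port expects a number and the port ID to reply to
my $server = port;
rcv $server, square => sub {
   my ($n, $reply_to) = @_;
   snd $reply_to, result => $n * $n;
};

# the "client" port only waits for the reply
my $client = port;
rcv $client, result => sub {
   $done->send ($_[0]);
};

# the port ID is a plain string, so it can travel inside the message
snd $server, square => 7, $client;

my $result = $done->recv;
print "7 squared is $result\n";
```

Because C<$client> is just a string, the same request would look no
different if the server port lived on another node.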

The callback is a typical AnyEvent idiom: the callback just passes
that number on to the I<condition variable> C<$end_cv> which will then
pass the value to the print. Condition variables are out of the scope
of this tutorial and not often used with ports, so please consult the
L<AnyEvent::Intro> about them.

Passing messages inside just one process is boring. Before we can move on
and do interprocess message passing we first have to make sure some things
have been set up correctly for our nodes to talk to each other.

=head2 System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on
your system.

First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to set up the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.

The easiest way to set this up is to use the F<aemp> utility:

   aemp gensecret

This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
random shared secret. You can copy this file to any other system and
then communicate over the network (via TCP) with it. You can also select
your own shared secret (F<aemp setsecret>) and for increased security
requirements you can even create (or configure) a TLS certificate (F<aemp
gencert>), causing connections to not just be securely authenticated, but
also to be encrypted and protected against tinkering.

Connections will only be successfully established when the I<nodes>
that want to connect to each other have the same I<shared secret> (or
successfully verify the TLS certificate of the other side, in which case
no shared secret is required).

B<If something does not work as expected, and for example tcpdump shows
that the connections are closed almost immediately, you should make sure
that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
you try to connect with each other!>

That is all for now, you will find some more advanced fiddling with the
C<aemp> utility later.

=head2 Shooting the Trouble

Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.

To help with troubleshooting, there are two environment variables
that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets the
logging level. The default is C<5>, which means nothing much is
printed. You can increase it to C<8> or C<9> to get more verbose
output. This is example output when starting a node:

   2012-03-04 19:41:10 <8> node cerebro starting up.
   2012-03-04 19:41:10 <8> node listens on [10.0.0.1:4040].
   2012-03-04 19:41:10 <9> trying connect to seed node 10.0.0.19:4040.
   2012-03-04 19:41:10 <9> 10.0.0.19:4040 connected as rain
   2012-03-04 19:41:10 <7> rain is up ()

A lot of info, but at least you can see that it does something.

The other environment variable that can be useful is
C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
most messages that are sent or received to be printed. For example, F<aemp
restart rijk> might output these message exchanges:

   SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
   SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
   SND rain <- [null,"g_find","rijk"]
   RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
   RCV rijk -> ["b",""]

=head1 PART 1: Passing Messages Between Processes

=head2 The Receiver

Let's split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

   my $port = port;
   db_set eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

=head3 AnyEvent::MP::Global

Now, that wasn't too bad, was it? OK, let's step through the new functions
that have been used.

=head3 C<configure> and Joining and Maintaining the Network

First let's have a look at C<configure>:

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

Before we are able to send messages to other nodes we have to initialise
ourselves to become a "distributed node". Initialising a node means naming
the node and binding some TCP listeners so that other nodes can
contact it.

Additionally, to actually link all nodes in a network together, you can
specify a number of seed addresses, which will be used by the node to
connect itself into an existing network, as we will see shortly.

All of this (and more) can be passed to the C<configure> function - later
we will see how we can do all this without even passing anything to
C<configure>!

The first parameter, C<nodeid>, specifies the node ID (in this case
C<eg_receiver/%u>). The default is to use the node name of the current
host plus C</%u>, which gives the node a name with a random suffix to
make it unique, but for this example we want the node to have a bit more
personality, and name it C<eg_receiver> with a random suffix.

Why the random suffix? Node IDs need to be unique within the network and
appending a random suffix is the easiest way to do that.

The second parameter, C<binds>, specifies a list of C<address:port> pairs
to bind TCP listeners on. The special "address" of C<*> means to bind on
every local IP address (this might not work on every OS, so explicit IP
addresses are best).

The reason to bind on a TCP port is not just that other nodes can connect
to us: if no binds are specified, the node will still bind on a dynamic
port on all local addresses - but in this case we won't know the port, and
cannot tell other nodes to connect to it as seed node.

Now, a I<seed> is simply the TCP address of some other node in the
network, often the same string as used for the C<binds> parameter of the
other node. The need for seeds is easy to explain: I<somehow> the nodes
of an aemp network have to find each other, and often this means over the
internet. So broadcasts are out.

Instead, a node usually specifies the addresses of a few (for redundancy)
other nodes, some of which should be up. Two nodes can set each other as
seeds without any issues. You could even specify all nodes as seeds for
all nodes, for total redundancy. But the common case is to have some more
or less central, stable servers running seed services for other nodes.

All you need to do to ensure that an AnyEvent::MP network connects
together is to make sure that all connections from nodes to their seed
nodes I<somehow> span the whole network. The simplest way to do that would
be for all nodes to specify a single node as seed node, and you would get
a star topology. If you specify all nodes as seed nodes, you get a fully
meshed network (that's what previous releases of AnyEvent::MP actually
did).
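
Whether a given set of seed settings "spans the whole network" can be
checked on paper, or with a few lines of plain Perl. The sketch below does
not use AnyEvent::MP at all: it treats every node-to-seed connection as an
undirected link (assuming that an established connection can be used in
both directions) and reports the nodes that cannot be reached. The node
names and the function name C<unreachable_from> are hypothetical.

```perl
use strict;
use warnings;

# hypothetical network: each node lists the nodes it uses as seeds
my %seeds = (
   seed1    => ["seed2"],
   seed2    => ["seed1"],
   receiver => ["seed1"],
   sender   => ["seed2"],
   lonely   => [],          # forgot to configure any seeds
);

sub unreachable_from {
   my ($start, $seeds) = @_;

   # build an undirected graph out of the node => seed connections
   my %link;
   for my $node (keys %$seeds) {
      $link{$node} ||= {};
      for my $seed (@{ $seeds->{$node} }) {
         $link{$node}{$seed} = $link{$seed}{$node} = 1;
      }
   }

   # breadth-first search, starting at $start
   my %seen = ($start => 1);
   my @todo = ($start);
   while (defined (my $node = shift @todo)) {
      for my $peer (keys %{ $link{$node} }) {
         push @todo, $peer unless $seen{$peer}++;
      }
   }

   return sort grep { !$seen{$_} } keys %link;
}

my @lost = unreachable_from seed1 => \%seeds;
print "not part of the network: @lost\n";
```

In this made-up configuration only C<lonely> is cut off, everything else
is held together by the two seed nodes.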

A node tries to keep connections open to all of its seed nodes at all
times, while other connections are made on demand only.

All of this ensures that the network stays one network - even if all the
nodes in one half of the net are separated from the nodes in the other
half by some network problem, once that is over, they will eventually
become a single network again.

In addition to creating the network, a node also expects the seed nodes to
run the shared database service - if need be, by automatically starting it,
so you don't normally need to configure this explicitly.

The process of joining a network takes time, during which the node
is already running. This means it takes time until the node is
fully connected, and information about services in the network is
available. This is why most AnyEvent::MP programs start by waiting a while
until the information they need is available.

We will see how this is done later, in the sender program.

=head3 Registering the Receiver

Coming back to our example, after the node has been configured for network
access, it is time to publish some service, namely the receive service.

For that, let's look at the next lines:

   my $port = port;
   db_set eg_receivers => $port;

The C<port> function has already been discussed. It simply creates a new
I<port> and returns the I<port ID>. The C<db_set> function, however, is
new: The first argument is the name of a I<database family> and the second
argument is the name of a I<subkey> within that family. The third argument
would be the I<value> to be associated with the family and subkey, but,
since it is missing, it will simply be C<undef>.

OK, what's this weird talk about families you wonder - AnyEvent::MP comes
with a distributed database. This database runs on so-called "global"
nodes, which usually are the seed nodes of your network. The database
structure is "simply" a hash of hashes of values.

In other words, if the database were stored in C<%DB>, then the C<db_set>
function more or less would do this:

   $DB{eg_receivers}{$port} = undef;

So the ominous "family" selects a hash in the database, and the "subkey"
is simply the key in this hash. And C<db_set> very much works like an
assignment.
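
The hash-of-hashes picture can be played through without any network at
all. The following is a purely local model, not the real database: C<%DB>
and C<model_db_set> are stand-ins invented for this sketch, and the port
IDs are made up.

```perl
use strict;
use warnings;

my %DB;   # family => { subkey => value }

# local stand-in for db_set: family, subkey and an optional value
sub model_db_set {
   my ($family, $subkey, $value) = @_;
   $DB{$family}{$subkey} = $value;
}

# two hypothetical receivers publish their (made-up) port IDs
model_db_set eg_receivers => "eg_receiver/aaaa#1";
model_db_set eg_receivers => "eg_receiver/bbbb#1";

# somebody interested in the family only needs its subkeys
my @receivers = sort keys %{ $DB{eg_receivers} };
print "known receiver: $_\n" for @receivers;
```

Note that the values stay C<undef> here, just as in the receiver example:
all the information is in the subkeys themselves.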

The family namespace is shared by all nodes in a network, so the names
should be reasonably unique, for example, they could start with the name
of your module, or the name of the program, using your port name or node
name as subkey.

The purpose behind adding this key to the database is that the sender can
look it up and find our port. We will shortly see how.

The last step in the example is to set up a receiver callback for those
messages, just as was discussed in the first example. We again match
for the tag C<test>. The difference is that this time we don't exit the
application after receiving the first message. Instead we continue to wait
for new messages indefinitely.

=head2 The Sender

OK, now let's take a look at the sender code:

   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_sender/%u", seeds => ["*:4040"];

   my $guard = db_mon eg_receivers => sub {
      my ($family, $keys) = @_;
      return unless %$family;

      # now there are some receivers, send them a message
      snd $_ => test => time
         for keys %$family;
   };

   AnyEvent->condvar->recv;

It's even less code. The C<configure> serves the same purpose as in the
receiver, but instead of specifying binds we specify a list of seeds - the
only seed happens to be the same as the bind used by the receiver, which
therefore becomes our seed node.

Remember the part about having to wait till things become available? Well,
after configure returns, nothing has been done yet - the node is not
connected to the network, knows nothing about the database contents, and
it can take ages (for a computer :) for this situation to change.

Therefore, the sender waits, in this case by using the C<db_mon>
function. This function registers an interest in a specific database
family (in this case C<eg_receivers>). Each time something inside the
family changes (a key is added, changed or deleted), it will call our
callback with the family hash as first argument, and the list of keys as
second argument.

The callback only checks whether the C<%$family> hash is empty - if it is,
then it doesn't do anything. But eventually the family will contain the
port subkey we set in the receiver. Then it will send a message to it (and
any other receiver in the same family). Likewise, should the receiver go
away and come back, or should another receiver come up, it will again send
a message to all of them.

You can experiment by having multiple receivers - to start up additional
receivers you have to replace the C<binds> parameter in the receiver with
the C<seeds> parameter used in the sender, but then you can start as many
as you like. If you specify proper IP addresses for the seeds, you can
even run them on different computers.

Each time you start the sender, it will send a message to all receivers it
finds (you have to interrupt it manually afterwards).

Additional things you could try include using C<PERL_ANYEVENT_MP_TRACE=1>
to see which messages are exchanged, or starting the sender first and
seeing how long it takes to find the receiver.

=head3 Splitting Network Configuration and Application Code

OK, so far, this works reasonably. In the real world, however, the person
configuring your application to run on a specific network (the end user
or network administrator) is often different to the person coding the
application.

Or to put it differently: the arguments passed to configure are usually
provided not by the programmer, but by whoever is deploying the program -
even in the example above, we would like to be able to just start senders
and receivers without having to patch the programs.

To make this easy, AnyEvent::MP supports a simple configuration database,
using profiles, which can be managed using the F<aemp> command-line
utility (yes, this section is about the advanced tinkering mentioned
before).

When you change both programs above to simply call

   configure;

then AnyEvent::MP tries to look up a profile using the current node name
in its configuration database, falling back to some global default.

You can run "generic" nodes using the F<aemp> utility as well, and we will
exploit this in the following way: we configure a profile "seed" and run
a node using it, whose sole purpose is to be a seed node for our example
programs.

We bind the seed node to port 4040 on all interfaces:

   aemp profile seed binds "*:4040"

And we configure all nodes to use this as seed node (this only works when
running on the same host, for multiple machines you would provide the IP
address or hostname of the node running the seed), by changing the global
settings shared between all profiles:

   aemp seeds "*:4040"

Then we run the seed node:

   aemp run profile seed

After that, we can start as many other nodes as we want, and they will
all use our generic seed node to discover each other. The reason we can
start our existing programs even though they specify "incompatible"
parameters to C<configure> is that the configuration file (by default)
takes precedence over any arguments passed to C<configure>.

That's all for now - next we will teach you about monitoring by writing a
simple chat client and server :)

=head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery

That's a mouthful, so what does it mean? Our previous example is what one
could call "very loosely coupled" - the sender doesn't care about whether
there are any receivers, and the receivers do not care if there is any
sender.

This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it will
still be there, by recovering and restarting the service.

AnyEvent::MP supports this by catching exceptions and network problems,
and notifying interested parties of these.

=head2 Exceptions, Port Context, Network Errors and Monitors

=head3 Exceptions

Exceptions are handled on a per-port basis: all receive callbacks are
executed in a special context, the so-called I<port-context>: code
that throws an otherwise uncaught exception will cause the port to be
C<kil>led. Killed ports are destroyed automatically (killing ports is
actually the only way to free ports).

Ports can be monitored, even from a different node and host, and when a
port is killed, any entity monitoring it will be notified.

Here is a simple example:

   use AnyEvent::MP;

   # create a port, it always dies
   my $port = port { die "oops" };

   # monitor it
   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   # now send it some message, causing it to die:
   snd $port;

   AnyEvent->condvar->recv;

It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die and call the monitoring callback:

   anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.

The callback was actually passed two arguments: C<die>, to indicate it
did throw an I<exception> as opposed to, say, a network error, and the
exception message itself.

What happens when a port is killed before we have a chance to monitor
it? Granted, this is highly unlikely in our example, but when you program
in a network this can easily happen due to races between nodes.

   use AnyEvent::MP;

   my $port = port { die "oops" };

   snd $port;

   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   AnyEvent->condvar->recv;

This time we will get something like:

   anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)

Since the port was already gone, the kill reason is now C<no_such_port>
with some descriptive (we hope) error message.

In fact, the kill reason is usually some identifier as first argument
and a human-readable error message as second argument, but can be about
anything (it is simply a list of values you can choose yourself) or even
nothing - which is called a "normal" kill.

You can kill ports manually using the C<kil> function, which will be
treated like an error when any reason is specified:

   kil $port, custom_error => "don't like your steenking face";

And a clean kill without any reason arguments:

   kil $port;

By now you probably wonder what this "normal" kill business is: A common
idiom is to not specify a callback to C<mon>, but another port, such as
C<$SELF>:

   mon $port, $SELF;

This basically means "monitor $port and kill me when it crashes". And a
"normal" kill does not count as a crash. This way you can easily link
ports together and make them crash together on errors, while allowing you
to remove a port silently.
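
Linking two ports can be tried out within a single process. In this sketch
(the port names, the variables and the C<custom_error> reason are only
examples) a "helper" port is linked to a "worker" port, and a second
monitor with a callback records why the helper went away.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;

my $worker = port;
my $helper = port;

# link the two: an abnormal kill of $worker also kills $helper
mon $worker, $helper;

# observe what happens to $helper
my @reason;
mon $helper, sub {
   @reason = @_;
   $done->send;
};

# kill the worker with a reason, which counts as a crash
kil $worker, custom_error => "worker failed";

$done->recv;
print "helper was killed with reason: @reason\n";
```

If the last C<kil> were a "normal" kill without any reason arguments, the
helper would stay alive, the callback would never run and the program
would simply keep waiting.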

=head3 Port Context

When code runs in a port context, that means C<$SELF> contains its own
port ID and exceptions that the code throws will be caught.

Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from C<rcv> handlers. As an example, assume that the port receive
handler wants to C<die> a second later, using C<after>:

   my $port = port {
      after 1, sub { die "oops" };
   };

Then you will find it does not work - when the after callback is executed,
it does not run in port context anymore, so exceptions will not be caught.

For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works just like Perl's built-in C<sub>:

   my $port = port {
      after 1, psub { die "oops" };
   };

C<psub> stores C<$SELF> and returns a code reference. When the code
reference is invoked, it will run the code block within the context of
that port, so exception handling once more works as expected.
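
The idea of "storing C<$SELF> and restoring it later" can be modelled in a
few lines of plain Perl. This is only a model of the idea described above
and not the actual implementation: C<$SELF> is an ordinary package
variable here, and C<model_peval> and C<model_psub> are names invented for
the sketch.

```perl
use strict;
use warnings;

our $SELF;   # stands in for the $SELF exported by AnyEvent::MP

# model of "running in port context": set $SELF and catch exceptions
sub model_peval {
   my ($port, $cb) = @_;

   local $SELF = $port;
   my $result = eval { $cb->() };
   print "port $port would be killed: $@" if $@;

   $result
}

# model of psub: remember the current $SELF now, restore it on every call
sub model_psub(&) {
   my ($cb) = @_;
   my $port = $SELF;

   sub {
      my @args = @_;
      model_peval $port, sub { $cb->(@args) }
   }
}

my ($plain, $wrapped);

# create both closures while "inside" the hypothetical port "port_a"
model_peval "port_a", sub {
   $plain   = sub { defined $SELF ? $SELF : "(no port)" };
   $wrapped = model_psub { $SELF };
};

# call them later, outside of any port context
print "plain sub sees: ", $plain->(),   "\n";
print "psub sees:      ", $wrapped->(), "\n";
```

When called later, the plain closure no longer knows which port it belongs
to, while the wrapped one still sees C<port_a>.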

There is even a way to temporarily execute code in the context of some
port, namely C<peval>:

   peval $port, sub {
      # die'ing here will kil $port
   };

The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
and then executes the given sub in a port context.

=head3 Network Errors and the AEMP Guarantee

I mentioned another important source of monitoring failures: network
problems. When a node loses connection to another node, it will invoke all
monitoring actions as if the port was killed, even if it is possible that
the port is still happily alive on another node (not being able to talk to
a node means we have no clue what's going on with it, it could be crashed,
but also still running without knowing we lost the connection).

So another way to view monitors is "notify me when some of my messages
couldn't be delivered". AEMP has a guarantee about message delivery to a
port: After starting a monitor, any message sent to a port will either
be delivered, or, when it is lost, any further messages will also be lost
until the monitoring action is invoked. After that, further messages
I<might> get delivered again.

This doesn't sound like a very big guarantee, but it is kind of the best
you can get while staying sane: Specifically, it means that there will
be no "holes" in the message sequence: all messages sent are delivered
in order, without any missing in between, and when some were lost, you
I<will> be notified of that, so you can take recovery action.

And, obviously, the guarantee only works in the presence of
correctly-working hardware, and no relevant bugs inside AEMP itself.

=head3 Supervising

OK, so how is this crashing-everything-stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them more
stable, but to make them more resilient against actual errors and
crashes. And this is not done by crashing I<everything>, but by crashing
everything except a I<supervisor>.

A supervisor is simply some code that ensures that an application (or a
part of it) is running, and if it crashes, is restarted properly. That is,
it supervises a service by starting and restarting it, as necessary.

To show how to do all this we will create a simple chat server that can
handle many chat clients. Both server and clients can be killed and
restarted, and even crash, to some extent, without disturbing the chat
functionality.

=head2 Chatting, the Resilient Way

Without further ado, here is the chat server (to run it, we assume the
set-up explained earlier, with a separate F<aemp run profile seed> node):

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   our $server = port;

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };
      msg "$nick (joins)";
   };

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

   db_set eg_chat_server => $server;

   warn "server ready.\n";

   AnyEvent->condvar->recv;

Looks like a lot, but it is actually quite simple: after your usual
preamble (this time we use common sense), we define a helper function that
sends some message to every registered chat client:

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

The clients are stored in the hash C<%clients>. Then we define a server
port and install two receivers on it, C<join>, which is sent by clients
to join the chat, and C<privmsg>, that clients use to send actual chat
messages.

C<join> is the most complicated. It expects the client port and the
nickname to be passed in the message, and registers the client in
C<%clients>.

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;

The next step is to monitor the client. The monitoring action removes the
client and sends a quit message with the error to all remaining clients.

      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

And finally, it creates a join message and sends it to all clients.

      msg "$nick (joins)";
   };

The C<privmsg> callback simply broadcasts the message to all clients:

   rcv $server, privmsg => sub {
      my ($nick, $msg) = @_;
      msg "$nick: $msg";
   };

And finally, the server registers itself in the C<eg_chat_server> database
family, so that clients can find it:

   db_set eg_chat_server => $server;

Well, well... and where is this supervisor stuff? Well... we cheated,
it's not there. To not overcomplicate the example, we only put it into
the..... CLIENT!
744
=head3 The Client, and a Supervisor!

Again, here is the client, including the supervisor, which makes it a bit
longer:

   use common::sense;
   use AnyEvent::MP;

   my $nick = shift || "anonymous";

   configure;

   my ($client, $server);

   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server => sub {
         # nothing to do as long as no server is registered
         return unless %{ $_[0] };
         # a server was found, stop watching the family
         undef $db_mon;

         print "\rconnecting...\n";

         # local port that prints every message it receives
         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         # pick the first registered server port
         $server = (keys %{ $_[0] })[0];

         snd $server, join => $client, $nick;
         # kill our client port when the server port dies
         mon $server, $client;
      };
   }

   server_connect;

   # forward every line typed on standard input to the server
   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, privmsg => $nick, $line
         if $server;
   });

   $| = 1;
   print "> ";
   AnyEvent->condvar->recv;

The first thing the client does is to store the nickname (which is
expected as the only command line argument, defaulting to C<anonymous>)
in C<$nick>, for later use.

The next relevant thing is... finally... the supervisor. (The complete
listing above locates the server with C<db_mon>; the walk-through below
uses the C<grp_get> variant of the same lookup.)

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server"
         or return after 1, \&server_connect;

This looks up the server in the C<eg_chat_server> global group. If it
cannot find it (which is likely when the node is just starting up),
it will wait a second and then retry. This "wait a bit and retry"
is an important pattern, as distributed programming means lots of
things are going on asynchronously. In practice, one should use a more
intelligent algorithm, for example one that warns after an excessive
number of retries. Hopefully future versions of AnyEvent::MP will offer
some predefined supervisors; for now you will have to code it on your own.

Next it creates a local port for the server to send messages to, and
monitors it. When the port is killed, it will print "disconnected" and
tell the supervisor function to retry.

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

Then everything is ready: the client will send a C<join> message with its
local port to the server, and start monitoring it:

      $server = $servernodes->[0];
      snd $server, join => $client, $nick;
      mon $server, $client;
   }

The monitor will ensure that if the server crashes or goes away, the
client will be killed as well. This tells the user that the client was
disconnected, and will then start to connect to the server again.

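The "wait a bit and retry" pattern of the supervisor can be made a little
more intelligent, as suggested earlier. The following is only a minimal
sketch in plain Perl and not part of AnyEvent::MP: the helper names
C<retry_delay> and C<retry_warning>, the doubling delay and the limits used
are all assumptions made for illustration.

```perl
use strict;
use warnings;

# Hypothetical helper (not an AnyEvent::MP function): seconds to wait
# before retry number $attempt (1-based). The delay doubles with every
# attempt, starting at $base, and never exceeds $max.
sub retry_delay {
   my ($attempt, $base, $max) = @_;

   my $delay = $base * 2 ** ($attempt - 1);

   $delay < $max ? $delay : $max
}

# Hypothetical helper: returns a warning text on every $every-th failed
# attempt, and nothing otherwise.
sub retry_warning {
   my ($attempt, $every) = @_;

   return if $attempt % $every;

   "still no chat server after $attempt attempts"
}

# simulate six failed lookups
for my $attempt (1 .. 6) {
   my $delay   = retry_delay $attempt, 1, 10;
   my $warning = retry_warning $attempt, 3;

   print "attempt $attempt failed, retrying in ${delay}s\n";
   print "warning: $warning\n" if defined $warning;
}
```

In the client, the supervisor could keep a retry counter and pass the
result of C<retry_delay> to C<after> instead of the fixed one-second delay,
resetting the counter once a server has been found.
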
The rest of the program deals with the boring details of actually invoking
the supervisor function to start the whole client process and handle the
actual terminal input, sending it to the server.

You should now try to start the server and one or more clients in different
terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server
   aemp run profile seed

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
who of course should not normally get killed (unless they, too, have a
supervisor).

If you often think in object-oriented terms, then treat a port as an
object: C<port> is the constructor, the receive callbacks set by C<rcv>
act as methods, the C<kil> function becomes the explicit destructor and
C<mon> installs a destructor hook. Unlike conventional object-oriented
programming, it can make sense to exchange ports more freely (for example,
to monitor one port from another).

There is ample room for improvement: the server should probably remember
the nickname in the C<join> handler instead of expecting it in every chat
message, it should probably monitor itself, and the client should not try
to send any messages unless a server is actually connected.

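Remembering the nickname in the C<join> handler could look roughly like
this. It is only a sketch in plain Perl: the handler bodies are written as
ordinary functions (C<handle_join>, C<handle_privmsg> and C<handle_quit>
are hypothetical names) that return the text to broadcast, so they can be
tried without a running node. In the real server they would be the bodies
of the C<rcv> and C<mon> callbacks and would call C<msg> instead of
returning the text.

```perl
use strict;
use warnings;

my %clients; # client port ID => nickname

# join: remember the nickname together with the client port
sub handle_join {
   my ($client, $nick) = @_;

   $clients{$client} = $nick;

   "$nick (joins)"
}

# privmsg: the client only identifies itself by its port, the nickname
# is looked up; messages from unknown ports are ignored
sub handle_privmsg {
   my ($client, $msg) = @_;

   my $nick = $clients{$client};
   return unless defined $nick;

   "$nick: $msg"
}

# monitor callback: forget the client and report why it went away
sub handle_quit {
   my ($client, @reason) = @_;

   my $nick = delete $clients{$client};
   return unless defined $nick;

   "$nick (quits, @reason)"
}

# "client1" stands in for a real port ID here
my @out = (
   handle_join    ("client1", "nick1"),
   handle_privmsg ("client1", "hello"),
   handle_quit    ("client1", "connection lost"),
);

print "$_\n" for @out;
```

With this change the client would send its own port instead of the
nickname in every C<privmsg> message.
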
=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The
L<AnyEvent::MP::Global> service, for example, monitors nodes joining the
network and starts itself automatically on other nodes (if it isn't
running already).

A good way to design such applications is to put them into a module and
create "virtual connections" to other nodes - we call this the "bridge
head" method, because you start by creating a remote port (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let's redesign the chat server and
client using this design method.

Here is the server:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   grp_reg eg_chat_server2 => $NODE;

   my %clients;

   # send a text to all connected clients
   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   # init function, runs in the context of the port created by spawn
   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much different than the previous example, except that
this time, we register the node port in the global group and not any port
we created - the clients only want to know which node the server should be
running on. In fact, they could also use some kind of election mechanism,
to find the node with lowest load or something like that.

The more interesting change is that indeed no server port is created -
the server consists only of code, and "does" nothing by itself. All it
does is define a function C<client_connect>, which expects a client port
and a nickname as arguments. It then monitors the client port and binds
a receive callback on C<$SELF>, which expects messages that in turn are
broadcast to all clients.

The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
for C<mon $client, $SELF>. The second does the normal "client has gone
away" clean-up action. Both could actually be rolled into one C<mon>
action.

C<$SELF> is a good hint that something interesting is going on. And
indeed, when looking at the client code, there is a new function,
C<spawn>:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   my $nick = shift;

   configure;

   $| = 1;

   my $port = port;

   my ($client, $server);

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server2"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
server I<port on the server node>:

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;

And of course the first thing after creating it is monitoring it.

The C<spawn> function creates a new port on a remote node and returns
its port ID. After creating the port it calls a function on the remote
node, passing any remaining arguments to it, and - most importantly -
executes the function within the context of the new port, so it can be
manipulated by referring to C<$SELF>. The init function can reside in a
module (actually it normally I<should> reside in a module) - AnyEvent::MP
will automatically load the module if the function isn't defined.

The C<spawn> function returns immediately, which means you can instantly
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.

If the spawn message gets delivered, but the monitoring message is not
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in return monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep in mind: after
creating a port, monitor it on the local node, and monitor "the other
side" from the remote node, and all will be cleaned up just fine.

=head2 Services

Above it was mentioned that C<spawn> automatically loads modules, and this
can be exploited in various ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function.

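How a function name such as C<mymod::chatserver::client_connect> relates
to the file F<mymod/chatserver.pm> follows from Perl's usual
package-to-file mapping. The sketch below illustrates only that mapping;
C<init_func_to_module> is a hypothetical helper, and what AnyEvent::MP
does internally may well differ in detail.

```perl
use strict;
use warnings;

# Hypothetical helper: split a fully qualified function name into its
# package and the file that "require" would look for in @INC.
# Functions in the main package ("::client_connect") need no module,
# in which case an empty list is returned.
sub init_func_to_module {
   my ($func) = @_;

   my ($pkg) = $func =~ /^(.+)::[^:]+$/
      or return;

   (my $file = "$pkg.pm") =~ s{::}{/}g;

   ($pkg, $file)
}

for my $func ("mymod::chatserver::client_connect", "::client_connect") {
   my ($pkg, $file) = init_func_to_module $func;

   print defined $pkg
      ? "$func: package $pkg, file $file\n"
      : "$func: main package, nothing to load\n";
}
```

This is why the file has to be reachable from the directory the node is
started in (or from any other directory in C<@INC>).
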
Likewise, when you have some service that starts automatically (similar to
AnyEvent::MP::Global), then you can configure this service statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.

Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I want to run the
Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

That's plenty of possibilities to choose from - it's all up to you how you
structure your application.

=head1 PART 4: Coro::MP - selective receive

Not all problems lend themselves naturally to an event-based solution:
sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.

In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
messages it is interested in. Other messages are not lost, but queued, and
can be received at a later time.

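The idea of selective receive can be modelled in a few lines of plain
Perl. This is not how C<Coro::MP> is implemented, only a sketch of the
queueing behaviour: C<enqueue> and C<get_matching> are hypothetical
names, and where a real thread would block until a matching message
arrives, this model simply returns an empty list.

```perl
use strict;
use warnings;

my @queue; # pending messages, oldest first, each one an array reference

# append a message to the queue
sub enqueue {
   push @queue, [@_];
}

# Remove and return the oldest queued message for which $cond returns
# true. Messages that do not match stay in the queue, in their order.
sub get_matching {
   my ($cond) = @_;

   for my $i (0 .. $#queue) {
      if ($cond->(@{ $queue[$i] })) {
         my ($msg) = splice @queue, $i, 1;
         return @$msg;
      }
   }

   return; # nothing matched (a real thread would wait here)
}

enqueue data       => "port1", "abc";
enqueue write_file => "port2", "/tmp/example";
enqueue data       => "port2", "def";

# receive the write_file message first, although it was sent second
my @msg = get_matching sub { $_[0] eq "write_file" };

print "received: @msg\n";
print "still queued: ", scalar @queue, "\n";
```

The two C<data> messages remain queued and can be fetched later with a
different condition.
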
The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.

In fact, C<Coro::MP> is more of an extension than a separate module: all
functions exported by C<AnyEvent::MP> are exported by it as well.

To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: Let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.

The server only writes a single file at a time; other requests will stay
in the queue until the current file has been finished.

Here is an example implementation that uses L<Coro::AIO> and largely
ignores error handling:

   my $ioserver = port_async {
      while () {
         # fetch the next message, whatever it is
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

         while () {
            # only accept data messages from the same source port
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

            # an empty data message ends the file
            length $data or last;

            aio_write $fh, undef, undef, $data, 0;
         };
      }
   };

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Let's go through it part by part.

   my $ioserver = port_async {

Ports can be created by attaching a thread to an existing port via
C<rcv_async>, or as here by calling C<port_async> with the code to execute
as a thread. The C<async> component comes from the fact that threads are
created using the C<Coro::async> function.

The thread runs in a normal port context (so C<$SELF> is set). In
addition, when the thread returns, the port will be C<kil>led I<normally>,
i.e. without a reason argument.

      while () {
         my ($tag, $port, $path) = get_cond;

         $tag eq "write_file"
            or die "only write_file messages expected";

The thread is supposed to serve many file writes, which is why it executes
in a loop. The first thing it does is fetch the next message, using
C<get_cond>, the "conditional message get". Without a condition, it simply
fetches the next message from the queue, which I<must> be a C<write_file>
message.

The message contains the C<$path> to the file, which is then created:

         my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
            or die "$path: $!";

Then we enter a loop again, to serve as many C<data> messages as
necessary:

         while () {
            my (undef, undef, $data) = get_cond {
               $_[0] eq "data" && $_[1] eq $port
            } 5
               or die "timeout waiting for data message from $port\n";

This time, the condition is not empty, but instead a code block: similarly
to C<grep>, the code block will be called with C<@_> set to each message in
the queue, and it has to return whether it wants to receive the message or
not.

In this case we are interested in C<data> messages (C<< $_[0] eq "data"
>>), whose first element after the tag is the source port (C<< $_[1] eq
$port >>).

The condition must be this strict, as it is possible to receive both
C<write_file> messages and C<data> messages from other ports while we
handle the file writing.

The lone C<5> at the end is a timeout - when no matching message is
received within C<5> seconds, we assume an error and C<die>.

When an empty C<data> message is received we are done and can close the
file (which is done automatically as C<$fh> goes out of scope):

            length $data or last;

Otherwise we need to write the data:

            aio_write $fh, undef, undef, $data, 0;

That's basically it. Note that every process should have some kind of
supervisor. In our case, the supervisor simply prints any error message:

   mon $ioserver, sub {
      warn "ioserver was killed: @_\n";
   };

Here is a usage example:

   port_async {
      snd $ioserver, write_file => $SELF, "/tmp/unsafe";
      snd $ioserver, data => $SELF, "abc\n";
      snd $ioserver, data => $SELF, "def\n";
      snd $ioserver, data => $SELF;
   };

The messages are sent without any flow control or acknowledgement (feel
free to improve). Also, the source port does not actually need to be a
port - any unique ID will do - but port identifiers happen to be a simple
source of network-wide unique IDs.

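When the file contents come from a single string, the message sequence of
this little protocol can be generated mechanically. The sketch below only
builds the list of messages and does not send anything;
C<write_file_messages> and its chunk size parameter (assumed to be at
least 1) are hypothetical and not part of C<Coro::MP>.

```perl
use strict;
use warnings;

# Hypothetical helper: build the messages needed to transfer $content
# to the I/O server, in pieces of at most $chunk characters ($chunk is
# assumed to be at least 1).
sub write_file_messages {
   my ($src, $path, $content, $chunk) = @_;

   my @msgs = ([write_file => $src, $path]);

   for (my $ofs = 0; $ofs < length $content; $ofs += $chunk) {
      push @msgs, [data => $src, substr $content, $ofs, $chunk];
   }

   push @msgs, [data => $src]; # the empty data message ends the file

   @msgs
}

# "source1" stands in for a real port ID such as $SELF
my @msgs = write_file_messages "source1", "/tmp/unsafe", "abcdefgh", 3;

print "@$_\n" for @msgs;
```

Inside a port, each element could then be passed on with C<snd $ioserver,
@$_>. Flow control and acknowledgements are still missing.
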
Apart from C<get_cond> as seen above, there are other ways to receive
messages. The C<write_file> message above could also selectively be
received using a C<get> call:

   my ($port, $path) = get "write_file";

This is simpler, but when some other part of the code sends an unexpected
message to the C<$ioserver>, it will stay in the queue forever. As a rule
of thumb, every threaded port should have a "fetch next message
unconditionally" somewhere, to avoid filling up the queue.

It is also possible to write switch-like C<get_cond>s:

   get_cond {
      $_[0] eq "msg1" and return sub {
         my (undef, @msg1_data) = @_;
         ...;
      };

      $_[0] eq "msg2" and return sub {
         my (undef, @msg2_data) = @_;
         ...;
      };

      die "unexpected message $_[0] received";
   };

=head1 THE END

This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial, that AnyEvent::MP is still in its infancy, and I hope
it will be useful to create exciting new applications.

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<Coro::MP>

L<AnyEvent>

=head1 AUTHOR

   Robin Redeker <elmex@ta-sa.org>
   Marc Lehmann <schmorp@schmorp.de>