1 =head1 Message Passing for the Non-Blocked Mind
2
3 =head1 Introduction and Terminology
4
This is a tutorial about how to get into the swing of the new
L<AnyEvent::MP> module, which allows programs to transparently pass
messages within the process and to other processes on the same or a
different host.
8
9 What kind of messages? Basically a message here means a list of Perl
10 strings, numbers, hashes and arrays, anything that can be expressed as a
11 L<JSON> text (as JSON is used by default in the protocol). Here are two
12 examples:
13
14 write_log => 1251555874, "action was successful.\n"
15 123, ["a", "b", "c"], { foo => "bar" }
16
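
For instance, the first of these messages could be represented by the
following JSON text (ignoring the framing details of the actual
protocol - this is purely illustrative):

   ["write_log", 1251555874, "action was successful.\n"]
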
When using L<AnyEvent::MP> it is customary to use a descriptive string as
the first element of a message, indicating the type of the message. This
element is called a I<tag> in L<AnyEvent::MP>, as some API functions
(C<rcv>) support matching it directly.
21
Suppose you want to send a ping message with your current time to
somewhere - this is how such a message might look (in Perl syntax):
24
25 ping => 1251381636
26
27 Now that we know what a message is, to which entities are those
28 messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29 a destination for messages but also a context to execute code: when
30 a runtime error occurs while executing code belonging to a port, the
31 exception will be raised on the port and can even travel to interested
32 parties on other nodes, which makes supervision of distributed processes
33 easy.
34
35 How do these ports relate to things you know? Each I<port> belongs
36 to a I<node>, and a I<node> is just the UNIX process that runs your
37 L<AnyEvent::MP> application.
38
39 Each I<node> is distinguished from other I<nodes> running on the same or
40 another host in a network by its I<node ID>. A I<node ID> is simply a
41 unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42 (UNIX nodename, random string...).
43
Here is a diagram of how I<nodes>, I<ports> and UNIX processes relate
to each other. The setup consists of two nodes (more are of course
possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
C<BAR>.
49
50
51 |- PID: 7066 -| |- PID: 8321 -|
52 | | | |
53 | Node ID: A | | Node ID: B |
54 | | | |
55 | Port ABC =|= <----\ /-----> =|= Port FOO |
56 | | X | |
57 | Port DEF =|= <----/ \-----> =|= Port BAR |
58 | | | |
59 |-------------| |-------------|
60
The strings for the I<port IDs> here are just for illustrative purposes:
even though I<ports> in L<AnyEvent::MP> are also identified by strings,
they can't be chosen manually and are assigned by the system dynamically.
These I<port IDs> are unique within a network and can also be used to
identify senders, or serve as message tags, for instance.
66
67 The next sections will explain the API of L<AnyEvent::MP> by going through
68 a few simple examples. Later some more complex idioms are introduced,
69 which are hopefully useful to solve some real world problems.
70
71 =head1 Passing Your First Message
72
As a start, let's have a look at the messaging API. The following example
is just a demo to show the basic elements of message passing with
L<AnyEvent::MP>.
76
77 The example should print: C<Ending with: 123>, in a rather complicated
78 way, by passing some message to a port.
79
80 use AnyEvent;
81 use AnyEvent::MP;
82
83 my $end_cv = AnyEvent->condvar;
84
85 my $port = port;
86
87 rcv $port, test => sub {
88 my ($data) = @_;
89 $end_cv->send ($data);
90 };
91
92 snd $port, test => 123;
93
94 print "Ending with: " . $end_cv->recv . "\n";
95
It already uses most of the essential functions inside
L<AnyEvent::MP>: first there is the C<port> function, which will create a
I<port> and return its I<port ID>, a simple string.
99
100 This I<port ID> can be used to send messages to the port and install
101 handlers to receive messages on the port. Since it is a simple string
102 it can be safely passed to other I<nodes> in the network when you want
103 to refer to that specific port (usually used for RPC, where you need
104 to tell the other end which I<port> to send the reply to - messages in
105 L<AnyEvent::MP> have a destination, but no source).
106
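
As a tiny, hypothetical illustration of that reply-port pattern, using
only the functions from the example above (the variables C<$remote_port>
and C<$my_port> and the tags C<ping> and C<pong> are made up for this
sketch):

   # on the requesting node: create a port to receive the reply on
   my $reply_port = port;

   rcv $reply_port, pong => sub {
      my ($timestamp) = @_;
      print "pong received, remote time is $timestamp\n";
   };

   # include our own port ID in the request, as messages carry no source
   snd $remote_port, ping => $reply_port;

   # on the answering node, which created $my_port and somehow published
   # its port ID (for example via a global group, shown later on):
   rcv $my_port, ping => sub {
      my ($who) = @_;
      snd $who, pong => time;
   };
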
107 The next function is C<rcv>:
108
109 rcv $port, test => sub { ... };
110
It installs a receiver callback on the I<port> that is specified as the
first argument (it only works for "local" ports, i.e. ports created on the
same node). The next argument, in this example C<test>, specifies a I<tag>
to match. This means that whenever a message with the first element being
the string C<test> is received, the callback is called with the remaining
parts of that message.
117
118 Messages can be sent with the C<snd> function, which is used like this in
119 the example above:
120
121 snd $port, test => 123;
122
123 This will send the message C<'test', 123> to the I<port> with the I<port
124 ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125 on C<test> it will call the callback with the first argument being the
126 number C<123>.
127
The callback is a typical AnyEvent idiom: it just passes that number
on to the I<condition variable> C<$end_cv>, which in turn passes the
value to the C<print>. Condition variables are outside the scope of this
tutorial and not often used with ports, so please consult
L<AnyEvent::Intro> about them.
133
134 Passing messages inside just one process is boring. Before we can move on
135 and do interprocess message passing we first have to make sure some things
136 have been set up correctly for our nodes to talk to each other.
137
138 =head1 System Requirements and System Setup
139
140 Before we can start with real IPC we have to make sure some things work on
141 your system.
142
First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
I<nodes> to be able to communicate with each other over the network it is
necessary to use the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.
147
The easiest way to set this up is to use the F<aemp> utility:
149
150 aemp gensecret
151
152 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153 random shared secret. You can copy this file to any other system and
154 then communicate over the network (via TCP) with it. You can also select
155 your own shared secret (F<aemp setsecret>) and for increased security
156 requirements you can even create (or configure) a TLS certificate (F<aemp
157 gencert>), causing connections to not just be securely authenticated, but
158 also to be encrypted and protected against tinkering.
159
160 Connections will only be successfully established when the I<nodes>
161 that want to connect to each other have the same I<shared secret> (or
162 successfully verify the TLS certificate of the other side, in which case
163 no shared secret is required).
164
165 B<If something does not work as expected, and for example tcpdump shows
166 that the connections are closed almost immediately, you should make sure
167 that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168 you try to connect with each other!>
169
That's all for now - you will find some more advanced fiddling with the
C<aemp> utility later.
172
173 =head2 Shooting the Trouble
174
Sometimes things go wrong, and AnyEvent::MP, being a professional module,
does not gratuitously spill out messages to your screen.
177
To help with troubleshooting any issues, there are two environment
variables that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets
the logging level. The default is C<5>, which means nothing much is
printed. You can increase it to C<8> or C<9> to get more verbose
output. Here is example output when starting a node:
183
184 2009-08-31 19:51:50 <8> node anon/5RloFvvYL8jfSScXNL8EpX starting up.
185 2009-08-31 19:51:50 <7> starting global service.
186 2009-08-31 19:51:50 <9> 10.0.0.17:4040 connected as ruth
187 2009-08-31 19:51:50 <7> ruth is up ()
188 2009-08-31 19:51:50 <9> ruth told us it knows about {"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}.
189 2009-08-31 19:51:50 <9> connecting to doom with [10.0.0.5:45143]
190 2009-08-31 19:51:50 <9> connecting to anon/4SYrtJ3ft5l1C16w2hto3t with [10.0.0.1:45920 [2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788 [fd00::a00:1]:37104]
191 2009-08-31 19:51:50 <9> ruth told us its addresses (10.0.0.17:4040).
192
193 A lot of info, but at least you can see that it does something.
194
195 The other environment variable that can be useful is
196 C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
197 most messages that are sent or received to be printed. In the above
198 example you would see something like:
199
200 SND ruth <- ["addr",["10.0.0.1:49358","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:58884","[fd00::a00:1]:45006"]]
201 RCV ruth -> ["","AnyEvent::MP::_spawn","20QA7cWubCLTWUhFgBKOx2.x","AnyEvent::MP::Global::connect",0,"ruth"]
202 RCV ruth -> ["","mon1","20QA7cWubCLTWUhFgBKOx2.x"]
203 RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","addr",["10.0.0.17:4040"]]
204 RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","nodes",{"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}]
205
206 =head1 PART 1: Passing Messages Between Processes
207
208 =head2 The Receiver
209
Let's split the previous example up into two programs: one that contains
the sender and one for the receiver. First the receiver application, in
full:
213
214 use AnyEvent;
215 use AnyEvent::MP;
216 use AnyEvent::MP::Global;
217
218 configure nodeid => "eg_receiver", binds => ["*:4040"];
219
220 my $port = port;
221
222 AnyEvent::MP::Global::register $port, "eg_receivers";
223
224 rcv $port, test => sub {
225 my ($data, $reply_port) = @_;
226
227 print "Received data: " . $data . "\n";
228 };
229
230 AnyEvent->condvar->recv;
231
232 =head3 AnyEvent::MP::Global
233
234 Now, that wasn't too bad, was it? Ok, let's step through the new functions
235 and modules that have been used.
236
237 For starters, there is now an additional module being
238 used: L<AnyEvent::MP::Global>. This module provides us with a I<global
239 registry>, which lets us register ports in groups that are visible on all
240 I<nodes> in a network.
241
What is this useful for? Well, the I<port IDs> are random-looking strings,
assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance,
so we don't know which I<port ID> to send messages to, especially when the
message is to be passed between different I<nodes> (or UNIX processes). To
find the right I<port> of another I<node> in the network we somehow need
to communicate its I<port ID> to the sender - and that is exactly what
L<AnyEvent::MP::Global> provides.
249
250 Especially in larger, more anonymous networks this is handy: imagine you
251 have a few database backends, a few web frontends and some processing
252 distributed over a number of hosts: all of these would simply register
253 themselves in the appropriate group, and your web frontends can start to
254 find some database backend.
255
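
A hypothetical sketch of that pattern (the port variables and the group
name C<myapp_db_backends> are made up for illustration): each backend
registers itself, and a frontend later picks one of the registered ports:

   # on each database backend node:
   AnyEvent::MP::Global::register $db_port, "myapp_db_backends";

   # on a web frontend node:
   my $backends = AnyEvent::MP::Global::find "myapp_db_backends"
      or die "no database backend registered (yet)";

   snd $backends->[0], query => "select ...";
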
256 =head3 C<configure> and the Network
257
258 Now, let's have a look at the new function, C<configure>:
259
260 configure nodeid => "eg_receiver", binds => ["*:4040"];
261
Before we are able to send messages to other nodes we have to initialise
ourselves to become a "distributed node". Initialising a node means naming
the node, optionally binding some TCP listeners so that other nodes can
contact it, and connecting to a predefined set of seed addresses so the
node can discover the existing network - and the existing network can
discover the node!
268
269 All of this (and more) can be passed to the C<configure> function - later
270 we will see how we can do all this without even passing anything to
271 C<configure>!
272
The first parameter, C<nodeid>, specifies the node ID (in this case
C<eg_receiver> - the default is to use the node name of the current host,
but for this example we want to be able to run many nodes on the same
machine). Node IDs need to be unique within the network and can be almost
any string - if you don't care, you can specify a node ID of C<anon/>,
which will then be replaced by a random node name.
279
280 The second parameter, C<binds>, specifies a list of C<address:port> pairs
281 to bind TCP listeners on. The special "address" of C<*> means to bind on
282 every local IP address.
283
284 The reason to bind on a TCP port is not just that other nodes can connect
285 to us: if no binds are specified, the node will still bind on a dynamic
286 port on all local addresses - but in this case we won't know the port, and
287 cannot tell other nodes to connect to it as seed node.
288
289 A I<seed> is a (fixed) TCP address of some other node in the network. To
290 explain the need for seeds we have to look at the topology of a typical
291 L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>,
292 here an example with 4 nodes:
293
294 N1--N2
295 | \/ |
296 | /\ |
297 N3--N4
298
299 Now imagine another node - C<N5> - wants to connect itself to that network:
300
301 N1--N2
302 | \/ | N5
303 | /\ |
304 N3--N4
305
The new node needs to know the I<binds> of all nodes already
connected. Exactly this is what the I<seeds> are for: let's assume that
the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This
causes it to connect to C<N2>:
310
311 N1--N2____
312 | \/ | N5
313 | /\ |
314 N3--N4
315
316 C<N2> then tells C<N5> about the I<binds> of the other nodes it is
317 connected to, and C<N5> creates the rest of the connections:
318
319 /--------\
320 N1--N2____|
321 | \/ | N5
322 | /\ | /|
323 N3--N4--- |
324 \________/
325
326 All done: C<N5> is now happily connected to the rest of the network.
327
Of course, this process takes time, during which the node is already
running. This also means it takes time until the node is fully connected
and global groups and other information become available. The best way to
deal with this is to either retry regularly until you find the resource
you were looking for, or to only start services on demand after a node has
become available.
334
335 =head3 Registering the Receiver
336
Coming back to our example: we have now introduced the basic purpose of
L<AnyEvent::MP::Global> and C<configure> (whose use of profiles we will
see later), so let's finally continue talking about the receiver.
341
342 Let's look at the next line(s):
343
344 my $port = port;
345 AnyEvent::MP::Global::register $port, "eg_receivers";
346
347 The C<port> function has already been discussed. It simply creates a new
348 I<port> and returns the I<port ID>. The C<register> function, however,
349 is new: The first argument is the I<port ID> that we want to add to a
350 I<global group>, and its second argument is the name of that I<global
351 group>.
352
353 You can choose the name of such a I<global group> freely (prefixing your
354 package name is highly recommended!). The purpose of such a group is to
355 store a set of I<port IDs>. This set is made available throughout the
356 L<AnyEvent::MP> network, so that each node can see which ports belong to
357 that group.
358
359 Later we will see how the sender looks for the ports in this I<global
360 group> to send messages to them.
361
362 The last step in the example is to set up a receiver callback for those
363 messages, just as was discussed in the first example. We again match
364 for the tag C<test>. The difference is that this time we don't exit the
365 application after receiving the first message. Instead we continue to wait
366 for new messages indefinitely.
367
368 =head2 The Sender
369
370 Ok, now let's take a look at the sender code:
371
372 use AnyEvent;
373 use AnyEvent::MP;
374 use AnyEvent::MP::Global;
375
376 configure nodeid => "eg_sender", seeds => ["*:4040"];
377
378 my $find_timer =
379 AnyEvent->timer (after => 0, interval => 1, cb => sub {
380 my $ports = AnyEvent::MP::Global::find "eg_receivers"
381 or return;
382
383 snd $_, test => time
384 for @$ports;
385 });
386
387 AnyEvent->condvar->recv;
388
It's even less code. The C<configure> call serves the same purpose as in
the receiver, but instead of specifying binds we specify a list of seeds -
which happens to be the same as the binds used by the receiver, which thus
becomes our seed node.
393
394 Next we set up a timer that repeatedly (every second) calls this chunk of
395 code:
396
397 my $ports = AnyEvent::MP::Global::find "eg_receivers"
398 or return;
399
400 snd $_, test => time
401 for @$ports;
402
403 The only new function here is the C<find> function of
404 L<AnyEvent::MP::Global>. It searches in the global group named
405 C<eg_receivers> for ports. If none are found, it returns C<undef>, which
406 makes our code return instantly and wait for the next round, as nobody is
407 interested in our message.
408
409 As soon as the receiver application has connected and the information
410 about the newly added port in the receiver has propagated to the sender
411 node, C<find> returns an array reference that contains the I<port ID> of
412 the receiver I<port(s)>.
413
414 We then just send a message with a tag and the current time to every
415 I<port> in the global group.
416
417 =head3 Splitting Network Configuration and Application Code
418
419 Ok, so far, this works. In the real world, however, the person configuring
420 your application to run on a specific network (the end user or network
421 administrator) is often different to the person coding the application.
422
423 Or to put it differently: the arguments passed to configure are usually
424 provided not by the programmer, but by whoever is deploying the program.
425
426 To make this easy, AnyEvent::MP supports a simple configuration database,
427 using profiles, which can be managed using the F<aemp> command-line
428 utility (yes, this section is about the advanced tinkering we mentioned
429 before).
430
431 When you change both programs above to simply call
432
433 configure;
434
435 then AnyEvent::MP tries to look up a profile using the current node name
436 in its configuration database, falling back to some global default.
437
438 You can run "generic" nodes using the F<aemp> utility as well, and we will
439 exploit this in the following way: we configure a profile "seed" and run
440 a node using it, whose sole purpose is to be a seed node for our example
441 programs.
442
443 We bind the seed node to port 4040 on all interfaces:
444
445 aemp profile seed binds "*:4040"
446
And we configure all nodes to use this as seed node (this only works when
running on the same host - for multiple machines you would provide the IP
address or hostname of the node running the seed), and to use a random
name (because we want to start multiple nodes on the same host):
451
452 aemp seeds "*:4040" nodeid anon/
453
454 Then we run the seed node:
455
456 aemp run profile seed
457
458 After that, we can start as many other nodes as we want, and they will all
459 use our generic seed node to discover each other.
460
461 In fact, starting many receivers nicely illustrates that the time sender
462 can have multiple receivers.
463
464 That's all for now - next we will teach you about monitoring by writing a
465 simple chat client and server :)
466
467 =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
468
469 That's a mouthful, so what does it mean? Our previous example is what one
470 could call "very loosely coupled" - the sender doesn't care about whether
471 there are any receivers, and the receivers do not care if there is any
472 sender.
473
This can work fine for simple services, but most real-world applications
want to ensure that the side they are expecting to be there is actually
there. Going one step further: most bigger real-world applications even
want to ensure that if some component is missing, or has crashed, it comes
back, by recovering and restarting the service.
479
480 AnyEvent::MP supports this by catching exceptions and network problems,
481 and notifying interested parties of this.
482
483 =head2 Exceptions, Network Errors and Monitors
484
485 =head3 Exceptions
486
487 Exceptions are handled on a per-port basis: receive callbacks are executed
488 in a special context, the port-context, and code that throws an uncaught
489 exception will cause the port to be C<kil>led. Killed ports are destroyed
490 automatically (killing ports is the only way to free ports, incidentally).
491
492 Ports can be monitored, even from a different host, and when a port is
493 killed any entity monitoring it will be notified.
494
495 Here is a simple example:
496
497 use AnyEvent::MP;
498
499 # create a port, it always dies
500 my $port = port { die "oops" };
501
502 # monitor it
503 mon $port, sub {
504 warn "$port was killed (with reason @_)";
505 };
506
507 # now send it some message, causing it to die:
508 snd $port;
509
It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
message, causing it to die and call the monitoring callback:
513
514 anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
515
516 The callback was actually passed two arguments: C<die> (to indicate it did
517 throw an exception as opposed to, say, a network error) and the exception
518 message itself.
519
520 What happens when a port is killed before we have a chance to monitor
521 it? Granted, this is highly unlikely in our example, but when you program
522 in a network this can easily happen due to races between nodes.
523
524 use AnyEvent::MP;
525
526 my $port = port { die "oops" };
527
528 snd $port;
529
530 mon $port, sub {
531 warn "$port was killed (with reason @_)";
532 };
533
534 This time we will get something like:
535
536 anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)
537
538 Since the port was already gone, the kill reason is now C<no_such_port>
539 with some descriptive (we hope) error message.
540
In fact, the kill reason is usually some identifier as the first argument
and a human-readable error message as the second argument, but it can be
almost anything (it's a list), or even nothing - which is called a
"normal" kill.
544
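
To make this a bit more concrete, here is a small sketch (not part of the
example above) of a monitoring callback that distinguishes a "normal" kill
from an abnormal one by looking at the reason it receives:

   mon $port, sub {
      if (@_) {
         my ($type, @info) = @_;
         warn "$port was killed abnormally: $type (@info)\n";
      } else {
         warn "$port was killed normally, nothing to worry about\n";
      }
   };
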
545 You can kill ports manually using the C<kil> function, which will be
546 treated like an error when any reason is specified:
547
548 kil $port, custom_error => "don't like your steenking face";
549
550 And a clean kill without any reason arguments:
551
552 kil $port;
553
554 By now you probably wonder what this "normal" kill business is: A common
555 idiom is to not specify a callback to C<mon>, but another port, such as
556 C<$SELF>:
557
558 mon $port, $SELF;
559
This basically means "monitor $port and kill me when it crashes". And a
"normal" kill does not count as a crash. This way you can easily link
ports together and make them crash together on errors, while still being
able to remove a port silently.
564
565 =head3 Port Context
566
567 When code runs in an environment where C<$SELF> contains its own port ID
568 and exceptions will be caught, it is said to run in a port context.
569
Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from C<rcv> handlers. As an example, assume that the port
receive handler wants to C<die> a second later, using C<after>:
573
574 my $port = port {
575 after 1, sub { die "oops" };
576 };
577
578 Then you will find it does not work - when the after callback is executed,
579 it does not run in port context anymore, so exceptions will not be caught.
580
For these cases, AnyEvent::MP exports a special "closure constructor"
called C<psub>, which works just like perl's builtin C<sub>:
583
584 my $port = port {
585 after 1, psub { die "oops" };
586 };
587
588 C<psub> stores C<$SELF> and returns a code reference. When the code
589 reference is invoked, it will run the code block within the context of
590 that port, so exception handling once more works as expected.
591
592 =head3 Network Errors and the AEMP Guarantee
593
I mentioned another important source of monitoring failures: network
problems. When a node loses connection to another node, it will invoke all
monitoring actions as if the port was killed, even if it is possible that
the port still lives happily on another node (not being able to talk to a
node means we have no clue what's going on with it: it could be crashed,
but also still running without knowing we lost the connection).
600
601 So another way to view monitors is "notify me when some of my messages
602 couldn't be delivered". AEMP has a guarantee about message delivery to a
603 port: After starting a monitor, any message sent to a port will either
604 be delivered, or, when it is lost, any further messages will also be lost
605 until the monitoring action is invoked. After that, further messages
606 I<might> get delivered again.
607
608 This doesn't sound like a very big guarantee, but it is kind of the best
609 you can get while staying sane: Specifically, it means that there will
610 be no "holes" in the message sequence: all messages sent are delivered
611 in order, without any missing in between, and when some were lost, you
612 I<will> be notified of that, so you can take recovery action.
613
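
Note that this guarantee only covers messages sent I<after> the monitor
was started, so the usual idiom is to set up the monitor first and only
then start sending. In this sketch, C<$port> and the message contents are
just placeholders:

   # monitor first...
   mon $port, sub {
      warn "messages to $port may have been lost: @_\n";
      # ...take recovery action here, e.g. find a new port and resend
   };

   # ...then send - either this arrives, or the monitor callback fires
   snd $port, important_update => $some_data;
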
614 =head3 Supervising
615
Ok, so how is this crashing-everything stuff going to make applications
I<more> stable? Well, in fact, the goal is not really to make them more
stable, but to make them more resilient against actual errors and
crashes. And this is not done by crashing I<everything>, but by crashing
everything except a supervisor.
621
622 A supervisor is simply some code that ensures that an application (or a
623 part of it) is running, and if it crashes, is restarted properly.
624
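
Here is a minimal sketch of that idea, using only functions that already
appeared above (the service body is intentionally left empty, and the
one-second restart delay is an arbitrary choice):

   sub start_service {
      # create the port that does the actual work
      my $service = port {
         # ... service code goes here ...
      };

      # when it gets killed for any reason, wait a little and restart it
      mon $service, sub {
         warn "service died (@_), restarting\n";
         after 1, \&start_service;
      };
   }

   start_service;
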
625 To show how to do all this we will create a simple chat server that can
626 handle many chat clients. Both server and clients can be killed and
627 restarted, and even crash, to some extent.
628
629 =head2 Chatting, the Resilient Way
630
631 Without further ado, here is the chat server (to run it, we assume the
632 set-up explained earlier, with a separate F<aemp run> seed node):
633
634 use common::sense;
635 use AnyEvent::MP;
636 use AnyEvent::MP::Global;
637
638 configure;
639
640 my %clients;
641
642 sub msg {
643 print "relaying: $_[0]\n";
644 snd $_, $_[0]
645 for values %clients;
646 }
647
648 our $server = port;
649
650 rcv $server, join => sub {
651 my ($client, $nick) = @_;
652
653 $clients{$client} = $client;
654
655 mon $client, sub {
656 delete $clients{$client};
657 msg "$nick (quits, @_)";
658 };
659 msg "$nick (joins)";
660 };
661
662 rcv $server, privmsg => sub {
663 my ($nick, $msg) = @_;
664 msg "$nick: $msg";
665 };
666
667 AnyEvent::MP::Global::register $server, "eg_chat_server";
668
669 warn "server ready.\n";
670
671 AnyEvent->condvar->recv;
672
673 Looks like a lot, but it is actually quite simple: after your usual
674 preamble (this time we use common sense), we define a helper function that
675 sends some message to every registered chat client:
676
677 sub msg {
678 print "relaying: $_[0]\n";
679 snd $_, $_[0]
680 for values %clients;
681 }
682
The clients are stored in the hash C<%clients>. Then we define a server
port and install two receivers on it: C<join>, which is sent by clients
to join the chat, and C<privmsg>, which clients use to send actual chat
messages.
687
C<join> is the more complicated of the two. It expects the client port
and the nickname to be passed in the message, and registers the client in
C<%clients>.
690
691 rcv $server, join => sub {
692 my ($client, $nick) = @_;
693
694 $clients{$client} = $client;
695
696 The next step is to monitor the client. The monitoring action removes the
697 client and sends a quit message with the error to all remaining clients.
698
699 mon $client, sub {
700 delete $clients{$client};
701 msg "$nick (quits, @_)";
702 };
703
704 And finally, it creates a join message and sends it to all clients.
705
706 msg "$nick (joins)";
707 };
708
709 The C<privmsg> callback simply broadcasts the message to all clients:
710
711 rcv $server, privmsg => sub {
712 my ($nick, $msg) = @_;
713 msg "$nick: $msg";
714 };
715
716 And finally, the server registers itself in the server group, so that
717 clients can find it:
718
719 AnyEvent::MP::Global::register $server, "eg_chat_server";
720
721 Well, well... and where is this supervisor stuff? Well... we cheated,
722 it's not there. To not overcomplicate the example, we only put it into
723 the..... CLIENT!
724
725 =head3 The Client, and a Supervisor!
726
727 Again, here is the client, including supervisor, which makes it a bit
728 longer:
729
730 use common::sense;
731 use AnyEvent::MP;
732 use AnyEvent::MP::Global;
733
734 my $nick = shift;
735
736 configure;
737
738 my ($client, $server);
739
740 sub server_connect {
741 my $servernodes = AnyEvent::MP::Global::find "eg_chat_server"
742 or return after 1, \&server_connect;
743
744 print "\rconnecting...\n";
745
746 $client = port { print "\r \r@_\n> " };
747 mon $client, sub {
748 print "\rdisconnected @_\n";
749 &server_connect;
750 };
751
752 $server = $servernodes->[0];
753 snd $server, join => $client, $nick;
754 mon $server, $client;
755 }
756
757 server_connect;
758
759 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
760 chomp (my $line = <STDIN>);
761 print "> ";
762 snd $server, privmsg => $nick, $line
763 if $server;
764 });
765
766 $| = 1;
767 print "> ";
768 AnyEvent->condvar->recv;
769
770 The first thing the client does is to store the nick name (which is
771 expected as the only command line argument) in C<$nick>, for further
772 usage.
773
774 The next relevant thing is... finally... the supervisor:
775
776 sub server_connect {
777 my $servernodes = AnyEvent::MP::Global::find "eg_chat_server"
778 or return after 1, \&server_connect;
779
This looks up the server in the C<eg_chat_server> global group. If it
cannot find it (which is likely when the node is just starting up),
it will wait a second and then retry. This "wait a bit and retry"
is an important pattern, as distributed programming means lots of
things are going on asynchronously. In practice, one should use a more
intelligent algorithm that could possibly warn after an excessive number
of retries. Hopefully future versions of AnyEvent::MP will offer some
predefined supervisors - for now you will have to code it on your own, as
sketched below.
788
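
A sketch of what such a slightly more intelligent retry could look like -
the retry counter, the warning interval and the chosen delays are
arbitrary, not something AnyEvent::MP prescribes:

   my $retries = 0;

   sub server_connect {
      my $servernodes = AnyEvent::MP::Global::find "eg_chat_server"
         or do {
            ++$retries;
            warn "still waiting for a chat server to appear...\n"
               unless $retries % 10;
            # back off a little once the first attempts have failed
            my $delay = $retries < 10 ? 1 : 5;
            return after $delay, \&server_connect;
         };

      $retries = 0;

      # ... continue exactly as in the client code above ...
   }
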
789 Next it creates a local port for the server to send messages to, and
790 monitors it. When the port is killed, it will print "disconnected" and
791 tell the supervisor function to retry again.
792
793 $client = port { print "\r \r@_\n> " };
794 mon $client, sub {
795 print "\rdisconnected @_\n";
796 &server_connect;
797 };
798
Then everything is ready: the client will send a C<join> message with its
local port to the server, and start monitoring it:
801
802 $server = $servernodes->[0];
803 snd $server, join => $client, $nick;
804 mon $server, $client;
805 }
806
The monitor will ensure that if the server crashes or goes away, the
client will be killed as well. This tells the user that the client was
disconnected, and the client will then start to connect to the server
again.
810
811 The rest of the program deals with the boring details of actually invoking
812 the supervisor function to start the whole client process and handle the
813 actual terminal input, sending it to the server.
814
815 You should now try to start the server and one or more clients in different
816 terminal windows (and the seed node):
817
818 perl eg/chat_client nick1
819 perl eg/chat_client nick2
820 perl eg/chat_server
821 aemp run profile seed
822
823 And then you can experiment with chatting, killing one or more clients, or
824 stopping and restarting the server, to see the monitoring in action.
825
826 The crucial point you should understand from this example is that
827 monitoring is usually symmetric: when you monitor some other port,
828 potentially on another node, that other port usually should monitor you,
829 too, so when the connection dies, both ports get killed, or at least both
830 sides can take corrective action. Exceptions are "servers" that serve
831 multiple clients at once and might only wish to clean up, and supervisors,
832 who of course should not normally get killed (unless they, too, have a
833 supervisor).
834
835 If you often think in object-oriented terms, then treat a port as an
836 object, C<port> is the constructor, the receive callbacks set by C<rcv>
837 act as methods, the C<kil> function becomes the explicit destructor and
838 C<mon> installs a destructor hook. Unlike conventional object oriented
839 programming, it can make sense to exchange ports more freely (for example,
840 to monitor one port from another).
841
842 There is ample room for improvement: the server should probably remember
843 the nickname in the C<join> handler instead of expecting it in every chat
844 message, it should probably monitor itself, and the client should not try
845 to send any messages unless a server is actually connected.
846
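
As an illustration of the first suggestion only, here is a sketch, not a
drop-in patch for the server above - C<%nicks> is a new, made-up variable,
and clients would have to send their own port ID with every C<privmsg>
instead of the nick:

   my %nicks;   # maps client port ID => nickname

   rcv $server, join => sub {
      my ($client, $nick) = @_;

      $clients{$client} = $client;
      $nicks{$client}   = $nick;

      # ... monitoring and join message as before ...
   };

   rcv $server, privmsg => sub {
      my ($client, $msg) = @_;   # the client now sends its port ID
      msg "$nicks{$client}: $msg";
   };

   # (the client would accordingly do: snd $server, privmsg => $client, $line)
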
847 =head1 PART 3: TIMTOWTDI: Virtual Connections
848
849 The chat system developed in the previous sections is very "traditional"
850 in a way: you start some server(s) and some clients statically and they
851 start talking to each other.
852
Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The
L<AnyEvent::MP::Global> service for example monitors nodes joining the
network and starts itself automatically on other nodes (if it isn't
running there already).
857
858 A good way to design such applications is to put them into a module and
859 create "virtual connections" to other nodes - we call this the "bridge
860 head" method, because you start by creating a remote port (the bridge
861 head) and from that you start to bootstrap your application.
862
863 Since that sounds rather theoretical, let's redesign the chat server and
864 client using this design method.
865
866 Here is the server:
867
868 use common::sense;
869 use AnyEvent::MP;
870 use AnyEvent::MP::Global;
871
872 configure;
873
874 AnyEvent::MP::Global::register $NODE, "eg_chat_server2";
875
876 my %clients;
877
878 sub msg {
879 print "relaying: $_[0]\n";
880 snd $_, $_[0]
881 for values %clients;
882 }
883
884 sub client_connect {
885 my ($client, $nick) = @_;
886
887 mon $client;
888 mon $client, sub {
889 delete $clients{$client};
890 msg "$nick (quits, @_)";
891 };
892
893 $clients{$client} = $client;
894
895 msg "$nick (joins)";
896
897 rcv $SELF, sub { msg "$nick: $_[0]" };
898 }
899
900 warn "server ready.\n";
901
902 AnyEvent->condvar->recv;
903
It starts out not much differently, except that this time we register the
node port and not any special port - the clients only want to know which
node the server is running on, and in fact, they could also use some kind
of election mechanism or similar.
908
909 The interesting change is that no port is created - the server is all
910 code, and does nothing. All it does is define a function C<client_connect>
911 that expects a client port and a nick as arguments. It then monitors the
912 client port and binds a receive callback on C<$SELF> that expects messages
913 to broadcast to all clients.
914
915 The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
916 for C<mon $client, $SELF>. The second does the normal "client has gone
917 away" clean-up action. Both could actually be rolled into one C<mon>
918 action.
919
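
Combining them could look roughly like this (a sketch only - note that
C<$SELF> has to be captured outside the callback, as it is not set when
the monitoring action later runs):

   my $self = $SELF;

   mon $client, sub {
      delete $clients{$client};
      msg "$nick (quits, @_)";
      kil $self, @_;   # an empty @_ means a "normal" kill
   };
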
C<$SELF> is a good hint that something interesting is going on. And
indeed, when looking at the client, there is a new function, C<spawn>:
922
923 use common::sense;
924 use AnyEvent::MP;
925 use AnyEvent::MP::Global;
926
927 my $nick = shift;
928
929 configure;
930
931 $| = 1;
932
933 my $port = port;
934
935 my ($client, $server);
936
937 sub server_connect {
938 my $servernodes = AnyEvent::MP::Global::find "eg_chat_server2"
939 or return after 1, \&server_connect;
940
941 print "\rconnecting...\n";
942
943 $client = port { print "\r \r@_\n> " };
944 mon $client, sub {
945 print "\rdisconnected @_\n";
946 &server_connect;
947 };
948
949 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
950 mon $server, $client;
951 }
952
953 server_connect;
954
955 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
956 chomp (my $line = <STDIN>);
957 print "> ";
958 snd $server, $line
959 if $server;
960 });
961
962 print "> ";
963 AnyEvent->condvar->recv;
964
965 The client is quite similar to the previous one, but instead of contacting
966 the server port (which no longer exists), it C<spawn>s a new port on the
967 server I<node>:
968
969 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
970 mon $server, $client;
971
And of course it immediately monitors it. The C<spawn> function creates
a new port on a remote node and returns its port ID. After creating the
port it
974 calls a function on the remote node, passing any remaining arguments to
975 it, and - most importantly - within the context of the new port. The init
976 function can reside in a module (actually it normally I<should> reside
977 in a module) - AnyEvent::MP will automatically load the module if the
978 function isn't defined.
979
980 The C<spawn> function returns immediately, which means you can immediately
981 send messages to the port, long before the remote node has even heard
982 of our request to create a port on it. In fact, the remote node might
983 not even be running. Despite these troubling facts, everything should
984 work just fine: if the node isn't running (or the init function throws an
985 exception), then the monitor will trigger because the port doesn't exist.
986
987 If the spawn message gets delivered, but the monitoring message is not
988 because of network problems (monitoring, after all, is implemented by
989 passing a message, and messages can get lost), then this connection loss
990 will eventually trigger the monitoring action. On the remote node (which
991 reciprocally monitors the client) the port will also be cleaned up on
992 connection loss. When the node comes up and our monitoring message can be
993 delivered it will instantly fail because the port has been cleaned up in
994 the meantime.
995
996 If your head is spinning by now, that's fine - just keep in mind, after
997 creating a port, monitor "the other side" from it, and all will be cleaned
998 up just fine.
999
1000 =head2 Services
1001
1002 Above it was mentioned that C<spawn> automatically loads modules, and this
1003 can be exploited in various ways.
1004
1005 Assume for a moment you put the server into a file called
1006 F<mymod/chatserver.pm> reachable from the current directory. Then you
1007 could run a node there with:
1008
1009 aemp run
1010
1011 The other nodes could C<spawn> the server by using
1012 C<mymod::chatserver::client_connect> as init function.
1013
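
For example (with C<$some_node> standing in for the node ID of the node
running the F<aemp run> above):

   my $server = spawn $some_node, "mymod::chatserver::client_connect",
                      $client, $nick;
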
1014 Likewise, when you have some service that starts automatically (similar to
1015 AnyEvent::MP::Global), then you can configure this service statically:
1016
1017 aemp profile mysrvnode services mymod::service::
1018 aemp run profile mysrvnode
1019
1020 And the module will automatically be started in the node, as specifying a
1021 module name (with C<::>-suffix) will simply load the module, which is then
1022 free to do whatever it wants.
1023
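
Such a service module might look like this minimal sketch (the module name
F<mymod/service.pm> and the group name are made up - the point is only
that the module does its setup at load time):

   package mymod::service;

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   # executed when the node loads the module: create a port and make it
   # visible to the rest of the network
   my $service = port {
      # handle whatever messages the service understands
   };

   AnyEvent::MP::Global::register $service, "mymod_service";

   1;
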
Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes to use it. For example, if I want
to run the Bummskraut IRC backend on a machine named "ruth", I could do
this:
1028
1029 aemp profile ruth addservice BK::Backend::IRC::
1030
And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.
1033
There are plenty of possibilities you can use - it's all up to you how you
structure your application.
1036
1037 =head1 THE END
1038
This is the end of this introduction, but hopefully not the end of your
career as an AEMP user. I hope the tutorial was enough to make the basic
concepts clear. Keep in mind that distributed programming is not
completely trivial and that AnyEvent::MP is still in its infancy - but I
hope it will be useful to create exciting new applications.
1044
1045 =head1 SEE ALSO
1046
1047 L<AnyEvent::MP>
1048
1049 L<AnyEvent::MP::Global>
1050
1051 L<AnyEvent>
1052
1053 =head1 AUTHOR
1054
1055 Robin Redeker <elmex@ta-sa.org>
1056 Marc Lehmann <schmorp@schmorp.de>
1057