/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.34
Committed: Mon Aug 31 17:49:02 2009 UTC by root
Branch: MAIN
Changes since 1.33: +183 -5 lines

1 =head1 Message Passing for the Non-Blocked Mind
2
3 =head1 Introduction and Terminology
4
5 This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
6 module, which allows programs to transparently pass messages within the
7 process and to other processes on the same or a different host.
8
9 What kind of messages? Basically a message here means a list of Perl
10 strings, numbers, hashes and arrays, anything that can be expressed as a
11 L<JSON> text (as JSON is used by default in the protocol). Here are two
12 examples:
13
14 write_log => 1251555874, "action was successful.\n"
15 123, ["a", "b", "c"], { foo => "bar" }
16
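Since JSON is the default serialisation, you can picture such a message simply as a JSON array. Here is a minimal sketch using the core L<JSON::PP> module (the actual AEMP wire framing differs and is not part of the public API):

```perl
use JSON::PP;

# the first example message from above, as a Perl list
my @msg = (write_log => 1251555874, "action was successful.\n");

# conceptually, this is what gets passed around (framing aside)
print JSON::PP->new->canonical->encode (\@msg), "\n";
```

This should print C<["write_log",1251555874,"action was successful.\n"]> - a plain JSON array.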
17 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 the first element of a message, indicating the type of the message. This
19 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20 (C<rcv>) support matching it directly.
21
22 Suppose you want to send a ping message with your current time to
23 somewhere - this is how such a message might look (in Perl syntax):
24
25 ping => 1251381636
26
27 Now that we know what a message is, to which entities are those
28 messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29 a destination for messages but also a context to execute code: when
30 a runtime error occurs while executing code belonging to a port, the
31 exception will be raised on the port and can even travel to interested
32 parties on other nodes, which makes supervision of distributed processes
33 easy.
34
35 How do these ports relate to things you know? Each I<port> belongs
36 to a I<node>, and a I<node> is just the UNIX process that runs your
37 L<AnyEvent::MP> application.
38
39 Each I<node> is distinguished from other I<nodes> running on the same or
40 another host in a network by its I<node ID>. A I<node ID> is simply a
41 unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42 (UNIX nodename, random string...).
43
44 Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
45 to each other. The setup consists of two nodes (more are of course
46 possible): Node C<A> (in UNIX process 7066) with the ports C<ABC> and
47 C<DEF>. And the node C<B> (in UNIX process 8321) with the ports C<FOO> and
48 C<BAR>.
49
50
51 |- PID: 7066 -| |- PID: 8321 -|
52 | | | |
53 | Node ID: A | | Node ID: B |
54 | | | |
55 | Port ABC =|= <----\ /-----> =|= Port FOO |
56 | | X | |
57 | Port DEF =|= <----/ \-----> =|= Port BAR |
58 | | | |
59 |-------------| |-------------|
60
61 The strings for the I<port IDs> here are just for illustrative
62 purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 strings, they can't be chosen manually and are assigned by the system
64 dynamically. These I<port IDs> are unique within a network and can also be
65 used to identify senders or as message tags for instance.
66
67 The next sections will explain the API of L<AnyEvent::MP> by going through
68 a few simple examples. Later some more complex idioms are introduced,
69 which are hopefully useful to solve some real world problems.
70
71 =head1 Passing Your First Message
72
73 As a start, let's have a look at the messaging API. The following example
74 is just a demo to show the basic elements of message passing with
75 L<AnyEvent::MP>.
76
77 The example should print: C<Ending with: 123>, in a rather complicated
78 way, by passing some message to a port.
79
80 use AnyEvent;
81 use AnyEvent::MP;
82
83 my $end_cv = AnyEvent->condvar;
84
85 my $port = port;
86
87 rcv $port, test => sub {
88 my ($data) = @_;
89 $end_cv->send ($data);
90 };
91
92 snd $port, test => 123;
93
94 print "Ending with: " . $end_cv->recv . "\n";
95
96 It already uses most of the essential functions inside
97 L<AnyEvent::MP>: First there is the C<port> function which will create a
98 I<port> and will return its I<port ID>, a simple string.
99
100 This I<port ID> can be used to send messages to the port and install
101 handlers to receive messages on the port. Since it is a simple string
102 it can be safely passed to other I<nodes> in the network when you want
103 to refer to that specific port (usually used for RPC, where you need
104 to tell the other end which I<port> to send the reply to - messages in
105 L<AnyEvent::MP> have a destination, but no source).
106
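To make the reply-port idiom concrete, here is a hedged sketch within a single process (the C<echo> tag and the variable names are made up for this example):

```perl
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;

# the "server": echoes its argument back to whatever port the
# sender passed along, since messages carry no sender information
my $echo = port;
rcv $echo, echo => sub {
   my ($reply_port, $data) = @_;
   snd $reply_port, echo_reply => $data;
};

# the "client" creates its own port just to receive the reply on
my $reply = port;
rcv $reply, echo_reply => sub { $done->send ($_[0]) };

snd $echo, echo => $reply, "hello";
print "got back: " . $done->recv . "\n";
```

The same pattern works unchanged when C<$echo> lives on a different node, because both I<port IDs> are just strings.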
107 The next function is C<rcv>:
108
109 rcv $port, test => sub { ... };
110
111 It installs a receiver callback on the I<port> that is specified as the first
112 argument (it only works for "local" ports, i.e. ports created on the same
113 node). The next argument, in this example C<test>, specifies a I<tag> to
114 match. This means that whenever a message with the first element being
115 the string C<test> is received, the callback is called with the remaining
116 parts of that message.
117
118 Messages can be sent with the C<snd> function, which is used like this in
119 the example above:
120
121 snd $port, test => 123;
122
123 This will send the message C<'test', 123> to the I<port> with the I<port
124 ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125 on C<test> it will call the callback with the first argument being the
126 number C<123>.
127
128 The callback is a typical AnyEvent idiom: it just passes
129 that number on to the I<condition variable> C<$end_cv>, which in turn
130 passes the value to the C<print>. Condition variables are out of the scope
131 of this tutorial and not often used with ports, so please consult
132 L<AnyEvent::Intro> about them.
133
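In case you have never used them: in a nutshell, a condition variable lets one part of the program block (C<< ->recv >>) until another part supplies a value (C<< ->send >>). A tiny self-contained example using only L<AnyEvent>:

```perl
use AnyEvent;

my $cv = AnyEvent->condvar;

# some callback - here a timer - eventually produces a value
my $t = AnyEvent->timer (after => 0.1, cb => sub { $cv->send ("done") });

# recv enters the event loop and blocks until ->send is called
print $cv->recv, "\n";
```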
134 Passing messages inside just one process is boring. Before we can move on
135 and do interprocess message passing we first have to make sure some things
136 have been set up correctly for our nodes to talk to each other.
137
138 =head1 System Requirements and System Setup
139
140 Before we can start with real IPC we have to make sure some things work on
141 your system.
142
143 First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
144 I<nodes> to be able to communicate with each other over the network it is
145 necessary to set up the same I<shared secret> for both of them, so they can
146 prove their trustworthiness to each other.
147
148 The easiest way to set this up is to use the F<aemp> utility:
149
150 aemp gensecret
151
152 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153 random shared secret. You can copy this file to any other system and
154 then communicate over the network (via TCP) with it. You can also select
155 your own shared secret (F<aemp setsecret>) and for increased security
156 requirements you can even create (or configure) a TLS certificate (F<aemp
157 gencert>), causing connections to not just be securely authenticated, but
158 also to be encrypted and protected against tinkering.
159
160 Connections will only be successfully established when the I<nodes>
161 that want to connect to each other have the same I<shared secret> (or
162 successfully verify the TLS certificate of the other side, in which case
163 no shared secret is required).
164
165 B<If something does not work as expected, and for example tcpdump shows
166 that the connections are closed almost immediately, you should make sure
167 that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168 you try to connect with each other!>
169
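For example, assuming standard OpenSSH tooling is available (the host name is of course just a placeholder), distributing the secret could look like this:

```shell
# generate the secret once, on one host
aemp gensecret

# then copy the resulting config file unchanged to every other host
scp ~/.perl-anyevent-mp otherhost:.perl-anyevent-mp
```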
170 That's all for now - you will find some more advanced fiddling with the
171 C<aemp> utility later.
172
173
174 =head1 PART 1: Passing Messages Between Processes
175
176 =head2 The Receiver
177
178 Let's split the previous example up into two programs: one that contains
179 the sender and one for the receiver. First the receiver application, in
180 full:
181
182 use AnyEvent;
183 use AnyEvent::MP;
184 use AnyEvent::MP::Global;
185
186 configure nodeid => "eg_receiver", binds => ["*:4040"];
187
188 my $port = port;
189
190 AnyEvent::MP::Global::register $port, "eg_receivers";
191
192 rcv $port, test => sub {
193 my ($data, $reply_port) = @_;
194
195 print "Received data: " . $data . "\n";
196 };
197
198 AnyEvent->condvar->recv;
199
200 =head3 AnyEvent::MP::Global
201
202 Now, that wasn't too bad, was it? Ok, let's step through the new functions
203 and modules that have been used.
204
205 For starters, there is now an additional module being
206 used: L<AnyEvent::MP::Global>. This module provides us with a I<global
207 registry>, which lets us register ports in groups that are visible on all
208 I<nodes> in a network.
209
210 What is this useful for? Well, the I<port IDs> are random-looking strings,
211 assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance,
212 so we don't know which I<port ID> to send messages to, especially when the
213 message is to be passed between different I<nodes> (or UNIX processes). To
214 find the right I<port> of another I<node> in the network we will need
215 to communicate this somehow to the sender. And exactly that is what
216 L<AnyEvent::MP::Global> provides.
217
218 Especially in larger, more anonymous networks this is handy: imagine you
219 have a few database backends, a few web frontends and some processing
220 distributed over a number of hosts: all of these would simply register
221 themselves in the appropriate group, and your web frontends can start to
222 find some database backend.
223
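As a hedged sketch of that idea (the group and tag names here are invented for illustration, and C<configure>/event loop set-up is omitted for brevity):

```perl
use AnyEvent::MP;
use AnyEvent::MP::Global;

# on each database backend node: create a port and announce it
my $svc = port;
AnyEvent::MP::Global::register $svc, "myapp_db_backends";

# on a web frontend node: look the group up - this returns undef
# until the group membership has propagated to this node
if (my $backends = AnyEvent::MP::Global::find "myapp_db_backends") {
   snd $backends->[0], query => "some request";
}
```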
224 =head3 C<configure> and the Network
225
226 Now, let's have a look at the new function, C<configure>:
227
228 configure nodeid => "eg_receiver", binds => ["*:4040"];
229
230 Before we are able to send messages to other nodes we have to initialise
231 ourselves to become a "distributed node". Initialising a node means naming
232 the node, optionally binding some TCP listeners so that other nodes can
233 contact it and connecting to a predefined set of seed addresses so the
234 node can discover the existing network - and the existing network can
235 discover the node!
236
237 All of this (and more) can be passed to the C<configure> function - later
238 we will see how we can do all this without even passing anything to
239 C<configure>!
240
241 The first parameter, C<nodeid>, specifies the node ID (in this case
242 C<eg_receiver> - the default is to use the node name of the current host,
243 but for this example we want to be able to run many nodes on the same
244 machine). Node IDs need to be unique within the network and can be almost
245 any string - if you don't care, you can specify a node ID of C<anon/>
246 which will then be replaced by a random node name.
247
248 The second parameter, C<binds>, specifies a list of C<address:port> pairs
249 to bind TCP listeners on. The special "address" of C<*> means to bind on
250 every local IP address.
251
252 The reason to bind on a TCP port is not just that other nodes can connect
253 to us: if no binds are specified, the node will still bind on a dynamic
254 port on all local addresses - but in this case we won't know the port, and
255 cannot tell other nodes to connect to it as seed node.
256
257 A I<seed> is a (fixed) TCP address of some other node in the network. To
258 explain the need for seeds we have to look at the topology of a typical
259 L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>,
260 here an example with 4 nodes:
261
262 N1--N2
263 | \/ |
264 | /\ |
265 N3--N4
266
267 Now imagine another node - C<N5> - wants to connect itself to that network:
268
269 N1--N2
270 | \/ | N5
271 | /\ |
272 N3--N4
273
274 The new node needs to know the I<binds> of all nodes already
275 connected. Exactly this is what the I<seeds> are for: Let's assume that
276 the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This
277 causes it to connect to C<N2>:
278
279 N1--N2____
280 | \/ | N5
281 | /\ |
282 N3--N4
283
284 C<N2> then tells C<N5> about the I<binds> of the other nodes it is
285 connected to, and C<N5> creates the rest of the connections:
286
287 /--------\
288 N1--N2____|
289 | \/ | N5
290 | /\ | /|
291 N3--N4--- |
292 \________/
293
294 All done: C<N5> is now happily connected to the rest of the network.
295
296 Of course, this process takes time, during which the node is already
297 running. This also means it takes time until the node is fully connected,
298 and global groups and other information is available. The best way to deal
299 with this is to either retry regularly until you find the resource you
300 were looking for, or to only start services on demand after a node has
301 become available.
302
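The "retry regularly" approach can be sketched with a repeating timer that stops itself once the lookup succeeds (the group name is again made up):

```perl
use AnyEvent;
use AnyEvent::MP;
use AnyEvent::MP::Global;

my $retry; $retry = AnyEvent->timer (after => 0, interval => 1, cb => sub {
   my $ports = AnyEvent::MP::Global::find "some_group"
      or return; # not visible yet - try again in a second

   undef $retry; # found it - stop polling
   snd $ports->[0], hello => $NODE;
});
```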
303 =head3 Registering the Receiver
304
305 Coming back to our example, we have now introduced the basic purpose of
306 L<AnyEvent::MP::Global> and C<configure> (we will see C<configure>'s use
307 of profiles later), so now we will finally continue talking about the
308 receiver.
309
310 Let's look at the next line(s):
311
312 my $port = port;
313 AnyEvent::MP::Global::register $port, "eg_receivers";
314
315 The C<port> function has already been discussed. It simply creates a new
316 I<port> and returns the I<port ID>. The C<register> function, however,
317 is new: The first argument is the I<port ID> that we want to add to a
318 I<global group>, and its second argument is the name of that I<global
319 group>.
320
321 You can choose the name of such a I<global group> freely (prefixing your
322 package name is highly recommended!). The purpose of such a group is to
323 store a set of I<port IDs>. This set is made available throughout the
324 L<AnyEvent::MP> network, so that each node can see which ports belong to
325 that group.
326
327 Later we will see how the sender looks for the ports in this I<global
328 group> to send messages to them.
329
330 The last step in the example is to set up a receiver callback for those
331 messages, just as was discussed in the first example. We again match
332 for the tag C<test>. The difference is that this time we don't exit the
333 application after receiving the first message. Instead we continue to wait
334 for new messages indefinitely.
335
336 =head2 The Sender
337
338 Ok, now let's take a look at the sender code:
339
340 use AnyEvent;
341 use AnyEvent::MP;
342 use AnyEvent::MP::Global;
343
344 configure nodeid => "eg_sender", seeds => ["*:4040"];
345
346 my $find_timer =
347 AnyEvent->timer (after => 0, interval => 1, cb => sub {
348 my $ports = AnyEvent::MP::Global::find "eg_receivers"
349 or return;
350
351 snd $_, test => time
352 for @$ports;
353 });
354
355 AnyEvent->condvar->recv;
356
357 It's even less code. The C<configure> call serves the same purpose as in the
358 receiver, but instead of specifying binds we specify a list of seeds -
359 which happens to be the same as the binds used by the receiver, which
360 becomes our seed node.
361
362 Next we set up a timer that repeatedly (every second) calls this chunk of
363 code:
364
365 my $ports = AnyEvent::MP::Global::find "eg_receivers"
366 or return;
367
368 snd $_, test => time
369 for @$ports;
370
371 The only new function here is the C<find> function of
372 L<AnyEvent::MP::Global>. It searches in the global group named
373 C<eg_receivers> for ports. If none are found, it returns C<undef>, which
374 makes our code return instantly and wait for the next round, as nobody is
375 interested in our message.
376
377 As soon as the receiver application has connected and the information
378 about the newly added port in the receiver has propagated to the sender
379 node, C<find> returns an array reference that contains the I<port ID> of
380 the receiver I<port(s)>.
381
382 We then just send a message with a tag and the current time to every
383 I<port> in the global group.
384
385 =head3 Splitting Network Configuration and Application Code
386
387 Ok, so far, this works. In the real world, however, the person configuring
388 your application to run on a specific network (the end user or network
389 administrator) is often different from the person coding the application.
390
391 Or to put it differently: the arguments passed to configure are usually
392 provided not by the programmer, but by whoever is deploying the program.
393
394 To make this easy, AnyEvent::MP supports a simple configuration database,
395 using profiles, which can be managed using the F<aemp> command-line
396 utility (yes, this section is about the advanced tinkering we mentioned
397 before).
398
399 When you change both programs above to simply call
400
401 configure;
402
403 then AnyEvent::MP tries to look up a profile using the current node name
404 in its configuration database, falling back to some global default.
405
406 You can run "generic" nodes using the F<aemp> utility as well, and we will
407 exploit this in the following way: we configure a profile "seed" and run
408 a node using it, whose sole purpose is to be a seed node for our example
409 programs.
410
411 We bind the seed node to port 4040 on all interfaces:
412
413 aemp profile seed binds "*:4040"
414
415 And we configure all nodes to use this as seed node (this only works when
416 running on the same host, for multiple machines you would provide the IP
417 address or hostname of the node running the seed), and use a random name
418 (because we want to start multiple nodes on the same host):
419
420 aemp seeds "*:4040" nodeid anon/
421
422 Then we run the seed node:
423
424 aemp run profile seed
425
426 After that, we can start as many other nodes as we want, and they will all
427 use our generic seed node to discover each other.
428
429 In fact, starting many receivers nicely illustrates that the time sender
430 can have multiple receivers.
431
432 That's all for now - next we will teach you about monitoring by writing a
433 simple chat client and server :)
434
435 =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
436
437 That's a mouthful, so what does it mean? Our previous example is what one
438 could call "very loosely coupled" - the sender doesn't care about whether
439 there are any receivers, and the receivers do not care if there is any
440 sender.
441
442 This can work fine for simple services, but most real-world applications
443 want to ensure that the side they are expecting to be there is actually
444 there. Going one step further: most bigger real-world applications even
445 want to ensure that if some component is missing, or has crashed, it is
446 brought back, by recovering and restarting the service.
447
448 AnyEvent::MP supports this by catching exceptions and network problems,
449 and notifying interested parties of this.
450
451 =head2 Exceptions, Network Errors and Monitors
452
453 =head3 Exceptions
454
455 Exceptions are handled on a per-port basis: receive callbacks are executed
456 in a special context, the port-context, and code that throws an uncaught
457 exception will cause the port to be C<kil>led. Killed ports are destroyed
458 automatically (killing ports is the only way to free ports, incidentally).
459
460 Ports can be monitored, even from a different host, and when a port is
461 killed any entity monitoring it will be notified.
462
463 Here is a simple example:
464
465 use AnyEvent::MP;
466
467 # create a port, it always dies
468 my $port = port { die "oops" };
469
470 # monitor it
471 mon $port, sub {
472 warn "$port was killed (with reason @_)";
473 };
474
475 # now send it some message, causing it to die:
476 snd $port;
477
478 It first creates a port whose only action is to throw an exception,
479 and then monitors it with the C<mon> function. Afterwards it sends it a
480 message, causing it to die and call the monitoring callback:
481
482 anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
483
484 The callback was actually passed two arguments: C<die> (to indicate it did
485 throw an exception as opposed to, say, a network error) and the exception
486 message itself.
487
488 What happens when a port is killed before we have a chance to monitor
489 it? Granted, this is highly unlikely in our example, but when you program
490 in a network this can easily happen due to races between nodes.
491
492 use AnyEvent::MP;
493
494 my $port = port { die "oops" };
495
496 snd $port;
497
498 mon $port, sub {
499 warn "$port was killed (with reason @_)";
500 };
501
502 This time we will get something like:
503
504 anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)
505
506 Since the port was already gone, the kill reason is now C<no_such_port>
507 with some descriptive (we hope) error message.
508
509 In fact, the kill reason is usually some identifier as first argument
510 and a human-readable error message as second argument, but can be about
511 anything (it's a list) or even nothing - which is called a "normal" kill.
512
513 You can kill ports manually using the C<kil> function, which will be
514 treated like an error when any reason is specified:
515
516 kil $port, custom_error => "don't like your steenking face";
517
518 And a clean kill without any reason arguments:
519
520 kil $port;
521
522 By now you probably wonder what this "normal" kill business is: A common
523 idiom is to not specify a callback to C<mon>, but another port, such as
524 C<$SELF>:
525
526 mon $port, $SELF;
527
528 This basically means "monitor $port and kill me when it crashes". And a
529 "normal" kill does not count as a crash. This way you can easily link
530 ports together and make them crash together on errors (while still
531 allowing you to remove a port silently).
532
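To make the linking idiom concrete, here is a small sketch run on a single node (the error tag is arbitrary):

```perl
use AnyEvent::MP;

my $port1 = port { print "port1 got: @_\n" };
my $port2 = port { print "port2 got: @_\n" };

# monitor each port with the other: an abnormal kill of either
# port now also kills its peer
mon $port1, $port2;
mon $port2, $port1;

# this counts as a crash, so $port2 will be killed as well:
kil $port1, custom_error => "crashed";

# a clean kill - kil $port1 - would NOT have propagated
```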
533 =head3 Port Context
534
535 When code runs in an environment where C<$SELF> contains its own port ID
536 and exceptions will be caught, it is said to run in a port context.
537
538 Since AnyEvent::MP is event-based, it is not uncommon to register
539 callbacks from C<rcv> handlers. As example, assume that the port receive
540 handler wants to C<die> a second later, using C<after>:
541
542 my $port = port {
543 after 1, sub { die "oops" };
544 };
545
546 Then you will find it does not work - when the after callback is executed,
547 it does not run in port context anymore, so exceptions will not be caught.
548
549 For these cases, AnyEvent::MP exports a special "closure constructor"
550 called C<psub>, which works just like Perl's built-in C<sub>:
551
552 my $port = port {
553 after 1, psub { die "oops" };
554 };
555
556 C<psub> stores C<$SELF> and returns a code reference. When the code
557 reference is invoked, it will run the code block within the context of
558 that port, so exception handling once more works as expected.
559
560 =head3 Network Errors and the AEMP Guarantee
561
562 I mentioned another important source of monitoring failures: network
563 problems. When a node loses connection to another node, it will invoke all
564 monitoring actions as if the port was killed, even if it is possible that
565 the port still lives happily on another node (not being able to talk to a
566 node means we have no clue what's going on with it: it could have crashed,
567 but it could also still be running without knowing we lost the connection).
568
569 So another way to view monitors is "notify me when some of my messages
570 couldn't be delivered". AEMP has a guarantee about message delivery to a
571 port: After starting a monitor, any message sent to a port will either
572 be delivered, or, when it is lost, any further messages will also be lost
573 until the monitoring action is invoked. After that, further messages
574 I<might> get delivered again.
575
576 This doesn't sound like a very big guarantee, but it is kind of the best
577 you can get while staying sane: Specifically, it means that there will
578 be no "holes" in the message sequence: all messages sent are delivered
579 in order, without any missing in between, and when some were lost, you
580 I<will> be notified of that, so you can take recovery action.
581
582 =head3 Supervising
583
584 Ok, so how is this crashing-everything stuff going to make applications
585 I<more> stable? Well in fact, the goal is not really to make them more
586 stable, but to make them more resilient against actual errors and
587 crashes. And this is not done by crashing I<everything>, but by crashing
588 everything except a supervisor.
589
590 A supervisor is simply some code that ensures that an application (or a
591 part of it) is running, and if it crashes, is restarted properly.
592
593 To show how to do all this we will create a simple chat server that can
594 handle many chat clients. Both server and clients can be killed and
595 restarted, and even crash, to some extent.
596
597 =head2 Chatting, the Resilient Way
598
599 Without further ado, here is the chat server (to run it, we assume the
600 set-up explained earlier, with a separate F<aemp run> seed node):
601
602 use common::sense;
603 use AnyEvent::MP;
604 use AnyEvent::MP::Global;
605
606 configure;
607
608 my %clients;
609
610 sub msg {
611 print "relaying: $_[0]\n";
612 snd $_, $_[0]
613 for values %clients;
614 }
615
616 our $server = port;
617
618 rcv $server, join => sub {
619 my ($client, $nick) = @_;
620
621 $clients{$client} = $client;
622
623 mon $client, sub {
624 delete $clients{$client};
625 msg "$nick (quits, @_)";
626 };
627 msg "$nick (joins)";
628 };
629
630 rcv $server, privmsg => sub {
631 my ($nick, $msg) = @_;
632 msg "$nick: $msg";
633 };
634
635 AnyEvent::MP::Global::register $server, "eg_chat_server";
636
637 warn "server ready.\n";
638
639 AnyEvent->condvar->recv;
640
641 Looks like a lot, but it is actually quite simple: after your usual
642 preamble (this time we use common sense), we define a helper function that
643 sends some message to every registered chat client:
644
645 sub msg {
646 print "relaying: $_[0]\n";
647 snd $_, $_[0]
648 for values %clients;
649 }
650
651 The clients are stored in the hash C<%clients>. Then we define a server
652 port and install two receivers on it, C<join>, which is sent by clients
653 to join the chat, and C<privmsg>, that clients use to send actual chat
654 messages.
655
656 C<join> is the most complicated. It expects the client port and the nickname
657 to be passed in the message, and registers the client in C<%clients>.
658
659 rcv $server, join => sub {
660 my ($client, $nick) = @_;
661
662 $clients{$client} = $client;
663
664 The next step is to monitor the client. The monitoring action removes the
665 client and sends a quit message with the error to all remaining clients.
666
667 mon $client, sub {
668 delete $clients{$client};
669 msg "$nick (quits, @_)";
670 };
671
672 And finally, it creates a join message and sends it to all clients.
673
674 msg "$nick (joins)";
675 };
676
677 The C<privmsg> callback simply broadcasts the message to all clients:
678
679 rcv $server, privmsg => sub {
680 my ($nick, $msg) = @_;
681 msg "$nick: $msg";
682 };
683
684 And finally, the server registers itself in the server group, so that
685 clients can find it:
686
687 AnyEvent::MP::Global::register $server, "eg_chat_server";
688
689 Well, well... and where is this supervisor stuff? Well... we cheated,
690 it's not there. To not overcomplicate the example, we only put it into
691 the..... CLIENT!
692
693 =head3 The Client, and a Supervisor!
694
695 Again, here is the client, including supervisor, which makes it a bit
696 longer:
697
698 use common::sense;
699 use AnyEvent::MP;
700 use AnyEvent::MP::Global;
701
702 my $nick = shift;
703
704 configure;
705
706 my ($client, $server);
707
708 sub server_connect {
709 my $servernodes = AnyEvent::MP::Global::find "eg_chat_server"
710 or return after 1, \&server_connect;
711
712 print "\rconnecting...\n";
713
714 $client = port { print "\r \r@_\n> " };
715 mon $client, sub {
716 print "\rdisconnected @_\n";
717 &server_connect;
718 };
719
720 $server = $servernodes->[0];
721 snd $server, join => $client, $nick;
722 mon $server, $client;
723 }
724
725 server_connect;
726
727 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
728 chomp (my $line = <STDIN>);
729 print "> ";
730 snd $server, privmsg => $nick, $line
731 if $server;
732 });
733
734 $| = 1;
735 print "> ";
736 AnyEvent->condvar->recv;
737
738 The first thing the client does is to store the nickname (which is
739 expected as the only command line argument) in C<$nick>, for later
740 use.
741
742 The next relevant thing is... finally... the supervisor:
743
744 sub server_connect {
745 my $servernodes = AnyEvent::MP::Global::find "eg_chat_server"
746 or return after 1, \&server_connect;
747
748 This looks up the server in the C<eg_chat_server> global group. If it
749 cannot find it (which is likely when the node is just starting up),
750 it will wait a second and then retry. This "wait a bit and retry"
751 is an important pattern, as distributed programming means lots of
752 things are going on asynchronously. In practice, one should use a more
753 intelligent algorithm, to possibly warn after an excessive number of
754 retries. Hopefully future versions of AnyEvent::MP will offer some
755 predefined supervisors, for now you will have to code it on your own.
756
757 Next it creates a local port for the server to send messages to, and
758 monitors it. When the port is killed, it will print "disconnected" and
759 tell the supervisor function to retry again.
760
761 $client = port { print "\r \r@_\n> " };
762 mon $client, sub {
763 print "\rdisconnected @_\n";
764 &server_connect;
765 };
766
767 Then everything is ready: the client will send a C<join> message with its
768 local port to the server, and start monitoring it:
769
770 $server = $servernodes->[0];
771 snd $server, join => $client, $nick;
772 mon $server, $client;
773 }
774
775 The monitor will ensure that if the server crashes or goes away, the
776 client will be killed as well. This tells the user that the client was
777 disconnected, and will then start connecting to the server again.
778
779 The rest of the program deals with the boring details of actually invoking
780 the supervisor function to start the whole client process and handle the
781 actual terminal input, sending it to the server.
782
783 You should now try to start the server and one or more clients in different
784 terminal windows (and the seed node):
785
786 perl eg/chat_client nick1
787 perl eg/chat_client nick2
788 perl eg/chat_server
789 aemp run profile seed
790
791 And then you can experiment with chatting, killing one or more clients, or
792 stopping and restarting the server, to see the monitoring in action.
793
794 The crucial point you should understand from this example is that
795 monitoring is usually symmetric: when you monitor some other port,
796 potentially on another node, that other port usually should monitor you,
797 too, so when the connection dies, both ports get killed, or at least both
798 sides can take corrective action. Exceptions are "servers" that serve
799 multiple clients at once and might only wish to clean up, and supervisors,
800 who of course should not normally get killed (unless they, too, have a
801 supervisor).
802
803 If you often think in object-oriented terms, then treat a port as an
804 object, C<port> is the constructor, the receive callbacks set by C<rcv>
805 act as methods, the C<kil> function becomes the explicit destructor and
806 C<mon> installs a destructor hook. Unlike conventional object oriented
807 programming, it can make sense to exchange ports more freely (for example,
808 to monitor one port from another).
809
810 There is ample room for improvement: the server should probably remember
811 the nickname in the C<join> handler instead of expecting it in every chat
812 message, it should probably monitor itself, and the client should not try
813 to send any messages unless a server is actually connected.
814
=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically, and they
start talking to each other.

Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The
L<AnyEvent::MP::Global> service, for example, monitors nodes joining the
network and starts itself automatically on other nodes (if it isn't
running there already).

A good way to design such applications is to put them into a module and
create "virtual connections" to other nodes - we call this the "bridge
head" method, because you start by creating a remote port (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let's redesign the chat server and
client using this design method.

Here is the server:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   AnyEvent::MP::Global::register $NODE, "eg_chat_server2";

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much differently, except that this time we register the
node port and not any special port - the clients only need to know on
which node the server is running, and in fact, they could also use some
kind of election mechanism or similar.

The interesting change is that no port is created - the server is all
code, and initially does nothing. All it does is define a function
C<client_connect> that expects a client port and a nick as arguments. It
then monitors the client port and binds a receive callback on C<$SELF>,
which expects messages to broadcast to all clients.

The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
for C<mon $client, $SELF>. The second does the normal "client has gone
away" clean-up action. Both could actually be rolled into one C<mon>
action.
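
Rolled into one, that might look like this (a sketch only; note the
explicit C<kil> of C<$SELF>, which takes over the job of the shorthand
C<mon $client>):

   # hypothetical sketch: clean-up and kill in a single mon callback
   mon $client, sub {
      delete $clients{$client};
      msg "$nick (quits, @_)";
      kil $SELF, @_;   # what "mon $client" (== mon $client, $SELF) did
   };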

C<$SELF> is a good hint that something interesting is going on. And
indeed, when looking at the client, there is a new function, C<spawn>:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   my $nick = shift;

   configure;

   $| = 1;

   my $port = port;

   my ($client, $server);

   sub server_connect {
      my $servernodes = AnyEvent::MP::Global::find "eg_chat_server2"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server port (which no longer exists), it C<spawn>s a new port on the
server I<node>:

   $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
   mon $server, $client;

And of course it immediately monitors it. The C<spawn> function creates a
new port on a remote node and returns its port ID. After creating the
port, it calls a function on the remote node, passing any remaining
arguments to it, and - most importantly - running it within the context of
the new port. The init function can reside in a module (actually, it
normally I<should> reside in a module) - AnyEvent::MP will automatically
load the module if the function isn't defined.
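
For example, the init function could live in a module (the package and
file names here are invented for illustration):

   # hypothetical file Chat/Server.pm, holding the init function
   package Chat::Server;

   use AnyEvent::MP;

   sub client_connect {
      my ($client, $nick) = @_;
      # ... as before; this runs with $SELF set to the newly created port
   }

   1;

The client would then spawn it under its fully qualified name, and
AnyEvent::MP would load C<Chat::Server> automatically if
C<Chat::Server::client_connect> is not yet defined:

   $server = spawn $servernodes->[0],
                   "Chat::Server::client_connect", $client, $nick;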

The C<spawn> function returns immediately, which means you can immediately
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.
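
This usage pattern - spawn, monitor, then send right away - could be
sketched like this (the node variable, function name and message tag are
made up):

   # hypothetical sketch: messages may be sent before the port exists
   my $remote = spawn $somenode, "::init_func";
   mon $remote, sub { warn "remote port gone: @_\n" };
   snd $remote, hello => "sent before the remote port may even exist";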

If the spawn message gets delivered, but the monitoring message is not
(because of network problems - monitoring, after all, is implemented by
passing a message, and messages can get lost), then this connection loss
will eventually trigger the monitoring action. On the remote node (which
reciprocally monitors the client) the port will also be cleaned up on
connection loss. When the node comes up again and our monitoring message
can finally be delivered, it will instantly fail because the port has been
cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep one thing in
mind: after creating a port, monitor "the other side" from it, and
everything will be cleaned up just fine.

=head1 PART 4: Services

#TODO

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<AnyEvent>

=head1 AUTHOR

   Robin Redeker <elmex@ta-sa.org>
   Marc Lehmann <schmorp@schmorp.de>
