/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.40
Committed: Sat Sep 5 21:16:59 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1, rel-1_2, rel-1_21
Changes since 1.39: +21 -22 lines

1 =head1 Message Passing for the Non-Blocked Mind
2
3 =head1 Introduction and Terminology
4
5 This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
6 module, which allows programs to transparently pass messages within the
7 process and to other processes on the same or a different host.
8
9 What kind of messages? Basically a message here means a list of Perl
10 strings, numbers, hashes and arrays, anything that can be expressed as a
11 L<JSON> text (as JSON is used by default in the protocol). Here are two
12 examples:
13
14 write_log => 1251555874, "action was successful.\n"
15 123, ["a", "b", "c"], { foo => "bar" }
16
17 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 the first element of a message, indicating the type of the message. This
19 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20 (C<rcv>) support matching it directly.
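
The message format and tag convention above can be sketched in plain Perl. This is illustrative only: L<JSON::PP> merely stands in for the wire encoding, which L<AnyEvent::MP> handles internally.

```perl
# Illustrative sketch: a message is a list of JSON-expressible values,
# with a descriptive tag as its first element.
use strict;
use warnings;
use JSON::PP;

my @msg  = (ping => 1251381636);      # tag first, then the payload
my $json = JSON::PP->new->canonical;

# something like this happens when the message crosses a node boundary
my $wire = $json->encode (\@msg);     # '["ping",1251381636]'

# and the receiving side gets the tag and the remaining elements back
my ($tag, @payload) = @{ $json->decode ($wire) };
print "tag=$tag payload=@payload\n";  # prints "tag=ping payload=1251381636"
```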
21
22 Suppose you want to send a ping message with your current time to
23 somewhere - this is how such a message might look (in Perl syntax):
24
25 ping => 1251381636
26
27 Now that we know what a message is, to which entities are those
28 messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29 a destination for messages but also a context to execute code: when
30 a runtime error occurs while executing code belonging to a port, the
31 exception will be raised on the port and can even travel to interested
32 parties on other nodes, which makes supervision of distributed processes
33 easy.
34
35 How do these ports relate to things you know? Each I<port> belongs
36 to a I<node>, and a I<node> is just the UNIX process that runs your
37 L<AnyEvent::MP> application.
38
39 Each I<node> is distinguished from other I<nodes> running on the same or
40 another host in a network by its I<node ID>. A I<node ID> is simply a
41 unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42 (UNIX nodename, random string...).
43
44 Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
45 to each other. The setup consists of two nodes (more are of course
46 possible): Node C<A> (in UNIX process 7066) with the ports C<ABC> and
47 C<DEF>. And the node C<B> (in UNIX process 8321) with the ports C<FOO> and
48 C<BAR>.
49
50
51 |- PID: 7066 -| |- PID: 8321 -|
52 | | | |
53 | Node ID: A | | Node ID: B |
54 | | | |
55 | Port ABC =|= <----\ /-----> =|= Port FOO |
56 | | X | |
57 | Port DEF =|= <----/ \-----> =|= Port BAR |
58 | | | |
59 |-------------| |-------------|
60
61 The strings for the I<port IDs> here are just for illustrative
62 purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 strings, they can't be chosen manually and are assigned by the system
64 dynamically. These I<port IDs> are unique within a network and can also be
65 used to identify senders or as message tags for instance.
66
67 The next sections will explain the API of L<AnyEvent::MP> by going through
68 a few simple examples. Later some more complex idioms are introduced,
69 which are hopefully useful to solve some real world problems.
70
71 =head2 Passing Your First Message
72
73 As a start, let's have a look at the messaging API. The following example
74 is just a demo to show the basic elements of message passing with
75 L<AnyEvent::MP>.
76
77 The example should print: C<Ending with: 123>, in a rather complicated
78 way, by passing some message to a port.
79
80 use AnyEvent;
81 use AnyEvent::MP;
82
83 my $end_cv = AnyEvent->condvar;
84
85 my $port = port;
86
87 rcv $port, test => sub {
88 my ($data) = @_;
89 $end_cv->send ($data);
90 };
91
92 snd $port, test => 123;
93
94 print "Ending with: " . $end_cv->recv . "\n";
95
96 It already uses most of the essential functions inside
97 L<AnyEvent::MP>: First there is the C<port> function which will create a
98 I<port> and will return its I<port ID>, a simple string.
99
100 This I<port ID> can be used to send messages to the port and install
101 handlers to receive messages on the port. Since it is a simple string
102 it can be safely passed to other I<nodes> in the network when you want
103 to refer to that specific port (usually used for RPC, where you need
104 to tell the other end which I<port> to send the reply to - messages in
105 L<AnyEvent::MP> have a destination, but no source).
106
107 The next function is C<rcv>:
108
109 rcv $port, test => sub { ... };
110
111 It installs a receiver callback on the I<port> specified as the first
112 argument (it only works for "local" ports, i.e. ports created on the same
113 node). The next argument, in this example C<test>, specifies a I<tag> to
114 match. This means that whenever a message with the first element being
115 the string C<test> is received, the callback is called with the remaining
116 parts of that message.
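
Conceptually, this tag matching is nothing more than a dispatch on the first element of the message. Here is a plain-Perl sketch of the idea (no L<AnyEvent::MP> involved; C<rcv_sketch> and C<snd_sketch> are made-up stand-ins for the real functions):

```perl
use strict;
use warnings;

my %handlers;   # tag => callback, as installed by the rcv calls

sub rcv_sketch { my ($tag, $cb) = @_; $handlers{$tag} = $cb }

sub snd_sketch {
   my ($tag, @rest) = @_;
   # invoke the matching handler with the remaining message parts
   $handlers{$tag}->(@rest) if $handlers{$tag};
}

rcv_sketch test => sub { print "got: $_[0]\n" };

snd_sketch test  => 123;   # prints "got: 123"
snd_sketch other => 456;   # no handler installed, message is dropped
```

The real C<rcv> is more powerful (it can match more than just the tag and handle default cases), but this is the basic shape of it.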
117
118 Messages can be sent with the C<snd> function, which is used like this in
119 the example above:
120
121 snd $port, test => 123;
122
123 This will send the message C<'test', 123> to the I<port> with the I<port
124 ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125 on C<test> it will call the callback with the first argument being the
126 number C<123>.
127
128 The callback is a typical AnyEvent idiom: the callback just passes
129 that number on to the I<condition variable> C<$end_cv>, which will then
130 pass the value on to C<print>. Condition variables are outside the scope
131 of this tutorial and not often used with ports, so please consult
132 L<AnyEvent::Intro> about them.
133
134 Passing messages inside just one process is boring. Before we can move on
135 and do interprocess message passing we first have to make sure some things
136 have been set up correctly for our nodes to talk to each other.
137
138 =head2 System Requirements and System Setup
139
140 Before we can start with real IPC we have to make sure some things work on
141 your system.
142
143 First we have to setup a I<shared secret>: for two L<AnyEvent::MP>
144 I<nodes> to be able to communicate with each other over the network it is
145 necessary to setup the same I<shared secret> for both of them, so they can
146 prove their trustworthiness to each other.
147
148 The easiest way to set this up is to use the F<aemp> utility:
149
150 aemp gensecret
151
152 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153 random shared secret. You can copy this file to any other system and
154 then communicate over the network (via TCP) with it. You can also select
155 your own shared secret (F<aemp setsecret>) and for increased security
156 requirements you can even create (or configure) a TLS certificate (F<aemp
157 gencert>), causing connections to not just be securely authenticated, but
158 also to be encrypted and protected against tinkering.
159
160 Connections will only be successfully established when the I<nodes>
161 that want to connect to each other have the same I<shared secret> (or
162 successfully verify the TLS certificate of the other side, in which case
163 no shared secret is required).
164
165 B<If something does not work as expected, and for example tcpdump shows
166 that the connections are closed almost immediately, you should make sure
167 that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168 you try to connect with each other!>
169
170 That's all for now - you will find some more advanced fiddling with the
171 C<aemp> utility later.
172
173 =head2 Shooting the Trouble
174
175 Sometimes things go wrong, and AnyEvent::MP, being a professional module,
176 does not gratuitously spill out messages to your screen.
177
178 To help with troubleshooting any issues, there are two environment
179 variables that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>,
180 sets the logging level. The default is C<5>, which means nothing much is
181 printed. You can increase it to C<8> or C<9> to get more verbose
182 output. This is example output when starting a node:
183
184 2009-08-31 19:51:50 <8> node anon/5RloFvvYL8jfSScXNL8EpX starting up.
185 2009-08-31 19:51:50 <7> starting global service.
186 2009-08-31 19:51:50 <9> 10.0.0.17:4040 connected as ruth
187 2009-08-31 19:51:50 <7> ruth is up ()
188 2009-08-31 19:51:50 <9> ruth told us it knows about {"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}.
189 2009-08-31 19:51:50 <9> connecting to doom with [10.0.0.5:45143]
190 2009-08-31 19:51:50 <9> connecting to anon/4SYrtJ3ft5l1C16w2hto3t with [10.0.0.1:45920 [2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788 [fd00::a00:1]:37104]
191 2009-08-31 19:51:50 <9> ruth told us its addresses (10.0.0.17:4040).
192
193 A lot of info, but at least you can see that it does something.
194
195 The other environment variable that can be useful is
196 C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
197 most messages that are sent or received to be printed. In the above
198 example you would see something like:
199
200 SND ruth <- ["addr",["10.0.0.1:49358","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:58884","[fd00::a00:1]:45006"]]
201 RCV ruth -> ["","AnyEvent::MP::_spawn","20QA7cWubCLTWUhFgBKOx2.x","AnyEvent::MP::Global::connect",0,"ruth"]
202 RCV ruth -> ["","mon1","20QA7cWubCLTWUhFgBKOx2.x"]
203 RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","addr",["10.0.0.17:4040"]]
204 RCV ruth -> ["20QA7cWubCLTWUhFgBKOx2.x","nodes",{"doom":["10.0.0.5:45143"],"rain":["10.0.0.19:4040"],"anon/4SYrtJ3ft5l1C16w2hto3t":["10.0.0.1:45920","[2002:58c6:438b:20:21d:60ff:fee8:6e36]:35788","[fd00::a00:1]:37104"],"frank":["10.0.0.18:4040"]}]
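
Both variables are ordinary environment variables, so a typical debugging run might look like this (F<my_node.pl> is a placeholder, not one of the example programs):

```shell
# run a node with verbose logging and message tracing enabled
PERL_ANYEVENT_MP_WARNLEVEL=9 PERL_ANYEVENT_MP_TRACE=1 perl my_node.pl
```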
205
206 =head1 PART 1: Passing Messages Between Processes
207
208 =head2 The Receiver
209
210 Let's split the previous example up into two programs: one that contains
211 the sender and one for the receiver. First the receiver application, in
212 full:
213
214 use AnyEvent;
215 use AnyEvent::MP;
216 use AnyEvent::MP::Global;
217
218 configure nodeid => "eg_receiver", binds => ["*:4040"];
219
220 my $port = port;
221
222 grp_reg eg_receivers => $port;
223
224 rcv $port, test => sub {
225 my ($data, $reply_port) = @_;
226
227 print "Received data: " . $data . "\n";
228 };
229
230 AnyEvent->condvar->recv;
231
232 =head3 AnyEvent::MP::Global
233
234 Now, that wasn't too bad, was it? Ok, let's step through the new functions
235 and modules that have been used.
236
237 For starters, there is now an additional module being
238 used: L<AnyEvent::MP::Global>. This module provides us with a I<global
239 registry>, which lets us register ports in groups that are visible on all
240 I<nodes> in a network.
241
242 What is this useful for? Well, the I<port IDs> are random-looking strings,
243 assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance,
244 so we don't know which I<port ID> to send messages to, especially when the
245 message is to be passed between different I<nodes> (or UNIX processes). To
246 find the right I<port> of another I<node> in the network we will need
247 to communicate this somehow to the sender. And exactly that is what
248 L<AnyEvent::MP::Global> provides.
249
250 Especially in larger, more anonymous networks this is handy: imagine you
251 have a few database backends, a few web frontends and some processing
252 distributed over a number of hosts: all of these would simply register
253 themselves in the appropriate group, and your web frontends can start to
254 find some database backend.
255
256 =head3 C<configure> and the Network
257
258 Now, let's have a look at the new function, C<configure>:
259
260 configure nodeid => "eg_receiver", binds => ["*:4040"];
261
262 Before we are able to send messages to other nodes we have to initialise
263 ourselves to become a "distributed node". Initialising a node means naming
264 the node, optionally binding some TCP listeners so that other nodes can
265 contact it and connecting to a predefined set of seed addresses so the
266 node can discover the existing network - and the existing network can
267 discover the node!
268
269 All of this (and more) can be passed to the C<configure> function - later
270 we will see how we can do all this without even passing anything to
271 C<configure>!
272
273 The first parameter, C<nodeid>, specifies the node ID (in this case
274 C<eg_receiver> - the default is to use the node name of the current host,
275 but for this example we want to be able to run many nodes on the same
276 machine). Node IDs need to be unique within the network and can be almost
277 any string - if you don't care, you can specify a node ID of C<anon/>
278 which will then be replaced by a random node name.
279
280 The second parameter, C<binds>, specifies a list of C<address:port> pairs
281 to bind TCP listeners on. The special "address" of C<*> means to bind on
282 every local IP address.
283
284 The reason to bind on a TCP port is not just that other nodes can connect
285 to us: if no binds are specified, the node will still bind on a dynamic
286 port on all local addresses - but in this case we won't know the port, and
287 cannot tell other nodes to connect to it as a seed node.
288
289 A I<seed> is a (fixed) TCP address of some other node in the network. To
290 explain the need for seeds we have to look at the topology of a typical
291 L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>,
292 here an example with 4 nodes:
293
294 N1--N2
295 | \/ |
296 | /\ |
297 N3--N4
298
299 Now imagine another node - C<N5> - wants to connect itself to that network:
300
301 N1--N2
302 | \/ | N5
303 | /\ |
304 N3--N4
305
306 The new node needs to know the I<binds> of all nodes already
307 connected. Exactly this is what the I<seeds> are for: Let's assume that
308 the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This
309 causes it to connect to C<N2>:
310
311 N1--N2____
312 | \/ | N5
313 | /\ |
314 N3--N4
315
316 C<N2> then tells C<N5> about the I<binds> of the other nodes it is
317 connected to, and C<N5> creates the rest of the connections:
318
319 /--------\
320 N1--N2____|
321 | \/ | N5
322 | /\ | /|
323 N3--N4--- |
324 \________/
325
326 All done: C<N5> is now happily connected to the rest of the network.
327
328 Of course, this process takes time, during which the node is already
329 running. This also means it takes time until the node is fully connected,
330 and global groups and other information is available. The best way to deal
331 with this is to either retry regularly until you find the resource you
332 are looking for, or to only start services on demand after a node has
333 become available.
334
335 =head3 Registering the Receiver
336
337 Coming back to our example: we have now introduced the basic purpose of
338 L<AnyEvent::MP::Global> and C<configure>, so we can now finally continue
339 talking about the receiver.
341
342 Let's look at the next line(s):
343
344 my $port = port;
345 grp_reg eg_receivers => $port;
346
347 The C<port> function has already been discussed. It simply creates a new
348 I<port> and returns the I<port ID>. The C<grp_reg> function, however, is
349 new: The first argument is the name of a I<global group>, and the second
350 argument is the I<port ID> to register in that group.
351
352 You can choose the name of such a I<global group> freely (prefixing your
353 package name is I<highly recommended>, however, and might be enforced in
354 future versions!). The purpose of such a group is to store a set of port
355 IDs. This set is made available throughout the L<AnyEvent::MP> network,
356 so that each node can see which ports belong to that group.
357
358 Later we will see how the sender looks for the ports in this global
359 group to send messages to them.
360
361 The last step in the example is to set up a receiver callback for those
362 messages, just as was discussed in the first example. We again match
363 for the tag C<test>. The difference is that this time we don't exit the
364 application after receiving the first message. Instead we continue to wait
365 for new messages indefinitely.
366
367 =head2 The Sender
368
369 Ok, now let's take a look at the sender code:
370
371 use AnyEvent;
372 use AnyEvent::MP;
373 use AnyEvent::MP::Global;
374
375 configure nodeid => "eg_sender", seeds => ["*:4040"];
376
377 my $find_timer =
378 AnyEvent->timer (after => 0, interval => 1, cb => sub {
379 my $ports = grp_get "eg_receivers"
380 or return;
381
382 snd $_, test => time
383 for @$ports;
384 });
385
386 AnyEvent->condvar->recv;
387
388 It's even less code. The C<configure> call serves the same purpose as in the
389 receiver, but instead of specifying binds we specify a list of seeds -
390 which happens to be the same as the binds used by the receiver, which
391 becomes our seed node.
392
393 Next we set up a timer that repeatedly (every second) calls this chunk of
394 code:
395
396 my $ports = grp_get "eg_receivers"
397 or return;
398
399 snd $_, test => time
400 for @$ports;
401
402 The only new function here is the C<grp_get> function of
403 L<AnyEvent::MP::Global>. It searches in the global group named
404 C<eg_receivers> for ports. If none are found, it returns C<undef>, which
405 makes our code return instantly and wait for the next round, as nobody is
406 interested in our message.
407
408 As soon as the receiver application has connected and the information
409 about the newly added port in the receiver has propagated to the sender
410 node, C<grp_get> returns an array reference that contains the I<port ID> of
411 the receiver I<port(s)>.
412
413 We then just send a message with a tag and the current time to every
414 I<port> in the global group.
415
416 =head3 Splitting Network Configuration and Application Code
417
418 Ok, so far, this works. In the real world, however, the person configuring
419 your application to run on a specific network (the end user or network
420 administrator) is often different from the person coding the application.
421
422 Or to put it differently: the arguments passed to configure are usually
423 provided not by the programmer, but by whoever is deploying the program.
424
425 To make this easy, AnyEvent::MP supports a simple configuration database,
426 using profiles, which can be managed using the F<aemp> command-line
427 utility (yes, this section is about the advanced tinkering we mentioned
428 before).
429
430 When you change both programs above to simply call
431
432 configure;
433
434 then AnyEvent::MP tries to look up a profile using the current node name
435 in its configuration database, falling back to some global default.
436
437 You can run "generic" nodes using the F<aemp> utility as well, and we will
438 exploit this in the following way: we configure a profile "seed" and run
439 a node using it, whose sole purpose is to be a seed node for our example
440 programs.
441
442 We bind the seed node to port 4040 on all interfaces:
443
444 aemp profile seed binds "*:4040"
445
446 And we configure all nodes to use this as seed node (this only works when
447 running on the same host, for multiple machines you would provide the IP
448 address or hostname of the node running the seed), and use a random name
449 (because we want to start multiple nodes on the same host):
450
451 aemp seeds "*:4040" nodeid anon/
452
453 Then we run the seed node:
454
455 aemp run profile seed
456
457 After that, we can start as many other nodes as we want, and they will all
458 use our generic seed node to discover each other.
459
460 In fact, starting many receivers nicely illustrates that the time sender
461 can have multiple receivers.
462
463 That's all for now - next we will teach you about monitoring by writing a
464 simple chat client and server :)
465
466 =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
467
468 That's a mouthful, so what does it mean? Our previous example is what one
469 could call "very loosely coupled" - the sender doesn't care about whether
470 there are any receivers, and the receivers do not care if there is any
471 sender.
472
473 This can work fine for simple services, but most real-world applications
474 want to ensure that the side they are expecting to be there is actually
475 there. Going one step further: most bigger real-world applications even
476 want to ensure that if some component is missing, or has crashed, it will
477 be brought back, by recovering and restarting the service.
478
479 AnyEvent::MP supports this by catching exceptions and network problems,
480 and notifying interested parties of this.
481
482 =head2 Exceptions, Network Errors and Monitors
483
484 =head3 Exceptions
485
486 Exceptions are handled on a per-port basis: receive callbacks are executed
487 in a special context, the port-context, and code that throws an uncaught
488 exception will cause the port to be C<kil>led. Killed ports are destroyed
489 automatically (killing ports is the only way to free ports, incidentally).
490
491 Ports can be monitored, even from a different host, and when a port is
492 killed any entity monitoring it will be notified.
493
494 Here is a simple example:
495
496 use AnyEvent::MP;
497
498 # create a port, it always dies
499 my $port = port { die "oops" };
500
501 # monitor it
502 mon $port, sub {
503 warn "$port was killed (with reason @_)";
504 };
505
506 # now send it some message, causing it to die:
507 snd $port;
508
509 It first creates a port whose only action is to throw an exception,
510 and then monitors it with the C<mon> function. Afterwards it sends it a
511 message, causing it to die and call the monitoring callback:
512
513 anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
514
515 The callback was actually passed two arguments: C<die> (to indicate it did
516 throw an exception as opposed to, say, a network error) and the exception
517 message itself.
518
519 What happens when a port is killed before we have a chance to monitor
520 it? Granted, this is highly unlikely in our example, but when you program
521 in a network this can easily happen due to races between nodes.
522
523 use AnyEvent::MP;
524
525 my $port = port { die "oops" };
526
527 snd $port;
528
529 mon $port, sub {
530 warn "$port was killed (with reason @_)";
531 };
532
533 This time we will get something like:
534
535 anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)
536
537 Since the port was already gone, the kill reason is now C<no_such_port>
538 with some descriptive (we hope) error message.
539
540 In fact, the kill reason is usually some identifier as first argument
541 and a human-readable error message as second argument, but can be about
542 anything (it's a list) or even nothing - which is called a "normal" kill.
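
A monitoring callback can therefore distinguish these cases simply by looking at its argument list. Here is a plain-Perl sketch of such a callback; it is invoked by hand purely for illustration (in reality, C<mon> arranges the call):

```perl
use strict;
use warnings;

# what a mon callback might do with the kill reason it receives
my $on_kill = sub {
   if (!@_) {
      print "normal kill\n";             # clean kil, no reason given
   } else {
      my ($type, @info) = @_;            # e.g. "die", "no_such_port", ...
      print "killed: $type (@info)\n";
   }
};

$on_kill->();                            # prints "normal kill"
$on_kill->(die => "oops at line 5.");    # prints "killed: die (oops at line 5.)"
```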
543
544 You can kill ports manually using the C<kil> function, which will be
545 treated like an error when any reason is specified:
546
547 kil $port, custom_error => "don't like your steenking face";
548
549 And a clean kill without any reason arguments:
550
551 kil $port;
552
553 By now you probably wonder what this "normal" kill business is: A common
554 idiom is to not specify a callback to C<mon>, but another port, such as
555 C<$SELF>:
556
557 mon $port, $SELF;
558
559 This basically means "monitor $port and kill me when it crashes". And a
560 "normal" kill does not count as a crash. This way you can easily link
561 ports together and make them crash together on errors (but allow you to
562 remove a port silently).
563
564 =head3 Port Context
565
566 When code runs in an environment where C<$SELF> contains its own port ID
567 and exceptions will be caught, it is said to run in a port context.
568
569 Since AnyEvent::MP is event-based, it is not uncommon to register
570 callbacks from C<rcv> handlers. As example, assume that the port receive
571 handler wants to C<die> a second later, using C<after>:
572
573 my $port = port {
574 after 1, sub { die "oops" };
575 };
576
577 Then you will find it does not work - when the after callback is executed,
578 it does not run in port context anymore, so exceptions will not be caught.
579
580 For these cases, AnyEvent::MP exports a special "closure constructor" called
581 C<psub>, which works just like perl's builtin C<sub>:
582
583 my $port = port {
584 after 1, psub { die "oops" };
585 };
586
587 C<psub> stores C<$SELF> and returns a code reference. When the code
588 reference is invoked, it will run the code block within the context of
589 that port, so exception handling once more works as expected.
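
A rough plain-Perl model of what C<psub> does might look like this (greatly simplified and purely illustrative: the real C<psub> also re-installs the exception handling that C<kil>s the port on errors):

```perl
use strict;
use warnings;

our $SELF;   # AnyEvent::MP keeps the current port ID in $SELF

# capture $SELF now, restore it whenever the returned closure runs
sub psub_sketch {
   my ($code) = @_;
   my $self = $SELF;
   sub {
      local $SELF = $self;   # re-enter the captured port context
      $code->(@_);
   };
}

$SELF = "node#port.a";
my $cb = psub_sketch (sub { print "running as $SELF\n" });

$SELF = "node#port.b";   # context has changed in the meantime...
$cb->();                 # ...but prints "running as node#port.a"
```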
590
591 =head3 Network Errors and the AEMP Guarantee
592
593 I mentioned another important source of monitoring failures: network
594 problems. When a node loses connection to another node, it will invoke all
595 monitoring actions as if the port was killed, even if it is possible that
596 the port still lives happily on another node (not being able to talk to a
597 node means we have no clue what's going on with it: it could have crashed,
598 but it could also still be running without knowing we lost the connection).
599
600 So another way to view monitors is "notify me when some of my messages
601 couldn't be delivered". AEMP has a guarantee about message delivery to a
602 port: After starting a monitor, any message sent to a port will either
603 be delivered, or, when it is lost, any further messages will also be lost
604 until the monitoring action is invoked. After that, further messages
605 I<might> get delivered again.
606
607 This doesn't sound like a very big guarantee, but it is kind of the best
608 you can get while staying sane: Specifically, it means that there will
609 be no "holes" in the message sequence: all messages sent are delivered
610 in order, without any missing in between, and when some were lost, you
611 I<will> be notified of that, so you can take recovery action.
612
613 =head3 Supervising
614
615 Ok, so how is this crashing-everything stuff going to make applications
616 I<more> stable? Well, in fact, the goal is not really to make them more
617 stable, but to make them more resilient against actual errors and
618 crashes. And this is not done by crashing I<everything>, but by crashing
619 everything except a supervisor.
620
621 A supervisor is simply some code that ensures that an application (or a
622 part of it) is running, and if it crashes, is restarted properly.
623
624 To show how to do all this we will create a simple chat server that can
625 handle many chat clients. Both server and clients can be killed and
626 restarted, and even crash, to some extent.
627
628 =head2 Chatting, the Resilient Way
629
630 Without further ado, here is the chat server (to run it, we assume the
631 set-up explained earlier, with a separate F<aemp run> seed node):
632
633 use common::sense;
634 use AnyEvent::MP;
635 use AnyEvent::MP::Global;
636
637 configure;
638
639 my %clients;
640
641 sub msg {
642 print "relaying: $_[0]\n";
643 snd $_, $_[0]
644 for values %clients;
645 }
646
647 our $server = port;
648
649 rcv $server, join => sub {
650 my ($client, $nick) = @_;
651
652 $clients{$client} = $client;
653
654 mon $client, sub {
655 delete $clients{$client};
656 msg "$nick (quits, @_)";
657 };
658 msg "$nick (joins)";
659 };
660
661 rcv $server, privmsg => sub {
662 my ($nick, $msg) = @_;
663 msg "$nick: $msg";
664 };
665
666 grp_reg eg_chat_server => $server;
667
668 warn "server ready.\n";
669
670 AnyEvent->condvar->recv;
671
672 Looks like a lot, but it is actually quite simple: after your usual
673 preamble (this time we use common sense), we define a helper function that
674 sends some message to every registered chat client:
675
676 sub msg {
677 print "relaying: $_[0]\n";
678 snd $_, $_[0]
679 for values %clients;
680 }
681
682 The clients are stored in the hash C<%clients>. Then we define a server
683 port and install two receivers on it, C<join>, which is sent by clients
684 to join the chat, and C<privmsg>, that clients use to send actual chat
685 messages.
686
687 C<join> is the most complicated one. It expects the client port and the nickname
688 to be passed in the message, and registers the client in C<%clients>.
689
690 rcv $server, join => sub {
691 my ($client, $nick) = @_;
692
693 $clients{$client} = $client;
694
695 The next step is to monitor the client. The monitoring action removes the
696 client and sends a quit message with the error to all remaining clients.
697
698 mon $client, sub {
699 delete $clients{$client};
700 msg "$nick (quits, @_)";
701 };
702
703 And finally, it creates a join message and sends it to all clients.
704
705 msg "$nick (joins)";
706 };
707
708 The C<privmsg> callback simply broadcasts the message to all clients:
709
710 rcv $server, privmsg => sub {
711 my ($nick, $msg) = @_;
712 msg "$nick: $msg";
713 };
714
715 And finally, the server registers itself in the server group, so that
716 clients can find it:
717
718 grp_reg eg_chat_server => $server;
719
720 Well, well... and where is this supervisor stuff? Well... we cheated,
721 it's not there. To not overcomplicate the example, we only put it into
722 the..... CLIENT!
723
724 =head3 The Client, and a Supervisor!
725
726 Again, here is the client, including supervisor, which makes it a bit
727 longer:
728
729 use common::sense;
730 use AnyEvent::MP;
731 use AnyEvent::MP::Global;
732
733 my $nick = shift;
734
735 configure;
736
737 my ($client, $server);
738
739 sub server_connect {
740 my $servernodes = grp_get "eg_chat_server"
741 or return after 1, \&server_connect;
742
743 print "\rconnecting...\n";
744
745 $client = port { print "\r \r@_\n> " };
746 mon $client, sub {
747 print "\rdisconnected @_\n";
748 &server_connect;
749 };
750
751 $server = $servernodes->[0];
752 snd $server, join => $client, $nick;
753 mon $server, $client;
754 }
755
756 server_connect;
757
758 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
759 chomp (my $line = <STDIN>);
760 print "> ";
761 snd $server, privmsg => $nick, $line
762 if $server;
763 });
764
765 $| = 1;
766 print "> ";
767 AnyEvent->condvar->recv;
768
769 The first thing the client does is to store the nickname (which is
770 expected as the only command line argument) in C<$nick>, for later
771 use.
772
773 The next relevant thing is... finally... the supervisor:
774
775 sub server_connect {
776 my $servernodes = grp_get "eg_chat_server"
777 or return after 1, \&server_connect;
778
779 This looks up the server in the C<eg_chat_server> global group. If it
780 cannot find it (which is likely when the node is just starting up),
781 it will wait a second and then retry. This "wait a bit and retry"
782 is an important pattern, as distributed programming means lots of
783 things are going on asynchronously. In practice, one should use a more
784 intelligent algorithm, to possibly warn after an excessive number of
785 retries. Hopefully future versions of AnyEvent::MP will offer some
786 predefined supervisors, for now you will have to code it on your own.
787
788 Next it creates a local port for the server to send messages to, and
789 monitors it. When the port is killed, it will print "disconnected" and
790 tell the supervisor function to retry again.
791
792 $client = port { print "\r \r@_\n> " };
793 mon $client, sub {
794 print "\rdisconnected @_\n";
795 &server_connect;
796 };
797
798 Then everything is ready: the client will send a C<join> message with its
799 local port to the server, and start monitoring it:
800
      $server = $servernodes->[0];
      snd $server, join => $client, $nick;
      mon $server, $client;
   }

The monitor will ensure that if the server crashes or goes away, the
client will be killed as well. This tells the user that the client was
disconnected, and the client will then try to reconnect to the server.

The rest of the program deals with the boring details of invoking the
supervisor function to start the whole client process, and of handling
terminal input and sending it to the server.

You should now try to start the server and one or more clients in
different terminal windows (and the seed node):

   perl eg/chat_client nick1
   perl eg/chat_client nick2
   perl eg/chat_server
   aemp run profile seed

And then you can experiment with chatting, killing one or more clients, or
stopping and restarting the server, to see the monitoring in action.

The crucial point you should understand from this example is that
monitoring is usually symmetric: when you monitor some other port,
potentially on another node, that other port usually should monitor you,
too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
which of course should not normally get killed (unless they, too, have a
supervisor).

If you often think in object-oriented terms, then treat a port as an
object, C<port> is the constructor, the receive callbacks set by C<rcv>
act as methods, the C<kil> function becomes the explicit destructor and
C<mon> installs a destructor hook. Unlike conventional object-oriented
programming, it can make sense to exchange ports more freely (for example,
to monitor one port from another).

There is ample room for improvement: the server should probably remember
the nickname in the C<join> handler instead of expecting it in every chat
message, it should probably monitor itself, and the client should not try
to send any messages unless a server is actually connected.

=head1 PART 3: TIMTOWTDI: Virtual Connections

The chat system developed in the previous sections is very "traditional"
in a way: you start some server(s) and some clients statically and they
start talking to each other.

Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The
L<AnyEvent::MP::Global> service for example monitors nodes joining the
network and starts itself automatically on other nodes (if it isn't
running already).

A good way to design such applications is to put them into a module and
create "virtual connections" to other nodes - we call this the "bridge
head" method, because you start by creating a remote port (the bridge
head) and from that you start to bootstrap your application.

Since that sounds rather theoretical, let's redesign the chat server and
client using this design method.

Here is the server:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   configure;

   grp_reg eg_chat_server2 => $NODE;

   my %clients;

   sub msg {
      print "relaying: $_[0]\n";
      snd $_, $_[0]
         for values %clients;
   }

   sub client_connect {
      my ($client, $nick) = @_;

      mon $client;
      mon $client, sub {
         delete $clients{$client};
         msg "$nick (quits, @_)";
      };

      $clients{$client} = $client;

      msg "$nick (joins)";

      rcv $SELF, sub { msg "$nick: $_[0]" };
   }

   warn "server ready.\n";

   AnyEvent->condvar->recv;

It starts out not much different than the previous example, except that
this time, we register the node port in the global group and not any port
we created - the clients only want to know which node the server should be
running on. In fact, they could also use some kind of election mechanism,
to find the node with the lowest load or something like that.

The more interesting change is that indeed no server port is created -
the server consists only of code, and "does" nothing by itself. All it
does is define a function C<client_connect>, which expects a client port
and a nick name as arguments. It then monitors the client port and binds
a receive callback on C<$SELF>, which expects messages that in turn are
broadcast to all clients.

The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
for C<mon $client, $SELF>. The second does the normal "client has gone
away" clean-up action. Both could actually be rolled into one C<mon>
action.

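Rolled into one, the two C<mon> calls might look like this sketch (same
behaviour as the server code above, just expressed as a single monitor
callback):

```perl
# One mon instead of two: clean up the client entry, broadcast the
# quit message, then kill $SELF - which is what the shorthand
# "mon $client" (i.e. mon $client, $SELF) did implicitly.
mon $client, sub {
   delete $clients{$client};
   msg "$nick (quits, @_)";
   kil $SELF, @_;
};
```
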
C<$SELF> is a good hint that something interesting is going on. And
indeed, when looking at the client code, there is a new function,
C<spawn>:

   use common::sense;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   my $nick = shift;

   configure;

   $| = 1;

   my $port = port;

   my ($client, $server);

   sub server_connect {
      my $servernodes = grp_get "eg_chat_server2"
         or return after 1, \&server_connect;

      print "\rconnecting...\n";

      $client = port { print "\r \r@_\n> " };
      mon $client, sub {
         print "\rdisconnected @_\n";
         &server_connect;
      };

      $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
      mon $server, $client;
   }

   server_connect;

   my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
      chomp (my $line = <STDIN>);
      print "> ";
      snd $server, $line
         if $server;
   });

   print "> ";
   AnyEvent->condvar->recv;

The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
server I<port on the node>:

   $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
   mon $server, $client;

And of course the first thing after creating it is monitoring it.

The C<spawn> function creates a new port on a remote node and returns
its port ID. After creating the port it calls a function on the remote
node, passing any remaining arguments to it, and - most importantly -
executes the function within the context of the new port, so it can be
manipulated by referring to C<$SELF>. The init function can reside in a
module (actually it normally I<should> reside in a module) - AnyEvent::MP
will automatically load the module if the function isn't defined.

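A sketch of what that looks like when the init function lives in a module
(the module name, function name and C<$remote_node> below are invented
for illustration):

```perl
# Sketch: spawn a port on a remote node whose init function lives in
# a module. AnyEvent::MP loads My::Chat::Server on the remote node if
# client_connect is not yet defined there.
my $server = spawn $remote_node, "My::Chat::Server::client_connect",
                   $client, $nick;

# The returned port ID is usable immediately - messages can be sent
# long before the remote node has acted on the spawn request.
snd $server, "hello";
mon $server, $client;   # and, as always, monitor it right away
```
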
The C<spawn> function returns immediately, which means you can instantly
send messages to the port, long before the remote node has even heard
of our request to create a port on it. In fact, the remote node might
not even be running. Despite these troubling facts, everything should
work just fine: if the node isn't running (or the init function throws an
exception), then the monitor will trigger because the port doesn't exist.

If the spawn message gets delivered, but the monitoring message is not
because of network problems (extremely unlikely, but monitoring, after
all, is implemented by passing a message, and messages can get lost), then
this connection loss will eventually trigger the monitoring action. On the
remote node (which in turn monitors the client) the port will also be
cleaned up on connection loss. When the remote node comes up again and our
monitoring message can be delivered, it will instantly fail because the
port has been cleaned up in the meantime.

If your head is spinning by now, that's fine - just keep in mind: after
creating a port, monitor it on the local node, and monitor "the other
side" from the remote node, and all will be cleaned up just fine.

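The whole rule fits into a few lines. This sketch assumes C<$remote_node>
names a reachable node and C<My::Service::init> exists - both are
placeholders, not real AnyEvent::MP names:

```perl
# Local side: create a port, spawn the remote port, monitor it.
my $local  = port { print "got: @_\n" };       # handle replies here
my $remote = spawn $remote_node, "My::Service::init", $local;
mon $remote, $local;    # remote dies => local port is killed

# Remote side, inside My::Service::init ($SELF is the new port):
#    my ($client) = @_;
#    mon $client, $SELF; # client dies => this port is killed
```
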
=head2 Services

Above it was mentioned that C<spawn> automatically loads modules, and this
can be exploited in various ways.

Assume for a moment you put the server into a file called
F<mymod/chatserver.pm> reachable from the current directory. Then you
could run a node there with:

   aemp run

The other nodes could C<spawn> the server by using
C<mymod::chatserver::client_connect> as init function.

Likewise, when you have some service that starts automatically (similar to
AnyEvent::MP::Global), then you can configure this service statically:

   aemp profile mysrvnode services mymod::service::
   aemp run profile mysrvnode

And the module will automatically be loaded in the node, as specifying a
module name (with C<::>-suffix) will simply load the module, which is then
free to do whatever it wants.

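A minimal service module for that mechanism might look like this sketch
(the package matches the C<mymod::service::> entry in the profile above;
everything inside the module is illustrative):

```perl
package mymod::service;

# Sketch of a self-starting service: merely loading this module (via
# the "mymod::service::" services entry) registers a port and starts
# serving - no explicit spawn is needed.
use common::sense;
use AnyEvent::MP;
use AnyEvent::MP::Global;

my $port = port {
   my ($tag, @args) = @_;
   warn "service request: $tag @args\n";
};

grp_reg mymod_service => $port;   # announce ourselves network-wide

1;
```
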
Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring nodes. For example, if I want to run the
Bummskraut IRC backend on a machine named "ruth", I could do this:

   aemp profile ruth addservice BK::Backend::IRC::

And any F<aemp run> on that host will automatically have the Bummskraut
IRC backend running.

That's plenty of possibilities you can use - it's all up to you how you
structure your application.

=head1 THE END

This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial, that AnyEvent::MP is still in its infancy, and I hope
it will be useful to create exciting new applications.

=head1 SEE ALSO

L<AnyEvent::MP>

L<AnyEvent::MP::Global>

L<AnyEvent>

=head1 AUTHOR

   Robin Redeker <elmex@ta-sa.org>
   Marc Lehmann <schmorp@schmorp.de>
