/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.46
Committed: Sun Mar 4 18:48:27 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.45: +17 -20 lines

1 =head1 Message Passing for the Non-Blocked Mind
2
3 =head1 Introduction and Terminology
4
5 This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
6 module, which allows programs to transparently pass messages within the
7 process and to other processes on the same or a different host.
8
9 What kind of messages? Basically a message here means a list of Perl
10 strings, numbers, hashes and arrays, anything that can be expressed as a
11 L<JSON> text (as JSON is the default serialiser in the protocol). Here are
12 two examples:
13
14 write_log => 1251555874, "action was successful.\n"
15 123, ["a", "b", "c"], { foo => "bar" }
16
17 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 first element of a message that indicates the type of the message. This
19 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20 (C<rcv>) support matching it directly.
21
22 Suppose you want to send a ping message with your current time to
23 somewhere; this is how such a message might look (in Perl syntax):
24
25 ping => 1251381636
26
27 Now that we know what a message is, to which entities are those
28 messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29 a destination for messages but also a context to execute code: when
30 a runtime error occurs while executing code belonging to a port, the
31 exception will be raised on the port and can even travel to interested
32 parties on other nodes, which makes supervision of distributed processes
33 easy.
34
35 How do these ports relate to things you know? Each I<port> belongs
36 to a I<node>, and a I<node> is just the UNIX process that runs your
37 L<AnyEvent::MP> application.
38
39 Each I<node> is distinguished from other I<nodes> running on the same or
40 another host in a network by its I<node ID>. A I<node ID> is simply a
41 unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42 (UNIX nodename, random string...).
43
44 Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
45 to each other. The setup consists of two nodes (more are of course
46 possible): Node C<A> (in UNIX process 7066) with the ports C<ABC> and
47 C<DEF>. And the node C<B> (in UNIX process 8321) with the ports C<FOO> and
48 C<BAR>.
49
50
51 |- PID: 7066 -| |- PID: 8321 -|
52 | | | |
53 | Node ID: A | | Node ID: B |
54 | | | |
55 | Port ABC =|= <----\ /-----> =|= Port FOO |
56 | | X | |
57 | Port DEF =|= <----/ \-----> =|= Port BAR |
58 | | | |
59 |-------------| |-------------|
60
61 The strings for the I<port IDs> here are just for illustrative
62 purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 strings, they can't be chosen manually and are assigned by the system
64 dynamically. These I<port IDs> are unique within a network and can also be
65 used to identify senders, or even as message tags for instance.
66
67 The next sections will explain the API of L<AnyEvent::MP> by going through
68 a few simple examples. Later some more complex idioms are introduced,
69 which are hopefully useful to solve some real world problems.
70
71 =head2 Passing Your First Message
72
73 For starters, let's have a look at the messaging API. The following
74 example is just a demo to show the basic elements of message passing with
75 L<AnyEvent::MP>.
76
77 The example should print: C<Ending with: 123>, in a rather complicated
78 way, by passing some message to a port.
79
80 use AnyEvent;
81 use AnyEvent::MP;
82
83 my $end_cv = AnyEvent->condvar;
84
85 my $port = port;
86
87 rcv $port, test => sub {
88 my ($data) = @_;
89 $end_cv->send ($data);
90 };
91
92 snd $port, test => 123;
93
94 print "Ending with: " . $end_cv->recv . "\n";
95
96 It already uses most of the essential functions inside
97 L<AnyEvent::MP>: First there is the C<port> function, which creates a
98 I<port> and will return its I<port ID>, a simple string.
99
100 This I<port ID> can be used to send messages to the port and install
101 handlers to receive messages on the port. Since it is a simple string
102 it can be safely passed to other I<nodes> in the network when you want
103 to refer to that specific port (usually used for RPC, where you need
104 to tell the other end which I<port> to send the reply to - messages in
105 L<AnyEvent::MP> have a destination, but no source).
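
As an illustration of this, here is a sketch of the resulting
request/reply idiom within a single process (the echo service and all
names in it are made up for this tutorial):

   use AnyEvent;
   use AnyEvent::MP;

   # a tiny echo service: since messages carry no sender information,
   # the requester passes its own port ID along inside the message
   my $service = port;
   rcv $service, echo => sub {
      my ($text, $reply_port) = @_;
      snd $reply_port, echo_reply => $text;
   };

   my $done = AnyEvent->condvar;
   my $me = port;
   rcv $me, echo_reply => sub { $done->send ($_[0]) };

   snd $service, echo => "hello", $me;
   print $done->recv, "\n"; # prints "hello"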
106
107 The next function is C<rcv>:
108
109 rcv $port, test => sub { ... };
110
111 It installs a receiver callback on the I<port> specified as the first
112 argument (it only works for "local" ports, i.e. ports created on the same
113 node). The next argument, in this example C<test>, specifies a I<tag> to
114 match. This means that whenever a message with the first element being
115 the string C<test> is received, the callback is called with the remaining
116 parts of that message.
117
118 Messages can be sent with the C<snd> function, which is used like this in
119 the example above:
120
121 snd $port, test => 123;
122
123 This will send the message C<'test', 123> to the I<port> with the I<port
124 ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125 on C<test> it will call the callback with the first argument being the
126 number C<123>.
127
128 The callback is a typical AnyEvent idiom: the callback just passes
129 that number on to the I<condition variable> C<$end_cv> which will then
130 pass the value to the C<print> call. Condition variables are outside the
131 scope of this tutorial and not often used with ports, so please consult
132 L<AnyEvent::Intro> about them.
133
134 Passing messages inside just one process is boring. Before we can move on
135 and do interprocess message passing we first have to make sure some things
136 have been set up correctly for our nodes to talk to each other.
137
138 =head2 System Requirements and System Setup
139
140 Before we can start with real IPC we have to make sure some things work on
141 your system.
142
143 First we have to setup a I<shared secret>: for two L<AnyEvent::MP>
144 I<nodes> to be able to communicate with each other over the network it is
145 necessary to setup the same I<shared secret> for both of them, so they can
146 prove their trustworthiness to each other.
147
148 The easiest way to set this up is to use the F<aemp> utility:
149
150 aemp gensecret
151
152 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153 random shared secret. You can copy this file to any other system and
154 then communicate over the network (via TCP) with it. You can also select
155 your own shared secret (F<aemp setsecret>) and for increased security
156 requirements you can even create (or configure) a TLS certificate (F<aemp
157 gencert>), causing connections to not just be securely authenticated, but
158 also to be encrypted and protected against tinkering.
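
For example, a quick set-up session could look like this (whether you
need the TLS certificate depends on your security requirements):

   aemp gensecret   # create the config file with a random shared secret
   aemp gencert     # optionally: also create a TLS certificate

   # copy the config file to another host that should join the network
   scp ~/.perl-anyevent-mp otherhost: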
159
160 Connections will only be successfully established when the I<nodes>
161 that want to connect to each other have the same I<shared secret> (or
162 successfully verify the TLS certificate of the other side, in which case
163 no shared secret is required).
164
165 B<If something does not work as expected, and for example tcpdump shows
166 that the connections are closed almost immediately, you should make sure
167 that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168 you try to connect with each other!>
169
170 That's all for now - you will find some more advanced fiddling with the
171 C<aemp> utility later.
172
173 =head2 Shooting the Trouble
174
175 Sometimes things go wrong, and AnyEvent::MP, being a professional module,
176 does not gratuitously spill out messages to your screen.
177
178 To help troubleshooting any issues, there are two environment variables
179 that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets the
180 logging level. The default is C<5>, which means nothing much is
181 printed. You can increase it to C<8> or C<9> to get more verbose
182 output. This is example output when starting a node:
183
184 2012-03-04 19:41:10 <8> node cerebro starting up.
185 2012-03-04 19:41:10 <8> node listens on [10.0.0.1:4040].
186 2012-03-04 19:41:10 <9> trying connect to seed node 10.0.0.19:4040.
187 2012-03-04 19:41:10 <9> 10.0.0.19:4040 connected as rain
188 2012-03-04 19:41:10 <7> rain is up ()
189
190 A lot of info, but at least you can see that it does something.
191
192 The other environment variable that can be useful is
193 C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
194 most messages that are sent or received to be printed. For example, F<aemp
195 restart rijk> might output these message exchanges:
196
197 SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
198 SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
199 SND rain <- [null,"g_find","rijk"]
200 RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
201 RCV rijk -> ["b",""]
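
Both are ordinary environment variables, so a debugging session might be
started like this (F<eg/receiver> is a placeholder for your own program):

   PERL_ANYEVENT_MP_WARNLEVEL=9 perl eg/receiver   # verbose logging
   PERL_ANYEVENT_MP_TRACE=1     perl eg/receiver   # trace all messages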
202
203 =head1 PART 1: Passing Messages Between Processes
204
205 =head2 The Receiver
206
207 Let's split the previous example up into two programs: one that contains
208 the sender and one for the receiver. First the receiver application, in
209 full:
210
211 use AnyEvent;
212 use AnyEvent::MP;
213 use AnyEvent::MP::Global;
214 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
215
216 my $port = port;
217 my $guard = grp_reg eg_receivers => $port;
218
219 rcv $port, test => sub {
220 my ($data, $reply_port) = @_;
221
222 print "Received data: " . $data . "\n";
223 };
224
225 AnyEvent->condvar->recv;
226
227 =head3 AnyEvent::MP::Global
228
229 Now, that wasn't too bad, was it? OK, let's step through the new functions
230 and modules that have been used.
231
232 For starters, there is now an additional module being
233 used: L<AnyEvent::MP::Global>. This module provides us with a I<global
234 registry>, which lets us register ports in groups that are visible on all
235 I<nodes> in a network.
236
237 What is this useful for? Well, the I<port IDs> are random-looking strings,
238 assigned by L<AnyEvent::MP>. We cannot know those I<port IDs> in advance,
239 so we don't know which I<port ID> to send messages to, especially when the
240 message is to be passed between different I<nodes> (or UNIX processes). To
241 find the right I<port> of another I<node> in the network we will need
242 to communicate this somehow to the sender. And exactly that is what
243 L<AnyEvent::MP::Global> provides.
244
245 Especially in larger, more anonymous networks this is handy: imagine you
246 have a few database backends, a few web front-ends and some processing
247 distributed over a number of hosts: all of these would simply register
248 themselves in the appropriate group, and your web front-ends can start to
249 find some database backend.
250
251 =head3 C<configure> and Joining and Maintaining the Network
252
253 Now, let's have a look at the new function, C<configure>:
254
255 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
256
257 Before we are able to send messages to other nodes we have to initialise
258 ourselves to become a "distributed node". Initialising a node means naming
259 the node, optionally binding some TCP listeners so that other nodes can
260 contact it and connecting to a predefined set of seed addresses so the
261 node can discover the existing network - and the existing network can
262 discover the node!
263
264 All of this (and more) can be passed to the C<configure> function - later
265 we will see how we can do all this without even passing anything to
266 C<configure>!
267
268 The first parameter, C<nodeid>, specifies the node ID (here
269 C<eg_receiver/%u>, where C<%u> is replaced by a unique string - the
270 default is the current host's node name, but for this example we want to
271 run many nodes on the same machine). Node IDs need to be unique within
272 the network and can be almost any string - if you don't care, you can
273 specify a node ID of C<anon/>, which is then replaced by a random name.
274
275 The second parameter, C<binds>, specifies a list of C<address:port> pairs
276 to bind TCP listeners on. The special "address" of C<*> means to bind on
277 every local IP address (this might not work on every OS, so it should not
278 be used unless you know it works).
279
280 The reason to bind on a TCP port is not just that other nodes can connect
281 to us: if no binds are specified, the node will still bind on a dynamic
282 port on all local addresses - but in this case we won't know the port, and
283 cannot tell other nodes to connect to it as seed node.
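
If binding on all addresses is not wanted, you can of course also bind on
a specific address (a sketch - the address is made up):

   configure nodeid => "eg_receiver/%u", binds => ["10.0.0.1:4040"];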
284
285 A I<seed> is a (fixed) TCP address of some other node in the network. To
286 explain the need for seeds we have to look at the topology of a typical
287 L<AnyEvent::MP> network. The topology is called a I<fully connected mesh>,
288 here an example with 4 nodes:
289
290 N1--N2
291 | \/ |
292 | /\ |
293 N3--N4
294
295 Now imagine another node - C<N5> - wants to connect itself to that network:
296
297 N1--N2
298 | \/ | N5
299 | /\ |
300 N3--N4
301
302 The new node needs to know the I<binds> of all nodes already
303 connected. Exactly this is what the I<seeds> are for: Let's assume that
304 the new node (C<N5>) uses the TCP address of the node C<N2> as seed. This
305 causes it to connect to C<N2>:
306
307 N1--N2____
308 | \/ | N5
309 | /\ |
310 N3--N4
311
312 C<N2> then tells C<N5> about the I<binds> of the other nodes it is
313 connected to, and C<N5> creates the rest of the connections:
314
315 /--------\
316 N1--N2____|
317 | \/ | N5
318 | /\ | /|
319 N3--N4--- |
320 \________/
321
322 All done: C<N5> is now happily connected to the rest of the network.
323
324 Apart from the obvious function - joining the network - seed nodes fulfill
325 another very important function: the connections created by connecting
326 to seed nodes are used to keep the network together - by trying to keep
327 connections to all seed nodes active, the network ensures that it will not
328 split into multiple networks without connection to each other.
329
330 This means that the graph created by all seed node connections must span
331 the whole network, in some way.
332
333 There are many ways of doing this - the simplest is probably to use
334 a single set of one or more seed nodes as seeds for all nodes in the
335 network - this creates a "hub" of seed nodes that connect to each other,
336 and "leaf" nodes that connect to the nodes in the hub, keeping everything
337 together.
338
339 The process of joining a network takes time, during which the node is
340 already running. This also means it takes time until the node is fully
341 connected, and global groups and other information are available. The best
342 way to deal with this is to either retry regularly until you find the
343 resource you are looking for, or to only start services on demand after a
344 node has become available.
345
346 =head3 Registering the Receiver
347
348 Coming back to our example, we have now introduced the basic purpose of
349 L<AnyEvent::MP::Global> and C<configure>, and have configured our node
350 so it can join the network. Now we will finally continue
351 talking about the receiver.
352
353 Let's look at the next line(s):
354
355 my $port = port;
356 my $guard = grp_reg eg_receivers => $port;
357
358 The C<port> function has already been discussed. It simply creates a new
359 I<port> and returns the I<port ID>. The C<grp_reg> function, however, is
360 new: The first argument is the name of a I<global group>, and the second
361 argument is the I<port ID> to register in that group.
362
363 You can choose the name of such a I<global group> freely (prefixing your
364 package name is I<highly recommended> however and might be enforced in
365 future versions!). The purpose of such a group is to store a set of port
366 IDs. This set is made available throughout the L<AnyEvent::MP> network,
367 so that each node can see which ports belong to that group.
368
369 Later we will see how the sender looks for the ports in this global
370 group to send messages to them.
371
372 The last step in the example is to set up a receiver callback for those
373 messages, just as was discussed in the first example. We again match
374 for the tag C<test>. The difference is that this time we don't exit the
375 application after receiving the first message. Instead we continue to wait
376 for new messages indefinitely.
377
378 =head2 The Sender
379
380 Ok, now let's take a look at the sender code:
381
382 use AnyEvent;
383 use AnyEvent::MP;
384 use AnyEvent::MP::Global;
385 configure nodeid => "eg_sender/%u", seeds => ["*:4040"];
386
387 my $find_timer =
388 AnyEvent->timer (after => 0, interval => 1, cb => sub {
389 my $ports = grp_get "eg_receivers"
390 or return;
391
392 snd $_, test => time
393 for @$ports;
394 });
395
396 AnyEvent->condvar->recv;
397
398 It's even less code. The C<configure> call serves the same purpose as in the
399 receiver, but instead of specifying binds we specify a list of seeds -
400 which happens to be the same as the binds used by the receiver, which
401 becomes our seed node.
402
403 Next we set up a timer that repeatedly (every second) calls this chunk of
404 code:
405
406 my $ports = grp_get "eg_receivers"
407 or return;
408
409 snd $_, test => time
410 for @$ports;
411
412 The only new function here is the C<grp_get> function of
413 L<AnyEvent::MP::Global>. It searches in the global group named
414 C<eg_receivers> for ports. If none are found, it returns C<undef>, which
415 makes our code return instantly and wait for the next round, as nobody is
416 interested in our message.
417
418 As soon as the receiver application has connected and the information
419 about the newly added port in the receiver has propagated to the sender
420 node, C<grp_get> returns an array reference that contains the I<port IDs>
421 of the receiver I<ports>.
422
423 We then just send a message with a tag and the current time to every
424 I<port> in the global group.
425
426 =head3 Splitting Network Configuration and Application Code
427
428 OK, so far, this works. In the real world, however, the person configuring
429 your application to run on a specific network (the end user or network
430 administrator) is often different to the person coding the application.
431
432 Or to put it differently: the arguments passed to configure are usually
433 provided not by the programmer, but by whoever is deploying the program.
434
435 To make this easy, AnyEvent::MP supports a simple configuration database,
436 using profiles, which can be managed using the F<aemp> command-line
437 utility (yes, this section is about the advanced tinkering we mentioned
438 before).
439
440 When you change both programs above to simply call
441
442 configure;
443
444 then AnyEvent::MP tries to look up a profile using the current node name
445 in its configuration database, falling back to some global default.
446
447 You can run "generic" nodes using the F<aemp> utility as well, and we will
448 exploit this in the following way: we configure a profile "seed" and run
449 a node using it, whose sole purpose is to be a seed node for our example
450 programs.
451
452 We bind the seed node to port 4040 on all interfaces:
453
454 aemp profile seed binds "*:4040"
455
456 And we configure all nodes to use this as seed node (this only works when
457 running on the same host, for multiple machines you would provide the IP
458 address or hostname of the node running the seed), and use a random name
459 (because we want to start multiple nodes on the same host):
460
461 aemp seeds "*:4040" nodeid anon/
462
463 Then we run the seed node:
464
465 aemp run profile seed
466
467 After that, we can start as many other nodes as we want, and they will all
468 use our generic seed node to discover each other.
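
Putting it all together, a complete local test session might look like
this, with each long-running command in its own terminal (the script
names are placeholders for wherever you saved the two example programs):

   aemp profile seed binds "*:4040"   # configure the seed profile (once)
   aemp seeds "*:4040" nodeid anon/   # defaults for all other nodes (once)
   aemp run profile seed              # terminal 1: the seed node
   perl eg_receiver.pl                # terminal 2: the receiver
   perl eg_sender.pl                  # terminal 3: the sender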
469
470 In fact, starting many receivers nicely illustrates that the time sender
471 can have multiple receivers.
472
473 That's all for now - next we will teach you about monitoring by writing a
474 simple chat client and server :)
475
476 =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
477
478 That's a mouthful, so what does it mean? Our previous example is what one
479 could call "very loosely coupled" - the sender doesn't care about whether
480 there are any receivers, and the receivers do not care if there is any
481 sender.
482
483 This can work fine for simple services, but most real-world applications
484 want to ensure that the side they are expecting to be there is actually
485 there. Going one step further: most bigger real-world applications even
486 want to ensure that if some component is missing, or has crashed, it will
487 be brought back up, by recovering and restarting the service.
488
489 AnyEvent::MP supports this by catching exceptions and network problems,
490 and notifying interested parties of this.
491
492 =head2 Exceptions, Port Context, Network Errors and Monitors
493
494 =head3 Exceptions
495
496 Exceptions are handled on a per-port basis: receive callbacks are executed
497 in a special context, the so-called I<port-context>: code that throws an
498 otherwise uncaught exception will cause the port to be C<kil>led. Killed
499 ports are destroyed automatically (killing ports is the only way to free
500 ports, incidentally).
501
502 Ports can be monitored, even from a different host, and when a port is
503 killed any entity monitoring it will be notified.
504
505 Here is a simple example:
506
507 use AnyEvent::MP;
508
509 # create a port, it always dies
510 my $port = port { die "oops" };
511
512 # monitor it
513 mon $port, sub {
514 warn "$port was killed (with reason @_)";
515 };
516
517 # now send it some message, causing it to die:
518 snd $port;
519
520 It first creates a port whose only action is to throw an exception,
521 and then monitors it with the C<mon> function. Afterwards it sends it a
522 message, causing it to die and call the monitoring callback:
523
524 anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
525
526 The callback was actually passed two arguments: C<die> (to indicate it did
527 throw an exception as opposed to, say, a network error) and the exception
528 message itself.
529
530 What happens when a port is killed before we have a chance to monitor
531 it? Granted, this is highly unlikely in our example, but when you program
532 in a network this can easily happen due to races between nodes.
533
534 use AnyEvent::MP;
535
536 my $port = port { die "oops" };
537
538 snd $port;
539
540 mon $port, sub {
541 warn "$port was killed (with reason @_)";
542 };
543
544 This time we will get something like:
545
546 anon/zpX.a was killed (with reason no_such_port cannot monitor nonexistent port)
547
548 Since the port was already gone, the kill reason is now C<no_such_port>
549 with some descriptive (we hope) error message.
550
551 In fact, the kill reason is usually some identifier as first argument
552 and a human-readable error message as second argument, but can be about
553 anything (it's a list) or even nothing - which is called a "normal" kill.
554
555 You can kill ports manually using the C<kil> function, which will be
556 treated like an error when any reason is specified:
557
558 kil $port, custom_error => "don't like your steenking face";
559
560 And a clean kill without any reason arguments:
561
562 kil $port;
563
564 By now you probably wonder what this "normal" kill business is: A common
565 idiom is to not specify a callback to C<mon>, but another port, such as
566 C<$SELF>:
567
568 mon $port, $SELF;
569
570 This basically means "monitor $port and kill me when it crashes". And a
571 "normal" kill does not count as a crash. This way you can easily link
572 ports together and make them crash together on errors (while still
573 allowing you to remove a port silently).
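
For example, to tie two ports together so that an error in either kills
the other (a sketch - C<$port_a> and C<$port_b> stand for two existing
port IDs):

   # monitor each other: an abnormal kill of one port is propagated to
   # the other, while a clean kill (one without a reason) leaves the
   # peer alive
   mon $port_a, $port_b;
   mon $port_b, $port_a;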
574
575 =head3 Port Context
576
577 When code runs in an environment where C<$SELF> contains its own port ID
578 and exceptions will be caught, it is said to run in a port context.
579
580 Since AnyEvent::MP is event-based, it is not uncommon to register
581 callbacks from C<rcv> handlers. As an example, assume that the port receive
582 handler wants to C<die> a second later, using C<after>:
583
584 my $port = port {
585 after 1, sub { die "oops" };
586 };
587
588 Then you will find it does not work - when the C<after> callback is executed,
589 it does not run in port context anymore, so exceptions will not be caught.
590
591 For these cases, AnyEvent::MP exports a special "closure constructor"
592 called C<psub>, which works just like Perl's built-in C<sub>:
593
594 my $port = port {
595 after 1, psub { die "oops" };
596 };
597
598 C<psub> stores C<$SELF> and returns a code reference. When the code
599 reference is invoked, it will run the code block within the context of
600 that port, so exception handling once more works as expected.
601
602 There is also a way to temporarily execute code in the context of some
603 port, namely C<peval>:
604
605 peval $port, sub {
606 # die'ing here will kil $port
607 };
608
609 The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
610 and then executes the given sub in a port context.
611
612 =head3 Network Errors and the AEMP Guarantee
613
614 I mentioned another important source of monitoring failures: network
615 problems. When a node loses connection to another node, it will invoke all
616 monitoring actions as if the port was killed, even if it is possible that
617 the port still lives happily on another node (not being able to talk to a
618 node means we have no clue what's going on with it: it could have crashed,
619 but it could also still be running, unaware that we lost the connection).
620
621 So another way to view monitors is "notify me when some of my messages
622 couldn't be delivered". AEMP has a guarantee about message delivery to a
623 port: After starting a monitor, any message sent to a port will either
624 be delivered, or, when it is lost, any further messages will also be lost
625 until the monitoring action is invoked. After that, further messages
626 I<might> get delivered again.
627
628 This doesn't sound like a very big guarantee, but it is kind of the best
629 you can get while staying sane: Specifically, it means that there will
630 be no "holes" in the message sequence: all messages sent are delivered
631 in order, without any missing in between, and when some were lost, you
632 I<will> be notified of that, so you can take recovery action.
633
634 =head3 Supervising
635
636 OK, so how is this crashing-everything stuff going to make applications
637 I<more> stable? Well, in fact, the goal is not really to make them more
638 stable, but to make them more resilient against actual errors and
639 crashes. And this is not done by crashing I<everything>, but by crashing
640 everything except a supervisor.
641
642 A supervisor is simply some code that ensures that an application (or a
643 part of it) is running, and if it crashes, is restarted properly.
644
645 To show how to do all this we will create a simple chat server that can
646 handle many chat clients. Both server and clients can be killed and
647 restarted, and even crash, to some extent.
648
649 =head2 Chatting, the Resilient Way
650
651 Without further ado, here is the chat server (to run it, we assume the
652 set-up explained earlier, with a separate F<aemp run> seed node):
653
654 use common::sense;
655 use AnyEvent::MP;
656 use AnyEvent::MP::Global;
657
658 configure;
659
660 my %clients;
661
662 sub msg {
663 print "relaying: $_[0]\n";
664 snd $_, $_[0]
665 for values %clients;
666 }
667
668 our $server = port;
669
670 rcv $server, join => sub {
671 my ($client, $nick) = @_;
672
673 $clients{$client} = $client;
674
675 mon $client, sub {
676 delete $clients{$client};
677 msg "$nick (quits, @_)";
678 };
679 msg "$nick (joins)";
680 };
681
682 rcv $server, privmsg => sub {
683 my ($nick, $msg) = @_;
684 msg "$nick: $msg";
685 };
686
687 grp_reg eg_chat_server => $server;
688
689 warn "server ready.\n";
690
691 AnyEvent->condvar->recv;
692
693 Looks like a lot, but it is actually quite simple: after your usual
694 preamble (this time we use common sense), we define a helper function that
695 sends some message to every registered chat client:
696
697 sub msg {
698 print "relaying: $_[0]\n";
699 snd $_, $_[0]
700 for values %clients;
701 }
702
703 The clients are stored in the hash C<%clients>. Then we define a server
704 port and install two receivers on it, C<join>, which is sent by clients
705 to join the chat, and C<privmsg>, that clients use to send actual chat
706 messages.
707
708 C<join> is the most complicated. It expects the client port and the nickname
709 to be passed in the message, and registers the client in C<%clients>.
710
711 rcv $server, join => sub {
712 my ($client, $nick) = @_;
713
714 $clients{$client} = $client;
715
716 The next step is to monitor the client. The monitoring action removes the
717 client and sends a quit message with the error to all remaining clients.
718
719 mon $client, sub {
720 delete $clients{$client};
721 msg "$nick (quits, @_)";
722 };
723
724 And finally, it creates a join message and sends it to all clients.
725
726 msg "$nick (joins)";
727 };
728
729 The C<privmsg> callback simply broadcasts the message to all clients:
730
731 rcv $server, privmsg => sub {
732 my ($nick, $msg) = @_;
733 msg "$nick: $msg";
734 };
735
736 And finally, the server registers itself in the server group, so that
737 clients can find it:
738
739 grp_reg eg_chat_server => $server;
740
741 Well, well... and where is this supervisor stuff? Well... we cheated,
742 it's not there. To not overcomplicate the example, we only put it into
743 the..... CLIENT!
744
745 =head3 The Client, and a Supervisor!
746
747 Again, here is the client, including supervisor, which makes it a bit
748 longer:
749
750 use common::sense;
751 use AnyEvent::MP;
752 use AnyEvent::MP::Global;
753
754 my $nick = shift;
755
756 configure;
757
758 my ($client, $server);
759
760 sub server_connect {
761 my $servernodes = grp_get "eg_chat_server"
762 or return after 1, \&server_connect;
763
764 print "\rconnecting...\n";
765
766 $client = port { print "\r \r@_\n> " };
767 mon $client, sub {
768 print "\rdisconnected @_\n";
769 &server_connect;
770 };
771
772 $server = $servernodes->[0];
773 snd $server, join => $client, $nick;
774 mon $server, $client;
775 }
776
777 server_connect;
778
779 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
780 chomp (my $line = <STDIN>);
781 print "> ";
782 snd $server, privmsg => $nick, $line
783 if $server;
784 });
785
786 $| = 1;
787 print "> ";
788 AnyEvent->condvar->recv;
789
790 The first thing the client does is to store the nickname (which is
791 expected as the only command line argument) in C<$nick>, for later
792 use.
793
794 The next relevant thing is... finally... the supervisor:
795
796 sub server_connect {
797 my $servernodes = grp_get "eg_chat_server"
798 or return after 1, \&server_connect;
799
800 This looks up the server in the C<eg_chat_server> global group. If it
801 cannot find it (which is likely when the node is just starting up),
802 it will wait a second and then retry. This "wait a bit and retry"
803 is an important pattern, as distributed programming means lots of
804 things are going on asynchronously. In practice, one should use a more
805 intelligent algorithm that could warn after an excessive number of
806 retries. Hopefully future versions of AnyEvent::MP will offer some
807 predefined supervisors, for now you will have to code it on your own.
808
Next it creates a local port for the server to send messages to, and
monitors it. When the port is killed, it will print "disconnected" and
call the supervisor function to retry.
812
813 $client = port { print "\r \r@_\n> " };
814 mon $client, sub {
815 print "\rdisconnected @_\n";
816 &server_connect;
817 };
818
Then everything is ready: the client will send a C<join> message with its
local port to the server, and start monitoring it:
821
822 $server = $servernodes->[0];
823 snd $server, join => $client, $nick;
824 mon $server, $client;
825 }
826
The monitor will ensure that if the server crashes or goes away, the
client will be killed as well. This tells the user that the client was
disconnected, and the supervisor will then try to connect to the server
again.
830
The rest of the program deals with the boring details: invoking the
supervisor function to start the whole client process, and handling
terminal input, sending it to the server.
834
835 You should now try to start the server and one or more clients in different
836 terminal windows (and the seed node):
837
838 perl eg/chat_client nick1
839 perl eg/chat_client nick2
840 perl eg/chat_server
841 aemp run profile seed
842
843 And then you can experiment with chatting, killing one or more clients, or
844 stopping and restarting the server, to see the monitoring in action.
845
846 The crucial point you should understand from this example is that
847 monitoring is usually symmetric: when you monitor some other port,
848 potentially on another node, that other port usually should monitor you,
849 too, so when the connection dies, both ports get killed, or at least both
sides can take corrective action. Exceptions are "servers" that serve
multiple clients at once and might only wish to clean up, and supervisors,
which of course should not normally get killed (unless they, too, have a
supervisor).
854
855 If you often think in object-oriented terms, then treat a port as an
856 object, C<port> is the constructor, the receive callbacks set by C<rcv>
857 act as methods, the C<kil> function becomes the explicit destructor and
C<mon> installs a destructor hook. Unlike conventional object-oriented
859 programming, it can make sense to exchange ports more freely (for example,
860 to monitor one port from another).
861
862 There is ample room for improvement: the server should probably remember
863 the nickname in the C<join> handler instead of expecting it in every chat
864 message, it should probably monitor itself, and the client should not try
865 to send any messages unless a server is actually connected.
866
867 =head1 PART 3: TIMTOWTDI: Virtual Connections
868
869 The chat system developed in the previous sections is very "traditional"
870 in a way: you start some server(s) and some clients statically and they
871 start talking to each other.
872
Sometimes applications work more like "services": they can run on almost
any node and talk to instances of themselves on other nodes. The
L<AnyEvent::MP::Global> service, for example, monitors nodes joining the
network and starts itself automatically on other nodes (if it isn't
running already).
877
878 A good way to design such applications is to put them into a module and
879 create "virtual connections" to other nodes - we call this the "bridge
880 head" method, because you start by creating a remote port (the bridge
881 head) and from that you start to bootstrap your application.
882
883 Since that sounds rather theoretical, let's redesign the chat server and
884 client using this design method.
885
886 Here is the server:
887
888 use common::sense;
889 use AnyEvent::MP;
890 use AnyEvent::MP::Global;
891
892 configure;
893
894 grp_reg eg_chat_server2 => $NODE;
895
896 my %clients;
897
898 sub msg {
899 print "relaying: $_[0]\n";
900 snd $_, $_[0]
901 for values %clients;
902 }
903
904 sub client_connect {
905 my ($client, $nick) = @_;
906
907 mon $client;
908 mon $client, sub {
909 delete $clients{$client};
910 msg "$nick (quits, @_)";
911 };
912
913 $clients{$client} = $client;
914
915 msg "$nick (joins)";
916
917 rcv $SELF, sub { msg "$nick: $_[0]" };
918 }
919
920 warn "server ready.\n";
921
922 AnyEvent->condvar->recv;
923
It starts out not much different than the previous example, except that
this time, we register the node port in the global group and not any port
we created - the clients only want to know which node the server should be
running on. In fact, they could also use some kind of election mechanism
to find the node with the lowest load, or something like that.
929
930 The more interesting change is that indeed no server port is created -
931 the server consists only of code, and "does" nothing by itself. All it
932 does is define a function C<client_connect>, which expects a client port
933 and a nick name as arguments. It then monitors the client port and binds
934 a receive callback on C<$SELF>, which expects messages that in turn are
935 broadcast to all clients.
936
937 The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
938 for C<mon $client, $SELF>. The second does the normal "client has gone
939 away" clean-up action. Both could actually be rolled into one C<mon>
940 action.
941
942 C<$SELF> is a good hint that something interesting is going on. And
943 indeed, when looking at the client code, there is a new function,
944 C<spawn>:
945
946 use common::sense;
947 use AnyEvent::MP;
948 use AnyEvent::MP::Global;
949
950 my $nick = shift;
951
952 configure;
953
954 $| = 1;
955
956 my $port = port;
957
958 my ($client, $server);
959
960 sub server_connect {
961 my $servernodes = grp_get "eg_chat_server2"
962 or return after 1, \&server_connect;
963
964 print "\rconnecting...\n";
965
966 $client = port { print "\r \r@_\n> " };
967 mon $client, sub {
968 print "\rdisconnected @_\n";
969 &server_connect;
970 };
971
972 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
973 mon $server, $client;
974 }
975
976 server_connect;
977
978 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
979 chomp (my $line = <STDIN>);
980 print "> ";
981 snd $server, $line
982 if $server;
983 });
984
985 print "> ";
986 AnyEvent->condvar->recv;
987
The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
server port I<on the server node>:
991
992 $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
993 mon $server, $client;
994
995 And of course the first thing after creating it is monitoring it.
996
997 The C<spawn> function creates a new port on a remote node and returns
998 its port ID. After creating the port it calls a function on the remote
999 node, passing any remaining arguments to it, and - most importantly -
1000 executes the function within the context of the new port, so it can be
1001 manipulated by referring to C<$SELF>. The init function can reside in a
1002 module (actually it normally I<should> reside in a module) - AnyEvent::MP
1003 will automatically load the module if the function isn't defined.
1004
1005 The C<spawn> function returns immediately, which means you can instantly
1006 send messages to the port, long before the remote node has even heard
1007 of our request to create a port on it. In fact, the remote node might
1008 not even be running. Despite these troubling facts, everything should
1009 work just fine: if the node isn't running (or the init function throws an
1010 exception), then the monitor will trigger because the port doesn't exist.
1011
1012 If the spawn message gets delivered, but the monitoring message is not
1013 because of network problems (extremely unlikely, but monitoring, after
1014 all, is implemented by passing a message, and messages can get lost), then
1015 this connection loss will eventually trigger the monitoring action. On the
remote node (which in turn monitors the client) the port will also be
1017 cleaned up on connection loss. When the remote node comes up again and our
1018 monitoring message can be delivered, it will instantly fail because the
1019 port has been cleaned up in the meantime.
1020
If your head is spinning by now, that's fine - just keep in mind: after
creating a port, monitor it on the local node, monitor "the other
side" from the remote node, and everything will be cleaned up properly.
1024
1025 =head2 Services
1026
1027 Above it was mentioned that C<spawn> automatically loads modules, and this
1028 can be exploited in various ways.
1029
1030 Assume for a moment you put the server into a file called
1031 F<mymod/chatserver.pm> reachable from the current directory. Then you
1032 could run a node there with:
1033
1034 aemp run
1035
1036 The other nodes could C<spawn> the server by using
1037 C<mymod::chatserver::client_connect> as init function.
1038
1039 Likewise, when you have some service that starts automatically (similar to
1040 AnyEvent::MP::Global), then you can configure this service statically:
1041
1042 aemp profile mysrvnode services mymod::service::
1043 aemp run profile mysrvnode
1044
1045 And the module will automatically be loaded in the node, as specifying a
1046 module name (with C<::>-suffix) will simply load the module, which is then
1047 free to do whatever it wants.
1048
Of course, you can also do it in the much more standard way by writing
a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
distribution and then configuring your nodes to use it. For example, to
run the Bummskraut IRC backend on a machine named "ruth", you could do
this:
1053
1054 aemp profile ruth addservice BK::Backend::IRC::
1055
1056 And any F<aemp run> on that host will automatically have the Bummskraut
1057 IRC backend running.
1058
There are plenty of possibilities you can use - it's all up to you how you
structure your application.
1061
1062 =head1 PART 4: Coro::MP - selective receive
1063
1064 Not all problems lend themselves naturally to an event-based solution:
1065 sometimes things are easier if you can decide in what order you want to
receive messages, regardless of the order in which they were sent.
1067
1068 In these cases, L<Coro::MP> can provide a nice solution: instead of
registering callbacks for each message type, C<Coro::MP> attaches a
(coro-) thread to a port. The thread can then opt to selectively receive
1071 messages it is interested in. Other messages are not lost, but queued, and
1072 can be received at a later time.
1073
1074 The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
1075 module. It is, however, tightly integrated into C<AnyEvent::MP> - the
ports it creates are fully compatible with C<AnyEvent::MP> ports.
1077
1078 In fact, C<Coro::MP> is more of an extension than a separate module: all
1079 functions exported by C<AnyEvent::MP> are exported by it as well.
1080
To illustrate what programming with C<Coro::MP> looks like, consider the
following (slightly contrived) example: let's implement a server that
accepts a C<< (write_file => $port, $path) >> message with a (source)
port and a filename, followed by as many C<< (data => $port, $data) >>
messages as required to fill the file, followed by an empty C<< (data =>
$port) >> message.
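Written out as plain Perl data, one complete transfer consists of
messages like these (C<client_port> is just a placeholder string standing
in for a real source port ID):

```perl
# One complete write_file exchange, as plain Perl data structures.
# "client_port" stands in for a real AnyEvent::MP port ID.
my @transfer = (
   [ write_file => "client_port", "/tmp/example" ], # open the file
   [ data       => "client_port", "first chunk"  ], # file contents...
   [ data       => "client_port", "second chunk" ],
   [ data       => "client_port" ],                 # empty data: done
);
```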
1087
The server only writes a single file at a time; other requests will stay
in the queue until the current file has been finished.
1090
1091 Here is an example implementation that uses L<Coro::AIO> and largely
1092 ignores error handling:
1093
1094 my $ioserver = port_async {
1095 while () {
1096 my ($tag, $port, $path) = get_cond;
1097
1098 $tag eq "write_file"
1099 or die "only write_file messages expected";
1100
1101 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1102 or die "$path: $!";
1103
1104 while () {
1105 my (undef, undef, $data) = get_cond {
1106 $_[0] eq "data" && $_[1] eq $port
1107 } 5
1108 or die "timeout waiting for data message from $port\n";
1109
1110 length $data or last;
1111
1112 aio_write $fh, undef, undef, $data, 0;
1113 };
1114 }
1115 };
1116
1117 mon $ioserver, sub {
1118 warn "ioserver was killed: @_\n";
1119 };
1120
1121 Let's go through it part by part.
1122
1123 my $ioserver = port_async {
1124
1125 Ports can be created by attaching a thread to an existing port via
1126 C<rcv_async>, or as here by calling C<port_async> with the code to execute
1127 as a thread. The C<async> component comes from the fact that threads are
1128 created using the C<Coro::async> function.
1129
1130 The thread runs in a normal port context (so C<$SELF> is set). In
1131 addition, when the thread returns, it will be C<kil> I<normally>, i.e.
1132 without a reason argument.
1133
   while () {
      my ($tag, $port, $path) = get_cond;

      $tag eq "write_file"
         or die "only write_file messages expected";
1137
1138 The thread is supposed to serve many file writes, which is why it executes
1139 in a loop. The first thing it does is fetch the next message, using
1140 C<get_cond>, the "conditional message get". Without a condition, it simply
1141 fetches the next message from the queue, which I<must> be a C<write_file>
1142 message.
1143
1144 The message contains the C<$path> to the file, which is then created:
1145
1146 my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1147 or die "$path: $!";
1148
1149 Then we enter a loop again, to serve as many C<data> messages as
1150 necessary:
1151
1152 while () {
1153 my (undef, undef, $data) = get_cond {
1154 $_[0] eq "data" && $_[1] eq $port
1155 } 5
1156 or die "timeout waiting for data message from $port\n";
1157
1158 This time, the condition is not empty, but instead a code block: similarly
1159 to grep, the code block will be called with C<@_> set to each message in
1160 the queue, and it has to return whether it wants to receive the message or
1161 not.
1162
In this case we are interested in C<data> messages (C<< $_[0] eq "data"
>>) whose source port matches ours (C<< $_[1] eq $port >>).
1165
1166 The condition must be this strict, as it is possible to receive both
1167 C<write_file> messages and C<data> messages from other ports while we
1168 handle the file writing.
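The queue scan implied here can be modelled in plain Perl: find the
first queued message the predicate accepts and remove it. This sketch
only illustrates the matching semantics - the real C<get_cond> also
blocks the thread and handles timeouts:

```perl
# Plain-Perl model of conditional receive (illustration only, not
# Coro::MP's implementation): scan a queue of messages (arrayrefs),
# remove and return the first one the predicate accepts.
sub get_matching {
   my ($queue, $cond) = @_;

   for my $i (0 .. $#$queue) {
      return splice @$queue, $i, 1
         if $cond->(@{ $queue->[$i] });
   }

   ()   # nothing matched; get_cond would block here instead
}

my @queue = (
   [ write_file => "portA", "/tmp/x" ],
   [ data       => "portA", "hello"  ],
);

my $msg = get_matching \@queue,
             sub { $_[0] eq "data" && $_[1] eq "portA" };
# $msg is now the data message; the write_file message stays queued
```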
1169
1170 The lone C<5> at the end is a timeout - when no matching message is
1171 received within C<5> seconds, we assume an error and C<die>.
1172
1173 When an empty C<data> message is received we are done and can close the
1174 file (which is done automatically as C<$fh> goes out of scope):
1175
1176 length $data or last;
1177
1178 Otherwise we need to write the data:
1179
1180 aio_write $fh, undef, undef, $data, 0;
1181
1182 That's basically it. Note that every process should have some kind of
1183 supervisor. In our case, the supervisor simply prints any error message:
1184
1185 mon $ioserver, sub {
1186 warn "ioserver was killed: @_\n";
1187 };
1188
1189 Here is a usage example:
1190
1191 port_async {
1192 snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1193 snd $ioserver, data => $SELF, "abc\n";
1194 snd $ioserver, data => $SELF, "def\n";
1195 snd $ioserver, data => $SELF;
1196 };
1197
1198 The messages are sent without any flow control or acknowledgement (feel
1199 free to improve). Also, the source port does not actually need to be a
1200 port - any unique ID will do - but port identifiers happen to be a simple
1201 source of network-wide unique IDs.
1202
1203 Apart from C<get_cond> as seen above, there are other ways to receive
1204 messages. The C<write_file> message above could also selectively be
1205 received using a C<get> call:
1206
1207 my ($port, $path) = get "write_file";
1208
1209 This is simpler, but when some other code part sends an unexpected message
1210 to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
1211 every threaded port should have a "fetch next message unconditionally"
1212 somewhere, to avoid filling up the queue.
1213
It is also possible to use C<get_cond> in a switch-like fashion:
1215
1216 get_cond {
1217 $_[0] eq "msg1" and return sub {
1218 my (undef, @msg1_data) = @_;
1219 ...;
1220 };
1221
1222 $_[0] eq "msg2" and return sub {
1223 my (undef, @msg2_data) = @_;
1224 ...;
1225 };
1226
1227 die "unexpected message $_[0] received";
1228 };
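The same tag-to-handler dispatch can be modelled in plain Perl without
Coro::MP, which may make the control flow clearer. The tags and payloads
here are made up; C<get_cond> itself would additionally remove the
matched message from the queue:

```perl
# Plain-Perl model of the switch-like style: pick a handler based on
# the message tag, then invoke it with the full message. Tags and
# handlers are illustrative only.
sub dispatch {
   my @msg = @_;

   my $handler
      = $msg[0] eq "msg1" ? sub { my (undef, @data) = @_; "one: @data" }
      : $msg[0] eq "msg2" ? sub { my (undef, @data) = @_; "two: @data" }
      : die "unexpected message $msg[0] received";

   $handler->(@msg)
}
```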
1229
1230 =head1 THE END
1231
This is the end of this introduction, but hopefully not the end of
your career as an AEMP user. I hope the tutorial was enough to make the
basic concepts clear. Keep in mind that distributed programming is not
completely trivial and that AnyEvent::MP is still in its infancy - but I
hope it will prove useful in creating exciting new applications.
1237
1238 =head1 SEE ALSO
1239
1240 L<AnyEvent::MP>
1241
1242 L<AnyEvent::MP::Global>
1243
1244 L<Coro::MP>
1245
1246 L<AnyEvent>
1247
1248 =head1 AUTHOR
1249
1250 Robin Redeker <elmex@ta-sa.org>
1251 Marc Lehmann <schmorp@schmorp.de>
1252