1 root 1.4 =head1 Message Passing for the Non-Blocked Mind
2 elmex 1.1
3 root 1.8 =head1 Introduction and Terminology
4 elmex 1.1
5 root 1.4 This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
6 root 1.23 module, which allows programs to transparently pass messages within the
7     process and to other processes on the same or a different host.
8 elmex 1.1
9 root 1.23 What kind of messages? Basically a message here means a list of Perl
10 root 1.15 strings, numbers, hashes and arrays, anything that can be expressed as a
11 root 1.43 L<JSON> text (as JSON is the default serialiser in the protocol). Here are
12     two examples:
13 elmex 1.1
14 root 1.23 write_log => 1251555874, "action was successful.\n"
15     123, ["a", "b", "c"], { foo => "bar" }
16 elmex 1.21
17 root 1.23 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 root 1.46 first element of a message that indicates the type of the message. This
19 root 1.23 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20     (C<rcv>) support matching it directly.
21    
Suppose you want to send a ping message with your current time to
somewhere; this is how such a message might look (in Perl syntax):
24    
25     ping => 1251381636
26    
27     Now that we know what a message is, to which entities are those
28     messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29     a destination for messages but also a context to execute code: when
30     a runtime error occurs while executing code belonging to a port, the
31     exception will be raised on the port and can even travel to interested
32     parties on other nodes, which makes supervision of distributed processes
33     easy.
34    
35     How do these ports relate to things you know? Each I<port> belongs
36     to a I<node>, and a I<node> is just the UNIX process that runs your
37     L<AnyEvent::MP> application.
38    
39     Each I<node> is distinguished from other I<nodes> running on the same or
40     another host in a network by its I<node ID>. A I<node ID> is simply a
41     unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42     (UNIX nodename, random string...).
43    
44     Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
45     to each other. The setup consists of two nodes (more are of course
46     possible): Node C<A> (in UNIX process 7066) with the ports C<ABC> and
47     C<DEF>. And the node C<B> (in UNIX process 8321) with the ports C<FOO> and
48     C<BAR>.
49 elmex 1.17
50    
   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|
60    
61 root 1.23 The strings for the I<port IDs> here are just for illustrative
62     purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 root 1.43 strings, they can't be chosen manually and are assigned by the system
64 root 1.23 dynamically. These I<port IDs> are unique within a network and can also be
65 root 1.46 used to identify senders, or even as message tags for instance.
66 root 1.23
67     The next sections will explain the API of L<AnyEvent::MP> by going through
68     a few simple examples. Later some more complex idioms are introduced,
69     which are hopefully useful to solve some real world problems.
70 root 1.8
71 root 1.39 =head2 Passing Your First Message
72 elmex 1.16
73 root 1.46 For starters, let's have a look at the messaging API. The following
74     example is just a demo to show the basic elements of message passing with
75 root 1.24 L<AnyEvent::MP>.
76    
77     The example should print: C<Ending with: 123>, in a rather complicated
78     way, by passing some message to a port.
79 elmex 1.16
80     use AnyEvent;
81     use AnyEvent::MP;
82    
83     my $end_cv = AnyEvent->condvar;
84    
85     my $port = port;
86    
87     rcv $port, test => sub {
88     my ($data) = @_;
89     $end_cv->send ($data);
90     };
91    
92     snd $port, test => 123;
93    
94     print "Ending with: " . $end_cv->recv . "\n";
95    
It already uses most of the essential functions inside
L<AnyEvent::MP>: first there is the C<port> function, which creates a
I<port> and returns its I<port ID>, a simple string.
99    
100     This I<port ID> can be used to send messages to the port and install
101     handlers to receive messages on the port. Since it is a simple string
102     it can be safely passed to other I<nodes> in the network when you want
103     to refer to that specific port (usually used for RPC, where you need
104     to tell the other end which I<port> to send the reply to - messages in
105     L<AnyEvent::MP> have a destination, but no source).
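
To illustrate the idea, here is a small sketch of such a request/reply
exchange - the tag names and the C<$some_other_port> variable are made up
for this example:

   my $reply_port = port;

   rcv $reply_port, sum_result => sub {
      my ($sum) = @_;
      print "sum is $sum\n";
   };

   # include our own port in the message, so the other side
   # knows where to send the reply
   snd $some_other_port, sum_request => $reply_port, 1, 2, 3;

The other side would then C<snd> its C<sum_result> answer to the I<port
ID> it received as part of the message.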
106 elmex 1.17
107 root 1.24 The next function is C<rcv>:
108 elmex 1.16
109 elmex 1.17 rcv $port, test => sub { ... };
110 elmex 1.16
It installs a receiver callback on the I<port> specified as the first
112     argument (it only works for "local" ports, i.e. ports created on the same
113     node). The next argument, in this example C<test>, specifies a I<tag> to
114     match. This means that whenever a message with the first element being
115     the string C<test> is received, the callback is called with the remaining
116 elmex 1.17 parts of that message.
117    
118 root 1.24 Messages can be sent with the C<snd> function, which is used like this in
119     the example above:
120 elmex 1.17
121     snd $port, test => 123;
122    
123 root 1.24 This will send the message C<'test', 123> to the I<port> with the I<port
124     ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125     on C<test> it will call the callback with the first argument being the
126     number C<123>.
127    
The callback is a typical AnyEvent idiom: it just passes
129 root 1.24 that number on to the I<condition variable> C<$end_cv> which will then
130     pass the value to the print. Condition variables are out of the scope
131     of this tutorial and not often used with ports, so please consult the
132 elmex 1.17 L<AnyEvent::Intro> about them.
133    
134 root 1.24 Passing messages inside just one process is boring. Before we can move on
135     and do interprocess message passing we first have to make sure some things
136     have been set up correctly for our nodes to talk to each other.
137 elmex 1.17
138 root 1.39 =head2 System Requirements and System Setup
139 elmex 1.17
140 root 1.25 Before we can start with real IPC we have to make sure some things work on
141     your system.
142 elmex 1.17
143 root 1.25 First we have to setup a I<shared secret>: for two L<AnyEvent::MP>
144     I<nodes> to be able to communicate with each other over the network it is
145     necessary to setup the same I<shared secret> for both of them, so they can
prove their trustworthiness to each other.
147 elmex 1.17
The easiest way to set this up is to use the F<aemp> utility:
149    
150     aemp gensecret
151    
152 root 1.25 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153     random shared secret. You can copy this file to any other system and
154     then communicate over the network (via TCP) with it. You can also select
155     your own shared secret (F<aemp setsecret>) and for increased security
156     requirements you can even create (or configure) a TLS certificate (F<aemp
157     gencert>), causing connections to not just be securely authenticated, but
158     also to be encrypted and protected against tinkering.
159    
160     Connections will only be successfully established when the I<nodes>
161     that want to connect to each other have the same I<shared secret> (or
162     successfully verify the TLS certificate of the other side, in which case
163     no shared secret is required).
164 elmex 1.17
165     B<If something does not work as expected, and for example tcpdump shows
166     that the connections are closed almost immediately, you should make sure
167     that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168     you try to connect with each other!>
169 elmex 1.16
That is all for now; you will find some more advanced fiddling with the
171     C<aemp> utility later.
172    
173 root 1.35 =head2 Shooting the Trouble
174    
175     Sometimes things go wrong, and AnyEvent::MP, being a professional module,
176 root 1.43 does not gratuitously spill out messages to your screen.
177 root 1.35
To help troubleshoot any issues, there are two environment variables
that you can set. The first, C<PERL_ANYEVENT_MP_WARNLEVEL>, sets the
180     logging level. The default is C<5>, which means nothing much is
181 root 1.43 printed. You can increase it to C<8> or C<9> to get more verbose
182 root 1.35 output. This is example output when starting a node:
183    
184 root 1.46 2012-03-04 19:41:10 <8> node cerebro starting up.
185     2012-03-04 19:41:10 <8> node listens on [10.0.0.1:4040].
186     2012-03-04 19:41:10 <9> trying connect to seed node 10.0.0.19:4040.
187     2012-03-04 19:41:10 <9> 10.0.0.19:4040 connected as rain
188     2012-03-04 19:41:10 <7> rain is up ()
189 root 1.35
190     A lot of info, but at least you can see that it does something.
191    
192     The other environment variable that can be useful is
193     C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
194 root 1.46 most messages that are sent or received to be printed. For example, F<aemp
195     restart rijk> might output these message exchanges:
196 root 1.35
197 root 1.46 SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
198     SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
199     SND rain <- [null,"g_find","rijk"]
200     RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
201     RCV rijk -> ["b",""]
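
Both variables are ordinary environment variables, so you can simply set
them when starting a program, for example one of the example programs used
later in this tutorial:

   PERL_ANYEVENT_MP_WARNLEVEL=9 PERL_ANYEVENT_MP_TRACE=1 perl eg/chat_server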
202 elmex 1.18
203 root 1.30 =head1 PART 1: Passing Messages Between Processes
204 elmex 1.18
205     =head2 The Receiver
206    
Let's split the previous example up into two programs: one that contains
208     the sender and one for the receiver. First the receiver application, in
209     full:
210 elmex 1.18
211     use AnyEvent;
212     use AnyEvent::MP;
213    
214 root 1.45 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
215 elmex 1.18
216     my $port = port;
217 root 1.47 db_set eg_receivers => $port;
218 elmex 1.18
219     rcv $port, test => sub {
220     my ($data, $reply_port) = @_;
221    
222     print "Received data: " . $data . "\n";
223     };
224    
225     AnyEvent->condvar->recv;
226    
227 root 1.51 Now, that wasn't too bad, was it? OK, let's go through the new functions
228 root 1.47 that have been used.
229 elmex 1.18
230 root 1.44 =head3 C<configure> and Joining and Maintaining the Network
231 elmex 1.18
232 root 1.47 First let's have a look at C<configure>:
233 elmex 1.18
234 root 1.47 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
235 elmex 1.18
236     Before we are able to send messages to other nodes we have to initialise
237 root 1.26 ourself to become a "distributed node". Initialising a node means naming
238 root 1.47 the node and binding some TCP listeners so that other nodes can
239     contact it.
240    
241     Additionally, to actually link all nodes in a network together, you can
242     specify a number of seed addresses, which will be used by the node to
243     connect itself into an existing network, as we will see shortly.
244 root 1.26
245 root 1.28 All of this (and more) can be passed to the C<configure> function - later
246     we will see how we can do all this without even passing anything to
247     C<configure>!
248    
The first parameter, C<nodeid>, specifies the node ID (in this case
C<eg_receiver/%u>). The default is to use the node name of the current
host plus C</%u>, which gives the node a name with a random suffix to
make it unique, but for this example we want the node to have a bit more
personality, so we name it C<eg_receiver> plus a random suffix.
254    
255     Why the random suffix? Node IDs need to be unique within the network and
256     appending a random suffix is the easiest way to do that.
257 root 1.28
258     The second parameter, C<binds>, specifies a list of C<address:port> pairs
259     to bind TCP listeners on. The special "address" of C<*> means to bind on
260 root 1.47 every local IP address (this might not work on every OS, so explicit IP
261     addresses are best).
262 root 1.28
263     The reason to bind on a TCP port is not just that other nodes can connect
264     to us: if no binds are specified, the node will still bind on a dynamic
265     port on all local addresses - but in this case we won't know the port, and
266     cannot tell other nodes to connect to it as seed node.
267    
268 root 1.47 Now, a I<seed> is simply the TCP address of some other node in the
269     network, often the same string as used for the C<binds> parameter of the
270     other node. The need for seeds is easy to explain: I<somehow> the nodes
271     of an aemp network have to find each other, and often this means over the
272     internet. So broadcasts are out.
273    
274     Instead, a node usually specifies the addresses of a few (for redundancy)
275     other nodes, some of which should be up. Two nodes can set each other as
276     seeds without any issues. You could even specify all nodes as seeds for
277     all nodes, for total redundancy. But the common case is to have some more
278     or less central, stable servers running seed services for other nodes.
279    
280     All you need to do to ensure that an AnyEvent::MP network connects
281     together is to make sure that all connections from nodes to their seed
282     nodes I<somehow> span the whole network. The simplest way to do that would
283     be for all nodes to specify a single node as seed node, and you would get
284     a star topology. If you specify all nodes as seed nodes, you get a fully
285     meshed network (that's what previous releases of AnyEvent::MP actually
286     did).
287    
A node tries to keep connections open to all of its seed nodes at all
289     times, while other connections are made on demand only.
290    
291     All of this ensures that the network stays one network - even if all the
292     nodes in one half of the net are separated from the nodes in the other
293     half by some network problem, once that is over, they will eventually
294     become a single network again.
295    
296     In addition to creating the network, a node also expects the seed nodes to
297     run the shared database service - if need be, by automatically starting it,
298     so you don't normally need to configure this explicitly.
299    
301     The process of joining a network takes time, during which the node
302     is already running. This means it takes time until the node is
fully connected, and information about services in the network is
304     available. This is why most AnyEvent::MP programs start by waiting a while
305     until the information they need is available.
306    
307     We will see how this is done later, in the sender program.
308 elmex 1.19
309 root 1.28 =head3 Registering the Receiver
310 elmex 1.19
311 root 1.47 Coming back to our example, after the node has been configured for network
312     access, it is time to publish some service, namely the receive service.
313 elmex 1.19
314 root 1.47 For that, let's look at the next lines:
315 elmex 1.19
316     my $port = port;
317 root 1.47 db_set eg_receivers => $port;
318 elmex 1.19
319 root 1.27 The C<port> function has already been discussed. It simply creates a new
320 root 1.51 I<port> and returns the I<port ID>. The C<db_set> function, however, is
321 root 1.47 new: The first argument is the name of a I<database family> and the second
322     argument is the name of a I<subkey> within that family. The third argument
323     would be the I<value> to be associated with the family and subkey, but,
324     since it is missing, it will simply be C<undef>.
325    
326 root 1.51 What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
327     database. This database runs on so-called "global" nodes, which usually
328     are the seed nodes of your network. The database structure is "simply" a
329     hash of hashes of values.
330 root 1.47
331 root 1.51 To illustrate this with Perl syntax, assume the database was stored in
332     C<%DB>, then the C<db_set> function more or less would do this:
333 root 1.47
334     $DB{eg_receivers}{$port} = undef;
335    
336     So the ominous "family" selects a hash in the database, and the "subkey"
337 root 1.51 is simply the key in this hash - C<db_set> very much works like this
338 root 1.47 assignment.
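
If we had passed a third argument, that value would have been stored
instead of C<undef>. A hypothetical example:

   # store an actual value under the subkey, e.g. a timestamp ...
   db_set eg_receivers => $port => time;

   # ... which corresponds to:
   #    $DB{eg_receivers}{$port} = time;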
339    
340     The family namespace is shared by all nodes in a network, so the names
341     should be reasonably unique, for example, they could start with the name
342 root 1.48 of your module, or the name of the program, using your port name or node
343     name as subkey.
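
For example, a hypothetical program called F<myapp> could register its
worker nodes like this:

   # $NODE (exported by AnyEvent::MP) contains the local node ID
   db_set "myapp/workers" => $NODE;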
344 root 1.27
345 root 1.47 The purpose behind adding this key to the database is that the sender can
346     look it up and find our port. We will shortly see how.
347 root 1.27
348     The last step in the example is to set up a receiver callback for those
349     messages, just as was discussed in the first example. We again match
350     for the tag C<test>. The difference is that this time we don't exit the
351     application after receiving the first message. Instead we continue to wait
352     for new messages indefinitely.
353 elmex 1.19
354 elmex 1.20 =head2 The Sender
355 root 1.8
356 root 1.48 OK, now let's take a look at the sender code:
357 root 1.4
358 elmex 1.1 use AnyEvent;
359     use AnyEvent::MP;
360    
361 root 1.45 configure nodeid => "eg_sender/%u", seeds => ["*:4040"];
362 elmex 1.1
363 root 1.47 my $guard = db_mon eg_receivers => sub {
364 root 1.50 my ($family, $a, $c, $d) = @_;
365 root 1.47 return unless %$family;
366    
367     # now there are some receivers, send them a message
368 root 1.50 snd $_ => test => time
369 root 1.47 for keys %$family;
370     };
371 elmex 1.1
372     AnyEvent->condvar->recv;
373    
374 root 1.28 It's even less code. The C<configure> serves the same purpose as in the
375 root 1.48 receiver, but instead of specifying binds we specify a list of seeds - the
376     only seed happens to be the same as the bind used by the receiver, which
377 root 1.47 therefore becomes our seed node.
378 root 1.27
379 root 1.48 Remember the part about having to wait till things become available? Well,
380     after configure returns, nothing has been done yet - the node is not
381     connected to the network, knows nothing about the database contents, and
382     it can take ages (for a computer :) for this situation to change.
383 root 1.47
384     Therefore, the sender waits, in this case by using the C<db_mon>
385     function. This function registers an interest in a specific database
family (in this case C<eg_receivers>). Each time something inside the
family changes (a key is added, changed or deleted), it will call our
callback with the family hash as first argument, followed by the lists of
added, changed and deleted keys.
390    
The callback only checks whether the C<%$family> hash is empty - if it
is, then it doesn't do anything. But eventually the family will contain
the port subkey we set in the receiver. Then it will send a message to it
(and
394     any other receiver in the same family). Likewise, should the receiver go
395     away and come back, or should another receiver come up, it will again send
396     a message to all of them.
397 root 1.47
398     You can experiment by having multiple receivers - you have to change the
399     "binds" parameter in the receiver to the seeds used in the sender to start
400     up additional receivers, but then you can start as many as you like. If
401     you specify proper IP addresses for the seeds, you can even run them on
402     different computers.
403    
404     Each time you start the sender, it will send a message to all receivers it
405 root 1.48 finds (you have to interrupt it manually afterwards).
406 root 1.47
407 root 1.51 Additional experiments you could try include using
408     C<PERL_ANYEVENT_MP_TRACE=1> to see which messages are exchanged, or
409     starting the sender before the receiver and see how long it then takes to
410     find the receiver.
411 root 1.27
412 root 1.28 =head3 Splitting Network Configuration and Application Code
413    
414 root 1.49 OK, so far, this works reasonably. In the real world, however, the person
415     configuring your application to run on a specific network (the end user
416     or network administrator) is often different to the person coding the
417     application.
418 root 1.28
419     Or to put it differently: the arguments passed to configure are usually
420 root 1.49 provided not by the programmer, but by whoever is deploying the program -
421     even in the example above, we would like to be able to just start senders
422     and receivers without having to patch the programs.
423 root 1.28
424     To make this easy, AnyEvent::MP supports a simple configuration database,
425     using profiles, which can be managed using the F<aemp> command-line
426 root 1.49 utility (yes, this section is about the advanced tinkering mentioned
427 root 1.30 before).
428 root 1.28
429     When you change both programs above to simply call
430    
431     configure;
432    
433     then AnyEvent::MP tries to look up a profile using the current node name
434     in its configuration database, falling back to some global default.
435    
436     You can run "generic" nodes using the F<aemp> utility as well, and we will
437     exploit this in the following way: we configure a profile "seed" and run
438     a node using it, whose sole purpose is to be a seed node for our example
439     programs.
440    
441     We bind the seed node to port 4040 on all interfaces:
442    
443 root 1.29 aemp profile seed binds "*:4040"
444 root 1.28
445     And we configure all nodes to use this as seed node (this only works when
446 root 1.51 running on the same host, for multiple machines you would replace the C<*>
447     by the IP address or hostname of the node running the seed), by changing
448     the global settings shared between all profiles:
449 root 1.28
450 root 1.49 aemp seeds "*:4040"
451 root 1.28
452     Then we run the seed node:
453    
454     aemp run profile seed
455    
456 root 1.49 After that, we can start as many other nodes as we want, and they will
457     all use our generic seed node to discover each other. The reason we can
458     start our existing programs even though they specify "incompatible"
459     parameters to C<configure> is that the configuration file (by default)
460     takes precedence over any arguments passed to C<configure>.
461 elmex 1.7
462 root 1.30 That's all for now - next we will teach you about monitoring by writing a
463     simple chat client and server :)
464    
465     =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
466    
467     That's a mouthful, so what does it mean? Our previous example is what one
468     could call "very loosely coupled" - the sender doesn't care about whether
469     there are any receivers, and the receivers do not care if there is any
470     sender.
471    
472     This can work fine for simple services, but most real-world applications
473     want to ensure that the side they are expecting to be there is actually
474     there. Going one step further: most bigger real-world applications even
475     want to ensure that if some component is missing, or has crashed, it will
476     still be there, by recovering and restarting the service.
477    
478     AnyEvent::MP supports this by catching exceptions and network problems,
479 root 1.49 and notifying interested parties of these.
480 root 1.30
481 root 1.41 =head2 Exceptions, Port Context, Network Errors and Monitors
482 root 1.30
483     =head3 Exceptions
484    
485 root 1.49 Exceptions are handled on a per-port basis: all receive callbacks are
486     executed in a special context, the so-called I<port-context>: code
487     that throws an otherwise uncaught exception will cause the port to be
488     C<kil>led. Killed ports are destroyed automatically (killing ports is
489     actually the only way to free ports).
490 root 1.30
491 root 1.49 Ports can be monitored, even from a different node and host, and when a
492     port is killed, any entity monitoring it will be notified.
493 root 1.30
494     Here is a simple example:
495    
496     use AnyEvent::MP;
497    
498     # create a port, it always dies
499     my $port = port { die "oops" };
500    
501     # monitor it
502     mon $port, sub {
503     warn "$port was killed (with reason @_)";
504     };
505    
506     # now send it some message, causing it to die:
507     snd $port;
508    
509 root 1.49 AnyEvent->condvar->recv;
510    
511 root 1.30 It first creates a port whose only action is to throw an exception,
and then monitors it with the C<mon> function. Afterwards it sends it a
513     message, causing it to die and call the monitoring callback:
514    
515     anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
516    
517 root 1.49 The callback was actually passed two arguments: C<die>, to indicate it
518     did throw an I<exception> as opposed to, say, a network error, and the
519     exception message itself.
520 root 1.30
521     What happens when a port is killed before we have a chance to monitor
522     it? Granted, this is highly unlikely in our example, but when you program
523     in a network this can easily happen due to races between nodes.
524    
525     use AnyEvent::MP;
526    
527     my $port = port { die "oops" };
528    
529     snd $port;
530    
531     mon $port, sub {
532     warn "$port was killed (with reason @_)";
533     };
534    
535 root 1.49 AnyEvent->condvar->recv;
536    
537 root 1.51 This time we will get something else:
538 root 1.30
539 root 1.51 2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
540     anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)
541 root 1.30
542 root 1.51 The first line is a warning that is printed when a port dies that isn't
being monitored, because that is normally a bug. When the C<mon> is
attempted later, the monitoring action is invoked immediately, because
the port is already gone. The
545     kill reason is now C<no_such_port> with some descriptive (we hope) error
546     message.
547 root 1.30
548 root 1.51 As you probably suspect from these examples, the kill reason is usually
549     some identifier as first argument and a human-readable error message as
550     second argument - all kill reasons by AnyEvent::MP itself follow this
551     pattern. But the kill reason can be anything: it is simply a list of
552     values you can choose yourself. It can even be nothing (an empty list) -
553     this is called a "normal" kill.
554    
555     Apart from die'ing, you can kill ports manually using the C<kil>
556     function. Using the C<kil> function will be treated like an error when a
557     non-empty reason is specified:
558 root 1.30
559 root 1.51 kil $port, custom_error => "don't like your steenking face";
560 root 1.30
561 root 1.51 And a I<normal> kill without any reason arguments:
562 root 1.30
563     kil $port;
564    
565     By now you probably wonder what this "normal" kill business is: A common
566     idiom is to not specify a callback to C<mon>, but another port, such as
567     C<$SELF>:
568    
569     mon $port, $SELF;
570    
571 root 1.51 This basically means "monitor $port and kill me when it crashes" - and
572     the thing is, a "normal" kill does not count as a crash. This way you can
573     easily link ports together and make them crash together on errors, while
allowing you to remove a port silently when it has done its job properly.
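
Here is a small sketch of that pattern - assume this code itself runs in
the context of some port (so C<$SELF> is set), and that the C<work> tag
and the job handling are just made up for this example:

   my $worker = port;

   # crash together with the current port on errors ...
   mon $worker, $SELF;

   rcv $worker, work => sub {
      my (@job) = @_;
      # ... do the actual work here ...

      # ... and then remove the worker silently: an argument-less
      # kil is a "normal" kill, so the monitoring port survives
      kil $SELF;   # inside the callback, $SELF is $worker
   };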
575 root 1.30
576 root 1.34 =head3 Port Context
577    
Code belonging to a port runs in the so-called "port context": C<$SELF>
contains the port's ID, and exceptions that the code throws will be caught.
580 root 1.34
581     Since AnyEvent::MP is event-based, it is not uncommon to register
callbacks from within C<rcv> handlers. As an example, assume that the
583     following port receive handler wants to C<die> a second later, using
584     C<after>:
585 root 1.34
586     my $port = port {
587     after 1, sub { die "oops" };
588     };
589    
If you try this out, you will find it does not work - when the C<after>
591     callback is executed, it does not run in the port context anymore, so
592     exceptions will not be caught.
593 root 1.34
594 root 1.41 For these cases, AnyEvent::MP exports a special "closure constructor"
595 root 1.51 called C<psub>, which works mostly like perl's built-in C<sub>:
596 root 1.34
597     my $port = port {
598     after 1, psub { die "oops" };
599     };
600    
601 root 1.51 C<psub> remembers the port context and returns a code reference. When the
602     code reference is invoked, it will run the code block within the context
603     that it was created in, so exception handling once more works as expected.
604 root 1.34
605 root 1.49 There is even a way to temporarily execute code in the context of some
606 root 1.41 port, namely C<peval>:
607    
608     peval $port, sub {
609     # die'ing here will kil $port
610     };
611    
612     The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
613     and then executes the given sub in a port context.
614    
615 root 1.30 =head3 Network Errors and the AEMP Guarantee
616    
617 root 1.52 Earlier we mentioned another important source of monitoring failures:
618     network problems. When a node loses connection to another node, it will
619     invoke all monitoring actions, just as if the port was killed, I<even if
620     it is possible that the port is still happily alive on another node> (not
621     being able to talk to a node means we have no clue what's going on with
622     it, it could be crashed, but also still running without knowing we lost
623     the connection).
624 root 1.30
625 root 1.52 So another way to view monitors is: "notify me when some of my messages
626 root 1.30 couldn't be delivered". AEMP has a guarantee about message delivery to a
627     port: After starting a monitor, any message sent to a port will either
628     be delivered, or, when it is lost, any further messages will also be lost
629 elmex 1.31 until the monitoring action is invoked. After that, further messages
630 root 1.30 I<might> get delivered again.
631    
632     This doesn't sound like a very big guarantee, but it is kind of the best
633 root 1.52 you can get while staying sane: Specifically, it means that there will be
634     no "holes" in the message sequence: all messages sent are delivered in
635     order, without any of them missing in between, and when some were lost,
636     you I<will> be notified of that, so you can take recovery action.
637 root 1.30
638 root 1.49 And, obviously, the guarantee only works in the presence of
639     correctly-working hardware, and no relevant bugs inside AEMP itself.
640    
641 root 1.30 =head3 Supervising
642    
643 root 1.49 OK, so how is this crashing-everything-stuff going to make applications
644 root 1.52 I<more> stable? Well, in fact, the goal is not really to make them
645     more stable, but to make them more resilient against actual errors
646     and crashes. And this is not done by crashing I<everything>, but by
crashing everything except a I<supervisor> that then cleans up and starts
648     everything again.
649 root 1.30
650 elmex 1.31 A supervisor is simply some code that ensures that an application (or a
651 root 1.49 part of it) is running, and if it crashes, is restarted properly. That is,
652     it supervises a service by starting and restarting it, as necessary.
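
As a minimal sketch of the idea (C<start_service> is a hypothetical
function that creates and returns the port of the service to be
supervised):

   sub supervise {
      my $service = start_service ();

      mon $service, sub {
         warn "service died (@_), restarting\n";
         supervise ();
      };
   }

   supervise;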
653 root 1.30
654     To show how to do all this we will create a simple chat server that can
655     handle many chat clients. Both server and clients can be killed and
656 root 1.49 restarted, and even crash, to some extent, without disturbing the chat
657     functionality.
658 root 1.30
659     =head2 Chatting, the Resilient Way
660    
661     Without further ado, here is the chat server (to run it, we assume the
set-up explained earlier, with a separate F<aemp run profile seed> node):
663 root 1.30
664     use common::sense;
665     use AnyEvent::MP;
666     use AnyEvent::MP::Global;
667    
668     configure;
669    
670     my %clients;
671    
672     sub msg {
673     print "relaying: $_[0]\n";
674     snd $_, $_[0]
675     for values %clients;
676     }
677    
678     our $server = port;
679    
680     rcv $server, join => sub {
681     my ($client, $nick) = @_;
682    
683     $clients{$client} = $client;
684    
685     mon $client, sub {
686     delete $clients{$client};
687     msg "$nick (quits, @_)";
688     };
689     msg "$nick (joins)";
690     };
691    
692     rcv $server, privmsg => sub {
693     my ($nick, $msg) = @_;
694     msg "$nick: $msg";
695     };
696    
697 root 1.49 db_set eg_chat_server => $server;
698 root 1.30
699     warn "server ready.\n";
700    
701     AnyEvent->condvar->recv;
702    
703 elmex 1.31 Looks like a lot, but it is actually quite simple: after your usual
704 root 1.30 preamble (this time we use common sense), we define a helper function that
705     sends some message to every registered chat client:
706    
707     sub msg {
708     print "relaying: $_[0]\n";
709     snd $_, $_[0]
710     for values %clients;
711     }
712    
The clients are stored in the hash C<%clients>. Then we define a server
714     port and install two receivers on it, C<join>, which is sent by clients
715     to join the chat, and C<privmsg>, that clients use to send actual chat
716     messages.
717    
718     C<join> is most complicated. It expects the client port and the nickname
719     to be passed in the message, and registers the client in C<%clients>.
720    
721     rcv $server, join => sub {
722     my ($client, $nick) = @_;
723    
724     $clients{$client} = $client;
725    
726     The next step is to monitor the client. The monitoring action removes the
727     client and sends a quit message with the error to all remaining clients.
728    
729     mon $client, sub {
730     delete $clients{$client};
731     msg "$nick (quits, @_)";
732     };
733    
734     And finally, it creates a join message and sends it to all clients.
735    
736     msg "$nick (joins)";
737     };
738    
739     The C<privmsg> callback simply broadcasts the message to all clients:
740    
741     rcv $server, privmsg => sub {
742     my ($nick, $msg) = @_;
743     msg "$nick: $msg";
744     };
745    
And finally, the server registers itself in the database, so that
clients can find it:
748    
749 root 1.52 db_set eg_chat_server => $server;
750 root 1.30
751     Well, well... and where is this supervisor stuff? Well... we cheated,
752     it's not there. To not overcomplicate the example, we only put it into
753     the..... CLIENT!
754    
755     =head3 The Client, and a Supervisor!
756    
757     Again, here is the client, including supervisor, which makes it a bit
758     longer:
759    
760     use common::sense;
761     use AnyEvent::MP;
762    
763 root 1.49 my $nick = shift || "anonymous";
764 root 1.30
765     configure;
766    
767     my ($client, $server);
768    
769     sub server_connect {
770 root 1.49 my $db_mon;
771     $db_mon = db_mon eg_chat_server => sub {
772     return unless %{ $_[0] };
773     undef $db_mon;
774    
775     print "\rconnecting...\n";
776    
777     $client = port { print "\r \r@_\n> " };
778     mon $client, sub {
779     print "\rdisconnected @_\n";
780     &server_connect;
781     };
782 root 1.30
783 root 1.49 $server = (keys %{ $_[0] })[0];
784 root 1.30
785 root 1.49 snd $server, join => $client, $nick;
786     mon $server, $client;
787 root 1.30 };
788     }
789    
790     server_connect;
791    
792 root 1.34 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
793 root 1.30 chomp (my $line = <STDIN>);
794     print "> ";
795     snd $server, privmsg => $nick, $line
796     if $server;
797     });
798    
799     $| = 1;
800     print "> ";
801     AnyEvent->condvar->recv;
802    
803     The first thing the client does is to store the nick name (which is
804     expected as the only command line argument) in C<$nick>, for further
805     usage.
806    
807     The next relevant thing is... finally... the supervisor:
808    
809     sub server_connect {
810 root 1.52 my $db_mon;
811     $db_mon = db_mon eg_chat_server => sub {
812     return unless %{ $_[0] };
813     undef $db_mon; # stop monitoring
814 root 1.30
815 root 1.52 This monitors the C<eg_chat_server> database family. It waits until a
816     chat server becomes available. When that happens, it "connects" to it
817     by creating a client port that receives and prints chat messages, and
818     monitoring it:
819 root 1.30
820     $client = port { print "\r \r@_\n> " };
821     mon $client, sub {
822     print "\rdisconnected @_\n";
823     &server_connect;
824     };
825    
826 root 1.52 If the client port dies (for whatever reason), the "supervisor" will start
827     looking for a server again - the semantics of C<db_mon> ensure that it
828     will immediately find it if there is a server port.
829    
830     After this, everything is ready: the client will send a C<join> message
831     with its local port to the server, and start monitoring it:
832    
833     $server = (keys %{ $_[0] })[0];
834 root 1.30
835     snd $server, join => $client, $nick;
836     mon $server, $client;
837     }
838    
This second monitor ensures that, when the server port crashes or goes
away (e.g. due to network problems), the client port will be killed as
well. This tells the user that the client was disconnected, and the
client will then start looking for a server again.
843 root 1.30
844     The rest of the program deals with the boring details of actually invoking
845     the supervisor function to start the whole client process and handle the
846     actual terminal input, sending it to the server.
847    
848 root 1.52 Now... the "supervisor" in this example is a bit of a cheat - it doesn't
849     really clean up much (because the cleanup done by AnyEvent::MP suffices),
850     and there isn't much of a restarting action either - if the server isn't
851     there because it crashed, well, it isn't there.
852    
853     In the real world, one would often add a timeout that would trigger when
854     the server couldn't be found within some time limit, and then complain,
855     or even try to start a new server. Or the supervisor would have to do
856     some real cleanups, such as rolling back database transactions when the
857     database thread crashes. For this simple chat server, however, this simple
858     supervisor works fine. Hopefully future versions of AnyEvent::MP will
859     offer some predefined supervisors, for now you will have to code it on
860     your own.
861    
862 elmex 1.31 You should now try to start the server and one or more clients in different
863 root 1.30 terminal windows (and the seed node):
864    
865     perl eg/chat_client nick1
866     perl eg/chat_client nick2
867     perl eg/chat_server
868     aemp run profile seed
869    
870     And then you can experiment with chatting, killing one or more clients, or
871     stopping and restarting the server, to see the monitoring in action.
872    
873 root 1.33 The crucial point you should understand from this example is that
874     monitoring is usually symmetric: when you monitor some other port,
875     potentially on another node, that other port usually should monitor you,
876     too, so when the connection dies, both ports get killed, or at least both
877     sides can take corrective action. Exceptions are "servers" that serve
878     multiple clients at once and might only wish to clean up, and supervisors,
879     who of course should not normally get killed (unless they, too, have a
880     supervisor).
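
In its simplest form, symmetric monitoring is just two C<mon> calls, here
for two hypothetical ports that should live and die together:

   mon $port_a, $port_b;   # kill B when A is killed abnormally
   mon $port_b, $port_a;   # kill A when B is killed abnormally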
881    
882 root 1.52 If you often think in object-oriented terms, then you can think of a port
883     as an object: C<port> is the constructor, the receive callbacks set by
884     C<rcv> act as methods, the C<kil> function becomes the explicit destructor
885     and C<mon> installs a destructor hook. Unlike conventional object oriented
886     programming, it can make sense to exchange port IDs more freely (for
887     example, to monitor one port from another), because it is cheap to send
888     port IDs over the network, and AnyEvent::MP blurs the distinction between
889     local and remote ports.
890    
891     Lastly, there is ample room for improvement in this example: the server
892     should probably remember the nickname in the C<join> handler instead of
893     expecting it in every chat message, it should probably monitor itself, and
894     the client should not try to send any messages unless a server is actually
895     connected.
896 root 1.30
897     =head1 PART 3: TIMTOWTDI: Virtual Connections
898    
899 root 1.34 The chat system developed in the previous sections is very "traditional"
900     in a way: you start some server(s) and some clients statically and they
901     start talking to each other.
902    
903     Sometimes applications work more like "services": They can run on almost
904 root 1.52 any node and even talk to copies of themselves on other nodes in case they
905     are distributed. The L<AnyEvent::MP::Global> service for example monitors
906     nodes joining the network and sometimes even starts itself on other nodes.
907    
908     One good way to design such services is to put them into a module and
909     create "virtual connections" to other nodes. We call this the "bridge
910     head" method, because you start by I<creating a remote port> (the bridge
911 root 1.34 head) and from that you start to bootstrap your application.
912    
913 root 1.52 Since that sounds rather theoretical, let us redesign the chat server and
914 root 1.34 client using this design method.
915    
916 root 1.52 As usual, we start with the full program - here is the server:
917 root 1.34
918     use common::sense;
919     use AnyEvent::MP;
920    
921     configure;
922    
923 root 1.52 db_set eg_chat_server2 => $NODE;
924 root 1.34
925     my %clients;
926    
927     sub msg {
928     print "relaying: $_[0]\n";
929     snd $_, $_[0]
930     for values %clients;
931     }
932    
933     sub client_connect {
934     my ($client, $nick) = @_;
935    
936     mon $client;
937 root 1.52 mon $client, psub {
938 root 1.34 delete $clients{$client};
939     msg "$nick (quits, @_)";
940     };
941    
942     $clients{$client} = $client;
943    
944     msg "$nick (joins)";
945    
946     rcv $SELF, sub { msg "$nick: $_[0]" };
947     }
948    
949     warn "server ready.\n";
950    
951     AnyEvent->condvar->recv;
952    
It starts out not much different than the previous example, except that
954 root 1.52 this time, we register the node port in the database and not a port we
955     created - the clients only want to know which node the server should
956     be running on, and there can only be one such server (or service) per
957     node. In fact, the clients could also use some kind of election mechanism,
958     to find the node with lowest node ID, or lowest load, or something like
959     that.
960    
961     The much more interesting difference to the previous server is that
962     indeed no server port is created - the server consists only of code,
963     and "does" nothing by itself. All it "does" is to define a function
964     named C<client_connect>, which expects a client port and a nick name as
965     arguments. It then monitors the client port and binds a receive callback
966     on C<$SELF>, which expects messages that in turn are broadcast to all
967     clients.
968 root 1.34
969     The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
970     for C<mon $client, $SELF>. The second does the normal "client has gone
971 root 1.52 away" clean-up action.
972 root 1.34
973 root 1.52 The last line, the C<rcv $SELF>, is a good hint that something interesting
974     is going on. And indeed, when looking at the client code, you can see a
975     new function, C<spawn>:
977 root 1.34
978     use common::sense;
979     use AnyEvent::MP;
981    
982     my $nick = shift;
983    
984     configure;
985    
986     $| = 1;
987    
990     my ($client, $server);
991    
   sub server_connect {
      my $db_mon;
      $db_mon = db_mon eg_chat_server2 => sub {
         return unless %{ $_[0] };
         undef $db_mon; # stop monitoring

         print "\rconnecting...\n";

         $client = port { print "\r \r@_\n> " };
         mon $client, sub {
            print "\rdisconnected @_\n";
            &server_connect;
         };

         # pick (one of) the server node(s) found in the family hash
         my ($servernode) = keys %{ $_[0] };

         $server = spawn $servernode, "::client_connect", $client, $nick;
         mon $server, $client;
      };
   }
1007    
1008     server_connect;
1009    
1010     my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
1011     chomp (my $line = <STDIN>);
1012     print "> ";
1013     snd $server, $line
1014     if $server;
1015     });
1016    
1017     print "> ";
1018     AnyEvent->condvar->recv;
1019    
The client is quite similar to the previous one, but instead of contacting
the server I<port> (which no longer exists), it C<spawn>s (creates) a new
port on the I<server node>:

   $server = spawn $servernode, "::client_connect", $client, $nick;
   mon $server, $client;
1026    
1027 root 1.39 And of course the first thing after creating it is monitoring it.
1028 root 1.34
1029 root 1.52 Phew, let's go through this in slow motion: the C<spawn> function creates
1030     a new port on a remote node and returns its port ID. After creating
1031     the port it calls a function on the remote node, passing any remaining
1032     arguments to it, and - most importantly - executes the function within
1033     the context of the new port, so it can be manipulated by referring to
1034     C<$SELF>. The init function can reside in a module (actually it normally
1035     I<should> reside in a module) - AnyEvent::MP will automatically load the
1036     module if the function isn't defined.
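
Here is a sketch of what such an init function might look like when placed
in a module - the package name, function name and message tags are all
made up for this example:

   package My::Service;

   use AnyEvent::MP;

   sub start {
      my (@args) = @_;

      # this code runs in the context of the newly spawned port,
      # so $SELF contains its port ID
      rcv $SELF, hello => sub {
         my ($from) = @_;
         snd $from, hi => $SELF, @args;
      };
   }

   1;

Some other node could then create and initialise such a port with C<<
spawn $node, "My::Service::start", @args >>, and AnyEvent::MP would load
the module automatically if C<start> was not yet defined.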
1037 root 1.39
1038     The C<spawn> function returns immediately, which means you can instantly
1039 root 1.34 send messages to the port, long before the remote node has even heard
1040     of our request to create a port on it. In fact, the remote node might
1041     not even be running. Despite these troubling facts, everything should
1042     work just fine: if the node isn't running (or the init function throws an
1043     exception), then the monitor will trigger because the port doesn't exist.
1044    
1045     If the spawn message gets delivered, but the monitoring message is not
1046 root 1.39 because of network problems (extremely unlikely, but monitoring, after
1047     all, is implemented by passing a message, and messages can get lost), then
1048     this connection loss will eventually trigger the monitoring action. On the
1049     remote node (which in return monitors the client) the port will also be
1050     cleaned up on connection loss. When the remote node comes up again and our
1051     monitoring message can be delivered, it will instantly fail because the
1052     port has been cleaned up in the meantime.
1053 root 1.34
1054     If your head is spinning by now, that's fine - just keep in mind, after
1055 root 1.52 creating a port using C<spawn>, monitor it on the local node, and monitor
1056     "the other side" from the remote node, and all will be cleaned up just
1057     fine.
1058 root 1.34
1059 root 1.36 =head2 Services
1060 root 1.34
1061 root 1.53 Above it was mentioned that C<spawn> automatically loads modules. This can
1062     be exploited in various useful ways.
1063 root 1.36
1064     Assume for a moment you put the server into a file called
1065     F<mymod/chatserver.pm> reachable from the current directory. Then you
1066     could run a node there with:
1067    
1068     aemp run
1069    
1070     The other nodes could C<spawn> the server by using
1071 root 1.53 C<mymod::chatserver::client_connect> as init function - without any other
1072     configuration.
1073 root 1.36
1074 root 1.53 Likewise, when you have some service that starts automatically when loaded
1075     (similar to AnyEvent::MP::Global), then you can configure this service
1076     statically:
1077 root 1.36
1078     aemp profile mysrvnode services mymod::service::
1079     aemp run profile mysrvnode
1080    
1081 root 1.39 And the module will automatically be loaded in the node, as specifying a
1082 root 1.38 module name (with C<::>-suffix) will simply load the module, which is then
1083     free to do whatever it wants.
1084 root 1.36
1085     Of course, you can also do it in the much more standard way by writing
1086     a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
1087 root 1.53 distribution and then configure nodes. For example, if I wanted to run the
1088 root 1.36 Bummskraut IRC backend on a machine named "ruth", I could do this:
1089    
1090     aemp profile ruth addservice BK::Backend::IRC::
1091    
1092 root 1.43 And any F<aemp run> on that host will automatically have the Bummskraut
1093     IRC backend running.
1094 root 1.36
1095 root 1.53 There are plenty of possibilities you can use - it's all up to you how you
1096 root 1.36 structure your application.
1097 elmex 1.7
1098 root 1.42 =head1 PART 4: Coro::MP - selective receive
1099    
1100     Not all problems lend themselves naturally to an event-based solution:
1101     sometimes things are easier if you can decide in what order you want to
1102 root 1.53 receive messages, regardless of the order in which they were sent.
1103 root 1.42
1104     In these cases, L<Coro::MP> can provide a nice solution: instead of
1105 root 1.53 registering callbacks for each message type, C<Coro::MP> attaches a
1106 root 1.42 (coro-) thread to a port. The thread can then opt to selectively receive
1107     messages it is interested in. Other messages are not lost, but queued, and
1108     can be received at a later time.
1109    
1110 root 1.43 The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
1111 root 1.42 module. It is, however, tightly integrated into C<AnyEvent::MP> - the
1112     ports it creates are fully compatible to C<AnyEvent::MP> ports.
1113    
1114     In fact, C<Coro::MP> is more of an extension than a separate module: all
1115     functions exported by C<AnyEvent::MP> are exported by it as well.
1116    
1117     To illustrate how programing with C<Coro::MP> looks like, consider the
1118     following (slightly contrived) example: Let's implement a server that
1119     accepts a C<< (write_file =>, $port, $path) >> message with a (source)
1120     port and a filename, followed by as many C<< (data => $port, $data) >>
1121     messages as required to fill the file, followed by an empty C<< (data =>
1122     $port) >> message.
1123    
1124     The server only writes a single file at a time, other requests will stay
1125     in the queue until the current file has been finished.
1126    
1127     Here is an example implementation that uses L<Coro::AIO> and largely
1128     ignores error handling:
1129    
    use Coro::MP;    # port_async, get_cond, get - plus the AnyEvent::MP API
    use Coro::AIO;   # aio_open, aio_write
    use Fcntl;       # O_WRONLY, O_CREAT

    my $ioserver = port_async {
1131     while () {
1132     my ($tag, $port, $path) = get_cond;
1133    
1134     $tag eq "write_file"
1135     or die "only write_file messages expected";
1136    
1137     my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1138     or die "$path: $!";
1139    
1140     while () {
1141     my (undef, undef, $data) = get_cond {
1142     $_[0] eq "data" && $_[1] eq $port
1143     } 5
1144     or die "timeout waiting for data message from $port\n";
1145    
1146     length $data or last;
1147    
1148     aio_write $fh, undef, undef, $data, 0;
1149     };
1150     }
1151     };
1152    
1153     mon $ioserver, sub {
1154     warn "ioserver was killed: @_\n";
1155     };
1156    
1157 root 1.53 Let's go through it, section by section.
1158 root 1.42
1159     my $ioserver = port_async {
1160    
1161 root 1.43 Ports can be created by attaching a thread to an existing port via
1162 root 1.53 C<rcv_async>, or as in this example, by calling C<port_async> with the
1163     code to execute as a thread. The C<async> component comes from the fact
1164     that threads are created using the C<Coro::async> function.
1165 root 1.42
1166     The thread runs in a normal port context (so C<$SELF> is set). In
1167     addition, when the thread returns, it will be C<kil> I<normally>, i.e.
1168     without a reason argument.
1169    
1170     while () {
       my ($tag, $port, $path) = get_cond;

       $tag eq "write_file"
          or die "only write_file messages expected";
1173    
1174 root 1.53 The thread is supposed to serve many file writes, which is why it
1175     executes in a loop. The first thing it does is fetch the next message,
1176     using C<get_cond>, the "conditional message get". Without arguments, it
1177     merely fetches the I<next> message from the queue, which I<must> be a
1178     C<write_file> message.
1179 root 1.42
1180     The message contains the C<$path> to the file, which is then created:
1181    
1182     my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1183     or die "$path: $!";
1184    
1185     Then we enter a loop again, to serve as many C<data> messages as
1186 root 1.43 necessary:
1187 root 1.42
1188     while () {
1189     my (undef, undef, $data) = get_cond {
1190     $_[0] eq "data" && $_[1] eq $port
1191     } 5
1192     or die "timeout waiting for data message from $port\n";
1193    
1194     This time, the condition is not empty, but instead a code block: similarly
1195     to grep, the code block will be called with C<@_> set to each message in
1196     the queue, and it has to return whether it wants to receive the message or
1197     not.
1198    
In this case we are interested in C<data> messages (C<< $_[0] eq "data"
>>) whose source port matches ours (C<< $_[1] eq $port >>).
1201    
1202     The condition must be this strict, as it is possible to receive both
1203     C<write_file> messages and C<data> messages from other ports while we
1204     handle the file writing.
1205    
1206 root 1.53 The lone C<5> argument at the end is a timeout - when no matching message
1207     is received within C<5> seconds, we assume an error and C<die>.
1208 root 1.42
1209     When an empty C<data> message is received we are done and can close the
1210     file (which is done automatically as C<$fh> goes out of scope):
1211    
1212     length $data or last;
1213    
1214     Otherwise we need to write the data:
1215    
1216     aio_write $fh, undef, undef, $data, 0;
1217    
1218 root 1.53 And that's basically it. Note that every port thread should have some
1219     kind of supervisor. In our case, the supervisor simply prints any error
1220     message:
1221 root 1.42
1222     mon $ioserver, sub {
1223     warn "ioserver was killed: @_\n";
1224     };
1225    
1226     Here is a usage example:
1227    
1228     port_async {
1229     snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1230     snd $ioserver, data => $SELF, "abc\n";
1231     snd $ioserver, data => $SELF, "def\n";
1232     snd $ioserver, data => $SELF;
1233     };
1234    
1235     The messages are sent without any flow control or acknowledgement (feel
1236     free to improve). Also, the source port does not actually need to be a
1237     port - any unique ID will do - but port identifiers happen to be a simple
1238     source of network-wide unique IDs.
1239    
1240     Apart from C<get_cond> as seen above, there are other ways to receive
1241     messages. The C<write_file> message above could also selectively be
1242     received using a C<get> call:
1243    
1244     my ($port, $path) = get "write_file";
1245    
1246     This is simpler, but when some other code part sends an unexpected message
1247     to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
1248     every threaded port should have a "fetch next message unconditionally"
1249     somewhere, to avoid filling up the queue.
1250    
1251 root 1.53 Finally, it is also possible to use more switch-like C<get_conds>:
1252 root 1.42
1253     get_cond {
1254     $_[0] eq "msg1" and return sub {
1255     my (undef, @msg1_data) = @_;
1256     ...;
1257     };
1258    
1259     $_[0] eq "msg2" and return sub {
1260     my (undef, @msg2_data) = @_;
1261     ...;
1262     };
1263    
1264     die "unexpected message $_[0] received";
1265     };
1266    
1267 root 1.37 =head1 THE END
1268    
1269     This is the end of this introduction, but hopefully not the end of
1270 root 1.43 your career as AEMP user. I hope the tutorial was enough to make the
1271 root 1.37 basic concepts clear. Keep in mind that distributed programming is not
1272 root 1.53 completely trivial, in fact, it's pretty complicated. We hope AEMP makes
1273     it simpler and will be useful to create exciting new applications.
1274 root 1.37
1275 elmex 1.1 =head1 SEE ALSO
1276    
1277     L<AnyEvent::MP>
1278    
1279 elmex 1.20 L<AnyEvent::MP::Global>
1280    
1281 root 1.42 L<Coro::MP>
1282    
1283 root 1.34 L<AnyEvent>
1284    
1285 elmex 1.1 =head1 AUTHOR
1286    
1287     Robin Redeker <elmex@ta-sa.org>
1288 root 1.32 Marc Lehmann <schmorp@schmorp.de>
1289 root 1.4