/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.59
Committed: Sat Mar 24 00:48:53 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.58: +6 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.4 =head1 Message Passing for the Non-Blocked Mind
2 elmex 1.1
3 root 1.8 =head1 Introduction and Terminology
4 elmex 1.1
5 root 1.59 This is a tutorial about how to get into the swing of the L<AnyEvent::MP>
6     module family, which allows processes to transparently pass messages to
7     themselves and to other processes on the same or a different host.
8 elmex 1.1
9 root 1.23 What kind of messages? Basically a message here means a list of Perl
10 root 1.15 strings, numbers, hashes and arrays, anything that can be expressed as a
11 root 1.43 L<JSON> text (as JSON is the default serialiser in the protocol). Here are
12     two examples:
13 elmex 1.1
14 root 1.23 write_log => 1251555874, "action was successful.\n"
15     123, ["a", "b", "c"], { foo => "bar" }
16 elmex 1.21
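To see what "anything that can be expressed as JSON text" means in practice, the two example messages can be written out as JSON. This is only a sketch using the core L<JSON::PP> module: it is not a dump of the actual wire format, and the variable names are made up for illustration.

```perl
use strict;
use warnings;
use JSON::PP;

# canonical() sorts hash keys so the output is stable
my $json = JSON::PP->new->canonical;

# a message is just a list; wrap it in an array reference to encode it
my @log_msg  = (write_log => 1251555874, "action was successful.\n");
my @data_msg = (123, ["a", "b", "c"], { foo => "bar" });

my $log_text  = $json->encode (\@log_msg);
my $data_text = $json->encode (\@data_msg);

print "$log_text\n$data_text\n";

# decoding gives back an equivalent list
my @decoded = @{ $json->decode ($data_text) };
```

Things that have no JSON representation, such as code references or file handles, are the kind of values that cannot be put into a message this way.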
17 root 1.23 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 root 1.46 first element of a message that indicates the type of the message. This
19 root 1.23 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20     (C<rcv>) support matching it directly.
21    
22 root 1.59 Suppose you want to send some kind of ping message with your current
23     time somewhere. This is what such a message might look like (in Perl
24     syntax):
25 root 1.23
26     ping => 1251381636
27    
28     Now that we know what a message is, to which entities are those
29     messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
30     a destination for messages but also a context to execute code: when
31     a runtime error occurs while executing code belonging to a port, the
32     exception will be raised on the port and can even travel to interested
33     parties on other nodes, which makes supervision of distributed processes
34     easy.
35    
36     How do these ports relate to things you know? Each I<port> belongs
37     to a I<node>, and a I<node> is just the UNIX process that runs your
38     L<AnyEvent::MP> application.
39    
40     Each I<node> is distinguished from other I<nodes> running on the same or
41     another host in a network by its I<node ID>. A I<node ID> is simply a
42     unique string chosen manually or assigned by L<AnyEvent::MP> in some way
43     (UNIX nodename, random string...).
44    
45     Here is a diagram about how I<nodes>, I<ports> and UNIX processes relate
46     to each other. The setup consists of two nodes (more are of course
47     possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
48     C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
49     C<BAR>.
50 elmex 1.17
51    
52        |- PID: 7066 -|                  |- PID: 8321 -|
53        |             |                  |             |
54        | Node ID: A  |                  | Node ID: B  |
55        |             |                  |             |
56        |   Port ABC =|= <----\ /-----> =|= Port FOO   |
57        |             |        X         |             |
58        |   Port DEF =|= <----/ \-----> =|= Port BAR   |
59        |             |                  |             |
60        |-------------|                  |-------------|
61    
62 root 1.23 The strings for the I<port IDs> here are just for illustrative
63     purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
64 root 1.43 strings, they can't be chosen manually and are assigned by the system
65 root 1.23 dynamically. These I<port IDs> are unique within a network and can also be
66 root 1.46 used to identify senders, or even as message tags for instance.
67 root 1.23
68     The next sections will explain the API of L<AnyEvent::MP> by going through
69     a few simple examples. Later some more complex idioms are introduced,
70     which are hopefully useful to solve some real world problems.
71 root 1.8
72 root 1.39 =head2 Passing Your First Message
73 elmex 1.16
74 root 1.46 For starters, let's have a look at the messaging API. The following
75     example is just a demo to show the basic elements of message passing with
76 root 1.24 L<AnyEvent::MP>.
77    
78     The example should print: C<Ending with: 123>, in a rather complicated
79     way, by passing some message to a port.
80 elmex 1.16
81     use AnyEvent;
82     use AnyEvent::MP;
83    
84     my $end_cv = AnyEvent->condvar;
85    
86     my $port = port;
87    
88     rcv $port, test => sub {
89        my ($data) = @_;
90        $end_cv->send ($data);
91     };
92    
93     snd $port, test => 123;
94    
95     print "Ending with: " . $end_cv->recv . "\n";
96    
97 root 1.24 It already uses most of the essential functions inside
98 root 1.46 L<AnyEvent::MP>: First there is the C<port> function which creates a
99 root 1.24 I<port> and will return its I<port ID>, a simple string.
100    
101     This I<port ID> can be used to send messages to the port and install
102     handlers to receive messages on the port. Since it is a simple string
103     it can be safely passed to other I<nodes> in the network when you want
104     to refer to that specific port (usually used for RPC, where you need
105     to tell the other end which I<port> to send the reply to - messages in
106     L<AnyEvent::MP> have a destination, but no source).
107 elmex 1.17
108 root 1.24 The next function is C<rcv>:
109 elmex 1.16
110 elmex 1.17 rcv $port, test => sub { ... };
111 elmex 1.16
112 root 1.24 It installs a receiver callback on the I<port> that is specified as the first
113     argument (it only works for "local" ports, i.e. ports created on the same
114     node). The next argument, in this example C<test>, specifies a I<tag> to
115     match. This means that whenever a message with the first element being
116     the string C<test> is received, the callback is called with the remaining
117 elmex 1.17 parts of that message.
118    
119 root 1.24 Messages can be sent with the C<snd> function, which is used like this in
120     the example above:
121 elmex 1.17
122     snd $port, test => 123;
123    
124 root 1.24 This will send the message C<'test', 123> to the I<port> with the I<port
125     ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
126     on C<test> it will call the callback with the first argument being the
127     number C<123>.
128    
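Because messages carry no source, a request/reply exchange has to include the port to reply to. The following is a minimal sketch of that idea using only C<port>, C<rcv> and C<snd>, with both ports living in the same process; the tags C<ping> and C<pong> and the variable names are chosen purely for illustration.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;

# the "server" port answers ping messages on whatever port it is told
my $server = port;
rcv $server, ping => sub {
   my ($time, $reply_port) = @_;
   snd $reply_port, pong => $time;
};

# the "client" port receives the reply
my $client = port;
rcv $client, pong => sub {
   my ($time) = @_;
   $done->send ($time);
};

# the reply port travels inside the message, as an ordinary string
snd $server, ping => 1251381636, $client;

my $echoed = $done->recv;
print "Got pong for: $echoed\n";
```

The same pattern should carry over unchanged once the two ports live on different nodes, since a port ID is just a string that can be passed around.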
129 root 1.43 The callback is a typical AnyEvent idiom: the callback just passes
130 root 1.24 that number on to the I<condition variable> C<$end_cv> which will then
131     pass the value to the print. Condition variables are out of the scope
132     of this tutorial and not often used with ports, so please consult the
133 elmex 1.17 L<AnyEvent::Intro> about them.
134    
135 root 1.24 Passing messages inside just one process is boring. Before we can move on
136     and do interprocess message passing we first have to make sure some things
137     have been set up correctly for our nodes to talk to each other.
138 elmex 1.17
139 root 1.39 =head2 System Requirements and System Setup
140 elmex 1.17
141 root 1.25 Before we can start with real IPC we have to make sure some things work on
142     your system.
143 elmex 1.17
144 root 1.25 First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
145     I<nodes> to be able to communicate with each other over the network it is
146     necessary to set up the same I<shared secret> for both of them, so they can
147     prove their trustworthiness to each other.
148 elmex 1.17
149     The easiest way to set this up is to use the F<aemp> utility:
150    
151     aemp gensecret
152    
153 root 1.25 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
154     random shared secret. You can copy this file to any other system and
155     then communicate over the network (via TCP) with it. You can also select
156     your own shared secret (F<aemp setsecret>) and for increased security
157     requirements you can even create (or configure) a TLS certificate (F<aemp
158     gencert>), causing connections to not just be securely authenticated, but
159     also to be encrypted and protected against tinkering.
160    
161     Connections will only be successfully established when the I<nodes>
162     that want to connect to each other have the same I<shared secret> (or
163     successfully verify the TLS certificate of the other side, in which case
164     no shared secret is required).
165 elmex 1.17
166     B<If something does not work as expected, and for example tcpdump shows
167     that the connections are closed almost immediately, you should make sure
168     that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
169     you try to connect with each other!>
170 elmex 1.16
171 root 1.25 That is all for now; you will find some more advanced fiddling with the
172     C<aemp> utility later.
173    
174 root 1.35 =head2 Shooting the Trouble
175    
176     Sometimes things go wrong, and AnyEvent::MP, being a professional module,
177 root 1.43 does not gratuitously spill out messages to your screen.
178 root 1.35
179     To help troubleshoot any issues, there are two environment variables
180 root 1.55 that you can set. The first, C<AE_VERBOSE>, sets the logging level of
181 root 1.57 L<AnyEvent::Log>, which AnyEvent::MP uses. The default is C<4>, which
182 root 1.55 means nothing much is printed. You can increase it to C<8> or C<9> to get
183     more verbose output. This is example output when starting a node (somewhat
184     abridged to get shorter lines):
185    
186     2012-03-22 01:41:43.59 debug AE::Util: using Guard module to implement guards.
187     2012-03-22 01:41:43.62 debug AE::MP::Kernel: node cerebro/slwK2LEq7O starting up.
188     2012-03-22 01:41:43.62 debug AE::MP::Kernel: node listens on [10.0.0.1:52110].
189     2012-03-22 01:41:43.62 trace AE::MP::Kernel: trying connect to seed node 10.0.0.19:4040.
190 root 1.57 2012-03-22 01:41:43.66 trace AE::MP::Transport: 10.0.0.19:4040 connected as rain.
191     2012-03-22 01:41:43.66 info AE::MP::Kernel: rain is up.
192 root 1.55
193     A lot of info, but at least you can see that it does something. To only
194     get info about AnyEvent::MP, you can use C<AE_LOG=AnyEvent::MP=+log> in
195     your environment.
196 root 1.35
197     The other environment variable that can be useful is
198     C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
199 root 1.46 most messages that are sent or received to be printed. For example, F<aemp
200     restart rijk> might output these message exchanges:
201 root 1.35
202 root 1.46 SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
203     SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
204     SND rain <- [null,"g_find","rijk"]
205     RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
206     RCV rijk -> ["b",""]
207 elmex 1.18
208 root 1.30 =head1 PART 1: Passing Messages Between Processes
209 elmex 1.18
210     =head2 The Receiver
211    
212 root 1.25 Let's split the previous example up into two programs: one that contains
213     the sender and one for the receiver. First the receiver application, in
214     full:
215 elmex 1.18
216     use AnyEvent;
217     use AnyEvent::MP;
218    
219 root 1.45 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
220 elmex 1.18
221     my $port = port;
222 root 1.47 db_set eg_receivers => $port;
223 elmex 1.18
224     rcv $port, test => sub {
225        my ($data, $reply_port) = @_;
226    
227        print "Received data: " . $data . "\n";
228     };
229    
230     AnyEvent->condvar->recv;
231    
232 root 1.51 Now, that wasn't too bad, was it? OK, let's go through the new functions
233 root 1.47 that have been used.
234 elmex 1.18
235 root 1.44 =head3 C<configure> and Joining and Maintaining the Network
236 elmex 1.18
237 root 1.47 First let's have a look at C<configure>:
238 elmex 1.18
239 root 1.47 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
240 elmex 1.18
241 root 1.58 Before we are able to send messages to other nodes we have to configure
242     the node to become a "networked node". Configuring a node means naming
243     the node and binding some TCP listeners so that other nodes can contact
244     it. The decision whether a process becomes a networked node or not must
245     be made before doing anything else with AnyEvent::MP.
246 root 1.47
247 root 1.58 Additionally, to actually link all nodes in a network together, you should
248 root 1.47 specify a number of seed addresses, which will be used by the node to
249     connect itself into an existing network, as we will see shortly.
250 root 1.26
251 root 1.58 All of this info (and more) can be passed to the C<configure> function -
252     later we will see how we can do all this without even passing anything to
253 root 1.28 C<configure>!
254    
255 root 1.58 Back to the function call in the program: the first parameter, C<nodeid>,
256     specifies the node ID, in this case C<eg_receiver/%u>. The default is to
257     use the node name of the current host plus C</%u>, which gives the node a
258     name with a random suffix to make it unique, but for this example we want
259     the node to have a bit more personality, and name it C<eg_receiver> with a
260     random suffix.
261 root 1.47
262     Why the random suffix? Node IDs need to be unique within the network and
263     appending a random suffix is the easiest way to do that.
264 root 1.28
265     The second parameter, C<binds>, specifies a list of C<address:port> pairs
266     to bind TCP listeners on. The special "address" of C<*> means to bind on
267 root 1.47 every local IP address (this might not work on every OS, so explicit IP
268     addresses are best).
269 root 1.28
270     The reason to bind on a TCP port is not just that other nodes can connect
271     to us: if no binds are specified, the node will still bind on a dynamic
272     port on all local addresses - but in this case we won't know the port, and
273     cannot tell other nodes to connect to it as seed node.
274    
275 root 1.47 Now, a I<seed> is simply the TCP address of some other node in the
276     network, often the same string as used for the C<binds> parameter of the
277     other node. The need for seeds is easy to explain: I<somehow> the nodes
278     of an aemp network have to find each other, and often this means over the
279     internet. So broadcasts are out.
280    
281 root 1.56 Instead, a node usually specifies the addresses of one or a few (for
282     redundancy) other nodes, some of which should be up. Two nodes can set
283     each other as seeds without any issues. You could even specify all nodes
284     as seeds for all nodes, for total redundancy. But the common case is to
285     have some more or less central, stable servers running seed services for
286     other nodes.
287 root 1.47
288     All you need to do to ensure that an AnyEvent::MP network connects
289 root 1.54 together is to make sure that all seed nodes are connected together via
290     their seed connections, i.e., all connections from seed nodes to I<their>
291 root 1.56 seed nodes form a connected graph.
292 root 1.47
293     A node tries to keep connections open to all of its seed nodes at all
294     times, while other connections are made on demand only.
295    
296 root 1.56 The simplest way to do that would be for all nodes to use the same seed
297     nodes: seed nodes would seed each other, and all other nodes would connect
298     to the seed nodes.
299    
300 root 1.47 All of this ensures that the network stays one network - even if all the
301     nodes in one half of the net are separated from the nodes in the other
302     half by some network problem, once that is over, they will eventually
303     become a single network again.
304    
305     In addition to creating the network, a node also expects the seed nodes to
306 root 1.54 run the shared database service - if need be, by automatically starting
307     it, so you don't normally need to configure this explicitly.
308 root 1.47
309     The process of joining a network takes time, during which the node
310     is already running. This means it takes time until the node is
311     fully connected, and information about services in the network is
312 root 1.54 available. This is why most AnyEvent::MP programs either just register
313     themselves in the database and wait to be "found" by others, or they start
314     to monitor the database until some nodes of the required type show up.
315 root 1.47
316     We will see how this is done later, in the sender program.
317 elmex 1.19
318 root 1.28 =head3 Registering the Receiver
319 elmex 1.19
320 root 1.47 Coming back to our example, after the node has been configured for network
321     access, it is time to publish some service, namely the receive service.
322 elmex 1.19
323 root 1.47 For that, let's look at the next lines:
324 elmex 1.19
325     my $port = port;
326 root 1.47 db_set eg_receivers => $port;
327 elmex 1.19
328 root 1.27 The C<port> function has already been discussed. It simply creates a new
329 root 1.51 I<port> and returns the I<port ID>. The C<db_set> function, however, is
330 root 1.47 new: The first argument is the name of a I<database family> and the second
331     argument is the name of a I<subkey> within that family. The third argument
332     would be the I<value> to be associated with the family and subkey, but,
333     since it is missing, it will simply be C<undef>.
334    
335 root 1.51 What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
336     database. This database runs on so-called "global" nodes, which usually
337     are the seed nodes of your network. The database structure is "simply" a
338     hash of hashes of values.
339 root 1.47
340 root 1.51 To illustrate this with Perl syntax, assume the database was stored in
341     C<%DB>, then the C<db_set> function more or less would do this:
342 root 1.47
343     $DB{eg_receivers}{$port} = undef;
344    
345     So the ominous "family" selects a hash in the database, and the "subkey"
346 root 1.51 is simply the key in this hash - C<db_set> very much works like this
347 root 1.47 assignment.
348    
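The hash-of-hashes picture can be played with in plain Perl. The sketch below does not touch AnyEvent::MP at all: C<%DB> is an ordinary local hash standing in for the distributed database, and the port IDs are invented, since real ones are assigned by the system.

```perl
use strict;
use warnings;

# stand-in for the distributed database: a hash of hashes
my %DB;

# hypothetical port IDs, real ones are assigned by AnyEvent::MP
my @ports = ("eg_receiver/aaaa#p1", "eg_receiver/bbbb#p1");

# roughly what "db_set eg_receivers => $port" does for each receiver
$DB{eg_receivers}{$_} = undef for @ports;

# somebody interested in the family only needs its subkeys
my @receivers = sort keys %{ $DB{eg_receivers} };
print "receivers: @receivers\n";

# removing a subkey models a receiver that went away
delete $DB{eg_receivers}{ $ports[0] };
my @remaining = sort keys %{ $DB{eg_receivers} };
print "remaining: @remaining\n";
```

Because only the subkeys matter here, leaving the values as C<undef> loses nothing: the set of keys in the family is the list of receivers.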
349     The family namespace is shared by all nodes in a network, so the names
350     should be reasonably unique, for example, they could start with the name
351 root 1.48 of your module, or the name of the program, using your port name or node
352     name as subkey.
353 root 1.27
354 root 1.47 The purpose behind adding this key to the database is that the sender can
355     look it up and find our port. We will shortly see how.
356 root 1.27
357     The last step in the example is to set up a receiver callback for those
358     messages, just as was discussed in the first example. We again match
359     for the tag C<test>. The difference is that this time we don't exit the
360     application after receiving the first message. Instead we continue to wait
361     for new messages indefinitely.
362 elmex 1.19
363 elmex 1.20 =head2 The Sender
364 root 1.8
365 root 1.48 OK, now let's take a look at the sender code:
366 root 1.4
367 elmex 1.1 use AnyEvent;
368     use AnyEvent::MP;
369    
370 root 1.45 configure nodeid => "eg_sender/%u", seeds => ["*:4040"];
371 elmex 1.1
372 root 1.47 my $guard = db_mon eg_receivers => sub {
373 root 1.50    my ($family, $a, $c, $d) = @_;
374 root 1.47    return unless %$family;
375    
376        # now there are some receivers, send them a message
377 root 1.50    snd $_ => test => time
378 root 1.47       for keys %$family;
379     };
380 elmex 1.1
381     AnyEvent->condvar->recv;
382    
383 root 1.28 It's even less code. The C<configure> serves the same purpose as in the
384 root 1.48 receiver, but instead of specifying binds we specify a list of seeds - the
385     only seed happens to be the same as the bind used by the receiver, which
386 root 1.47 therefore becomes our seed node.
387 root 1.27
388 root 1.48 Remember the part about having to wait till things become available? Well,
389     after configure returns, nothing has been done yet - the node is not
390     connected to the network, knows nothing about the database contents, and
391     it can take ages (for a computer :) for this situation to change.
392 root 1.47
393     Therefore, the sender waits, in this case by using the C<db_mon>
394     function. This function registers an interest in a specific database
395 root 1.48 family (in this case C<eg_receivers>). Each time something inside the
396     family changes (a key is added, changed or deleted), it will call our
397     callback with the family hash as first argument, followed by array
398     references listing the keys that were added, changed and deleted.
399    
400     The callback only checks whether the C<%$family> hash is empty - if it is,
401     then it doesn't do anything. But eventually the family will contain the
402     port subkey we set in the receiver. Then it will send a message to it (and
403     any other receiver in the same family). Likewise, should the receiver go
404     away and come back, or should another receiver come up, it will again send
405     a message to all of them.
406 root 1.47
407     You can experiment by having multiple receivers - you have to change the
408     "binds" parameter in the receiver to the seeds used in the sender to start
409     up additional receivers, but then you can start as many as you like. If
410     you specify proper IP addresses for the seeds, you can even run them on
411     different computers.
412    
413     Each time you start the sender, it will send a message to all receivers it
414 root 1.48 finds (you have to interrupt it manually afterwards).
415 root 1.47
416 root 1.51 Additional experiments you could try include using
417     C<PERL_ANYEVENT_MP_TRACE=1> to see which messages are exchanged, or
418     starting the sender before the receiver and see how long it then takes to
419     find the receiver.
420 root 1.27
421 root 1.28 =head3 Splitting Network Configuration and Application Code
422    
423 root 1.49 OK, so far, this works reasonably. In the real world, however, the person
424     configuring your application to run on a specific network (the end user
425     or network administrator) is often different to the person coding the
426     application.
427 root 1.28
428     Or to put it differently: the arguments passed to configure are usually
429 root 1.49 provided not by the programmer, but by whoever is deploying the program -
430     even in the example above, we would like to be able to just start senders
431     and receivers without having to patch the programs.
432 root 1.28
433     To make this easy, AnyEvent::MP supports a simple configuration database,
434     using profiles, which can be managed using the F<aemp> command-line
435 root 1.49 utility (yes, this section is about the advanced tinkering mentioned
436 root 1.30 before).
437 root 1.28
438     When you change both programs above to simply call
439    
440     configure;
441    
442     then AnyEvent::MP tries to look up a profile using the current node name
443     in its configuration database, falling back to some global default.
444    
445     You can run "generic" nodes using the F<aemp> utility as well, and we will
446     exploit this in the following way: we configure a profile "seed" and run
447     a node using it, whose sole purpose is to be a seed node for our example
448     programs.
449    
450     We bind the seed node to port 4040 on all interfaces:
451    
452 root 1.29 aemp profile seed binds "*:4040"
453 root 1.28
454     And we configure all nodes to use this as seed node (this only works when
455 root 1.51 running on the same host, for multiple machines you would replace the C<*>
456     by the IP address or hostname of the node running the seed), by changing
457     the global settings shared between all profiles:
458 root 1.28
459 root 1.49 aemp seeds "*:4040"
460 root 1.28
461     Then we run the seed node:
462    
463     aemp run profile seed
464    
465 root 1.49 After that, we can start as many other nodes as we want, and they will
466     all use our generic seed node to discover each other. The reason we can
467     start our existing programs even though they specify "incompatible"
468     parameters to C<configure> is that the configuration file (by default)
469     takes precedence over any arguments passed to C<configure>.
470 elmex 1.7
471 root 1.30 That's all for now - next we will teach you about monitoring by writing a
472     simple chat client and server :)
473    
474     =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
475    
476     That's a mouthful, so what does it mean? Our previous example is what one
477     could call "very loosely coupled" - the sender doesn't care about whether
478     there are any receivers, and the receivers do not care if there is any
479     sender.
480    
481     This can work fine for simple services, but most real-world applications
482     want to ensure that the side they are expecting to be there is actually
483     there. Going one step further: most bigger real-world applications even
484     want to ensure that if some component is missing, or has crashed, it will
485     still be there, by recovering and restarting the service.
486    
487     AnyEvent::MP supports this by catching exceptions and network problems,
488 root 1.49 and notifying interested parties of these.
489 root 1.30
490 root 1.41 =head2 Exceptions, Port Context, Network Errors and Monitors
491 root 1.30
492     =head3 Exceptions
493    
494 root 1.49 Exceptions are handled on a per-port basis: all receive callbacks are
495     executed in a special context, the so-called I<port-context>: code
496     that throws an otherwise uncaught exception will cause the port to be
497     C<kil>led. Killed ports are destroyed automatically (killing ports is
498     actually the only way to free ports).
499 root 1.30
500 root 1.49 Ports can be monitored, even from a different node and host, and when a
501     port is killed, any entity monitoring it will be notified.
502 root 1.30
503     Here is a simple example:
504    
505     use AnyEvent::MP;
506    
507     # create a port, it always dies
508     my $port = port { die "oops" };
509    
510     # monitor it
511     mon $port, sub {
512        warn "$port was killed (with reason @_)";
513     };
514    
515     # now send it some message, causing it to die:
516     snd $port;
517    
518 root 1.49 AnyEvent->condvar->recv;
519    
520 root 1.30 It first creates a port whose only action is to throw an exception,
521     and then monitors it with the C<mon> function. Afterwards it sends it a
522     message, causing it to die and call the monitoring callback:
523    
524     anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
525    
526 root 1.49 The callback was actually passed two arguments: C<die>, to indicate it
527     did throw an I<exception> as opposed to, say, a network error, and the
528     exception message itself.
529 root 1.30
530     What happens when a port is killed before we have a chance to monitor
531     it? Granted, this is highly unlikely in our example, but when you program
532     in a network this can easily happen due to races between nodes.
533    
534     use AnyEvent::MP;
535    
536     my $port = port { die "oops" };
537    
538     snd $port;
539    
540     mon $port, sub {
541        warn "$port was killed (with reason @_)";
542     };
543    
544 root 1.49 AnyEvent->condvar->recv;
545    
546 root 1.51 This time we will get something else:
547 root 1.30
548 root 1.51 2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
549     anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)
550 root 1.30
551 root 1.57 The first line is an error message that is printed when a port dies that
552     isn't being monitored, because that is normally a bug. When a C<mon> is
553     attempted later, its callback is invoked immediately, because the port is
554     already gone. The kill reason is now C<no_such_port> with some descriptive
555     (we hope) error message.
556 root 1.30
557 root 1.51 As you probably suspect from these examples, the kill reason is usually
558     some identifier as first argument and a human-readable error message as
559     second argument - all kill reasons by AnyEvent::MP itself follow this
560     pattern. But the kill reason can be anything: it is simply a list of
561     values you can choose yourself. It can even be nothing (an empty list) -
562     this is called a "normal" kill.
563    
564     Apart from die'ing, you can kill ports manually using the C<kil>
565     function. Using the C<kil> function will be treated like an error when a
566     non-empty reason is specified:
567 root 1.30
568 root 1.51 kil $port, custom_error => "don't like your steenking face";
569 root 1.30
570 root 1.51 And a I<normal> kill without any reason arguments:
571 root 1.30
572     kil $port;
573    
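A monitoring callback can tell the two kinds of kill apart simply by looking at whether it received any reason at all. The following is a small sketch under that assumption; the reason C<custom_error> and the wording of the messages are made up for illustration.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;

my $port = port;

# the monitor callback receives the kill reason as its argument list
mon $port, sub {
   my @reason = @_;
   $done->send (@reason ? "error (@reason)" : "normal kill");
};

# a non-empty reason is treated like an error
kil $port, custom_error => "something went wrong";

my $outcome = $done->recv;
print "$port ended with: $outcome\n";
```

Replacing the C<kil> line with a plain C<kil $port;> should make the callback take the "normal kill" branch instead.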
574     By now you probably wonder what this "normal" kill business is: A common
575     idiom is to not specify a callback to C<mon>, but another port, such as
576     C<$SELF>:
577    
578     mon $port, $SELF;
579    
580 root 1.51 This basically means "monitor $port and kill me when it crashes" - and
581     the thing is, a "normal" kill does not count as a crash. This way you can
582     easily link ports together and make them crash together on errors, while
583     allowing you to remove a port silently when it has done its job properly.
584 root 1.30
585 root 1.34 =head3 Port Context
586    
587 root 1.51 Code runs in the so-called "port context". That means C<$SELF> contains
588     its own port ID and exceptions that the code throws will be caught.
589 root 1.34
590     Since AnyEvent::MP is event-based, it is not uncommon to register
591 root 1.51 callbacks from within C<rcv> handlers. As an example, assume that the
592     following port receive handler wants to C<die> a second later, using
593     C<after>:
594 root 1.34
595     my $port = port {
596        after 1, sub { die "oops" };
597     };
598    
599 root 1.51 If you try this out, you would find it does not work - when the C<after>
600     callback is executed, it does not run in the port context anymore, so
601     exceptions will not be caught.
602 root 1.34
603 root 1.41 For these cases, AnyEvent::MP exports a special "closure constructor"
604 root 1.51 called C<psub>, which works mostly like perl's built-in C<sub>:
605 root 1.34
606     my $port = port {
607        after 1, psub { die "oops" };
608     };
609    
610 root 1.51 C<psub> remembers the port context and returns a code reference. When the
611     code reference is invoked, it will run the code block within the context
612     that it was created in, so exception handling once more works as expected.
613 root 1.34
614 root 1.49 There is even a way to temporarily execute code in the context of some
615 root 1.41 port, namely C<peval>:
616    
617     peval $port, sub {
618        # die'ing here will kil $port
619     };
620    
621     The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
622     and then executes the given sub in a port context.
623    
624 root 1.30 =head3 Network Errors and the AEMP Guarantee
625    
626 root 1.52 Earlier we mentioned another important source of monitoring failures:
627     network problems. When a node loses connection to another node, it will
628     invoke all monitoring actions, just as if the port was killed, I<even if
629     it is possible that the port is still happily alive on another node> (not
630     being able to talk to a node means we have no clue what's going on with
631     it, it could be crashed, but also still running without knowing we lost
632     the connection).
633 root 1.30
634 root 1.52 So another way to view monitors is: "notify me when some of my messages
635 root 1.30 couldn't be delivered". AEMP has a guarantee about message delivery to a
636     port: After starting a monitor, any message sent to a port will either
637     be delivered, or, when it is lost, any further messages will also be lost
638 elmex 1.31 until the monitoring action is invoked. After that, further messages
639 root 1.30 I<might> get delivered again.
640    
641     This doesn't sound like a very big guarantee, but it is kind of the best
642 root 1.52 you can get while staying sane: Specifically, it means that there will be
643     no "holes" in the message sequence: all messages sent are delivered in
644     order, without any of them missing in between, and when some were lost,
645     you I<will> be notified of that, so you can take recovery action.
646 root 1.30
647 root 1.49 And, obviously, the guarantee only works in the presence of
648     correctly-working hardware, and no relevant bugs inside AEMP itself.
649    
650 root 1.30 =head3 Supervising
651    
652 root 1.49 OK, so how is this crashing-everything-stuff going to make applications
653 root 1.52 I<more> stable? Well, in fact, the goal is not really to make them
654     more stable, but to make them more resilient against actual errors
655     and crashes. And this is not done by crashing I<everything>, but by
656     crashing everything except a I<supervisor> that then cleans up and starts
657     everything again.
658 root 1.30
659 elmex 1.31 A supervisor is simply some code that ensures that an application (or a
660 root 1.49 part of it) is running, and if it crashes, is restarted properly. That is,
661     it supervises a service by starting and restarting it, as necessary.
662 root 1.30
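Stripped of all message passing, the core of a supervisor can be sketched
in a few lines of plain Perl. The C<supervise> function below is a
hypothetical illustration, not part of AnyEvent::MP, and a real AEMP
supervisor would react to monitoring callbacks instead of running a loop:

```perl
use strict;
use warnings;

# Hypothetical supervisor: start $service, restart it whenever it dies,
# and give up after $max_starts attempts.
sub supervise {
    my ($service, $max_starts) = @_;

    for my $attempt (1 .. $max_starts) {
        my $ok = eval { $service->($attempt); 1 };
        return $attempt if $ok;    # the service finished normally

        warn "service crashed (start $attempt): $@";
        # real cleanup (closing handles, rolling back state) would go here
    }

    return 0;                      # still crashing, give up
}

# A flaky service that crashes on its first two starts.
my $attempts = supervise(sub { die "boom\n" if $_[0] < 3 }, 5);
print "service ran to completion on start number $attempts\n";
```

The supervisor itself does almost nothing, which is the point: the less
it does, the less likely it is to crash together with the service.
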
663     To show how to do all this we will create a simple chat server that can
664     handle many chat clients. Both server and clients can be killed and
665 root 1.49 restarted, and even crash, to some extent, without disturbing the chat
666     functionality.
667 root 1.30
668     =head2 Chatting, the Resilient Way
669    
670     Without further ado, here is the chat server (to run it, we assume the
671 root 1.49 set-up explained earlier, with a separate F<aemp run seed> node):
672 root 1.30
673     use common::sense;
674     use AnyEvent::MP;
675    
676     configure;
677    
678     my %clients;
679    
680     sub msg {
681     print "relaying: $_[0]\n";
682     snd $_, $_[0]
683     for values %clients;
684     }
685    
686     our $server = port;
687    
688     rcv $server, join => sub {
689     my ($client, $nick) = @_;
690    
691     $clients{$client} = $client;
692    
693     mon $client, sub {
694     delete $clients{$client};
695     msg "$nick (quits, @_)";
696     };
697     msg "$nick (joins)";
698     };
699    
700     rcv $server, privmsg => sub {
701     my ($nick, $msg) = @_;
702     msg "$nick: $msg";
703     };
704    
705 root 1.49 db_set eg_chat_server => $server;
706 root 1.30
707     warn "server ready.\n";
708    
709     AnyEvent->condvar->recv;
710    
711 elmex 1.31 Looks like a lot, but it is actually quite simple: after your usual
712 root 1.30 preamble (this time we use common sense), we define a helper function that
713     sends some message to every registered chat client:
714    
715     sub msg {
716     print "relaying: $_[0]\n";
717     snd $_, $_[0]
718     for values %clients;
719     }
720    
721     The clients are stored in the hash C<%clients>. Then we define a server
722     port and install two receivers on it, C<join>, which is sent by clients
723     to join the chat, and C<privmsg>, that clients use to send actual chat
724     messages.
725    
726     C<join> is the most complicated. It expects the client port and the nickname
727     to be passed in the message, and registers the client in C<%clients>.
728    
729     rcv $server, join => sub {
730     my ($client, $nick) = @_;
731    
732     $clients{$client} = $client;
733    
734     The next step is to monitor the client. The monitoring action removes the
735     client and sends a quit message with the error to all remaining clients.
736    
737     mon $client, sub {
738     delete $clients{$client};
739     msg "$nick (quits, @_)";
740     };
741    
742     And finally, it creates a join message and sends it to all clients.
743    
744     msg "$nick (joins)";
745     };
746    
747     The C<privmsg> callback simply broadcasts the message to all clients:
748    
749     rcv $server, privmsg => sub {
750     my ($nick, $msg) = @_;
751     msg "$nick: $msg";
752     };
753    
754 elmex 1.31 And finally, the server registers itself in the C<eg_chat_server> database
755 root 1.30 family, so that clients can find it:
756    
757 root 1.52 db_set eg_chat_server => $server;
758 root 1.30
759     Well, well... and where is this supervisor stuff? Well... we cheated,
760     it's not there. To not overcomplicate the example, we only put it into
761     the..... CLIENT!
762    
763     =head3 The Client, and a Supervisor!
764    
765     Again, here is the client, including supervisor, which makes it a bit
766     longer:
767    
768     use common::sense;
769     use AnyEvent::MP;
770    
771 root 1.49 my $nick = shift || "anonymous";
772 root 1.30
773     configure;
774    
775     my ($client, $server);
776    
777     sub server_connect {
778 root 1.49 my $db_mon;
779     $db_mon = db_mon eg_chat_server => sub {
780     return unless %{ $_[0] };
781     undef $db_mon;
782    
783     print "\rconnecting...\n";
784    
785     $client = port { print "\r \r@_\n> " };
786     mon $client, sub {
787     print "\rdisconnected @_\n";
788     &server_connect;
789     };
790 root 1.30
791 root 1.49 $server = (keys %{ $_[0] })[0];
792 root 1.30
793 root 1.49 snd $server, join => $client, $nick;
794     mon $server, $client;
795 root 1.30 };
796     }
797    
798     server_connect;
799    
800 root 1.34 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
801 root 1.30 chomp (my $line = <STDIN>);
802     print "> ";
803     snd $server, privmsg => $nick, $line
804     if $server;
805     });
806    
807     $| = 1;
808     print "> ";
809     AnyEvent->condvar->recv;
810    
811     The first thing the client does is to store the nick name (which is
812     expected as the only command line argument) in C<$nick>, for further
813     usage.
814    
815     The next relevant thing is... finally... the supervisor:
816    
817     sub server_connect {
818 root 1.52 my $db_mon;
819     $db_mon = db_mon eg_chat_server => sub {
820     return unless %{ $_[0] };
821     undef $db_mon; # stop monitoring
822 root 1.30
823 root 1.52 This monitors the C<eg_chat_server> database family. It waits until a
824     chat server becomes available. When that happens, it "connects" to it
825     by creating a client port that receives and prints chat messages, and
826     monitoring it:
827 root 1.30
828     $client = port { print "\r \r@_\n> " };
829     mon $client, sub {
830     print "\rdisconnected @_\n";
831     &server_connect;
832     };
833    
834 root 1.52 If the client port dies (for whatever reason), the "supervisor" will start
835     looking for a server again - the semantics of C<db_mon> ensure that it
836     will immediately find it if there is a server port.
837    
838     After this, everything is ready: the client will send a C<join> message
839     with its local port to the server, and start monitoring it:
840    
841     $server = (keys %{ $_[0] })[0];
842 root 1.30
843     snd $server, join => $client, $nick;
844     mon $server, $client;
845     }
846    
847 root 1.52 This second monitor will ensure that, when the server port crashes or goes
848     away (e.g. due to network problems), the client port will be killed as
849     well. The client port's own monitor then tells the user that the client
850     was disconnected and starts looking for a server again.
851 root 1.30
852     The rest of the program deals with the boring details of actually invoking
853     the supervisor function to start the whole client process and handle the
854     actual terminal input, sending it to the server.
855    
856 root 1.52 Now... the "supervisor" in this example is a bit of a cheat - it doesn't
857     really clean up much (because the cleanup done by AnyEvent::MP suffices),
858     and there isn't much of a restarting action either - if the server isn't
859     there because it crashed, well, it isn't there.
860    
861     In the real world, one would often add a timeout that would trigger when
862     the server couldn't be found within some time limit, and then complain,
863     or even try to start a new server. Or the supervisor would have to do
864     some real cleanups, such as rolling back database transactions when the
865     database thread crashes. For this simple chat server, however, this simple
866     supervisor works fine. Hopefully future versions of AnyEvent::MP will
867     offer some predefined supervisors, for now you will have to code it on
868     your own.
869    
870 elmex 1.31 You should now try to start the server and one or more clients in different
871 root 1.30 terminal windows (and the seed node):
872    
873     perl eg/chat_client nick1
874     perl eg/chat_client nick2
875     perl eg/chat_server
876     aemp run profile seed
877    
878     And then you can experiment with chatting, killing one or more clients, or
879     stopping and restarting the server, to see the monitoring in action.
880    
881 root 1.33 The crucial point you should understand from this example is that
882     monitoring is usually symmetric: when you monitor some other port,
883     potentially on another node, that other port usually should monitor you,
884     too, so when the connection dies, both ports get killed, or at least both
885     sides can take corrective action. Exceptions are "servers" that serve
886     multiple clients at once and might only wish to clean up, and supervisors,
887     who of course should not normally get killed (unless they, too, have a
888     supervisor).
889    
890 root 1.52 If you often think in object-oriented terms, then you can think of a port
891     as an object: C<port> is the constructor, the receive callbacks set by
892     C<rcv> act as methods, the C<kil> function becomes the explicit destructor
893     and C<mon> installs a destructor hook. Unlike conventional object oriented
894     programming, it can make sense to exchange port IDs more freely (for
895     example, to monitor one port from another), because it is cheap to send
896     port IDs over the network, and AnyEvent::MP blurs the distinction between
897     local and remote ports.
898    
899     Lastly, there is ample room for improvement in this example: the server
900     should probably remember the nickname in the C<join> handler instead of
901     expecting it in every chat message, it should probably monitor itself, and
902     the client should not try to send any messages unless a server is actually
903     connected.
904 root 1.30
905     =head1 PART 3: TIMTOWTDI: Virtual Connections
906    
907 root 1.34 The chat system developed in the previous sections is very "traditional"
908     in a way: you start some server(s) and some clients statically and they
909     start talking to each other.
910    
911     Sometimes applications work more like "services": They can run on almost
912 root 1.52 any node and even talk to copies of themselves on other nodes in case they
913     are distributed. The L<AnyEvent::MP::Global> service for example monitors
914     nodes joining the network and sometimes even starts itself on other nodes.
915    
916     One good way to design such services is to put them into a module and
917     create "virtual connections" to other nodes. We call this the "bridge
918     head" method, because you start by I<creating a remote port> (the bridge
919 root 1.34 head) and from that you start to bootstrap your application.
920    
921 root 1.52 Since that sounds rather theoretical, let us redesign the chat server and
922 root 1.34 client using this design method.
923    
924 root 1.52 As usual, we start with the full program - here is the server:
925 root 1.34
926     use common::sense;
927     use AnyEvent::MP;
928    
929     configure;
930    
931 root 1.52 db_set eg_chat_server2 => $NODE;
932 root 1.34
933     my %clients;
934    
935     sub msg {
936     print "relaying: $_[0]\n";
937     snd $_, $_[0]
938     for values %clients;
939     }
940    
941     sub client_connect {
942     my ($client, $nick) = @_;
943    
944     mon $client;
945 root 1.52 mon $client, psub {
946 root 1.34 delete $clients{$client};
947     msg "$nick (quits, @_)";
948     };
949    
950     $clients{$client} = $client;
951    
952     msg "$nick (joins)";
953    
954     rcv $SELF, sub { msg "$nick: $_[0]" };
955     }
956    
957     warn "server ready.\n";
958    
959     AnyEvent->condvar->recv;
960    
961 root 1.39 It starts out not much different than the previous example, except that
962 root 1.52 this time, we register the node port in the database and not a port we
963     created - the clients only want to know which node the server should
964     be running on, and there can only be one such server (or service) per
965     node. In fact, the clients could also use some kind of election mechanism,
966     to find the node with lowest node ID, or lowest load, or something like
967     that.
968    
969     The much more interesting difference to the previous server is that
970     indeed no server port is created - the server consists only of code,
971     and "does" nothing by itself. All it "does" is to define a function
972     named C<client_connect>, which expects a client port and a nick name as
973     arguments. It then monitors the client port and binds a receive callback
974     on C<$SELF>, which expects messages that in turn are broadcast to all
975     clients.
976 root 1.34
977     The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
978     for C<mon $client, $SELF>. The second does the normal "client has gone
979 root 1.52 away" clean-up action.
980 root 1.34
981 root 1.52 The last line, the C<rcv $SELF>, is a good hint that something interesting
982     is going on. And indeed, when looking at the client code, you can see a
983     new function, C<spawn>:
985 root 1.34
986     use common::sense;
987     use AnyEvent::MP;
988    
989     my $nick = shift;
990    
991     configure;
992    
993     $| = 1;
994    
995     my $port = port;
996    
997     my ($client, $server);
998    
999     sub server_connect {
1000 root 1.40    my $db_mon;
1001 root 1.34    $db_mon = db_mon eg_chat_server2 => sub {
1002           return unless %{ $_[0] };
1003           undef $db_mon; # stop monitoring
1004           print "\rconnecting...\n";
1005           $client = port { print "\r \r@_\n> " };
1006           mon $client, sub {
1007              print "\rdisconnected @_\n";
1008              &server_connect;
1009           };
1010           $server = spawn +(keys %{ $_[0] })[0], "::client_connect", $client, $nick;
1011           mon $server, $client;
1012        };
1013     }
1014    
1015     server_connect;
1016    
1017     my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
1018        chomp (my $line = <STDIN>);
1019        print "> ";
1020        snd $server, $line
1021           if $server;
1022     });
1023    
1024     print "> ";
1025     AnyEvent->condvar->recv;
1026    
1027     The client is quite similar to the previous one, but instead of contacting
1028 root 1.39 the server I<port> (which no longer exists), it C<spawn>s (creates) a new
1029     server I<port> on the server I<node>:
1030 root 1.34
1031     $server = spawn +(keys %{ $_[0] })[0], "::client_connect", $client, $nick;
1032     mon $server, $client;
1033    
1034 root 1.39 And of course the first thing after creating it is monitoring it.
1035 root 1.34
1036 root 1.52 Phew, let's go through this in slow motion: the C<spawn> function creates
1037     a new port on a remote node and returns its port ID. After creating
1038     the port it calls a function on the remote node, passing any remaining
1039     arguments to it, and - most importantly - executes the function within
1040     the context of the new port, so it can be manipulated by referring to
1041     C<$SELF>. The init function can reside in a module (actually it normally
1042     I<should> reside in a module) - AnyEvent::MP will automatically load the
1043     module if the function isn't defined.
1044 root 1.39
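The "load the module if the function isn't defined" step can be
illustrated with a small pure-Perl sketch. The helper
C<resolve_init_function> is hypothetical and deliberately simplified; the
lookup AnyEvent::MP really performs may differ in its details:

```perl
use strict;
use warnings;

# Hypothetical helper: turn "Some::Module::func" into a code reference,
# loading Some::Module first if the function is not defined yet.
sub resolve_init_function {
    my ($name) = @_;

    my ($pkg, $func) = $name =~ /^(.*)::(\w+)$/
        or die "$name: not a fully qualified function name\n";
    $pkg = "main" unless length $pkg;    # "::func" means main::func

    no strict 'refs';
    unless (defined &{"${pkg}::$func"}) {
        (my $file = "$pkg.pm") =~ s{::}{/}g;
        require $file;                   # dies if the module cannot be found
    }

    defined &{"${pkg}::$func"}
        or die "$name: function not defined even after loading $pkg\n";

    return \&{"${pkg}::$func"};
}

# Already defined in the main program, so nothing needs to be loaded.
sub client_connect { "connected: @_" }

print resolve_init_function("::client_connect")->("nick1"), "\n";

# Not loaded yet: the (core) module List::Util is required on demand.
print resolve_init_function("List::Util::sum")->(1, 2, 3), "\n";
```

This is why an init function named C<"::client_connect"> works for code
in the main program, while a name such as
C<"mymod::chatserver::client_connect"> can pull in a whole module.
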
1045     The C<spawn> function returns immediately, which means you can instantly
1046 root 1.34 send messages to the port, long before the remote node has even heard
1047     of our request to create a port on it. In fact, the remote node might
1048     not even be running. Despite these troubling facts, everything should
1049     work just fine: if the node isn't running (or the init function throws an
1050     exception), then the monitor will trigger because the port doesn't exist.
1051    
1052     If the spawn message gets delivered, but the monitoring message is not
1053 root 1.39 because of network problems (extremely unlikely, but monitoring, after
1054     all, is implemented by passing a message, and messages can get lost), then
1055     this connection loss will eventually trigger the monitoring action. On the
1056     remote node (which in return monitors the client) the port will also be
1057     cleaned up on connection loss. When the remote node comes up again and our
1058     monitoring message can be delivered, it will instantly fail because the
1059     port has been cleaned up in the meantime.
1060 root 1.34
1061     If your head is spinning by now, that's fine - just keep in mind, after
1062 root 1.52 creating a port using C<spawn>, monitor it on the local node, and monitor
1063     "the other side" from the remote node, and all will be cleaned up just
1064     fine.
1065 root 1.34
1066 root 1.36 =head2 Services
1067 root 1.34
1068 root 1.53 Above it was mentioned that C<spawn> automatically loads modules. This can
1069     be exploited in various useful ways.
1070 root 1.36
1071     Assume for a moment you put the server into a file called
1072     F<mymod/chatserver.pm> reachable from the current directory. Then you
1073     could run a node there with:
1074    
1075     aemp run
1076    
1077     The other nodes could C<spawn> the server by using
1078 root 1.53 C<mymod::chatserver::client_connect> as init function - without any other
1079     configuration.
1080 root 1.36
1081 root 1.53 Likewise, when you have some service that starts automatically when loaded
1082     (similar to AnyEvent::MP::Global), then you can configure this service
1083     statically:
1084 root 1.36
1085     aemp profile mysrvnode services mymod::service::
1086     aemp run profile mysrvnode
1087    
1088 root 1.39 And the module will automatically be loaded in the node, as specifying a
1089 root 1.38 module name (with C<::>-suffix) will simply load the module, which is then
1090     free to do whatever it wants.
1091 root 1.36
1092     Of course, you can also do it in the much more standard way by writing
1093     a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
1094 root 1.53 distribution and then configuring nodes. For example, if I wanted to run the
1095 root 1.36 Bummskraut IRC backend on a machine named "ruth", I could do this:
1096    
1097     aemp profile ruth addservice BK::Backend::IRC::
1098    
1099 root 1.43 And any F<aemp run> on that host will automatically have the Bummskraut
1100     IRC backend running.
1101 root 1.36
1102 root 1.53 There are plenty of possibilities you can use - it's all up to you how you
1103 root 1.36 structure your application.
1104 elmex 1.7
1105 root 1.42 =head1 PART 4: Coro::MP - selective receive
1106    
1107     Not all problems lend themselves naturally to an event-based solution:
1108     sometimes things are easier if you can decide in what order you want to
1109 root 1.53 receive messages, regardless of the order in which they were sent.
1110 root 1.42
1111     In these cases, L<Coro::MP> can provide a nice solution: instead of
1112 root 1.53 registering callbacks for each message type, C<Coro::MP> attaches a
1113 root 1.42 (coro-) thread to a port. The thread can then opt to selectively receive
1114     messages it is interested in. Other messages are not lost, but queued, and
1115     can be received at a later time.
1116    
1117 root 1.43 The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
1118 root 1.42 module. It is, however, tightly integrated into C<AnyEvent::MP> - the
1119     ports it creates are fully compatible with C<AnyEvent::MP> ports.
1120    
1121     In fact, C<Coro::MP> is more of an extension than a separate module: all
1122     functions exported by C<AnyEvent::MP> are exported by it as well.
1123    
1124     To illustrate what programming with C<Coro::MP> looks like, consider the
1125     following (slightly contrived) example: Let's implement a server that
1126     accepts a C<< (write_file => $port, $path) >> message with a (source)
1127     port and a filename, followed by as many C<< (data => $port, $data) >>
1128     messages as required to fill the file, followed by an empty C<< (data =>
1129     $port) >> message.
1130    
1131     The server only writes a single file at a time; other requests will stay
1132     in the queue until the current file has been finished.
1133    
1134     Here is an example implementation that uses L<Coro::AIO> and largely
1135     ignores error handling:
1136    
1137     my $ioserver = port_async {
1138     while () {
1139     my ($tag, $port, $path) = get_cond;
1140    
1141     $tag eq "write_file"
1142     or die "only write_file messages expected";
1143    
1144     my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1145     or die "$path: $!";
1146    
1147     while () {
1148     my (undef, undef, $data) = get_cond {
1149     $_[0] eq "data" && $_[1] eq $port
1150     } 5
1151     or die "timeout waiting for data message from $port\n";
1152    
1153     length $data or last;
1154    
1155     aio_write $fh, undef, undef, $data, 0;
1156     };
1157     }
1158     };
1159    
1160     mon $ioserver, sub {
1161     warn "ioserver was killed: @_\n";
1162     };
1163    
1164 root 1.53 Let's go through it, section by section.
1165 root 1.42
1166     my $ioserver = port_async {
1167    
1168 root 1.43 Ports can be created by attaching a thread to an existing port via
1169 root 1.53 C<rcv_async>, or as in this example, by calling C<port_async> with the
1170     code to execute as a thread. The C<async> component comes from the fact
1171     that threads are created using the C<Coro::async> function.
1172 root 1.42
1173     The thread runs in a normal port context (so C<$SELF> is set). In
1174     addition, when the thread returns, the port will be C<kil>ed I<normally>, i.e.
1175     without a reason argument.
1176    
1177     while () {
1178     my ($tag, $port, $path) = get_cond;
1179     $tag eq "write_file" or die "only write_file messages expected";
1180    
1181 root 1.53 The thread is supposed to serve many file writes, which is why it
1182     executes in a loop. The first thing it does is fetch the next message,
1183     using C<get_cond>, the "conditional message get". Without arguments, it
1184     merely fetches the I<next> message from the queue, which I<must> be a
1185     C<write_file> message.
1186 root 1.42
1187     The message contains the C<$path> to the file, which is then created:
1188    
1189     my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1190     or die "$path: $!";
1191    
1192     Then we enter a loop again, to serve as many C<data> messages as
1193 root 1.43 necessary:
1194 root 1.42
1195     while () {
1196     my (undef, undef, $data) = get_cond {
1197     $_[0] eq "data" && $_[1] eq $port
1198     } 5
1199     or die "timeout waiting for data message from $port\n";
1200    
1201     This time, the condition is not empty, but instead a code block: similarly
1202     to grep, the code block will be called with C<@_> set to each message in
1203     the queue, and it has to return whether it wants to receive the message or
1204     not.
1205    
1206     In this case we are interested in C<data> messages (C<< $_[0] eq "data"
1207     >>), whose first element is the source port (C<< $_[1] eq $port >>).
1208    
1209     The condition must be this strict, as it is possible to receive both
1210     C<write_file> messages and C<data> messages from other ports while we
1211     handle the file writing.
1212    
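The effect of such a condition on the queue can be modelled in plain
Perl. The sketch below is only a toy model: C<take_matching> and the
C<@queue> array are hypothetical, no C<Coro::MP> function is involved,
and unlike the real C<get_cond> it neither waits for new messages nor
supports a timeout:

```perl
use strict;
use warnings;

# Toy model of a port's message queue; each message is an array reference.
my @queue = (
    [data       => "port-B", "from someone else"],
    [write_file => "port-B", "/tmp/other"],
    [data       => "port-A", "abc\n"],
);

# Hypothetical helper: remove and return the first message for which
# $cond returns true, leaving all other messages queued in order.
sub take_matching {
    my ($cond) = @_;

    for my $i (0 .. $#queue) {
        next unless $cond->(@{ $queue[$i] });

        my ($msg) = splice @queue, $i, 1;
        return @$msg;
    }

    return;    # nothing matched
}

my $port = "port-A";
my (undef, undef, $data) = take_matching(sub {
    $_[0] eq "data" && $_[1] eq $port
});

print "got: $data";
print "still queued: ", scalar @queue, "\n";
```

With a less strict condition (say, only C<< $_[0] eq "data" >>), the
first message in the queue would have matched, and data from an
unrelated port would have ended up in our file.
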
1213 root 1.53 The lone C<5> argument at the end is a timeout - when no matching message
1214     is received within C<5> seconds, we assume an error and C<die>.
1215 root 1.42
1216     When an empty C<data> message is received we are done and can close the
1217     file (which is done automatically as C<$fh> goes out of scope):
1218    
1219     length $data or last;
1220    
1221     Otherwise we need to write the data:
1222    
1223     aio_write $fh, undef, undef, $data, 0;
1224    
1225 root 1.53 And that's basically it. Note that every port thread should have some
1226     kind of supervisor. In our case, the supervisor simply prints any error
1227     message:
1228 root 1.42
1229     mon $ioserver, sub {
1230     warn "ioserver was killed: @_\n";
1231     };
1232    
1233     Here is a usage example:
1234    
1235     port_async {
1236     snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1237     snd $ioserver, data => $SELF, "abc\n";
1238     snd $ioserver, data => $SELF, "def\n";
1239     snd $ioserver, data => $SELF;
1240     };
1241    
1242     The messages are sent without any flow control or acknowledgement (feel
1243     free to improve). Also, the source port does not actually need to be a
1244     port - any unique ID will do - but port identifiers happen to be a simple
1245     source of network-wide unique IDs.
1246    
1247     Apart from C<get_cond> as seen above, there are other ways to receive
1248     messages. The C<write_file> message above could also selectively be
1249     received using a C<get> call:
1250    
1251     my ($port, $path) = get "write_file";
1252    
1253     This is simpler, but when some other code part sends an unexpected message
1254     to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
1255     every threaded port should have a "fetch next message unconditionally"
1256     somewhere, to avoid filling up the queue.
1257    
1258 root 1.53 Finally, it is also possible to use more switch-like C<get_cond>s:
1259 root 1.42
1260     get_cond {
1261     $_[0] eq "msg1" and return sub {
1262     my (undef, @msg1_data) = @_;
1263     ...;
1264     };
1265    
1266     $_[0] eq "msg2" and return sub {
1267     my (undef, @msg2_data) = @_;
1268     ...;
1269     };
1270    
1271     die "unexpected message $_[0] received";
1272     };
1273    
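The switch-like form relies on the condition returning a code reference
that is then called with the matched message. A minimal pure-Perl model
of that dispatch step could look as follows; C<dispatch> is a
hypothetical stand-in, and the behaviour is assumed from the example
above rather than taken from the C<Coro::MP> sources:

```perl
use strict;
use warnings;

# Hypothetical model: $cond is called with the message in @_; if it returns
# a code reference, that handler is then called with the same message.
sub dispatch {
    my ($cond, @msg) = @_;

    my $match = $cond->(@msg);
    return ref($match) eq "CODE" ? $match->(@msg) : $match;
}

my $cond = sub {
    $_[0] eq "msg1" and return sub {
        my (undef, @msg1_data) = @_;
        "msg1 with " . @msg1_data . " argument(s)";
    };

    $_[0] eq "msg2" and return sub {
        my (undef, @msg2_data) = @_;
        "msg2: @msg2_data";
    };

    die "unexpected message $_[0] received\n";
};

print dispatch($cond, msg1 => 1, 2, 3), "\n";
print dispatch($cond, msg2 => "hello"), "\n";
```

Each branch both selects a message type and supplies the code that
handles it, which keeps the test and the handler next to each other.
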
1274 root 1.37 =head1 THE END
1275    
1276     This is the end of this introduction, but hopefully not the end of
1277 root 1.43 your career as an AEMP user. We hope the tutorial was enough to make the
1278 root 1.37 basic concepts clear. Keep in mind that distributed programming is not
1279 root 1.53 completely trivial, in fact, it's pretty complicated. We hope AEMP makes
1280     it simpler and will be useful to create exciting new applications.
1281 root 1.37
1282 elmex 1.1 =head1 SEE ALSO
1283    
1284     L<AnyEvent::MP>
1285    
1286 elmex 1.20 L<AnyEvent::MP::Global>
1287    
1288 root 1.42 L<Coro::MP>
1289    
1290 root 1.34 L<AnyEvent>
1291    
1292 elmex 1.1 =head1 AUTHOR
1293    
1294     Robin Redeker <elmex@ta-sa.org>
1295 root 1.32 Marc Lehmann <schmorp@schmorp.de>
1296 root 1.4