/cvs/AnyEvent-MP/MP/Intro.pod
Revision: 1.58
Committed: Fri Mar 23 21:16:25 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.57: +14 -12 lines

File Contents

# User Rev Content
1 root 1.4 =head1 Message Passing for the Non-Blocked Mind
2 elmex 1.1
3 root 1.8 =head1 Introduction and Terminology
4 elmex 1.1
5 root 1.4 This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
6 root 1.23 module, which allows programs to transparently pass messages within the
7     process and to other processes on the same or a different host.
8 elmex 1.1
9 root 1.23 What kind of messages? Basically a message here means a list of Perl
10 root 1.15 strings, numbers, hashes and arrays, anything that can be expressed as a
11 root 1.43 L<JSON> text (as JSON is the default serialiser in the protocol). Here are
12     two examples:
13 elmex 1.1
14 root 1.23 write_log => 1251555874, "action was successful.\n"
15     123, ["a", "b", "c"], { foo => "bar" }
16 elmex 1.21
17 root 1.23 When using L<AnyEvent::MP> it is customary to use a descriptive string as
18 root 1.46 first element of a message that indicates the type of the message. This
19 root 1.23 element is called a I<tag> in L<AnyEvent::MP>, as some API functions
20     (C<rcv>) support matching it directly.
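
Since a message has to be expressible as JSON, one quick way to check a
candidate message is to round-trip it through a JSON encoder. The following
is only a sketch using the core L<JSON::PP> module; it does not show how
L<AnyEvent::MP> itself frames messages on the wire, and the variable names
are made up for this illustration.

```perl
use strict;
use warnings;
use JSON::PP;

my $json = JSON::PP->new->canonical;

# a message is a plain Perl list; by convention the first element is the tag
my @message = (write_log => 1251555874, "action was successful.\n");

my $text = $json->encode (\@message);   # encode the list as a JSON array
my @copy = @{ $json->decode ($text) };  # and decode it again

my ($tag, @payload) = @copy;

print "JSON text: $text\n";
print "tag: $tag, payload elements: ", scalar @payload, "\n";
```

Anything that survives this round trip unchanged (strings, numbers, arrays,
hashes) is a reasonable candidate for a message element.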
21    
22     Suppose you want to send a ping message with your current time
23     somewhere; this is what such a message might look like (in Perl syntax):
24    
25     ping => 1251381636
26    
27     Now that we know what a message is, to which entities are those
28     messages being I<passed>? They are I<passed> to I<ports>. A I<port> is
29     a destination for messages but also a context to execute code: when
30     a runtime error occurs while executing code belonging to a port, the
31     exception will be raised on the port and can even travel to interested
32     parties on other nodes, which makes supervision of distributed processes
33     easy.
34    
35     How do these ports relate to things you know? Each I<port> belongs
36     to a I<node>, and a I<node> is just the UNIX process that runs your
37     L<AnyEvent::MP> application.
38    
39     Each I<node> is distinguished from other I<nodes> running on the same or
40     another host in a network by its I<node ID>. A I<node ID> is simply a
41     unique string chosen manually or assigned by L<AnyEvent::MP> in some way
42     (UNIX nodename, random string...).
43    
44     Here is a diagram of how I<nodes>, I<ports> and UNIX processes relate
45     to each other. The setup consists of two nodes (more are of course
46     possible): node C<A> (in UNIX process 7066) with the ports C<ABC> and
47     C<DEF>, and node C<B> (in UNIX process 8321) with the ports C<FOO> and
48     C<BAR>.
49 elmex 1.17
50    
   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|
60    
61 root 1.23 The strings for the I<port IDs> here are just for illustrative
62     purposes: Even though I<ports> in L<AnyEvent::MP> are also identified by
63 root 1.43 strings, they can't be chosen manually and are assigned by the system
64 root 1.23 dynamically. These I<port IDs> are unique within a network and can also be
65 root 1.46 used to identify senders, or even as message tags for instance.
66 root 1.23
67     The next sections will explain the API of L<AnyEvent::MP> by going through
68     a few simple examples. Later some more complex idioms are introduced,
69     which are hopefully useful to solve some real world problems.
70 root 1.8
71 root 1.39 =head2 Passing Your First Message
72 elmex 1.16
73 root 1.46 For starters, let's have a look at the messaging API. The following
74     example is just a demo to show the basic elements of message passing with
75 root 1.24 L<AnyEvent::MP>.
76    
77     The example should print: C<Ending with: 123>, in a rather complicated
78     way, by passing some message to a port.
79 elmex 1.16
   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";
95    
96 root 1.24 It already uses most of the essential functions inside
97 root 1.46 L<AnyEvent::MP>: First there is the C<port> function which creates a
98 root 1.24 I<port> and will return its I<port ID>, a simple string.
99    
100     This I<port ID> can be used to send messages to the port and install
101     handlers to receive messages on the port. Since it is a simple string
102     it can be safely passed to other I<nodes> in the network when you want
103     to refer to that specific port (usually used for RPC, where you need
104     to tell the other end which I<port> to send the reply to - messages in
105     L<AnyEvent::MP> have a destination, but no source).
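
Because messages carry no source, an RPC-style exchange has to include the
reply port in the message itself. Here is a minimal single-process sketch
of that idea; the tags C<add> and C<sum> and the variable names are
invented for this example and are not part of L<AnyEvent::MP>.

```perl
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;

# "server" port: answers add requests by sending to the supplied reply port
my $server = port;
rcv $server, add => sub {
   my ($reply_port, $x, $y) = @_;
   snd $reply_port, sum => $x + $y;
};

# "client" port: receives the reply
my $client = port;
rcv $client, sum => sub {
   my ($result) = @_;
   $done->send ($result);
};

# the request carries the port ID the answer should go to
snd $server, add => $client, 2, 3;

my $sum = $done->recv;
print "2 + 3 = $sum\n";
```

Since C<$client> is just a string, the same pattern should carry over to
ports on other nodes once the nodes are connected.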
106 elmex 1.17
107 root 1.24 The next function is C<rcv>:
108 elmex 1.16
109 elmex 1.17 rcv $port, test => sub { ... };
110 elmex 1.16
111 root 1.24 It installs a receiver callback on the I<port> that is specified as the first
112     argument (it only works for "local" ports, i.e. ports created on the same
113     node). The next argument, in this example C<test>, specifies a I<tag> to
114     match. This means that whenever a message with the first element being
115     the string C<test> is received, the callback is called with the remaining
116 elmex 1.17 parts of that message.
117    
118 root 1.24 Messages can be sent with the C<snd> function, which is used like this in
119     the example above:
120 elmex 1.17
121     snd $port, test => 123;
122    
123 root 1.24 This will send the message C<'test', 123> to the I<port> with the I<port
124     ID> stored in C<$port>. Since in this case the receiver has a I<tag> match
125     on C<test> it will call the callback with the first argument being the
126     number C<123>.
127    
128 root 1.43 The callback is a typical AnyEvent idiom: the callback just passes
129 root 1.24 that number on to the I<condition variable> C<$end_cv> which will then
130     pass the value to the print. Condition variables are out of the scope
131     of this tutorial and not often used with ports, so please consult the
132 elmex 1.17 L<AnyEvent::Intro> about them.
133    
134 root 1.24 Passing messages inside just one process is boring. Before we can move on
135     and do interprocess message passing we first have to make sure some things
136     have been set up correctly for our nodes to talk to each other.
137 elmex 1.17
138 root 1.39 =head2 System Requirements and System Setup
139 elmex 1.17
140 root 1.25 Before we can start with real IPC we have to make sure some things work on
141     your system.
142 elmex 1.17
143 root 1.25 First we have to set up a I<shared secret>: for two L<AnyEvent::MP>
144     I<nodes> to be able to communicate with each other over the network it is
145     necessary to set up the same I<shared secret> for both of them, so they can
146     prove their trustworthiness to each other.
147 elmex 1.17
148     The easiest way to set this up is to use the F<aemp> utility:
149    
150     aemp gensecret
151    
152 root 1.25 This creates a F<$HOME/.perl-anyevent-mp> config file and generates a
153     random shared secret. You can copy this file to any other system and
154     then communicate over the network (via TCP) with it. You can also select
155     your own shared secret (F<aemp setsecret>) and for increased security
156     requirements you can even create (or configure) a TLS certificate (F<aemp
157     gencert>), causing connections to not just be securely authenticated, but
158     also to be encrypted and protected against tinkering.
159    
160     Connections will only be successfully established when the I<nodes>
161     that want to connect to each other have the same I<shared secret> (or
162     successfully verify the TLS certificate of the other side, in which case
163     no shared secret is required).
164 elmex 1.17
165     B<If something does not work as expected, and for example tcpdump shows
166     that the connections are closed almost immediately, you should make sure
167     that F<~/.perl-anyevent-mp> is the same on all hosts/user accounts that
168     you try to connect with each other!>
169 elmex 1.16
170 root 1.25 That is all for now; you will find some more advanced fiddling with the
171     C<aemp> utility later.
172    
173 root 1.35 =head2 Shooting the Trouble
174    
175     Sometimes things go wrong, and AnyEvent::MP, being a professional module,
176 root 1.43 does not gratuitously spill out messages to your screen.
177 root 1.35
178     To help troubleshoot any issues, there are two environment variables
179 root 1.55 that you can set. The first, C<AE_VERBOSE>, sets the logging level of
180 root 1.57 L<AnyEvent::Log>, which AnyEvent::MP uses. The default is C<4>, which
181 root 1.55 means nothing much is printed. You can increase it to C<8> or C<9> to get
182     more verbose output. This is example output when starting a node (somewhat
183     abridged to get shorter lines):
184    
185     2012-03-22 01:41:43.59 debug AE::Util: using Guard module to implement guards.
186     2012-03-22 01:41:43.62 debug AE::MP::Kernel: node cerebro/slwK2LEq7O starting up.
187     2012-03-22 01:41:43.62 debug AE::MP::Kernel: node listens on [10.0.0.1:52110].
188     2012-03-22 01:41:43.62 trace AE::MP::Kernel: trying connect to seed node 10.0.0.19:4040.
189 root 1.57 2012-03-22 01:41:43.66 trace AE::MP::Transport: 10.0.0.19:4040 connected as rain.
190     2012-03-22 01:41:43.66 info AE::MP::Kernel: rain is up.
191 root 1.55
192     A lot of info, but at least you can see that it does something. To only
193     get info about AnyEvent::MP, you can use C<AE_LOG=AnyEvent::MP=+log> in
194     your environment.
195 root 1.35
196     The other environment variable that can be useful is
197     C<PERL_ANYEVENT_MP_TRACE>, which, when set to a true value, will cause
198 root 1.46 most messages that are sent or received to be printed. For example, F<aemp
199     restart rijk> might output these message exchanges:
200 root 1.35
201 root 1.46 SND rijk <- [null,"eval","AnyEvent::Watchdog::Util::restart; ()","aemp/cerebro/z4kUPp2JT4#b"]
202     SND rain <- [null,"g_slave",{"'l":{"aemp/cerebro/z4kUPp2JT4":["10.0.0.1:48168"]}}]
203     SND rain <- [null,"g_find","rijk"]
204     RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]
205     RCV rijk -> ["b",""]
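
If you want to post-process such a trace, the lines can be taken apart
with a few lines of Perl. This is only a sketch that assumes the layout
seen in the sample above (direction, node name, an arrow, then a JSON
array whose first element appears to be the port and whose second is the
tag); the trace format is not a documented interface and may differ
between versions.

```perl
use strict;
use warnings;
use JSON::PP;

# two lines copied from the sample trace output
my @trace = (
   'SND rain <- [null,"g_find","rijk"]',
   'RCV rain -> ["","g_found","rijk",["10.0.0.23:4040"]]',
);

my @parsed;

for my $line (@trace) {
   # direction, node name, arrow, then the message as a JSON array
   my ($dir, $node, $json) = $line =~ /^(SND|RCV) (\S+) (?:<-|->) (.*)$/
      or next;

   # assumption: first element is the port, second the tag
   my ($port, $tag, @args) = @{ JSON::PP->new->decode ($json) };

   push @parsed, { dir => $dir, node => $node, tag => $tag, args => \@args };
   print "$dir $node tag=$tag args=", scalar @args, "\n";
}
```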
206 elmex 1.18
207 root 1.30 =head1 PART 1: Passing Messages Between Processes
208 elmex 1.18
209     =head2 The Receiver
210    
211 root 1.25 Let's split the previous example up into two programs: one that contains
212     the sender and one for the receiver. First the receiver application, in
213     full:
214 elmex 1.18
   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_receiver/%u", binds => ["*:4040"];

   my $port = port;
   db_set eg_receivers => $port;

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;
230    
231 root 1.51 Now, that wasn't too bad, was it? OK, let's go through the new functions
232 root 1.47 that have been used.
233 elmex 1.18
234 root 1.44 =head3 C<configure> and Joining and Maintaining the Network
235 elmex 1.18
236 root 1.47 First let's have a look at C<configure>:
237 elmex 1.18
238 root 1.47 configure nodeid => "eg_receiver/%u", binds => ["*:4040"];
239 elmex 1.18
240 root 1.58 Before we are able to send messages to other nodes we have to configure
241     the node to become a "networked node". Configuring a node means naming
242     the node and binding some TCP listeners so that other nodes can contact
243     it. The choice of whether a process becomes a networked node or not must
244     be made before doing anything else with AnyEvent::MP.
245 root 1.47
246 root 1.58 Additionally, to actually link all nodes in a network together, you should
247 root 1.47 specify a number of seed addresses, which will be used by the node to
248     connect itself into an existing network, as we will see shortly.
249 root 1.26
250 root 1.58 All of this info (and more) can be passed to the C<configure> function -
251     later we will see how we can do all this without even passing anything to
252 root 1.28 C<configure>!
253    
254 root 1.58 Back to the function call in the program: the first parameter, C<nodeid>,
255     specifies the node ID, in this case C<eg_receiver/%u>. The default is to
256     use the node name of the current host plus C</%u>, which gives the node a
257     name with a random suffix to make it unique, but for this example we want
258     the node to have a bit more personality, and name it C<eg_receiver> with a
259     random suffix.
260 root 1.47
261     Why the random suffix? Node IDs need to be unique within the network and
262     appending a random suffix is the easiest way to do that.
263 root 1.28
264     The second parameter, C<binds>, specifies a list of C<address:port> pairs
265     to bind TCP listeners on. The special "address" of C<*> means to bind on
266 root 1.47 every local IP address (this might not work on every OS, so explicit IP
267     addresses are best).
268 root 1.28
269     The reason to bind on a TCP port is not just that other nodes can connect
270     to us: if no binds are specified, the node will still bind on a dynamic
271     port on all local addresses - but in this case we won't know the port, and
272     cannot tell other nodes to connect to it as seed node.
273    
274 root 1.47 Now, a I<seed> is simply the TCP address of some other node in the
275     network, often the same string as used for the C<binds> parameter of the
276     other node. The need for seeds is easy to explain: I<somehow> the nodes
277     of an aemp network have to find each other, and often this means over the
278     internet. So broadcasts are out.
279    
280 root 1.56 Instead, a node usually specifies the addresses of one or a few (for
281     redundancy) other nodes, some of which should be up. Two nodes can set
282     each other as seeds without any issues. You could even specify all nodes
283     as seeds for all nodes, for total redundancy. But the common case is to
284     have some more or less central, stable servers running seed services for
285     other nodes.
286 root 1.47
287     All you need to do to ensure that an AnyEvent::MP network connects
288 root 1.54 together is to make sure that all seed nodes are connected together via
289     their seed connections, i.e., all connections from seed nodes to I<their>
290 root 1.56 seed nodes form a connected graph.
291 root 1.47
292     A node tries to keep connections open to all of its seed nodes at all
293     times, while other connections are made on demand only.
294    
295 root 1.56 The simplest way to do that would be for all nodes to use the same seed
296     nodes: seed nodes would seed each other, and all other nodes would connect
297     to the seed nodes.
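
As a configuration sketch, an ordinary node in such a setup might call
C<configure> with an explicit bind address and two seed addresses, so it
can still join while one seed is down. All addresses and the node name
here are hypothetical and only serve as an illustration.

```perl
use AnyEvent;
use AnyEvent::MP;

# hypothetical addresses: this node listens on 10.0.0.1:4040 and knows
# two seed nodes, either of which is enough to join the network
configure
   nodeid => "eg_worker/%u",
   binds  => ["10.0.0.1:4040"],
   seeds  => ["10.0.0.2:4040", "10.0.0.3:4040"];

AnyEvent->condvar->recv;
```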
298    
299 root 1.47 All of this ensures that the network stays one network - even if all the
300     nodes in one half of the net are separated from the nodes in the other
301     half by some network problem, once that is over, they will eventually
302     become a single network again.
303    
304     In addition to creating the network, a node also expects the seed nodes to
305 root 1.54 run the shared database service - if need be, by automatically starting
306     it, so you don't normally need to configure this explicitly.
307 root 1.47
308     The process of joining a network takes time, during which the node
309     is already running. This means it takes time until the node is
310     fully connected, and information about services in the network is
311 root 1.54 available. This is why most AnyEvent::MP programs either just register
312     themselves in the database and wait to be "found" by others, or they start
313     to monitor the database until some nodes of the required type show up.
314 root 1.47
315     We will see how this is done later, in the sender program.
316 elmex 1.19
317 root 1.28 =head3 Registering the Receiver
318 elmex 1.19
319 root 1.47 Coming back to our example, after the node has been configured for network
320     access, it is time to publish some service, namely the receive service.
321 elmex 1.19
322 root 1.47 For that, let's look at the next lines:
323 elmex 1.19
324     my $port = port;
325 root 1.47 db_set eg_receivers => $port;
326 elmex 1.19
327 root 1.27 The C<port> function has already been discussed. It simply creates a new
328 root 1.51 I<port> and returns the I<port ID>. The C<db_set> function, however, is
329 root 1.47 new: The first argument is the name of a I<database family> and the second
330     argument is the name of a I<subkey> within that family. The third argument
331     would be the I<value> to be associated with the family and subkey, but,
332     since it is missing, it will simply be C<undef>.
333    
334 root 1.51 What is a "family" you wonder? Well, AnyEvent::MP comes with a distributed
335     database. This database runs on so-called "global" nodes, which usually
336     are the seed nodes of your network. The database structure is "simply" a
337     hash of hashes of values.
338 root 1.47
339 root 1.51 To illustrate this with Perl syntax, assume the database was stored in
340     C<%DB>, then the C<db_set> function more or less would do this:
341 root 1.47
342     $DB{eg_receivers}{$port} = undef;
343    
344     So the ominous "family" selects a hash in the database, and the "subkey"
345 root 1.51 is simply the key in this hash - C<db_set> very much works like this
346 root 1.47 assignment.
347    
348     The family namespace is shared by all nodes in a network, so the names
349     should be reasonably unique, for example, they could start with the name
350 root 1.48 of your module, or the name of the program, using your port name or node
351     name as subkey.
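
To make the "hash of hashes" picture a bit more concrete, here is a toy
model in plain Perl. It is not how L<AnyEvent::MP> implements its
database; C<toy_db_set>, C<toy_db_del>, the family C<myapp_workers> and
the port IDs are all made up for this sketch.

```perl
use strict;
use warnings;

my %DB;   # toy stand-in for the distributed database

# set a subkey in a family, with an optional value (undef if omitted)
sub toy_db_set {
   my ($family, $subkey, $value) = @_;
   $DB{$family}{$subkey} = $value;
}

# remove a subkey from a family again
sub toy_db_del {
   my ($family, $subkey) = @_;
   delete $DB{$family}{$subkey};
}

# two receivers register themselves, one worker registers with a value
toy_db_set ("eg_receivers",  "eg_receiver/Ab12#a");
toy_db_set ("eg_receivers",  "eg_receiver/Cd34#a");
toy_db_set ("myapp_workers", "eg_receiver/Ab12#b", { load => 0.5 });

# one receiver goes away again
toy_db_del ("eg_receivers", "eg_receiver/Ab12#a");

for my $family (sort keys %DB) {
   print "$family: ", join (", ", sort keys %{ $DB{$family} }), "\n";
}
```

The point of the model is only that a family groups related subkeys, and
that prefixing family names with your program name keeps unrelated
programs from stepping on each other.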
352 root 1.27
353 root 1.47 The purpose behind adding this key to the database is that the sender can
354     look it up and find our port. We will shortly see how.
355 root 1.27
356     The last step in the example is to set up a receiver callback for those
357     messages, just as was discussed in the first example. We again match
358     for the tag C<test>. The difference is that this time we don't exit the
359     application after receiving the first message. Instead we continue to wait
360     for new messages indefinitely.
361 elmex 1.19
362 elmex 1.20 =head2 The Sender
363 root 1.8
364 root 1.48 OK, now let's take a look at the sender code:
365 root 1.4
   use AnyEvent;
   use AnyEvent::MP;

   configure nodeid => "eg_sender/%u", seeds => ["*:4040"];

   my $guard = db_mon eg_receivers => sub {
      my ($family, $a, $c, $d) = @_;
      return unless %$family;

      # now there are some receivers, send them a message
      snd $_ => test => time
         for keys %$family;
   };

   AnyEvent->condvar->recv;
381    
382 root 1.28 It's even less code. The C<configure> serves the same purpose as in the
383 root 1.48 receiver, but instead of specifying binds we specify a list of seeds - the
384     only seed happens to be the same as the bind used by the receiver, which
385 root 1.47 therefore becomes our seed node.
386 root 1.27
387 root 1.48 Remember the part about having to wait till things become available? Well,
388     after configure returns, nothing has been done yet - the node is not
389     connected to the network, knows nothing about the database contents, and
390     it can take ages (for a computer :) for this situation to change.
391 root 1.47
392     Therefore, the sender waits, in this case by using the C<db_mon>
393     function. This function registers an interest in a specific database
394 root 1.48 family (in this case C<eg_receivers>). Each time something inside the
395     family changes (a key is added, changed or deleted), it will call our
396     callback with the family hash as first argument, followed by three array
397     references listing the keys that were added, changed and deleted.
398    
399     The callback only checks whether the C<%$family> hash is empty - if it is,
400     then it doesn't do anything. But eventually the family will contain the
401     port subkey we set in the receiver. Then it will send a message to it (and
402     any other receiver in the same family). Likewise, should the receiver go
403     away and come back, or should another receiver come up, it will again send
404     a message to all of them.
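
The sender's callback also receives C<$a>, C<$c> and C<$d> in addition to
the family hash. Assuming these describe the added, changed and deleted
keys, the following plain-Perl sketch shows what such lists would contain
for two snapshots of a family. C<family_diff> and C<same_value> are
hypothetical helpers written for this illustration, not part of
L<AnyEvent::MP>.

```perl
use strict;
use warnings;

# true if two family values are equal (undef only equals undef)
sub same_value {
   my ($x, $y) = @_;
   defined $x ? (defined $y && $x eq $y) : !defined $y
}

# compare two snapshots of a family hash and report what changed
sub family_diff {
   my ($old, $new) = @_;

   my @added   = grep { !exists $old->{$_} } sort keys %$new;
   my @deleted = grep { !exists $new->{$_} } sort keys %$old;
   my @changed = grep {
      exists $old->{$_} && !same_value ($old->{$_}, $new->{$_})
   } sort keys %$new;

   (\@added, \@changed, \@deleted)
}

my %before = ("eg_receiver/Ab12#a" => undef, "eg_receiver/Cd34#a" => "idle");
my %after  = ("eg_receiver/Cd34#a" => "busy", "eg_receiver/Ef56#a" => undef);

my ($added, $changed, $deleted) = family_diff (\%before, \%after);

print "added:   @$added\n";
print "changed: @$changed\n";
print "deleted: @$deleted\n";
```

With lists like these, a sender could message only the newly added
receivers instead of all keys in the family each time something changes.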
405 root 1.47
406     You can experiment by having multiple receivers - since only one process
407     can bind port 4040, you have to replace the C<binds> parameter in the receiver
408     by the C<seeds> used in the sender to start up additional receivers, but then you can start as many as you like. If
409     you specify proper IP addresses for the seeds, you can even run them on
410     different computers.
411    
412     Each time you start the sender, it will send a message to all receivers it
413 root 1.48 finds (you have to interrupt it manually afterwards).
414 root 1.47
415 root 1.51 Additional experiments you could try include using
416     C<PERL_ANYEVENT_MP_TRACE=1> to see which messages are exchanged, or
417     starting the sender before the receiver and see how long it then takes to
418     find the receiver.
419 root 1.27
420 root 1.28 =head3 Splitting Network Configuration and Application Code
421    
422 root 1.49 OK, so far, this works reasonably. In the real world, however, the person
423     configuring your application to run on a specific network (the end user
424     or network administrator) is often different to the person coding the
425     application.
426 root 1.28
427     Or to put it differently: the arguments passed to configure are usually
428 root 1.49 provided not by the programmer, but by whoever is deploying the program -
429     even in the example above, we would like to be able to just start senders
430     and receivers without having to patch the programs.
431 root 1.28
432     To make this easy, AnyEvent::MP supports a simple configuration database,
433     using profiles, which can be managed using the F<aemp> command-line
434 root 1.49 utility (yes, this section is about the advanced tinkering mentioned
435 root 1.30 before).
436 root 1.28
437     When you change both programs above to simply call
438    
439     configure;
440    
441     then AnyEvent::MP tries to look up a profile using the current node name
442     in its configuration database, falling back to some global default.
443    
444     You can run "generic" nodes using the F<aemp> utility as well, and we will
445     exploit this in the following way: we configure a profile "seed" and run
446     a node using it, whose sole purpose is to be a seed node for our example
447     programs.
448    
449     We bind the seed node to port 4040 on all interfaces:
450    
451 root 1.29 aemp profile seed binds "*:4040"
452 root 1.28
453     And we configure all nodes to use this as seed node (this only works when
454 root 1.51 running on the same host, for multiple machines you would replace the C<*>
455     by the IP address or hostname of the node running the seed), by changing
456     the global settings shared between all profiles:
457 root 1.28
458 root 1.49 aemp seeds "*:4040"
459 root 1.28
460     Then we run the seed node:
461    
462     aemp run profile seed
463    
464 root 1.49 After that, we can start as many other nodes as we want, and they will
465     all use our generic seed node to discover each other. The reason we can
466     start our existing programs even though they specify "incompatible"
467     parameters to C<configure> is that the configuration file (by default)
468     takes precedence over any arguments passed to C<configure>.
469 elmex 1.7
470 root 1.30 That's all for now - next we will teach you about monitoring by writing a
471     simple chat client and server :)
472    
473     =head1 PART 2: Monitoring, Supervising, Exception Handling and Recovery
474    
475     That's a mouthful, so what does it mean? Our previous example is what one
476     could call "very loosely coupled" - the sender doesn't care about whether
477     there are any receivers, and the receivers do not care if there is any
478     sender.
479    
480     This can work fine for simple services, but most real-world applications
481     want to ensure that the side they are expecting to be there is actually
482     there. Going one step further: most bigger real-world applications even
483     want to ensure that if some component is missing, or has crashed, it will
484     still be there, by recovering and restarting the service.
485    
486     AnyEvent::MP supports this by catching exceptions and network problems,
487 root 1.49 and notifying interested parties of these.
488 root 1.30
489 root 1.41 =head2 Exceptions, Port Context, Network Errors and Monitors
490 root 1.30
491     =head3 Exceptions
492    
493 root 1.49 Exceptions are handled on a per-port basis: all receive callbacks are
494     executed in a special context, the so-called I<port-context>: code
495     that throws an otherwise uncaught exception will cause the port to be
496     C<kil>led. Killed ports are destroyed automatically (killing ports is
497     actually the only way to free ports).
498 root 1.30
499 root 1.49 Ports can be monitored, even from a different node and host, and when a
500     port is killed, any entity monitoring it will be notified.
501 root 1.30
502     Here is a simple example:
503    
   use AnyEvent;
   use AnyEvent::MP;

   # create a port, it always dies
   my $port = port { die "oops" };

   # monitor it
   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   # now send it some message, causing it to die:
   snd $port;

   AnyEvent->condvar->recv;
518    
519 root 1.30 It first creates a port whose only action is to throw an exception,
520     and then monitors it with the C<mon> function. Afterwards it sends it a
521     message, causing it to die and call the monitoring callback:
522    
523     anon/6WmIpj.a was killed (with reason die oops at xxx line 5.) at xxx line 9.
524    
525 root 1.49 The callback was actually passed two arguments: C<die>, to indicate it
526     did throw an I<exception> as opposed to, say, a network error, and the
527     exception message itself.
528 root 1.30
529     What happens when a port is killed before we have a chance to monitor
530     it? Granted, this is highly unlikely in our example, but when you program
531     in a network this can easily happen due to races between nodes.
532    
   use AnyEvent::MP;

   my $port = port { die "oops" };

   snd $port;

   mon $port, sub {
      warn "$port was killed (with reason @_)";
   };

   AnyEvent->condvar->recv;
544    
545 root 1.51 This time we will get something else:
546 root 1.30
547 root 1.51 2012-03-21 00:50:36 <2> unmonitored local port fADb died with reason: die oops at - line 3.
548     anon/fADb was killed (with reason no_such_port cannot monitor nonexistent port)
549 root 1.30
550 root 1.57 The first line is an error message that is printed when a port dies that
551     isn't being monitored, because that is normally a bug. When a C<mon> is
552     attempted later, its callback is invoked immediately, because the port is already
553     gone. The kill reason is now C<no_such_port> with some descriptive (we
554     hope) error message.
555 root 1.30
556 root 1.51 As you probably suspect from these examples, the kill reason is usually
557     some identifier as first argument and a human-readable error message as
558     second argument - all kill reasons by AnyEvent::MP itself follow this
559     pattern. But the kill reason can be anything: it is simply a list of
560     values you can choose yourself. It can even be nothing (an empty list) -
561     this is called a "normal" kill.
562    
563     Apart from die'ing, you can kill ports manually using the C<kil>
564     function. Using the C<kil> function will be treated like an error when a
565     non-empty reason is specified:
566 root 1.30
567 root 1.51 kil $port, custom_error => "don't like your steenking face";
568 root 1.30
569 root 1.51 And a I<normal> kill without any reason arguments:
570 root 1.30
571     kil $port;
572    
573     By now you probably wonder what this "normal" kill business is: A common
574     idiom is to not specify a callback to C<mon>, but another port, such as
575     C<$SELF>:
576    
577     mon $port, $SELF;
578    
579 root 1.51 This basically means "monitor $port and kill me when it crashes" - and
580     the thing is, a "normal" kill does not count as a crash. This way you can
581     easily link ports together and make them crash together on errors, while
582     allowing you to remove a port silently when it has done its job properly.
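
A small single-process sketch of this linking idiom follows. It links a
C<$worker> port to an C<$owner> port and then kills the worker with an
error; both variable names and the C<@log> array are made up for the
example, and the order in which the two observing callbacks run is not
something the sketch relies on.

```perl
use AnyEvent;
use AnyEvent::MP;

my $done = AnyEvent->condvar;
my @log;

my $worker = port;
my $owner  = port;

# link: when $worker dies with a non-empty reason, kill $owner, too
mon $worker, $owner;

# observe what happens to both ports
mon $worker, sub {
   push @log, "worker: " . (@_ ? "error @_" : "normal kill");
};
mon $owner, sub {
   push @log, "owner: " . (@_ ? "error @_" : "normal kill");
   $done->send;
};

# an abnormal kill: the reason list is not empty
kil $worker, custom_error => "giving up";

$done->recv;
print "$_\n" for @log;
```

With a plain C<kil $worker> instead, the owner would not be killed, so
this particular program would simply keep waiting.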
583 root 1.30
584 root 1.34 =head3 Port Context
585    
586 root 1.51 Code runs in the so-called "port context". That means C<$SELF> contains
587     its own port ID and exceptions that the code throws will be caught.
588 root 1.34
589     Since AnyEvent::MP is event-based, it is not uncommon to register
590 root 1.51 callbacks from within C<rcv> handlers. As an example, assume that the
591     following port receive handler wants to C<die> a second later, using
592     C<after>:
593 root 1.34
   my $port = port {
      after 1, sub { die "oops" };
   };
597    
598 root 1.51 If you try this out, you would find it does not work - when the C<after>
599     callback is executed, it does not run in the port context anymore, so
600     exceptions will not be caught.
601 root 1.34
602 root 1.41 For these cases, AnyEvent::MP exports a special "closure constructor"
603 root 1.51 called C<psub>, which works mostly like Perl's built-in C<sub>:
604 root 1.34
   my $port = port {
      after 1, psub { die "oops" };
   };
608    
609 root 1.51 C<psub> remembers the port context and returns a code reference. When the
610     code reference is invoked, it will run the code block within the context
611     that it was created in, so exception handling once more works as expected.
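
To see why remembering the context matters, here is a toy model of the
idea in plain Perl. It is not the real implementation: C<toy_psub>,
C<toy_run_in_context> and the C<@killed> list are invented for this
sketch, and instead of killing a port the model merely records which
"port" the exception would be attributed to.

```perl
use strict;
use warnings;

our $SELF;       # toy stand-in for the current port
my @killed;      # records [port, error] pairs instead of really killing ports

# run a code block "in the context" of a port: set $SELF, catch exceptions
sub toy_run_in_context {
   my ($port, $cb, @args) = @_;
   local $SELF = $port;
   eval { $cb->(@args); 1 }
      or push @killed, [$port, $@];
}

# remember the current $SELF and return a closure that restores it later
sub toy_psub(&) {
   my ($cb) = @_;
   my $port = $SELF;
   sub { toy_run_in_context ($port, $cb, @_) }
}

my $later;

toy_run_in_context ("port_a", sub {
   # created inside the port context, but invoked outside of it
   $later = toy_psub { die "oops from $SELF\n" };
});

$later->();   # $SELF is undef here, the closure restores "port_a"

printf "%s was killed: %s", @$_ for @killed;
```

A plain C<sub> in place of C<toy_psub> would run with C<$SELF> undefined
and its exception would escape uncaught, which mirrors the C<after>
problem described above.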
612 root 1.34
613 root 1.49 There is even a way to temporarily execute code in the context of some
614 root 1.41 port, namely C<peval>:
615    
   peval $port, sub {
      # die'ing here will kil $port
   };
619    
620     The C<peval> function temporarily replaces C<$SELF> by the given C<$port>
621     and then executes the given sub in a port context.
622    
623 root 1.30 =head3 Network Errors and the AEMP Guarantee
624    
625 root 1.52 Earlier we mentioned another important source of monitoring failures:
626     network problems. When a node loses connection to another node, it will
627     invoke all monitoring actions, just as if the port was killed, I<even if
628     it is possible that the port is still happily alive on another node> (not
629     being able to talk to a node means we have no clue what's going on with
630     it, it could be crashed, but also still running without knowing we lost
631     the connection).
632 root 1.30
633 root 1.52 So another way to view monitors is: "notify me when some of my messages
634 root 1.30 couldn't be delivered". AEMP has a guarantee about message delivery to a
635     port: After starting a monitor, any message sent to a port will either
636     be delivered, or, when it is lost, any further messages will also be lost
637 elmex 1.31 until the monitoring action is invoked. After that, further messages
638 root 1.30 I<might> get delivered again.
639    
640     This doesn't sound like a very big guarantee, but it is kind of the best
641 root 1.52 you can get while staying sane: Specifically, it means that there will be
642     no "holes" in the message sequence: all messages sent are delivered in
643     order, without any of them missing in between, and when some were lost,
644     you I<will> be notified of that, so you can take recovery action.
645 root 1.30
646 root 1.49 And, obviously, the guarantee only works in the presence of
647     correctly-working hardware, and no relevant bugs inside AEMP itself.
648    
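The guarantee can be illustrated with a small simulation in plain Perl. This is a sketch under simplifying assumptions, not AEMP code: C<toy_snd>, C<toy_monitor_fires> and all variables are hypothetical, and the "network" is just a flag.

```perl
use strict;
use warnings;

# Toy simulation of the delivery guarantee; this is not AnyEvent::MP code.
my @delivered;        # what the destination port actually received
my @notifications;    # what the sender's monitor reported
my $link_up = 1;      # simulated connection state
my $broken  = 0;      # true from the first lost message until the monitor fires

sub toy_snd {
    my ($msg) = @_;
    if ($link_up && !$broken) {
        push @delivered, $msg;
    } else {
        $broken = 1;  # lost, and so is everything sent before the monitor fires
    }
}

sub toy_monitor_fires {
    push @notifications, "connection lost";
    $broken = 0;      # from now on, messages might get through again
}

toy_snd($_) for 1 .. 3;   # delivered in order
$link_up = 0;
toy_snd(4);               # lost
$link_up = 1;
toy_snd(5);               # link is back, but the monitor has not fired: still lost
toy_monitor_fires();
toy_snd(6);               # delivered again

print "delivered: @delivered\n";
```

In this model the port sees 1, 2 and 3, the sender is then notified, and only after that does message 6 arrive. Message 5 is dropped even though the link is back, which is what "no holes" means: nothing is delivered between a loss and the notification about it.
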
649 root 1.30 =head3 Supervising
650    
651 root 1.49 OK, so how is this crashing-everything-stuff going to make applications
652 root 1.52 I<more> stable? Well, in fact, the goal is not really to make them
653     more stable, but to make them more resilient against actual errors
654     and crashes. And this is not done by crashing I<everything>, but by
655     crashing everything except a I<supervisor> that then cleans up and starts
656     everything again.
657 root 1.30
658 elmex 1.31 A supervisor is simply some code that ensures that an application (or a
659 root 1.49 part of it) is running, and if it crashes, is restarted properly. That is,
660     it supervises a service by starting and restarting it, as necessary.
661 root 1.30
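In its simplest form, a supervisor is little more than a loop around the supervised code. The following plain-Perl sketch shows the idea without any message passing; C<supervise> and its restart limit are hypothetical and not part of AnyEvent::MP.

```perl
use strict;
use warnings;

# Hypothetical helper: run $service and restart it whenever it dies,
# giving up after $max_restarts restarts.
sub supervise {
    my ($service, $max_restarts) = @_;
    my $restarts = 0;
    while () {
        return $restarts if eval { $service->(); 1 };   # clean exit
        print "service crashed: $@";
        die "giving up after $restarts restarts\n"
            if $restarts >= $max_restarts;
        $restarts++;                                    # start it again
    }
}

my $attempt  = 0;
my $restarts = supervise(sub {
    $attempt++;
    die "simulated crash\n" if $attempt < 3;    # crash twice, then work
    print "service ran fine on attempt $attempt\n";
}, 5);

print "restarts needed: $restarts\n";
```

A supervisor built on ports follows the same pattern, except that the "loop" is driven by monitoring callbacks instead of C<eval>, as the chat client later in this part shows.
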
662     To show how to do all this we will create a simple chat server that can
663     handle many chat clients. Both server and clients can be killed and
664 root 1.49 restarted, and even crash, to some extent, without disturbing the chat
665     functionality.
666 root 1.30
667     =head2 Chatting, the Resilient Way
668    
669     Without further ado, here is the chat server (to run it, we assume the
670 root 1.49 set-up explained earlier, with a separate F<aemp run profile seed> node):
671 root 1.30
672     use common::sense;
673     use AnyEvent::MP;
674    
675     configure;
676    
677     my %clients;
678    
679     sub msg {
680     print "relaying: $_[0]\n";
681     snd $_, $_[0]
682     for values %clients;
683     }
684    
685     our $server = port;
686    
687     rcv $server, join => sub {
688     my ($client, $nick) = @_;
689    
690     $clients{$client} = $client;
691    
692     mon $client, sub {
693     delete $clients{$client};
694     msg "$nick (quits, @_)";
695     };
696     msg "$nick (joins)";
697     };
698    
699     rcv $server, privmsg => sub {
700     my ($nick, $msg) = @_;
701     msg "$nick: $msg";
702     };
703    
704 root 1.49 db_set eg_chat_server => $server;
705 root 1.30
706     warn "server ready.\n";
707    
708     AnyEvent->condvar->recv;
709    
710 elmex 1.31 Looks like a lot, but it is actually quite simple: after your usual
711 root 1.30 preamble (this time we use common sense), we define a helper function that
712     sends some message to every registered chat client:
713    
714     sub msg {
715     print "relaying: $_[0]\n";
716     snd $_, $_[0]
717     for values %clients;
718     }
719    
720     The clients are stored in the hash C<%clients>. Then we define a server
721     port and install two receivers on it, C<join>, which is sent by clients
722     to join the chat, and C<privmsg>, that clients use to send actual chat
723     messages.
724    
725     C<join> is the most complicated. It expects the client port and the nickname
726     to be passed in the message, and registers the client in C<%clients>.
727    
728     rcv $server, join => sub {
729     my ($client, $nick) = @_;
730    
731     $clients{$client} = $client;
732    
733     The next step is to monitor the client. The monitoring action removes the
734     client and sends a quit message with the error to all remaining clients.
735    
736     mon $client, sub {
737     delete $clients{$client};
738     msg "$nick (quits, @_)";
739     };
740    
741     And finally, it creates a join message and sends it to all clients.
742    
743     msg "$nick (joins)";
744     };
745    
746     The C<privmsg> callback simply broadcasts the message to all clients:
747    
748     rcv $server, privmsg => sub {
749     my ($nick, $msg) = @_;
750     msg "$nick: $msg";
751     };
752    
753 elmex 1.31 And finally, the server registers its port in the C<eg_chat_server>
754 root 1.30 database family, so that clients can find it:
755    
756 root 1.52 db_set eg_chat_server => $server;
757 root 1.30
758     Well, well... and where is this supervisor stuff? Well... we cheated,
759     it's not there. To not overcomplicate the example, we only put it into
760     the..... CLIENT!
761    
762     =head3 The Client, and a Supervisor!
763    
764     Again, here is the client, including supervisor, which makes it a bit
765     longer:
766    
767     use common::sense;
768     use AnyEvent::MP;
769    
770 root 1.49 my $nick = shift || "anonymous";
771 root 1.30
772     configure;
773    
774     my ($client, $server);
775    
776     sub server_connect {
777 root 1.49 my $db_mon;
778     $db_mon = db_mon eg_chat_server => sub {
779     return unless %{ $_[0] };
780     undef $db_mon;
781    
782     print "\rconnecting...\n";
783    
784     $client = port { print "\r \r@_\n> " };
785     mon $client, sub {
786     print "\rdisconnected @_\n";
787     &server_connect;
788     };
789 root 1.30
790 root 1.49 $server = (keys %{ $_[0] })[0];
791 root 1.30
792 root 1.49 snd $server, join => $client, $nick;
793     mon $server, $client;
794 root 1.30 };
795     }
796    
797     server_connect;
798    
799 root 1.34 my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
800 root 1.30 chomp (my $line = <STDIN>);
801     print "> ";
802     snd $server, privmsg => $nick, $line
803     if $server;
804     });
805    
806     $| = 1;
807     print "> ";
808     AnyEvent->condvar->recv;
809    
810     The first thing the client does is to store the nick name (which is
811     expected as the only command line argument) in C<$nick>, for further
812     usage.
813    
814     The next relevant thing is... finally... the supervisor:
815    
816     sub server_connect {
817 root 1.52 my $db_mon;
818     $db_mon = db_mon eg_chat_server => sub {
819     return unless %{ $_[0] };
820     undef $db_mon; # stop monitoring
821 root 1.30
822 root 1.52 This monitors the C<eg_chat_server> database family. It waits until a
823     chat server becomes available. When that happens, it "connects" to it
824     by creating a client port that receives and prints chat messages, and
825     monitoring it:
826 root 1.30
827     $client = port { print "\r \r@_\n> " };
828     mon $client, sub {
829     print "\rdisconnected @_\n";
830     &server_connect;
831     };
832    
833 root 1.52 If the client port dies (for whatever reason), the "supervisor" will start
834     looking for a server again - the semantics of C<db_mon> ensure that it
835     will immediately find it if there is a server port.
836    
837     After this, everything is ready: the client will send a C<join> message
838     with its local port to the server, and start monitoring it:
839    
840     $server = (keys %{ $_[0] })[0];
841 root 1.30
842     snd $server, join => $client, $nick;
843     mon $server, $client;
844     }
845    
846 root 1.52 This second monitor will ensure that, when the server port crashes or goes
847     away (e.g. due to network problems), the client port will be killed as
848     well. That in turn triggers the client port's monitor, which tells the user
849     that the client was disconnected and then tries to connect to the server again.
850 root 1.30
851     The rest of the program deals with the boring details of actually invoking
852     the supervisor function to start the whole client process and handle the
853     actual terminal input, sending it to the server.
854    
855 root 1.52 Now... the "supervisor" in this example is a bit of a cheat - it doesn't
856     really clean up much (because the cleanup done by AnyEvent::MP suffices),
857     and there isn't much of a restarting action either - if the server isn't
858     there because it crashed, well, it isn't there.
859    
860     In the real world, one would often add a timeout that would trigger when
861     the server couldn't be found within some time limit, and then complain,
862     or even try to start a new server. Or the supervisor would have to do
863     some real cleanups, such as rolling back database transactions when the
864     database thread crashes. For this simple chat server, however, this simple
865     supervisor works fine. Hopefully future versions of AnyEvent::MP will
866     offer some predefined supervisors; for now, you will have to code them on
867     your own.
868    
869 elmex 1.31 You should now try to start the server and one or more clients in different
870 root 1.30 terminal windows (and the seed node):
871    
872     perl eg/chat_client nick1
873     perl eg/chat_client nick2
874     perl eg/chat_server
875     aemp run profile seed
876    
877     And then you can experiment with chatting, killing one or more clients, or
878     stopping and restarting the server, to see the monitoring in action.
879    
880 root 1.33 The crucial point you should understand from this example is that
881     monitoring is usually symmetric: when you monitor some other port,
882     potentially on another node, that other port usually should monitor you,
883     too, so when the connection dies, both ports get killed, or at least both
884     sides can take corrective action. Exceptions are "servers" that serve
885     multiple clients at once and might only wish to clean up, and supervisors,
886     who of course should not normally get killed (unless they, too, have a
887     supervisor).
888    
889 root 1.52 If you often think in object-oriented terms, then you can think of a port
890     as an object: C<port> is the constructor, the receive callbacks set by
891     C<rcv> act as methods, the C<kil> function becomes the explicit destructor
892     and C<mon> installs a destructor hook. Unlike conventional object-oriented
893     programming, it can make sense to exchange port IDs more freely (for
894     example, to monitor one port from another), because it is cheap to send
895     port IDs over the network, and AnyEvent::MP blurs the distinction between
896     local and remote ports.
897    
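To spell the analogy out, here is a toy class in plain Perl whose methods mirror the roles listed above. C<ToyPort> is hypothetical and ignores everything that makes real ports interesting (port IDs, nodes, the network); it only shows which function plays which object-oriented role.

```perl
use strict;
use warnings;

package ToyPort;    # hypothetical class, for illustration only

sub new {                       # plays the role of "port"
    my ($class) = @_;
    return bless { rcv => {}, mon => [], alive => 1 }, $class;
}

sub rcv {                       # register a "method" for a tag
    my ($self, $tag, $cb) = @_;
    $self->{rcv}{$tag} = $cb;
}

sub mon {                       # install a destructor hook
    my ($self, $cb) = @_;
    push @{ $self->{mon} }, $cb;
}

sub snd {                       # "method call": dispatch on the tag
    my ($self, $tag, @args) = @_;
    return unless $self->{alive};           # messages to dead ports vanish
    my $cb = $self->{rcv}{$tag} or return;
    eval { $cb->(@args); 1 } or $self->kil($@);
}

sub kil {                       # explicit destructor
    my ($self, @reason) = @_;
    return unless delete $self->{alive};    # only die once
    $_->(@reason) for @{ $self->{mon} };
}

package main;

my @events;
my $port = ToyPort->new;
$port->rcv(ping => sub { push @events, "pong @_" });
$port->rcv(boom => sub { die "oops\n" });
$port->mon(sub { push @events, "died: @_" });

$port->snd(ping => 1);
$port->snd("boom");             # the exception kills the port
$port->snd(ping => 2);          # ignored: the port is gone

print @events;
```

Note how an exception in a "method" ends in the destructor hooks rather than in the caller, which is the main way ports differ from ordinary objects.
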
898     Lastly, there is ample room for improvement in this example: the server
899     should probably remember the nickname in the C<join> handler instead of
900     expecting it in every chat message, it should probably monitor itself, and
901     the client should not try to send any messages unless a server is actually
902     connected.
903 root 1.30
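As a starting point for the first of these improvements, here is a plain-Perl model of a server that remembers the nickname at join time. It deliberately leaves out AnyEvent::MP: ports are plain strings, C<@outbox> stands in for C<snd>, the C<quit> handler plays the role of the monitor callback, and all names are made up for this sketch.

```perl
use strict;
use warnings;

# Toy model without networking; not the tutorial's actual server.
my %nick_of;    # client port ID => nickname, remembered at join time
my @outbox;     # [destination, text] pairs a real server would snd

sub broadcast {
    my ($text) = @_;
    push @outbox, [$_, $text] for sort keys %nick_of;
}

my %handler = (
    join => sub {
        my ($client, $nick) = @_;
        $nick_of{$client} = $nick;
        broadcast("$nick (joins)");
    },
    privmsg => sub {
        my ($client, $text) = @_;
        my $nick = $nick_of{$client}
            or return;                  # ignore clients that never joined
        broadcast("$nick: $text");
    },
    quit => sub {                       # what a monitor callback would do
        my ($client, @reason) = @_;
        my $nick = delete $nick_of{$client}
            or return;
        broadcast("$nick (quits, @reason)");
    },
);

sub dispatch {
    my ($tag, @args) = @_;
    my $cb = $handler{$tag}
        or die "unknown message tag: $tag\n";
    $cb->(@args);
}

dispatch(join    => "c1", "alice");
dispatch(join    => "c2", "bob");
dispatch(privmsg => "c1", "hello");
dispatch(privmsg => "c9", "spoofed");   # never joined, silently dropped
dispatch(quit    => "c2", "connection lost");

print "$_->[0] <- $_->[1]\n" for @outbox;
```

With this design a chat message only needs to carry the sender's port, so a client can no longer speak under somebody else's nickname simply by putting it into the message.
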
904     =head1 PART 3: TIMTOWTDI: Virtual Connections
905    
906 root 1.34 The chat system developed in the previous sections is very "traditional"
907     in a way: you start some server(s) and some clients statically and they
908     start talking to each other.
909    
910     Sometimes applications work more like "services": They can run on almost
911 root 1.52 any node and even talk to copies of themselves on other nodes in case they
912     are distributed. The L<AnyEvent::MP::Global> service for example monitors
913     nodes joining the network and sometimes even starts itself on other nodes.
914    
915     One good way to design such services is to put them into a module and
916     create "virtual connections" to other nodes. We call this the "bridge
917     head" method, because you start by I<creating a remote port> (the bridge
918 root 1.34 head) and from that you start to bootstrap your application.
919    
920 root 1.52 Since that sounds rather theoretical, let us redesign the chat server and
921 root 1.34 client using this design method.
922    
923 root 1.52 As usual, we start with the full program - here is the server:
924 root 1.34
925     use common::sense;
926     use AnyEvent::MP;
927    
928     configure;
929    
930 root 1.52 db_set eg_chat_server2 => $NODE;
931 root 1.34
932     my %clients;
933    
934     sub msg {
935     print "relaying: $_[0]\n";
936     snd $_, $_[0]
937     for values %clients;
938     }
939    
940     sub client_connect {
941     my ($client, $nick) = @_;
942    
943     mon $client;
944 root 1.52 mon $client, psub {
945 root 1.34 delete $clients{$client};
946     msg "$nick (quits, @_)";
947     };
948    
949     $clients{$client} = $client;
950    
951     msg "$nick (joins)";
952    
953     rcv $SELF, sub { msg "$nick: $_[0]" };
954     }
955    
956     warn "server ready.\n";
957    
958     AnyEvent->condvar->recv;
959    
960 root 1.39 It starts out not much different than the previous example, except that
961 root 1.52 this time, we register the node port in the database and not a port we
962     created - the clients only want to know which node the server should
963     be running on, and there can only be one such server (or service) per
964     node. In fact, the clients could also use some kind of election mechanism,
965     to find the node with lowest node ID, or lowest load, or something like
966     that.
967    
968     The much more interesting difference to the previous server is that
969     indeed no server port is created - the server consists only of code,
970     and "does" nothing by itself. All it "does" is to define a function
971     named C<client_connect>, which expects a client port and a nick name as
972     arguments. It then monitors the client port and binds a receive callback
973     on C<$SELF>, which expects messages that in turn are broadcast to all
974     clients.
975 root 1.34
976     The two C<mon> calls are a bit tricky - the first C<mon> is a shorthand
977     for C<mon $client, $SELF>. The second does the normal "client has gone
978 root 1.52 away" clean-up action.
979 root 1.34
980 root 1.52 The last line, the C<rcv $SELF>, is a good hint that something interesting
981     is going on. And indeed, when looking at the client code, you can see a
982     new function, C<spawn>:
983     #todo#
984 root 1.34
985     use common::sense;
986     use AnyEvent::MP;
987    
988     my $nick = shift;
989    
990     configure;
991    
992     $| = 1;
993    
994     my $port = port;
995    
996     my ($client, $server);
997    
998     sub server_connect {
999 root 1.40 my $servernodes = grp_get "eg_chat_server2"
1000 root 1.34 or return after 1, \&server_connect;
1001    
1002     print "\rconnecting...\n";
1003    
1004     $client = port { print "\r \r@_\n> " };
1005     mon $client, sub {
1006     print "\rdisconnected @_\n";
1007     &server_connect;
1008     };
1009    
1010     $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
1011     mon $server, $client;
1012     }
1013    
1014     server_connect;
1015    
1016     my $w = AnyEvent->io (fh => 0, poll => 'r', cb => sub {
1017     chomp (my $line = <STDIN>);
1018     print "> ";
1019     snd $server, $line
1020     if $server;
1021     });
1022    
1023     print "> ";
1024     AnyEvent->condvar->recv;
1025    
1026     The client is quite similar to the previous one, but instead of contacting
1027 root 1.39 the server I<port> (which no longer exists), it C<spawn>s (creates) a new
1028     server I<port on the server node>:
1029 root 1.34
1030     $server = spawn $servernodes->[0], "::client_connect", $client, $nick;
1031     mon $server, $client;
1032    
1033 root 1.39 And of course the first thing after creating it is monitoring it.
1034 root 1.34
1035 root 1.52 Phew, let's go through this in slow motion: the C<spawn> function creates
1036     a new port on a remote node and returns its port ID. After creating
1037     the port it calls a function on the remote node, passing any remaining
1038     arguments to it, and - most importantly - executes the function within
1039     the context of the new port, so it can be manipulated by referring to
1040     C<$SELF>. The init function can reside in a module (actually it normally
1041     I<should> reside in a module) - AnyEvent::MP will automatically load the
1042     module if the function isn't defined.
1043 root 1.39
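The "look up an init function by name, loading its module when necessary" step can be sketched in plain Perl. This is only an illustration of the technique; C<call_by_name> is a hypothetical helper, and the actual AnyEvent::MP internals may well differ.

```perl
use strict;
use warnings;

# Hypothetical helper: call a function given its fully qualified name,
# loading the module first if the function is not defined yet.
sub call_by_name {
    my ($name, @args) = @_;
    my ($pkg, $func) = $name =~ /^(.*)::([^:]+)$/
        or die "not a fully qualified function name: $name\n";
    $pkg = "main" if $pkg eq "";         # "::foo" means main::foo

    no strict 'refs';                    # we use symbolic references below
    unless (defined &{"${pkg}::$func"}) {
        (my $file = "$pkg.pm") =~ s{::}{/}g;
        require $file;                   # load the module on demand
    }
    return &{"${pkg}::$func"}(@args);
}

# An init function living in the main program, as in the chat example.
sub client_connect { return "connected: @_" }

print call_by_name("::client_connect", "port1", "nick1"), "\n";

# List::Util (a core module) has not been loaded by this script yet,
# so this call goes through the require branch first.
print call_by_name("List::Util::sum", 1, 2, 3), "\n";
```

This also shows why putting the init function into a module is the more robust choice: a name such as C<::client_connect> only resolves if the program running on the remote node happens to define that function itself.
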
1044     The C<spawn> function returns immediately, which means you can instantly
1045 root 1.34 send messages to the port, long before the remote node has even heard
1046     of our request to create a port on it. In fact, the remote node might
1047     not even be running. Despite these troubling facts, everything should
1048     work just fine: if the node isn't running (or the init function throws an
1049     exception), then the monitor will trigger because the port doesn't exist.
1050    
1051     If the spawn message gets delivered, but the monitoring message is not
1052 root 1.39 because of network problems (extremely unlikely, but monitoring, after
1053     all, is implemented by passing a message, and messages can get lost), then
1054     this connection loss will eventually trigger the monitoring action. On the
1055     remote node (which in turn monitors the client) the port will also be
1056     cleaned up on connection loss. When the remote node comes up again and our
1057     monitoring message can be delivered, it will instantly fail because the
1058     port has been cleaned up in the meantime.
1059 root 1.34
1060     If your head is spinning by now, that's fine - just keep in mind: after
1061 root 1.52 creating a port using C<spawn>, monitor it on the local node, and monitor
1062     "the other side" from the remote node, and all will be cleaned up just
1063     fine.
1064 root 1.34
1065 root 1.36 =head2 Services
1066 root 1.34
1067 root 1.53 Above it was mentioned that C<spawn> automatically loads modules. This can
1068     be exploited in various useful ways.
1069 root 1.36
1070     Assume for a moment you put the server into a file called
1071     F<mymod/chatserver.pm> reachable from the current directory. Then you
1072     could run a node there with:
1073    
1074     aemp run
1075    
1076     The other nodes could C<spawn> the server by using
1077 root 1.53 C<mymod::chatserver::client_connect> as init function - without any other
1078     configuration.
1079 root 1.36
1080 root 1.53 Likewise, when you have some service that starts automatically when loaded
1081     (similar to AnyEvent::MP::Global), then you can configure this service
1082     statically:
1083 root 1.36
1084     aemp profile mysrvnode services mymod::service::
1085     aemp run profile mysrvnode
1086    
1087 root 1.39 And the module will automatically be loaded in the node, as specifying a
1088 root 1.38 module name (with C<::>-suffix) will simply load the module, which is then
1089     free to do whatever it wants.
1090 root 1.36
1091     Of course, you can also do it in the much more standard way by writing
1092     a module (e.g. C<BK::Backend::IRC>), installing it as part of a module
1093 root 1.53 distribution and then configuring nodes. For example, if I wanted to run the
1094 root 1.36 Bummskraut IRC backend on a machine named "ruth", I could do this:
1095    
1096     aemp profile ruth addservice BK::Backend::IRC::
1097    
1098 root 1.43 And any F<aemp run> on that host will automatically have the Bummskraut
1099     IRC backend running.
1100 root 1.36
1101 root 1.53 There are plenty of possibilities you can use - it's all up to you how you
1102 root 1.36 structure your application.
1103 elmex 1.7
1104 root 1.42 =head1 PART 4: Coro::MP - selective receive
1105    
1106     Not all problems lend themselves naturally to an event-based solution:
1107     sometimes things are easier if you can decide in what order you want to
1108 root 1.53 receive messages, regardless of the order in which they were sent.
1109 root 1.42
1110     In these cases, L<Coro::MP> can provide a nice solution: instead of
1111 root 1.53 registering callbacks for each message type, C<Coro::MP> attaches a
1112 root 1.42 (coro-) thread to a port. The thread can then opt to selectively receive
1113     messages it is interested in. Other messages are not lost, but queued, and
1114     can be received at a later time.
1115    
1116 root 1.43 The C<Coro::MP> module is not part of L<AnyEvent::MP>, but a separate
1117 root 1.42 module. It is, however, tightly integrated into C<AnyEvent::MP> - the
1118     ports it creates are fully compatible with C<AnyEvent::MP> ports.
1119    
1120     In fact, C<Coro::MP> is more of an extension than a separate module: all
1121     functions exported by C<AnyEvent::MP> are exported by it as well.
1122    
1123     To illustrate what programming with C<Coro::MP> looks like, consider the
1124     following (slightly contrived) example: Let's implement a server that
1125     accepts a C<< (write_file => $port, $path) >> message with a (source)
1126     port and a filename, followed by as many C<< (data => $port, $data) >>
1127     messages as required to fill the file, followed by an empty C<< (data =>
1128     $port) >> message.
1129    
1130     The server only writes a single file at a time; other requests will stay
1131     in the queue until the current file has been finished.
1132    
1133     Here is an example implementation that uses L<Coro::AIO> and largely
1134     ignores error handling:
1135    
1136     my $ioserver = port_async {
1137     while () {
1138     my ($tag, $port, $path) = get_cond;
1139    
1140     $tag eq "write_file"
1141     or die "only write_file messages expected";
1142    
1143     my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1144     or die "$path: $!";
1145    
1146     while () {
1147     my (undef, undef, $data) = get_cond {
1148     $_[0] eq "data" && $_[1] eq $port
1149     } 5
1150     or die "timeout waiting for data message from $port\n";
1151    
1152     length $data or last;
1153    
1154     aio_write $fh, undef, undef, $data, 0;
1155     };
1156     }
1157     };
1158    
1159     mon $ioserver, sub {
1160     warn "ioserver was killed: @_\n";
1161     };
1162    
1163 root 1.53 Let's go through it, section by section.
1164 root 1.42
1165     my $ioserver = port_async {
1166    
1167 root 1.43 Ports can be created by attaching a thread to an existing port via
1168 root 1.53 C<rcv_async>, or as in this example, by calling C<port_async> with the
1169     code to execute as a thread. The C<async> component comes from the fact
1170     that threads are created using the C<Coro::async> function.
1171 root 1.42
1172     The thread runs in a normal port context (so C<$SELF> is set). In
1173     addition, when the thread returns, the port will be C<kil>ed I<normally>, i.e.
1174     without a reason argument.
1175    
1176     while () {
1177     my ($tag, $port, $path) = get_cond;
1178     $tag eq "write_file" or die "only write_file messages expected";
1179    
1180 root 1.53 The thread is supposed to serve many file writes, which is why it
1181     executes in a loop. The first thing it does is fetch the next message,
1182     using C<get_cond>, the "conditional message get". Without arguments, it
1183     merely fetches the I<next> message from the queue, which I<must> be a
1184     C<write_file> message.
1185 root 1.42
1186     The message contains the C<$path> to the file, which is then created:
1187    
1188     my $fh = aio_open $path, O_WRONLY|O_CREAT, 0666
1189     or die "$path: $!";
1190    
1191     Then we enter a loop again, to serve as many C<data> messages as
1192 root 1.43 necessary:
1193 root 1.42
1194     while () {
1195     my (undef, undef, $data) = get_cond {
1196     $_[0] eq "data" && $_[1] eq $port
1197     } 5
1198     or die "timeout waiting for data message from $port\n";
1199    
1200     This time, the condition is not empty, but instead a code block: similarly
1201     to grep, the code block will be called with C<@_> set to each message in
1202     the queue, and it has to return whether it wants to receive the message or
1203     not.
1204    
1205     In this case we are interested in C<data> messages (C<< $_[0] eq "data"
1206     >>), whose first element after the tag is the source port (C<< $_[1] eq $port >>).
1207    
1208     The condition must be this strict, as it is possible to receive both
1209     C<write_file> messages and C<data> messages from other ports while we
1210     handle the file writing.
1211    
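The queue-scanning behaviour of a selective receive can be modelled in a few lines of plain Perl. The sketch below is not C<Coro::MP> code: C<toy_get_cond> is a hypothetical stand-in that neither blocks nor supports a timeout, and the queue is an ordinary array.

```perl
use strict;
use warnings;

my @queue;    # each message is an array reference: [tag, @data]

# Toy selective receive: remove and return the first queued message
# that the condition accepts. Without a condition, take the oldest one.
sub toy_get_cond {
    my ($cond) = @_;
    for my $i (0 .. $#queue) {
        my @msg = @{ $queue[$i] };
        next if $cond && !$cond->(@msg);
        splice @queue, $i, 1;       # only the matching message is removed
        return @msg;
    }
    return;     # nothing matched; a real implementation would wait here
}

push @queue,
    [write_file => "portB", "/tmp/b"],
    [data       => "portB", "late"],
    [data       => "portA", "abc"];

# Strict condition: only data messages from portA are acceptable.
my @first = toy_get_cond(sub { $_[0] eq "data" && $_[1] eq "portA" });

# No condition: simply the oldest message still in the queue.
my @next = toy_get_cond();

print "first: @first\n";
print "next:  @next\n";
print "still queued: ", scalar @queue, "\n";
```

The C<data> message from C<portB> stays in the queue throughout, which is exactly why the condition has to check the source port as well as the tag.
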
1212 root 1.53 The lone C<5> argument at the end is a timeout - when no matching message
1213     is received within C<5> seconds, we assume an error and C<die>.
1214 root 1.42
1215     When an empty C<data> message is received we are done and can close the
1216     file (which is done automatically as C<$fh> goes out of scope):
1217    
1218     length $data or last;
1219    
1220     Otherwise we need to write the data:
1221    
1222     aio_write $fh, undef, undef, $data, 0;
1223    
1224 root 1.53 And that's basically it. Note that every port thread should have some
1225     kind of supervisor. In our case, the supervisor simply prints any error
1226     message:
1227 root 1.42
1228     mon $ioserver, sub {
1229     warn "ioserver was killed: @_\n";
1230     };
1231    
1232     Here is a usage example:
1233    
1234     port_async {
1235     snd $ioserver, write_file => $SELF, "/tmp/unsafe";
1236     snd $ioserver, data => $SELF, "abc\n";
1237     snd $ioserver, data => $SELF, "def\n";
1238     snd $ioserver, data => $SELF;
1239     };
1240    
1241     The messages are sent without any flow control or acknowledgement (feel
1242     free to improve). Also, the source port does not actually need to be a
1243     port - any unique ID will do - but port identifiers happen to be a simple
1244     source of network-wide unique IDs.
1245    
1246     Apart from C<get_cond> as seen above, there are other ways to receive
1247     messages. The C<write_file> message above could also selectively be
1248     received using a C<get> call:
1249    
1250     my ($port, $path) = get "write_file";
1251    
1252     This is simpler, but when some other part of the code sends an unexpected message
1253     to the C<$ioserver> it will stay in the queue forever. As a rule of thumb,
1254     every threaded port should have a "fetch next message unconditionally"
1255     somewhere, to avoid filling up the queue.
1256    
1257 root 1.53 Finally, it is also possible to use more switch-like C<get_cond>s:
1258 root 1.42
1259     get_cond {
1260     $_[0] eq "msg1" and return sub {
1261     my (undef, @msg1_data) = @_;
1262     ...;
1263     };
1264    
1265     $_[0] eq "msg2" and return sub {
1266     my (undef, @msg2_data) = @_;
1267     ...;
1268     };
1269    
1270     die "unexpected message $_[0] received";
1271     };
1272    
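The example above suggests the following semantics, which we assume for this sketch: when the condition returns a code reference, the message is accepted and that code is then invoked with the message. A plain-Perl model of this dispatch (again with a hypothetical, non-blocking C<toy_get_cond>, not the real C<Coro::MP> function) could look like this:

```perl
use strict;
use warnings;

my @queue = (
    [msg2 => "x", "y"],
    [msg1 => 42],
);

# Toy selective receive with switch-like dispatch: if the condition
# returns a code reference, call it with the matched message and
# return its result (assumed semantics, see the text above).
sub toy_get_cond {
    my ($cond) = @_;
    for my $i (0 .. $#queue) {
        my @msg = @{ $queue[$i] };
        my $res = $cond->(@msg) or next;
        splice @queue, $i, 1;
        return ref($res) eq "CODE" ? $res->(@msg) : @msg;
    }
    return;
}

my $result = toy_get_cond(sub {
    $_[0] eq "msg1" and return sub {
        my (undef, @msg1_data) = @_;
        "handled msg1: @msg1_data";
    };

    $_[0] eq "msg2" and return sub {
        my (undef, @msg2_data) = @_;
        "handled msg2: @msg2_data";
    };

    die "unexpected message $_[0] received";
});

print "$result\n";
```

The condition doubles as a dispatch table here: the test and the handler for each message type sit next to each other, and the final C<die> catches everything the port was not prepared for.
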
1273 root 1.37 =head1 THE END
1274    
1275     This is the end of this introduction, but hopefully not the end of
1276 root 1.43 your career as an AEMP user. We hope the tutorial was enough to make the
1277 root 1.37 basic concepts clear. Keep in mind that distributed programming is not
1278 root 1.53 completely trivial; in fact, it's pretty complicated. We hope AEMP makes
1279     it simpler and will be useful for creating exciting new applications.
1280 root 1.37
1281 elmex 1.1 =head1 SEE ALSO
1282    
1283     L<AnyEvent::MP>
1284    
1285 elmex 1.20 L<AnyEvent::MP::Global>
1286    
1287 root 1.42 L<Coro::MP>
1288    
1289 root 1.34 L<AnyEvent>
1290    
1291 elmex 1.1 =head1 AUTHOR
1292    
1293     Robin Redeker <elmex@ta-sa.org>
1294 root 1.32 Marc Lehmann <schmorp@schmorp.de>
1295 root 1.4