=head1 Message Passing for the Non-Blocked Mind

=head1 Introduction and Terminology

This is a tutorial about how to get the swing of the new L<AnyEvent::MP>
module, which allows us to transparently pass messages to our own process
and to other processes on the same or another host.

What kind of messages? Well, basically a message here means a list of Perl
strings, numbers, hashes and arrays, anything that can be expressed as a
L<JSON> text (as JSON is used by default in the protocol).

It is customary in L<AnyEvent::MP> to have a string which describes the type
of the message as the first element (this is called a I<tag> in
L<AnyEvent::MP>), as some API functions (C<rcv>) support matching it
directly. So suppose you want to send a ping message with your current time
to something; this is what such a message might look like (in Perl syntax):

   ['ping', 1251381636]

And next you might ask: between which entities are those messages being
I<passed>? They are I<passed> between I<ports>. I<Ports> are just sources and
destinations for messages. How do these ports relate to things you know?
Well, each I<port> belongs to a I<node>, and a I<node> is just the UNIX
process that runs your L<AnyEvent::MP> application.

Each I<node> is distinguished from other I<nodes> running on the same host
or on multiple hosts in a network by its I<node ID>. A I<node ID> can be
manually assigned, or L<AnyEvent::MP> will assign one itself for you.

So, you might want to visualize it like this (the setup is two nodes, although
more are of course possible: node C<A> (in UNIX process 7066) with ports
C<ABC> and C<DEF>, and node C<B> (in UNIX process 8321) with ports C<FOO> and
C<BAR>):

   |- PID: 7066 -|                  |- PID: 8321 -|
   |             |                  |             |
   | Node ID: A  |                  | Node ID: B  |
   |             |                  |             |
   |   Port ABC =|= <----\ /-----> =|= Port FOO   |
   |             |        X         |             |
   |   Port DEF =|= <----/ \-----> =|= Port BAR   |
   |             |                  |             |
   |-------------|                  |-------------|

The strings for the ports here are just for illustrative purposes. Even
though in reality I<ports> in L<AnyEvent::MP> are also identified by strings,
they can't be chosen manually and are assigned randomly. These I<port IDs>
should also not be used directly for any purpose other than referring to an
endpoint for messages.
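
To make the notion of a message more concrete, here is a minimal sketch that
takes the ping message shown earlier, turns it into JSON text and decodes it
again. It uses the core module L<JSON::PP> purely for illustration; this is an
assumption of the example and says nothing about which JSON implementation is
used on the wire. The variable names are made up for this sketch.

```perl
use strict;
use warnings;
use JSON::PP;   # core module, chosen here only for illustration

# a message is just a list of values; by convention the first one is the tag
my @msg = ('ping', 1251381636);

# anything that can be expressed as JSON text can be part of a message
my $wire = JSON::PP->new->encode (\@msg);
print "$wire\n";

# decoding gives back the same list: the tag, followed by the data
my ($tag, @data) = @{ JSON::PP->new->decode ($wire) };
print "tag=$tag data=@data\n";
```

Values that have no JSON representation (code references or file handles,
for example) cannot be encoded this way, which is why a message is restricted
to plain data.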

The next sections will explain the API of L<AnyEvent::MP>. First the API is
laid out with simple examples. Later some more complex idioms are introduced,
which may be useful for solving real-world problems.

# In this tutorial I'll show you how to write a simple chat server based on
# L<AnyEvent::MP>. This example is used because it nicely shows how to organise a
# simple application, but keep in mind that every node trusts any other, so this
# chat cannot be used to implement a real public chat server and client system,
# but it can be used to implement a distributed chat server for example.

=head1 Passing Your First Message

To start, let's have a look at the messaging API. The next example is just a
demo to show the basic elements of message passing with L<AnyEvent::MP>.
It should just print: "Ending with: 123". Here is the code:

   use AnyEvent;
   use AnyEvent::MP;

   my $end_cv = AnyEvent->condvar;

   my $port = port;

   rcv $port, test => sub {
      my ($data) = @_;
      $end_cv->send ($data);
   };

   snd $port, test => 123;

   print "Ending with: " . $end_cv->recv . "\n";

It already contains most functions of the essential L<AnyEvent::MP> API.

First there is the C<port> function, which creates a I<port> and returns
its I<port ID>. That I<port ID> can be used to send and receive messages. It
is a simple string and can be safely passed to other I<nodes> in the network
to refer to that specific port (usually used for RPC, where you need to tell
the other end which I<port> to send the reply to).

The next function is C<rcv>:

   rcv $port, test => sub { ... };

It sets up a receiver callback on a specific I<port>, which needs to be
specified as the first argument. The next argument, in this example C<test>,
is a I<tag> match. This means that whenever a message with the first element
being the string C<test> is received, the callback is called with the
remaining parts of that message.

Messages can be sent with the C<snd> function, which looks like this in the
example above:

   snd $port, test => 123;

This will send the message C<['test', 123]> to the I<port> with the
I<port ID> in C<$port>.
The receiver got a I<tag> match on C<test> and will call the callback with
the first argument being the number C<123>.

That callback then just passes that number on to the I<condition variable>
C<$end_cv>, which will then pass the value to the print. I<Condition
variables> are outside the scope of this tutorial, so please consult
L<AnyEvent::Intro> about them.

Passing messages inside one process is boring, but before we can continue
and take the next step to interprocess message passing we first have to make
sure some things have been set up.

=head1 System Requirements and System Setup

Before we can start with real IPC we have to make sure some things work on
your system.

First we have to set up a I<shared secret>: for two L<AnyEvent::MP> I<nodes>
to be able to communicate with each other and authenticate each other it is
necessary to set up the same I<shared secret> for both of them (or use TLS
certificates).

The easiest way to set this up is to use the F<aemp> utility:

   aemp gensecret

This creates a F<$HOME/.perl-anyevent-mp> config file and generates a random
shared secret. You can copy this file to any other system and then
communicate over the network (via TCP) with it. You can also select your own
shared secret, and for increased security requirements you can even create a
TLS certificate, causing connections to not just be authenticated, but also
to be encrypted (see the F<aemp> documentation for both).

Connections will only be successful when the I<nodes> that want to connect
to each other have the same I<shared secret> (or successfully verify the TLS
certificate of the other side).

B<Make sure that this config file is the same on all hosts/user accounts
that you try to connect with each other!>

That's all for now; there is more fiddling around with the C<aemp> utility
later.

=head1 Passing Messages Between Processes

=head2 The Receiver

Let's split the previous example up into two small programs.
First the receiver application:

   #!/opt/perl/bin/perl
   use AnyEvent;
   use AnyEvent::MP;
   use AnyEvent::MP::Global;

   initialise_node "eg_simple_receiver";

   my $port = port;

   AnyEvent::MP::Global::register $port, "eg_receivers";

   rcv $port, test => sub {
      my ($data, $reply_port) = @_;

      print "Received data: " . $data . "\n";
   };

   AnyEvent->condvar->recv;

=head3 AnyEvent::MP::Global

Now, that wasn't too bad, was it? OK, let's step through the new functions
and modules that have been used.

For starters, there is now an additional module loaded:
L<AnyEvent::MP::Global>. That module provides us with a I<global registry>,
which lets us share data among all I<nodes> in a network. Why do we need it,
you might ask?

The I<port IDs> are just random strings, assigned by L<AnyEvent::MP>. We
can't know those I<port IDs> in advance, so we don't know which I<port ID>
to send messages to if the message is to be passed between I<nodes> (or UNIX
processes). To find the right I<port> of another I<node> in the network, its
ID has to be communicated to the sender somehow. And exactly that is what
L<AnyEvent::MP::Global> provides.

=head3 initialise_node And The Network

Now, let's have a look at the next new thing, C<initialise_node>:

   initialise_node "eg_simple_receiver";

Before we are able to send messages to other nodes we have to initialise
ourselves. The first argument, the string C<"eg_simple_receiver">, is called
the I<profile> of this node. A profile holds some information about the
application that is going to be a node in an L<AnyEvent::MP> network.

Most importantly, the profile allows you to set the I<node ID> that your
application will use. You can also set I<binds> in the profile, meaning that
you can define TCP ports that the application will listen on for incoming
connections from other nodes of the network.

Next, you can configure I<seeds> in the profile. A I<seed> is just a TCP
endpoint which tells the application where to find other nodes of its
network. To explain this in a bit more detail we have to look at the
topology of an L<AnyEvent::MP> network. The topology is called a
I<fully-connected mesh>; here is an example with 4 nodes:

   N1--N2
   | \/ |
   | /\ |
   N3--N4

Now imagine another I<node>, C<N5>, which
wants to connect itself to that network:

   N1--N2
   | \/ |    N5
   | /\ |
   N3--N4

The new node needs to know the I<binds> of all of those 4 already connected
nodes. And exactly this is what the I<seeds> are for. Now let's assume that
the new node C<N5> has as I<seed> the TCP endpoint of the node C<N2>. It
then connects to C<N2>:

   N1--N2____
   | \/ |    N5
   | /\ |
   N3--N4

C<N2> then tells C<N5> the I<binds> of the other nodes it is connected to,
and C<N5> builds up the rest of the connections:

    /--------\
   N1--N2____|
   | \/ |    N5
   | /\ |   /|
   N3--N4--- |
    \________/

Finished. C<N5> is now happily connected to the rest of the network.

=head1 The Chat Client

OK, let's start by implementing the "frontend" of the client. We will
develop the client first and postpone the server for later, as the most
complex things actually happen in the client.

We will use L<AnyEvent::Handle> to do non-blocking reads on standard input
(all of this code deals with actually handling user input, no message
passing yet):

   #!perl

   use AnyEvent;
   use AnyEvent::Handle;

   sub send_message {
      die "This is where we will send the messages to the server "
        . "in the next step of this tutorial.\n"
   }

   # make an AnyEvent condition variable for the 'quit' condition
   # (when we want to exit the client).
   my $quit_cv = AnyEvent->condvar;

   my $stdin_hdl = AnyEvent::Handle->new (
      fh       => *STDIN,
      on_error => sub { $quit_cv->send },
      on_read  => sub {
         my ($hdl) = @_;

         $hdl->push_read (line => sub {
            my ($hdl, $line) = @_;

            if ($line =~ /^\/quit/) { # /quit will end the client
               $quit_cv->send;
            } else {
               send_message ($line);
            }
         });
      }
   );

   $quit_cv->recv;

This is now a very basic client. Explaining explicitly what
L<AnyEvent::Handle> does or what a I<condvar> is all about is out of scope
of this document; please consult L<AnyEvent::Intro> or the manual pages for
L<AnyEvent> and L<AnyEvent::Handle>.

=head1 First Steps Into Messaging

To supply the C<send_message> function we now take a look at
L<AnyEvent::MP>. This is an example of what it might look like:

   ... # the use lines from the above snippet

   use AnyEvent::MP;

   sub send_message {
      my ($msg) = @_;

      snd $server_port, message => $msg;
   }

   ...
   # the rest of the above script

The C<snd> function is exported by L<AnyEvent::MP>; it stands for 'send a
message'. The first argument is the I<port> (a I<port> is something that can
receive messages, represented by a printable string) of the server which
will receive the message. How we get this port will be explained in the next
step.

The remaining arguments of C<snd> are C<message> and C<$msg>, the first two
elements of the I<message> (a I<message> in L<AnyEvent::MP> is a simple list
of values, which can be sent to a I<port>).

So all the function does is send the two values C<message> (a constant
string to tell the server what to expect) and the actual message string.

That's all fine and simple so far, but where do we get the C<$server_port>?
Well, we need to get the unique I<port id> of the server's port where it
wants to receive all the incoming chat messages. A I<port id> is
unfortunately a very unique string, which we are unable to know in advance.
But L<AnyEvent::MP> supports the concept of 'registered ports', which is
basically a port on the server side registered under a well-known name.

For example, the server has a port for receiving chat messages with a unique
I<port id> and registers it under the name C<chatter>.

BTW, these "registered port names" should follow similar rules to Perl
identifiers, so you should prefix them with your package/module name to make
them unique, unless you use them in the main program.

As I<messages> can only be sent to a I<port id> and not just to some name,
we have to ask the server node for the I<port id> of the port registered as
C<chatter>.

=head1 Finding The Chatter Port

OK, lots of talk, now some code. Now we will actually get the
C<$server_port> from the backend:

   ...
   use AnyEvent::MP;

   my $server_node = "127.0.0.1:1299";

   my $client_port = port;

   snd $server_node, lookup => "chatter", $client_port, "resolved";

   my $resolved_cv = AnyEvent->condvar;
   my $server_port;

   # setup a receiver callback for the 'resolved' message:
   rcv $client_port, resolved => sub {
      my ($tag, $chatter_port_id) = @_;

      print "Resolved the server port 'chatter' to $chatter_port_id\n";
      $server_port = $chatter_port_id;

      $resolved_cv->send;
      1
   };

   # lets block the client until we have resolved the server port.
   $resolved_cv->recv;

   # now setup another receiver callback for the chat messages:
   rcv $client_port, message => sub {
      my ($tag, $msg) = @_;

      print "chat> $msg\n";
      0
   };

   # send a 'join' message to the server:
   snd $server_port, join => "$client_port";

   sub send_message {
      ...

Now that was a lot of new stuff:

First we define the C<$server_node>: in order to refer to another node we
need some kind of string to reference it - the node reference. The
I<noderef> is basically a comma-separated list of C<host:port> pairs. We
assume in this tutorial that the server runs on C<127.0.0.1> (localhost) on
port 1299, which results in the noderef C<127.0.0.1:1299>.

Next, in order to receive a reply from the other node or the server we need
to have a I<port> that messages can be sent to. This is what the C<port>
function will do for us: it just creates a new local port and returns its
I<port ID>, which can then be used to receive messages.

If you look carefully, you will see that the first C<snd> uses the
C<$server_node> (a noderef) as destination port. Well, what I didn't tell
you yet is that each I<node> has a default I<port> to receive messages. The
ID of this port is the same as the noderef.

This I<default port> provides some special services for us, for example
resolving a registered name to a I<port id> (a-ha! finally!).

This is exactly what this line does:

   snd $server_node, lookup => "chatter", $client_port, "resolved";

This sends a message with the first element being C<lookup>, followed by the
(hopefully) registered port name that we want to resolve to a I<port id>:
C<chatter>.
And in order for the server node to be able to send us back the resolved
I<port ID> we have to tell it where to send it: the result message will be
sent to C<$client_port> (the I<port ID> of the port we just created), and
will have the string C<resolved> as the first element.

When the node receives this message, it will look up the name, gobble up all
the extra arguments we passed, append the resolved name, and send the
resulting list as a message.

Next we register a receiver for the reply to this C<lookup> request.

   rcv $client_port, resolved => sub {
      my ($tag, $chatter_port_id) = @_;
      ...
      1
   };

This sets up a receiver on our own port for messages with the first element
being the string C<resolved>. Receivers can match the contents of the
messages before actually executing the specified callback.

B<Please note> that every C<rcv> callback has to return either a true or a
false value, indicating whether it is B<successful>/B<done> (true) or still
wants to B<continue> (false) receiving messages.

In this case we tell the C<$client_port> to look into all the messages it
receives and look for the string C<resolved> in the first element of the
message. If it is found, the given callback will be called with the message
elements as arguments.

Using a string as the first element of the message is called I<tagging> the
message. It's common practice to code the 'type' of a message into its first
element, as this allows for simple matching.

The result message will contain the I<port ID> of the well-known port
C<chatter> as second element, which will be stored in C<$chatter_port_id>.

This port ID will then be stored in C<$server_port>, followed by calling
C<send> on C<$resolved_cv> so the program will continue.

The callback then returns a C<1> (a true value), to indicate that it has
done its job and doesn't want to receive further C<resolved> messages.
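
The matching and return value rules described above can be modelled in a few
lines of plain Perl. The following sketch is only a toy stand-in and does
B<not> use L<AnyEvent::MP>: C<toy_rcv> and C<toy_deliver> are hypothetical
helpers invented for this illustration. It assumes, as this tutorial
describes, that a callback receives the tag followed by the remaining message
elements, and that a true return value means "done" while a false one means
"continue".

```perl
use strict;
use warnings;

# Toy stand-in for a single port: a list of [tag, callback] receivers.
# This only models the rules described in the text; it is not AnyEvent::MP.
my @receivers;

sub toy_rcv {
   my ($tag, $cb) = @_;
   push @receivers, [$tag, $cb];
}

sub toy_deliver {
   my (@msg) = @_;

   # keep receivers that do not match, or that match and return false
   # ("continue"); drop receivers that match and return true ("done")
   @receivers = grep {
      my ($tag, $cb) = @$_;
      !($msg[0] eq $tag && $cb->(@msg))
   } @receivers;
}

my @log;

toy_rcv (resolved => sub { my ($tag, $id)  = @_; push @log, "resolved $id"; 1 });
toy_rcv (message  => sub { my ($tag, $msg) = @_; push @log, "chat $msg";    0 });

toy_deliver (resolved => "port-1");
toy_deliver (resolved => "port-2"); # not delivered: that receiver is done
toy_deliver (message  => "hello");
toy_deliver (message  => "again");  # still delivered: receiver continues

print "$_\n" for @log;
```

The C<resolved> receiver fires exactly once, which is the behaviour we want
for a one-off lookup reply, while the C<message> receiver stays registered
for as long as it keeps returning a false value.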

After this the chat message receiver callback is registered with the port:

   rcv $client_port, message => sub {
      my ($tag, $msg) = @_;

      print "chat> $msg\n";

      0
   };

We assume that all messages that are broadcast to the clients by the server
contain the string tag C<message> as first element, and the actual message
as second element. The callback returns a false value this time, to indicate
that it is not yet done and wants to receive further messages.

The last thing to do is to tell the server to send us new chat messages from
other clients. We do so by sending the message C<join> followed by our own
I<port ID>.

   # send the server a 'join' message:
   snd $server_port, join => $client_port;

This way the server knows where to send all the new messages to.

=head1 The Completed Client

This is the complete client script:

   #!perl

   use AnyEvent;
   use AnyEvent::Handle;
   use AnyEvent::MP;

   my $server_node = "127.0.0.1:1299";

   my $client_port = port;

   snd $server_node, lookup => "chatter", $client_port, "resolved";

   my $resolved_cv = AnyEvent->condvar;
   my $server_port;

   # setup a receiver callback for the 'resolved' message:
   rcv $client_port, resolved => sub {
      my ($tag, $chatter_port_id) = @_;

      print "Resolved the server port 'chatter' to $chatter_port_id\n";
      $server_port = $chatter_port_id;

      $resolved_cv->send;
      1
   };

   # lets block the client until we have resolved the server port.
   $resolved_cv->recv;

   # now setup another receiver callback for the chat messages:
   rcv $client_port, message => sub {
      my ($tag, $msg) = @_;

      print "chat> $msg\n";
      0
   };

   # send a 'join' message to the server:
   snd $server_port, join => "$client_port";

   sub send_message {
      my ($msg) = @_;

      snd $server_port, message => $msg;
   }

   # make an AnyEvent condition variable for the 'quit' condition
   # (when we want to exit the client).
   my $quit_cv = AnyEvent->condvar;

   my $stdin_hdl = AnyEvent::Handle->new (
      fh       => *STDIN,
      on_error => sub { $quit_cv->send },
      on_read  => sub {
         my ($hdl) = @_;

         $hdl->push_read (line => sub {
            my ($hdl, $line) = @_;

            if ($line =~ /^\/quit/) { # /quit will end the client
               $quit_cv->send;
            } else {
               send_message ($line);
            }
         });
      }
   );

   $quit_cv->recv;

=head1 The Server

OK, we finally come to the server.

The server of course also needs to set up a port, and in addition needs to
I<register> it, so the clients can find it.

Again, let's jump directly into the code:

   #!perl

   use AnyEvent;
   use AnyEvent::MP;

   become_public "127.0.0.1:1299";

   my $chatter_port = port;

   reg $chatter_port, "chatter";

   my %client_ports;

   rcv $chatter_port,
      join => sub {
         my ($tag, $client_port) = @_;

         print "got new client port: $client_port\n";
         $client_ports{$client_port} = 1;

         0
      },
      message => sub {
         my ($tag, $msg) = @_;

         print "message> $msg\n";

         snd $_, message => $msg
            for keys %client_ports;

         0
      };

   AnyEvent->condvar->recv;

That is all. Looks much simpler than the client, doesn't it?

Let's quickly look over it, as C<rcv> has already been discussed in the
client part of this tutorial above.

First this:

   become_public "127.0.0.1:1299";

This will tell our I<node> to become a I<public> node, which means that it
can be contacted via TCP. The first argument should be the I<noderef> the
server wants to be reachable at. In this case it's the TCP port 1299 on
C<127.0.0.1>.

Next we set up two receivers, one for the C<join> messages and another one
for the actual messages of type C<message>. This is done with a single call
to C<rcv>, which allows multiple C<< match => $callback >> pairs.

In the C<join> callback we receive the client port, which is simply
remembered in the C<%client_ports> hash. In the C<message> callback we just
iterate through all known C<%client_ports> and relay the message to them.

That concludes the server.

=head1 The Remaining Problems

The implementation as shown still has some bugs.
For instance: how does the server know that the client isn't there anymore,
so it can clean up the C<%client_ports> hash? Also, the chat messages have
no originator, so we don't know who actually sent the message (which would
be quite useful for human-to-human interaction: to know who the other one
is :).

But aside from these issues I hope this tutorial showed you the basics of
L<AnyEvent::MP> and explained some common idioms.

How to solve the reliability and C<%client_ports> cleanup problem will be
explained later in this tutorial (TODO).

=head1 Inside The Protocol

Now, for the interested parties, let me explain some details about the
protocol that L<AnyEvent::MP> nodes use to communicate with each other. If
you are not interested you can skip this section.

Usually TCP is used for communication. Each I<node>, if configured to be a
I<public> node with the C<become_public> function, will listen on the
configured TCP port (default is 4040).

If one I<node> then wants to send a message to another I<node>, it will
connect to the host and port given in the I<port ID>.

Then some handshaking occurs to check whether both I<nodes> know the
I<shared secret>. Optionally, TLS can be enabled (for how to do this exactly
please consult the L<AnyEvent::MP> man page; just a hint: it should be
enough to put the private key and (self-signed) certificate in the
C<~/.aemp-secret> file of all nodes).

After the handshake, messages will be exchanged using a serialiser (usually
L<JSON> is used for this, but it is also possible to use other serialisation
formats such as L<Storable>).

=head1 SEE ALSO

L<AnyEvent>

L<AnyEvent::Handle>

L<AnyEvent::MP>

=head1 AUTHOR

Robin Redeker