… | |
… | |
185 | module loaded: L<AnyEvent::MP::Global>. |
185 | module loaded: L<AnyEvent::MP::Global>. |
186 | |
186 | |
187 | That module provides us with a I<global registry>, which lets us share data |
187 | That module provides us with a I<global registry>, which lets us share data |
188 | among all I<nodes> in a network. Why do we need it you might ask? |
188 | among all I<nodes> in a network. Why do we need it you might ask? |
189 | |
189 | |
190 | The I<port ids> are just random strings, assigned by L<AnyEvent::MP>. We can't |
190 | The thing is, that the I<port ids> are just random strings, assigned by |
191 | know those I<port ids> in advance, so we don't know which I<port id> to send |
191 | L<AnyEvent::MP>. We can't know those I<port ids> in advance, so we don't know |
192 | messages to if the message is to be passed between I<nodes> (or UNIX |
192 | which I<port id> to send messages to if the message is to be passed between |
193 | processes). To find the right I<port> of another I<node> in the network we will |
193 | I<nodes> (or UNIX processes). To find the right I<port> of another I<node> in |
194 | need to communicate that somehow to the sender. And exactly that is what |
194 | the network we will need to communicate that somehow to the sender. And |
195 | L<AnyEvent::MP::Global> provides. |
195 | exactly that is what L<AnyEvent::MP::Global> provides. |
196 | |
196 | |
197 | =head3 initialise_node And The Network |
197 | =head3 initialise_node And The Network |
198 | |
198 | |
199 | Now, lets have a look at the next new thing, the C<initialise_node>: |
199 | Now, lets have a look at the next new thing, the C<initialise_node>: |
200 | |
200 | |
… | |
… | |
301 | like we have discussed in the first example. We again match for the I<tag> |
301 | like we have discussed in the first example. We again match for the I<tag> |
302 | C<test>. The difference is just that we don't end the application after |
302 | C<test>. The difference is just that we don't end the application after |
303 | receiving the first message. We just infinitely continue to look out for new |
303 | receiving the first message. We just infinitely continue to look out for new |
304 | messages. |
304 | messages. |
305 | |
305 | |
306 | =head1 The Chat Client |
|
|
307 | |
|
|
308 | OK, lets start by implementing the "frontend" of the client. We will |
|
|
309 | develop the client first and postpone the server for later, as the most |
|
|
310 | complex things actually happen in the client. |
|
|
311 | |
|
|
312 | We will use L<AnyEvent::Handle> to do non-blocking IO read on standard |
|
|
313 | input (all of this code deals with actually handling user input, no |
|
|
314 | message passing yet): |
|
|
315 | |
|
|
316 | #!perl |
|
|
317 | |
|
|
318 | use AnyEvent; |
|
|
319 | use AnyEvent::Handle; |
|
|
320 | |
|
|
321 | sub send_message { |
|
|
322 | die "This is where we will send the messages to the server" |
|
|
323 | . "in the next step of this tutorial.\n" |
|
|
324 | } |
|
|
325 | |
|
|
326 | # make an AnyEvent condition variable for the 'quit' condition |
|
|
327 | # (when we want to exit the client). |
|
|
328 | my $quit_cv = AnyEvent->condvar; |
|
|
329 | |
|
|
330 | my $stdin_hdl = AnyEvent::Handle->new ( |
|
|
331 | fh => *STDIN, |
|
|
332 | on_error => sub { $quit_cv->send }, |
|
|
333 | on_read => sub { |
|
|
334 | my ($hdl) = @_; |
|
|
335 | |
|
|
336 | $hdl->push_read (line => sub { |
|
|
337 | my ($hdl, $line) = @_; |
|
|
338 | |
|
|
339 | if ($line =~ /^\/quit/) { # /quit will end the client |
|
|
340 | $quit_cv->send; |
|
|
341 | } else { |
|
|
342 | send_message ($line); |
|
|
343 | } |
|
|
344 | }); |
|
|
345 | } |
|
|
346 | ); |
|
|
347 | |
|
|
348 | $quit_cv->recv; |
|
|
349 | |
|
|
350 | This is now a very basic client. Explaining explicitly what |
|
|
351 | L<AnyEvent::Handle> does or what a I<condvar> is all about is out of scope |
|
|
352 | of this document, please consult L<AnyEvent::Intro> or the manual pages |
|
|
353 | for L<AnyEvent> and L<AnyEvent::Handle>. |
|
|
354 | |
|
|
355 | =head1 First Steps Into Messaging |
|
|
356 | |
|
|
357 | To supply the C<send_message> function we now take a look at |
|
|
358 | L<AnyEvent::MP>. This is an example of how it might look like: |
|
|
359 | |
|
|
360 | ... # the use lines from the above snippet |
|
|
361 | |
|
|
362 | use AnyEvent::MP; |
|
|
363 | |
|
|
364 | sub send_message { |
|
|
365 | my ($msg) = @_; |
|
|
366 | |
|
|
367 | snd $server_port, message => $msg; |
|
|
368 | } |
|
|
369 | |
|
|
370 | ... # the rest of the above script |
|
|
371 | |
|
|
372 | The C<snd> function is exported by L<AnyEvent::MP>, it stands for 'send |
|
|
373 | a message'. The first argument is the I<port> (a I<port> is something |
|
|
374 | that can receive messages, represented by a printable string) of the |
|
|
375 | server which will receive the message. How we get this port will be |
|
|
376 | explained in the next step. |
|
|
377 | |
|
|
378 | The remaining arguments of C<snd> are C<message> and C<$msg>, the first |
|
|
379 | two elements of the I<message> (a I<message> in L<AnyEvent::MP> is a |
|
|
380 | simple list of values, which can be sent to a I<port>). |
|
|
381 | |
|
|
382 | So all the function does is send the two values C<message> (a constant |
|
|
383 | string to tell the server what to expect) and the actual message string. |
|
|
384 | |
|
|
385 | Thats all fine and simple so far, but where do we get the |
|
|
386 | C<$server_port>? Well, we need to get the unique I<port id> of the |
|
|
387 | server's port where it wants to receive all the incoming chat messages. A |
|
|
388 | I<port id> is unfortunately a very unique string, which we are unable to |
|
|
389 | know in advance. But L<AnyEvent::MP> supports the concept of 'registered |
|
|
390 | ports', which is basically a port on the server side registered under |
|
|
391 | a well known name. |
|
|
392 | |
|
|
393 | For example, the server has a port for receiving chat messages with a |
|
|
394 | unique I<port id> and registers it under the name C<chatter>. |
|
|
395 | |
|
|
396 | BTW, these "registered port names" should follow similar rules as Perl |
|
|
397 | identifiers, so you should prefix them with your package/module name to |
|
|
398 | make them unique, unless you use them in the main program. |
|
|
399 | |
|
|
400 | As I<messages> can only be sent to a I<port id> and not just to some name |
|
|
401 | we have to ask the server node for the I<port id> of the port registered |
|
|
402 | as C<chatter>. |
|
|
403 | |
|
|
404 | =head1 Finding The Chatter Port |
|
|
405 | |
|
|
406 | Ok, lots of talk, now some code. Now we will actually get the |
|
|
407 | C<$server_port> from the backend: |
|
|
408 | |
|
|
409 | ... |
|
|
410 | |
|
|
411 | use AnyEvent::MP; |
|
|
412 | |
|
|
413 | my $server_node = "127.0.0.1:1299"; |
|
|
414 | |
|
|
415 | my $client_port = port; |
|
|
416 | |
|
|
417 | snd $server_node, lookup => "chatter", $client_port, "resolved"; |
|
|
418 | |
|
|
419 | my $resolved_cv = AnyEvent->condvar; |
|
|
420 | my $server_port; |
|
|
421 | |
|
|
422 | # setup a receiver callback for the 'resolved' message: |
|
|
423 | rcv $client_port, resolved => sub { |
|
|
424 | my ($tag, $chatter_port_id) = @_; |
|
|
425 | |
|
|
426 | print "Resolved the server port 'chatter' to $chatter_port_id\n"; |
|
|
427 | $server_port = $chatter_port_id; |
|
|
428 | |
|
|
429 | $resolved_cv->send; |
|
|
430 | 1 |
|
|
431 | }; |
|
|
432 | |
|
|
433 | # lets block the client until we have resolved the server port. |
|
|
434 | $resolved_cv->recv; |
|
|
435 | |
|
|
436 | # now setup another receiver callback for the chat messages: |
|
|
437 | rcv $client_port, message => sub { |
|
|
438 | my ($tag, $msg) = @_; |
|
|
439 | |
|
|
440 | print "chat> $msg\n"; |
|
|
441 | 0 |
|
|
442 | }; |
|
|
443 | |
|
|
444 | # send a 'join' message to the server: |
|
|
445 | snd $server_port, join => "$client_port"; |
|
|
446 | |
|
|
447 | sub send_message { ... |
|
|
448 | |
|
|
449 | Now that was a lot of new stuff: |
|
|
450 | |
|
|
451 | First we define the C<$server_node>: In order to refer to another node |
|
|
452 | we need some kind of string to reference it - the node reference. The |
|
|
453 | I<noderef> is basically a comma separated list of C<address:port> |
|
|
454 | pairs. We assume in this tutorial that the server runs on C<127.0.0.1> |
|
|
455 | (localhost) on port 1299, which results in the noderef C<127.0.0.1:1299>. |
|
|
456 | |
|
|
457 | Next, in order to receive a reply from the other node or the server we |
|
|
458 | need to have a I<port> that messages can be sent to. This is what the |
|
|
459 | C<port> function will do for us, it just creates a new local port and |
|
|
460 | returns it's I<port ID> that can then be used to receive messages. |
|
|
461 | |
|
|
462 | When you look carefully, you will see that the first C<snd> uses the |
|
|
463 | C<$server_node> (a noderef) as destination port. Well, what I didn't |
|
|
464 | tell you yet is that each I<node> has a default I<port> to receive |
|
|
465 | messages. The ID of this port is the same as the noderef. |
|
|
466 | |
|
|
467 | This I<default port> provides some special services for us, for example |
|
|
468 | resolving a registered name to a I<port id> (a-ha! finally!). |
|
|
469 | |
|
|
470 | This is exactly what this line does: |
|
|
471 | |
|
|
472 | snd $server_node, lookup => "chatter", $client_port, "resolved"; |
|
|
473 | |
|
|
474 | This sends a message with first element being C<lookup>, followed by the |
|
|
475 | (hopefully) registered port name that we want to resolve to a I<port |
|
|
476 | id>: C<chatter>. And in order for the server node to be able to send us |
|
|
477 | back the resolved I<port ID> we have to tell it where to send it: The |
|
|
478 | result message will be sent to C<$client_port> (the I<port id> of the |
|
|
479 | port we just created), and will have the string C<resolved> as the first |
|
|
480 | element. |
|
|
481 | |
|
|
482 | When the node receives this message, it will look up the name, gobble up |
|
|
483 | all the extra arguments we passed, append the resolved name, and send the |
|
|
484 | resulting list as a message. |
|
|
485 | |
|
|
486 | Next we register a receiver for this C<lookup>-request. |
|
|
487 | |
|
|
488 | rcv $client_port, resolved => sub { |
|
|
489 | my ($tag, $chatter_port_id) = @_; |
|
|
490 | ... |
|
|
491 | 1 |
|
|
492 | }; |
|
|
493 | |
|
|
494 | This sets up a receiver on our own port for messages with the first |
|
|
495 | element being the string C<resolved>. Receivers can match the contents of |
|
|
496 | the messages before actually executing the specified callback. |
|
|
497 | |
|
|
498 | B<Please note> that the every C<rcv> callback has to return either a true |
|
|
499 | or a false value, indicating whether it is B<successful>/B<done> (true) or |
|
|
500 | still wants to B<continue> (false) receiving messages. |
|
|
501 | |
|
|
502 | In this case we tell the C<$client_port> to look into all the messages |
|
|
503 | it receives and look for the string C<resolved> in the first element of |
|
|
504 | the message. If it is found, the given callback will be called with the |
|
|
505 | message elements as arguments. |
|
|
506 | |
|
|
507 | Using a string as the first element of the message is called I<tagging> |
|
|
508 | the message. It's common practise to code the 'type' of a message into |
|
|
509 | it's first element, as this allows for simple matching. |
|
|
510 | |
|
|
511 | The result message will contain the I<port ID> of the well known port |
|
|
512 | C<chatter> as second element, which will be stored in C<$chatter_port_id>. |
|
|
513 | |
|
|
514 | This port ID will then be stored in C<$server_port>, followed by calling |
|
|
515 | C<send> on $resolved_cv> so the program will continue. |
|
|
516 | |
|
|
517 | The callback then returns a C<1> (a true value), to indicate that it has |
|
|
518 | done it's job and doesn't want to receive further C<resolved> messages. |
|
|
519 | |
|
|
520 | After this the chat message receiver callback is registered with the port: |
|
|
521 | |
|
|
522 | rcv $client_port, message => sub { |
|
|
523 | my ($tag, $msg) = @_; |
|
|
524 | |
|
|
525 | print "chat> $msg\n"; |
|
|
526 | |
|
|
527 | 0 |
|
|
528 | }; |
|
|
529 | |
|
|
530 | We assume that all messages that are broadcast to the clients by the |
|
|
531 | server contain the string tag C<message> as first element, and the actual |
|
|
532 | message as second element. The callback returns a false value this time, |
|
|
533 | to indicate that it is not yet done and wants to receive further messages. |
|
|
534 | |
|
|
535 | The last thing to do is to tell the server to send us new chat messages |
|
|
536 | from other clients. We do so by sending the message C<join> followed by |
|
|
537 | our own I<port ID>. |
|
|
538 | |
|
|
539 | # send the server a 'join' message: |
|
|
540 | snd $server_port, join => $client_port; |
|
|
541 | |
|
|
542 | This way the server knows where to send all the new messages to. |
|
|
543 | |
|
|
544 | =head1 The Completed Client |
|
|
545 | |
|
|
546 | This is the complete client script: |
|
|
547 | |
|
|
548 | #!perl |
|
|
549 | |
|
|
550 | use AnyEvent; |
|
|
551 | use AnyEvent::Handle; |
|
|
552 | use AnyEvent::MP; |
|
|
553 | |
|
|
554 | my $server_node = "127.0.0.1:1299"; |
|
|
555 | |
|
|
556 | my $client_port = port; |
|
|
557 | |
|
|
558 | snd $server_node, lookup => "chatter", $client_port, "resolved"; |
|
|
559 | |
|
|
560 | my $resolved_cv = AnyEvent->condvar; |
|
|
561 | my $server_port; |
|
|
562 | |
|
|
563 | # setup a receiver callback for the 'resolved' message: |
|
|
564 | rcv $client_port, resolved => sub { |
|
|
565 | my ($tag, $chatter_port_id) = @_; |
|
|
566 | |
|
|
567 | print "Resolved the server port 'chatter' to $chatter_port_id\n"; |
|
|
568 | $server_port = $chatter_port_id; |
|
|
569 | |
|
|
570 | $resolved_cv->send; |
|
|
571 | 1 |
|
|
572 | }; |
|
|
573 | |
|
|
574 | # lets block the client until we have resolved the server port. |
|
|
575 | $resolved_cv->recv; |
|
|
576 | |
|
|
577 | # now setup another receiver callback for the chat messages: |
|
|
578 | rcv $client_port, message => sub { |
|
|
579 | my ($tag, $msg) = @_; |
|
|
580 | |
|
|
581 | print "chat> $msg\n"; |
|
|
582 | 0 |
|
|
583 | }; |
|
|
584 | |
|
|
585 | # send a 'join' message to the server: |
|
|
586 | snd $server_port, join => "$client_port"; |
|
|
587 | |
|
|
588 | sub send_message { |
|
|
589 | my ($msg) = @_; |
|
|
590 | |
|
|
591 | snd $server_port, message => $msg; |
|
|
592 | } |
|
|
593 | |
|
|
594 | # make an AnyEvent condition variable for the 'quit' condition |
|
|
595 | # (when we want to exit the client). |
|
|
596 | my $quit_cv = AnyEvent->condvar; |
|
|
597 | |
|
|
598 | my $stdin_hdl = AnyEvent::Handle->new ( |
|
|
599 | fh => *STDIN, |
|
|
600 | on_error => sub { $quit_cv->send }, |
|
|
601 | on_read => sub { |
|
|
602 | my ($hdl) = @_; |
|
|
603 | |
|
|
604 | $hdl->push_read (line => sub { |
|
|
605 | my ($hdl, $line) = @_; |
|
|
606 | |
|
|
607 | if ($line =~ /^\/quit/) { # /quit will end the client |
|
|
608 | $quit_cv->send; |
|
|
609 | } else { |
|
|
610 | send_message ($line); |
|
|
611 | } |
|
|
612 | }); |
|
|
613 | } |
|
|
614 | ); |
|
|
615 | |
|
|
616 | $quit_cv->recv; |
|
|
617 | |
|
|
618 | =head1 The Server |
306 | =head2 The Sender |
619 | |
307 | |
620 | Ok, we finally come to the server. |
308 | Ok, now lets take a look at the sender: |
621 | |
309 | |
622 | The server of course also needs to set up a port, and in addition needs to |
310 | #!/opt/perl/bin/perl |
623 | I<register> it, so the clients can find it. |
|
|
624 | |
|
|
625 | Again, let's jump directly into the code: |
|
|
626 | |
|
|
627 | #!perl |
|
|
628 | |
|
|
629 | use AnyEvent; |
311 | use AnyEvent; |
630 | use AnyEvent::MP; |
312 | use AnyEvent::MP; |
|
|
313 | use AnyEvent::MP::Global; |
631 | |
314 | |
632 | become_public "127.0.0.1:1299"; |
315 | initialise_node "eg_simple_sender"; |
633 | |
316 | |
634 | my $chatter_port = port; |
317 | my $find_timer = |
|
|
318 | AnyEvent->timer (after => 0, interval => 1, cb => sub { |
|
|
319 | my $ports = AnyEvent::MP::Global::find "eg_receivers" |
|
|
320 | or return; |
635 | |
321 | |
636 | reg $chatter_port, "chatter"; |
322 | snd $_, test => time |
637 | |
323 | for @$ports; |
638 | my %client_ports; |
|
|
639 | |
|
|
640 | rcv $chatter_port, |
|
|
641 | join => sub { |
|
|
642 | my ($tag, $client_port) = @_; |
|
|
643 | |
|
|
644 | print "got new client port: $client_port\n"; |
|
|
645 | $client_ports{$client_port} = 1; |
|
|
646 | |
|
|
647 | 0 |
|
|
648 | }, |
|
|
649 | message => sub { |
|
|
650 | my ($tag, $msg) = @_; |
|
|
651 | |
|
|
652 | print "message> $msg\n"; |
|
|
653 | |
|
|
654 | snd $_, message => $msg |
|
|
655 | for keys %client_ports; |
|
|
656 | |
|
|
657 | 0 |
|
|
658 | }; |
324 | }); |
659 | |
325 | |
660 | AnyEvent->condvar->recv; |
326 | AnyEvent->condvar->recv; |
661 | |
327 | |
662 | That is all. Looks much simpler than the client, doesn't it? |
328 | It's even less code. The C<initialise_node> is known now from the receiver |
663 | |
329 | above. As discussed in the section where we setup the profiles we configure |
664 | Let's quickly look over it, as C<rcv> has already been discussed in the |
330 | this application to use the I<profile> C<eg_simple_sender>. |
665 | client part of this tutorial above. |
|
|
666 | |
331 | |
667 | First this: |
332 | Next we setup a timer that repeatedly calls this chunk of code: |
668 | |
333 | |
669 | become_public "127.0.0.1:1299"; |
334 | my $ports = AnyEvent::MP::Global::find "eg_receivers" |
|
|
335 | or return; |
670 | |
336 | |
671 | This will tell our I<node> to become a I<public> node, which means that it |
337 | snd $_, test => time |
672 | can be contacted via TCP. The first argument should be the I<noderef> the |
338 | for @$ports; |
673 | server wants to be reachable at. In this case it's the TCP port 1299 on |
|
|
674 | C<127.0.0.1>. |
|
|
675 | |
339 | |
676 | Next we set up two receivers, one for the C<join> messages and another one |
340 | The new function here is the C<find> function of L<AnyEvent::MP::Global>. It |
677 | for the actual messages of type C<messsage>. This is done with a single |
341 | searches in the I<global group> named C<eg_receivers> for ports. If none are |
678 | call to C<rcv>, which allows multiple C<< match => $callback >> pairs. |
342 | found C<undef> is returned and we wait for the next time the timer fires. |
679 | |
343 | |
680 | In the C<join> callback we receive the client port, which is simply |
344 | In case the receiver application has been connected and the newly added port by |
681 | remembered in the C<%client_ports> hash. In the C<message> callback we |
345 | the receiver has propagated to the sender C<find> returns an array reference |
682 | just iterate through all known C<%client_ports> and relay the message to |
346 | that contains the I<port id> of the receiver I<port(s)>. |
683 | them. |
|
|
684 | |
347 | |
685 | That concludes the server. |
348 | We then just send to every I<port> in the I<global group> a message consisting |
|
|
349 | of the I<tag> C<test> and the current time in form of a UNIX timestamp. |
686 | |
350 | |
687 | =head1 The Remaining Problems |
351 | And thats all. |
688 | |
|
|
689 | The implementation as shown still has some bugs. For instance: How does |
|
|
690 | the server know that the client isn't there anymore, so it can clean up |
|
|
691 | the C<%client_ports> hash? Also, the chat messages have no originator, so |
|
|
692 | we don't know who actually sent the message (which would be quite useful |
|
|
693 | for human-to-human interaction: to know who the other one is :). |
|
|
694 | |
|
|
695 | But aside from these issues I hope this tutorial showed you the basics of |
|
|
696 | L<AnyEvent::MP> and explained some common idioms. |
|
|
697 | |
|
|
698 | How to solve the reliability and C<%client_ports> cleanup problem will |
|
|
699 | be explained later in this tutorial (TODO). |
|
|
700 | |
|
|
701 | =head1 Inside The Protocol |
|
|
702 | |
|
|
703 | Now, for the interested parties, let me explain some details about the protocol |
|
|
704 | that L<AnyEvent::MP> nodes use to communicate to each other. If you are not |
|
|
705 | interested you can skip this section. |
|
|
706 | |
|
|
707 | Usually TCP is used for communication. Each I<node>, if configured to be |
|
|
708 | a I<public> node with the C<initialise_node> function will listen on the |
|
|
709 | configured TCP port (default is 4040). |
|
|
710 | |
|
|
711 | If then one I<node> wants to send a message to another I<node> it will |
|
|
712 | connect to the host and port given in the I<port ID>. |
|
|
713 | |
|
|
714 | Then some handshaking occurs to check whether both I<nodes> know the |
|
|
715 | I<shared secret>. Optionally, TLS can be enabled (about how to do this |
|
|
716 | exactly please consult the L<AnyEvent::MP> man page, just a hint: It |
|
|
717 | should be enough to put the private key and (self signed) certificate in |
|
|
718 | the C<~/.aemp-secret> file of all nodes). |
|
|
719 | |
|
|
720 | After the handshake, messages will be exchanged using a serialiser |
|
|
721 | (usually L<JSON> is used for this, but it is also possible to use other |
|
|
722 | serialization formats such as L<Storable>). |
|
|
723 | |
352 | |
724 | =head1 SEE ALSO |
353 | =head1 SEE ALSO |
725 | |
354 | |
726 | L<AnyEvent> |
355 | L<AnyEvent> |
727 | |
356 | |
728 | L<AnyEvent::Handle> |
357 | L<AnyEvent::Handle> |
729 | |
358 | |
730 | L<AnyEvent::MP> |
359 | L<AnyEvent::MP> |
731 | |
360 | |
|
|
361 | L<AnyEvent::MP::Global> |
|
|
362 | |
732 | =head1 AUTHOR |
363 | =head1 AUTHOR |
733 | |
364 | |
734 | Robin Redeker <elmex@ta-sa.org> |
365 | Robin Redeker <elmex@ta-sa.org> |
735 | |
366 | |