… | |
… | |
296 | |
296 | |
297 | use AnyEvent; |
297 | use AnyEvent; |
298 | use AnyEvent::Handle; |
298 | use AnyEvent::Handle; |
299 | use AnyEvent::MP; |
299 | use AnyEvent::MP; |
300 | |
300 | |
|
|
301 | my $server_node = "127.0.0.1:1299"; |
|
|
302 | |
|
|
303 | my $client_port = port; |
|
|
304 | |
|
|
305 | snd $server_node, lookup => "chatter", $client_port, "resolved"; |
|
|
306 | |
301 | my $resolved_cv = AnyEvent->condvar; |
307 | my $resolved_cv = AnyEvent->condvar; |
302 | |
|
|
303 | my $client_port = create_port; |
|
|
304 | |
|
|
305 | my $server_node = "localhost:1299#"; |
|
|
306 | |
|
|
307 | snd $server_node, wkp => "chatter", "$client_port", "resolved"; |
|
|
308 | |
|
|
309 | my $server_port; |
308 | my $server_port; |
310 | |
309 | |
311 | # setup a receiver callback for the 'resolved' message: |
310 | # setup a receiver callback for the 'resolved' message: |
312 | $client_port->rcv (resolved => sub { |
311 | rcv $client_port, resolved => sub { |
313 | my ($client_port, $type, $chatter_port_id) = @_; |
312 | my ($tag, $chatter_port_id) = @_; |
314 | |
313 | |
315 | print "Resolved the server port 'chatter' to $chatter_port_id\n"; |
314 | print "Resolved the server port 'chatter' to $chatter_port_id\n"; |
316 | $server_port = $chatter_port_id; |
315 | $server_port = $chatter_port_id; |
317 | |
316 | |
318 | $resolved_cv->send; |
317 | $resolved_cv->send; |
319 | 1 |
318 | 1 |
320 | }); |
319 | }; |
321 | |
320 | |
322 | # lets block the client until we resolved the server port. |
321 | # lets block the client until we have resolved the server port. |
323 | $resolved_cv->recv; |
322 | $resolved_cv->recv; |
324 | |
323 | |
325 | # now setup another receiver callback for the chat messages: |
324 | # now setup another receiver callback for the chat messages: |
326 | $client_port->rcv (message => sub { |
325 | rcv $client_port, message => sub { |
327 | my ($client_port, $type, $msg) = @_; |
326 | my ($tag, $msg) = @_; |
328 | |
327 | |
329 | print "chat> $msg\n"; |
328 | print "chat> $msg\n"; |
330 | 0 |
329 | 0 |
331 | }); |
330 | }; |
332 | |
331 | |
333 | # send the server a 'join' message: |
332 | # send a 'join' message to the server: |
334 | snd $server_port, join => "$client_port"; |
333 | snd $server_port, join => "$client_port"; |
335 | |
334 | |
336 | sub send_message { |
335 | sub send_message { |
337 | my ($msg) = @_; |
336 | my ($msg) = @_; |
338 | |
337 | |
… | |
… | |
342 | # make an AnyEvent condition variable for the 'quit' condition |
341 | # make an AnyEvent condition variable for the 'quit' condition |
343 | # (when we want to exit the client). |
342 | # (when we want to exit the client). |
344 | my $quit_cv = AnyEvent->condvar; |
343 | my $quit_cv = AnyEvent->condvar; |
345 | |
344 | |
346 | my $stdin_hdl = AnyEvent::Handle->new ( |
345 | my $stdin_hdl = AnyEvent::Handle->new ( |
347 | fh => \*STDIN, |
346 | fh => *STDIN, |
|
|
347 | on_error => sub { $quit_cv->send }, |
348 | on_read => sub { |
348 | on_read => sub { |
349 | my ($hdl) = @_; |
349 | my ($hdl) = @_; |
350 | |
350 | |
351 | $hdl->push_read (line => sub { |
351 | $hdl->push_read (line => sub { |
352 | my ($hdl, $line) = @_; |
352 | my ($hdl, $line) = @_; |
353 | |
353 | |
354 | if ($line =~ /^\/quit/) { # /quit will end the client |
354 | if ($line =~ /^\/quit/) { # /quit will end the client |
355 | $quit_cv->send; |
355 | $quit_cv->send; |
356 | |
|
|
357 | } else { |
356 | } else { |
358 | send_message ($line); |
357 | send_message ($line); |
359 | } |
358 | } |
360 | }); |
359 | }); |
361 | } |
360 | } |
… | |
… | |
363 | |
362 | |
364 | $quit_cv->recv; |
363 | $quit_cv->recv; |
365 | |
364 | |
366 | =head1 The Server |
365 | =head1 The Server |
367 | |
366 | |
368 | Ok, now finally to the server. What do we need? Well, we need to setup |
367 | Ok, we finally come to the server. |
369 | the well known port C<chatter> where all clients send their messages to. |
|
|
370 | |
368 | |
371 | Up and into code right now: |
369 | The server of course also needs to set up a port, and in addition needs to |
|
|
370 | I<register> it, so the clients can find it. |
|
|
371 | |
|
|
372 | Again, let's jump directly into the code: |
372 | |
373 | |
373 | #!perl |
374 | #!perl |
374 | |
375 | |
375 | use AnyEvent; |
376 | use AnyEvent; |
376 | use AnyEvent::MP; |
377 | use AnyEvent::MP; |
377 | |
378 | |
378 | become_public "localhost:1299"; |
379 | become_public "127.0.0.1:1299"; |
379 | |
380 | |
380 | my $chatter_port = create_port; |
381 | my $chatter_port = port; |
381 | $chatter_port->register ("chatter"); |
382 | |
|
|
383 | reg $chatter_port, "chatter"; |
382 | |
384 | |
383 | my %client_ports; |
385 | my %client_ports; |
384 | |
386 | |
385 | $chatter_port->rcv (join => sub { |
387 | rcv $chatter_port, |
386 | my ($chatter_port, $type, $client_port) = @_; |
388 | join => sub { |
|
|
389 | my ($tag, $client_port) = @_; |
387 | |
390 | |
388 | print "got new client port: $client_port\n"; |
391 | print "got new client port: $client_port\n"; |
389 | |
|
|
390 | $client_ports{$client_port} = 1; |
392 | $client_ports{$client_port} = 1; |
|
|
393 | |
391 | 0 |
394 | 0 |
392 | }); |
395 | }, |
|
|
396 | message => sub { |
|
|
397 | my ($tag, $msg) = @_; |
393 | |
398 | |
394 | $chatter_port->rcv (message => sub { |
|
|
395 | my ($chatter_port, $type, $msg) = @_; |
|
|
396 | |
|
|
397 | print "message> $msg\n"; |
399 | print "message> $msg\n"; |
398 | |
400 | |
399 | snd $_, message => $msg for keys %client_ports; |
401 | snd $_, message => $msg |
|
|
402 | for keys %client_ports; |
|
|
403 | |
400 | 0 |
404 | 0 |
401 | }); |
405 | }); |
402 | |
406 | |
403 | AnyEvent->condvar->recv; |
407 | AnyEvent->condvar->recv; |
404 | |
408 | |
405 | This is all. Looks much easier, doesn't it? I'll explain it only shortly, as |
409 | That is all. Looks much simpler than the client, doesn't it? |
406 | we had the discussion of the C<rcv> method in the client part of this tutorial |
410 | |
407 | above. |
411 | Let's quickly look over it, as C<rcv> has already been discussed in the |
|
|
412 | client part of this tutorial above. |
408 | |
413 | |
409 | First this: |
414 | First this: |
410 | |
415 | |
411 | become_public "localhost:1299"; |
416 | become_public "127.0.0.1:1299"; |
412 | |
417 | |
413 | This will tell our I<node> to become a I<public> node, which means that it can |
418 | This will tell our I<node> to become a I<public> node, which means that it |
414 | be contacted via TCP. The first argument should be the I<noderef> the server |
419 | can be contacted via TCP. The first argument should be the I<noderef> the |
415 | wants to be reachable at. In this case it's the TCP port 1299 on localhost. |
420 | server wants to be reachable at. In this case it's the TCP port 1299 on |
|
|
421 | C<127.0.0.1>. |
416 | |
422 | |
417 | Next we basically setup two receivers, one for the C<join> messages and |
423 | Next we set up two receivers, one for the C<join> messages and another one |
418 | another one for the actual messages of type C<messsage>. |
424 | for the actual messages of type C<messsage>. This is done with a single |
|
|
425 | call to C<rcv>, which allows multiple C<< match => $callback >> pairs. |
419 | |
426 | |
420 | In the C<join> message we get the client's port, which we just remember in the |
427 | In the C<join> callback we receive the client port, which is simply |
421 | C<%client_ports> hash. In the receiver for the message type C<message> we will |
428 | remembered in the C<%client_ports> hash. In the C<message> callback we |
422 | just iterate through all known C<%client_ports> and relay the message to them. |
429 | just iterate through all known C<%client_ports> and relay the message to |
|
|
430 | them. |
423 | |
431 | |
424 | And thats it. |
432 | That concludes the server. |
425 | |
433 | |
426 | =head1 The Remaining Problems |
434 | =head1 The Remaining Problems |
427 | |
435 | |
428 | The shown implementation still has some bugs. For instance: How does the |
436 | The implementation as shown still has some bugs. For instance: How does |
429 | server know that the client isn't there anymore, and can cleanup the |
437 | the server know that the client isn't there anymore, so it can clean up |
430 | C<%client_ports> hash? And also the chat messages have no originator, |
438 | the C<%client_ports> hash? Also, the chat messages have no originator, so |
431 | so we don't know who actually sent the message (which would be quite useful |
439 | we don't know who actually sent the message (which would be quite useful |
432 | for human-to-human interaction: to know who the other one is :). |
440 | for human-to-human interaction: to know who the other one is :). |
433 | |
441 | |
434 | But aside from these issues I hope this tutorial got you the swing of |
442 | But aside from these issues I hope this tutorial showed you the basics of |
435 | L<AnyEvent::MP> and explained some common idioms. |
443 | L<AnyEvent::MP> and explained some common idioms. |
436 | |
444 | |
437 | How to solve the reliability and C<%client_ports> cleanup problem will |
445 | How to solve the reliability and C<%client_ports> cleanup problem will |
438 | be explained later in this tutorial (TODO). |
446 | be explained later in this tutorial (TODO). |
439 | |
447 | |
… | |
… | |
443 | that L<AnyEvent::MP> nodes use to communicate to each other. If you are not |
451 | that L<AnyEvent::MP> nodes use to communicate to each other. If you are not |
444 | interested you can skip this section. |
452 | interested you can skip this section. |
445 | |
453 | |
446 | Usually TCP is used for communication. Each I<node>, if configured to be a |
454 | Usually TCP is used for communication. Each I<node>, if configured to be a |
447 | I<public> node with the C<become_public> function will listen on the configured |
455 | I<public> node with the C<become_public> function will listen on the configured |
448 | TCP port (default is usually 4040). |
456 | TCP port (default is 4040). |
449 | |
457 | |
450 | If now one I<node> wants to send a message to another I<node> it will connect |
458 | If then one I<node> wants to send a message to another I<node> it will |
451 | to the host and port given in the I<port id>. |
459 | connect to the host and port given in the I<port ID>. |
452 | |
460 | |
453 | Then some handshaking occurs to check whether both I<nodes> have the same |
461 | Then some handshaking occurs to check whether both I<nodes> know the |
454 | I<shared secret> configured. Optionally even TLS can be enabled (about how to |
462 | I<shared secret>. Optionally, TLS can be enabled (about how to do this |
455 | do this exactly please consult the L<AnyEvent::MP> man pages, just a hint: It |
463 | exactly please consult the L<AnyEvent::MP> man page, just a hint: It |
456 | should be enough to put the private key and (self signed) certificate in the |
464 | should be enough to put the private key and (self signed) certificate in |
457 | C<~/.aemp-secret> file of all nodes). |
465 | the C<~/.aemp-secret> file of all nodes). |
458 | |
466 | |
|
|
467 | After the handshake, messages will be exchanged using a serialiser |
459 | Now the serialized (usually L<JSON> is used for this, but it is also possible |
468 | (usually L<JSON> is used for this, but it is also possible to use other |
460 | to use other serialization formats, like L<Storable>) messages are sent over |
469 | serialization formats such as L<Storable>). |
461 | the wire using a simple framing protocol. |
|
|
462 | |
470 | |
463 | =head1 SEE ALSO |
471 | =head1 SEE ALSO |
464 | |
472 | |
465 | L<AnyEvent> |
473 | L<AnyEvent> |
466 | |
474 | |