ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.108
Committed: Sat Mar 24 01:19:47 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.107: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
48 # for re-export in AnyEvent::MP and Coro::MP
49 our @EXPORT_API = qw(
50 NODE $NODE
51 configure
52 node_of port_is_local
53 snd kil
54 db_set db_del
55 db_mon db_family db_keys db_values
56 );
57
58 our @EXPORT_OK = (
59 # these are internal
60 qw(
61 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62 add_node load_func
63 ),
64 @EXPORT_API,
65 );
66
67 our @EXPORT = qw(
68 snd_to_func snd_on eval_on
69 port_is_local
70 up_nodes mon_nodes node_is_up
71 );
72
73 sub load_func($) {
74 my $func = $_[0];
75
76 unless (defined &$func) {
77 my $pkg = $func;
78 do {
79 $pkg =~ s/::[^:]+$//
80 or return sub { die "unable to resolve function '$func'" };
81
82 local $@;
83 unless (eval "require $pkg; 1") {
84 my $error = $@;
85 $error =~ /^Can't locate .*.pm in \@INC \(/
86 or return sub { die $error };
87 }
88 } until defined &$func;
89 }
90
91 \&$func
92 }
93
94 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
95
96 sub nonce($) {
97 join "", map chr rand 256, 1 .. $_[0]
98 }
99
100 sub nonce62($) {
101 join "", map $alnum[rand 62], 1 .. $_[0]
102 }
103
104 our $CONFIG; # this node's configuration
105 our $SECURE;
106
107 our $RUNIQ; # remote uniq value
108 our $UNIQ; # per-process/node unique cookie
109 our $NODE;
110 our $ID = "a";
111
112 our %NODE; # node id to transport mapping, or "undef", for local node
113 our (%PORT, %PORT_DATA); # local ports
114
115 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 our %LMON; # monitored _local_ ports
117
118 our $GLOBAL; # true if node is a global ("directory") node
119 our %BINDS;
120 our $BINDS; # our listeners, as arrayref
121
122 our $SRCNODE; # holds the sending node _object_ during _inject
123
124 # initialise names for non-networked operation
125 {
126 # ~54 bits, for local port names, lowercase $ID appended
127 my $now = AE::now;
128 $UNIQ =
129 (join "",
130 map $alnum[$_],
131 $$ / 62 % 62,
132 $$ % 62,
133 (int $now ) % 62,
134 (int $now * 100) % 62,
135 (int $now * 10000) % 62,
136 ) . nonce62 4
137 ;
138
139 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
140 $RUNIQ = nonce62 10;
141 $RUNIQ =~ s/(.)$/\U$1/;
142
143 $NODE = "";
144 }
145
146 sub NODE() {
147 $NODE
148 }
149
150 sub node_of($) {
151 my ($node, undef) = split /#/, $_[0], 2;
152
153 $node
154 }
155
156 BEGIN {
157 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
158 ? sub () { 1 }
159 : sub () { 0 };
160 }
161
162 our $DELAY_TIMER;
163 our @DELAY_QUEUE;
164
165 our $delay_run = sub {
166 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
167 };
168
169 sub delay($) {
170 push @DELAY_QUEUE, shift;
171 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
172 }
173
174 =item $AnyEvent::MP::Kernel::SRCNODE
175
176 During execution of a message callback, this variable contains the node ID
177 of the origin node.
178
179 The main use of this variable is for debugging output - there are probably
180 very few other cases where you need to know the source node ID.
181
182 =cut
183
184 sub _inject {
185 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
186 &{ $PORT{+shift} or return };
187 }
188
189 # this function adds a node-ref, so you can send stuff to it
190 # it is basically the central routing component.
191 sub add_node {
192 $NODE{$_[0]} || do {
193 my ($node) = @_;
194
195 length $node
196 or Carp::croak "'undef' or the empty string are not valid node/port IDs";
197
198 # registers itself in %NODE
199 new AnyEvent::MP::Node::Remote $node
200 }
201 }
202
203 sub snd(@) {
204 my ($nodeid, $portid) = split /#/, shift, 2;
205
206 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
207
208 ($NODE{$nodeid} || add_node $nodeid)
209 ->{send} (["$portid", @_]);
210 }
211
212 sub port_is_local($) {
213 my ($nodeid, undef) = split /#/, $_[0], 2;
214
215 $nodeid eq $NODE
216 }
217
218 =item snd_to_func $node, $func, @args
219
220 Expects a node ID and a name of a function. Asynchronously tries to call
221 this function with the given arguments on that node.
222
223 This function can be used to implement C<spawn>-like interfaces.
224
225 =cut
226
227 sub snd_to_func($$;@) {
228 my $nodeid = shift;
229
230 # on $NODE, we artificially delay... (for spawn)
231 # this is very ugly - maybe we should simply delay ALL messages,
232 # to avoid deep recursion issues. but that's so... slow...
233 $AnyEvent::MP::Node::Self::DELAY = 1
234 if $nodeid ne $NODE;
235
236 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
237 }
238
239 =item snd_on $node, @msg
240
241 Executes C<snd> with the given C<@msg> (which must include the destination
242 port) on the given node.
243
244 =cut
245
246 sub snd_on($@) {
247 my $node = shift;
248 snd $node, snd => @_;
249 }
250
251 =item eval_on $node, $string[, @reply]
252
253 Evaluates the given string as Perl expression on the given node. When
254 @reply is specified, then it is used to construct a reply message with
255 C<"$@"> and any results from the eval appended.
256
257 =cut
258
259 sub eval_on($$;@) {
260 my $node = shift;
261 snd $node, eval => @_;
262 }
263
264 sub kil(@) {
265 my ($nodeid, $portid) = split /#/, shift, 2;
266
267 length $portid
268 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
269
270 ($NODE{$nodeid} || add_node $nodeid)
271 ->kill ("$portid", @_);
272 }
273
274 #############################################################################
275 # node monitoring and info
276
277 =item $bool = node_is_up $nodeid
278
279 Returns true if the given node is "up", that is, the kernel thinks it has
280 a working connection to it.
281
282 More precisely, if the node is up, returns C<1>. If the node is currently
283 connecting or otherwise known but not connected, returns C<0>. If nothing
284 is known about the node, returns C<undef>.
285
286 =cut
287
288 sub node_is_up($) {
289 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
290 ? 1 : 0
291 }
292
293 =item @nodes = up_nodes
294
295 Return the node IDs of all nodes that are currently connected (excluding
296 the node itself).
297
298 =cut
299
300 sub up_nodes() {
301 map $_->{id}, grep $_->{transport}, values %NODE
302 }
303
304 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
305
306 Registers a callback that is called each time a node goes up (a connection
307 is established) or down (the connection is lost).
308
309 Node up messages can only be followed by node down messages for the same
310 node, and vice versa.
311
312 Note that monitoring a node is usually better done by monitoring its node
313 port. This function is mainly of interest to modules that are concerned
314 about the network topology and low-level connection handling.
315
316 Callbacks I<must not> block and I<should not> send any messages.
317
318 The function returns an optional guard which can be used to unregister
319 the monitoring callback again.
320
321 Example: make sure you call function C<newnode> for all nodes that are up
322 or go up (and down).
323
324 newnode $_, 1 for up_nodes;
325 mon_nodes \&newnode;
326
327 =cut
328
329 our %MON_NODES;
330
331 sub mon_nodes($) {
332 my ($cb) = @_;
333
334 $MON_NODES{$cb+0} = $cb;
335
336 defined wantarray
337 and Guard::guard { delete $MON_NODES{$cb+0} }
338 }
339
340 sub _inject_nodeevent($$;@) {
341 my ($node, $up, @reason) = @_;
342
343 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
344
345 for my $cb (values %MON_NODES) {
346 eval { $cb->($node->{id}, $up, @reason); 1 }
347 or AE::log die => $@;
348 }
349 }
350
351 #############################################################################
352 # self node code
353
354 sub _kill {
355 my $port = shift;
356
357 delete $PORT{$port}
358 or return; # killing nonexistent ports is O.K.
359 delete $PORT_DATA{$port};
360
361 my $mon = delete $LMON{$port}
362 or !@_
363 or AE::log die => "unmonitored local port $port died with reason: @_";
364
365 $_->(@_) for values %$mon;
366 }
367
368 sub _monitor {
369 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
370 unless exists $PORT{$_[1]};
371
372 $LMON{$_[1]}{$_[2]+0} = $_[2];
373 }
374
375 sub _unmonitor {
376 delete $LMON{$_[1]}{$_[2]+0}
377 if exists $LMON{$_[1]};
378 }
379
380 sub _secure_check {
381 $SECURE
382 and die "remote execution not allowed\n";
383 }
384
385 our %NODE_REQ;
386
387 %NODE_REQ = (
388 # "mproto" - monitoring protocol
389
390 # monitoring
391 mon0 => sub { # stop monitoring a port for another node
392 my $portid = shift;
393 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
394 },
395 mon1 => sub { # start monitoring a port for another node
396 my $portid = shift;
397 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
398 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
399 delete $node->{rmon}{$portid};
400 $node->send (["", kil0 => $portid, @_])
401 if $node && $node->{transport};
402 };
403 },
404 # another node has killed a monitored port
405 kil0 => sub {
406 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
407 or return;
408
409 $_->(@_) for @$cbs;
410 },
411 # another node wants to kill a local port
412 kil1 => \&_kill,
413
414 # "public" services - not actually public
415
416 # relay message to another node / generic echo
417 snd => sub {
418 &snd
419 },
420 # ask if a node supports the given request, only works for fixed tags
421 can => sub {
422 my $method = shift;
423 snd @_, exists $NODE_REQ{$method};
424 },
425
426 # random utilities
427 eval => sub {
428 &_secure_check;
429 my @res = do { package main; eval shift };
430 snd @_, "$@", @res if @_;
431 },
432 time => sub {
433 snd @_, AE::now;
434 },
435 devnull => sub {
436 #
437 },
438 "" => sub {
439 # empty messages are keepalives or similar devnull-applications
440 },
441 );
442
443 # the node port
444 new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
445
446 $PORT{""} = sub {
447 my $tag = shift;
448 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
449 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
450 };
451
452 our $MPROTO = 1;
453
454 # tell everybody who connects our nproto
455 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
456 $_[0]{local_greeting}{mproto} = $MPROTO;
457 };
458
459 #############################################################################
460 # seed management, try to keep connections to all seeds at all times
461
462 our %SEED_NODE; # seed ID => node ID|undef
463 our %NODE_SEED; # map node ID to seed ID
464 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
465 our $SEED_WATCHER;
466 our $SEED_RETRY;
467
468 sub seed_connect {
469 my ($seed) = @_;
470
471 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
472 or Carp::croak "$seed: unparsable seed address";
473
474 AE::log 9 => "trying connect to seed node $seed.";
475
476 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
477 $host, $port,
478 on_greeted => sub {
479 # called after receiving remote greeting, learn remote node name
480
481 # we rely on untrusted data here (the remote node name) this is
482 # hopefully ok, as this can at most be used for DOSing, which is easy
483 # when you can do MITM anyway.
484
485 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
486 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
487 require AnyEvent::MP::Global; # every seed becomes a global node currently
488 delete $SEED_NODE{$seed};
489 } else {
490 $SEED_NODE{$seed} = $_[0]{remote_node};
491 $NODE_SEED{$_[0]{remote_node}} = $seed;
492 # also start global service, if not running
493 # we need to check here in addition to the mon_nodes below
494 # because we might only learn late that a node is a seed
495 # and then we might already be connected
496 snd $_[0]{remote_node}, "g_slave"
497 unless $_[0]{remote_greeting}{global};
498 }
499 },
500 sub {
501 delete $SEED_CONNECT{$seed};
502 }
503 ;
504 }
505
506 sub seed_all {
507 my @seeds = grep
508 !exists $SEED_CONNECT{$_}
509 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
510 keys %SEED_NODE;
511
512 if (@seeds) {
513 # start connection attempt for every seed we are not connected to yet
514 seed_connect $_
515 for @seeds;
516
517 $SEED_RETRY = $SEED_RETRY * 2 + rand;
518 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
519 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
520
521 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
522
523 } else {
524 # all seeds connected or connecting, no need to restart timer
525 undef $SEED_WATCHER;
526 }
527 }
528
529 sub seed_again {
530 $SEED_RETRY = 1;
531 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
532 }
533
534 # sets new seed list, starts connecting
535 sub set_seeds(@) {
536 %SEED_NODE = ();
537 %NODE_SEED = ();
538 %SEED_CONNECT = ();
539
540 @SEED_NODE{@_} = ();
541
542 seed_all;
543 }
544
545 mon_nodes sub {
546 return unless exists $NODE_SEED{$_[0]};
547
548 if ($_[1]) {
549 # each time a connection to a seed node goes up, make
550 # sure it runs the global service.
551 snd $_[0], "g_slave"
552 unless $NODE{$_[0]}{transport}{remote_greeting}{global};
553 } else {
554 # if we lost the connection to a seed node, make sure we are seeding
555 seed_again;
556 }
557 };
558
559 #############################################################################
560 # keepalive code - used to kepe conenctions to certain nodes alive
561 # only used by global code atm., but ought to be exposed somehow.
562
563 our $KEEPALIVE_RETRY;
564 our $KEEPALIVE_WATCHER;
565 our %KEEPALIVE; # we want to keep these nodes alive
566 our %KEEPALIVE_DOWN; # nodes that are down currently
567
568 sub keepalive_all {
569 AE::log 9 => "keepalive: trying to establish connections with: "
570 . (join " ", keys %KEEPALIVE_DOWN)
571 . ".";
572
573 (add_node $_)->connect
574 for keys %KEEPALIVE_DOWN;
575
576 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand;
577 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
578 if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
579
580 $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
581 }
582
583 sub keepalive_again {
584 $KEEPALIVE_RETRY = 1;
585 keepalive_all;
586 }
587
588 sub keepalive_add {
589 return if $KEEPALIVE{$_[0]}++;
590
591 return if node_is_up $_[0];
592 undef $KEEPALIVE_DOWN{$_[0]};
593 keepalive_again;
594 }
595
596 sub keepalive_del {
597 return if --$KEEPALIVE{$_[0]};
598
599 delete $KEEPALIVE {$_[0]};
600 delete $KEEPALIVE_DOWN{$_[0]};
601
602 undef $KEEPALIVE_WATCHER
603 unless %KEEPALIVE_DOWN;
604 }
605
606 mon_nodes sub {
607 return unless exists $KEEPALIVE{$_[0]};
608
609 if ($_[1]) {
610 delete $KEEPALIVE_DOWN{$_[0]};
611
612 undef $KEEPALIVE_WATCHER
613 unless %KEEPALIVE_DOWN;
614 } else {
615 # lost the conenction, try to connect again
616 undef $KEEPALIVE_DOWN{$_[0]};
617 keepalive_again;
618 }
619 };
620
621 #############################################################################
622 # talk with/to global nodes
623
624 # protocol messages:
625 #
626 # sent by all slave nodes (slave to master)
627 # g_slave database - make other global node master of the sender
628 #
629 # sent by any node to global nodes
630 # g_set database - set whole database
631 # g_upd family set del - update single family
632 # g_del family key - delete key from database
633 # g_get family key reply... - send reply with data
634 #
635 # send by global nodes
636 # g_global - node became global, similar to global=1 greeting
637 #
638 # database families
639 # "'l" -> node -> listeners
640 # "'g" -> node -> undef
641 # ...
642 #
643
644 # used on all nodes:
645 our $MASTER; # the global node we bind ourselves to
646 our $MASTER_MON;
647 our %LOCAL_DB; # this node database
648
649 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
650
651 our $GPROTO = 1;
652
653 # tell everybody who connects our nproto
654 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
655 $_[0]{local_greeting}{gproto} = $GPROTO;
656 };
657
658 #############################################################################
659 # master selection
660
661 # master requests
662 our %GLOBAL_REQ; # $id => \@req
663
664 sub global_req_add {
665 my ($id, $req) = @_;
666
667 return if exists $GLOBAL_REQ{$id};
668
669 $GLOBAL_REQ{$id} = $req;
670
671 snd $MASTER, @$req
672 if $MASTER;
673 }
674
675 sub global_req_del {
676 delete $GLOBAL_REQ{$_[0]};
677 }
678
679 #################################
680 # master rpc
681
682 our %GLOBAL_RES;
683 our $GLOBAL_RES_ID = "a";
684
685 sub global_call {
686 my $id = ++$GLOBAL_RES_ID;
687 $GLOBAL_RES{$id} = pop;
688 global_req_add $id, [@_, $id];
689 }
690
691 $NODE_REQ{g_reply} = sub {
692 my $id = shift;
693 global_req_del $id;
694 my $cb = delete $GLOBAL_RES{$id}
695 or return;
696 &$cb
697 };
698
699 #################################
700
701 sub g_find {
702 global_req_add "g_find $_[0]", [g_find => $_[0]];
703 }
704
705 # reply for g_find started in Node.pm
706 $NODE_REQ{g_found} = sub {
707 global_req_del "g_find $_[0]";
708
709 my $node = $NODE{$_[0]} or return;
710
711 $node->connect_to ($_[1]);
712 };
713
714 sub master_set {
715 $MASTER = $_[0];
716
717 snd $MASTER, g_slave => \%LOCAL_DB;
718
719 # (re-)send queued requests
720 snd $MASTER, @$_
721 for values %GLOBAL_REQ;
722 }
723
724 sub master_search {
725 #TODO: should also look for other global nodes, but we don't know them #d#
726 for (keys %NODE_SEED) {
727 if (node_is_up $_) {
728 master_set $_;
729 return;
730 }
731 }
732
733 $MASTER_MON = mon_nodes sub {
734 return unless $_[1]; # we are only interested in node-ups
735 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
736
737 master_set $_[0];
738
739 $MASTER_MON = mon_nodes sub {
740 if ($_[0] eq $MASTER && !$_[1]) {
741 undef $MASTER;
742 master_search ();
743 }
744 };
745 };
746 }
747
748 # other node wants to make us the master
749 $NODE_REQ{g_slave} = sub {
750 my ($db) = @_;
751
752 # load global module and redo the request
753 require AnyEvent::MP::Global;
754 &{ $NODE_REQ{g_slave} }
755 };
756
757 #############################################################################
758 # local database operations
759
760 # local database management
761
762 sub db_set($$;$) {
763 my ($family, $subkey) = @_;
764
765 # if (ref $_[1]) {
766 # # bulk
767 # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
768 # $LOCAL_DB{$_[0]} = $_[1];
769 # snd $MASTER, g_upd => $_[0] => $_[1], \@del
770 # if defined $MASTER;
771 # } else {
772 # single-key
773 $LOCAL_DB{$family}{$subkey} = $_[2];
774 snd $MASTER, g_upd => $family => { $subkey => $_[2] }
775 if defined $MASTER;
776 # }
777
778 defined wantarray
779 and Guard::guard { db_del $family => $subkey }
780 }
781
782 sub db_del($@) {
783 my $family = shift;
784
785 delete @{ $LOCAL_DB{$family} }{@_};
786 snd $MASTER, g_upd => $family => undef, \@_
787 if defined $MASTER;
788 }
789
790 # database query
791
792 sub db_family {
793 my ($family, $cb) = @_;
794 global_call g_db_family => $family, $cb;
795 }
796
797 sub db_keys {
798 my ($family, $cb) = @_;
799 global_call g_db_keys => $family, $cb;
800 }
801
802 sub db_values {
803 my ($family, $cb) = @_;
804 global_call g_db_values => $family, $cb;
805 }
806
807 # database monitoring
808
809 our %LOCAL_MON; # f, reply
810 our %MON_DB; # f, k, value
811
812 sub db_mon($@) {
813 my ($family, $cb) = @_;
814
815 if (my $db = $MON_DB{$family}) {
816 # we already monitor, so create a "dummy" change event
817 # this is postponed, which might be too late (we could process
818 # change events), so disable the callback at first
819 $LOCAL_MON{$family}{$cb+0} = sub { };
820 AE::postpone {
821 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
822
823 # set actual callback
824 $LOCAL_MON{$family}{$cb+0} = $cb;
825 $cb->($db, [keys %$db]);
826 };
827 } else {
828 # new monitor, request chg1 from upstream
829 $LOCAL_MON{$family}{$cb+0} = $cb;
830 global_req_add "mon1 $family" => [g_mon1 => $family];
831 $MON_DB{$family} = {};
832 }
833
834 defined wantarray
835 and Guard::guard {
836 my $mon = $LOCAL_MON{$family};
837 delete $mon->{$cb+0};
838
839 unless (%$mon) {
840 global_req_del "mon1 $family";
841
842 # no global_req, because we don't care if we are not connected
843 snd $MASTER, g_mon0 => $family
844 if $MASTER;
845
846 delete $LOCAL_MON{$family};
847 delete $MON_DB{$family};
848 }
849 }
850 }
851
852 # full update
853 $NODE_REQ{g_chg1} = sub {
854 return unless $SRCNODE eq $MASTER;
855 my ($f, $ndb) = @_;
856
857 my $db = $MON_DB{$f};
858 my (@a, @c, @d);
859
860 # add or replace keys
861 while (my ($k, $v) = each %$ndb) {
862 exists $db->{$k}
863 ? push @c, $k
864 : push @a, $k;
865 $db->{$k} = $v;
866 }
867
868 # delete keys that are no longer present
869 for (grep !exists $ndb->{$_}, keys %$db) {
870 delete $db->{$_};
871 push @d, $_;
872 }
873
874 $_->($db, \@a, \@c, \@d)
875 for values %{ $LOCAL_MON{$_[0]} };
876 };
877
878 # incremental update
879 $NODE_REQ{g_chg2} = sub {
880 return unless $SRCNODE eq $MASTER;
881 my ($family, $set, $del) = @_;
882
883 my $db = $MON_DB{$family};
884
885 my (@a, @c);
886
887 while (my ($k, $v) = each %$set) {
888 exists $db->{$k}
889 ? push @c, $k
890 : push @a, $k;
891 $db->{$k} = $v;
892 }
893
894 delete @$db{@$del};
895
896 $_->($db, \@a, \@c, $del)
897 for values %{ $LOCAL_MON{$family} };
898 };
899
900 #############################################################################
901 # configure
902
903 sub nodename {
904 require POSIX;
905 (POSIX::uname ())[1]
906 }
907
908 sub _resolve($) {
909 my ($nodeid) = @_;
910
911 my $cv = AE::cv;
912 my @res;
913
914 $cv->begin (sub {
915 my %seen;
916 my @refs;
917 for (sort { $a->[0] <=> $b->[0] } @res) {
918 push @refs, $_->[1] unless $seen{$_->[1]}++
919 }
920 shift->send (@refs);
921 });
922
923 my $idx;
924 for my $t (split /,/, $nodeid) {
925 my $pri = ++$idx;
926
927 $t = length $t ? nodename . ":$t" : nodename
928 if $t =~ /^\d*$/;
929
930 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
931 or Carp::croak "$t: unparsable transport descriptor";
932
933 $port = "0" if $port eq "*";
934
935 if ($host eq "*") {
936 $cv->begin;
937
938 my $get_addr = sub {
939 my @addr;
940
941 require Net::Interface;
942
943 # Net::Interface hangs on some systems, so hope for the best
944 local $SIG{ALRM} = 'DEFAULT';
945 alarm 2;
946
947 for my $if (Net::Interface->interfaces) {
948 # we statically lower-prioritise ipv6 here, TODO :()
949 for $_ ($if->address (Net::Interface::AF_INET ())) {
950 next if /^\x7f/; # skip localhost etc.
951 push @addr, $_;
952 }
953 for ($if->address (Net::Interface::AF_INET6 ())) {
954 #next if $if->scope ($_) <= 2;
955 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
956 push @addr, $_;
957 }
958 }
959
960 alarm 0;
961
962 @addr
963 };
964
965 my @addr;
966
967 if (AnyEvent::WIN32) {
968 @addr = $get_addr->();
969 } else {
970 # use a child process, as Net::Interface is big, and we need it only once.
971
972 pipe my $r, my $w
973 or die "pipe: $!";
974
975 if (fork eq 0) {
976 close $r;
977 syswrite $w, pack "(C/a*)*", $get_addr->();
978 require POSIX;
979 POSIX::_exit (0);
980 } else {
981 close $w;
982
983 my $addr;
984
985 1 while sysread $r, $addr, 1024, length $addr;
986
987 @addr = unpack "(C/a*)*", $addr;
988 }
989 }
990
991 for my $ip (@addr) {
992 push @res, [
993 $pri += 1e-5,
994 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
995 ];
996 }
997 $cv->end;
998 } else {
999 $cv->begin;
1000 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1001 for (@_) {
1002 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1003 push @res, [
1004 $pri += 1e-5,
1005 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1006 ];
1007 }
1008 $cv->end;
1009 };
1010 }
1011 }
1012
1013 $cv->end;
1014
1015 $cv
1016 }
1017
1018 sub configure(@) {
1019 unshift @_, "profile" if @_ & 1;
1020 my (%kv) = @_;
1021
1022 my $profile = delete $kv{profile};
1023
1024 $profile = nodename
1025 unless defined $profile;
1026
1027 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1028
1029 $SECURE = $CONFIG->{secure};
1030
1031 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1032
1033 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1034
1035 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1036
1037 $NODE = $node;
1038
1039 $NODE =~ s/%n/nodename/ge;
1040
1041 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1042 # nodes with randomised node names do not need randomised port names
1043 $UNIQ = "";
1044 }
1045
1046 $node_obj->{id} = $NODE;
1047 $NODE{$NODE} = $node_obj;
1048
1049 my $seeds = $CONFIG->{seeds};
1050 my $binds = $CONFIG->{binds};
1051
1052 $binds ||= ["*"];
1053
1054 AE::log 8 => "node $NODE starting up.";
1055
1056 $BINDS = [];
1057 %BINDS = ();
1058
1059 for (map _resolve $_, @$binds) {
1060 for my $bind ($_->recv) {
1061 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1062 or Carp::croak "$bind: unparsable local bind address";
1063
1064 my $listener = AnyEvent::MP::Transport::mp_server
1065 $host,
1066 $port,
1067 prepare => sub {
1068 my (undef, $host, $port) = @_;
1069 $bind = AnyEvent::Socket::format_hostport $host, $port;
1070 0
1071 },
1072 ;
1073 $BINDS{$bind} = $listener;
1074 push @$BINDS, $bind;
1075 }
1076 }
1077
1078 db_set "'l" => $NODE => $BINDS;
1079
1080 AE::log 8 => "node listens on [@$BINDS].";
1081
1082 # connect to all seednodes
1083 set_seeds map $_->recv, map _resolve $_, @$seeds;
1084
1085 master_search;
1086
1087 # save gobs of memory
1088 undef &_resolve;
1089 *configure = sub (@){ };
1090
1091 for (@{ $CONFIG->{services} }) {
1092 if (ref) {
1093 my ($func, @args) = @$_;
1094 (load_func $func)->(@args);
1095 } elsif (s/::$//) {
1096 eval "require $_";
1097 die $@ if $@;
1098 } else {
1099 (load_func $_)->();
1100 }
1101 }
1102
1103 eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1104 die "$@" if $@;
1105 }
1106
1107 =back
1108
1109 =head1 LOGGING
1110
1111 AnyEvent::MP::Kernel logs high-level information about the current node,
1112 when nodes go up and down, and most runtime errors. It also logs some
1113 debugging and trace messages about network maintainance, such as seed
1114 connections and global node management.
1115
1116 =head1 SEE ALSO
1117
1118 L<AnyEvent::MP>.
1119
1120 =head1 AUTHOR
1121
1122 Marc Lehmann <schmorp@schmorp.de>
1123 http://home.schmorp.de/
1124
1125 =cut
1126
1127 1
1128