ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.109
Committed: Sat Mar 24 13:05:40 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.108: +7 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
48 # for re-export in AnyEvent::MP and Coro::MP
# (this list is also appended to @EXPORT_OK below)
49 our @EXPORT_API = qw(
50 NODE $NODE
51 configure
52 node_of port_is_local
53 snd kil
54 db_set db_del
55 db_mon db_family db_keys db_values
56 );
57
# symbols that can be imported on request: the internal ones plus the
# whole public API list
58 our @EXPORT_OK = (
59 # these are internal
60 qw(
61 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62 add_node load_func
63 ),
64 @EXPORT_API,
65 );
66
# symbols exported by default; port_is_local is listed here as well as
# in @EXPORT_API
67 our @EXPORT = qw(
68 snd_to_func snd_on eval_on
69 port_is_local
70 up_nodes mon_nodes node_is_up
71 );
72
# Resolve a fully qualified function name to a code reference, loading
# the module that defines it if necessary.
#
# $_[0] is a name such as "Some::Package::func". When that function is
# not defined yet, ever shorter package prefixes of the name are
# require'd (Some::Package, then Some, ...) until the function exists.
#
# Always returns a code reference and never dies itself: when the name
# cannot be resolved, or a module fails to load for any reason other than
# not being found in @INC, the returned sub dies with the error once it
# is called.
sub load_func($) {
    my $func = $_[0];

    unless (defined &$func) {
        my $pkg = $func;
        do {
            # strip the last name component; when nothing is left to
            # strip, every candidate package has been tried
            $pkg =~ s/::[^:]+$//
                or return sub { die "unable to resolve function '$func'" };

            local $@;
            # NOTE(review): string eval of a name that can originate from
            # a remote node (the node port handler passes message tags
            # here) - only safe as long as untrusted peers cannot reach it.
            unless (eval "require $pkg; 1") {
                my $error = $@;
                # a module that simply does not exist is not an error, the
                # next shorter prefix is tried; anything else is reported
                $error =~ /^Can't locate .*\.pm in \@INC \(/
                    or return sub { die $error };
            }
        } until defined &$func;
    }

    \&$func
}
93
# the 62 characters used for printable random identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# nonce $length - returns a string of $length random bytes (0..255 each),
# drawn from perl's rand
sub nonce($) {
    my ($length) = @_;

    join "", map { chr rand 256 } 1 .. $length
}

# nonce62 $length - like nonce, but only uses characters from @alnum
sub nonce62($) {
    my ($length) = @_;

    join "", map { $alnum[rand 62] } 1 .. $length
}
103
104 our $CONFIG; # this node's configuration
105 our $SECURE; # copied from $CONFIG->{secure} by configure; when true, _secure_check dies
106
107 our $RUNIQ; # remote uniq value
108 our $UNIQ; # per-process/node unique cookie
109 our $NODE; # ID of the local node, the empty string until configure sets it
110 our $ID = "a";
111
112 our %NODE; # node id to transport mapping, or "undef", for local node
113 our (%PORT, %PORT_DATA); # local ports
114
115 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 our %LMON; # monitored _local_ ports
117
118 our $GLOBAL; # true if node is a global ("directory") node
119 our %BINDS; # bind address => listener, filled in by configure
120 our $BINDS; # our listeners, as arrayref
121
122 our $SRCNODE; # origin node of the message being processed; used as a %NODE key and compared with $MASTER, i.e. a node ID
123
# initialise names for non-networked operation
{
    # ~54 bits, for local port names, lowercase $ID appended:
    # five characters derived from the pid and the current time,
    # followed by four random ones
    my $now = AE::now;

    my @indices = (
        $$ / 62 % 62,
        $$ % 62,
        (int $now        ) % 62,
        (int $now * 100  ) % 62,
        (int $now * 10000) % 62,
    );

    $UNIQ = (join "", map { $alnum[$_] } @indices) . nonce62(4);

    # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
    # at the end to avoid clashes
    $RUNIQ = nonce62(10);
    $RUNIQ =~ s/(.)$/\U$1/;

    # no node ID yet, configure assigns the real one
    $NODE = "";
}
145
# returns the ID of the local node (the current value of $NODE)
sub NODE() {
    return $NODE;
}
149
# node_of $port - returns the node ID part of a port ID, that is,
# everything before the first "#"
sub node_of($) {
    my ($port) = @_;

    my ($nodeid) = split /#/, $port, 2;

    return $nodeid;
}
155
# TRACE is a compile-time constant: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is true, 0 otherwise
BEGIN {
    if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
        *TRACE = sub () { 1 };
    } else {
        *TRACE = sub () { 0 };
    }
}
161
our $DELAY_TIMER;   # timer that runs the queue, undef while nothing is queued
our @DELAY_QUEUE;   # callbacks waiting to be run

# runs queued callbacks in order until the queue is empty (callbacks
# queued while running are executed as well), then drops the timer
our $delay_run = sub {
    while (1) {
        my $cb = shift @DELAY_QUEUE
            or return undef $DELAY_TIMER;

        $cb->();
    }
};

# delay $cb - queue $cb for execution from a zero-delay timer
sub delay($) {
    my ($cb) = @_;

    push @DELAY_QUEUE, $cb;
    $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
173
174 =item $AnyEvent::MP::Kernel::SRCNODE
175
176 During execution of a message callback, this variable contains the node ID
177 of the origin node.
178
179 The main use of this variable is for debugging output - there are probably
180 very few other cases where you need to know the source node ID.
181
182 =cut
183
# _inject $portid, @msg - deliver a message to the local port $portid by
# calling its %PORT callback with the remaining arguments. Messages for
# unknown ports are silently dropped.
sub _inject {
    if (TRACE && @_) {
        warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
    }

    my $port_cb = $PORT{+shift}
        or return;

    # called without parentheses on purpose: passes the rest of @_ along
    &$port_cb;
}
188
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# returns the existing %NODE entry for the given node ID, or creates a
# new remote node object for it. croaks on an empty/undefined node ID.
sub add_node {
    my ($node) = @_;

    return $NODE{$node}
        if $NODE{$node};

    length $node
        or Carp::croak "'undef' or the empty string are not valid node/port IDs";

    # registers itself in %NODE
    AnyEvent::MP::Node::Remote->new ($node)
}
202
# snd $port, @msg - send @msg to the port "nodeid#portid", creating the
# node object for nodeid first if it is not known yet
sub snd(@) {
    my $dest = shift;
    my ($nodeid, $portid) = split /#/, $dest, 2;

    if (TRACE && @_) {
        warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";#d#
    }

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    # the port ID is always passed on as a string
    $node->{send}->(["$portid", @_]);
}
211
# port_is_local $port - true if the node part of the port ID (everything
# before the first "#") equals the local node ID
sub port_is_local($) {
    my ($port) = @_;

    my ($nodeid) = split /#/, $port, 2;

    return $nodeid eq $NODE;
}
217
218 =item snd_to_func $node, $func, @args
219
220 Expects a node ID and a name of a function. Asynchronously tries to call
221 this function with the given arguments on that node.
222
223 This function can be used to implement C<spawn>-like interfaces.
224
225 =cut
226
# sends ["", $func, @args] to the node port of $nodeid, see the POD above
sub snd_to_func($$;@) {
    my ($nodeid, @call) = @_;

    # on $NODE, we artificially delay... (for spawn)
    # this is very ugly - maybe we should simply delay ALL messages,
    # to avoid deep recursion issues. but that's so... slow...
    # NOTE(review): the comment talks about delaying on $NODE, yet the
    # flag is set when the target is NOT the local node - confirm against
    # AnyEvent::MP::Node::Self before touching this.
    if ($nodeid ne $NODE) {
        $AnyEvent::MP::Node::Self::DELAY = 1;
    }

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    $node->{send}->(["", @call]);
}
238
239 =item snd_on $node, @msg
240
241 Executes C<snd> with the given C<@msg> (which must include the destination
242 port) on the given node.
243
244 =cut
245
# relays @msg through $node by sending it a "snd" request
sub snd_on($@) {
    my ($node, @msg) = @_;

    snd ($node, snd => @msg);
}
250
251 =item eval_on $node, $string[, @reply]
252
253 Evaluates the given string as Perl expression on the given node. When
254 @reply is specified, then it is used to construct a reply message with
255 C<"$@"> and any results from the eval appended.
256
257 =cut
258
# sends an "eval" request with the code string and the optional reply
# message to $node
sub eval_on($$;@) {
    my ($node, @request) = @_;

    snd ($node, eval => @request);
}
263
# kil $port, @reason - kill the port "nodeid#portid" by calling the kill
# method of its node object. Croaks when the port part is empty, because
# node ports cannot be killed.
sub kil(@) {
    my $dest = shift;
    my ($nodeid, $portid) = split /#/, $dest, 2;

    length $portid
        or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    $node->kill ("$portid", @_);
}
273
274 #############################################################################
275 # node monitoring and info
276
277 =item $bool = node_is_up $nodeid
278
279 Returns true if the given node is "up", that is, the kernel thinks it has
280 a working connection to it.
281
282 More precisely, if the node is up, returns C<1>. If the node is currently
283 connecting or otherwise known but not connected, returns C<0>. If nothing
284 is known about the node, returns C<undef>.
285
286 =cut
287
# returns 1 for the local node and for nodes with a transport, 0 for
# known nodes without a transport, and nothing for unknown nodes
sub node_is_up($) {
    my ($nodeid) = @_;

    return 1
        if $nodeid eq $NODE;

    my $node = $NODE{$nodeid}
        or return;

    $node->{transport} ? 1 : 0
}
292
293 =item @nodes = up_nodes
294
295 Return the node IDs of all nodes that are currently connected (excluding
296 the node itself).
297
298 =cut
299
# returns the IDs of all nodes in %NODE that currently have a transport
sub up_nodes() {
    map { $_->{id} }
        grep { $_->{transport} }
            values %NODE
}
303
304 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
305
306 Registers a callback that is called each time a node goes up (a connection
307 is established) or down (the connection is lost).
308
309 Node up messages can only be followed by node down messages for the same
310 node, and vice versa.
311
312 Note that monitoring a node is usually better done by monitoring its node
313 port. This function is mainly of interest to modules that are concerned
314 about the network topology and low-level connection handling.
315
316 Callbacks I<must not> block and I<should not> send any messages.
317
318 The function returns an optional guard which can be used to unregister
319 the monitoring callback again.
320
321 Example: make sure you call function C<newnode> for all nodes that are up
322 or go up (and down).
323
324 newnode $_, 1 for up_nodes;
325 mon_nodes \&newnode;
326
327 =cut
328
# registered node monitors, keyed by the numeric value of the callback
# reference
our %MON_NODES;

# registers $cb; unless called in void context, returns a guard that
# removes the callback again when it is destroyed
sub mon_nodes($) {
    my ($cb) = @_;

    $MON_NODES{$cb + 0} = $cb;

    defined wantarray
        and Guard::guard { delete $MON_NODES{$cb + 0} }
}
339
# _inject_nodeevent $node, $up, @reason - log the state change of the
# node object $node and tell every registered node monitor about it
sub _inject_nodeevent($$;@) {
    my ($node, $up, @reason) = @_;

    my $state = $up ? "up." : "down (@reason).";
    AE::log 7 => "$node->{id} is $state";

    for my $cb (values %MON_NODES) {
        # a dying monitor must not keep the others from being called
        eval { $cb->($node->{id}, $up, @reason); 1 }
            or AE::log die => $@;
    }
}
350
351 #############################################################################
352 # self node code
353
# _kill $port, @reason - remove the local port $port together with its
# data and call all monitors registered for it with @reason
sub _kill {
    my $port = shift;

    # killing nonexistent ports is O.K.
    return unless delete $PORT{$port};

    delete $PORT_DATA{$port};

    my $mon = delete $LMON{$port};

    # a port that dies with a reason while nobody monitors it gets logged
    unless ($mon || !@_) {
        AE::log die => "unmonitored local port $port died with reason: @_";
    }

    $_->(@_) for values %$mon;
}
367
# _monitor $ignored, $portid, $cb - register $cb as monitor of the local
# port $portid. If the port does not exist, $cb is called right away with
# a no_such_port reason instead.
sub _monitor {
    my (undef, $portid, $cb) = @_;

    unless (exists $PORT{$portid}) {
        return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
    }

    $LMON{$portid}{$cb + 0} = $cb;
}
374
# _unmonitor $ignored, $portid, $cb - remove the monitor $cb from the
# local port $portid, if that port has any monitors at all
sub _unmonitor {
    my (undef, $portid, $cb) = @_;

    if (exists $LMON{$portid}) {
        delete $LMON{$portid}{$cb + 0};
    }
}
379
# dies when the node runs in secure mode ($SECURE is true)
sub _secure_check {
    die "remote execution not allowed\n"
        if $SECURE;
}
384
# request tag => handler for messages sent to the node port; handlers are
# called with the message arguments while $SRCNODE identifies the sender
our %NODE_REQ;

%NODE_REQ = (
    # "mproto" - monitoring protocol

    # monitoring

    # stop monitoring a port for another node
    mon0 => sub {
        my ($portid) = @_;

        _unmonitor (undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid});
    },

    # start monitoring a port for another node
    mon1 => sub {
        my ($portid) = @_;

        # the monitor callback must not keep the node object alive
        my $node = $NODE{$SRCNODE};
        Scalar::Util::weaken ($node);

        # when the port dies, forget the monitor and tell the remote node
        my $on_kill = sub {
            delete $node->{rmon}{$portid};
            $node->send (["", kil0 => $portid, @_])
                if $node && $node->{transport};
        };

        $node->{rmon}{$portid} = $on_kill;
        _monitor (undef, $portid, $on_kill);
    },

    # another node has killed a monitored port
    kil0 => sub {
        my $portid = shift;

        my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
            or return;

        $_->(@_) for @$cbs;
    },

    # another node wants to kill a local port
    kil1 => \&_kill,

    # "public" services - not actually public

    # relay message to another node / generic echo
    snd => sub {
        &snd
    },

    # ask if a node supports the given request, only works for fixed tags
    can => sub {
        my $method = shift;

        snd (@_, exists $NODE_REQ{$method});
    },

    # random utilities

    # evaluate a string in package main; the reply message, if any, gets
    # "$@" and the results appended.
    # no extra lexicals are declared here on purpose - they would be
    # visible to the evaluated code.
    eval => sub {
        _secure_check ();
        my @res = do { package main; eval shift };
        snd (@_, "$@", @res) if @_;
    },

    # reply with the current event loop time
    time => sub {
        snd (@_, AE::now);
    },

    devnull => sub {
        #
    },

    "" => sub {
        # empty messages are keepalives or similar devnull-applications
    },
);
442
# the node port
AnyEvent::MP::Node::Self->new ($NODE); # registers itself in %NODE

# messages to the port "" are node requests: the first element is the
# request tag, looked up in %NODE_REQ. Unknown tags are resolved as
# function names via load_func (unless secure mode forbids it) and the
# result is cached in %NODE_REQ.
$PORT{""} = sub {
    my $tag = shift;

    eval {
        my $handler = $NODE_REQ{$tag} ||= do {
            _secure_check ();
            load_func ($tag)
        };

        # called without parentheses to pass the remaining @_ along
        &$handler;
    };

    if ($@) {
        AE::log die => "error processing node message from $SRCNODE: $@";
    }
};
451
our $MPROTO = 1;

# tell everybody who connects our mproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
    my ($self) = @_;

    $self->{local_greeting}{mproto} = $MPROTO;
};
458
459 #############################################################################
460 # seed management, try to keep connections to all seeds at all times
461
462 our %SEED_NODE; # seed ID => node ID|undef
463 our %NODE_SEED; # map node ID to seed ID
464 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
# NOTE(review): seed_connect only ever stores the mp_connect result here
# and deletes the entry again - the 1/2 states are not set in this file.
465 our $SEED_WATCHER; # timer that calls seed_all again, undef when no retry is pending
466 our $SEED_RETRY; # current retry interval in seconds, see seed_all/seed_again
467
# seed_connect $seed - start a connection attempt to the seed address
# $seed ("host:port"), unless one is already recorded in %SEED_CONNECT.
# Croaks if the address cannot be parsed.
sub seed_connect {
    my ($seed) = @_;

    my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
        or Carp::croak "$seed: unparsable seed address";

    AE::log 9 => "trying connect to seed node $seed.";

    # called after receiving remote greeting, learn remote node name
    my $on_greeted = sub {
        # we rely on untrusted data here (the remote node name) this is
        # hopefully ok, as this can at most be used for DOSing, which is easy
        # when you can do MITM anyway.
        my $remote = $_[0]{remote_node};

        # if we connect to ourselves, nuke this seed, but make sure we act like a seed
        if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
        } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;

            # also start global service, if not running
            # we need to check here in addition to the mon_nodes below
            # because we might only learn late that a node is a seed
            # and then we might already be connected
            snd $remote, "g_slave"
                unless $_[0]{remote_greeting}{global};
        }
    };

    # last argument of mp_connect: forget the attempt again
    # (presumably invoked when the connection object goes away - confirm
    # in AnyEvent::MP::Transport)
    my $forget = sub {
        delete $SEED_CONNECT{$seed};
    };

    $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
        $host, $port,
        on_greeted => $on_greeted,
        $forget,
    );
}
505
# try to connect to every seed that is neither being connected to nor
# up, and schedule a retry with a growing interval (capped at the
# configured monitor_timeout) as long as such seeds exist
sub seed_all {
    my @seeds = grep {
        my $nodeid = $SEED_NODE{$_};

        !exists $SEED_CONNECT{$_}
            && !(defined $nodeid && node_is_up ($nodeid))
    } keys %SEED_NODE;

    unless (@seeds) {
        # all seeds connected or connecting, no need to restart timer
        undef $SEED_WATCHER;
        return;
    }

    # start connection attempt for every seed we are not connected to yet
    seed_connect ($_)
        for @seeds;

    # roughly double the interval each round, with some random jitter
    my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

    $SEED_RETRY = $SEED_RETRY * 2 + rand;
    $SEED_RETRY = $max
        if $SEED_RETRY > $max;

    $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
}
528
# reset the retry interval to one second and make sure seed_all gets
# called, unless a retry timer is pending already
sub seed_again {
    $SEED_RETRY = 1;

    $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
}
533
# sets new seed list, starts connecting
sub set_seeds(@) {
    my @seeds = @_;

    # forget everything known about the previous seeds
    %SEED_NODE    = ();
    %NODE_SEED    = ();
    %SEED_CONNECT = ();

    # the node IDs of the new seeds are not known yet
    @SEED_NODE{@seeds} = ();

    seed_all ();
}
544
# watch the seed nodes (all other nodes are ignored)
mon_nodes sub {
    my ($nodeid, $is_up) = @_;

    return unless exists $NODE_SEED{$nodeid};

    if ($is_up) {
        # each time a connection to a seed node goes up, make
        # sure it runs the global service.
        snd $nodeid, "g_slave"
            unless $NODE{$nodeid}{transport}{remote_greeting}{global};
    } else {
        # if we lost the connection to a seed node, make sure we are seeding
        seed_again ();
    }
};
558
559 #############################################################################
560 # keepalive code - used to keep connections to certain nodes alive
561 # only used by global code atm., but ought to be exposed somehow.
562 #TODO: should probably be done directly by node objects
563
564 our $KEEPALIVE_RETRY; # current retry interval in seconds, see keepalive_all/keepalive_again
565 our $KEEPALIVE_WATCHER; # timer that calls keepalive_all again, undef when nothing is down
566 our %KEEPALIVE; # we want to keep these nodes alive (node ID => count of keepalive_add calls)
567 our %KEEPALIVE_DOWN; # nodes that are down currently
568
# try to connect to all nodes in %KEEPALIVE_DOWN and schedule the next
# attempt, with a growing interval capped at the configured
# monitor_timeout
sub keepalive_all {
    my @down = keys %KEEPALIVE_DOWN;

    AE::log 9 => "keepalive: trying to establish connections with: "
                 . (join " ", @down)
                 . ".";

    for my $nodeid (@down) {
        add_node ($nodeid)->connect;
    }

    # roughly double the interval each round, with some random jitter
    my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

    $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand;
    $KEEPALIVE_RETRY = $max
        if $KEEPALIVE_RETRY > $max;

    $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
583
# reset the retry interval and try to connect right away
sub keepalive_again {
    $KEEPALIVE_RETRY = 1;

    keepalive_all ();
}
588
# keepalive_add $nodeid - request that the connection to $nodeid is kept
# alive. Requests are counted; only the first one for a node that is not
# up starts connection attempts.
sub keepalive_add {
    my ($nodeid) = @_;

    return if $KEEPALIVE{$nodeid}++;
    return if node_is_up ($nodeid);

    # only the key matters, the value stays undef
    $KEEPALIVE_DOWN{$nodeid} = undef;
    keepalive_again ();
}
596
# keepalive_del $nodeid - drop one keepalive request for $nodeid; once
# the count reaches zero the node is forgotten, and the retry timer is
# stopped when no other node is down
sub keepalive_del {
    my ($nodeid) = @_;

    return if --$KEEPALIVE{$nodeid};

    delete $KEEPALIVE{$nodeid};
    delete $KEEPALIVE_DOWN{$nodeid};

    unless (%KEEPALIVE_DOWN) {
        undef $KEEPALIVE_WATCHER;
    }
}
606
# track the up/down state of the nodes we want to keep alive
mon_nodes sub {
    my ($nodeid, $is_up) = @_;

    return unless exists $KEEPALIVE{$nodeid};

    if ($is_up) {
        delete $KEEPALIVE_DOWN{$nodeid};

        # nothing left to reconnect to, so stop retrying
        undef $KEEPALIVE_WATCHER
            unless %KEEPALIVE_DOWN;
    } else {
        # lost the connection, try to connect again
        $KEEPALIVE_DOWN{$nodeid} = undef;
        keepalive_again ();
    }
};
621
622 #############################################################################
623 # talk with/to global nodes
624
625 # protocol messages:
626 #
627 # sent by all slave nodes (slave to master)
628 # g_slave database - make other global node master of the sender
629 #
630 # sent by any node to global nodes
631 # g_set database - set whole database
632 # g_upd family set del - update single family
633 # g_del family key - delete key from database
634 # g_get family key reply... - send reply with data
635 #
636 # sent by global nodes
637 # g_global - node became global, similar to global=1 greeting
638 #
639 # database families
640 # "'l" -> node -> listeners
641 # "'g" -> node -> undef
642 # ...
643 #
644
645 # used on all nodes:
646 our $MASTER; # the global node we bind ourselves to
647 our $MASTER_MON; # guard of the node monitor installed by master_search
648 our %LOCAL_DB; # this node database (family => subkey => value, see db_set)
649
650 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
651
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
    my ($self) = @_;

    $self->{local_greeting}{gproto} = $GPROTO;
};
658
659 #############################################################################
660 # master selection
661
# master requests
our %GLOBAL_REQ; # $id => \@req

# global_req_add $id, \@req - remember the request under $id and send it
# to the master if there is one; master_set (re-)sends remembered
# requests. Does nothing if a request with this ID exists already.
sub global_req_add {
    my ($id, $req) = @_;

    return if exists $GLOBAL_REQ{$id};

    $GLOBAL_REQ{$id} = $req;

    if ($MASTER) {
        snd ($MASTER, @$req);
    }
}
675
# global_req_del $id - forget the remembered request $id
sub global_req_del {
    my ($id) = @_;

    delete $GLOBAL_REQ{$id};
}
679
680 #################################
681 # master rpc
682
our %GLOBAL_RES;          # request ID => reply callback
our $GLOBAL_RES_ID = "a"; # last request ID handed out (string increment)

# global_call @request, $cb - queue @request with a fresh request ID
# appended; $cb is called by the g_reply handler with the reply data
sub global_call {
    my $cb = pop;
    my $id = ++$GLOBAL_RES_ID;

    $GLOBAL_RES{$id} = $cb;
    global_req_add ($id, [@_, $id]);
}
691
# reply to a global_call: first argument is the request ID, the rest is
# passed to the callback registered for it
$NODE_REQ{g_reply} = sub {
    my $id = shift;

    global_req_del ($id);

    my $reply_cb = delete $GLOBAL_RES{$id}
        or return;

    # called without parentheses to pass the remaining @_ along
    &$reply_cb
};
699
700 #################################
701
# g_find $nodeid - queue a g_find request for $nodeid; the answer
# arrives as a g_found message
sub g_find {
    my ($nodeid) = @_;

    global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
705
# reply for g_find started in Node.pm
# arguments: the node ID and the data that is handed to connect_to
$NODE_REQ{g_found} = sub {
    my ($nodeid, $addresses) = @_;

    global_req_del ("g_find $nodeid");

    my $node = $NODE{$nodeid}
        or return;

    $node->connect_to ($addresses);
};
714
# master_set $nodeid - make $nodeid our master: send it our local
# database and all requests that are still queued
sub master_set {
    ($MASTER) = @_;

    snd ($MASTER, g_slave => \%LOCAL_DB);

    # (re-)send queued requests
    for my $req (values %GLOBAL_REQ) {
        snd ($MASTER, @$req);
    }
}
724
# pick a master: use the first seed node that is up, or else wait for a
# seed node to come up. Once a master was found that way, watch it and
# search again when it goes down.
sub master_search {
    #TODO: should also look for other global nodes, but we don't know them #d#
    for my $nodeid (keys %NODE_SEED) {
        next unless node_is_up ($nodeid);

        master_set ($nodeid);
        return;
    }

    $MASTER_MON = mon_nodes sub {
        my ($nodeid, $is_up) = @_;

        return unless $is_up;              # we are only interested in node-ups
        return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

        master_set ($nodeid);

        # replaces (and thereby releases) the monitor we are running in
        $MASTER_MON = mon_nodes sub {
            if ($_[0] eq $MASTER && !$_[1]) {
                undef $MASTER;
                master_search ();
            }
        };
    };
}
748
# other node wants to make us the master
# this is only a stub: it loads the global module and redoes the request
# with whatever handler is registered for g_slave afterwards
$NODE_REQ{g_slave} = sub {
    require AnyEvent::MP::Global;

    # look the handler up only now, after the module has been loaded
    my $handler = $NODE_REQ{g_slave};

    # called without parentheses to pass @_ along unchanged
    &$handler
};
757
758 #############################################################################
759 # local database operations
760
761 # local database management
762
# db_del is defined further down but is called from the guard below, so
# it has to be declared first: without a declaration perl parses
# "db_del $family => ..." as a method call on $family.
sub db_del($@);

# db_set $family, $subkey[, $value] - store $value under $subkey in the
# family $family of the local database and, if there is a master, send
# it the update.
#
# Unless called in void context, returns a guard that deletes the key
# again when it is destroyed.
#
# (a bulk variant taking a hash reference as second argument was sketched
# in an earlier version, but is not implemented)
sub db_set($$;$) {
    my ($family, $subkey) = @_;

    $LOCAL_DB{$family}{$subkey} = $_[2];

    snd $MASTER, g_upd => $family => { $subkey => $_[2] }
        if defined $MASTER;

    defined wantarray
        and Guard::guard { db_del ($family, $subkey) }
}
782
# db_del $family, @subkeys - delete those of the given subkeys that exist
# in the family of the local database and, if there is a master, send it
# the list of deleted keys
sub db_del($@) {
    my ($family, @subkeys) = @_;

    my @del = grep { exists $LOCAL_DB{$family}{$_} } @subkeys;

    # nothing to delete, nothing to announce
    @del or return;

    delete @{ $LOCAL_DB{$family} }{@del};

    if (defined $MASTER) {
        snd ($MASTER, g_upd => $family => undef, \@del);
    }
}
794
795 # database query
796
# db_family $family, $cb - issue a g_db_family request for $family via
# global_call; $cb receives the reply
sub db_family {
    my ($family, $cb) = @_;

    global_call (g_db_family => $family, $cb);
}
801
# db_keys $family, $cb - issue a g_db_keys request for $family via
# global_call; $cb receives the reply
sub db_keys {
    my ($family, $cb) = @_;

    global_call (g_db_keys => $family, $cb);
}
806
# db_values $family, $cb - issue a g_db_values request for $family via
# global_call; $cb receives the reply
sub db_values {
    my ($family, $cb) = @_;

    global_call (g_db_values => $family, $cb);
}
811
812 # database monitoring
813
our %LOCAL_MON; # f, reply
our %MON_DB; # f, k, value

# db_mon $family, $cb - monitor the database family $family. $cb is
# called with the monitored copy of the family and array references
# listing the affected keys (see the g_chg1/g_chg2 handlers).
#
# Unless called in void context, returns a guard that stops monitoring
# when it is destroyed.
sub db_mon($@) {
    my ($family, $cb) = @_;

    my $db = $MON_DB{$family};

    if (!$db) {
        # new monitor, request chg1 from upstream
        $LOCAL_MON{$family}{$cb + 0} = $cb;
        global_req_add ("mon1 $family" => [g_mon1 => $family]);
        $MON_DB{$family} = {};
    } else {
        # we already monitor, so create a "dummy" change event
        # this is postponed, which might be too late (we could process
        # change events), so disable the callback at first
        $LOCAL_MON{$family}{$cb + 0} = sub { };

        AE::postpone {
            # guard might have gone away already
            return unless exists $LOCAL_MON{$family}{$cb + 0};

            # set actual callback
            $LOCAL_MON{$family}{$cb + 0} = $cb;
            $cb->($db, [keys %$db]);
        };
    }

    defined wantarray
        and Guard::guard {
            my $mon = $LOCAL_MON{$family};
            delete $mon->{$cb + 0};

            # that was the last monitor of this family
            unless (%$mon) {
                global_req_del ("mon1 $family");

                # no global_req, because we don't care if we are not connected
                snd $MASTER, g_mon0 => $family
                    if $MASTER;

                delete $LOCAL_MON{$family};
                delete $MON_DB{$family};
            }
        }
}
856
# full update
# arguments: the family and its complete new contents. Brings the
# monitored copy of the family in line with it and calls every monitor
# with the copy and the lists of added, changed and deleted keys.
# Only accepted from the current master.
$NODE_REQ{g_chg1} = sub {
    return unless $SRCNODE eq $MASTER;
    my ($family, $ndb) = @_;

    # an update can still arrive after the last monitor was removed;
    # ignore it instead of autovivifying an empty %LOCAL_MON entry
    my $db = $MON_DB{$family}
        or return;

    my (@added, @changed, @deleted);

    # add or replace keys
    while (my ($k, $v) = each %$ndb) {
        exists $db->{$k}
            ? push @changed, $k
            : push @added, $k;
        $db->{$k} = $v;
    }

    # delete keys that are no longer present
    for my $k (grep { !exists $ndb->{$_} } keys %$db) {
        delete $db->{$k};
        push @deleted, $k;
    }

    $_->($db, \@added, \@changed, \@deleted)
        for values %{ $LOCAL_MON{$family} || {} };
};
882
# incremental update
# arguments: the family, a hash of keys to set and an array of keys to
# delete. Applies both to the monitored copy of the family and calls
# every monitor with the copy, the lists of added and changed keys, and
# the delete list as received. Only accepted from the current master.
$NODE_REQ{g_chg2} = sub {
    return unless $SRCNODE eq $MASTER;
    my ($family, $set, $del) = @_;

    # an update can still arrive after the last monitor was removed;
    # ignore it instead of autovivifying an empty %LOCAL_MON entry
    my $db = $MON_DB{$family}
        or return;

    my (@added, @changed);

    while (my ($k, $v) = each %$set) {
        exists $db->{$k}
            ? push @changed, $k
            : push @added, $k;
        $db->{$k} = $v;
    }

    delete @$db{@$del};

    $_->($db, \@added, \@changed, $del)
        for values %{ $LOCAL_MON{$family} || {} };
};
904
905 #############################################################################
906 # configure
907
# returns the nodename field of POSIX::uname
sub nodename {
    require POSIX;

    my (undef, $nodename) = POSIX::uname ();

    $nodename
}
912
# _resolve $descriptor - resolve a comma-separated list of transport
# descriptors ("host:port", "*:port", a bare port number or the empty
# string) into "address:port" strings.
#
# Returns a condvar that receives the resulting list, without duplicates,
# in the order of the descriptors. Croaks on unparsable descriptors.
sub _resolve($) {
    my ($nodeid) = @_;

    my $cv = AE::cv;
    my @res; # [priority, "address:port"] pairs collected by all lookups

    # the group callback runs once every begin was matched by an end:
    # order by priority, drop duplicates and deliver the result
    $cv->begin (sub {
        my ($done) = @_;

        my %seen;
        my @refs = grep { !$seen{$_}++ }
                   map  { $_->[1] }
                   sort { $a->[0] <=> $b->[0] } @res;

        $done->send (@refs);
    });

    my $idx;
    for my $t (split /,/, $nodeid) {
        # results of one descriptor share its integer priority, a small
        # fraction is added per address to keep their order
        my $pri = ++$idx;

        # an empty or all-digits descriptor means "our hostname[:port]"
        if ($t =~ /^\d*$/) {
            $t = length $t ? nodename () . ":$t" : nodename ();
        }

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
            or Carp::croak "$t: unparsable transport descriptor";

        $port = "0" if $port eq "*";

        if ($host eq "*") {
            # wildcard host: use the addresses of all local interfaces
            $cv->begin;

            my $get_addr = sub {
                my @addr;

                require Net::Interface;

                # Net::Interface hangs on some systems, so hope for the best
                local $SIG{ALRM} = 'DEFAULT';
                alarm 2;

                for my $if (Net::Interface->interfaces) {
                    # we statically lower-prioritise ipv6 here, TODO :()
                    for ($if->address (Net::Interface::AF_INET ())) {
                        next if /^\x7f/; # skip localhost etc.
                        push @addr, $_;
                    }
                    for ($if->address (Net::Interface::AF_INET6 ())) {
                        #next if $if->scope ($_) <= 2;
                        next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                        push @addr, $_;
                    }
                }

                alarm 0;

                @addr
            };

            my @addr;

            if (AnyEvent::WIN32) {
                @addr = $get_addr->();
            } else {
                # use a child process, as Net::Interface is big, and we need it only once.

                pipe my $r, my $w
                    or die "pipe: $!";

                # NOTE(review): a failed fork (undef) ends up in the parent
                # branch and simply yields no addresses; the child is not
                # waited for here.
                if (fork eq 0) {
                    # child: write the length-prefixed addresses and exit
                    close $r;
                    syswrite $w, pack "(C/a*)*", $get_addr->();
                    require POSIX;
                    POSIX::_exit (0);
                } else {
                    # parent: read until the child closes the pipe
                    close $w;

                    my $addr;

                    1 while sysread $r, $addr, 1024, length $addr;

                    @addr = unpack "(C/a*)*", $addr;
                }
            }

            for my $ip (@addr) {
                push @res, [
                    $pri += 1e-5,
                    AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                ];
            }

            $cv->end;
        } else {
            # named host: resolve it asynchronously
            $cv->begin;

            AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
                for my $target (@_) {
                    my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $target->[3];
                    push @res, [
                        $pri += 1e-5,
                        AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
                    ];
                }

                $cv->end;
            };
        }
    }

    # matches the initial begin above
    $cv->end;

    $cv
}
1022
# configure [$profile,] key => value... - configure the local node: look
# up the profile, set the node ID, create the listeners, start seeding
# and the master search, then load the configured services and run the
# configured eval string.
#
# Works only once - it replaces itself by an empty sub.
#
# NOTE: the lexicals %kv, $profile, $node, $node_obj, $seeds and $binds
# are visible to the string eval at the end, so their names are part of
# the behaviour.
sub configure(@) {
    # an odd number of arguments means the first one is the profile name
    unshift @_, "profile" if @_ & 1;
    my (%kv) = @_;

    my $profile = delete $kv{profile};

    $profile = nodename ()
        unless defined $profile;

    $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

    $SECURE = $CONFIG->{secure};

    my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

    $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

    my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure

    $NODE = $node;

    # %n is replaced by the nodename
    $NODE =~ s/%n/nodename/ge;

    # a trailing slash and every %u get the random $RUNIQ value
    if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
        # nodes with randomised node names do not need randomised port names
        $UNIQ = "";
    }

    # re-register the local node object under its final ID
    $node_obj->{id} = $NODE;
    $NODE{$NODE} = $node_obj;

    my $seeds = $CONFIG->{seeds};
    my $binds = $CONFIG->{binds};

    $binds ||= ["*"];

    AE::log 8 => "node $NODE starting up.";

    $BINDS = [];
    %BINDS = ();

    # start all resolvers first, then wait for them one after the other
    for my $resolver (map { _resolve ($_) } @$binds) {
        for my $bind ($resolver->recv) {
            my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
                or Carp::croak "$bind: unparsable local bind address";

            my $listener = AnyEvent::MP::Transport::mp_server
                $host,
                $port,
                prepare => sub {
                    # remember the address passed to the prepare callback
                    my (undef, $host, $port) = @_;
                    $bind = AnyEvent::Socket::format_hostport $host, $port;
                    0
                },
            ;

            $BINDS{$bind} = $listener;
            push @$BINDS, $bind;
        }
    }

    # publish our listeners in the "'l" family
    db_set "'l" => $NODE => $BINDS;

    AE::log 8 => "node listens on [@$BINDS].";

    # connect to all seednodes
    set_seeds map { $_->recv } map { _resolve ($_) } @$seeds;
    master_search ();

    # save gobs of memory
    undef &_resolve;
    *configure = sub (@){ };

    # services are [function, args...] references, module names with a
    # trailing "::", or plain function names
    for my $service (@{ $CONFIG->{services} }) {
        if (ref $service) {
            my ($func, @args) = @$service;
            (load_func $func)->(@args);
        } elsif ($service =~ s/::$//) {
            eval "require $service";
            die $@ if $@;
        } else {
            (load_func $service)->();
        }
    }

    eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
    die "$@" if $@;
}
1110
1111 =back
1112
1113 =head1 LOGGING
1114
1115 AnyEvent::MP::Kernel logs high-level information about the current node,
1116 when nodes go up and down, and most runtime errors. It also logs some
1117 debugging and trace messages about network maintenance, such as seed
1118 connections and global node management.
1119
1120 =head1 SEE ALSO
1121
1122 L<AnyEvent::MP>.
1123
1124 =head1 AUTHOR
1125
1126 Marc Lehmann <schmorp@schmorp.de>
1127 http://home.schmorp.de/
1128
1129 =cut
1130
1131 1
1132