ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.114
Committed: Tue Jun 28 11:38:51 2016 UTC (7 years, 11 months ago) by root
Branch: MAIN
Changes since 1.113: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
48 # for re-export in AnyEvent::MP and Coro::MP
49 our @EXPORT_API = qw(
50 NODE $NODE
51 configure
52 node_of port_is_local
53 snd kil
54 db_set db_del
55 db_mon db_family db_keys db_values
56 );
57
58 our @EXPORT_OK = (
59 # these are internal
60 qw(
61 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62 add_node load_func
63 ),
64 @EXPORT_API,
65 );
66
67 our @EXPORT = qw(
68 snd_to_func snd_on eval_on
69 port_is_local
70 up_nodes mon_nodes node_is_up
71 );
72
# Resolves a fully qualified function name to a code reference, loading
# the module that defines it on demand.
#
# If the function is not defined yet, successively shorter package
# prefixes of $func are require'd until it is (so "Foo::Bar::baz" tries
# Foo::Bar, then Foo).
#
# This function never dies itself: on failure it returns a closure that
# dies with the reason when it is eventually called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component; nothing left to strip means
         # that every candidate package has been tried
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;

            # only "the file for *this* package does not exist" means we
            # should try a shorter prefix. any other failure - including a
            # missing dependency of an existing module - is a real error
            # that must be reported as-is.
            (my $file = "$pkg.pm") =~ s{::}{/}g;
            $error =~ /^Can't locate \Q$file\E in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
93
# the 62 characters used for printable random identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random octets (each chr of rand 256)
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $_[0];
   $octets
}

# returns a string of $_[0] random characters drawn from @alnum
sub nonce62($) {
   my $id = "";
   $id .= $alnum[rand 62]
      for 1 .. $_[0];
   $id
}
103
104 our $CONFIG; # this node's configuration
105 our $SECURE;
106
107 our $RUNIQ; # remote uniq value
108 our $UNIQ; # per-process/node unique cookie
109 our $NODE;
110 our $ID = "a";
111
112 our %NODE; # node id to transport mapping, or "undef", for local node
113 our (%PORT, %PORT_DATA); # local ports
114
115 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 our %LMON; # monitored _local_ ports
117
118 #our $GLOBAL; # true if node is a global ("directory") node
119 our %BINDS;
120 our $BINDS; # our listeners, as arrayref
121
122 our $SRCNODE; # holds the sending node _object_ during _inject
123 our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
124
# initialise names for non-networked operation
{
   # ~54 bits, for local port names, lowercase $ID appended:
   # two base-62 digits from the pid, three from the current time,
   # followed by four random characters
   my $now = AE::now;
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now * 100  ) % 62,
      (int $now * 10000) % 62,
   );

   $UNIQ = (join "", @alnum[@digits]) . nonce62 (4);

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 (10);
   $RUNIQ =~ s/(.)$/\U$1/;

   # the node ID stays empty until configure assigns one
   $NODE = "";
}
146
# returns the ID of the local node (the current value of $NODE)
sub NODE() {
   return $NODE;
}
150
# returns the node ID part of a port ID, i.e. everything before the
# first "#" (undef for the empty string)
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
156
BEGIN {
   # TRACE is a sub with an empty prototype returning a literal, chosen
   # once at compile time from the PERL_ANYEVENT_MP_TRACE environment variable
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
162
our $DELAY_TIMER; # timer watcher, defined while a queue run is pending
our @DELAY_QUEUE; # callbacks waiting to be run

# runs all queued callbacks in order, then clears the timer so that
# the next delay call schedules a new run
our $delay_run = sub {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   undef $DELAY_TIMER
};

# queues a callback to be run from a zero-delay timer
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
174
175 =item $AnyEvent::MP::Kernel::SRCNODE
176
177 During execution of a message callback, this variable contains the node ID
178 of the origin node.
179
180 The main use of this variable is for debugging output - there are probably
181 very few other cases where you need to know the source node ID.
182
183 =cut
184
# delivers a message to a local port: the first element is the port ID,
# the remaining elements are passed to the port callback. messages to
# unknown ports are silently dropped.
sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $cb = $PORT{+shift}
      or return;

   # call with the remaining @_ as arguments
   &$cb;
}
189
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# returns the existing %NODE entry for the given node ID, or creates a
# new remote node object for it. croaks on an undefined or empty ID.
sub add_node {
   my ($node) = @_;

   return $NODE{$node}
      if $NODE{$node};

   length $node
      or Carp::croak "'undef' or the empty string are not valid node/port IDs";

   # registers itself in %NODE
   AnyEvent::MP::Node::Remote->new ($node)
}
203
# sends a message to the given port ID ("nodeid#portid"), creating the
# node object on demand
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port ID is always sent as a string
   $node->{send} (["$portid", @_]);
}
212
# returns true if the node part of the given port ID equals $NODE
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0] eq $NODE
}
218
219 =item snd_to_func $node, $func, @args
220
221 Expects a node ID and a name of a function. Asynchronously tries to call
222 this function with the given arguments on that node.
223
224 This function can be used to implement C<spawn>-like interfaces.
225
226 =cut
227
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the destination is NOT $NODE,
   # which does not obviously match the remark above - confirm intent.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   # an empty port ID addresses the node port, @call is function name and arguments
   my $node = $NODE{$nodeid} || add_node ($nodeid);
   $node->{send} (["", @call]);
}
239
240 =item snd_on $node, @msg
241
242 Executes C<snd> with the given C<@msg> (which must include the destination
243 port) on the given node.
244
245 =cut
246
sub snd_on($@) {
   my ($node, @msg) = @_;

   # ask the node port of $node to execute "snd @msg"
   snd ($node, snd => @msg);
}
251
252 =item eval_on $node, $string[, @reply]
253
254 Evaluates the given string as Perl expression on the given node. When
255 @reply is specified, then it is used to construct a reply message with
256 C<"$@"> and any results from the eval appended.
257
258 =cut
259
sub eval_on($$;@) {
   my ($node, @request) = @_;

   # @request is the string to evaluate, optionally followed by the reply message
   snd ($node, eval => @request);
}
264
# kills the given port ("nodeid#portid"), passing any further arguments
# on as the kill reason. croaks when the port part is empty.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   unless (length $portid) {
      Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);
   $node->kill ("$portid", @_);
}
274
275 #############################################################################
276 # node monitoring and info
277
278 =item $bool = node_is_up $nodeid
279
280 Returns true if the given node is "up", that is, the kernel thinks it has
281 a working connection to it.
282
283 More precisely, if the node is up, returns C<1>. If the node is currently
284 connecting or otherwise known but not connected, returns C<0>. If nothing
285 is known about the node, returns C<undef>.
286
287 =cut
288
sub node_is_up($) {
   my ($nodeid) = @_;

   # the local node is always up
   return 1
      if $nodeid eq $NODE;

   # unknown nodes yield undef/the empty list
   my $node = $NODE{$nodeid}
      or return;

   $node->{transport} ? 1 : 0
}
293
294 =item @nodes = up_nodes
295
296 Return the node IDs of all nodes that are currently connected (excluding
297 the node itself).
298
299 =cut
300
sub up_nodes() {
   my @ids;

   # a node counts as connected when it has a transport
   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
304
305 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
306
307 Registers a callback that is called each time a node goes up (a connection
308 is established) or down (the connection is lost).
309
310 Node up messages can only be followed by node down messages for the same
311 node, and vice versa.
312
313 Note that monitoring a node is usually better done by monitoring its node
314 port. This function is mainly of interest to modules that are concerned
315 about the network topology and low-level connection handling.
316
317 Callbacks I<must not> block and I<should not> send any messages.
318
319 The function returns an optional guard which can be used to unregister
320 the monitoring callback again.
321
322 Example: make sure you call function C<newnode> for all nodes that are up
323 or go up (and down).
324
325 newnode $_, 1 for up_nodes;
326 mon_nodes \&newnode;
327
328 =cut
329
our %MON_NODES; # registered callbacks, keyed by the numeric value of the code reference

sub mon_nodes($) {
   my ($cb) = @_;
   my $key = $cb+0;

   $MON_NODES{$key} = $cb;

   # in void context no guard is created and the callback stays registered
   defined wantarray
      and Guard::guard { delete $MON_NODES{$key} }
}
340
# logs a node up/down event and reports it to all mon_nodes callbacks.
# exceptions thrown by a callback are logged and do not stop the others.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";
   AE::log 7 => "$node->{id} is " . $state;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         AE::log die => $@;
      }
   }
}
351
352 #############################################################################
353 # self node code
354
# removes a local port and its data, then calls its monitors with the
# kill reason (the remaining arguments)
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port dying with a reason while nobody monitors it is logged
   if (!$mon && @_) {
      AE::log die => "unmonitored local port $port died with reason: @_";
   }

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
368
# registers callback $_[2] as a monitor of local port $_[1] ($_[0] is unused).
# if the port does not exist, the callback is called immediately instead.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   unless (exists $PORT{$portid}) {
      return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
   }

   $LMON{$portid}{$cb+0} = $cb;
}
375
# removes monitor callback $_[2] from local port $_[1] ($_[0] is unused)
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   # only touch %LMON when the port has monitors, to avoid creating an entry
   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
380
# dies if $SECURE is true, otherwise does nothing
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
385
# request handlers of the node port, keyed by message tag
our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};
      _unmonitor (undef, $portid, $cb);
   },
   mon1 => sub { # start monitoring a port for another node
      my ($portid) = @_;

      # weak, so the monitor callback does not keep the node object alive
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});

      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;
      _monitor (undef, $portid, $on_kill);
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      # the remaining arguments are the kill reason
      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },
   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      &snd
   },
   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my ($method, @reply) = @_;

      snd (@reply, exists $NODE_REQ{$method});
   },

   # random utilities
   eval => sub {
      &_secure_check;

      # the string is evaluated in package main, any remaining arguments are the reply message
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::now);
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
443
# the node port
AnyEvent::MP::Node::Self->new ($NODE); # registers itself in %NODE

# messages to the node port are dispatched on their first element via
# %NODE_REQ. unknown tags are resolved with load_func (unless forbidden
# by _secure_check) and the result is remembered in %NODE_REQ.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func ($tag) };
      &$handler
   };

   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};
452
our $MPROTO = 1;

# tell everybody who connects our mproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{mproto} = $MPROTO;
};
459
460 #############################################################################
461 # seed management, try to keep connections to all seeds at all times
462
463 our %SEED_NODE; # seed ID => node ID|undef
464 our %NODE_SEED; # map node ID to seed ID
465 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
466 our $SEED_WATCHER;
467 our $SEED_RETRY;
468 our %GLOBAL_NODE; # global => undef
469
# starts a connection attempt to the given seed address, unless an
# entry for it already exists in %SEED_CONNECT
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      my $remote = $_[0]{remote_node};

      # if we connect to ourselves, nuke this seed, but make sure we act like a seed
      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start global service, in case it isn't running
         # since we probably switch connections, maybe we don't need to do this here?
         snd ($remote, "g_slave");
      }
   };

   # final callback passed to mp_connect: forget the connector again
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   );
}
505
# tries to connect to all seeds whose node is unknown or not up, and
# re-arms itself with a doubling retry interval while any are missing
sub seed_all {
   my @seeds = grep {
      my $nodeid = $SEED_NODE{$_};
      !(defined $nodeid && node_is_up ($nodeid))
   } keys %SEED_NODE;

   # all seeds connected or connecting, no need to restart timer
   return undef $SEED_WATCHER
      unless @seeds;

   # start connection attempt for every seed we are not connected to yet
   for my $seed (@seeds) {
      seed_connect ($seed)
         unless exists $SEED_CONNECT{$seed};
   }

   # back off, but never wait longer than monitor_timeout
   my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
   $SEED_RETRY *= 2;
   $SEED_RETRY = $max
      if $SEED_RETRY > $max;

   $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
}
527
# resets the retry interval to a random value between 0.6 and 1.2 and
# schedules seed_all, unless a watcher is already active
sub seed_again {
   $SEED_RETRY = 0.6 * (1 + rand);
   $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}
532
# sets new seed list, starts connecting
sub set_seeds(@) {
   # forget everything known about the previous seeds
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every new seed starts out with an unknown node ID
   %SEED_NODE = map +($_ => undef), @_;

   seed_again ();
}
543
# normal nodes only record global node connections
$NODE_REQ{g_global} = sub {
   # only the key matters, the value stays undef
   $GLOBAL_NODE{$SRCNODE} = undef;
};
548
# track node up/down events for global node and seed bookkeeping
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   delete $GLOBAL_NODE{$nodeid}
      unless $is_up;

   # everything below only concerns seed nodes
   return unless exists $NODE_SEED{$nodeid};

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd ($nodeid, "g_slave");
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
   }
};
564
565 #############################################################################
566 # keepalive code - used to keep connections to certain nodes alive
567 # only used by global code atm., but ought to be exposed somehow.
568 #TODO: should probably be done directly by node objects
569
570 our $KEEPALIVE_RETRY;
571 our $KEEPALIVE_WATCHER;
572 our %KEEPALIVE; # we want to keep these nodes alive
573 our %KEEPALIVE_DOWN; # nodes that are down currently
574
# tries to connect to every node in %KEEPALIVE_DOWN and re-arms itself
# with a doubling retry interval, capped at monitor_timeout
sub keepalive_all {
   my @down = keys %KEEPALIVE_DOWN;

   AE::log 9 => "keepalive: trying to establish connections with: "
                . (join " ", @down)
                . ".";

   for my $nodeid (@down) {
      (add_node ($nodeid))->connect;
   }

   my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
   $KEEPALIVE_RETRY *= 2;
   $KEEPALIVE_RETRY = $max
      if $KEEPALIVE_RETRY > $max;

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
589
# resets the retry interval to a random value between 0.3 and 0.6 and
# immediately runs a connection attempt
sub keepalive_again {
   $KEEPALIVE_RETRY = 0.3 * (1 + rand);
   keepalive_all ();
}
594
# requests that the connection to the given node is kept alive.
# requests are counted, only the first one for a node has any effect.
sub keepalive_add {
   my ($nodeid) = @_;

   return if $KEEPALIVE{$nodeid}++;

   # nothing to do while the node is connected
   return if node_is_up ($nodeid);

   $KEEPALIVE_DOWN{$nodeid} = undef;
   keepalive_again ();
}
602
# drops one keepalive request for the given node. when the counter
# reaches zero, the node is forgotten, and the retry timer is stopped
# if no other node is down.
sub keepalive_del {
   my ($nodeid) = @_;

   return if --$KEEPALIVE{$nodeid};

   delete $KEEPALIVE{$nodeid};
   delete $KEEPALIVE_DOWN{$nodeid};

   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}
612
# keep %KEEPALIVE_DOWN in sync with node up/down events
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # only nodes with keepalive requests are of interest
   return unless exists $KEEPALIVE{$nodeid};

   unless ($is_up) {
      # lost the connection, try to connect again
      $KEEPALIVE_DOWN{$nodeid} = undef;
      return keepalive_again ();
   }

   delete $KEEPALIVE_DOWN{$nodeid};

   # stop retrying once every node is up
   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
};
627
628 #############################################################################
629 # talk with/to global nodes
630
631 # protocol messages:
632 #
633 # sent by global nodes
634 # g_global - global nodes send this to all others
635 #
636 # database protocol
637 # g_slave database - make other global node master of the sender
638 # g_set database - global node's database to other global nodes
639 # g_upd family set del - update single family (any to global)
640 #
641 # slave <-> global protocol
642 # g_find node - query addresses for node (slave to global)
643 # g_found node binds - node addresses (global to slave)
644 # g_db_family family id - send g_reply with data (global to slave)
645 # g_db_keys family id - send g_reply with data (global to slave)
646 # g_db_values family id - send g_reply with data (global to slave)
647 # g_reply id result - result of any query (global to slave)
648 # g_mon1 family - start to monitor family, replies with g_chg1
649 # g_mon0 family - stop monitoring family
650 # g_chg1 family hash - initial value of family when starting to monitor
651 # g_chg2 family set del - like g_upd, but for monitoring only
652 #
653 # internal database families:
654 # "'l" -> node -> listeners
655 # "'g" -> node -> undef
656 # ...
657 #
658
659 # used on all nodes:
660 our $MASTER; # the global node we bind ourselves to
661 our $MASTER_MON;
662 our %LOCAL_DB; # this node database
663
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
670
671 #############################################################################
672 # master selection
673
674 # master requests
675 our %GLOBAL_REQ; # $id => \@req
676
# remembers request $req (an array reference holding a message) under
# $id and sends it to the master, if there is one. does nothing when a
# request with that id is already registered.
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   if ($MASTER) {
      snd ($MASTER, @$req);
   }
}
687
# forgets the request registered under the given id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
691
692 #################################
693 # master rpc
694
695 our %GLOBAL_RES;
696 our $GLOBAL_RES_ID = "a";
697
# registers a request for the master: the last argument is the result
# callback, the others form the request, to which a fresh id is appended
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;
   global_req_add ($id, [@_, $id]);
}
703
# result of a global_call: the first element is the request id, the
# rest is passed to the callback registered for that id
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id};
   return unless $cb;

   # call with the remaining @_
   &$cb
};
711
712 #################################
713
# registers a g_find request for the given node ID
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
717
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $binds) = @_;

   # the request has been answered
   global_req_del ("g_find $nodeid");

   # ignore replies for nodes we do not know
   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($binds);
};
726
# makes the given node our master: monitors it, sends it our local
# database and all registered requests
sub master_set {
   ($MASTER) = @_;
   AE::log 8 => "new master node: $MASTER.";

   # when the master goes down, look for a new one
   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      if ($nodeid eq $MASTER && !$is_up) {
         undef $MASTER;
         master_search ();
      }
   });

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
744
# picks a connected seed node as master, or waits for one to come up
sub master_search {
   AE::log 9 => "starting search for master node.";

   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   # no seed node is up, so use the first one that connects
   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set ($nodeid);
   });
}
763
# other node wants to make us the master, so start the global service
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   # (presumably AnyEvent::MP::Global installs its own g_slave handler
   # when loaded - verify against that module)
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
770
771 #############################################################################
772 # local database operations
773
774 # canonical probably not needed
775 our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
776
# are the two scalars equal? very very ugly and slow, need better way
#
# references are compared by their JSON encoding, everything else as
# strings, with undef being different from the empty string
sub sv_eq($$) {
   if (ref $_[0] || ref $_[1]) {
      my $l = JSON::XS::encode ($sv_eq_coder, $_[0]);
      my $r = JSON::XS::encode ($sv_eq_coder, $_[1]);

      return $l eq $r;
   }

   $_[0] eq $_[1]
      && defined $_[0] == defined $_[1]
}
784
785 # local database management
786
# deletes the given keys from a family of the local database and tells
# the master (if any) about the keys that actually existed
sub db_del($@) {
   my ($family, @keys) = @_;

   my @del;
   for my $key (@keys) {
      push @del, $key
         if exists $LOCAL_DB{$family}{$key};
   }

   # nothing existed, so nothing changed
   return unless @del;

   delete @{ $LOCAL_DB{$family} }{@del};

   snd ($MASTER, g_upd => $family => undef, \@del)
      if defined $MASTER;
}
798
# sets a key in a family of the local database and tells the master
# (if any) when the value changed. in non-void context, returns a guard
# that deletes the key again.
sub db_set($$;$) {
   my ($family, $subkey, $value) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      my $unchanged = exists $LOCAL_DB{$family}{$subkey}
                      && sv_eq ($LOCAL_DB{$family}{$subkey}, $value);

      if (!$unchanged) {
         $LOCAL_DB{$family}{$subkey} = $value;

         snd ($MASTER, g_upd => $family => { $subkey => $value })
            if defined $MASTER;
      }
#   }

   defined wantarray
      and Guard::guard { db_del ($family => $subkey) }
}
820
821 # database query
822
# sends a g_db_family query for $family, $cb receives the reply
sub db_family {
   my ($family, $cb) = @_;

   global_call (g_db_family => $family, $cb);
}
827
# sends a g_db_keys query for $family, $cb receives the reply
sub db_keys {
   my ($family, $cb) = @_;

   global_call (g_db_keys => $family, $cb);
}
832
# sends a g_db_values query for $family, $cb receives the reply
sub db_values {
   my ($family, $cb) = @_;

   global_call (g_db_values => $family, $cb);
}
837
838 # database monitoring
839
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# registers $cb as a monitor for changes to $family. in non-void
# context, returns a guard that removes the monitor again.
sub db_mon($@) {
   my ($family, $cb) = @_;
   my $key = $cb+0;

   my $db = $MON_DB{$family};

   if (!$db) {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$key} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   } else {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$key} = sub { };
      AE::postpone {
         return unless exists $LOCAL_MON{$family}{$key}; # guard might have gone away already

         # set actual callback
         $LOCAL_MON{$family}{$key} = $cb;
         $cb->($db, [keys %$db]);
      };
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$key};

         # the last monitor for this family is gone, so stop monitoring it
         unless (%$mon) {
            global_req_del ("mon1 $family");

            # no global_req, because we don't care if we are not connected
            snd ($MASTER, g_mon0 => $family)
               if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
         }
      }
}
882
# full update
#
# replaces the monitored copy of family $f with $ndb and calls all
# monitors with the database and the added, changed and deleted keys.
# only accepted from the master.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;
   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@added, @changed);

   # add or replace keys
   for my $k (keys %$ndb) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $ndb->{$k};
   }

   # delete keys that are no longer present
   my @deleted = grep !exists $ndb->{$_}, keys %$db;
   delete $db->{$_}
      for @deleted;

   for my $cb (values %{ $LOCAL_MON{$f} }) {
      $cb->($db, \@added, \@changed, \@deleted);
   }
};
908
# incremental update
#
# applies the key/value pairs in $set and the key deletions in $del to
# the monitored copy of $family, then calls all monitors. only accepted
# from the master.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;
   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};

   my (@added, @changed);

   for my $k (keys %$set) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $set->{$k};
   }

   delete @$db{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, $del);
   }
};
930
931 #############################################################################
932 # configure
933
# returns the second element of POSIX::uname, the node name
sub nodename {
   require POSIX;

   my @uname = POSIX::uname ();
   $uname[1]
}
938
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings.
#
# Returns a condvar that receives the deduplicated list, ordered by
# position of the descriptor in $nodeid. Each descriptor is either empty
# or a port number only (the local nodename is used as host), or a
# host:port pair, where a host of "*" stands for the addresses of all
# local interfaces (as reported by Net::Interface) and a port of "*"
# for port 0.
#
# Croaks on unparsable descriptors and dies if no pipe can be created.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # once everything has been resolved, sort by priority and remove duplicates
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      # empty or all-digit descriptors refer to the local node name
      $t = length $t ? nodename () . ":$t" : nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         # returns the packed addresses of all local interfaces
         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            # load POSIX before forking, so the child can always _exit
            require POSIX;

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;
            my $fork_error = "$!";

            if (defined $pid && $pid == 0) {
               # child: must never return into the caller's code, so any
               # exception (e.g. Net::Interface not being installed) is
               # turned into a non-zero exit status
               close $r;
               eval { syswrite $w, pack "(C/a*)*", $get_addr->() };
               POSIX::_exit ($@ ? 1 : 0);
            }

            close $w;

            if (defined $pid) {
               my $addr = "";

               # read until the child closes its end of the pipe
               1 while sysread $r, $addr, 1024, length $addr;

               @addr = unpack "(C/a*)*", $addr;

               # reap the child so it does not stay around as a zombie
               local ($?, $!);
               AE::log 5 => "unable to enumerate local interface addresses (child status $?)."
                  if $pid == waitpid $pid, 0
                     and $?;
            } else {
               # no child, so no addresses
               AE::log 5 => "unable to enumerate local interface addresses (fork: $fork_error).";
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
1048
our @POST_CONFIGURE; # hooks not run yet

# not yet documented
#
# queues the given block and runs all queued hooks as soon as $NODE is
# true. must be called in void context.
sub post_configure(&) {
   die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray;

   push @POST_CONFIGURE, @_;

   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}
1058
# Configures the local node: determines the node ID, creates the
# listeners, starts seeding and the master search, and finally starts
# the configured services. Replaces itself with an empty sub, so only
# the first call has any effect.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ % 2;
   my %kv = @_;

   my $profile = delete $kv{profile};
   $profile = nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   $SECURE = $CONFIG->{secure};

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # re-register the local node object under its new ID
   my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure

   $NODE = $node;

   $NODE =~ s/%n/nodename/ge;

   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   $node_obj->{id} = $NODE;
   $NODE{$NODE}    = $node_obj;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   AE::log 8 => "node $NODE starting up.";

   $BINDS = [];
   %BINDS = ();

   # start resolving all bind descriptors first, then wait for each result
   my @pending = map _resolve ($_), @$binds;

   for my $cv (@pending) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address passed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   AE::log 9 => "running post config hooks and init.";

   # might initialise Global, so need to do it before db_set
   post_configure { };

   db_set "'l" => $NODE => $BINDS;

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve ($_), @$seeds;
   master_search ();

   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   AE::log 9 => "starting services.";

   # a service is either [function, args...], a module name ending in
   # "::", or the name of a function to call without arguments
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func ($func))->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func ($_))->();
      }
   }

   # finally run the code from the "eval" configuration parameter
   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}
1153
1154 =back
1155
1156 =head1 LOGGING
1157
1158 AnyEvent::MP::Kernel logs high-level information about the current node,
1159 when nodes go up and down, and most runtime errors. It also logs some
1160 debugging and trace messages about network maintenance, such as seed
1161 connections and global node management.
1162
1163 =head1 SEE ALSO
1164
1165 L<AnyEvent::MP>.
1166
1167 =head1 AUTHOR
1168
1169 Marc Lehmann <schmorp@schmorp.de>
1170 http://home.schmorp.de/
1171
1172 =cut
1173
1174 1
1175