ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.118
Committed: Thu Aug 31 15:39:52 2017 UTC (6 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-2_0
Changes since 1.117: +2 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
48 # for re-export in AnyEvent::MP and Coro::MP
49 our @EXPORT_API = qw(
50 NODE $NODE
51 configure
52 node_of port_is_local
53 snd kil
54 db_set db_del
55 db_mon db_family db_keys db_values
56 );
57
58 our @EXPORT_OK = (
59 # these are internal
60 qw(
61 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62 add_node load_func
63 ),
64 @EXPORT_API,
65 );
66
67 our @EXPORT = qw(
68 snd_to_func snd_on eval_on
69 port_is_local
70 up_nodes mon_nodes node_is_up
71 );
72
73 our @CARP_NOT = (AnyEvent::MP::);
74
75 sub load_func($) {
76 my $func = $_[0];
77
78 unless (defined &$func) {
79 my $pkg = $func;
80 do {
81 $pkg =~ s/::[^:]+$//
82 or return sub { die "unable to resolve function '$func'" };
83
84 local $@;
85 unless (eval "require $pkg; 1") {
86 my $error = $@;
87 $error =~ /^Can't locate .*.pm in \@INC \(/
88 or return sub { die $error };
89 }
90 } until defined &$func;
91 }
92
93 \&$func
94 }
95
96 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
97
98 sub nonce($) {
99 join "", map chr rand 256, 1 .. $_[0]
100 }
101
102 sub nonce62($) {
103 join "", map $alnum[rand 62], 1 .. $_[0]
104 }
105
106 our $CONFIG; # this node's configuration
107 our $SECURE;
108
109 our $RUNIQ; # remote uniq value
110 our $UNIQ; # per-process/node unique cookie
111 our $NODE;
112 our $ID = "a";
113
114 our %NODE; # node id to transport mapping, or "undef", for local node
115 our (%PORT, %PORT_DATA); # local ports
116
117 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
118 our %LMON; # monitored _local_ ports
119
120 #our $GLOBAL; # true if node is a global ("directory") node
121 our %BINDS;
122 our $BINDS; # our listeners, as arrayref
123
124 our $SRCNODE; # holds the sending node _object_ during _inject
125 our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
126
127 # initialise names for non-networked operation
128 {
129 # ~54 bits, for local port names, lowercase $ID appended
130 my $now = AE::now;
131 $UNIQ =
132 (join "",
133 map $alnum[$_],
134 $$ / 62 % 62,
135 $$ % 62,
136 (int $now ) % 62,
137 (int $now * 100) % 62,
138 (int $now * 10000) % 62,
139 ) . nonce62 4
140 ;
141
142 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
143 $RUNIQ = nonce62 10;
144 $RUNIQ =~ s/(.)$/\U$1/;
145
146 $NODE = "";
147 }
148
149 sub NODE() {
150 $NODE
151 }
152
153 sub node_of($) {
154 my ($node, undef) = split /#/, $_[0], 2;
155
156 $node
157 }
158
159 BEGIN {
160 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
161 ? sub () { 1 }
162 : sub () { 0 };
163 }
164
165 our $DELAY_TIMER;
166 our @DELAY_QUEUE;
167
168 our $delay_run = sub {
169 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
170 };
171
172 sub delay($) {
173 push @DELAY_QUEUE, shift;
174 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
175 }
176
177 =item $AnyEvent::MP::Kernel::SRCNODE
178
179 During execution of a message callback, this variable contains the node ID
180 of the origin node.
181
182 The main use of this variable is for debugging output - there are probably
183 very few other cases where you need to know the source node ID.
184
185 =cut
186
187 sub _inject {
188 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;
189
190 &{ $PORT{+shift} or return };
191 }
192
193 # this function adds a node-ref, so you can send stuff to it
194 # it is basically the central routing component.
195 sub add_node {
196 $NODE{$_[0]} || do {
197 my ($node) = @_;
198
199 length $node
200 or Carp::croak "'undef' or the empty string are not valid node/port IDs";
201
202 # registers itself in %NODE
203 new AnyEvent::MP::Node::Remote $node
204 }
205 }
206
207 sub snd(@) {
208 my ($nodeid, $portid) = split /#/, shift, 2;
209
210 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;
211
212 ($NODE{$nodeid} || add_node $nodeid)
213 ->{send} (["$portid", @_]);
214 }
215
216 sub port_is_local($) {
217 my ($nodeid, undef) = split /#/, $_[0], 2;
218
219 $nodeid eq $NODE
220 }
221
222 =item snd_to_func $node, $func, @args
223
224 Expects a node ID and a name of a function. Asynchronously tries to call
225 this function with the given arguments on that node.
226
227 This function can be used to implement C<spawn>-like interfaces.
228
229 =cut
230
231 sub snd_to_func($$;@) {
232 my $nodeid = shift;
233
234 # on $NODE, we artificially delay... (for spawn)
235 # this is very ugly - maybe we should simply delay ALL messages,
236 # to avoid deep recursion issues. but that's so... slow...
237 $AnyEvent::MP::Node::Self::DELAY = 1
238 if $nodeid ne $NODE;
239
240 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
241 }
242
243 =item snd_on $node, @msg
244
245 Executes C<snd> with the given C<@msg> (which must include the destination
246 port) on the given node.
247
248 =cut
249
250 sub snd_on($@) {
251 my $node = shift;
252 snd $node, snd => @_;
253 }
254
255 =item eval_on $node, $string[, @reply]
256
257 Evaluates the given string as Perl expression on the given node. When
258 @reply is specified, then it is used to construct a reply message with
259 C<"$@"> and any results from the eval appended.
260
261 =cut
262
263 sub eval_on($$;@) {
264 my $node = shift;
265 snd $node, eval => @_;
266 }
267
268 sub kil(@) {
269 my ($nodeid, $portid) = split /#/, shift, 2;
270
271 length $portid
272 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
273
274 ($NODE{$nodeid} || add_node $nodeid)
275 ->kill ("$portid", @_);
276 }
277
278 #############################################################################
279 # node monitoring and info
280
281 =item $bool = node_is_up $nodeid
282
283 Returns true if the given node is "up", that is, the kernel thinks it has
284 a working connection to it.
285
286 More precisely, if the node is up, returns C<1>. If the node is currently
287 connecting or otherwise known but not connected, returns C<0>. If nothing
288 is known about the node, returns C<undef>.
289
290 =cut
291
292 sub node_is_up($) {
293 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
294 ? 1 : 0
295 }
296
297 =item @nodes = up_nodes
298
299 Return the node IDs of all nodes that are currently connected (excluding
300 the node itself).
301
302 =cut
303
304 sub up_nodes() {
305 map $_->{id}, grep $_->{transport}, values %NODE
306 }
307
308 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
309
310 Registers a callback that is called each time a node goes up (a connection
311 is established) or down (the connection is lost).
312
313 Node up messages can only be followed by node down messages for the same
314 node, and vice versa.
315
316 Note that monitoring a node is usually better done by monitoring its node
317 port. This function is mainly of interest to modules that are concerned
318 about the network topology and low-level connection handling.
319
320 Callbacks I<must not> block and I<should not> send any messages.
321
322 The function returns an optional guard which can be used to unregister
323 the monitoring callback again.
324
325 Example: make sure you call function C<newnode> for all nodes that are up
326 or go up (and down).
327
328 newnode $_, 1 for up_nodes;
329 mon_nodes \&newnode;
330
331 =cut
332
333 our %MON_NODES;
334
335 sub mon_nodes($) {
336 my ($cb) = @_;
337
338 $MON_NODES{$cb+0} = $cb;
339
340 defined wantarray
341 and Guard::guard { delete $MON_NODES{$cb+0} }
342 }
343
344 sub _inject_nodeevent($$;@) {
345 my ($node, $up, @reason) = @_;
346
347 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
348
349 for my $cb (values %MON_NODES) {
350 eval { $cb->($node->{id}, $up, @reason); 1 }
351 or AE::log die => $@;
352 }
353 }
354
355 #############################################################################
356 # self node code
357
358 sub _kill {
359 my $port = shift;
360
361 delete $PORT{$port}
362 or return; # killing nonexistent ports is O.K.
363 delete $PORT_DATA{$port};
364
365 my $mon = delete $LMON{$port}
366 or !@_
367 or AE::log die => "unmonitored local port $port died with reason: @_";
368
369 $_->(@_) for values %$mon;
370 }
371
372 sub _monitor {
373 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
374 unless exists $PORT{$_[1]};
375
376 $LMON{$_[1]}{$_[2]+0} = $_[2];
377 }
378
379 sub _unmonitor {
380 delete $LMON{$_[1]}{$_[2]+0}
381 if exists $LMON{$_[1]};
382 }
383
384 sub _secure_check {
385 $SECURE
386 and die "remote execution not allowed\n";
387 }
388
389 our %NODE_REQ;
390
391 %NODE_REQ = (
392 # "mproto" - monitoring protocol
393
394 # monitoring
395 mon0 => sub { # stop monitoring a port for another node
396 my $portid = shift;
397 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
398 },
399 mon1 => sub { # start monitoring a port for another node
400 my $portid = shift;
401 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
402 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
403 delete $node->{rmon}{$portid};
404 $node->send (["", kil0 => $portid, @_])
405 if $node && $node->{transport};
406 };
407 },
408 # another node has killed a monitored port
409 kil0 => sub {
410 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
411 or return;
412
413 $_->(@_) for @$cbs;
414 },
415 # another node wants to kill a local port
416 kil1 => \&_kill,
417
418 # "public" services - not actually public
419
420 # relay message to another node / generic echo
421 snd => sub {
422 &snd
423 },
424 # ask if a node supports the given request, only works for fixed tags
425 can => sub {
426 my $method = shift;
427 snd @_, exists $NODE_REQ{$method};
428 },
429
430 # random utilities
431 eval => sub {
432 &_secure_check;
433 my @res = do { package main; eval shift };
434 snd @_, "$@", @res if @_;
435 },
436 time => sub {
437 snd @_, AE::now;
438 },
439 devnull => sub {
440 #
441 },
442 "" => sub {
443 # empty messages are keepalives or similar devnull-applications
444 },
445 );
446
447 # the node port
448 new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
449
450 $PORT{""} = sub {
451 my $tag = shift;
452 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
453 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
454 };
455
456 our $MPROTO = 1;
457
458 # tell everybody who connects our nproto
459 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
460 $_[0]{local_greeting}{mproto} = $MPROTO;
461 };
462
463 #############################################################################
464 # seed management, try to keep connections to all seeds at all times
465
466 our %SEED_NODE; # seed ID => node ID|undef
467 our %NODE_SEED; # map node ID to seed ID
468 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
469 our $SEED_WATCHER;
470 our $SEED_RETRY;
471 our %GLOBAL_NODE; # global => undef
472
473 sub seed_connect {
474 my ($seed) = @_;
475
476 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
477 or Carp::croak "$seed: unparsable seed address";
478
479 AE::log 9 => "trying connect to seed node $seed.";
480
481 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
482 $host, $port,
483 on_greeted => sub {
484 # called after receiving remote greeting, learn remote node name
485
486 # we rely on untrusted data here (the remote node name) this is
487 # hopefully ok, as this can at most be used for DOSing, which is easy
488 # when you can do MITM anyway.
489
490 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
491 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
492 require AnyEvent::MP::Global; # every seed becomes a global node currently
493 delete $SEED_NODE{$seed};
494 } else {
495 $SEED_NODE{$seed} = $_[0]{remote_node};
496 $NODE_SEED{$_[0]{remote_node}} = $seed;
497
498 # also start global service, in case it isn't running
499 # since we probably switch conenctions, maybe we don't need to do this here?
500 snd $_[0]{remote_node}, "g_slave";
501 }
502 },
503 sub {
504 delete $SEED_CONNECT{$seed};
505 }
506 ;
507 }
508
509 sub seed_all {
510 my @seeds = grep
511 !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
512 keys %SEED_NODE;
513
514 if (@seeds) {
515 # start connection attempt for every seed we are not connected to yet
516 seed_connect $_
517 for grep !exists $SEED_CONNECT{$_}, @seeds;
518
519 $SEED_RETRY = $SEED_RETRY * 2;
520 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
521 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
522
523 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
524
525 } else {
526 # all seeds connected or connecting, no need to restart timer
527 undef $SEED_WATCHER;
528 }
529 }
530
531 sub seed_again {
532 $SEED_RETRY = (1 + rand) * 0.6;
533 $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
534 }
535
536 # sets new seed list, starts connecting
537 sub set_seeds(@) {
538 %SEED_NODE = ();
539 %NODE_SEED = ();
540 %SEED_CONNECT = ();
541
542 @SEED_NODE{@_} = ();
543
544 seed_again;
545 }
546
547 # normal nodes only record global node connections
548 $NODE_REQ{g_global} = sub {
549 undef $GLOBAL_NODE{$SRCNODE};
550 };
551
552 mon_nodes sub {
553 delete $GLOBAL_NODE{$_[0]}
554 unless $_[1];
555
556 return unless exists $NODE_SEED{$_[0]};
557
558 if ($_[1]) {
559 # each time a connection to a seed node goes up, make
560 # sure it runs the global service.
561 snd $_[0], "g_slave";
562 } else {
563 # if we lost the connection to a seed node, make sure we are seeding
564 seed_again;
565 }
566 };
567
568 #############################################################################
569 # keepalive code - used to kepe conenctions to certain nodes alive
570 # only used by global code atm., but ought to be exposed somehow.
571 #TODO: should probbaly be done directly by node objects
572
573 our $KEEPALIVE_RETRY;
574 our $KEEPALIVE_WATCHER;
575 our %KEEPALIVE; # we want to keep these nodes alive
576 our %KEEPALIVE_DOWN; # nodes that are down currently
577
578 sub keepalive_all {
579 AE::log 9 => "keepalive: trying to establish connections with: "
580 . (join " ", keys %KEEPALIVE_DOWN)
581 . ".";
582
583 (add_node $_)->connect
584 for keys %KEEPALIVE_DOWN;
585
586 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
587 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
588 if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
589
590 $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
591 }
592
593 sub keepalive_again {
594 $KEEPALIVE_RETRY = (1 + rand) * 0.3;
595 keepalive_all;
596 }
597
598 sub keepalive_add {
599 return if $KEEPALIVE{$_[0]}++;
600
601 return if node_is_up $_[0];
602 undef $KEEPALIVE_DOWN{$_[0]};
603 keepalive_again;
604 }
605
606 sub keepalive_del {
607 return if --$KEEPALIVE{$_[0]};
608
609 delete $KEEPALIVE {$_[0]};
610 delete $KEEPALIVE_DOWN{$_[0]};
611
612 undef $KEEPALIVE_WATCHER
613 unless %KEEPALIVE_DOWN;
614 }
615
616 mon_nodes sub {
617 return unless exists $KEEPALIVE{$_[0]};
618
619 if ($_[1]) {
620 delete $KEEPALIVE_DOWN{$_[0]};
621
622 undef $KEEPALIVE_WATCHER
623 unless %KEEPALIVE_DOWN;
624 } else {
625 # lost the conenction, try to connect again
626 undef $KEEPALIVE_DOWN{$_[0]};
627 keepalive_again;
628 }
629 };
630
631 #############################################################################
632 # talk with/to global nodes
633
634 # protocol messages:
635 #
636 # sent by global nodes
637 # g_global - global nodes send this to all others
638 #
639 # database protocol
640 # g_slave database - make other global node master of the sender
641 # g_set database - global node's database to other global nodes
642 # g_upd family set del - update single family (any to global)
643 #
644 # slave <-> global protocol
645 # g_find node - query addresses for node (slave to global)
646 # g_found node binds - node addresses (global to slave)
647 # g_db_family family id - send g_reply with data (global to slave)
648 # g_db_keys family id - send g_reply with data (global to slave)
649 # g_db_values family id - send g_reply with data (global to slave)
650 # g_reply id result - result of any query (global to slave)
651 # g_mon1 family - start to monitor family, replies with g_chg1
652 # g_mon0 family - stop monitoring family
653 # g_chg1 family hash - initial value of family when starting to monitor
654 # g_chg2 family set del - like g_upd, but for monitoring only
655 #
656 # internal database families:
657 # "'l" -> node -> listeners
658 # "'g" -> node -> undef
659 # ...
660 #
661
662 # used on all nodes:
663 our $MASTER; # the global node we bind ourselves to
664 our $MASTER_MON;
665 our %LOCAL_DB; # this node database
666
667 our $GPROTO = 1;
668
669 # tell everybody who connects our gproto
670 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
671 $_[0]{local_greeting}{gproto} = $GPROTO;
672 };
673
674 #############################################################################
675 # master selection
676
677 # master requests
678 our %GLOBAL_REQ; # $id => \@req
679
680 sub global_req_add {
681 my ($id, $req) = @_;
682
683 return if exists $GLOBAL_REQ{$id};
684
685 $GLOBAL_REQ{$id} = $req;
686
687 snd $MASTER, @$req
688 if $MASTER;
689 }
690
691 sub global_req_del {
692 delete $GLOBAL_REQ{$_[0]};
693 }
694
695 #################################
696 # master rpc
697
698 our %GLOBAL_RES;
699 our $GLOBAL_RES_ID = "a";
700
701 sub global_call {
702 my $id = ++$GLOBAL_RES_ID;
703 $GLOBAL_RES{$id} = pop;
704 global_req_add $id, [@_, $id];
705 }
706
707 $NODE_REQ{g_reply} = sub {
708 my $id = shift;
709 global_req_del $id;
710 my $cb = delete $GLOBAL_RES{$id}
711 or return;
712 &$cb
713 };
714
715 #################################
716
717 sub g_find {
718 global_req_add "g_find $_[0]", [g_find => $_[0]];
719 }
720
721 # reply for g_find started in Node.pm
722 $NODE_REQ{g_found} = sub {
723 global_req_del "g_find $_[0]";
724
725 my $node = $NODE{$_[0]} or return;
726
727 $node->connect_to ($_[1]);
728 };
729
730 sub master_set {
731 $MASTER = $_[0];
732 AE::log 8 => "new master node: $MASTER.";
733
734 $MASTER_MON = mon_nodes sub {
735 if ($_[0] eq $MASTER && !$_[1]) {
736 undef $MASTER;
737 master_search ();
738 }
739 };
740
741 snd $MASTER, g_slave => \%LOCAL_DB;
742
743 # (re-)send queued requests
744 snd $MASTER, @$_
745 for values %GLOBAL_REQ;
746 }
747
748 sub master_search {
749 AE::log 9 => "starting search for master node.";
750
751 #TODO: should also look for other global nodes, but we don't know them
752 for (keys %NODE_SEED) {
753 if (node_is_up $_) {
754 master_set $_;
755 return;
756 }
757 }
758
759 $MASTER_MON = mon_nodes sub {
760 return unless $_[1]; # we are only interested in node-ups
761 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
762
763 master_set $_[0];
764 };
765 }
766
767 # other node wants to make us the master, so start the global service
768 $NODE_REQ{g_slave} = sub {
769 # load global module and redo the request
770 require AnyEvent::MP::Global;
771 &{ $NODE_REQ{g_slave} }
772 };
773
774 #############################################################################
775 # local database operations
776
777 # canonical probably not needed
778 our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
779
780 # are the two scalars equal? very very ugly and slow, need better way
781 sub sv_eq($$) {
782 ref $_[0] || ref $_[1]
783 ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
784 : $_[0] eq $_[1]
785 && defined $_[0] == defined $_[1]
786 }
787
788 # local database management
789
790 sub db_del($@) {
791 my $family = shift;
792
793 my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
794
795 return unless @del;
796
797 delete @{ $LOCAL_DB{$family} }{@del};
798 snd $MASTER, g_upd => $family => undef, \@del
799 if defined $MASTER;
800 }
801
802 sub db_set($$;$) {
803 my ($family, $subkey) = @_;
804
805 # if (ref $_[1]) {
806 # # bulk
807 # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
808 # $LOCAL_DB{$_[0]} = $_[1];
809 # snd $MASTER, g_upd => $_[0] => $_[1], \@del
810 # if defined $MASTER;
811 # } else {
812 # single-key
813 unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
814 $LOCAL_DB{$family}{$subkey} = $_[2];
815 snd $MASTER, g_upd => $family => { $subkey => $_[2] }
816 if defined $MASTER;
817 }
818 # }
819
820 defined wantarray
821 and Guard::guard { db_del $family => $subkey }
822 }
823
824 # database query
825
826 sub db_family {
827 my ($family, $cb) = @_;
828 global_call g_db_family => $family, $cb;
829 }
830
831 sub db_keys {
832 my ($family, $cb) = @_;
833 global_call g_db_keys => $family, $cb;
834 }
835
836 sub db_values {
837 my ($family, $cb) = @_;
838 global_call g_db_values => $family, $cb;
839 }
840
841 # database monitoring
842
843 our %LOCAL_MON; # f, reply
844 our %MON_DB; # f, k, value
845
846 sub db_mon($@) {
847 my ($family, $cb) = @_;
848
849 if (my $db = $MON_DB{$family}) {
850 # we already monitor, so create a "dummy" change event
851 # this is postponed, which might be too late (we could process
852 # change events), so disable the callback at first
853 $LOCAL_MON{$family}{$cb+0} = sub { };
854 AE::postpone {
855 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
856
857 # set actual callback
858 $LOCAL_MON{$family}{$cb+0} = $cb;
859 $cb->($db, [keys %$db]);
860 };
861 } else {
862 # new monitor, request chg1 from upstream
863 $LOCAL_MON{$family}{$cb+0} = $cb;
864 global_req_add "mon1 $family" => [g_mon1 => $family];
865 $MON_DB{$family} = {};
866 }
867
868 defined wantarray
869 and Guard::guard {
870 my $mon = $LOCAL_MON{$family};
871 delete $mon->{$cb+0};
872
873 unless (%$mon) {
874 global_req_del "mon1 $family";
875
876 # no global_req, because we don't care if we are not connected
877 snd $MASTER, g_mon0 => $family
878 if $MASTER;
879
880 delete $LOCAL_MON{$family};
881 delete $MON_DB{$family};
882 }
883 }
884 }
885
886 # full update
887 $NODE_REQ{g_chg1} = sub {
888 return unless $SRCNODE eq $MASTER;
889 my ($f, $ndb) = @_;
890
891 my $db = $MON_DB{$f};
892 my (@a, @c, @d);
893
894 # add or replace keys
895 while (my ($k, $v) = each %$ndb) {
896 exists $db->{$k}
897 ? push @c, $k
898 : push @a, $k;
899 $db->{$k} = $v;
900 }
901
902 # delete keys that are no longer present
903 for (grep !exists $ndb->{$_}, keys %$db) {
904 delete $db->{$_};
905 push @d, $_;
906 }
907
908 $_->($db, \@a, \@c, \@d)
909 for values %{ $LOCAL_MON{$_[0]} };
910 };
911
912 # incremental update
913 $NODE_REQ{g_chg2} = sub {
914 return unless $SRCNODE eq $MASTER;
915 my ($family, $set, $del) = @_;
916
917 my $db = $MON_DB{$family};
918
919 my (@a, @c);
920
921 while (my ($k, $v) = each %$set) {
922 exists $db->{$k}
923 ? push @c, $k
924 : push @a, $k;
925 $db->{$k} = $v;
926 }
927
928 delete @$db{@$del};
929
930 $_->($db, \@a, \@c, $del)
931 for values %{ $LOCAL_MON{$family} };
932 };
933
934 #############################################################################
935 # configure
936
937 sub nodename {
938 require POSIX;
939 (POSIX::uname ())[1]
940 }
941
942 sub _resolve($) {
943 my ($nodeid) = @_;
944
945 my $cv = AE::cv;
946 my @res;
947
948 $cv->begin (sub {
949 my %seen;
950 my @refs;
951 for (sort { $a->[0] <=> $b->[0] } @res) {
952 push @refs, $_->[1] unless $seen{$_->[1]}++
953 }
954 shift->send (@refs);
955 });
956
957 my $idx;
958 for my $t (split /,/, $nodeid) {
959 my $pri = ++$idx;
960
961 $t = length $t ? nodename . ":$t" : nodename
962 if $t =~ /^\d*$/;
963
964 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
965 or Carp::croak "$t: unparsable transport descriptor";
966
967 $port = "0" if $port eq "*";
968
969 if ($host eq "*") {
970 $cv->begin;
971
972 my $get_addr = sub {
973 my @addr;
974
975 require Net::Interface;
976
977 # Net::Interface hangs on some systems, so hope for the best
978 local $SIG{ALRM} = 'DEFAULT';
979 alarm 2;
980
981 for my $if (Net::Interface->interfaces) {
982 # we statically lower-prioritise ipv6 here, TODO :()
983 for $_ ($if->address (Net::Interface::AF_INET ())) {
984 next if /^\x7f/; # skip localhost etc.
985 push @addr, $_;
986 }
987 for ($if->address (Net::Interface::AF_INET6 ())) {
988 #next if $if->scope ($_) <= 2;
989 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
990 push @addr, $_;
991 }
992 }
993
994 alarm 0;
995
996 @addr
997 };
998
999 my @addr;
1000
1001 if (AnyEvent::WIN32) {
1002 @addr = $get_addr->();
1003 } else {
1004 # use a child process, as Net::Interface is big, and we need it only once.
1005
1006 pipe my $r, my $w
1007 or die "pipe: $!";
1008
1009 if (fork eq 0) {
1010 close $r;
1011 syswrite $w, pack "(C/a*)*", $get_addr->();
1012 require POSIX;
1013 POSIX::_exit (0);
1014 } else {
1015 close $w;
1016
1017 my $addr;
1018
1019 1 while sysread $r, $addr, 1024, length $addr;
1020
1021 @addr = unpack "(C/a*)*", $addr;
1022 }
1023 }
1024
1025 for my $ip (@addr) {
1026 push @res, [
1027 $pri += 1e-5,
1028 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
1029 ];
1030 }
1031 $cv->end;
1032 } else {
1033 $cv->begin;
1034 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1035 for (@_) {
1036 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1037 push @res, [
1038 $pri += 1e-5,
1039 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1040 ];
1041 }
1042 $cv->end;
1043 };
1044 }
1045 }
1046
1047 $cv->end;
1048
1049 $cv
1050 }
1051
1052 our @POST_CONFIGURE;
1053
1054 # not yet documented
1055 sub post_configure(&) {
1056 die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray;
1057
1058 push @POST_CONFIGURE, @_;
1059 (shift @POST_CONFIGURE)->() while $NODE && @POST_CONFIGURE;
1060 }
1061
1062 sub configure(@) {
1063 unshift @_, "profile" if @_ & 1;
1064 my (%kv) = @_;
1065
1066 my $profile = delete $kv{profile};
1067
1068 $profile = nodename
1069 unless defined $profile;
1070
1071 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1072
1073 $SECURE = $CONFIG->{secure};
1074
1075 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1076
1077 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1078
1079 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1080
1081 $NODE = $node;
1082
1083 $NODE =~ s/%n/nodename/ge;
1084
1085 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1086 # nodes with randomised node names do not need randomised port names
1087 $UNIQ = "";
1088 }
1089
1090 $node_obj->{id} = $NODE;
1091 $NODE{$NODE} = $node_obj;
1092
1093 my $seeds = $CONFIG->{seeds};
1094 my $binds = $CONFIG->{binds};
1095
1096 $binds ||= ["*"];
1097
1098 AE::log 8 => "node $NODE starting up.";
1099
1100 $BINDS = [];
1101 %BINDS = ();
1102
1103 for (map _resolve $_, @$binds) {
1104 for my $bind ($_->recv) {
1105 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1106 or Carp::croak "$bind: unparsable local bind address";
1107
1108 my $listener = AnyEvent::MP::Transport::mp_server
1109 $host,
1110 $port,
1111 prepare => sub {
1112 my (undef, $host, $port) = @_;
1113 $bind = AnyEvent::Socket::format_hostport $host, $port;
1114 0
1115 },
1116 ;
1117 $BINDS{$bind} = $listener;
1118 push @$BINDS, $bind;
1119 }
1120 }
1121
1122 AE::log 9 => "running post config hooks and init.";
1123
1124 # might initialise Global, so need to do it before db_set
1125 post_configure { };
1126
1127 db_set "'l" => $NODE => $BINDS;
1128
1129 AE::log 8 => "node listens on [@$BINDS].";
1130
1131 # connect to all seednodes
1132 set_seeds map $_->recv, map _resolve $_, @$seeds;
1133 master_search;
1134
1135 # save gobs of memory
1136 undef &_resolve;
1137 *configure = sub (@){ };
1138
1139 AE::log 9 => "starting services.";
1140
1141 for (@{ $CONFIG->{services} }) {
1142 if (ref) {
1143 my ($func, @args) = @$_;
1144 (load_func $func)->(@args);
1145 } elsif (s/::$//) {
1146 eval "require $_";
1147 die $@ if $@;
1148 } else {
1149 (load_func $_)->();
1150 }
1151 }
1152
1153 eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1154 die "$@" if $@;
1155 }
1156
1157 =back
1158
1159 =head1 LOGGING
1160
1161 AnyEvent::MP::Kernel logs high-level information about the current node,
1162 when nodes go up and down, and most runtime errors. It also logs some
1163 debugging and trace messages about network maintainance, such as seed
1164 connections and global node management.
1165
1166 =head1 SEE ALSO
1167
1168 L<AnyEvent::MP>.
1169
1170 =head1 AUTHOR
1171
1172 Marc Lehmann <schmorp@schmorp.de>
1173 http://home.schmorp.de/
1174
1175 =cut
1176
1177 1
1178