ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.120
Committed: Fri Nov 15 09:47:39 2019 UTC (4 years, 6 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.119: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
# Public API symbols; AnyEvent::MP and Coro::MP re-export this list.
our @EXPORT_API = qw(
   NODE $NODE
   configure
   node_of port_is_local
   snd kil
   db_set db_del
   db_mon db_family db_keys db_values
);

# Importable on request: internal symbols first, then the whole public API.
our @EXPORT_OK = (
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func),
   @EXPORT_API,
);

# Exported by default when this module is used directly.
our @EXPORT = qw(
   snd_to_func snd_on eval_on
   port_is_local
   up_nodes mon_nodes node_is_up
);

# consulted by Carp when croak picks the frame to report (see Carp docs)
our @CARP_NOT = (AnyEvent::MP::);
74
# load_func $name
#
# Resolves the fully qualified function name $name ("Some::Pkg::func") to a
# code reference. If the function is not defined yet, successively shorter
# package prefixes of $name ("Some::Pkg", then "Some") are require'd until
# the function exists.
#
# Always returns a code reference and never dies itself:
# - the function, when it could be resolved;
# - a sub that dies with "unable to resolve function '$name'" when no
#   prefix could be loaded (or none of them defined the function);
# - a sub that dies with the original error when loading a prefix failed
#   for any reason other than "this very file is not in @INC" (syntax
#   errors, or a missing dependency of the loaded module).
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # $func is deliberately used as a symbolic reference

   my $pkg = $func;

   until (defined &$func) {
      # strip the last "::component"; give up when nothing is left to strip
      $pkg =~ s/::[^:]+$//
         or return sub { die "unable to resolve function '$func'" };

      # $pkg is interpolated into a string eval below and $func may come
      # from another node, so only things that look like package names are
      # ever passed to require - anything else cannot be a loadable module.
      next unless $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/;

      # the file "require $pkg" looks for, as it appears in the error message
      (my $file = "$pkg.pm") =~ s{::}{/}g;

      local $@;
      unless (eval "require $pkg; 1") {
         my $error = $@;
         # only "this package has no file" means "try the parent package";
         # a missing dependency of an existing module is a real error and
         # must not be reported as an unresolvable function.
         $error =~ /^Can't locate \Q$file\E in \@INC \(/
            or return sub { die $error };
      }
   }

   \&$func
}
95
# the 62 characters used for textual identifiers: digits, then upper-
# and lowercase ASCII letters
my @alnum = ("0" .. "9", "A" .. "Z", "a" .. "z");

# nonce $len - returns a string of $len random octets (each chr 0..255)
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}

# nonce62 $len - returns a string of $len random characters from @alnum
sub nonce62($) {
   my $text = "";

   $text .= $alnum[rand 62]
      for 1 .. $_[0];

   $text
}
105
our $CONFIG; # this node's configuration (set by configure)
our $SECURE; # copy of $CONFIG->{secure}; when true, _secure_check dies

our $RUNIQ;     # remote uniq value
our $UNIQ;      # per-process/node unique cookie
our $NODE;      # this node's ID ("" until configure has run)
our $ID = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA;

our %RMON;      # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;      # monitored _local_ ports

our %BINDS;     # bind address => listener (filled in by configure)
our $BINDS;     # our listeners, as arrayref

our $SRCNODE;   # holds the sending node _object_ during _inject
our $GLOBAL;    # true when this is a global node (only set by AnyEvent::MP::Global)
126
# set up identifiers that work before (or without) any networking
{
   my $now = AE::now;

   # five characters derived from the pid and the current time ...
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   # ... plus four random ones: ~54 bits, used for local port names
   # (a lowercase $ID gets appended)
   $UNIQ = (join "", @alnum[@digits]) . nonce62 (4);

   # ~59 bits, for remote port names: one character longer than $UNIQ and
   # with the last character uppercased, to avoid clashes
   $RUNIQ = nonce62 (10);
   $RUNIQ =~ s/(.)$/\U$1/;

   # no node ID until configure assigns one
   $NODE = "";
}
148
# NODE - returns the ID of this node (the current value of $NODE)
sub NODE() {
   return $NODE;
}
152
# node_of $port - returns the node ID part of a port ID, i.e. everything
# before the first "#" (undef for the empty string)
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
158
# TRACE is a compile-time constant: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
164
our $DELAY_TIMER; # pending zero-delay timer, if any
our @DELAY_QUEUE; # callbacks waiting for that timer

# timer callback: runs queued callbacks in order (including ones queued
# meanwhile) and drops the timer once the queue is empty
our $delay_run = sub {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
};

# delay $cb - queues $cb to be called from a zero-delay timer
sub delay($) {
   push @DELAY_QUEUE, shift;

   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
176
177 =item $AnyEvent::MP::Kernel::SRCNODE
178
179 During execution of a message callback, this variable contains the node ID
180 of the origin node.
181
182 The main use of this variable is for debugging output - there are probably
183 very few other cases where you need to know the source node ID.
184
185 =cut
186
# _inject $portid, @msg - delivers a message to the local port $portid by
# calling its handler with @msg; messages to unknown ports are dropped.
sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;

   my $handler = $PORT{+shift}
      or return;

   # call with the remaining @_ (the message without the port id)
   &$handler;
}
192
# add_node $nodeid - returns the node object for $nodeid, creating a remote
# node object when none is registered yet, so messages can be sent to it.
# This is basically the central routing component.
# Croaks when $nodeid is undef or the empty string.
sub add_node {
   return $NODE{$_[0]}
      if $NODE{$_[0]};

   my ($node) = @_;

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   # the constructor registers the new object in %NODE
   AnyEvent::MP::Node::Remote->new ($node)
}
206
# snd $port, @msg - sends @msg to the port "$nodeid#$portid", looking up
# (or creating) the node object and handing [$portid, @msg] to its send
# callback.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (["$portid", @_]) } . "\n" if TRACE && @_;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
215
# port_is_local $port - true when the node part of the port ID (everything
# before the first "#") equals this node's ID
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0] eq $NODE
}
221
222 =item snd_to_func $node, $func, @args
223
224 Expects a node ID and a name of a function. Asynchronously tries to call
225 this function with the given arguments on that node.
226
227 This function can be used to implement C<spawn>-like interfaces.
228
229 =cut
230
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # the intent (for spawn) is to artificially delay delivery on $NODE.
   # this is very ugly - maybe ALL messages should simply be delayed to
   # avoid deep recursion issues, but that would be slow.
   # NOTE(review): the flag is set when the target is NOT this node, which
   # reads inverted relative to the intent above - confirm against
   # AnyEvent::MP::Node::Self before changing anything.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   # the node port is the port with the empty ID
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @_]);
}
242
243 =item snd_on $node, @msg
244
245 Executes C<snd> with the given C<@msg> (which must include the destination
246 port) on the given node.
247
248 =cut
249
sub snd_on($@) {
   my ($node, @msg) = @_;

   # ask the node port of $node to run "snd" with the given message
   snd $node, "snd", @msg;
}
254
255 =item eval_on $node, $string[, @reply]
256
257 Evaluates the given string as Perl expression on the given node. When
258 @reply is specified, then it is used to construct a reply message with
259 C<"$@"> and any results from the eval appended.
260
261 =cut
262
sub eval_on($$;@) {
   my ($node, @args) = @_;

   # ask the node port of $node to run its "eval" request
   snd $node, "eval", @args;
}
267
# kil $port, @reason - kills the given port by calling ->kill on the node
# object it belongs to. Croaks when the port part is empty (a node port).
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
277
278 #############################################################################
279 # node monitoring and info
280
281 =item $bool = node_is_up $nodeid
282
283 Returns true if the given node is "up", that is, the kernel thinks it has
284 a working connection to it.
285
286 More precisely, if the node is up, returns C<1>. If the node is currently
287 connecting or otherwise known but not connected, returns C<0>. If nothing
288 is known about the node, returns C<undef>.
289
290 =cut
291
sub node_is_up($) {
   # the local node is always up
   return 1
      if $_[0] eq $NODE;

   # nothing known about this node
   my $node = $NODE{$_[0]}
      or return;

   # known: up exactly when it has a transport
   $node->{transport} ? 1 : 0
}
296
297 =item @nodes = up_nodes
298
299 Return the node IDs of all nodes that are currently connected (excluding
300 the node itself).
301
302 =cut
303
sub up_nodes() {
   my @ids;

   # a node counts as connected when it has a transport
   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
307
308 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
309
310 Registers a callback that is called each time a node goes up (a connection
311 is established) or down (the connection is lost).
312
313 Node up messages can only be followed by node down messages for the same
314 node, and vice versa.
315
316 Note that monitoring a node is usually better done by monitoring its node
317 port. This function is mainly of interest to modules that are concerned
318 about the network topology and low-level connection handling.
319
320 Callbacks I<must not> block and I<should not> send any messages.
321
322 The function returns an optional guard which can be used to unregister
323 the monitoring callback again.
324
325 Example: make sure you call function C<newnode> for all nodes that are up
326 or go up (and down).
327
328 newnode $_, 1 for up_nodes;
329 mon_nodes \&newnode;
330
331 =cut
332
# registered node monitors, keyed by the numeric value of the callback ref
our %MON_NODES;

sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # only build a guard when the caller actually wants one
   defined wantarray
      and Guard::guard {
         delete $MON_NODES{$cb+0}
      }
}
343
# _inject_nodeevent $node, $up, @reason - logs the state change of the
# node object $node and calls every callback registered via mon_nodes with
# ($node->{id}, $up, @reason). An exception in one callback is logged and
# does not keep the others from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         AE::log die => $@;
      }
   }
}
354
355 #############################################################################
356 # self node code
357
# _kill $port, @reason - removes the local port $port together with its
# data and calls all its monitors with @reason. A port without monitors
# that dies with a non-empty reason is reported via AE::log.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   AE::log die => "unmonitored local port $port died with reason: @_"
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
371
# _monitor $ignored, $port, $cb - registers $cb as monitor of the local
# port $port. When the port does not exist, $cb is called immediately with
# a no_such_port reason instead.
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   # monitors are keyed by the numeric value of the callback reference
   $LMON{$port}{$cb+0} = $cb;
}
378
# _unmonitor $ignored, $port, $cb - removes the monitor $cb from the local
# port $port, if that port has any monitors registered
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   exists $LMON{$port}
      and delete $LMON{$port}{$cb+0}
}
383
# _secure_check - dies when this node runs in secure mode ($SECURE true);
# called before executing anything requested by a remote node
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
388
# requests understood by the node port (the port with the empty ID), keyed
# by their tag; each handler receives the remaining message elements.
our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;

      # the exists check should not be needed, but there is apparently a
      # bug elsewhere, and this silently works around it. sigh.
      if (exists $NODE{$SRCNODE}) {
         _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
      }
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # the monitor callback must not keep the node object alive
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});

      my $on_kill = sub {
         delete $node->{rmon}{$portid};

         # tell the remote node about the death, if it is still connected
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      _monitor undef, $portid, $node->{rmon}{$portid} = $on_kill;
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,

   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my $method = shift;

      snd @_, exists $NODE_REQ{$method};
   },

   # random utilities

   # evaluate a string in package main; not allowed in secure mode
   eval => sub {
      &_secure_check;

      my @res = do { package main; eval shift };

      # reply with the error string and the results, if a reply was asked for
      snd @_, "$@", @res
         if @_;
   },

   # reply with the current event loop time
   time => sub {
      snd @_, AE::now;
   },

   # ignore the message
   devnull => sub {
      #
   },

   # empty messages are keepalives or similar devnull-applications
   "" => sub {
   },
);
449
# the node object for the local node; registers itself in %NODE
AnyEvent::MP::Node::Self->new ($NODE);

# the node port: dispatches on the first message element. Tags without an
# entry in %NODE_REQ are treated as function names, which are resolved via
# load_func (unless secure mode forbids it) and remembered in %NODE_REQ.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do {
         &_secure_check;
         load_func $tag
      };

      # call with the remaining message elements
      &$handler;
   };

   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};
458
# version of the monitoring protocol ("mproto") spoken by this node
our $MPROTO = 1;

# announce our mproto in the greeting of every transport
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{mproto} = $MPROTO;
};
465
466 #############################################################################
467 # seed management, try to keep connections to all seeds at all times
468
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER; # retry timer used by seed_all/seed_again
our $SEED_RETRY;   # current retry interval
our %GLOBAL_NODE;  # global => undef

# seed_connect $seed - starts a connection attempt to the seed address
# $seed ("host:port"), unless one is already recorded in %SEED_CONNECT.
# Croaks when the address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   # called after receiving the remote greeting, to learn the remote node
   # name. this relies on untrusted data (the remote node name), which is
   # hopefully ok, as it can at most be used for DOSing, which is easy when
   # you can do MITM anyway.
   my $on_greeted = sub {
      my ($transport) = @_;

      if ($transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
         # we connected to ourselves: nuke this seed, but make sure we act
         # like a seed (every seed becomes a global node currently)
         require AnyEvent::MP::Global;
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed} = $transport->{remote_node};
         $NODE_SEED{$transport->{remote_node}} = $seed;

         # also start the global service, in case it isn't running.
         # since we probably switch connections, maybe this is not needed here?
         snd $transport->{remote_node}, "g_slave";
      }
   };

   # trailing callback passed to mp_connect: forget the connector again
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   ;
}
511
# seed_all - timer callback: starts connection attempts to all seeds we are
# not connected to and re-arms the retry timer with a doubled interval
# (capped at the configured monitor_timeout).
sub seed_all {
   # seeds whose node is unknown or not up
   my @seeds = grep {
      !defined $SEED_NODE{$_} || !node_is_up ($SEED_NODE{$_})
   } keys %SEED_NODE;

   if (!@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      for my $seed (grep !exists $SEED_CONNECT{$_}, @seeds) {
         seed_connect $seed;
      }

      my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY *= 2;
      $SEED_RETRY  = $max
         if $SEED_RETRY > $max;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
533
# seed_again - resets the retry interval to a random value in [0.6, 1.2)
# and makes sure seed_all runs soon
sub seed_again {
   $SEED_RETRY = 0.6 * (1 + rand);

   $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
}
538
# set_seeds @seeds - replaces the seed list with @seeds, forgetting all
# previous seed state, and starts connecting
sub set_seeds(@) {
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # node IDs of the seeds are not known yet
   $SEED_NODE{$_} = undef
      for @_;

   seed_again;
}
549
# g_global: normal nodes only record which nodes are global nodes
$NODE_REQ{g_global} = sub {
   $GLOBAL_NODE{$SRCNODE} = undef;
};
554
# keep %GLOBAL_NODE and the seed connections current on node up/down events
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # a node that went down is no longer a known global node
   delete $GLOBAL_NODE{$nodeid}
      unless $is_up;

   # everything below only concerns seed nodes
   return unless exists $NODE_SEED{$nodeid};

   if (!$is_up) {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   } else {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $nodeid, "g_slave";
   }
};
570
571 #############################################################################
572 # keepalive code - used to keep connections to certain nodes alive
573 # only used by global code atm., but ought to be exposed somehow.
574 #TODO: should probably be done directly by node objects
575
our $KEEPALIVE_RETRY;   # current retry interval
our $KEEPALIVE_WATCHER; # retry timer
our %KEEPALIVE;         # we want to keep these nodes alive
our %KEEPALIVE_DOWN;    # nodes that are down currently

# keepalive_all - tries to connect to every node in %KEEPALIVE_DOWN and
# re-arms the retry timer with a doubled interval (capped at the configured
# monitor_timeout)
sub keepalive_all {
   my @down = keys %KEEPALIVE_DOWN;

   AE::log 9 => "keepalive: trying to establish connections with: "
                . (join " ", @down)
                . ".";

   for my $nodeid (@down) {
      (add_node $nodeid)->connect;
   }

   my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_RETRY *= 2;
   $KEEPALIVE_RETRY  = $max
      if $KEEPALIVE_RETRY > $max;

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
595
# keepalive_again - resets the retry interval to a random value in
# [0.3, 0.6) and immediately tries to connect to all down nodes
sub keepalive_again {
   $KEEPALIVE_RETRY = 0.3 * (1 + rand);

   keepalive_all;
}
600
# keepalive_add $nodeid - requests that the connection to $nodeid is kept
# alive. Requests are counted, so each one needs a matching keepalive_del.
sub keepalive_add {
   my ($nodeid) = @_;

   # already kept alive: only the count changes
   return if $KEEPALIVE{$nodeid}++;

   return if node_is_up ($nodeid);

   # not connected: mark as down and start connecting right away
   $KEEPALIVE_DOWN{$nodeid} = undef;
   keepalive_again;
}
608
# keepalive_del $nodeid - drops one keepalive request for $nodeid (see
# keepalive_add). When the last request is gone the node is no longer kept
# alive, and the retry timer is stopped once no wanted node is down.
#
# Calls without a matching keepalive_add are ignored. Previously such a
# call left a negative (and thus true) counter behind, which kept the node
# in %KEEPALIVE forever and made the next keepalive_add return early
# without ever connecting.
sub keepalive_del {
   return unless exists $KEEPALIVE{$_[0]};

   # other requests for this node remain
   return if --$KEEPALIVE{$_[0]} > 0;

   delete $KEEPALIVE     {$_[0]};
   delete $KEEPALIVE_DOWN{$_[0]};

   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}
618
# track up/down events of the nodes we want to keep alive
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $KEEPALIVE{$nodeid};

   if (!$is_up) {
      # lost the connection, try to connect again
      $KEEPALIVE_DOWN{$nodeid} = undef;
      keepalive_again;
   } else {
      delete $KEEPALIVE_DOWN{$nodeid};

      # nothing left to reconnect to: stop retrying
      undef $KEEPALIVE_WATCHER
         unless %KEEPALIVE_DOWN;
   }
};
633
634 #############################################################################
635 # talk with/to global nodes
636
637 # protocol messages:
638 #
639 # sent by global nodes
640 # g_global - global nodes send this to all others
641 #
642 # database protocol
643 # g_slave database - make other global node master of the sender
644 # g_set database - global node's database to other global nodes
645 # g_upd family set del - update single family (any to global)
646 #
647 # slave <-> global protocol
648 # g_find node - query addresses for node (slave to global)
649 # g_found node binds - node addresses (global to slave)
650 # g_db_family family id - send g_reply with data (global to slave)
651 # g_db_keys family id - send g_reply with data (global to slave)
652 # g_db_values family id - send g_reply with data (global to slave)
653 # g_reply id result - result of any query (global to slave)
654 # g_mon1 family - start to monitor family, replies with g_chg1
655 # g_mon0 family - stop monitoring family
656 # g_chg1 family hash - initial value of family when starting to monitor
657 # g_chg2 family set del - like g_upd, but for monitoring only
658 #
659 # internal database families:
660 # "'l" -> node -> listeners
661 # "'g" -> node -> undef
662 # ...
663 #
664
# used on all nodes:
our $MASTER;     # the global node we bind ourselves to
our $MASTER_MON; # guard of the node monitor installed by master_set/master_search
our %LOCAL_DB;   # this node database

# version of the global protocol ("gproto") spoken by this node
our $GPROTO = 1;

# announce our gproto in the greeting of every transport
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
676
677 #############################################################################
678 # master selection
679
# requests for the master that are still outstanding; master_set (re-)sends
# all of them to a new master
our %GLOBAL_REQ; # $id => \@req

# global_req_add $id, \@req - remembers the request under $id (unless that
# ID is already in use) and sends it to the master, if there is one
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd $MASTER, @$req;
}
693
# global_req_del $id - forgets the outstanding request $id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
697
698 #################################
699 # master rpc
700
our %GLOBAL_RES;          # call ID => callback waiting for the g_reply
our $GLOBAL_RES_ID = "a"; # last call ID handed out (string increment)

# global_call @req, $cb - sends the request @req, with a fresh call ID
# appended, to the master; $cb is called with the result once the matching
# g_reply arrives
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add $id, [@_, $id];
}
709
# g_reply $id, @result - result of a global_call: retires the request and
# passes @result to the callback registered for $id, if any
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del $id;

   my $cb = delete $GLOBAL_RES{$id};
   return unless $cb;

   # call with the remaining @_ (the result)
   &$cb
};
717
718 #################################
719
# g_find $nodeid - asks the master for the addresses of $nodeid; the
# answer arrives as a g_found request
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", [g_find => $nodeid];
}
723
# g_found $nodeid, $binds - reply for a g_find started in Node.pm: retires
# the request and hands the addresses to the node object, if it still exists
$NODE_REQ{g_found} = sub {
   my ($nodeid, $binds) = @_;

   global_req_del "g_find $nodeid";

   my $node = $NODE{$nodeid};
   return unless $node;

   $node->connect_to ($binds);
};
732
# master_set $nodeid - makes $nodeid our master: watches it for going
# down, sends it our local database and (re-)sends all outstanding requests
sub master_set {
   ($MASTER) = @_;

   AE::log 8 => "new master node: $MASTER.";

   # when the master goes down, look for a new one
   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $nodeid eq $MASTER && !$is_up;

      undef $MASTER;
      master_search ();
   };

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
750
# master_search - picks a seed node that is up as master, or, when there is
# none, waits for the first seed node to come up
sub master_search {
   AE::log 9 => "starting search for master node.";

   #TODO: should also look for other global nodes, but we don't know them
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      # we are only interested in node-ups ...
      return unless $is_up;
      # ... of seed nodes
      return unless $NODE_SEED{$nodeid};

      master_set $nodeid;
   };
}
769
# g_slave: another node wants to make us its master, so start the global
# service. Loading AnyEvent::MP::Global is expected to install the real
# g_slave handler, which is then called with the same arguments.
$NODE_REQ{g_slave} = sub {
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
776
777 #############################################################################
778 # local database operations
779
# JSON coder used by sv_eq to compare references; canonical mode is
# probably not needed
our $sv_eq_coder = JSON::XS
   ->new
   ->utf8
   ->allow_nonref;
782
# sv_eq $a, $b - are the two scalars equal? If either is a reference, their
# JSON encodings are compared, otherwise they must be string-equal and
# either both defined or both undefined. Very ugly and slow, a better way
# is needed.
sub sv_eq($$) {
   if (ref $_[0] || ref $_[1]) {
      my $left  = JSON::XS::encode $sv_eq_coder, $_[0];
      my $right = JSON::XS::encode $sv_eq_coder, $_[1];

      return $left eq $right;
   }

   $_[0] eq $_[1]
      && defined $_[0] == defined $_[1]
}
790
791 # local database management
792
# db_del $family, @subkeys - deletes the given subkeys from the family in
# the local database and tells the master (if any) about the ones that
# actually existed
sub db_del($@) {
   my ($family, @subkeys) = @_;

   my @present = grep { exists $LOCAL_DB{$family}{$_} } @subkeys;

   # nothing to delete, nothing to announce
   return unless @present;

   delete $LOCAL_DB{$family}{$_}
      for @present;

   defined $MASTER
      and snd $MASTER, g_upd => $family => undef, \@present;
}
804
# db_set $family, $subkey[, $value] - stores $value under $subkey in the
# family of the local database and tells the master (if any), unless the
# key already holds an equal value. In non-void context, returns a guard
# that deletes the key again.
# Only single keys are supported (a bulk variant was never enabled).
sub db_set($$;$) {
   my ($family, $subkey) = @_;

   my $changed = !exists $LOCAL_DB{$family}{$subkey}
                 || !sv_eq ($LOCAL_DB{$family}{$subkey}, $_[2]);

   if ($changed) {
      $LOCAL_DB{$family}{$subkey} = $_[2];

      snd $MASTER, g_upd => $family => { $subkey => $_[2] }
         if defined $MASTER;
   }

   defined wantarray
      and Guard::guard {
         db_del $family => $subkey
      }
}
826
827 # database query
828
# db_family $family, $cb - sends a g_db_family query for $family to the
# master; $cb receives the reply
sub db_family {
   my ($family, $cb) = @_;

   global_call "g_db_family", $family, $cb;
}
833
# db_keys $family, $cb - sends a g_db_keys query for $family to the
# master; $cb receives the reply
sub db_keys {
   my ($family, $cb) = @_;

   global_call "g_db_keys", $family, $cb;
}
838
# db_values $family, $cb - sends a g_db_values query for $family to the
# master; $cb receives the reply
sub db_values {
   my ($family, $cb) = @_;

   global_call "g_db_values", $family, $cb;
}
843
844 # database monitoring
845
our %LOCAL_MON; # family => { callback address => callback }
our %MON_DB;    # family => { key => value }, our copy of monitored families

# db_mon $family, $cb - monitors the database family $family. The first
# monitor of a family requests it from the master (g_mon1); further
# monitors get a postponed initial call with the copy we already have.
# In non-void context, returns a guard that removes the monitor again and,
# with the last monitor of a family, stops monitoring it.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   if (!$db) {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   } else {
      # we already monitor, so create a "dummy" change event.
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };

      AE::postpone {
         # guard might have gone away already
         return unless exists $LOCAL_MON{$family}{$cb+0};

         # set actual callback and report every key as added
         $LOCAL_MON{$family}{$cb+0} = $cb;
         $cb->($db, [keys %$db]);
      };
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$cb+0};

         # that was the last monitor of this family
         if (!%$mon) {
            global_req_del "mon1 $family";

            # no global_req, because we don't care if we are not connected
            snd $MASTER, g_mon0 => $family
               if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
         }
      }
}
888
# g_chg1 $family, \%db - full update of a monitored family from the
# master: makes our copy equal to %db and calls the family's monitors with
# (copy, added keys, changed keys, deleted keys). Ignored unless sent by
# the master.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $ndb) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed, @deleted);

   # add or replace keys
   for my $k (keys %$ndb) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $ndb->{$k};
   }

   # delete keys that are no longer present
   for my $k (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$k};
      push @deleted, $k;
   }

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, \@deleted);
   }
};
914
# g_chg2 $family, \%set, \@del - incremental update of a monitored family
# from the master: applies the changes to our copy and calls the family's
# monitors with (copy, added keys, changed keys, $del). Ignored unless sent
# by the master.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   for my $k (keys %$set) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $set->{$k};
   }

   delete @$db{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, $del);
   }
};
936
937 #############################################################################
938 # configure
939
# nodename - returns the nodename field of POSIX::uname
sub nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
944
# _resolve $spec
#
# Resolves a comma-separated list of transport descriptors into addresses.
# Each descriptor is "host:port", a bare port number or the empty string
# (both of which refer to this host's nodename); the host "*" stands for
# the addresses of all local interfaces and the port "*" for port 0.
#
# Returns a condvar that receives the list of resolved "host:port" strings,
# ordered by the position of their descriptor, with duplicates removed.
# Croaks when a descriptor cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # runs when all lookups are done: sort by priority, drop duplicates
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # addresses of one descriptor sort after the ones of earlier
      # descriptors and keep their relative order
      my $pri = ++$idx;

      # empty or purely numeric: this host, optionally with a port
      $t = length $t ? nodename () . ":$t" : nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         # returns the packed addresses of all local interfaces
         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;

            if (!defined $pid) {
               # no child, so no addresses - the same result as before, but
               # no longer silent. the lookup is deliberately not done
               # in-process: its alarm could kill this process.
               AE::log warn => "unable to fork interface address helper: $!";
            } elsif ($pid == 0) {
               # child: write the length-prefixed addresses and exit
               # without running any cleanup code of the parent
               close $r;
               syswrite $w, pack "(C/a*)*", $get_addr->();
               require POSIX;
               POSIX::_exit (0);
            } else {
               close $w;

               # read until eof, i.e. until the child has exited
               my $addr = "";

               1 while sysread $r, $addr, 1024, length $addr;

               # reap the child so it does not linger as a zombie; it is
               # harmless if something else reaped it already
               {
                  local ($!, $?);
                  waitpid $pid, 0;
               }

               @addr = unpack "(C/a*)*", $addr;
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
1054
# hooks waiting for the node to be configured
our @POST_CONFIGURE;

# post_configure { ... } - not yet documented.
# Queues the block and then runs all queued hooks in order, but only once
# the node has a (true) node ID; until then they stay queued. Must be
# called in void context.
sub post_configure(&) {
   die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray;

   push @POST_CONFIGURE, @_;

   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}
1064
# configure [$profile,] key => value...
#
# Configures and starts this node: looks up the profile, fixes the node ID,
# creates the listeners, runs the post_configure hooks, publishes the
# listeners in the database, starts seeding and the master search, loads
# the configured services and finally evaluates the "eval" setting.
# It replaces itself with an empty sub, so only the first call does anything.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   # the profile defaults to the nodename
   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   $SECURE = $CONFIG->{secure};

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # move the local node object to its new name
   # (we do not support doing stuff before configure)
   my $node_obj = delete $NODE{$NODE};

   $NODE = $node;

   $NODE =~ s/%n/nodename/ge;

   # a trailing "/" or any "%u" gets $RUNIQ inserted
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   $node_obj->{id} = $NODE;
   $NODE{$NODE}    = $node_obj;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   AE::log 8 => "node $NODE starting up.";

   $BINDS = [];
   %BINDS = ();

   # all lookups are started first, then waited for in order
   for my $resolved (map _resolve ($_), @$binds) {
      for my $bind ($resolved->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # remember the address actually bound to
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   AE::log 9 => "running post config hooks and init.";

   # might initialise Global, so need to do it before db_set
   post_configure { };

   db_set "'l" => $NODE => $BINDS;

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve ($_), @$seeds;
   master_search;

   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   AE::log 9 => "starting services.";

   for (@{ $CONFIG->{services} }) {
      if (ref) {
         # [function name, arguments...]
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         # a trailing "::" marks a module that only needs to be loaded
         eval "require $_";
         die $@ if $@;
      } else {
         # a plain function name, called without arguments
         (load_func $_)->();
      }
   }

   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}
1159
1160 =back
1161
1162 =head1 LOGGING
1163
1164 AnyEvent::MP::Kernel logs high-level information about the current node,
1165 when nodes go up and down, and most runtime errors. It also logs some
1166 debugging and trace messages about network maintenance, such as seed
1167 connections and global node management.
1168
1169 =head1 SEE ALSO
1170
1171 L<AnyEvent::MP>.
1172
1173 =head1 AUTHOR
1174
1175 Marc Lehmann <schmorp@schmorp.de>
1176 http://home.schmorp.de/
1177
1178 =cut
1179
1180 1
1181