ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.119
Committed: Tue Jul 24 07:09:29 2018 UTC (5 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-2_02, rel-2_01
Changes since 1.118: +4 -1 lines
Log Message:
2.01

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9 root 1.106 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10    
11     snd_to_func $node, $func, @args # send msg to function
12     snd_on $node, @msg # snd message again (relay)
13     eval_on $node, $string[, @reply] # execute perl code on another node
14    
15     node_is_up $nodeid # return true if a node is connected
16     @nodes = up_nodes # return a list of all connected nodes
17     $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18    
19 root 1.1 =head1 DESCRIPTION
20    
21 root 1.106 This module implements most of the inner workings of AnyEvent::MP. It
22     offers mostly lower-level functions that deal with network connectivity
23     and special requests.
24    
25     You normally interface with AnyEvent::MP through a higher level interface
26     such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27     with using the functions from this module.
28 root 1.3
29     =head1 GLOBALS AND FUNCTIONS
30 root 1.1
31     =over 4
32    
33     =cut
34    
35     package AnyEvent::MP::Kernel;
36    
37     use common::sense;
38     use Carp ();
39    
40 root 1.79 use AnyEvent ();
41     use Guard ();
42 root 1.1
43     use AnyEvent::MP::Node;
44     use AnyEvent::MP::Transport;
45    
46     use base "Exporter";
47    
48 root 1.106 # for re-export in AnyEvent::MP and Coro::MP
49     our @EXPORT_API = qw(
50     NODE $NODE
51     configure
52     node_of port_is_local
53     snd kil
54     db_set db_del
55     db_mon db_family db_keys db_values
56     );
57    
58     our @EXPORT_OK = (
59     # these are internal
60     qw(
61     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62     add_node load_func
63     ),
64     @EXPORT_API,
65 root 1.71 );
66    
67 root 1.1 our @EXPORT = qw(
68 root 1.106 snd_to_func snd_on eval_on
69     port_is_local
70 root 1.46 up_nodes mon_nodes node_is_up
71 root 1.1 );
72    
73 root 1.118 our @CARP_NOT = (AnyEvent::MP::);
74    
75 root 1.6 sub load_func($) {
76     my $func = $_[0];
77    
78     unless (defined &$func) {
79     my $pkg = $func;
80     do {
81     $pkg =~ s/::[^:]+$//
82 root 1.63 or return sub { die "unable to resolve function '$func'" };
83 root 1.60
84     local $@;
85 root 1.61 unless (eval "require $pkg; 1") {
86     my $error = $@;
87     $error =~ /^Can't locate .*.pm in \@INC \(/
88     or return sub { die $error };
89     }
90 root 1.6 } until defined &$func;
91     }
92    
93     \&$func
94     }
95    
96 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
97    
98 root 1.1 sub nonce($) {
99 root 1.78 join "", map chr rand 256, 1 .. $_[0]
100 root 1.1 }
101    
102 root 1.78 sub nonce62($) {
103     join "", map $alnum[rand 62], 1 .. $_[0]
104 root 1.1 }
105    
106 root 1.20 our $CONFIG; # this node's configuration
107 root 1.104 our $SECURE;
108 root 1.21
109 root 1.64 our $RUNIQ; # remote uniq value
110     our $UNIQ; # per-process/node unique cookie
111     our $NODE;
112     our $ID = "a";
113 root 1.1
114     our %NODE; # node id to transport mapping, or "undef", for local node
115     our (%PORT, %PORT_DATA); # local ports
116    
117 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
118 root 1.1 our %LMON; # monitored _local_ ports
119    
120 root 1.111 #our $GLOBAL; # true if node is a global ("directory") node
121 root 1.85 our %BINDS;
122     our $BINDS; # our listeners, as arrayref
123 root 1.1
124 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
125 root 1.111 our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
126 root 1.1
127 root 1.106 # initialise names for non-networked operation
128     {
129 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
130 root 1.106 my $now = AE::now;
131 root 1.112 $UNIQ =
132 root 1.106 (join "",
133     map $alnum[$_],
134     $$ / 62 % 62,
135     $$ % 62,
136     (int $now ) % 62,
137     (int $now * 100) % 62,
138     (int $now * 10000) % 62,
139     ) . nonce62 4
140     ;
141 root 1.78
142     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
143     $RUNIQ = nonce62 10;
144     $RUNIQ =~ s/(.)$/\U$1/;
145    
146 root 1.106 $NODE = "";
147 root 1.64 }
148    
149 root 1.1 sub NODE() {
150     $NODE
151     }
152    
153     sub node_of($) {
154 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
155 root 1.1
156 root 1.21 $node
157 root 1.1 }
158    
159 root 1.17 BEGIN {
160     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
161     ? sub () { 1 }
162     : sub () { 0 };
163     }
164 root 1.1
165 root 1.42 our $DELAY_TIMER;
166     our @DELAY_QUEUE;
167    
168 root 1.97 our $delay_run = sub {
169 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
170 root 1.97 };
171 root 1.42
172     sub delay($) {
173     push @DELAY_QUEUE, shift;
174 root 1.97 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
175 root 1.42 }
176    
177 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
178    
179     During execution of a message callback, this variable contains the node ID
180     of the origin node.
181    
182     The main use of this variable is for debugging output - there are probably
183     very few other cases where you need to know the source node ID.
184    
185     =cut
186    
187 root 1.1 sub _inject {
188 root 1.117 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;
189    
190 root 1.1 &{ $PORT{+shift} or return };
191     }
192    
193 root 1.20 # this function adds a node-ref, so you can send stuff to it
194     # it is basically the central routing component.
195 root 1.1 sub add_node {
196 root 1.107 $NODE{$_[0]} || do {
197     my ($node) = @_;
198 root 1.1
199 root 1.107 length $node
200     or Carp::croak "'undef' or the empty string are not valid node/port IDs";
201 root 1.93
202 root 1.107 # registers itself in %NODE
203     new AnyEvent::MP::Node::Remote $node
204     }
205 root 1.13 }
206    
207 root 1.1 sub snd(@) {
208 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
209 root 1.1
210 root 1.117 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;
211 root 1.1
212 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
213 root 1.2 ->{send} (["$portid", @_]);
214 root 1.1 }
215    
216 root 1.17 sub port_is_local($) {
217 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
218 root 1.17
219 root 1.106 $nodeid eq $NODE
220 root 1.17 }
221    
222 root 1.18 =item snd_to_func $node, $func, @args
223 root 1.11
224 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
225 root 1.11 this function with the given arguments on that node.
226    
227 root 1.20 This function can be used to implement C<spawn>-like interfaces.
228 root 1.11
229     =cut
230    
231 root 1.18 sub snd_to_func($$;@) {
232 root 1.21 my $nodeid = shift;
233 root 1.11
234 root 1.41 # on $NODE, we artificially delay... (for spawn)
235     # this is very ugly - maybe we should simply delay ALL messages,
236     # to avoid deep recursion issues. but that's so... slow...
237 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
238     if $nodeid ne $NODE;
239    
240 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
241 root 1.11 }
242    
243 root 1.18 =item snd_on $node, @msg
244    
245     Executes C<snd> with the given C<@msg> (which must include the destination
246     port) on the given node.
247    
248     =cut
249    
250     sub snd_on($@) {
251     my $node = shift;
252     snd $node, snd => @_;
253     }
254    
255 root 1.29 =item eval_on $node, $string[, @reply]
256 root 1.18
257 root 1.29 Evaluates the given string as Perl expression on the given node. When
258     @reply is specified, then it is used to construct a reply message with
259     C<"$@"> and any results from the eval appended.
260 root 1.18
261     =cut
262    
263 root 1.29 sub eval_on($$;@) {
264 root 1.18 my $node = shift;
265     snd $node, eval => @_;
266     }
267    
268 root 1.1 sub kil(@) {
269 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
270 root 1.1
271     length $portid
272 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
273 root 1.1
274 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
275 root 1.1 ->kill ("$portid", @_);
276     }
277    
278     #############################################################################
279 root 1.6 # node monitoring and info
280 root 1.3
281 root 1.106 =item $bool = node_is_up $nodeid
282 root 1.13
283     Returns true if the given node is "up", that is, the kernel thinks it has
284     a working connection to it.
285    
286 root 1.107 More precisely, if the node is up, returns C<1>. If the node is currently
287     connecting or otherwise known but not connected, returns C<0>. If nothing
288     is known about the node, returns C<undef>.
289 root 1.13
290     =cut
291    
292     sub node_is_up($) {
293 root 1.107 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
294 root 1.13 ? 1 : 0
295     }
296    
297 root 1.106 =item @nodes = up_nodes
298 root 1.3
299 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
300     the node itself).
301 root 1.3
302     =cut
303    
304 root 1.49 sub up_nodes() {
305 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
306 root 1.3 }
307    
308 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
309 root 1.3
310 root 1.27 Registers a callback that is called each time a node goes up (a connection
311     is established) or down (the connection is lost).
312 root 1.3
313     Node up messages can only be followed by node down messages for the same
314     node, and vice versa.
315    
316 root 1.71 Note that monitoring a node is usually better done by monitoring its node
317 root 1.27 port. This function is mainly of interest to modules that are concerned
318     about the network topology and low-level connection handling.
319    
320     Callbacks I<must not> block and I<should not> send any messages.
321    
322     The function returns an optional guard which can be used to unregister
323 root 1.3 the monitoring callback again.
324    
325 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
326     or go up (and down).
327    
328     newnode $_, 1 for up_nodes;
329     mon_nodes \&newnode;
330    
331 root 1.3 =cut
332    
333     our %MON_NODES;
334    
335     sub mon_nodes($) {
336     my ($cb) = @_;
337    
338     $MON_NODES{$cb+0} = $cb;
339    
340 root 1.90 defined wantarray
341     and Guard::guard { delete $MON_NODES{$cb+0} }
342 root 1.3 }
343    
344     sub _inject_nodeevent($$;@) {
345 root 1.16 my ($node, $up, @reason) = @_;
346 root 1.3
347 root 1.107 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
348    
349 root 1.3 for my $cb (values %MON_NODES) {
350 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
351 root 1.98 or AE::log die => $@;
352 root 1.3 }
353     }
354    
355     #############################################################################
356 root 1.1 # self node code
357    
358 root 1.67 sub _kill {
359     my $port = shift;
360    
361     delete $PORT{$port}
362     or return; # killing nonexistent ports is O.K.
363     delete $PORT_DATA{$port};
364    
365     my $mon = delete $LMON{$port}
366     or !@_
367 root 1.98 or AE::log die => "unmonitored local port $port died with reason: @_";
368 root 1.67
369     $_->(@_) for values %$mon;
370     }
371    
372     sub _monitor {
373     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
374     unless exists $PORT{$_[1]};
375    
376     $LMON{$_[1]}{$_[2]+0} = $_[2];
377     }
378    
379     sub _unmonitor {
380 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
381     if exists $LMON{$_[1]};
382 root 1.67 }
383    
384 root 1.83 sub _secure_check {
385 root 1.104 $SECURE
386 root 1.105 and die "remote execution not allowed\n";
387 root 1.83 }
388    
389 root 1.107 our %NODE_REQ;
390    
391     %NODE_REQ = (
392     # "mproto" - monitoring protocol
393 root 1.1
394     # monitoring
395 root 1.65 mon0 => sub { # stop monitoring a port for another node
396 root 1.1 my $portid = shift;
397 root 1.119 # the if exists should not be needed, but there is apparently a bug
398     # elsewhere, and this works around that, silently suppressing that bug. sigh.
399     _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid}
400     if exists $NODE{$SRCNODE};
401 root 1.1 },
402 root 1.65 mon1 => sub { # start monitoring a port for another node
403 root 1.1 my $portid = shift;
404 root 1.96 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
405 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
406 root 1.58 delete $node->{rmon}{$portid};
407 root 1.65 $node->send (["", kil0 => $portid, @_])
408 root 1.59 if $node && $node->{transport};
409 root 1.67 };
410 root 1.1 },
411 root 1.65 # another node has killed a monitored port
412     kil0 => sub {
413 root 1.96 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
414 root 1.1 or return;
415    
416     $_->(@_) for @$cbs;
417     },
418 root 1.107 # another node wants to kill a local port
419     kil1 => \&_kill,
420 root 1.1
421 root 1.18 # "public" services - not actually public
422 root 1.1
423     # relay message to another node / generic echo
424 root 1.88 snd => sub {
425     &snd
426 root 1.1 },
427 root 1.107 # ask if a node supports the given request, only works for fixed tags
428     can => sub {
429     my $method = shift;
430     snd @_, exists $NODE_REQ{$method};
431     },
432 root 1.1
433 root 1.30 # random utilities
434 root 1.1 eval => sub {
435 root 1.83 &_secure_check;
436 root 1.50 my @res = do { package main; eval shift };
437 root 1.1 snd @_, "$@", @res if @_;
438     },
439     time => sub {
440 root 1.76 snd @_, AE::now;
441 root 1.1 },
442     devnull => sub {
443     #
444     },
445 root 1.15 "" => sub {
446 root 1.27 # empty messages are keepalives or similar devnull-applications
447 root 1.15 },
448 root 1.1 );
449    
450 root 1.104 # the node port
451 root 1.107 new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
452    
453 root 1.1 $PORT{""} = sub {
454     my $tag = shift;
455 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
456 root 1.98 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
457 root 1.1 };
458    
459 root 1.107 our $MPROTO = 1;
460 root 1.84
461     # tell everybody who connects our nproto
462     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
463 root 1.107 $_[0]{local_greeting}{mproto} = $MPROTO;
464 root 1.84 };
465    
466 root 1.69 #############################################################################
467 root 1.71 # seed management, try to keep connections to all seeds at all times
468 root 1.69
469 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
470     our %NODE_SEED; # map node ID to seed ID
471 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
472     our $SEED_WATCHER;
473 root 1.71 our $SEED_RETRY;
474 root 1.111 our %GLOBAL_NODE; # global => undef
475 root 1.69
476     sub seed_connect {
477     my ($seed) = @_;
478    
479     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
480     or Carp::croak "$seed: unparsable seed address";
481    
482 root 1.98 AE::log 9 => "trying connect to seed node $seed.";
483 root 1.69
484 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
485     $host, $port,
486     on_greeted => sub {
487     # called after receiving remote greeting, learn remote node name
488    
489     # we rely on untrusted data here (the remote node name) this is
490     # hopefully ok, as this can at most be used for DOSing, which is easy
491     # when you can do MITM anyway.
492    
493     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
494     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
495 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
496 root 1.71 delete $SEED_NODE{$seed};
497     } else {
498     $SEED_NODE{$seed} = $_[0]{remote_node};
499     $NODE_SEED{$_[0]{remote_node}} = $seed;
500 root 1.111
501     # also start global service, in case it isn't running
502     # since we probably switch conenctions, maybe we don't need to do this here?
503     snd $_[0]{remote_node}, "g_slave";
504 root 1.71 }
505     },
506 root 1.93 sub {
507 root 1.71 delete $SEED_CONNECT{$seed};
508     }
509 root 1.69 ;
510     }
511    
512     sub seed_all {
513 root 1.93 my @seeds = grep
514 root 1.113 !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
515 root 1.93 keys %SEED_NODE;
516 root 1.69
517     if (@seeds) {
518 root 1.70 # start connection attempt for every seed we are not connected to yet
519 root 1.69 seed_connect $_
520 root 1.113 for grep !exists $SEED_CONNECT{$_}, @seeds;
521 root 1.71
522 root 1.111 $SEED_RETRY = $SEED_RETRY * 2;
523 root 1.71 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
524     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
525    
526     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
527    
528 root 1.69 } else {
529 root 1.71 # all seeds connected or connecting, no need to restart timer
530 root 1.69 undef $SEED_WATCHER;
531     }
532     }
533    
534     sub seed_again {
535 root 1.111 $SEED_RETRY = (1 + rand) * 0.6;
536 root 1.113 $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
537 root 1.69 }
538    
539     # sets new seed list, starts connecting
540     sub set_seeds(@) {
541     %SEED_NODE = ();
542 root 1.71 %NODE_SEED = ();
543     %SEED_CONNECT = ();
544    
545 root 1.69 @SEED_NODE{@_} = ();
546    
547 root 1.114 seed_again;
548 root 1.71 }
549    
550 root 1.111 # normal nodes only record global node connections
551     $NODE_REQ{g_global} = sub {
552     undef $GLOBAL_NODE{$SRCNODE};
553     };
554    
555 root 1.71 mon_nodes sub {
556 root 1.111 delete $GLOBAL_NODE{$_[0]}
557     unless $_[1];
558    
559 root 1.93 return unless exists $NODE_SEED{$_[0]};
560    
561     if ($_[1]) {
562     # each time a connection to a seed node goes up, make
563     # sure it runs the global service.
564 root 1.111 snd $_[0], "g_slave";
565 root 1.93 } else {
566     # if we lost the connection to a seed node, make sure we are seeding
567     seed_again;
568     }
569 root 1.71 };
570    
571     #############################################################################
572 root 1.107 # keepalive code - used to kepe conenctions to certain nodes alive
573     # only used by global code atm., but ought to be exposed somehow.
574 root 1.109 #TODO: should probbaly be done directly by node objects
575 root 1.107
576     our $KEEPALIVE_RETRY;
577     our $KEEPALIVE_WATCHER;
578     our %KEEPALIVE; # we want to keep these nodes alive
579     our %KEEPALIVE_DOWN; # nodes that are down currently
580    
581     sub keepalive_all {
582     AE::log 9 => "keepalive: trying to establish connections with: "
583     . (join " ", keys %KEEPALIVE_DOWN)
584     . ".";
585    
586     (add_node $_)->connect
587     for keys %KEEPALIVE_DOWN;
588    
589 root 1.111 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
590 root 1.107 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
591     if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
592    
593     $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
594     }
595    
596     sub keepalive_again {
597 root 1.111 $KEEPALIVE_RETRY = (1 + rand) * 0.3;
598 root 1.108 keepalive_all;
599 root 1.107 }
600    
601     sub keepalive_add {
602     return if $KEEPALIVE{$_[0]}++;
603    
604     return if node_is_up $_[0];
605     undef $KEEPALIVE_DOWN{$_[0]};
606     keepalive_again;
607     }
608    
609     sub keepalive_del {
610     return if --$KEEPALIVE{$_[0]};
611    
612     delete $KEEPALIVE {$_[0]};
613     delete $KEEPALIVE_DOWN{$_[0]};
614    
615     undef $KEEPALIVE_WATCHER
616     unless %KEEPALIVE_DOWN;
617     }
618    
619     mon_nodes sub {
620     return unless exists $KEEPALIVE{$_[0]};
621    
622     if ($_[1]) {
623     delete $KEEPALIVE_DOWN{$_[0]};
624    
625     undef $KEEPALIVE_WATCHER
626     unless %KEEPALIVE_DOWN;
627     } else {
628     # lost the conenction, try to connect again
629     undef $KEEPALIVE_DOWN{$_[0]};
630     keepalive_again;
631     }
632     };
633    
634     #############################################################################
635 root 1.71 # talk with/to global nodes
636    
637 root 1.72 # protocol messages:
638     #
639 root 1.110 # sent by global nodes
640 root 1.111 # g_global - global nodes send this to all others
641 root 1.72 #
642 root 1.110 # database protocol
643     # g_slave database - make other global node master of the sender
644     # g_set database - global node's database to other global nodes
645 root 1.111 # g_upd family set del - update single family (any to global)
646 root 1.73 #
647 root 1.110 # slave <-> global protocol
648     # g_find node - query addresses for node (slave to global)
649     # g_found node binds - node addresses (global to slave)
650     # g_db_family family id - send g_reply with data (global to slave)
651     # g_db_keys family id - send g_reply with data (global to slave)
652     # g_db_values family id - send g_reply with data (global to slave)
653     # g_reply id result - result of any query (global to slave)
654     # g_mon1 family - start to monitor family, replies with g_chg1
655     # g_mon0 family - stop monitoring family
656     # g_chg1 family hash - initial value of family when starting to monitor
657     # g_chg2 family set del - like g_upd, but for monitoring only
658 root 1.73 #
659 root 1.111 # internal database families:
660 root 1.73 # "'l" -> node -> listeners
661     # "'g" -> node -> undef
662     # ...
663 root 1.72 #
664    
665 root 1.73 # used on all nodes:
666 root 1.88 our $MASTER; # the global node we bind ourselves to
667 root 1.78 our $MASTER_MON;
668     our %LOCAL_DB; # this node database
669    
670 root 1.84 our $GPROTO = 1;
671    
672 root 1.116 # tell everybody who connects our gproto
673 root 1.84 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
674     $_[0]{local_greeting}{gproto} = $GPROTO;
675     };
676    
677 root 1.73 #############################################################################
678     # master selection
679    
680     # master requests
681     our %GLOBAL_REQ; # $id => \@req
682 root 1.71
683 root 1.74 sub global_req_add {
684 root 1.80 my ($id, $req) = @_;
685 root 1.71
686 root 1.74 return if exists $GLOBAL_REQ{$id};
687    
688 root 1.80 $GLOBAL_REQ{$id} = $req;
689 root 1.71
690 root 1.80 snd $MASTER, @$req
691 root 1.73 if $MASTER;
692 root 1.74 }
693 root 1.71
694 root 1.74 sub global_req_del {
695     delete $GLOBAL_REQ{$_[0]};
696     }
697    
698 root 1.88 #################################
699     # master rpc
700    
701     our %GLOBAL_RES;
702     our $GLOBAL_RES_ID = "a";
703    
704     sub global_call {
705     my $id = ++$GLOBAL_RES_ID;
706     $GLOBAL_RES{$id} = pop;
707     global_req_add $id, [@_, $id];
708     }
709    
710     $NODE_REQ{g_reply} = sub {
711     my $id = shift;
712     global_req_del $id;
713     my $cb = delete $GLOBAL_RES{$id}
714     or return;
715     &$cb
716     };
717    
718     #################################
719    
720 root 1.74 sub g_find {
721 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
722 root 1.73 }
723 root 1.71
724 root 1.73 # reply for g_find started in Node.pm
725 root 1.79 $NODE_REQ{g_found} = sub {
726 root 1.74 global_req_del "g_find $_[0]";
727    
728 root 1.73 my $node = $NODE{$_[0]} or return;
729 root 1.71
730 root 1.79 $node->connect_to ($_[1]);
731 root 1.71 };
732    
733 root 1.73 sub master_set {
734     $MASTER = $_[0];
735 root 1.111 AE::log 8 => "new master node: $MASTER.";
736    
737     $MASTER_MON = mon_nodes sub {
738     if ($_[0] eq $MASTER && !$_[1]) {
739     undef $MASTER;
740     master_search ();
741     }
742     };
743 root 1.71
744 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
745 root 1.71
746 root 1.73 # (re-)send queued requests
747     snd $MASTER, @$_
748     for values %GLOBAL_REQ;
749     }
750 root 1.71
751 root 1.72 sub master_search {
752 root 1.111 AE::log 9 => "starting search for master node.";
753    
754 root 1.115 #TODO: should also look for other global nodes, but we don't know them
755 root 1.73 for (keys %NODE_SEED) {
756 root 1.72 if (node_is_up $_) {
757     master_set $_;
758     return;
759 root 1.71 }
760 root 1.72 }
761 root 1.71
762 root 1.78 $MASTER_MON = mon_nodes sub {
763 root 1.72 return unless $_[1]; # we are only interested in node-ups
764     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
765 root 1.71
766 root 1.72 master_set $_[0];
767     };
768 root 1.71 }
769    
770 root 1.110 # other node wants to make us the master, so start the global service
771 root 1.79 $NODE_REQ{g_slave} = sub {
772 root 1.80 # load global module and redo the request
773 root 1.73 require AnyEvent::MP::Global;
774 root 1.80 &{ $NODE_REQ{g_slave} }
775 root 1.71 };
776    
777 root 1.73 #############################################################################
778 root 1.79 # local database operations
779 root 1.71
780 root 1.111 # canonical probably not needed
781     our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
782    
783     # are the two scalars equal? very very ugly and slow, need better way
784     sub sv_eq($$) {
785     ref $_[0] || ref $_[1]
786     ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
787     : $_[0] eq $_[1]
788     && defined $_[0] == defined $_[1]
789     }
790    
791 root 1.79 # local database management
792 root 1.88
793 root 1.111 sub db_del($@) {
794     my $family = shift;
795    
796     my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
797    
798     return unless @del;
799    
800     delete @{ $LOCAL_DB{$family} }{@del};
801     snd $MASTER, g_upd => $family => undef, \@del
802     if defined $MASTER;
803     }
804    
805 root 1.87 sub db_set($$;$) {
806 root 1.97 my ($family, $subkey) = @_;
807    
808 root 1.89 # if (ref $_[1]) {
809     # # bulk
810     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
811     # $LOCAL_DB{$_[0]} = $_[1];
812     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
813     # if defined $MASTER;
814     # } else {
815     # single-key
816 root 1.111 unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
817     $LOCAL_DB{$family}{$subkey} = $_[2];
818     snd $MASTER, g_upd => $family => { $subkey => $_[2] }
819     if defined $MASTER;
820     }
821 root 1.89 # }
822 root 1.97
823     defined wantarray
824     and Guard::guard { db_del $family => $subkey }
825 root 1.79 }
826    
827 root 1.88 # database query
828    
829     sub db_family {
830     my ($family, $cb) = @_;
831     global_call g_db_family => $family, $cb;
832     }
833    
834     sub db_keys {
835     my ($family, $cb) = @_;
836     global_call g_db_keys => $family, $cb;
837     }
838    
839     sub db_values {
840     my ($family, $cb) = @_;
841     global_call g_db_values => $family, $cb;
842 root 1.80 }
843    
844 root 1.88 # database monitoring
845 root 1.80
846 root 1.81 our %LOCAL_MON; # f, reply
847     our %MON_DB; # f, k, value
848 root 1.80
849 root 1.81 sub db_mon($@) {
850 root 1.84 my ($family, $cb) = @_;
851 root 1.81
852 root 1.84 if (my $db = $MON_DB{$family}) {
853 root 1.89 # we already monitor, so create a "dummy" change event
854     # this is postponed, which might be too late (we could process
855     # change events), so disable the callback at first
856     $LOCAL_MON{$family}{$cb+0} = sub { };
857     AE::postpone {
858     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
859    
860     # set actual callback
861     $LOCAL_MON{$family}{$cb+0} = $cb;
862     $cb->($db, [keys %$db]);
863     };
864 root 1.81 } else {
865     # new monitor, request chg1 from upstream
866 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
867 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
868     $MON_DB{$family} = {};
869     }
870    
871 root 1.90 defined wantarray
872     and Guard::guard {
873     my $mon = $LOCAL_MON{$family};
874     delete $mon->{$cb+0};
875    
876     unless (%$mon) {
877     global_req_del "mon1 $family";
878    
879     # no global_req, because we don't care if we are not connected
880     snd $MASTER, g_mon0 => $family
881     if $MASTER;
882 root 1.80
883 root 1.90 delete $LOCAL_MON{$family};
884     delete $MON_DB{$family};
885     }
886 root 1.80 }
887     }
888    
889 root 1.82 # full update
890 root 1.80 $NODE_REQ{g_chg1} = sub {
891 root 1.96 return unless $SRCNODE eq $MASTER;
892 root 1.84 my ($f, $ndb) = @_;
893    
894     my $db = $MON_DB{$f};
895 root 1.89 my (@a, @c, @d);
896 root 1.81
897 root 1.82 # add or replace keys
898 root 1.84 while (my ($k, $v) = each %$ndb) {
899 root 1.89 exists $db->{$k}
900     ? push @c, $k
901     : push @a, $k;
902 root 1.84 $db->{$k} = $v;
903 root 1.82 }
904 root 1.81
905 root 1.82 # delete keys that are no longer present
906 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
907     delete $db->{$_};
908 root 1.89 push @d, $_;
909 root 1.81 }
910 root 1.84
911 root 1.89 $_->($db, \@a, \@c, \@d)
912 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
913 root 1.80 };
914    
915 root 1.82 # incremental update
916 root 1.84 $NODE_REQ{g_chg2} = sub {
917 root 1.96 return unless $SRCNODE eq $MASTER;
918 root 1.89 my ($family, $set, $del) = @_;
919    
920     my $db = $MON_DB{$family};
921 root 1.84
922 root 1.89 my (@a, @c);
923 root 1.84
924 root 1.89 while (my ($k, $v) = each %$set) {
925     exists $db->{$k}
926     ? push @c, $k
927     : push @a, $k;
928     $db->{$k} = $v;
929     }
930    
931     delete @$db{@$del};
932    
933     $_->($db, \@a, \@c, $del)
934     for values %{ $LOCAL_MON{$family} };
935 root 1.84 };
936 root 1.80
937 root 1.69 #############################################################################
938     # configure
939    
940 root 1.81 sub nodename {
941 root 1.69 require POSIX;
942     (POSIX::uname ())[1]
943     }
944    
945     sub _resolve($) {
946     my ($nodeid) = @_;
947    
948     my $cv = AE::cv;
949     my @res;
950    
951     $cv->begin (sub {
952     my %seen;
953     my @refs;
954     for (sort { $a->[0] <=> $b->[0] } @res) {
955     push @refs, $_->[1] unless $seen{$_->[1]}++
956     }
957     shift->send (@refs);
958     });
959    
960     my $idx;
961     for my $t (split /,/, $nodeid) {
962     my $pri = ++$idx;
963    
964 root 1.81 $t = length $t ? nodename . ":$t" : nodename
965 root 1.69 if $t =~ /^\d*$/;
966    
967     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
968     or Carp::croak "$t: unparsable transport descriptor";
969    
970     $port = "0" if $port eq "*";
971    
972     if ($host eq "*") {
973     $cv->begin;
974    
975 root 1.103 my $get_addr = sub {
976     my @addr;
977    
978     require Net::Interface;
979    
980     # Net::Interface hangs on some systems, so hope for the best
981     local $SIG{ALRM} = 'DEFAULT';
982     alarm 2;
983    
984     for my $if (Net::Interface->interfaces) {
985     # we statically lower-prioritise ipv6 here, TODO :()
986     for $_ ($if->address (Net::Interface::AF_INET ())) {
987     next if /^\x7f/; # skip localhost etc.
988     push @addr, $_;
989 root 1.69 }
990 root 1.103 for ($if->address (Net::Interface::AF_INET6 ())) {
991     #next if $if->scope ($_) <= 2;
992     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
993     push @addr, $_;
994 root 1.69 }
995     }
996 root 1.103
997     alarm 0;
998    
999     @addr
1000     };
1001    
1002     my @addr;
1003    
1004     if (AnyEvent::WIN32) {
1005     @addr = $get_addr->();
1006     } else {
1007     # use a child process, as Net::Interface is big, and we need it only once.
1008    
1009     pipe my $r, my $w
1010     or die "pipe: $!";
1011    
1012     if (fork eq 0) {
1013     close $r;
1014     syswrite $w, pack "(C/a*)*", $get_addr->();
1015     require POSIX;
1016     POSIX::_exit (0);
1017     } else {
1018     close $w;
1019    
1020     my $addr;
1021    
1022     1 while sysread $r, $addr, 1024, length $addr;
1023    
1024     @addr = unpack "(C/a*)*", $addr;
1025     }
1026     }
1027    
1028     for my $ip (@addr) {
1029     push @res, [
1030     $pri += 1e-5,
1031     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
1032     ];
1033     }
1034     $cv->end;
1035 root 1.69 } else {
1036     $cv->begin;
1037     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1038     for (@_) {
1039     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1040     push @res, [
1041     $pri += 1e-5,
1042     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1043     ];
1044     }
1045     $cv->end;
1046     };
1047     }
1048     }
1049    
1050     $cv->end;
1051    
1052     $cv
1053     }
1054    
1055 root 1.112 our @POST_CONFIGURE;
1056    
1057     # not yet documented
1058     sub post_configure(&) {
1059     die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray;
1060    
1061     push @POST_CONFIGURE, @_;
1062     (shift @POST_CONFIGURE)->() while $NODE && @POST_CONFIGURE;
1063     }
1064    
1065 root 1.69 sub configure(@) {
1066     unshift @_, "profile" if @_ & 1;
1067     my (%kv) = @_;
1068    
1069     my $profile = delete $kv{profile};
1070    
1071 root 1.81 $profile = nodename
1072 root 1.69 unless defined $profile;
1073    
1074     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1075    
1076 root 1.104 $SECURE = $CONFIG->{secure};
1077 root 1.83
1078 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1079 root 1.69
1080     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1081    
1082 root 1.106 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1083    
1084 root 1.72 $NODE = $node;
1085 root 1.77
1086 root 1.81 $NODE =~ s/%n/nodename/ge;
1087    
1088     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1089 root 1.77 # nodes with randomised node names do not need randomised port names
1090     $UNIQ = "";
1091     }
1092 root 1.69
1093 root 1.106 $node_obj->{id} = $NODE;
1094     $NODE{$NODE} = $node_obj;
1095 root 1.69
1096     my $seeds = $CONFIG->{seeds};
1097     my $binds = $CONFIG->{binds};
1098    
1099     $binds ||= ["*"];
1100    
1101 root 1.98 AE::log 8 => "node $NODE starting up.";
1102 root 1.69
1103 root 1.85 $BINDS = [];
1104     %BINDS = ();
1105 root 1.69
1106     for (map _resolve $_, @$binds) {
1107     for my $bind ($_->recv) {
1108     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1109     or Carp::croak "$bind: unparsable local bind address";
1110    
1111     my $listener = AnyEvent::MP::Transport::mp_server
1112     $host,
1113     $port,
1114     prepare => sub {
1115     my (undef, $host, $port) = @_;
1116     $bind = AnyEvent::Socket::format_hostport $host, $port;
1117     0
1118     },
1119     ;
1120 root 1.85 $BINDS{$bind} = $listener;
1121     push @$BINDS, $bind;
1122 root 1.69 }
1123     }
1124    
1125 root 1.112 AE::log 9 => "running post config hooks and init.";
1126    
1127     # might initialise Global, so need to do it before db_set
1128     post_configure { };
1129    
1130 root 1.85 db_set "'l" => $NODE => $BINDS;
1131 root 1.73
1132 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
1133 root 1.69
1134     # connect to all seednodes
1135     set_seeds map $_->recv, map _resolve $_, @$seeds;
1136 root 1.73 master_search;
1137    
1138 root 1.103 # save gobs of memory
1139     undef &_resolve;
1140     *configure = sub (@){ };
1141    
1142 root 1.112 AE::log 9 => "starting services.";
1143    
1144 root 1.69 for (@{ $CONFIG->{services} }) {
1145     if (ref) {
1146     my ($func, @args) = @$_;
1147     (load_func $func)->(@args);
1148     } elsif (s/::$//) {
1149     eval "require $_";
1150     die $@ if $@;
1151     } else {
1152     (load_func $_)->();
1153     }
1154     }
1155 root 1.102
1156     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1157     die "$@" if $@;
1158 root 1.69 }
1159    
1160 root 1.1 =back
1161    
1162 root 1.101 =head1 LOGGING
1163    
1164     AnyEvent::MP::Kernel logs high-level information about the current node,
1165     when nodes go up and down, and most runtime errors. It also logs some
1166     debugging and trace messages about network maintainance, such as seed
1167     connections and global node management.
1168    
1169 root 1.1 =head1 SEE ALSO
1170    
1171     L<AnyEvent::MP>.
1172    
1173     =head1 AUTHOR
1174    
1175     Marc Lehmann <schmorp@schmorp.de>
1176     http://home.schmorp.de/
1177    
1178     =cut
1179    
1180     1
1181