ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.118
Committed: Thu Aug 31 15:39:52 2017 UTC (6 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-2_0
Changes since 1.117: +2 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9 root 1.106 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10    
11     snd_to_func $node, $func, @args # send msg to function
12     snd_on $node, @msg # snd message again (relay)
13     eval_on $node, $string[, @reply] # execute perl code on another node
14    
15     node_is_up $nodeid # return true if a node is connected
16     @nodes = up_nodes # return a list of all connected nodes
17     $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18    
19 root 1.1 =head1 DESCRIPTION
20    
21 root 1.106 This module implements most of the inner workings of AnyEvent::MP. It
22     offers mostly lower-level functions that deal with network connectivity
23     and special requests.
24    
25     You normally interface with AnyEvent::MP through a higher level interface
26     such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27     with using the functions from this module.
28 root 1.3
29     =head1 GLOBALS AND FUNCTIONS
30 root 1.1
31     =over 4
32    
33     =cut
34    
35     package AnyEvent::MP::Kernel;
36    
37     use common::sense;
38     use Carp ();
39    
40 root 1.79 use AnyEvent ();
41     use Guard ();
42 root 1.1
43     use AnyEvent::MP::Node;
44     use AnyEvent::MP::Transport;
45    
46     use base "Exporter";
47    
48 root 1.106 # for re-export in AnyEvent::MP and Coro::MP
49     our @EXPORT_API = qw(
50     NODE $NODE
51     configure
52     node_of port_is_local
53     snd kil
54     db_set db_del
55     db_mon db_family db_keys db_values
56     );
57    
58     our @EXPORT_OK = (
59     # these are internal
60     qw(
61     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62     add_node load_func
63     ),
64     @EXPORT_API,
65 root 1.71 );
66    
67 root 1.1 our @EXPORT = qw(
68 root 1.106 snd_to_func snd_on eval_on
69     port_is_local
70 root 1.46 up_nodes mon_nodes node_is_up
71 root 1.1 );
72    
73 root 1.118 our @CARP_NOT = (AnyEvent::MP::);
74    
75 root 1.6 sub load_func($) {
76     my $func = $_[0];
77    
78     unless (defined &$func) {
79     my $pkg = $func;
80     do {
81     $pkg =~ s/::[^:]+$//
82 root 1.63 or return sub { die "unable to resolve function '$func'" };
83 root 1.60
84     local $@;
85 root 1.61 unless (eval "require $pkg; 1") {
86     my $error = $@;
87     $error =~ /^Can't locate .*.pm in \@INC \(/
88     or return sub { die $error };
89     }
90 root 1.6 } until defined &$func;
91     }
92    
93     \&$func
94     }
95    
96 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
97    
98 root 1.1 sub nonce($) {
99 root 1.78 join "", map chr rand 256, 1 .. $_[0]
100 root 1.1 }
101    
102 root 1.78 sub nonce62($) {
103     join "", map $alnum[rand 62], 1 .. $_[0]
104 root 1.1 }
105    
106 root 1.20 our $CONFIG; # this node's configuration
107 root 1.104 our $SECURE;
108 root 1.21
109 root 1.64 our $RUNIQ; # remote uniq value
110     our $UNIQ; # per-process/node unique cookie
111     our $NODE;
112     our $ID = "a";
113 root 1.1
114     our %NODE; # node id to transport mapping, or "undef", for local node
115     our (%PORT, %PORT_DATA); # local ports
116    
117 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
118 root 1.1 our %LMON; # monitored _local_ ports
119    
120 root 1.111 #our $GLOBAL; # true if node is a global ("directory") node
121 root 1.85 our %BINDS;
122     our $BINDS; # our listeners, as arrayref
123 root 1.1
124 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
125 root 1.111 our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
126 root 1.1
127 root 1.106 # initialise names for non-networked operation
128     {
129 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
130 root 1.106 my $now = AE::now;
131 root 1.112 $UNIQ =
132 root 1.106 (join "",
133     map $alnum[$_],
134     $$ / 62 % 62,
135     $$ % 62,
136     (int $now ) % 62,
137     (int $now * 100) % 62,
138     (int $now * 10000) % 62,
139     ) . nonce62 4
140     ;
141 root 1.78
142     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
143     $RUNIQ = nonce62 10;
144     $RUNIQ =~ s/(.)$/\U$1/;
145    
146 root 1.106 $NODE = "";
147 root 1.64 }
148    
149 root 1.1 sub NODE() {
150     $NODE
151     }
152    
153     sub node_of($) {
154 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
155 root 1.1
156 root 1.21 $node
157 root 1.1 }
158    
159 root 1.17 BEGIN {
160     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
161     ? sub () { 1 }
162     : sub () { 0 };
163     }
164 root 1.1
165 root 1.42 our $DELAY_TIMER;
166     our @DELAY_QUEUE;
167    
168 root 1.97 our $delay_run = sub {
169 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
170 root 1.97 };
171 root 1.42
172     sub delay($) {
173     push @DELAY_QUEUE, shift;
174 root 1.97 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
175 root 1.42 }
176    
177 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
178    
179     During execution of a message callback, this variable contains the node ID
180     of the origin node.
181    
182     The main use of this variable is for debugging output - there are probably
183     very few other cases where you need to know the source node ID.
184    
185     =cut
186    
187 root 1.1 sub _inject {
188 root 1.117 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;
189    
190 root 1.1 &{ $PORT{+shift} or return };
191     }
192    
193 root 1.20 # this function adds a node-ref, so you can send stuff to it
194     # it is basically the central routing component.
195 root 1.1 sub add_node {
196 root 1.107 $NODE{$_[0]} || do {
197     my ($node) = @_;
198 root 1.1
199 root 1.107 length $node
200     or Carp::croak "'undef' or the empty string are not valid node/port IDs";
201 root 1.93
202 root 1.107 # registers itself in %NODE
203     new AnyEvent::MP::Node::Remote $node
204     }
205 root 1.13 }
206    
207 root 1.1 sub snd(@) {
208 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
209 root 1.1
210 root 1.117 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;
211 root 1.1
212 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
213 root 1.2 ->{send} (["$portid", @_]);
214 root 1.1 }
215    
216 root 1.17 sub port_is_local($) {
217 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
218 root 1.17
219 root 1.106 $nodeid eq $NODE
220 root 1.17 }
221    
222 root 1.18 =item snd_to_func $node, $func, @args
223 root 1.11
224 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
225 root 1.11 this function with the given arguments on that node.
226    
227 root 1.20 This function can be used to implement C<spawn>-like interfaces.
228 root 1.11
229     =cut
230    
231 root 1.18 sub snd_to_func($$;@) {
232 root 1.21 my $nodeid = shift;
233 root 1.11
234 root 1.41 # on $NODE, we artificially delay... (for spawn)
235     # this is very ugly - maybe we should simply delay ALL messages,
236     # to avoid deep recursion issues. but that's so... slow...
237 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
238     if $nodeid ne $NODE;
239    
240 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
241 root 1.11 }
242    
243 root 1.18 =item snd_on $node, @msg
244    
245     Executes C<snd> with the given C<@msg> (which must include the destination
246     port) on the given node.
247    
248     =cut
249    
250     sub snd_on($@) {
251     my $node = shift;
252     snd $node, snd => @_;
253     }
254    
255 root 1.29 =item eval_on $node, $string[, @reply]
256 root 1.18
257 root 1.29 Evaluates the given string as Perl expression on the given node. When
258     @reply is specified, then it is used to construct a reply message with
259     C<"$@"> and any results from the eval appended.
260 root 1.18
261     =cut
262    
263 root 1.29 sub eval_on($$;@) {
264 root 1.18 my $node = shift;
265     snd $node, eval => @_;
266     }
267    
268 root 1.1 sub kil(@) {
269 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
270 root 1.1
271     length $portid
272 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
273 root 1.1
274 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
275 root 1.1 ->kill ("$portid", @_);
276     }
277    
278     #############################################################################
279 root 1.6 # node monitoring and info
280 root 1.3
281 root 1.106 =item $bool = node_is_up $nodeid
282 root 1.13
283     Returns true if the given node is "up", that is, the kernel thinks it has
284     a working connection to it.
285    
286 root 1.107 More precisely, if the node is up, returns C<1>. If the node is currently
287     connecting or otherwise known but not connected, returns C<0>. If nothing
288     is known about the node, returns C<undef>.
289 root 1.13
290     =cut
291    
292     sub node_is_up($) {
293 root 1.107 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
294 root 1.13 ? 1 : 0
295     }
296    
297 root 1.106 =item @nodes = up_nodes
298 root 1.3
299 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
300     the node itself).
301 root 1.3
302     =cut
303    
304 root 1.49 sub up_nodes() {
305 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
306 root 1.3 }
307    
308 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
309 root 1.3
310 root 1.27 Registers a callback that is called each time a node goes up (a connection
311     is established) or down (the connection is lost).
312 root 1.3
313     Node up messages can only be followed by node down messages for the same
314     node, and vice versa.
315    
316 root 1.71 Note that monitoring a node is usually better done by monitoring its node
317 root 1.27 port. This function is mainly of interest to modules that are concerned
318     about the network topology and low-level connection handling.
319    
320     Callbacks I<must not> block and I<should not> send any messages.
321    
322     The function returns an optional guard which can be used to unregister
323 root 1.3 the monitoring callback again.
324    
325 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
326     or go up (and down).
327    
328     newnode $_, 1 for up_nodes;
329     mon_nodes \&newnode;
330    
331 root 1.3 =cut
332    
333     our %MON_NODES;
334    
335     sub mon_nodes($) {
336     my ($cb) = @_;
337    
338     $MON_NODES{$cb+0} = $cb;
339    
340 root 1.90 defined wantarray
341     and Guard::guard { delete $MON_NODES{$cb+0} }
342 root 1.3 }
343    
344     sub _inject_nodeevent($$;@) {
345 root 1.16 my ($node, $up, @reason) = @_;
346 root 1.3
347 root 1.107 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
348    
349 root 1.3 for my $cb (values %MON_NODES) {
350 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
351 root 1.98 or AE::log die => $@;
352 root 1.3 }
353     }
354    
355     #############################################################################
356 root 1.1 # self node code
357    
358 root 1.67 sub _kill {
359     my $port = shift;
360    
361     delete $PORT{$port}
362     or return; # killing nonexistent ports is O.K.
363     delete $PORT_DATA{$port};
364    
365     my $mon = delete $LMON{$port}
366     or !@_
367 root 1.98 or AE::log die => "unmonitored local port $port died with reason: @_";
368 root 1.67
369     $_->(@_) for values %$mon;
370     }
371    
372     sub _monitor {
373     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
374     unless exists $PORT{$_[1]};
375    
376     $LMON{$_[1]}{$_[2]+0} = $_[2];
377     }
378    
379     sub _unmonitor {
380 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
381     if exists $LMON{$_[1]};
382 root 1.67 }
383    
384 root 1.83 sub _secure_check {
385 root 1.104 $SECURE
386 root 1.105 and die "remote execution not allowed\n";
387 root 1.83 }
388    
389 root 1.107 our %NODE_REQ;
390    
391     %NODE_REQ = (
392     # "mproto" - monitoring protocol
393 root 1.1
394     # monitoring
395 root 1.65 mon0 => sub { # stop monitoring a port for another node
396 root 1.1 my $portid = shift;
397 root 1.96 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
398 root 1.1 },
399 root 1.65 mon1 => sub { # start monitoring a port for another node
400 root 1.1 my $portid = shift;
401 root 1.96 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
402 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
403 root 1.58 delete $node->{rmon}{$portid};
404 root 1.65 $node->send (["", kil0 => $portid, @_])
405 root 1.59 if $node && $node->{transport};
406 root 1.67 };
407 root 1.1 },
408 root 1.65 # another node has killed a monitored port
409     kil0 => sub {
410 root 1.96 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
411 root 1.1 or return;
412    
413     $_->(@_) for @$cbs;
414     },
415 root 1.107 # another node wants to kill a local port
416     kil1 => \&_kill,
417 root 1.1
418 root 1.18 # "public" services - not actually public
419 root 1.1
420     # relay message to another node / generic echo
421 root 1.88 snd => sub {
422     &snd
423 root 1.1 },
424 root 1.107 # ask if a node supports the given request, only works for fixed tags
425     can => sub {
426     my $method = shift;
427     snd @_, exists $NODE_REQ{$method};
428     },
429 root 1.1
430 root 1.30 # random utilities
431 root 1.1 eval => sub {
432 root 1.83 &_secure_check;
433 root 1.50 my @res = do { package main; eval shift };
434 root 1.1 snd @_, "$@", @res if @_;
435     },
436     time => sub {
437 root 1.76 snd @_, AE::now;
438 root 1.1 },
439     devnull => sub {
440     #
441     },
442 root 1.15 "" => sub {
443 root 1.27 # empty messages are keepalives or similar devnull-applications
444 root 1.15 },
445 root 1.1 );
446    
447 root 1.104 # the node port
448 root 1.107 new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
449    
450 root 1.1 $PORT{""} = sub {
451     my $tag = shift;
452 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
453 root 1.98 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
454 root 1.1 };
455    
456 root 1.107 our $MPROTO = 1;
457 root 1.84
458     # tell everybody who connects our nproto
459     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
460 root 1.107 $_[0]{local_greeting}{mproto} = $MPROTO;
461 root 1.84 };
462    
463 root 1.69 #############################################################################
464 root 1.71 # seed management, try to keep connections to all seeds at all times
465 root 1.69
466 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
467     our %NODE_SEED; # map node ID to seed ID
468 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
469     our $SEED_WATCHER;
470 root 1.71 our $SEED_RETRY;
471 root 1.111 our %GLOBAL_NODE; # global => undef
472 root 1.69
473     sub seed_connect {
474     my ($seed) = @_;
475    
476     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
477     or Carp::croak "$seed: unparsable seed address";
478    
479 root 1.98 AE::log 9 => "trying connect to seed node $seed.";
480 root 1.69
481 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
482     $host, $port,
483     on_greeted => sub {
484     # called after receiving remote greeting, learn remote node name
485    
486     # we rely on untrusted data here (the remote node name) this is
487     # hopefully ok, as this can at most be used for DOSing, which is easy
488     # when you can do MITM anyway.
489    
490     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
491     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
492 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
493 root 1.71 delete $SEED_NODE{$seed};
494     } else {
495     $SEED_NODE{$seed} = $_[0]{remote_node};
496     $NODE_SEED{$_[0]{remote_node}} = $seed;
497 root 1.111
498     # also start global service, in case it isn't running
499     # since we probably switch conenctions, maybe we don't need to do this here?
500     snd $_[0]{remote_node}, "g_slave";
501 root 1.71 }
502     },
503 root 1.93 sub {
504 root 1.71 delete $SEED_CONNECT{$seed};
505     }
506 root 1.69 ;
507     }
508    
509     sub seed_all {
510 root 1.93 my @seeds = grep
511 root 1.113 !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
512 root 1.93 keys %SEED_NODE;
513 root 1.69
514     if (@seeds) {
515 root 1.70 # start connection attempt for every seed we are not connected to yet
516 root 1.69 seed_connect $_
517 root 1.113 for grep !exists $SEED_CONNECT{$_}, @seeds;
518 root 1.71
519 root 1.111 $SEED_RETRY = $SEED_RETRY * 2;
520 root 1.71 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
521     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
522    
523     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
524    
525 root 1.69 } else {
526 root 1.71 # all seeds connected or connecting, no need to restart timer
527 root 1.69 undef $SEED_WATCHER;
528     }
529     }
530    
531     sub seed_again {
532 root 1.111 $SEED_RETRY = (1 + rand) * 0.6;
533 root 1.113 $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
534 root 1.69 }
535    
536     # sets new seed list, starts connecting
537     sub set_seeds(@) {
538     %SEED_NODE = ();
539 root 1.71 %NODE_SEED = ();
540     %SEED_CONNECT = ();
541    
542 root 1.69 @SEED_NODE{@_} = ();
543    
544 root 1.114 seed_again;
545 root 1.71 }
546    
547 root 1.111 # normal nodes only record global node connections
548     $NODE_REQ{g_global} = sub {
549     undef $GLOBAL_NODE{$SRCNODE};
550     };
551    
552 root 1.71 mon_nodes sub {
553 root 1.111 delete $GLOBAL_NODE{$_[0]}
554     unless $_[1];
555    
556 root 1.93 return unless exists $NODE_SEED{$_[0]};
557    
558     if ($_[1]) {
559     # each time a connection to a seed node goes up, make
560     # sure it runs the global service.
561 root 1.111 snd $_[0], "g_slave";
562 root 1.93 } else {
563     # if we lost the connection to a seed node, make sure we are seeding
564     seed_again;
565     }
566 root 1.71 };
567    
568     #############################################################################
569 root 1.107 # keepalive code - used to kepe conenctions to certain nodes alive
570     # only used by global code atm., but ought to be exposed somehow.
571 root 1.109 #TODO: should probbaly be done directly by node objects
572 root 1.107
573     our $KEEPALIVE_RETRY;
574     our $KEEPALIVE_WATCHER;
575     our %KEEPALIVE; # we want to keep these nodes alive
576     our %KEEPALIVE_DOWN; # nodes that are down currently
577    
578     sub keepalive_all {
579     AE::log 9 => "keepalive: trying to establish connections with: "
580     . (join " ", keys %KEEPALIVE_DOWN)
581     . ".";
582    
583     (add_node $_)->connect
584     for keys %KEEPALIVE_DOWN;
585    
586 root 1.111 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
587 root 1.107 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
588     if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
589    
590     $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
591     }
592    
593     sub keepalive_again {
594 root 1.111 $KEEPALIVE_RETRY = (1 + rand) * 0.3;
595 root 1.108 keepalive_all;
596 root 1.107 }
597    
598     sub keepalive_add {
599     return if $KEEPALIVE{$_[0]}++;
600    
601     return if node_is_up $_[0];
602     undef $KEEPALIVE_DOWN{$_[0]};
603     keepalive_again;
604     }
605    
606     sub keepalive_del {
607     return if --$KEEPALIVE{$_[0]};
608    
609     delete $KEEPALIVE {$_[0]};
610     delete $KEEPALIVE_DOWN{$_[0]};
611    
612     undef $KEEPALIVE_WATCHER
613     unless %KEEPALIVE_DOWN;
614     }
615    
616     mon_nodes sub {
617     return unless exists $KEEPALIVE{$_[0]};
618    
619     if ($_[1]) {
620     delete $KEEPALIVE_DOWN{$_[0]};
621    
622     undef $KEEPALIVE_WATCHER
623     unless %KEEPALIVE_DOWN;
624     } else {
625     # lost the conenction, try to connect again
626     undef $KEEPALIVE_DOWN{$_[0]};
627     keepalive_again;
628     }
629     };
630    
631     #############################################################################
632 root 1.71 # talk with/to global nodes
633    
634 root 1.72 # protocol messages:
635     #
636 root 1.110 # sent by global nodes
637 root 1.111 # g_global - global nodes send this to all others
638 root 1.72 #
639 root 1.110 # database protocol
640     # g_slave database - make other global node master of the sender
641     # g_set database - global node's database to other global nodes
642 root 1.111 # g_upd family set del - update single family (any to global)
643 root 1.73 #
644 root 1.110 # slave <-> global protocol
645     # g_find node - query addresses for node (slave to global)
646     # g_found node binds - node addresses (global to slave)
647     # g_db_family family id - send g_reply with data (global to slave)
648     # g_db_keys family id - send g_reply with data (global to slave)
649     # g_db_values family id - send g_reply with data (global to slave)
650     # g_reply id result - result of any query (global to slave)
651     # g_mon1 family - start to monitor family, replies with g_chg1
652     # g_mon0 family - stop monitoring family
653     # g_chg1 family hash - initial value of family when starting to monitor
654     # g_chg2 family set del - like g_upd, but for monitoring only
655 root 1.73 #
656 root 1.111 # internal database families:
657 root 1.73 # "'l" -> node -> listeners
658     # "'g" -> node -> undef
659     # ...
660 root 1.72 #
661    
662 root 1.73 # used on all nodes:
663 root 1.88 our $MASTER; # the global node we bind ourselves to
664 root 1.78 our $MASTER_MON;
665     our %LOCAL_DB; # this node database
666    
667 root 1.84 our $GPROTO = 1;
668    
669 root 1.116 # tell everybody who connects our gproto
670 root 1.84 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
671     $_[0]{local_greeting}{gproto} = $GPROTO;
672     };
673    
674 root 1.73 #############################################################################
675     # master selection
676    
677     # master requests
678     our %GLOBAL_REQ; # $id => \@req
679 root 1.71
680 root 1.74 sub global_req_add {
681 root 1.80 my ($id, $req) = @_;
682 root 1.71
683 root 1.74 return if exists $GLOBAL_REQ{$id};
684    
685 root 1.80 $GLOBAL_REQ{$id} = $req;
686 root 1.71
687 root 1.80 snd $MASTER, @$req
688 root 1.73 if $MASTER;
689 root 1.74 }
690 root 1.71
691 root 1.74 sub global_req_del {
692     delete $GLOBAL_REQ{$_[0]};
693     }
694    
695 root 1.88 #################################
696     # master rpc
697    
698     our %GLOBAL_RES;
699     our $GLOBAL_RES_ID = "a";
700    
701     sub global_call {
702     my $id = ++$GLOBAL_RES_ID;
703     $GLOBAL_RES{$id} = pop;
704     global_req_add $id, [@_, $id];
705     }
706    
707     $NODE_REQ{g_reply} = sub {
708     my $id = shift;
709     global_req_del $id;
710     my $cb = delete $GLOBAL_RES{$id}
711     or return;
712     &$cb
713     };
714    
715     #################################
716    
717 root 1.74 sub g_find {
718 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
719 root 1.73 }
720 root 1.71
721 root 1.73 # reply for g_find started in Node.pm
722 root 1.79 $NODE_REQ{g_found} = sub {
723 root 1.74 global_req_del "g_find $_[0]";
724    
725 root 1.73 my $node = $NODE{$_[0]} or return;
726 root 1.71
727 root 1.79 $node->connect_to ($_[1]);
728 root 1.71 };
729    
730 root 1.73 sub master_set {
731     $MASTER = $_[0];
732 root 1.111 AE::log 8 => "new master node: $MASTER.";
733    
734     $MASTER_MON = mon_nodes sub {
735     if ($_[0] eq $MASTER && !$_[1]) {
736     undef $MASTER;
737     master_search ();
738     }
739     };
740 root 1.71
741 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
742 root 1.71
743 root 1.73 # (re-)send queued requests
744     snd $MASTER, @$_
745     for values %GLOBAL_REQ;
746     }
747 root 1.71
748 root 1.72 sub master_search {
749 root 1.111 AE::log 9 => "starting search for master node.";
750    
751 root 1.115 #TODO: should also look for other global nodes, but we don't know them
752 root 1.73 for (keys %NODE_SEED) {
753 root 1.72 if (node_is_up $_) {
754     master_set $_;
755     return;
756 root 1.71 }
757 root 1.72 }
758 root 1.71
759 root 1.78 $MASTER_MON = mon_nodes sub {
760 root 1.72 return unless $_[1]; # we are only interested in node-ups
761     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
762 root 1.71
763 root 1.72 master_set $_[0];
764     };
765 root 1.71 }
766    
767 root 1.110 # other node wants to make us the master, so start the global service
768 root 1.79 $NODE_REQ{g_slave} = sub {
769 root 1.80 # load global module and redo the request
770 root 1.73 require AnyEvent::MP::Global;
771 root 1.80 &{ $NODE_REQ{g_slave} }
772 root 1.71 };
773    
774 root 1.73 #############################################################################
775 root 1.79 # local database operations
776 root 1.71
777 root 1.111 # canonical probably not needed
778     our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
779    
780     # are the two scalars equal? very very ugly and slow, need better way
781     sub sv_eq($$) {
782     ref $_[0] || ref $_[1]
783     ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
784     : $_[0] eq $_[1]
785     && defined $_[0] == defined $_[1]
786     }
787    
788 root 1.79 # local database management
789 root 1.88
790 root 1.111 sub db_del($@) {
791     my $family = shift;
792    
793     my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
794    
795     return unless @del;
796    
797     delete @{ $LOCAL_DB{$family} }{@del};
798     snd $MASTER, g_upd => $family => undef, \@del
799     if defined $MASTER;
800     }
801    
802 root 1.87 sub db_set($$;$) {
803 root 1.97 my ($family, $subkey) = @_;
804    
805 root 1.89 # if (ref $_[1]) {
806     # # bulk
807     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
808     # $LOCAL_DB{$_[0]} = $_[1];
809     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
810     # if defined $MASTER;
811     # } else {
812     # single-key
813 root 1.111 unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
814     $LOCAL_DB{$family}{$subkey} = $_[2];
815     snd $MASTER, g_upd => $family => { $subkey => $_[2] }
816     if defined $MASTER;
817     }
818 root 1.89 # }
819 root 1.97
820     defined wantarray
821     and Guard::guard { db_del $family => $subkey }
822 root 1.79 }
823    
824 root 1.88 # database query
825    
826     sub db_family {
827     my ($family, $cb) = @_;
828     global_call g_db_family => $family, $cb;
829     }
830    
831     sub db_keys {
832     my ($family, $cb) = @_;
833     global_call g_db_keys => $family, $cb;
834     }
835    
836     sub db_values {
837     my ($family, $cb) = @_;
838     global_call g_db_values => $family, $cb;
839 root 1.80 }
840    
841 root 1.88 # database monitoring
842 root 1.80
843 root 1.81 our %LOCAL_MON; # f, reply
844     our %MON_DB; # f, k, value
845 root 1.80
846 root 1.81 sub db_mon($@) {
847 root 1.84 my ($family, $cb) = @_;
848 root 1.81
849 root 1.84 if (my $db = $MON_DB{$family}) {
850 root 1.89 # we already monitor, so create a "dummy" change event
851     # this is postponed, which might be too late (we could process
852     # change events), so disable the callback at first
853     $LOCAL_MON{$family}{$cb+0} = sub { };
854     AE::postpone {
855     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
856    
857     # set actual callback
858     $LOCAL_MON{$family}{$cb+0} = $cb;
859     $cb->($db, [keys %$db]);
860     };
861 root 1.81 } else {
862     # new monitor, request chg1 from upstream
863 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
864 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
865     $MON_DB{$family} = {};
866     }
867    
868 root 1.90 defined wantarray
869     and Guard::guard {
870     my $mon = $LOCAL_MON{$family};
871     delete $mon->{$cb+0};
872    
873     unless (%$mon) {
874     global_req_del "mon1 $family";
875    
876     # no global_req, because we don't care if we are not connected
877     snd $MASTER, g_mon0 => $family
878     if $MASTER;
879 root 1.80
880 root 1.90 delete $LOCAL_MON{$family};
881     delete $MON_DB{$family};
882     }
883 root 1.80 }
884     }
885    
886 root 1.82 # full update
887 root 1.80 $NODE_REQ{g_chg1} = sub {
888 root 1.96 return unless $SRCNODE eq $MASTER;
889 root 1.84 my ($f, $ndb) = @_;
890    
891     my $db = $MON_DB{$f};
892 root 1.89 my (@a, @c, @d);
893 root 1.81
894 root 1.82 # add or replace keys
895 root 1.84 while (my ($k, $v) = each %$ndb) {
896 root 1.89 exists $db->{$k}
897     ? push @c, $k
898     : push @a, $k;
899 root 1.84 $db->{$k} = $v;
900 root 1.82 }
901 root 1.81
902 root 1.82 # delete keys that are no longer present
903 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
904     delete $db->{$_};
905 root 1.89 push @d, $_;
906 root 1.81 }
907 root 1.84
908 root 1.89 $_->($db, \@a, \@c, \@d)
909 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
910 root 1.80 };
911    
912 root 1.82 # incremental update
913 root 1.84 $NODE_REQ{g_chg2} = sub {
914 root 1.96 return unless $SRCNODE eq $MASTER;
915 root 1.89 my ($family, $set, $del) = @_;
916    
917     my $db = $MON_DB{$family};
918 root 1.84
919 root 1.89 my (@a, @c);
920 root 1.84
921 root 1.89 while (my ($k, $v) = each %$set) {
922     exists $db->{$k}
923     ? push @c, $k
924     : push @a, $k;
925     $db->{$k} = $v;
926     }
927    
928     delete @$db{@$del};
929    
930     $_->($db, \@a, \@c, $del)
931     for values %{ $LOCAL_MON{$family} };
932 root 1.84 };
933 root 1.80
934 root 1.69 #############################################################################
935     # configure
936    
937 root 1.81 sub nodename {
938 root 1.69 require POSIX;
939     (POSIX::uname ())[1]
940     }
941    
942     sub _resolve($) {
943     my ($nodeid) = @_;
944    
945     my $cv = AE::cv;
946     my @res;
947    
948     $cv->begin (sub {
949     my %seen;
950     my @refs;
951     for (sort { $a->[0] <=> $b->[0] } @res) {
952     push @refs, $_->[1] unless $seen{$_->[1]}++
953     }
954     shift->send (@refs);
955     });
956    
957     my $idx;
958     for my $t (split /,/, $nodeid) {
959     my $pri = ++$idx;
960    
961 root 1.81 $t = length $t ? nodename . ":$t" : nodename
962 root 1.69 if $t =~ /^\d*$/;
963    
964     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
965     or Carp::croak "$t: unparsable transport descriptor";
966    
967     $port = "0" if $port eq "*";
968    
969     if ($host eq "*") {
970     $cv->begin;
971    
972 root 1.103 my $get_addr = sub {
973     my @addr;
974    
975     require Net::Interface;
976    
977     # Net::Interface hangs on some systems, so hope for the best
978     local $SIG{ALRM} = 'DEFAULT';
979     alarm 2;
980    
981     for my $if (Net::Interface->interfaces) {
982     # we statically lower-prioritise ipv6 here, TODO :()
983     for $_ ($if->address (Net::Interface::AF_INET ())) {
984     next if /^\x7f/; # skip localhost etc.
985     push @addr, $_;
986 root 1.69 }
987 root 1.103 for ($if->address (Net::Interface::AF_INET6 ())) {
988     #next if $if->scope ($_) <= 2;
989     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
990     push @addr, $_;
991 root 1.69 }
992     }
993 root 1.103
994     alarm 0;
995    
996     @addr
997     };
998    
999     my @addr;
1000    
1001     if (AnyEvent::WIN32) {
1002     @addr = $get_addr->();
1003     } else {
1004     # use a child process, as Net::Interface is big, and we need it only once.
1005    
1006     pipe my $r, my $w
1007     or die "pipe: $!";
1008    
1009     if (fork eq 0) {
1010     close $r;
1011     syswrite $w, pack "(C/a*)*", $get_addr->();
1012     require POSIX;
1013     POSIX::_exit (0);
1014     } else {
1015     close $w;
1016    
1017     my $addr;
1018    
1019     1 while sysread $r, $addr, 1024, length $addr;
1020    
1021     @addr = unpack "(C/a*)*", $addr;
1022     }
1023     }
1024    
1025     for my $ip (@addr) {
1026     push @res, [
1027     $pri += 1e-5,
1028     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
1029     ];
1030     }
1031     $cv->end;
1032 root 1.69 } else {
1033     $cv->begin;
1034     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1035     for (@_) {
1036     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1037     push @res, [
1038     $pri += 1e-5,
1039     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1040     ];
1041     }
1042     $cv->end;
1043     };
1044     }
1045     }
1046    
1047     $cv->end;
1048    
1049     $cv
1050     }
1051    
1052 root 1.112 our @POST_CONFIGURE;
1053    
1054     # not yet documented
1055     sub post_configure(&) {
1056     die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray;
1057    
1058     push @POST_CONFIGURE, @_;
1059     (shift @POST_CONFIGURE)->() while $NODE && @POST_CONFIGURE;
1060     }
1061    
1062 root 1.69 sub configure(@) {
1063     unshift @_, "profile" if @_ & 1;
1064     my (%kv) = @_;
1065    
1066     my $profile = delete $kv{profile};
1067    
1068 root 1.81 $profile = nodename
1069 root 1.69 unless defined $profile;
1070    
1071     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1072    
1073 root 1.104 $SECURE = $CONFIG->{secure};
1074 root 1.83
1075 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1076 root 1.69
1077     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1078    
1079 root 1.106 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1080    
1081 root 1.72 $NODE = $node;
1082 root 1.77
1083 root 1.81 $NODE =~ s/%n/nodename/ge;
1084    
1085     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1086 root 1.77 # nodes with randomised node names do not need randomised port names
1087     $UNIQ = "";
1088     }
1089 root 1.69
1090 root 1.106 $node_obj->{id} = $NODE;
1091     $NODE{$NODE} = $node_obj;
1092 root 1.69
1093     my $seeds = $CONFIG->{seeds};
1094     my $binds = $CONFIG->{binds};
1095    
1096     $binds ||= ["*"];
1097    
1098 root 1.98 AE::log 8 => "node $NODE starting up.";
1099 root 1.69
1100 root 1.85 $BINDS = [];
1101     %BINDS = ();
1102 root 1.69
1103     for (map _resolve $_, @$binds) {
1104     for my $bind ($_->recv) {
1105     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1106     or Carp::croak "$bind: unparsable local bind address";
1107    
1108     my $listener = AnyEvent::MP::Transport::mp_server
1109     $host,
1110     $port,
1111     prepare => sub {
1112     my (undef, $host, $port) = @_;
1113     $bind = AnyEvent::Socket::format_hostport $host, $port;
1114     0
1115     },
1116     ;
1117 root 1.85 $BINDS{$bind} = $listener;
1118     push @$BINDS, $bind;
1119 root 1.69 }
1120     }
1121    
1122 root 1.112 AE::log 9 => "running post config hooks and init.";
1123    
1124     # might initialise Global, so need to do it before db_set
1125     post_configure { };
1126    
1127 root 1.85 db_set "'l" => $NODE => $BINDS;
1128 root 1.73
1129 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
1130 root 1.69
1131     # connect to all seednodes
1132     set_seeds map $_->recv, map _resolve $_, @$seeds;
1133 root 1.73 master_search;
1134    
1135 root 1.103 # save gobs of memory
1136     undef &_resolve;
1137     *configure = sub (@){ };
1138    
1139 root 1.112 AE::log 9 => "starting services.";
1140    
1141 root 1.69 for (@{ $CONFIG->{services} }) {
1142     if (ref) {
1143     my ($func, @args) = @$_;
1144     (load_func $func)->(@args);
1145     } elsif (s/::$//) {
1146     eval "require $_";
1147     die $@ if $@;
1148     } else {
1149     (load_func $_)->();
1150     }
1151     }
1152 root 1.102
1153     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1154     die "$@" if $@;
1155 root 1.69 }
1156    
1157 root 1.1 =back
1158    
1159 root 1.101 =head1 LOGGING
1160    
1161     AnyEvent::MP::Kernel logs high-level information about the current node,
1162     when nodes go up and down, and most runtime errors. It also logs some
1163     debugging and trace messages about network maintainance, such as seed
1164     connections and global node management.
1165    
1166 root 1.1 =head1 SEE ALSO
1167    
1168     L<AnyEvent::MP>.
1169    
1170     =head1 AUTHOR
1171    
1172     Marc Lehmann <schmorp@schmorp.de>
1173     http://home.schmorp.de/
1174    
1175     =cut
1176    
1177     1
1178