ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.116
Committed: Sun Aug 28 14:24:19 2016 UTC (7 years, 9 months ago) by root
Branch: MAIN
Changes since 1.115: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9 root 1.106 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10    
11     snd_to_func $node, $func, @args # send msg to function
12     snd_on $node, @msg # snd message again (relay)
13     eval_on $node, $string[, @reply] # execute perl code on another node
14    
15     node_is_up $nodeid # return true if a node is connected
16     @nodes = up_nodes # return a list of all connected nodes
17     $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18    
19 root 1.1 =head1 DESCRIPTION
20    
21 root 1.106 This module implements most of the inner workings of AnyEvent::MP. It
22     offers mostly lower-level functions that deal with network connectivity
23     and special requests.
24    
25     You normally interface with AnyEvent::MP through a higher level interface
26     such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27     with using the functions from this module.
28 root 1.3
29     =head1 GLOBALS AND FUNCTIONS
30 root 1.1
31     =over 4
32    
33     =cut
34    
35     package AnyEvent::MP::Kernel;
36    
37     use common::sense;
38     use Carp ();
39    
40 root 1.79 use AnyEvent ();
41     use Guard ();
42 root 1.1
43     use AnyEvent::MP::Node;
44     use AnyEvent::MP::Transport;
45    
46     use base "Exporter";
47    
48 root 1.106 # for re-export in AnyEvent::MP and Coro::MP
49     our @EXPORT_API = qw(
50     NODE $NODE
51     configure
52     node_of port_is_local
53     snd kil
54     db_set db_del
55     db_mon db_family db_keys db_values
56     );
57    
58     our @EXPORT_OK = (
59     # these are internal
60     qw(
61     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62     add_node load_func
63     ),
64     @EXPORT_API,
65 root 1.71 );
66    
67 root 1.1 our @EXPORT = qw(
68 root 1.106 snd_to_func snd_on eval_on
69     port_is_local
70 root 1.46 up_nodes mon_nodes node_is_up
71 root 1.1 );
72    
73 root 1.6 sub load_func($) {
74     my $func = $_[0];
75    
76     unless (defined &$func) {
77     my $pkg = $func;
78     do {
79     $pkg =~ s/::[^:]+$//
80 root 1.63 or return sub { die "unable to resolve function '$func'" };
81 root 1.60
82     local $@;
83 root 1.61 unless (eval "require $pkg; 1") {
84     my $error = $@;
85     $error =~ /^Can't locate .*.pm in \@INC \(/
86     or return sub { die $error };
87     }
88 root 1.6 } until defined &$func;
89     }
90    
91     \&$func
92     }
93    
94 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
95    
96 root 1.1 sub nonce($) {
97 root 1.78 join "", map chr rand 256, 1 .. $_[0]
98 root 1.1 }
99    
100 root 1.78 sub nonce62($) {
101     join "", map $alnum[rand 62], 1 .. $_[0]
102 root 1.1 }
103    
104 root 1.20 our $CONFIG; # this node's configuration
105 root 1.104 our $SECURE;
106 root 1.21
107 root 1.64 our $RUNIQ; # remote uniq value
108     our $UNIQ; # per-process/node unique cookie
109     our $NODE;
110     our $ID = "a";
111 root 1.1
112     our %NODE; # node id to transport mapping, or "undef", for local node
113     our (%PORT, %PORT_DATA); # local ports
114    
115 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 root 1.1 our %LMON; # monitored _local_ ports
117    
118 root 1.111 #our $GLOBAL; # true if node is a global ("directory") node
119 root 1.85 our %BINDS;
120     our $BINDS; # our listeners, as arrayref
121 root 1.1
122 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
123 root 1.111 our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
124 root 1.1
125 root 1.106 # initialise names for non-networked operation
126     {
127 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
128 root 1.106 my $now = AE::now;
129 root 1.112 $UNIQ =
130 root 1.106 (join "",
131     map $alnum[$_],
132     $$ / 62 % 62,
133     $$ % 62,
134     (int $now ) % 62,
135     (int $now * 100) % 62,
136     (int $now * 10000) % 62,
137     ) . nonce62 4
138     ;
139 root 1.78
140     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
141     $RUNIQ = nonce62 10;
142     $RUNIQ =~ s/(.)$/\U$1/;
143    
144 root 1.106 $NODE = "";
145 root 1.64 }
146    
147 root 1.1 sub NODE() {
148     $NODE
149     }
150    
151     sub node_of($) {
152 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
153 root 1.1
154 root 1.21 $node
155 root 1.1 }
156    
157 root 1.17 BEGIN {
158     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
159     ? sub () { 1 }
160     : sub () { 0 };
161     }
162 root 1.1
163 root 1.42 our $DELAY_TIMER;
164     our @DELAY_QUEUE;
165    
166 root 1.97 our $delay_run = sub {
167 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
168 root 1.97 };
169 root 1.42
170     sub delay($) {
171     push @DELAY_QUEUE, shift;
172 root 1.97 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
173 root 1.42 }
174    
175 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
176    
177     During execution of a message callback, this variable contains the node ID
178     of the origin node.
179    
180     The main use of this variable is for debugging output - there are probably
181     very few other cases where you need to know the source node ID.
182    
183     =cut
184    
185 root 1.1 sub _inject {
186 root 1.96 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
187 root 1.1 &{ $PORT{+shift} or return };
188     }
189    
190 root 1.20 # this function adds a node-ref, so you can send stuff to it
191     # it is basically the central routing component.
192 root 1.1 sub add_node {
193 root 1.107 $NODE{$_[0]} || do {
194     my ($node) = @_;
195 root 1.1
196 root 1.107 length $node
197     or Carp::croak "'undef' or the empty string are not valid node/port IDs";
198 root 1.93
199 root 1.107 # registers itself in %NODE
200     new AnyEvent::MP::Node::Remote $node
201     }
202 root 1.13 }
203    
204 root 1.1 sub snd(@) {
205 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
206 root 1.1
207 root 1.86 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
208 root 1.1
209 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
210 root 1.2 ->{send} (["$portid", @_]);
211 root 1.1 }
212    
213 root 1.17 sub port_is_local($) {
214 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
215 root 1.17
216 root 1.106 $nodeid eq $NODE
217 root 1.17 }
218    
219 root 1.18 =item snd_to_func $node, $func, @args
220 root 1.11
221 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
222 root 1.11 this function with the given arguments on that node.
223    
224 root 1.20 This function can be used to implement C<spawn>-like interfaces.
225 root 1.11
226     =cut
227    
228 root 1.18 sub snd_to_func($$;@) {
229 root 1.21 my $nodeid = shift;
230 root 1.11
231 root 1.41 # on $NODE, we artificially delay... (for spawn)
232     # this is very ugly - maybe we should simply delay ALL messages,
233     # to avoid deep recursion issues. but that's so... slow...
234 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
235     if $nodeid ne $NODE;
236    
237 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
238 root 1.11 }
239    
240 root 1.18 =item snd_on $node, @msg
241    
242     Executes C<snd> with the given C<@msg> (which must include the destination
243     port) on the given node.
244    
245     =cut
246    
247     sub snd_on($@) {
248     my $node = shift;
249     snd $node, snd => @_;
250     }
251    
252 root 1.29 =item eval_on $node, $string[, @reply]
253 root 1.18
254 root 1.29 Evaluates the given string as Perl expression on the given node. When
255     @reply is specified, then it is used to construct a reply message with
256     C<"$@"> and any results from the eval appended.
257 root 1.18
258     =cut
259    
260 root 1.29 sub eval_on($$;@) {
261 root 1.18 my $node = shift;
262     snd $node, eval => @_;
263     }
264    
265 root 1.1 sub kil(@) {
266 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
267 root 1.1
268     length $portid
269 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
270 root 1.1
271 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
272 root 1.1 ->kill ("$portid", @_);
273     }
274    
275     #############################################################################
276 root 1.6 # node monitoring and info
277 root 1.3
278 root 1.106 =item $bool = node_is_up $nodeid
279 root 1.13
280     Returns true if the given node is "up", that is, the kernel thinks it has
281     a working connection to it.
282    
283 root 1.107 More precisely, if the node is up, returns C<1>. If the node is currently
284     connecting or otherwise known but not connected, returns C<0>. If nothing
285     is known about the node, returns C<undef>.
286 root 1.13
287     =cut
288    
289     sub node_is_up($) {
290 root 1.107 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
291 root 1.13 ? 1 : 0
292     }
293    
294 root 1.106 =item @nodes = up_nodes
295 root 1.3
296 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
297     the node itself).
298 root 1.3
299     =cut
300    
301 root 1.49 sub up_nodes() {
302 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
303 root 1.3 }
304    
305 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
306 root 1.3
307 root 1.27 Registers a callback that is called each time a node goes up (a connection
308     is established) or down (the connection is lost).
309 root 1.3
310     Node up messages can only be followed by node down messages for the same
311     node, and vice versa.
312    
313 root 1.71 Note that monitoring a node is usually better done by monitoring its node
314 root 1.27 port. This function is mainly of interest to modules that are concerned
315     about the network topology and low-level connection handling.
316    
317     Callbacks I<must not> block and I<should not> send any messages.
318    
319     The function returns an optional guard which can be used to unregister
320 root 1.3 the monitoring callback again.
321    
322 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
323     or go up (and down).
324    
325     newnode $_, 1 for up_nodes;
326     mon_nodes \&newnode;
327    
328 root 1.3 =cut
329    
330     our %MON_NODES;
331    
332     sub mon_nodes($) {
333     my ($cb) = @_;
334    
335     $MON_NODES{$cb+0} = $cb;
336    
337 root 1.90 defined wantarray
338     and Guard::guard { delete $MON_NODES{$cb+0} }
339 root 1.3 }
340    
341     sub _inject_nodeevent($$;@) {
342 root 1.16 my ($node, $up, @reason) = @_;
343 root 1.3
344 root 1.107 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
345    
346 root 1.3 for my $cb (values %MON_NODES) {
347 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
348 root 1.98 or AE::log die => $@;
349 root 1.3 }
350     }
351    
352     #############################################################################
353 root 1.1 # self node code
354    
355 root 1.67 sub _kill {
356     my $port = shift;
357    
358     delete $PORT{$port}
359     or return; # killing nonexistent ports is O.K.
360     delete $PORT_DATA{$port};
361    
362     my $mon = delete $LMON{$port}
363     or !@_
364 root 1.98 or AE::log die => "unmonitored local port $port died with reason: @_";
365 root 1.67
366     $_->(@_) for values %$mon;
367     }
368    
369     sub _monitor {
370     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
371     unless exists $PORT{$_[1]};
372    
373     $LMON{$_[1]}{$_[2]+0} = $_[2];
374     }
375    
376     sub _unmonitor {
377 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
378     if exists $LMON{$_[1]};
379 root 1.67 }
380    
381 root 1.83 sub _secure_check {
382 root 1.104 $SECURE
383 root 1.105 and die "remote execution not allowed\n";
384 root 1.83 }
385    
386 root 1.107 our %NODE_REQ;
387    
388     %NODE_REQ = (
389     # "mproto" - monitoring protocol
390 root 1.1
391     # monitoring
392 root 1.65 mon0 => sub { # stop monitoring a port for another node
393 root 1.1 my $portid = shift;
394 root 1.96 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
395 root 1.1 },
396 root 1.65 mon1 => sub { # start monitoring a port for another node
397 root 1.1 my $portid = shift;
398 root 1.96 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
399 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
400 root 1.58 delete $node->{rmon}{$portid};
401 root 1.65 $node->send (["", kil0 => $portid, @_])
402 root 1.59 if $node && $node->{transport};
403 root 1.67 };
404 root 1.1 },
405 root 1.65 # another node has killed a monitored port
406     kil0 => sub {
407 root 1.96 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
408 root 1.1 or return;
409    
410     $_->(@_) for @$cbs;
411     },
412 root 1.107 # another node wants to kill a local port
413     kil1 => \&_kill,
414 root 1.1
415 root 1.18 # "public" services - not actually public
416 root 1.1
417     # relay message to another node / generic echo
418 root 1.88 snd => sub {
419     &snd
420 root 1.1 },
421 root 1.107 # ask if a node supports the given request, only works for fixed tags
422     can => sub {
423     my $method = shift;
424     snd @_, exists $NODE_REQ{$method};
425     },
426 root 1.1
427 root 1.30 # random utilities
428 root 1.1 eval => sub {
429 root 1.83 &_secure_check;
430 root 1.50 my @res = do { package main; eval shift };
431 root 1.1 snd @_, "$@", @res if @_;
432     },
433     time => sub {
434 root 1.76 snd @_, AE::now;
435 root 1.1 },
436     devnull => sub {
437     #
438     },
439 root 1.15 "" => sub {
440 root 1.27 # empty messages are keepalives or similar devnull-applications
441 root 1.15 },
442 root 1.1 );
443    
444 root 1.104 # the node port
445 root 1.107 new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
446    
447 root 1.1 $PORT{""} = sub {
448     my $tag = shift;
449 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
450 root 1.98 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
451 root 1.1 };
452    
453 root 1.107 our $MPROTO = 1;
454 root 1.84
455     # tell everybody who connects our nproto
456     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
457 root 1.107 $_[0]{local_greeting}{mproto} = $MPROTO;
458 root 1.84 };
459    
460 root 1.69 #############################################################################
461 root 1.71 # seed management, try to keep connections to all seeds at all times
462 root 1.69
463 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
464     our %NODE_SEED; # map node ID to seed ID
465 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
466     our $SEED_WATCHER;
467 root 1.71 our $SEED_RETRY;
468 root 1.111 our %GLOBAL_NODE; # global => undef
469 root 1.69
470     sub seed_connect {
471     my ($seed) = @_;
472    
473     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
474     or Carp::croak "$seed: unparsable seed address";
475    
476 root 1.98 AE::log 9 => "trying connect to seed node $seed.";
477 root 1.69
478 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
479     $host, $port,
480     on_greeted => sub {
481     # called after receiving remote greeting, learn remote node name
482    
483     # we rely on untrusted data here (the remote node name) this is
484     # hopefully ok, as this can at most be used for DOSing, which is easy
485     # when you can do MITM anyway.
486    
487     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
488     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
489 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
490 root 1.71 delete $SEED_NODE{$seed};
491     } else {
492     $SEED_NODE{$seed} = $_[0]{remote_node};
493     $NODE_SEED{$_[0]{remote_node}} = $seed;
494 root 1.111
495     # also start global service, in case it isn't running
496     # since we probably switch conenctions, maybe we don't need to do this here?
497     snd $_[0]{remote_node}, "g_slave";
498 root 1.71 }
499     },
500 root 1.93 sub {
501 root 1.71 delete $SEED_CONNECT{$seed};
502     }
503 root 1.69 ;
504     }
505    
506     sub seed_all {
507 root 1.93 my @seeds = grep
508 root 1.113 !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
509 root 1.93 keys %SEED_NODE;
510 root 1.69
511     if (@seeds) {
512 root 1.70 # start connection attempt for every seed we are not connected to yet
513 root 1.69 seed_connect $_
514 root 1.113 for grep !exists $SEED_CONNECT{$_}, @seeds;
515 root 1.71
516 root 1.111 $SEED_RETRY = $SEED_RETRY * 2;
517 root 1.71 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
518     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
519    
520     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
521    
522 root 1.69 } else {
523 root 1.71 # all seeds connected or connecting, no need to restart timer
524 root 1.69 undef $SEED_WATCHER;
525     }
526     }
527    
528     sub seed_again {
529 root 1.111 $SEED_RETRY = (1 + rand) * 0.6;
530 root 1.113 $SEED_WATCHER ||= AE::timer 0, 0, \&seed_all;
531 root 1.69 }
532    
533     # sets new seed list, starts connecting
534     sub set_seeds(@) {
535     %SEED_NODE = ();
536 root 1.71 %NODE_SEED = ();
537     %SEED_CONNECT = ();
538    
539 root 1.69 @SEED_NODE{@_} = ();
540    
541 root 1.114 seed_again;
542 root 1.71 }
543    
544 root 1.111 # normal nodes only record global node connections
545     $NODE_REQ{g_global} = sub {
546     undef $GLOBAL_NODE{$SRCNODE};
547     };
548    
549 root 1.71 mon_nodes sub {
550 root 1.111 delete $GLOBAL_NODE{$_[0]}
551     unless $_[1];
552    
553 root 1.93 return unless exists $NODE_SEED{$_[0]};
554    
555     if ($_[1]) {
556     # each time a connection to a seed node goes up, make
557     # sure it runs the global service.
558 root 1.111 snd $_[0], "g_slave";
559 root 1.93 } else {
560     # if we lost the connection to a seed node, make sure we are seeding
561     seed_again;
562     }
563 root 1.71 };
564    
565     #############################################################################
566 root 1.107 # keepalive code - used to kepe conenctions to certain nodes alive
567     # only used by global code atm., but ought to be exposed somehow.
568 root 1.109 #TODO: should probbaly be done directly by node objects
569 root 1.107
570     our $KEEPALIVE_RETRY;
571     our $KEEPALIVE_WATCHER;
572     our %KEEPALIVE; # we want to keep these nodes alive
573     our %KEEPALIVE_DOWN; # nodes that are down currently
574    
575     sub keepalive_all {
576     AE::log 9 => "keepalive: trying to establish connections with: "
577     . (join " ", keys %KEEPALIVE_DOWN)
578     . ".";
579    
580     (add_node $_)->connect
581     for keys %KEEPALIVE_DOWN;
582    
583 root 1.111 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
584 root 1.107 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
585     if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
586    
587     $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
588     }
589    
590     sub keepalive_again {
591 root 1.111 $KEEPALIVE_RETRY = (1 + rand) * 0.3;
592 root 1.108 keepalive_all;
593 root 1.107 }
594    
595     sub keepalive_add {
596     return if $KEEPALIVE{$_[0]}++;
597    
598     return if node_is_up $_[0];
599     undef $KEEPALIVE_DOWN{$_[0]};
600     keepalive_again;
601     }
602    
603     sub keepalive_del {
604     return if --$KEEPALIVE{$_[0]};
605    
606     delete $KEEPALIVE {$_[0]};
607     delete $KEEPALIVE_DOWN{$_[0]};
608    
609     undef $KEEPALIVE_WATCHER
610     unless %KEEPALIVE_DOWN;
611     }
612    
613     mon_nodes sub {
614     return unless exists $KEEPALIVE{$_[0]};
615    
616     if ($_[1]) {
617     delete $KEEPALIVE_DOWN{$_[0]};
618    
619     undef $KEEPALIVE_WATCHER
620     unless %KEEPALIVE_DOWN;
621     } else {
622     # lost the conenction, try to connect again
623     undef $KEEPALIVE_DOWN{$_[0]};
624     keepalive_again;
625     }
626     };
627    
628     #############################################################################
629 root 1.71 # talk with/to global nodes
630    
631 root 1.72 # protocol messages:
632     #
633 root 1.110 # sent by global nodes
634 root 1.111 # g_global - global nodes send this to all others
635 root 1.72 #
636 root 1.110 # database protocol
637     # g_slave database - make other global node master of the sender
638     # g_set database - global node's database to other global nodes
639 root 1.111 # g_upd family set del - update single family (any to global)
640 root 1.73 #
641 root 1.110 # slave <-> global protocol
642     # g_find node - query addresses for node (slave to global)
643     # g_found node binds - node addresses (global to slave)
644     # g_db_family family id - send g_reply with data (global to slave)
645     # g_db_keys family id - send g_reply with data (global to slave)
646     # g_db_values family id - send g_reply with data (global to slave)
647     # g_reply id result - result of any query (global to slave)
648     # g_mon1 family - start to monitor family, replies with g_chg1
649     # g_mon0 family - stop monitoring family
650     # g_chg1 family hash - initial value of family when starting to monitor
651     # g_chg2 family set del - like g_upd, but for monitoring only
652 root 1.73 #
653 root 1.111 # internal database families:
654 root 1.73 # "'l" -> node -> listeners
655     # "'g" -> node -> undef
656     # ...
657 root 1.72 #
658    
659 root 1.73 # used on all nodes:
660 root 1.88 our $MASTER; # the global node we bind ourselves to
661 root 1.78 our $MASTER_MON;
662     our %LOCAL_DB; # this node database
663    
664 root 1.84 our $GPROTO = 1;
665    
666 root 1.116 # tell everybody who connects our gproto
667 root 1.84 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
668     $_[0]{local_greeting}{gproto} = $GPROTO;
669     };
670    
671 root 1.73 #############################################################################
672     # master selection
673    
674     # master requests
675     our %GLOBAL_REQ; # $id => \@req
676 root 1.71
677 root 1.74 sub global_req_add {
678 root 1.80 my ($id, $req) = @_;
679 root 1.71
680 root 1.74 return if exists $GLOBAL_REQ{$id};
681    
682 root 1.80 $GLOBAL_REQ{$id} = $req;
683 root 1.71
684 root 1.80 snd $MASTER, @$req
685 root 1.73 if $MASTER;
686 root 1.74 }
687 root 1.71
688 root 1.74 sub global_req_del {
689     delete $GLOBAL_REQ{$_[0]};
690     }
691    
692 root 1.88 #################################
693     # master rpc
694    
695     our %GLOBAL_RES;
696     our $GLOBAL_RES_ID = "a";
697    
698     sub global_call {
699     my $id = ++$GLOBAL_RES_ID;
700     $GLOBAL_RES{$id} = pop;
701     global_req_add $id, [@_, $id];
702     }
703    
704     $NODE_REQ{g_reply} = sub {
705     my $id = shift;
706     global_req_del $id;
707     my $cb = delete $GLOBAL_RES{$id}
708     or return;
709     &$cb
710     };
711    
712     #################################
713    
714 root 1.74 sub g_find {
715 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
716 root 1.73 }
717 root 1.71
718 root 1.73 # reply for g_find started in Node.pm
719 root 1.79 $NODE_REQ{g_found} = sub {
720 root 1.74 global_req_del "g_find $_[0]";
721    
722 root 1.73 my $node = $NODE{$_[0]} or return;
723 root 1.71
724 root 1.79 $node->connect_to ($_[1]);
725 root 1.71 };
726    
727 root 1.73 sub master_set {
728     $MASTER = $_[0];
729 root 1.111 AE::log 8 => "new master node: $MASTER.";
730    
731     $MASTER_MON = mon_nodes sub {
732     if ($_[0] eq $MASTER && !$_[1]) {
733     undef $MASTER;
734     master_search ();
735     }
736     };
737 root 1.71
738 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
739 root 1.71
740 root 1.73 # (re-)send queued requests
741     snd $MASTER, @$_
742     for values %GLOBAL_REQ;
743     }
744 root 1.71
745 root 1.72 sub master_search {
746 root 1.111 AE::log 9 => "starting search for master node.";
747    
748 root 1.115 #TODO: should also look for other global nodes, but we don't know them
749 root 1.73 for (keys %NODE_SEED) {
750 root 1.72 if (node_is_up $_) {
751     master_set $_;
752     return;
753 root 1.71 }
754 root 1.72 }
755 root 1.71
756 root 1.78 $MASTER_MON = mon_nodes sub {
757 root 1.72 return unless $_[1]; # we are only interested in node-ups
758     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
759 root 1.71
760 root 1.72 master_set $_[0];
761     };
762 root 1.71 }
763    
764 root 1.110 # other node wants to make us the master, so start the global service
765 root 1.79 $NODE_REQ{g_slave} = sub {
766 root 1.80 # load global module and redo the request
767 root 1.73 require AnyEvent::MP::Global;
768 root 1.80 &{ $NODE_REQ{g_slave} }
769 root 1.71 };
770    
771 root 1.73 #############################################################################
772 root 1.79 # local database operations
773 root 1.71
774 root 1.111 # canonical probably not needed
775     our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
776    
777     # are the two scalars equal? very very ugly and slow, need better way
778     sub sv_eq($$) {
779     ref $_[0] || ref $_[1]
780     ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
781     : $_[0] eq $_[1]
782     && defined $_[0] == defined $_[1]
783     }
784    
785 root 1.79 # local database management
786 root 1.88
787 root 1.111 sub db_del($@) {
788     my $family = shift;
789    
790     my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
791    
792     return unless @del;
793    
794     delete @{ $LOCAL_DB{$family} }{@del};
795     snd $MASTER, g_upd => $family => undef, \@del
796     if defined $MASTER;
797     }
798    
799 root 1.87 sub db_set($$;$) {
800 root 1.97 my ($family, $subkey) = @_;
801    
802 root 1.89 # if (ref $_[1]) {
803     # # bulk
804     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
805     # $LOCAL_DB{$_[0]} = $_[1];
806     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
807     # if defined $MASTER;
808     # } else {
809     # single-key
810 root 1.111 unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
811     $LOCAL_DB{$family}{$subkey} = $_[2];
812     snd $MASTER, g_upd => $family => { $subkey => $_[2] }
813     if defined $MASTER;
814     }
815 root 1.89 # }
816 root 1.97
817     defined wantarray
818     and Guard::guard { db_del $family => $subkey }
819 root 1.79 }
820    
821 root 1.88 # database query
822    
823     sub db_family {
824     my ($family, $cb) = @_;
825     global_call g_db_family => $family, $cb;
826     }
827    
828     sub db_keys {
829     my ($family, $cb) = @_;
830     global_call g_db_keys => $family, $cb;
831     }
832    
833     sub db_values {
834     my ($family, $cb) = @_;
835     global_call g_db_values => $family, $cb;
836 root 1.80 }
837    
838 root 1.88 # database monitoring
839 root 1.80
840 root 1.81 our %LOCAL_MON; # f, reply
841     our %MON_DB; # f, k, value
842 root 1.80
843 root 1.81 sub db_mon($@) {
844 root 1.84 my ($family, $cb) = @_;
845 root 1.81
846 root 1.84 if (my $db = $MON_DB{$family}) {
847 root 1.89 # we already monitor, so create a "dummy" change event
848     # this is postponed, which might be too late (we could process
849     # change events), so disable the callback at first
850     $LOCAL_MON{$family}{$cb+0} = sub { };
851     AE::postpone {
852     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
853    
854     # set actual callback
855     $LOCAL_MON{$family}{$cb+0} = $cb;
856     $cb->($db, [keys %$db]);
857     };
858 root 1.81 } else {
859     # new monitor, request chg1 from upstream
860 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
861 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
862     $MON_DB{$family} = {};
863     }
864    
865 root 1.90 defined wantarray
866     and Guard::guard {
867     my $mon = $LOCAL_MON{$family};
868     delete $mon->{$cb+0};
869    
870     unless (%$mon) {
871     global_req_del "mon1 $family";
872    
873     # no global_req, because we don't care if we are not connected
874     snd $MASTER, g_mon0 => $family
875     if $MASTER;
876 root 1.80
877 root 1.90 delete $LOCAL_MON{$family};
878     delete $MON_DB{$family};
879     }
880 root 1.80 }
881     }
882    
883 root 1.82 # full update
884 root 1.80 $NODE_REQ{g_chg1} = sub {
885 root 1.96 return unless $SRCNODE eq $MASTER;
886 root 1.84 my ($f, $ndb) = @_;
887    
888     my $db = $MON_DB{$f};
889 root 1.89 my (@a, @c, @d);
890 root 1.81
891 root 1.82 # add or replace keys
892 root 1.84 while (my ($k, $v) = each %$ndb) {
893 root 1.89 exists $db->{$k}
894     ? push @c, $k
895     : push @a, $k;
896 root 1.84 $db->{$k} = $v;
897 root 1.82 }
898 root 1.81
899 root 1.82 # delete keys that are no longer present
900 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
901     delete $db->{$_};
902 root 1.89 push @d, $_;
903 root 1.81 }
904 root 1.84
905 root 1.89 $_->($db, \@a, \@c, \@d)
906 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
907 root 1.80 };
908    
909 root 1.82 # incremental update
910 root 1.84 $NODE_REQ{g_chg2} = sub {
911 root 1.96 return unless $SRCNODE eq $MASTER;
912 root 1.89 my ($family, $set, $del) = @_;
913    
914     my $db = $MON_DB{$family};
915 root 1.84
916 root 1.89 my (@a, @c);
917 root 1.84
918 root 1.89 while (my ($k, $v) = each %$set) {
919     exists $db->{$k}
920     ? push @c, $k
921     : push @a, $k;
922     $db->{$k} = $v;
923     }
924    
925     delete @$db{@$del};
926    
927     $_->($db, \@a, \@c, $del)
928     for values %{ $LOCAL_MON{$family} };
929 root 1.84 };
930 root 1.80
931 root 1.69 #############################################################################
932     # configure
933    
934 root 1.81 sub nodename {
935 root 1.69 require POSIX;
936     (POSIX::uname ())[1]
937     }
938    
939     sub _resolve($) {
940     my ($nodeid) = @_;
941    
942     my $cv = AE::cv;
943     my @res;
944    
945     $cv->begin (sub {
946     my %seen;
947     my @refs;
948     for (sort { $a->[0] <=> $b->[0] } @res) {
949     push @refs, $_->[1] unless $seen{$_->[1]}++
950     }
951     shift->send (@refs);
952     });
953    
954     my $idx;
955     for my $t (split /,/, $nodeid) {
956     my $pri = ++$idx;
957    
958 root 1.81 $t = length $t ? nodename . ":$t" : nodename
959 root 1.69 if $t =~ /^\d*$/;
960    
961     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
962     or Carp::croak "$t: unparsable transport descriptor";
963    
964     $port = "0" if $port eq "*";
965    
966     if ($host eq "*") {
967     $cv->begin;
968    
969 root 1.103 my $get_addr = sub {
970     my @addr;
971    
972     require Net::Interface;
973    
974     # Net::Interface hangs on some systems, so hope for the best
975     local $SIG{ALRM} = 'DEFAULT';
976     alarm 2;
977    
978     for my $if (Net::Interface->interfaces) {
979     # we statically lower-prioritise ipv6 here, TODO :()
980     for $_ ($if->address (Net::Interface::AF_INET ())) {
981     next if /^\x7f/; # skip localhost etc.
982     push @addr, $_;
983 root 1.69 }
984 root 1.103 for ($if->address (Net::Interface::AF_INET6 ())) {
985     #next if $if->scope ($_) <= 2;
986     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
987     push @addr, $_;
988 root 1.69 }
989     }
990 root 1.103
991     alarm 0;
992    
993     @addr
994     };
995    
996     my @addr;
997    
998     if (AnyEvent::WIN32) {
999     @addr = $get_addr->();
1000     } else {
1001     # use a child process, as Net::Interface is big, and we need it only once.
1002    
1003     pipe my $r, my $w
1004     or die "pipe: $!";
1005    
1006     if (fork eq 0) {
1007     close $r;
1008     syswrite $w, pack "(C/a*)*", $get_addr->();
1009     require POSIX;
1010     POSIX::_exit (0);
1011     } else {
1012     close $w;
1013    
1014     my $addr;
1015    
1016     1 while sysread $r, $addr, 1024, length $addr;
1017    
1018     @addr = unpack "(C/a*)*", $addr;
1019     }
1020     }
1021    
1022     for my $ip (@addr) {
1023     push @res, [
1024     $pri += 1e-5,
1025     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
1026     ];
1027     }
1028     $cv->end;
1029 root 1.69 } else {
1030     $cv->begin;
1031     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1032     for (@_) {
1033     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1034     push @res, [
1035     $pri += 1e-5,
1036     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1037     ];
1038     }
1039     $cv->end;
1040     };
1041     }
1042     }
1043    
1044     $cv->end;
1045    
1046     $cv
1047     }
1048    
1049 root 1.112 our @POST_CONFIGURE;
1050    
1051     # not yet documented
1052     sub post_configure(&) {
1053     die "AnyEvent::MP::Kernel::post_configure must be called in void context" if defined wantarray;
1054    
1055     push @POST_CONFIGURE, @_;
1056     (shift @POST_CONFIGURE)->() while $NODE && @POST_CONFIGURE;
1057     }
1058    
1059 root 1.69 sub configure(@) {
1060     unshift @_, "profile" if @_ & 1;
1061     my (%kv) = @_;
1062    
1063     my $profile = delete $kv{profile};
1064    
1065 root 1.81 $profile = nodename
1066 root 1.69 unless defined $profile;
1067    
1068     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1069    
1070 root 1.104 $SECURE = $CONFIG->{secure};
1071 root 1.83
1072 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1073 root 1.69
1074     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1075    
1076 root 1.106 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1077    
1078 root 1.72 $NODE = $node;
1079 root 1.77
1080 root 1.81 $NODE =~ s/%n/nodename/ge;
1081    
1082     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1083 root 1.77 # nodes with randomised node names do not need randomised port names
1084     $UNIQ = "";
1085     }
1086 root 1.69
1087 root 1.106 $node_obj->{id} = $NODE;
1088     $NODE{$NODE} = $node_obj;
1089 root 1.69
1090     my $seeds = $CONFIG->{seeds};
1091     my $binds = $CONFIG->{binds};
1092    
1093     $binds ||= ["*"];
1094    
1095 root 1.98 AE::log 8 => "node $NODE starting up.";
1096 root 1.69
1097 root 1.85 $BINDS = [];
1098     %BINDS = ();
1099 root 1.69
1100     for (map _resolve $_, @$binds) {
1101     for my $bind ($_->recv) {
1102     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1103     or Carp::croak "$bind: unparsable local bind address";
1104    
1105     my $listener = AnyEvent::MP::Transport::mp_server
1106     $host,
1107     $port,
1108     prepare => sub {
1109     my (undef, $host, $port) = @_;
1110     $bind = AnyEvent::Socket::format_hostport $host, $port;
1111     0
1112     },
1113     ;
1114 root 1.85 $BINDS{$bind} = $listener;
1115     push @$BINDS, $bind;
1116 root 1.69 }
1117     }
1118    
1119 root 1.112 AE::log 9 => "running post config hooks and init.";
1120    
1121     # might initialise Global, so need to do it before db_set
1122     post_configure { };
1123    
1124 root 1.85 db_set "'l" => $NODE => $BINDS;
1125 root 1.73
1126 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
1127 root 1.69
1128     # connect to all seednodes
1129     set_seeds map $_->recv, map _resolve $_, @$seeds;
1130 root 1.73 master_search;
1131    
1132 root 1.103 # save gobs of memory
1133     undef &_resolve;
1134     *configure = sub (@){ };
1135    
1136 root 1.112 AE::log 9 => "starting services.";
1137    
1138 root 1.69 for (@{ $CONFIG->{services} }) {
1139     if (ref) {
1140     my ($func, @args) = @$_;
1141     (load_func $func)->(@args);
1142     } elsif (s/::$//) {
1143     eval "require $_";
1144     die $@ if $@;
1145     } else {
1146     (load_func $_)->();
1147     }
1148     }
1149 root 1.102
1150     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1151     die "$@" if $@;
1152 root 1.69 }
1153    
1154 root 1.1 =back
1155    
1156 root 1.101 =head1 LOGGING
1157    
1158     AnyEvent::MP::Kernel logs high-level information about the current node,
1159     when nodes go up and down, and most runtime errors. It also logs some
1160     debugging and trace messages about network maintainance, such as seed
1161     connections and global node management.
1162    
1163 root 1.1 =head1 SEE ALSO
1164    
1165     L<AnyEvent::MP>.
1166    
1167     =head1 AUTHOR
1168    
1169     Marc Lehmann <schmorp@schmorp.de>
1170     http://home.schmorp.de/
1171    
1172     =cut
1173    
1174     1
1175