ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.111
Committed: Sun Mar 25 20:50:49 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.110: +62 -43 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9 root 1.106 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10    
11     snd_to_func $node, $func, @args # send msg to function
12     snd_on $node, @msg # snd message again (relay)
13     eval_on $node, $string[, @reply] # execute perl code on another node
14    
15     node_is_up $nodeid # return true if a node is connected
16     @nodes = up_nodes # return a list of all connected nodes
17     $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18    
19 root 1.1 =head1 DESCRIPTION
20    
21 root 1.106 This module implements most of the inner workings of AnyEvent::MP. It
22     offers mostly lower-level functions that deal with network connectivity
23     and special requests.
24    
25     You normally interface with AnyEvent::MP through a higher level interface
26     such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27     with using the functions from this module.
28 root 1.3
29     =head1 GLOBALS AND FUNCTIONS
30 root 1.1
31     =over 4
32    
33     =cut
34    
35     package AnyEvent::MP::Kernel;
36    
37     use common::sense;
38     use Carp ();
39    
40 root 1.79 use AnyEvent ();
41     use Guard ();
42 root 1.1
43     use AnyEvent::MP::Node;
44     use AnyEvent::MP::Transport;
45    
46     use base "Exporter";
47    
48 root 1.106 # for re-export in AnyEvent::MP and Coro::MP
49     our @EXPORT_API = qw(
50     NODE $NODE
51     configure
52     node_of port_is_local
53     snd kil
54     db_set db_del
55     db_mon db_family db_keys db_values
56     );
57    
58     our @EXPORT_OK = (
59     # these are internal
60     qw(
61     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62     add_node load_func
63     ),
64     @EXPORT_API,
65 root 1.71 );
66    
67 root 1.1 our @EXPORT = qw(
68 root 1.106 snd_to_func snd_on eval_on
69     port_is_local
70 root 1.46 up_nodes mon_nodes node_is_up
71 root 1.1 );
72    
73 root 1.6 sub load_func($) {
74     my $func = $_[0];
75    
76     unless (defined &$func) {
77     my $pkg = $func;
78     do {
79     $pkg =~ s/::[^:]+$//
80 root 1.63 or return sub { die "unable to resolve function '$func'" };
81 root 1.60
82     local $@;
83 root 1.61 unless (eval "require $pkg; 1") {
84     my $error = $@;
85     $error =~ /^Can't locate .*.pm in \@INC \(/
86     or return sub { die $error };
87     }
88 root 1.6 } until defined &$func;
89     }
90    
91     \&$func
92     }
93    
94 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
95    
96 root 1.1 sub nonce($) {
97 root 1.78 join "", map chr rand 256, 1 .. $_[0]
98 root 1.1 }
99    
100 root 1.78 sub nonce62($) {
101     join "", map $alnum[rand 62], 1 .. $_[0]
102 root 1.1 }
103    
104 root 1.20 our $CONFIG; # this node's configuration
105 root 1.104 our $SECURE;
106 root 1.21
107 root 1.64 our $RUNIQ; # remote uniq value
108     our $UNIQ; # per-process/node unique cookie
109     our $NODE;
110     our $ID = "a";
111 root 1.1
112     our %NODE; # node id to transport mapping, or "undef", for local node
113     our (%PORT, %PORT_DATA); # local ports
114    
115 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 root 1.1 our %LMON; # monitored _local_ ports
117    
118 root 1.111 #our $GLOBAL; # true if node is a global ("directory") node
119 root 1.85 our %BINDS;
120     our $BINDS; # our listeners, as arrayref
121 root 1.1
122 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
123 root 1.111 our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
124 root 1.1
125 root 1.106 # initialise names for non-networked operation
126     {
127 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
128 root 1.106 my $now = AE::now;
129     $UNIQ =
130     (join "",
131     map $alnum[$_],
132     $$ / 62 % 62,
133     $$ % 62,
134     (int $now ) % 62,
135     (int $now * 100) % 62,
136     (int $now * 10000) % 62,
137     ) . nonce62 4
138     ;
139 root 1.78
140     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
141     $RUNIQ = nonce62 10;
142     $RUNIQ =~ s/(.)$/\U$1/;
143    
144 root 1.106 $NODE = "";
145 root 1.64 }
146    
147 root 1.1 sub NODE() {
148     $NODE
149     }
150    
151     sub node_of($) {
152 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
153 root 1.1
154 root 1.21 $node
155 root 1.1 }
156    
157 root 1.17 BEGIN {
158     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
159     ? sub () { 1 }
160     : sub () { 0 };
161     }
162 root 1.1
163 root 1.42 our $DELAY_TIMER;
164     our @DELAY_QUEUE;
165    
166 root 1.97 our $delay_run = sub {
167 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
168 root 1.97 };
169 root 1.42
170     sub delay($) {
171     push @DELAY_QUEUE, shift;
172 root 1.97 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
173 root 1.42 }
174    
175 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
176    
177     During execution of a message callback, this variable contains the node ID
178     of the origin node.
179    
180     The main use of this variable is for debugging output - there are probably
181     very few other cases where you need to know the source node ID.
182    
183     =cut
184    
185 root 1.1 sub _inject {
186 root 1.96 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
187 root 1.1 &{ $PORT{+shift} or return };
188     }
189    
190 root 1.20 # this function adds a node-ref, so you can send stuff to it
191     # it is basically the central routing component.
192 root 1.1 sub add_node {
193 root 1.107 $NODE{$_[0]} || do {
194     my ($node) = @_;
195 root 1.1
196 root 1.107 length $node
197     or Carp::croak "'undef' or the empty string are not valid node/port IDs";
198 root 1.93
199 root 1.107 # registers itself in %NODE
200     new AnyEvent::MP::Node::Remote $node
201     }
202 root 1.13 }
203    
204 root 1.1 sub snd(@) {
205 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
206 root 1.1
207 root 1.86 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
208 root 1.1
209 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
210 root 1.2 ->{send} (["$portid", @_]);
211 root 1.1 }
212    
213 root 1.17 sub port_is_local($) {
214 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
215 root 1.17
216 root 1.106 $nodeid eq $NODE
217 root 1.17 }
218    
219 root 1.18 =item snd_to_func $node, $func, @args
220 root 1.11
221 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
222 root 1.11 this function with the given arguments on that node.
223    
224 root 1.20 This function can be used to implement C<spawn>-like interfaces.
225 root 1.11
226     =cut
227    
228 root 1.18 sub snd_to_func($$;@) {
229 root 1.21 my $nodeid = shift;
230 root 1.11
231 root 1.41 # on $NODE, we artificially delay... (for spawn)
232     # this is very ugly - maybe we should simply delay ALL messages,
233     # to avoid deep recursion issues. but that's so... slow...
234 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
235     if $nodeid ne $NODE;
236    
237 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
238 root 1.11 }
239    
240 root 1.18 =item snd_on $node, @msg
241    
242     Executes C<snd> with the given C<@msg> (which must include the destination
243     port) on the given node.
244    
245     =cut
246    
247     sub snd_on($@) {
248     my $node = shift;
249     snd $node, snd => @_;
250     }
251    
252 root 1.29 =item eval_on $node, $string[, @reply]
253 root 1.18
254 root 1.29 Evaluates the given string as Perl expression on the given node. When
255     @reply is specified, then it is used to construct a reply message with
256     C<"$@"> and any results from the eval appended.
257 root 1.18
258     =cut
259    
260 root 1.29 sub eval_on($$;@) {
261 root 1.18 my $node = shift;
262     snd $node, eval => @_;
263     }
264    
265 root 1.1 sub kil(@) {
266 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
267 root 1.1
268     length $portid
269 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
270 root 1.1
271 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
272 root 1.1 ->kill ("$portid", @_);
273     }
274    
275     #############################################################################
276 root 1.6 # node monitoring and info
277 root 1.3
278 root 1.106 =item $bool = node_is_up $nodeid
279 root 1.13
280     Returns true if the given node is "up", that is, the kernel thinks it has
281     a working connection to it.
282    
283 root 1.107 More precisely, if the node is up, returns C<1>. If the node is currently
284     connecting or otherwise known but not connected, returns C<0>. If nothing
285     is known about the node, returns C<undef>.
286 root 1.13
287     =cut
288    
289     sub node_is_up($) {
290 root 1.107 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
291 root 1.13 ? 1 : 0
292     }
293    
294 root 1.106 =item @nodes = up_nodes
295 root 1.3
296 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
297     the node itself).
298 root 1.3
299     =cut
300    
301 root 1.49 sub up_nodes() {
302 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
303 root 1.3 }
304    
305 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
306 root 1.3
307 root 1.27 Registers a callback that is called each time a node goes up (a connection
308     is established) or down (the connection is lost).
309 root 1.3
310     Node up messages can only be followed by node down messages for the same
311     node, and vice versa.
312    
313 root 1.71 Note that monitoring a node is usually better done by monitoring its node
314 root 1.27 port. This function is mainly of interest to modules that are concerned
315     about the network topology and low-level connection handling.
316    
317     Callbacks I<must not> block and I<should not> send any messages.
318    
319     The function returns an optional guard which can be used to unregister
320 root 1.3 the monitoring callback again.
321    
322 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
323     or go up (and down).
324    
325     newnode $_, 1 for up_nodes;
326     mon_nodes \&newnode;
327    
328 root 1.3 =cut
329    
330     our %MON_NODES;
331    
332     sub mon_nodes($) {
333     my ($cb) = @_;
334    
335     $MON_NODES{$cb+0} = $cb;
336    
337 root 1.90 defined wantarray
338     and Guard::guard { delete $MON_NODES{$cb+0} }
339 root 1.3 }
340    
341     sub _inject_nodeevent($$;@) {
342 root 1.16 my ($node, $up, @reason) = @_;
343 root 1.3
344 root 1.107 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
345    
346 root 1.3 for my $cb (values %MON_NODES) {
347 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
348 root 1.98 or AE::log die => $@;
349 root 1.3 }
350     }
351    
352     #############################################################################
353 root 1.1 # self node code
354    
355 root 1.67 sub _kill {
356     my $port = shift;
357    
358     delete $PORT{$port}
359     or return; # killing nonexistent ports is O.K.
360     delete $PORT_DATA{$port};
361    
362     my $mon = delete $LMON{$port}
363     or !@_
364 root 1.98 or AE::log die => "unmonitored local port $port died with reason: @_";
365 root 1.67
366     $_->(@_) for values %$mon;
367     }
368    
369     sub _monitor {
370     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
371     unless exists $PORT{$_[1]};
372    
373     $LMON{$_[1]}{$_[2]+0} = $_[2];
374     }
375    
376     sub _unmonitor {
377 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
378     if exists $LMON{$_[1]};
379 root 1.67 }
380    
381 root 1.83 sub _secure_check {
382 root 1.104 $SECURE
383 root 1.105 and die "remote execution not allowed\n";
384 root 1.83 }
385    
386 root 1.107 our %NODE_REQ;
387    
388     %NODE_REQ = (
389     # "mproto" - monitoring protocol
390 root 1.1
391     # monitoring
392 root 1.65 mon0 => sub { # stop monitoring a port for another node
393 root 1.1 my $portid = shift;
394 root 1.96 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
395 root 1.1 },
396 root 1.65 mon1 => sub { # start monitoring a port for another node
397 root 1.1 my $portid = shift;
398 root 1.96 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
399 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
400 root 1.58 delete $node->{rmon}{$portid};
401 root 1.65 $node->send (["", kil0 => $portid, @_])
402 root 1.59 if $node && $node->{transport};
403 root 1.67 };
404 root 1.1 },
405 root 1.65 # another node has killed a monitored port
406     kil0 => sub {
407 root 1.96 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
408 root 1.1 or return;
409    
410     $_->(@_) for @$cbs;
411     },
412 root 1.107 # another node wants to kill a local port
413     kil1 => \&_kill,
414 root 1.1
415 root 1.18 # "public" services - not actually public
416 root 1.1
417     # relay message to another node / generic echo
418 root 1.88 snd => sub {
419     &snd
420 root 1.1 },
421 root 1.107 # ask if a node supports the given request, only works for fixed tags
422     can => sub {
423     my $method = shift;
424     snd @_, exists $NODE_REQ{$method};
425     },
426 root 1.1
427 root 1.30 # random utilities
428 root 1.1 eval => sub {
429 root 1.83 &_secure_check;
430 root 1.50 my @res = do { package main; eval shift };
431 root 1.1 snd @_, "$@", @res if @_;
432     },
433     time => sub {
434 root 1.76 snd @_, AE::now;
435 root 1.1 },
436     devnull => sub {
437     #
438     },
439 root 1.15 "" => sub {
440 root 1.27 # empty messages are keepalives or similar devnull-applications
441 root 1.15 },
442 root 1.1 );
443    
444 root 1.104 # the node port
445 root 1.107 new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
446    
447 root 1.1 $PORT{""} = sub {
448     my $tag = shift;
449 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
450 root 1.98 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
451 root 1.1 };
452    
453 root 1.107 our $MPROTO = 1;
454 root 1.84
455     # tell everybody who connects our nproto
456     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
457 root 1.107 $_[0]{local_greeting}{mproto} = $MPROTO;
458 root 1.84 };
459    
460 root 1.69 #############################################################################
461 root 1.71 # seed management, try to keep connections to all seeds at all times
462 root 1.69
463 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
464     our %NODE_SEED; # map node ID to seed ID
465 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
466     our $SEED_WATCHER;
467 root 1.71 our $SEED_RETRY;
468 root 1.111 our %GLOBAL_NODE; # global => undef
469 root 1.69
470     sub seed_connect {
471     my ($seed) = @_;
472    
473     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
474     or Carp::croak "$seed: unparsable seed address";
475    
476 root 1.98 AE::log 9 => "trying connect to seed node $seed.";
477 root 1.69
478 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
479     $host, $port,
480     on_greeted => sub {
481     # called after receiving remote greeting, learn remote node name
482    
483     # we rely on untrusted data here (the remote node name) this is
484     # hopefully ok, as this can at most be used for DOSing, which is easy
485     # when you can do MITM anyway.
486    
487     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
488     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
489 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
490 root 1.71 delete $SEED_NODE{$seed};
491     } else {
492     $SEED_NODE{$seed} = $_[0]{remote_node};
493     $NODE_SEED{$_[0]{remote_node}} = $seed;
494 root 1.111
495     # also start global service, in case it isn't running
496     # since we probably switch conenctions, maybe we don't need to do this here?
497     snd $_[0]{remote_node}, "g_slave";
498 root 1.71 }
499     },
500 root 1.93 sub {
501 root 1.71 delete $SEED_CONNECT{$seed};
502     }
503 root 1.69 ;
504     }
505    
506     sub seed_all {
507 root 1.93 my @seeds = grep
508     !exists $SEED_CONNECT{$_}
509     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
510     keys %SEED_NODE;
511 root 1.69
512     if (@seeds) {
513 root 1.70 # start connection attempt for every seed we are not connected to yet
514 root 1.69 seed_connect $_
515     for @seeds;
516 root 1.71
517 root 1.111 $SEED_RETRY = $SEED_RETRY * 2;
518 root 1.71 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
519     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
520    
521     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
522    
523 root 1.69 } else {
524 root 1.71 # all seeds connected or connecting, no need to restart timer
525 root 1.69 undef $SEED_WATCHER;
526     }
527     }
528    
529     sub seed_again {
530 root 1.111 $SEED_RETRY = (1 + rand) * 0.6;
531     $SEED_WATCHER ||= AE::timer $SEED_RETRY, 0, \&seed_all;
532 root 1.69 }
533    
534     # sets new seed list, starts connecting
535     sub set_seeds(@) {
536     %SEED_NODE = ();
537 root 1.71 %NODE_SEED = ();
538     %SEED_CONNECT = ();
539    
540 root 1.69 @SEED_NODE{@_} = ();
541    
542 root 1.71 seed_all;
543     }
544    
545 root 1.111 # normal nodes only record global node connections
546     $NODE_REQ{g_global} = sub {
547     undef $GLOBAL_NODE{$SRCNODE};
548     };
549    
550 root 1.71 mon_nodes sub {
551 root 1.111 delete $GLOBAL_NODE{$_[0]}
552     unless $_[1];
553    
554 root 1.93 return unless exists $NODE_SEED{$_[0]};
555    
556     if ($_[1]) {
557     # each time a connection to a seed node goes up, make
558     # sure it runs the global service.
559 root 1.111 snd $_[0], "g_slave";
560 root 1.93 } else {
561     # if we lost the connection to a seed node, make sure we are seeding
562     seed_again;
563     }
564 root 1.71 };
565    
566     #############################################################################
567 root 1.107 # keepalive code - used to kepe conenctions to certain nodes alive
568     # only used by global code atm., but ought to be exposed somehow.
569 root 1.109 #TODO: should probbaly be done directly by node objects
570 root 1.107
571     our $KEEPALIVE_RETRY;
572     our $KEEPALIVE_WATCHER;
573     our %KEEPALIVE; # we want to keep these nodes alive
574     our %KEEPALIVE_DOWN; # nodes that are down currently
575    
576     sub keepalive_all {
577     AE::log 9 => "keepalive: trying to establish connections with: "
578     . (join " ", keys %KEEPALIVE_DOWN)
579     . ".";
580    
581     (add_node $_)->connect
582     for keys %KEEPALIVE_DOWN;
583    
584 root 1.111 $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2;
585 root 1.107 $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
586     if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
587    
588     $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
589     }
590    
591     sub keepalive_again {
592 root 1.111 $KEEPALIVE_RETRY = (1 + rand) * 0.3;
593 root 1.108 keepalive_all;
594 root 1.107 }
595    
596     sub keepalive_add {
597     return if $KEEPALIVE{$_[0]}++;
598    
599     return if node_is_up $_[0];
600     undef $KEEPALIVE_DOWN{$_[0]};
601     keepalive_again;
602     }
603    
604     sub keepalive_del {
605     return if --$KEEPALIVE{$_[0]};
606    
607     delete $KEEPALIVE {$_[0]};
608     delete $KEEPALIVE_DOWN{$_[0]};
609    
610     undef $KEEPALIVE_WATCHER
611     unless %KEEPALIVE_DOWN;
612     }
613    
614     mon_nodes sub {
615     return unless exists $KEEPALIVE{$_[0]};
616    
617     if ($_[1]) {
618     delete $KEEPALIVE_DOWN{$_[0]};
619    
620     undef $KEEPALIVE_WATCHER
621     unless %KEEPALIVE_DOWN;
622     } else {
623     # lost the conenction, try to connect again
624     undef $KEEPALIVE_DOWN{$_[0]};
625     keepalive_again;
626     }
627     };
628    
629     #############################################################################
630 root 1.71 # talk with/to global nodes
631    
632 root 1.72 # protocol messages:
633     #
634 root 1.110 # sent by global nodes
635 root 1.111 # g_global - global nodes send this to all others
636 root 1.72 #
637 root 1.110 # database protocol
638     # g_slave database - make other global node master of the sender
639     # g_set database - global node's database to other global nodes
640 root 1.111 # g_upd family set del - update single family (any to global)
641 root 1.73 #
642 root 1.110 # slave <-> global protocol
643     # g_find node - query addresses for node (slave to global)
644     # g_found node binds - node addresses (global to slave)
645     # g_db_family family id - send g_reply with data (global to slave)
646     # g_db_keys family id - send g_reply with data (global to slave)
647     # g_db_values family id - send g_reply with data (global to slave)
648     # g_reply id result - result of any query (global to slave)
649     # g_mon1 family - start to monitor family, replies with g_chg1
650     # g_mon0 family - stop monitoring family
651     # g_chg1 family hash - initial value of family when starting to monitor
652     # g_chg2 family set del - like g_upd, but for monitoring only
653 root 1.73 #
654 root 1.111 # internal database families:
655 root 1.73 # "'l" -> node -> listeners
656     # "'g" -> node -> undef
657     # ...
658 root 1.72 #
659    
660 root 1.73 # used on all nodes:
661 root 1.88 our $MASTER; # the global node we bind ourselves to
662 root 1.78 our $MASTER_MON;
663     our %LOCAL_DB; # this node database
664    
665 root 1.84 our $GPROTO = 1;
666    
667     # tell everybody who connects our nproto
668     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
669     $_[0]{local_greeting}{gproto} = $GPROTO;
670     };
671    
672 root 1.73 #############################################################################
673     # master selection
674    
675     # master requests
676     our %GLOBAL_REQ; # $id => \@req
677 root 1.71
678 root 1.74 sub global_req_add {
679 root 1.80 my ($id, $req) = @_;
680 root 1.71
681 root 1.74 return if exists $GLOBAL_REQ{$id};
682    
683 root 1.80 $GLOBAL_REQ{$id} = $req;
684 root 1.71
685 root 1.80 snd $MASTER, @$req
686 root 1.73 if $MASTER;
687 root 1.74 }
688 root 1.71
689 root 1.74 sub global_req_del {
690     delete $GLOBAL_REQ{$_[0]};
691     }
692    
693 root 1.88 #################################
694     # master rpc
695    
696     our %GLOBAL_RES;
697     our $GLOBAL_RES_ID = "a";
698    
699     sub global_call {
700     my $id = ++$GLOBAL_RES_ID;
701     $GLOBAL_RES{$id} = pop;
702     global_req_add $id, [@_, $id];
703     }
704    
705     $NODE_REQ{g_reply} = sub {
706     my $id = shift;
707     global_req_del $id;
708     my $cb = delete $GLOBAL_RES{$id}
709     or return;
710     &$cb
711     };
712    
713     #################################
714    
715 root 1.74 sub g_find {
716 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
717 root 1.73 }
718 root 1.71
719 root 1.73 # reply for g_find started in Node.pm
720 root 1.79 $NODE_REQ{g_found} = sub {
721 root 1.74 global_req_del "g_find $_[0]";
722    
723 root 1.73 my $node = $NODE{$_[0]} or return;
724 root 1.71
725 root 1.79 $node->connect_to ($_[1]);
726 root 1.71 };
727    
728 root 1.73 sub master_set {
729     $MASTER = $_[0];
730 root 1.111 AE::log 8 => "new master node: $MASTER.";
731    
732     $MASTER_MON = mon_nodes sub {
733     if ($_[0] eq $MASTER && !$_[1]) {
734     undef $MASTER;
735     master_search ();
736     }
737     };
738 root 1.71
739 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
740 root 1.71
741 root 1.73 # (re-)send queued requests
742     snd $MASTER, @$_
743     for values %GLOBAL_REQ;
744     }
745 root 1.71
746 root 1.72 sub master_search {
747 root 1.111 AE::log 9 => "starting search for master node.";
748    
749 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
750     for (keys %NODE_SEED) {
751 root 1.72 if (node_is_up $_) {
752     master_set $_;
753     return;
754 root 1.71 }
755 root 1.72 }
756 root 1.71
757 root 1.78 $MASTER_MON = mon_nodes sub {
758 root 1.72 return unless $_[1]; # we are only interested in node-ups
759     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
760 root 1.71
761 root 1.72 master_set $_[0];
762     };
763 root 1.71 }
764    
765 root 1.110 # other node wants to make us the master, so start the global service
766 root 1.79 $NODE_REQ{g_slave} = sub {
767 root 1.80 # load global module and redo the request
768 root 1.73 require AnyEvent::MP::Global;
769 root 1.80 &{ $NODE_REQ{g_slave} }
770 root 1.71 };
771    
772 root 1.73 #############################################################################
773 root 1.79 # local database operations
774 root 1.71
775 root 1.111 # canonical probably not needed
776     our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
777    
778     # are the two scalars equal? very very ugly and slow, need better way
779     sub sv_eq($$) {
780     ref $_[0] || ref $_[1]
781     ? (JSON::XS::encode $sv_eq_coder, $_[0]) eq (JSON::XS::encode $sv_eq_coder, $_[1])
782     : $_[0] eq $_[1]
783     && defined $_[0] == defined $_[1]
784     }
785    
786 root 1.79 # local database management
787 root 1.88
788 root 1.111 sub db_del($@) {
789     my $family = shift;
790    
791     my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
792    
793     return unless @del;
794    
795     delete @{ $LOCAL_DB{$family} }{@del};
796     snd $MASTER, g_upd => $family => undef, \@del
797     if defined $MASTER;
798     }
799    
800 root 1.87 sub db_set($$;$) {
801 root 1.97 my ($family, $subkey) = @_;
802    
803 root 1.89 # if (ref $_[1]) {
804     # # bulk
805     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
806     # $LOCAL_DB{$_[0]} = $_[1];
807     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
808     # if defined $MASTER;
809     # } else {
810     # single-key
811 root 1.111 unless (exists $LOCAL_DB{$family}{$subkey} && sv_eq $LOCAL_DB{$family}{$subkey}, $_[2]) {
812     $LOCAL_DB{$family}{$subkey} = $_[2];
813     snd $MASTER, g_upd => $family => { $subkey => $_[2] }
814     if defined $MASTER;
815     }
816 root 1.89 # }
817 root 1.97
818     defined wantarray
819     and Guard::guard { db_del $family => $subkey }
820 root 1.79 }
821    
822 root 1.88 # database query
823    
824     sub db_family {
825     my ($family, $cb) = @_;
826     global_call g_db_family => $family, $cb;
827     }
828    
829     sub db_keys {
830     my ($family, $cb) = @_;
831     global_call g_db_keys => $family, $cb;
832     }
833    
834     sub db_values {
835     my ($family, $cb) = @_;
836     global_call g_db_values => $family, $cb;
837 root 1.80 }
838    
839 root 1.88 # database monitoring
840 root 1.80
841 root 1.81 our %LOCAL_MON; # f, reply
842     our %MON_DB; # f, k, value
843 root 1.80
844 root 1.81 sub db_mon($@) {
845 root 1.84 my ($family, $cb) = @_;
846 root 1.81
847 root 1.84 if (my $db = $MON_DB{$family}) {
848 root 1.89 # we already monitor, so create a "dummy" change event
849     # this is postponed, which might be too late (we could process
850     # change events), so disable the callback at first
851     $LOCAL_MON{$family}{$cb+0} = sub { };
852     AE::postpone {
853     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
854    
855     # set actual callback
856     $LOCAL_MON{$family}{$cb+0} = $cb;
857     $cb->($db, [keys %$db]);
858     };
859 root 1.81 } else {
860     # new monitor, request chg1 from upstream
861 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
862 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
863     $MON_DB{$family} = {};
864     }
865    
866 root 1.90 defined wantarray
867     and Guard::guard {
868     my $mon = $LOCAL_MON{$family};
869     delete $mon->{$cb+0};
870    
871     unless (%$mon) {
872     global_req_del "mon1 $family";
873    
874     # no global_req, because we don't care if we are not connected
875     snd $MASTER, g_mon0 => $family
876     if $MASTER;
877 root 1.80
878 root 1.90 delete $LOCAL_MON{$family};
879     delete $MON_DB{$family};
880     }
881 root 1.80 }
882     }
883    
884 root 1.82 # full update
885 root 1.80 $NODE_REQ{g_chg1} = sub {
886 root 1.96 return unless $SRCNODE eq $MASTER;
887 root 1.84 my ($f, $ndb) = @_;
888    
889     my $db = $MON_DB{$f};
890 root 1.89 my (@a, @c, @d);
891 root 1.81
892 root 1.82 # add or replace keys
893 root 1.84 while (my ($k, $v) = each %$ndb) {
894 root 1.89 exists $db->{$k}
895     ? push @c, $k
896     : push @a, $k;
897 root 1.84 $db->{$k} = $v;
898 root 1.82 }
899 root 1.81
900 root 1.82 # delete keys that are no longer present
901 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
902     delete $db->{$_};
903 root 1.89 push @d, $_;
904 root 1.81 }
905 root 1.84
906 root 1.89 $_->($db, \@a, \@c, \@d)
907 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
908 root 1.80 };
909    
910 root 1.82 # incremental update
911 root 1.84 $NODE_REQ{g_chg2} = sub {
912 root 1.96 return unless $SRCNODE eq $MASTER;
913 root 1.89 my ($family, $set, $del) = @_;
914    
915     my $db = $MON_DB{$family};
916 root 1.84
917 root 1.89 my (@a, @c);
918 root 1.84
919 root 1.89 while (my ($k, $v) = each %$set) {
920     exists $db->{$k}
921     ? push @c, $k
922     : push @a, $k;
923     $db->{$k} = $v;
924     }
925    
926     delete @$db{@$del};
927    
928     $_->($db, \@a, \@c, $del)
929     for values %{ $LOCAL_MON{$family} };
930 root 1.84 };
931 root 1.80
932 root 1.69 #############################################################################
933     # configure
934    
935 root 1.81 sub nodename {
936 root 1.69 require POSIX;
937     (POSIX::uname ())[1]
938     }
939    
940     sub _resolve($) {
941     my ($nodeid) = @_;
942    
943     my $cv = AE::cv;
944     my @res;
945    
946     $cv->begin (sub {
947     my %seen;
948     my @refs;
949     for (sort { $a->[0] <=> $b->[0] } @res) {
950     push @refs, $_->[1] unless $seen{$_->[1]}++
951     }
952     shift->send (@refs);
953     });
954    
955     my $idx;
956     for my $t (split /,/, $nodeid) {
957     my $pri = ++$idx;
958    
959 root 1.81 $t = length $t ? nodename . ":$t" : nodename
960 root 1.69 if $t =~ /^\d*$/;
961    
962     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
963     or Carp::croak "$t: unparsable transport descriptor";
964    
965     $port = "0" if $port eq "*";
966    
967     if ($host eq "*") {
968     $cv->begin;
969    
970 root 1.103 my $get_addr = sub {
971     my @addr;
972    
973     require Net::Interface;
974    
975     # Net::Interface hangs on some systems, so hope for the best
976     local $SIG{ALRM} = 'DEFAULT';
977     alarm 2;
978    
979     for my $if (Net::Interface->interfaces) {
980     # we statically lower-prioritise ipv6 here, TODO :()
981     for $_ ($if->address (Net::Interface::AF_INET ())) {
982     next if /^\x7f/; # skip localhost etc.
983     push @addr, $_;
984 root 1.69 }
985 root 1.103 for ($if->address (Net::Interface::AF_INET6 ())) {
986     #next if $if->scope ($_) <= 2;
987     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
988     push @addr, $_;
989 root 1.69 }
990     }
991 root 1.103
992     alarm 0;
993    
994     @addr
995     };
996    
997     my @addr;
998    
999     if (AnyEvent::WIN32) {
1000     @addr = $get_addr->();
1001     } else {
1002     # use a child process, as Net::Interface is big, and we need it only once.
1003    
1004     pipe my $r, my $w
1005     or die "pipe: $!";
1006    
1007     if (fork eq 0) {
1008     close $r;
1009     syswrite $w, pack "(C/a*)*", $get_addr->();
1010     require POSIX;
1011     POSIX::_exit (0);
1012     } else {
1013     close $w;
1014    
1015     my $addr;
1016    
1017     1 while sysread $r, $addr, 1024, length $addr;
1018    
1019     @addr = unpack "(C/a*)*", $addr;
1020     }
1021     }
1022    
1023     for my $ip (@addr) {
1024     push @res, [
1025     $pri += 1e-5,
1026     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
1027     ];
1028     }
1029     $cv->end;
1030 root 1.69 } else {
1031     $cv->begin;
1032     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1033     for (@_) {
1034     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1035     push @res, [
1036     $pri += 1e-5,
1037     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1038     ];
1039     }
1040     $cv->end;
1041     };
1042     }
1043     }
1044    
1045     $cv->end;
1046    
1047     $cv
1048     }
1049    
1050     sub configure(@) {
1051     unshift @_, "profile" if @_ & 1;
1052     my (%kv) = @_;
1053    
1054     my $profile = delete $kv{profile};
1055    
1056 root 1.81 $profile = nodename
1057 root 1.69 unless defined $profile;
1058    
1059     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1060    
1061 root 1.104 $SECURE = $CONFIG->{secure};
1062 root 1.83
1063 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1064 root 1.69
1065     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1066    
1067 root 1.106 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1068    
1069 root 1.72 $NODE = $node;
1070 root 1.77
1071 root 1.81 $NODE =~ s/%n/nodename/ge;
1072    
1073     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1074 root 1.77 # nodes with randomised node names do not need randomised port names
1075     $UNIQ = "";
1076     }
1077 root 1.69
1078 root 1.106 $node_obj->{id} = $NODE;
1079     $NODE{$NODE} = $node_obj;
1080 root 1.69
1081     my $seeds = $CONFIG->{seeds};
1082     my $binds = $CONFIG->{binds};
1083    
1084     $binds ||= ["*"];
1085    
1086 root 1.98 AE::log 8 => "node $NODE starting up.";
1087 root 1.69
1088 root 1.85 $BINDS = [];
1089     %BINDS = ();
1090 root 1.69
1091     for (map _resolve $_, @$binds) {
1092     for my $bind ($_->recv) {
1093     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1094     or Carp::croak "$bind: unparsable local bind address";
1095    
1096     my $listener = AnyEvent::MP::Transport::mp_server
1097     $host,
1098     $port,
1099     prepare => sub {
1100     my (undef, $host, $port) = @_;
1101     $bind = AnyEvent::Socket::format_hostport $host, $port;
1102     0
1103     },
1104     ;
1105 root 1.85 $BINDS{$bind} = $listener;
1106     push @$BINDS, $bind;
1107 root 1.69 }
1108     }
1109    
1110 root 1.85 db_set "'l" => $NODE => $BINDS;
1111 root 1.73
1112 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
1113 root 1.69
1114     # connect to all seednodes
1115     set_seeds map $_->recv, map _resolve $_, @$seeds;
1116 root 1.73 master_search;
1117    
1118 root 1.103 # save gobs of memory
1119     undef &_resolve;
1120     *configure = sub (@){ };
1121    
1122 root 1.69 for (@{ $CONFIG->{services} }) {
1123     if (ref) {
1124     my ($func, @args) = @$_;
1125     (load_func $func)->(@args);
1126     } elsif (s/::$//) {
1127     eval "require $_";
1128     die $@ if $@;
1129     } else {
1130     (load_func $_)->();
1131     }
1132     }
1133 root 1.102
1134     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1135     die "$@" if $@;
1136 root 1.69 }
1137    
1138 root 1.1 =back
1139    
1140 root 1.101 =head1 LOGGING
1141    
1142     AnyEvent::MP::Kernel logs high-level information about the current node,
1143     when nodes go up and down, and most runtime errors. It also logs some
1144     debugging and trace messages about network maintainance, such as seed
1145     connections and global node management.
1146    
1147 root 1.1 =head1 SEE ALSO
1148    
1149     L<AnyEvent::MP>.
1150    
1151     =head1 AUTHOR
1152    
1153     Marc Lehmann <schmorp@schmorp.de>
1154     http://home.schmorp.de/
1155    
1156     =cut
1157    
1158     1
1159