ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.110
Committed: Sat Mar 24 16:50:15 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.109: +19 -11 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9 root 1.106 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10    
11     snd_to_func $node, $func, @args # send msg to function
12     snd_on $node, @msg # snd message again (relay)
13     eval_on $node, $string[, @reply] # execute perl code on another node
14    
15     node_is_up $nodeid # return true if a node is connected
16     @nodes = up_nodes # return a list of all connected nodes
17     $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18    
19 root 1.1 =head1 DESCRIPTION
20    
21 root 1.106 This module implements most of the inner workings of AnyEvent::MP. It
22     offers mostly lower-level functions that deal with network connectivity
23     and special requests.
24    
25     You normally interface with AnyEvent::MP through a higher level interface
26     such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27     with using the functions from this module.
28 root 1.3
29     =head1 GLOBALS AND FUNCTIONS
30 root 1.1
31     =over 4
32    
33     =cut
34    
35     package AnyEvent::MP::Kernel;
36    
37     use common::sense;
38     use Carp ();
39    
40 root 1.79 use AnyEvent ();
41     use Guard ();
42 root 1.1
43     use AnyEvent::MP::Node;
44     use AnyEvent::MP::Transport;
45    
46     use base "Exporter";
47    
48 root 1.106 # for re-export in AnyEvent::MP and Coro::MP
49     our @EXPORT_API = qw(
50     NODE $NODE
51     configure
52     node_of port_is_local
53     snd kil
54     db_set db_del
55     db_mon db_family db_keys db_values
56     );
57    
58     our @EXPORT_OK = (
59     # these are internal
60     qw(
61     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62     add_node load_func
63     ),
64     @EXPORT_API,
65 root 1.71 );
66    
67 root 1.1 our @EXPORT = qw(
68 root 1.106 snd_to_func snd_on eval_on
69     port_is_local
70 root 1.46 up_nodes mon_nodes node_is_up
71 root 1.1 );
72    
73 root 1.6 sub load_func($) {
74     my $func = $_[0];
75    
76     unless (defined &$func) {
77     my $pkg = $func;
78     do {
79     $pkg =~ s/::[^:]+$//
80 root 1.63 or return sub { die "unable to resolve function '$func'" };
81 root 1.60
82     local $@;
83 root 1.61 unless (eval "require $pkg; 1") {
84     my $error = $@;
85     $error =~ /^Can't locate .*.pm in \@INC \(/
86     or return sub { die $error };
87     }
88 root 1.6 } until defined &$func;
89     }
90    
91     \&$func
92     }
93    
94 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
95    
96 root 1.1 sub nonce($) {
97 root 1.78 join "", map chr rand 256, 1 .. $_[0]
98 root 1.1 }
99    
100 root 1.78 sub nonce62($) {
101     join "", map $alnum[rand 62], 1 .. $_[0]
102 root 1.1 }
103    
104 root 1.20 our $CONFIG; # this node's configuration
105 root 1.104 our $SECURE;
106 root 1.21
107 root 1.64 our $RUNIQ; # remote uniq value
108     our $UNIQ; # per-process/node unique cookie
109     our $NODE;
110     our $ID = "a";
111 root 1.1
112     our %NODE; # node id to transport mapping, or "undef", for local node
113     our (%PORT, %PORT_DATA); # local ports
114    
115 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 root 1.1 our %LMON; # monitored _local_ ports
117    
118 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
119 root 1.85 our %BINDS;
120     our $BINDS; # our listeners, as arrayref
121 root 1.1
122 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
123 root 1.1
124 root 1.106 # initialise names for non-networked operation
125     {
126 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
127 root 1.106 my $now = AE::now;
128     $UNIQ =
129     (join "",
130     map $alnum[$_],
131     $$ / 62 % 62,
132     $$ % 62,
133     (int $now ) % 62,
134     (int $now * 100) % 62,
135     (int $now * 10000) % 62,
136     ) . nonce62 4
137     ;
138 root 1.78
139     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
140     $RUNIQ = nonce62 10;
141     $RUNIQ =~ s/(.)$/\U$1/;
142    
143 root 1.106 $NODE = "";
144 root 1.64 }
145    
146 root 1.1 sub NODE() {
147     $NODE
148     }
149    
150     sub node_of($) {
151 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
152 root 1.1
153 root 1.21 $node
154 root 1.1 }
155    
156 root 1.17 BEGIN {
157     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
158     ? sub () { 1 }
159     : sub () { 0 };
160     }
161 root 1.1
162 root 1.42 our $DELAY_TIMER;
163     our @DELAY_QUEUE;
164    
165 root 1.97 our $delay_run = sub {
166 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
167 root 1.97 };
168 root 1.42
169     sub delay($) {
170     push @DELAY_QUEUE, shift;
171 root 1.97 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
172 root 1.42 }
173    
174 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
175    
176     During execution of a message callback, this variable contains the node ID
177     of the origin node.
178    
179     The main use of this variable is for debugging output - there are probably
180     very few other cases where you need to know the source node ID.
181    
182     =cut
183    
184 root 1.1 sub _inject {
185 root 1.96 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
186 root 1.1 &{ $PORT{+shift} or return };
187     }
188    
189 root 1.20 # this function adds a node-ref, so you can send stuff to it
190     # it is basically the central routing component.
191 root 1.1 sub add_node {
192 root 1.107 $NODE{$_[0]} || do {
193     my ($node) = @_;
194 root 1.1
195 root 1.107 length $node
196     or Carp::croak "'undef' or the empty string are not valid node/port IDs";
197 root 1.93
198 root 1.107 # registers itself in %NODE
199     new AnyEvent::MP::Node::Remote $node
200     }
201 root 1.13 }
202    
203 root 1.1 sub snd(@) {
204 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
205 root 1.1
206 root 1.86 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
207 root 1.1
208 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
209 root 1.2 ->{send} (["$portid", @_]);
210 root 1.1 }
211    
212 root 1.17 sub port_is_local($) {
213 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
214 root 1.17
215 root 1.106 $nodeid eq $NODE
216 root 1.17 }
217    
218 root 1.18 =item snd_to_func $node, $func, @args
219 root 1.11
220 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
221 root 1.11 this function with the given arguments on that node.
222    
223 root 1.20 This function can be used to implement C<spawn>-like interfaces.
224 root 1.11
225     =cut
226    
227 root 1.18 sub snd_to_func($$;@) {
228 root 1.21 my $nodeid = shift;
229 root 1.11
230 root 1.41 # on $NODE, we artificially delay... (for spawn)
231     # this is very ugly - maybe we should simply delay ALL messages,
232     # to avoid deep recursion issues. but that's so... slow...
233 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
234     if $nodeid ne $NODE;
235    
236 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
237 root 1.11 }
238    
239 root 1.18 =item snd_on $node, @msg
240    
241     Executes C<snd> with the given C<@msg> (which must include the destination
242     port) on the given node.
243    
244     =cut
245    
246     sub snd_on($@) {
247     my $node = shift;
248     snd $node, snd => @_;
249     }
250    
251 root 1.29 =item eval_on $node, $string[, @reply]
252 root 1.18
253 root 1.29 Evaluates the given string as Perl expression on the given node. When
254     @reply is specified, then it is used to construct a reply message with
255     C<"$@"> and any results from the eval appended.
256 root 1.18
257     =cut
258    
259 root 1.29 sub eval_on($$;@) {
260 root 1.18 my $node = shift;
261     snd $node, eval => @_;
262     }
263    
264 root 1.1 sub kil(@) {
265 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
266 root 1.1
267     length $portid
268 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
269 root 1.1
270 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
271 root 1.1 ->kill ("$portid", @_);
272     }
273    
274     #############################################################################
275 root 1.6 # node monitoring and info
276 root 1.3
277 root 1.106 =item $bool = node_is_up $nodeid
278 root 1.13
279     Returns true if the given node is "up", that is, the kernel thinks it has
280     a working connection to it.
281    
282 root 1.107 More precisely, if the node is up, returns C<1>. If the node is currently
283     connecting or otherwise known but not connected, returns C<0>. If nothing
284     is known about the node, returns C<undef>.
285 root 1.13
286     =cut
287    
288     sub node_is_up($) {
289 root 1.107 ($_[0] eq $NODE) || ($NODE{$_[0]} or return)->{transport}
290 root 1.13 ? 1 : 0
291     }
292    
293 root 1.106 =item @nodes = up_nodes
294 root 1.3
295 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
296     the node itself).
297 root 1.3
298     =cut
299    
300 root 1.49 sub up_nodes() {
301 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
302 root 1.3 }
303    
304 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
305 root 1.3
306 root 1.27 Registers a callback that is called each time a node goes up (a connection
307     is established) or down (the connection is lost).
308 root 1.3
309     Node up messages can only be followed by node down messages for the same
310     node, and vice versa.
311    
312 root 1.71 Note that monitoring a node is usually better done by monitoring its node
313 root 1.27 port. This function is mainly of interest to modules that are concerned
314     about the network topology and low-level connection handling.
315    
316     Callbacks I<must not> block and I<should not> send any messages.
317    
318     The function returns an optional guard which can be used to unregister
319 root 1.3 the monitoring callback again.
320    
321 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
322     or go up (and down).
323    
324     newnode $_, 1 for up_nodes;
325     mon_nodes \&newnode;
326    
327 root 1.3 =cut
328    
329     our %MON_NODES;
330    
331     sub mon_nodes($) {
332     my ($cb) = @_;
333    
334     $MON_NODES{$cb+0} = $cb;
335    
336 root 1.90 defined wantarray
337     and Guard::guard { delete $MON_NODES{$cb+0} }
338 root 1.3 }
339    
340     sub _inject_nodeevent($$;@) {
341 root 1.16 my ($node, $up, @reason) = @_;
342 root 1.3
343 root 1.107 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
344    
345 root 1.3 for my $cb (values %MON_NODES) {
346 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
347 root 1.98 or AE::log die => $@;
348 root 1.3 }
349     }
350    
351     #############################################################################
352 root 1.1 # self node code
353    
354 root 1.67 sub _kill {
355     my $port = shift;
356    
357     delete $PORT{$port}
358     or return; # killing nonexistent ports is O.K.
359     delete $PORT_DATA{$port};
360    
361     my $mon = delete $LMON{$port}
362     or !@_
363 root 1.98 or AE::log die => "unmonitored local port $port died with reason: @_";
364 root 1.67
365     $_->(@_) for values %$mon;
366     }
367    
368     sub _monitor {
369     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
370     unless exists $PORT{$_[1]};
371    
372     $LMON{$_[1]}{$_[2]+0} = $_[2];
373     }
374    
375     sub _unmonitor {
376 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
377     if exists $LMON{$_[1]};
378 root 1.67 }
379    
380 root 1.83 sub _secure_check {
381 root 1.104 $SECURE
382 root 1.105 and die "remote execution not allowed\n";
383 root 1.83 }
384    
385 root 1.107 our %NODE_REQ;
386    
387     %NODE_REQ = (
388     # "mproto" - monitoring protocol
389 root 1.1
390     # monitoring
391 root 1.65 mon0 => sub { # stop monitoring a port for another node
392 root 1.1 my $portid = shift;
393 root 1.96 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
394 root 1.1 },
395 root 1.65 mon1 => sub { # start monitoring a port for another node
396 root 1.1 my $portid = shift;
397 root 1.96 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
398 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
399 root 1.58 delete $node->{rmon}{$portid};
400 root 1.65 $node->send (["", kil0 => $portid, @_])
401 root 1.59 if $node && $node->{transport};
402 root 1.67 };
403 root 1.1 },
404 root 1.65 # another node has killed a monitored port
405     kil0 => sub {
406 root 1.96 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
407 root 1.1 or return;
408    
409     $_->(@_) for @$cbs;
410     },
411 root 1.107 # another node wants to kill a local port
412     kil1 => \&_kill,
413 root 1.1
414 root 1.18 # "public" services - not actually public
415 root 1.1
416     # relay message to another node / generic echo
417 root 1.88 snd => sub {
418     &snd
419 root 1.1 },
420 root 1.107 # ask if a node supports the given request, only works for fixed tags
421     can => sub {
422     my $method = shift;
423     snd @_, exists $NODE_REQ{$method};
424     },
425 root 1.1
426 root 1.30 # random utilities
427 root 1.1 eval => sub {
428 root 1.83 &_secure_check;
429 root 1.50 my @res = do { package main; eval shift };
430 root 1.1 snd @_, "$@", @res if @_;
431     },
432     time => sub {
433 root 1.76 snd @_, AE::now;
434 root 1.1 },
435     devnull => sub {
436     #
437     },
438 root 1.15 "" => sub {
439 root 1.27 # empty messages are keepalives or similar devnull-applications
440 root 1.15 },
441 root 1.1 );
442    
443 root 1.104 # the node port
444 root 1.107 new AnyEvent::MP::Node::Self $NODE; # registers itself in %NODE
445    
446 root 1.1 $PORT{""} = sub {
447     my $tag = shift;
448 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
449 root 1.98 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
450 root 1.1 };
451    
452 root 1.107 our $MPROTO = 1;
453 root 1.84
454     # tell everybody who connects our nproto
455     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
456 root 1.107 $_[0]{local_greeting}{mproto} = $MPROTO;
457 root 1.84 };
458    
459 root 1.69 #############################################################################
460 root 1.71 # seed management, try to keep connections to all seeds at all times
461 root 1.69
462 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
463     our %NODE_SEED; # map node ID to seed ID
464 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
465     our $SEED_WATCHER;
466 root 1.71 our $SEED_RETRY;
467 root 1.69
468     sub seed_connect {
469     my ($seed) = @_;
470    
471     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
472     or Carp::croak "$seed: unparsable seed address";
473    
474 root 1.98 AE::log 9 => "trying connect to seed node $seed.";
475 root 1.69
476 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
477     $host, $port,
478     on_greeted => sub {
479     # called after receiving remote greeting, learn remote node name
480    
481     # we rely on untrusted data here (the remote node name) this is
482     # hopefully ok, as this can at most be used for DOSing, which is easy
483     # when you can do MITM anyway.
484    
485     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
486     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
487 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
488 root 1.71 delete $SEED_NODE{$seed};
489     } else {
490     $SEED_NODE{$seed} = $_[0]{remote_node};
491     $NODE_SEED{$_[0]{remote_node}} = $seed;
492 root 1.93 # also start global service, if not running
493     # we need to check here in addition to the mon_nodes below
494     # because we might only learn late that a node is a seed
495     # and then we might already be connected
496     snd $_[0]{remote_node}, "g_slave"
497     unless $_[0]{remote_greeting}{global};
498 root 1.71 }
499     },
500 root 1.93 sub {
501 root 1.71 delete $SEED_CONNECT{$seed};
502     }
503 root 1.69 ;
504     }
505    
506     sub seed_all {
507 root 1.93 my @seeds = grep
508     !exists $SEED_CONNECT{$_}
509     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
510     keys %SEED_NODE;
511 root 1.69
512     if (@seeds) {
513 root 1.70 # start connection attempt for every seed we are not connected to yet
514 root 1.69 seed_connect $_
515     for @seeds;
516 root 1.71
517     $SEED_RETRY = $SEED_RETRY * 2 + rand;
518     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
519     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
520    
521     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
522    
523 root 1.69 } else {
524 root 1.71 # all seeds connected or connecting, no need to restart timer
525 root 1.69 undef $SEED_WATCHER;
526     }
527     }
528    
529     sub seed_again {
530 root 1.71 $SEED_RETRY = 1;
531     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
532 root 1.69 }
533    
534     # sets new seed list, starts connecting
535     sub set_seeds(@) {
536     %SEED_NODE = ();
537 root 1.71 %NODE_SEED = ();
538     %SEED_CONNECT = ();
539    
540 root 1.69 @SEED_NODE{@_} = ();
541    
542 root 1.71 seed_all;
543     }
544    
545     mon_nodes sub {
546 root 1.93 return unless exists $NODE_SEED{$_[0]};
547    
548     if ($_[1]) {
549     # each time a connection to a seed node goes up, make
550     # sure it runs the global service.
551     snd $_[0], "g_slave"
552     unless $NODE{$_[0]}{transport}{remote_greeting}{global};
553     } else {
554     # if we lost the connection to a seed node, make sure we are seeding
555     seed_again;
556     }
557 root 1.71 };
558    
559     #############################################################################
560 root 1.107 # keepalive code - used to kepe conenctions to certain nodes alive
561     # only used by global code atm., but ought to be exposed somehow.
562 root 1.109 #TODO: should probbaly be done directly by node objects
563 root 1.107
564     our $KEEPALIVE_RETRY;
565     our $KEEPALIVE_WATCHER;
566     our %KEEPALIVE; # we want to keep these nodes alive
567     our %KEEPALIVE_DOWN; # nodes that are down currently
568    
569     sub keepalive_all {
570     AE::log 9 => "keepalive: trying to establish connections with: "
571     . (join " ", keys %KEEPALIVE_DOWN)
572     . ".";
573    
574     (add_node $_)->connect
575     for keys %KEEPALIVE_DOWN;
576    
577     $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand;
578     $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
579     if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
580    
581     $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
582     }
583    
584     sub keepalive_again {
585     $KEEPALIVE_RETRY = 1;
586 root 1.108 keepalive_all;
587 root 1.107 }
588    
589     sub keepalive_add {
590     return if $KEEPALIVE{$_[0]}++;
591    
592     return if node_is_up $_[0];
593     undef $KEEPALIVE_DOWN{$_[0]};
594     keepalive_again;
595     }
596    
597     sub keepalive_del {
598     return if --$KEEPALIVE{$_[0]};
599    
600     delete $KEEPALIVE {$_[0]};
601     delete $KEEPALIVE_DOWN{$_[0]};
602    
603     undef $KEEPALIVE_WATCHER
604     unless %KEEPALIVE_DOWN;
605     }
606    
607     mon_nodes sub {
608     return unless exists $KEEPALIVE{$_[0]};
609    
610     if ($_[1]) {
611     delete $KEEPALIVE_DOWN{$_[0]};
612    
613     undef $KEEPALIVE_WATCHER
614     unless %KEEPALIVE_DOWN;
615     } else {
616     # lost the conenction, try to connect again
617     undef $KEEPALIVE_DOWN{$_[0]};
618     keepalive_again;
619     }
620     };
621    
622     #############################################################################
623 root 1.71 # talk with/to global nodes
624    
625 root 1.72 # protocol messages:
626     #
627 root 1.110 # sent by global nodes
628     # g_global - node became global, similar to global=1 greeting
629 root 1.72 #
630 root 1.110 # database protocol
631     # g_slave database - make other global node master of the sender
632     # g_set database - global node's database to other global nodes
633     # g_upd family set del - update single family (all to global)
634 root 1.73 #
635 root 1.110 # slave <-> global protocol
636     # g_find node - query addresses for node (slave to global)
637     # g_found node binds - node addresses (global to slave)
638     # g_db_family family id - send g_reply with data (global to slave)
639     # g_db_keys family id - send g_reply with data (global to slave)
640     # g_db_values family id - send g_reply with data (global to slave)
641     # g_reply id result - result of any query (global to slave)
642     # g_mon1 family - start to monitor family, replies with g_chg1
643     # g_mon0 family - stop monitoring family
644     # g_chg1 family hash - initial value of family when starting to monitor
645     # g_chg2 family set del - like g_upd, but for monitoring only
646 root 1.73 #
647 root 1.110 # internal database families
648 root 1.73 # "'l" -> node -> listeners
649     # "'g" -> node -> undef
650     # ...
651 root 1.72 #
652    
653 root 1.73 # used on all nodes:
654 root 1.88 our $MASTER; # the global node we bind ourselves to
655 root 1.78 our $MASTER_MON;
656     our %LOCAL_DB; # this node database
657    
658     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
659 root 1.71
660 root 1.84 our $GPROTO = 1;
661    
662     # tell everybody who connects our nproto
663     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
664     $_[0]{local_greeting}{gproto} = $GPROTO;
665     };
666    
667 root 1.73 #############################################################################
668     # master selection
669    
670     # master requests
671     our %GLOBAL_REQ; # $id => \@req
672 root 1.71
673 root 1.74 sub global_req_add {
674 root 1.80 my ($id, $req) = @_;
675 root 1.71
676 root 1.74 return if exists $GLOBAL_REQ{$id};
677    
678 root 1.80 $GLOBAL_REQ{$id} = $req;
679 root 1.71
680 root 1.80 snd $MASTER, @$req
681 root 1.73 if $MASTER;
682 root 1.74 }
683 root 1.71
684 root 1.74 sub global_req_del {
685     delete $GLOBAL_REQ{$_[0]};
686     }
687    
688 root 1.88 #################################
689     # master rpc
690    
691     our %GLOBAL_RES;
692     our $GLOBAL_RES_ID = "a";
693    
694     sub global_call {
695     my $id = ++$GLOBAL_RES_ID;
696     $GLOBAL_RES{$id} = pop;
697     global_req_add $id, [@_, $id];
698     }
699    
700     $NODE_REQ{g_reply} = sub {
701     my $id = shift;
702     global_req_del $id;
703     my $cb = delete $GLOBAL_RES{$id}
704     or return;
705     &$cb
706     };
707    
708     #################################
709    
710 root 1.74 sub g_find {
711 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
712 root 1.73 }
713 root 1.71
714 root 1.73 # reply for g_find started in Node.pm
715 root 1.79 $NODE_REQ{g_found} = sub {
716 root 1.74 global_req_del "g_find $_[0]";
717    
718 root 1.73 my $node = $NODE{$_[0]} or return;
719 root 1.71
720 root 1.79 $node->connect_to ($_[1]);
721 root 1.71 };
722    
723 root 1.73 sub master_set {
724     $MASTER = $_[0];
725 root 1.71
726 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
727 root 1.71
728 root 1.73 # (re-)send queued requests
729     snd $MASTER, @$_
730     for values %GLOBAL_REQ;
731     }
732 root 1.71
733 root 1.72 sub master_search {
734 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
735     for (keys %NODE_SEED) {
736 root 1.72 if (node_is_up $_) {
737     master_set $_;
738     return;
739 root 1.71 }
740 root 1.72 }
741 root 1.71
742 root 1.78 $MASTER_MON = mon_nodes sub {
743 root 1.72 return unless $_[1]; # we are only interested in node-ups
744     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
745 root 1.71
746 root 1.72 master_set $_[0];
747 root 1.71
748 root 1.78 $MASTER_MON = mon_nodes sub {
749 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
750     undef $MASTER;
751     master_search ();
752     }
753 root 1.71 };
754 root 1.72 };
755 root 1.71 }
756    
757 root 1.110 # other node wants to make us the master, so start the global service
758 root 1.79 $NODE_REQ{g_slave} = sub {
759 root 1.73 my ($db) = @_;
760    
761 root 1.80 # load global module and redo the request
762 root 1.73 require AnyEvent::MP::Global;
763 root 1.80 &{ $NODE_REQ{g_slave} }
764 root 1.71 };
765    
766 root 1.73 #############################################################################
767 root 1.79 # local database operations
768 root 1.71
769 root 1.79 # local database management
770 root 1.88
771 root 1.87 sub db_set($$;$) {
772 root 1.97 my ($family, $subkey) = @_;
773    
774 root 1.89 # if (ref $_[1]) {
775     # # bulk
776     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
777     # $LOCAL_DB{$_[0]} = $_[1];
778     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
779     # if defined $MASTER;
780     # } else {
781     # single-key
782 root 1.97 $LOCAL_DB{$family}{$subkey} = $_[2];
783     snd $MASTER, g_upd => $family => { $subkey => $_[2] }
784 root 1.89 if defined $MASTER;
785     # }
786 root 1.97
787     defined wantarray
788     and Guard::guard { db_del $family => $subkey }
789 root 1.79 }
790    
791 root 1.89 sub db_del($@) {
792     my $family = shift;
793    
794 root 1.109 my @del = grep exists $LOCAL_DB{$family}{$_}, @_;
795    
796     return unless @del;
797    
798     delete @{ $LOCAL_DB{$family} }{@del};
799     snd $MASTER, g_upd => $family => undef, \@del
800 root 1.79 if defined $MASTER;
801     }
802    
803 root 1.88 # database query
804    
805     sub db_family {
806     my ($family, $cb) = @_;
807     global_call g_db_family => $family, $cb;
808     }
809    
810     sub db_keys {
811     my ($family, $cb) = @_;
812     global_call g_db_keys => $family, $cb;
813     }
814    
815     sub db_values {
816     my ($family, $cb) = @_;
817     global_call g_db_values => $family, $cb;
818 root 1.80 }
819    
820 root 1.88 # database monitoring
821 root 1.80
822 root 1.81 our %LOCAL_MON; # f, reply
823     our %MON_DB; # f, k, value
824 root 1.80
825 root 1.81 sub db_mon($@) {
826 root 1.84 my ($family, $cb) = @_;
827 root 1.81
828 root 1.84 if (my $db = $MON_DB{$family}) {
829 root 1.89 # we already monitor, so create a "dummy" change event
830     # this is postponed, which might be too late (we could process
831     # change events), so disable the callback at first
832     $LOCAL_MON{$family}{$cb+0} = sub { };
833     AE::postpone {
834     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
835    
836     # set actual callback
837     $LOCAL_MON{$family}{$cb+0} = $cb;
838     $cb->($db, [keys %$db]);
839     };
840 root 1.81 } else {
841     # new monitor, request chg1 from upstream
842 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
843 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
844     $MON_DB{$family} = {};
845     }
846    
847 root 1.90 defined wantarray
848     and Guard::guard {
849     my $mon = $LOCAL_MON{$family};
850     delete $mon->{$cb+0};
851    
852     unless (%$mon) {
853     global_req_del "mon1 $family";
854    
855     # no global_req, because we don't care if we are not connected
856     snd $MASTER, g_mon0 => $family
857     if $MASTER;
858 root 1.80
859 root 1.90 delete $LOCAL_MON{$family};
860     delete $MON_DB{$family};
861     }
862 root 1.80 }
863     }
864    
865 root 1.82 # full update
866 root 1.80 $NODE_REQ{g_chg1} = sub {
867 root 1.96 return unless $SRCNODE eq $MASTER;
868 root 1.84 my ($f, $ndb) = @_;
869    
870     my $db = $MON_DB{$f};
871 root 1.89 my (@a, @c, @d);
872 root 1.81
873 root 1.82 # add or replace keys
874 root 1.84 while (my ($k, $v) = each %$ndb) {
875 root 1.89 exists $db->{$k}
876     ? push @c, $k
877     : push @a, $k;
878 root 1.84 $db->{$k} = $v;
879 root 1.82 }
880 root 1.81
881 root 1.82 # delete keys that are no longer present
882 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
883     delete $db->{$_};
884 root 1.89 push @d, $_;
885 root 1.81 }
886 root 1.84
887 root 1.89 $_->($db, \@a, \@c, \@d)
888 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
889 root 1.80 };
890    
891 root 1.82 # incremental update
892 root 1.84 $NODE_REQ{g_chg2} = sub {
893 root 1.96 return unless $SRCNODE eq $MASTER;
894 root 1.89 my ($family, $set, $del) = @_;
895    
896     my $db = $MON_DB{$family};
897 root 1.84
898 root 1.89 my (@a, @c);
899 root 1.84
900 root 1.89 while (my ($k, $v) = each %$set) {
901     exists $db->{$k}
902     ? push @c, $k
903     : push @a, $k;
904     $db->{$k} = $v;
905     }
906    
907     delete @$db{@$del};
908    
909     $_->($db, \@a, \@c, $del)
910     for values %{ $LOCAL_MON{$family} };
911 root 1.84 };
912 root 1.80
913 root 1.69 #############################################################################
914     # configure
915    
916 root 1.81 sub nodename {
917 root 1.69 require POSIX;
918     (POSIX::uname ())[1]
919     }
920    
921     sub _resolve($) {
922     my ($nodeid) = @_;
923    
924     my $cv = AE::cv;
925     my @res;
926    
927     $cv->begin (sub {
928     my %seen;
929     my @refs;
930     for (sort { $a->[0] <=> $b->[0] } @res) {
931     push @refs, $_->[1] unless $seen{$_->[1]}++
932     }
933     shift->send (@refs);
934     });
935    
936     my $idx;
937     for my $t (split /,/, $nodeid) {
938     my $pri = ++$idx;
939    
940 root 1.81 $t = length $t ? nodename . ":$t" : nodename
941 root 1.69 if $t =~ /^\d*$/;
942    
943     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
944     or Carp::croak "$t: unparsable transport descriptor";
945    
946     $port = "0" if $port eq "*";
947    
948     if ($host eq "*") {
949     $cv->begin;
950    
951 root 1.103 my $get_addr = sub {
952     my @addr;
953    
954     require Net::Interface;
955    
956     # Net::Interface hangs on some systems, so hope for the best
957     local $SIG{ALRM} = 'DEFAULT';
958     alarm 2;
959    
960     for my $if (Net::Interface->interfaces) {
961     # we statically lower-prioritise ipv6 here, TODO :()
962     for $_ ($if->address (Net::Interface::AF_INET ())) {
963     next if /^\x7f/; # skip localhost etc.
964     push @addr, $_;
965 root 1.69 }
966 root 1.103 for ($if->address (Net::Interface::AF_INET6 ())) {
967     #next if $if->scope ($_) <= 2;
968     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
969     push @addr, $_;
970 root 1.69 }
971     }
972 root 1.103
973     alarm 0;
974    
975     @addr
976     };
977    
978     my @addr;
979    
980     if (AnyEvent::WIN32) {
981     @addr = $get_addr->();
982     } else {
983     # use a child process, as Net::Interface is big, and we need it only once.
984    
985     pipe my $r, my $w
986     or die "pipe: $!";
987    
988     if (fork eq 0) {
989     close $r;
990     syswrite $w, pack "(C/a*)*", $get_addr->();
991     require POSIX;
992     POSIX::_exit (0);
993     } else {
994     close $w;
995    
996     my $addr;
997    
998     1 while sysread $r, $addr, 1024, length $addr;
999    
1000     @addr = unpack "(C/a*)*", $addr;
1001     }
1002     }
1003    
1004     for my $ip (@addr) {
1005     push @res, [
1006     $pri += 1e-5,
1007     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
1008     ];
1009     }
1010     $cv->end;
1011 root 1.69 } else {
1012     $cv->begin;
1013     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
1014     for (@_) {
1015     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
1016     push @res, [
1017     $pri += 1e-5,
1018     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
1019     ];
1020     }
1021     $cv->end;
1022     };
1023     }
1024     }
1025    
1026     $cv->end;
1027    
1028     $cv
1029     }
1030    
1031     sub configure(@) {
1032     unshift @_, "profile" if @_ & 1;
1033     my (%kv) = @_;
1034    
1035     my $profile = delete $kv{profile};
1036    
1037 root 1.81 $profile = nodename
1038 root 1.69 unless defined $profile;
1039    
1040     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1041    
1042 root 1.104 $SECURE = $CONFIG->{secure};
1043 root 1.83
1044 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1045 root 1.69
1046     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1047    
1048 root 1.106 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1049    
1050 root 1.72 $NODE = $node;
1051 root 1.77
1052 root 1.81 $NODE =~ s/%n/nodename/ge;
1053    
1054     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1055 root 1.77 # nodes with randomised node names do not need randomised port names
1056     $UNIQ = "";
1057     }
1058 root 1.69
1059 root 1.106 $node_obj->{id} = $NODE;
1060     $NODE{$NODE} = $node_obj;
1061 root 1.69
1062     my $seeds = $CONFIG->{seeds};
1063     my $binds = $CONFIG->{binds};
1064    
1065     $binds ||= ["*"];
1066    
1067 root 1.98 AE::log 8 => "node $NODE starting up.";
1068 root 1.69
1069 root 1.85 $BINDS = [];
1070     %BINDS = ();
1071 root 1.69
1072     for (map _resolve $_, @$binds) {
1073     for my $bind ($_->recv) {
1074     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1075     or Carp::croak "$bind: unparsable local bind address";
1076    
1077     my $listener = AnyEvent::MP::Transport::mp_server
1078     $host,
1079     $port,
1080     prepare => sub {
1081     my (undef, $host, $port) = @_;
1082     $bind = AnyEvent::Socket::format_hostport $host, $port;
1083     0
1084     },
1085     ;
1086 root 1.85 $BINDS{$bind} = $listener;
1087     push @$BINDS, $bind;
1088 root 1.69 }
1089     }
1090    
1091 root 1.85 db_set "'l" => $NODE => $BINDS;
1092 root 1.73
1093 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
1094 root 1.69
1095     # connect to all seednodes
1096     set_seeds map $_->recv, map _resolve $_, @$seeds;
1097 root 1.73 master_search;
1098    
1099 root 1.103 # save gobs of memory
1100     undef &_resolve;
1101     *configure = sub (@){ };
1102    
1103 root 1.69 for (@{ $CONFIG->{services} }) {
1104     if (ref) {
1105     my ($func, @args) = @$_;
1106     (load_func $func)->(@args);
1107     } elsif (s/::$//) {
1108     eval "require $_";
1109     die $@ if $@;
1110     } else {
1111     (load_func $_)->();
1112     }
1113     }
1114 root 1.102
1115     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1116     die "$@" if $@;
1117 root 1.69 }
1118    
1119 root 1.1 =back
1120    
1121 root 1.101 =head1 LOGGING
1122    
1123     AnyEvent::MP::Kernel logs high-level information about the current node,
1124     when nodes go up and down, and most runtime errors. It also logs some
1125     debugging and trace messages about network maintainance, such as seed
1126     connections and global node management.
1127    
1128 root 1.1 =head1 SEE ALSO
1129    
1130     L<AnyEvent::MP>.
1131    
1132     =head1 AUTHOR
1133    
1134     Marc Lehmann <schmorp@schmorp.de>
1135     http://home.schmorp.de/
1136    
1137     =cut
1138    
1139     1
1140