ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.107
Committed: Sat Mar 24 00:48:53 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.106: +90 -20 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9 root 1.106 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10    
11     snd_to_func $node, $func, @args # send msg to function
12     snd_on $node, @msg # snd message again (relay)
13     eval_on $node, $string[, @reply] # execute perl code on another node
14    
15     node_is_up $nodeid # return true if a node is connected
16     @nodes = up_nodes # return a list of all connected nodes
17     $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18    
19 root 1.1 =head1 DESCRIPTION
20    
21 root 1.106 This module implements most of the inner workings of AnyEvent::MP. It
22     offers mostly lower-level functions that deal with network connectivity
23     and special requests.
24    
25     You normally interface with AnyEvent::MP through a higher level interface
26     such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27     with using the functions from this module.
28 root 1.3
29     =head1 GLOBALS AND FUNCTIONS
30 root 1.1
31     =over 4
32    
33     =cut
34    
35     package AnyEvent::MP::Kernel;
36    
37     use common::sense;
38     use Carp ();
39    
40 root 1.79 use AnyEvent ();
41     use Guard ();
42 root 1.1
43     use AnyEvent::MP::Node;
44     use AnyEvent::MP::Transport;
45    
46     use base "Exporter";
47    
48 root 1.106 # for re-export in AnyEvent::MP and Coro::MP
49     our @EXPORT_API = qw(
50     NODE $NODE
51     configure
52     node_of port_is_local
53     snd kil
54     db_set db_del
55     db_mon db_family db_keys db_values
56     );
57    
58     our @EXPORT_OK = (
59     # these are internal
60     qw(
61     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62     add_node load_func
63     ),
64     @EXPORT_API,
65 root 1.71 );
66    
67 root 1.1 our @EXPORT = qw(
68 root 1.106 snd_to_func snd_on eval_on
69     port_is_local
70 root 1.46 up_nodes mon_nodes node_is_up
71 root 1.1 );
72    
# Resolves a fully qualified function name to a code reference, loading
# modules on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of $func are require'd until it appears. This function never dies itself:
# on failure it returns a code reference that dies with the error message
# when it is called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component; give up when nothing is left
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module file only means "try the next shorter prefix",
            # every other error (e.g. a compile error) is reported to the caller
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
93    
# alphabet used by nonce62: digits, uppercase and lowercase ASCII letters
my @alnum = ("0" .. "9", "A" .. "Z", "a" .. "z");

# returns a string of $_[0] random octets
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $_[0];
   $octets
}

# returns a string of $_[0] random characters taken from @alnum
sub nonce62($) {
   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $_[0];
   $chars
}
103    
104 root 1.20 our $CONFIG; # this node's configuration
105 root 1.104 our $SECURE;
106 root 1.21
107 root 1.64 our $RUNIQ; # remote uniq value
108     our $UNIQ; # per-process/node unique cookie
109     our $NODE;
110     our $ID = "a";
111 root 1.1
112     our %NODE; # node id to transport mapping, or "undef", for local node
113     our (%PORT, %PORT_DATA); # local ports
114    
115 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 root 1.1 our %LMON; # monitored _local_ ports
117    
118 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
119 root 1.85 our %BINDS;
120     our $BINDS; # our listeners, as arrayref
121 root 1.1
122 root 1.76 our $SRCNODE; # holds the sending node ID during _inject
123 root 1.1
124 root 1.106 # initialise names for non-networked operation
{
   # ~54 bits, for local port names, lowercase $ID appended:
   # five characters derived from the pid and the current time,
   # followed by four random ones
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now) % 62,
      (int ($now * 100)) % 62,
      (int ($now * 10000)) % 62,
   );

   $UNIQ = (join "", @alnum[@digits]) . nonce62 4;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   substr $RUNIQ, -1, 1, uc substr $RUNIQ, -1;

   # no node ID until configure is called
   $NODE = "";
}
145    
# returns the current value of $NODE, the ID of the local node
sub NODE() {
   return $NODE;
}
149    
# returns the node ID part of a port ID, i.e. everything before the first "#"
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
155    
# TRACE is a compile-time constant: 1 when the PERL_ANYEVENT_MP_TRACE
# environment variable is true, 0 otherwise
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
161 root 1.1
162 root 1.42 our $DELAY_TIMER;
163     our @DELAY_QUEUE;
164    
# timer callback: runs the queued callbacks in order until the queue is
# empty (or yields a false entry), then clears $DELAY_TIMER
our $delay_run = sub {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   undef $DELAY_TIMER
};

# queues the given callback and makes sure a zero-delay timer that
# drains the queue is active
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
173    
174 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
175    
176     During execution of a message callback, this variable contains the node ID
177     of the origin node.
178    
179     The main use of this variable is for debugging output - there are probably
180     very few other cases where you need to know the source node ID.
181    
182     =cut
183    
# delivers a message (port ID followed by the message contents) to a local
# port; messages for ports not in %PORT are silently dropped
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $port_cb = $PORT{+shift}
      or return;

   # called with the remaining @_, i.e. the message without the port ID
   &$port_cb
}
188    
189 root 1.20 # this function adds a node-ref, so you can send stuff to it
190     # it is basically the central routing component.
191 root 1.1 sub add_node {
192 root 1.107 $NODE{$_[0]} || do {
193     my ($node) = @_;
194 root 1.1
195 root 1.107 length $node
196     or Carp::croak "'undef' or the empty string are not valid node/port IDs";
197 root 1.93
198 root 1.107 # registers itself in %NODE
199     new AnyEvent::MP::Node::Remote $node
200     }
201 root 1.13 }
202    
# sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the remaining arguments are the message
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   # look up the destination node, creating its object on first use
   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["$portid", @_]);
}
211    
# returns true if the node part of the given port ID equals $NODE
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid eq $NODE;
}
217    
218 root 1.18 =item snd_to_func $node, $func, @args
219 root 1.11
220 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
221 root 1.11 this function with the given arguments on that node.
222    
223 root 1.20 This function can be used to implement C<spawn>-like interfaces.
224 root 1.11
225     =cut
226    
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # sets the delay flag of the self node (used for spawn) whenever the
   # destination is NOT the local node.
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the original comment speaks of delaying "on $NODE",
   # while the condition tests for a non-local destination - confirm.
   $AnyEvent::MP::Node::Self::DELAY = 1
      unless $nodeid eq $NODE;

   # the empty port ID addresses the node port of the destination
   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["", @call]);
}
238    
239 root 1.18 =item snd_on $node, @msg
240    
241     Executes C<snd> with the given C<@msg> (which must include the destination
242     port) on the given node.
243    
244     =cut
245    
# asks the node port of $node to execute "snd" with the given message
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
250    
251 root 1.29 =item eval_on $node, $string[, @reply]
252 root 1.18
253 root 1.29 Evaluates the given string as Perl expression on the given node. When
254     @reply is specified, then it is used to construct a reply message with
255     C<"$@"> and any results from the eval appended.
256 root 1.18
257     =cut
258    
# asks the node port of $node to execute the "eval" request with the
# given string and optional reply message
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
263    
# kills the given port; the remaining arguments are passed on as the kill
# reason. croaks when the port ID has no port part (i.e. names a node port).
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @_);
}
273    
274     #############################################################################
275 root 1.6 # node monitoring and info
276 root 1.3
277 root 1.106 =item $bool = node_is_up $nodeid
278 root 1.13
279     Returns true if the given node is "up", that is, the kernel thinks it has
280     a working connection to it.
281    
282 root 1.107 More precisely, if the node is up, returns C<1>. If the node is currently
283     connecting or otherwise known but not connected, returns C<0>. If nothing
284     is known about the node, returns C<undef>.
285 root 1.13
286     =cut
287    
sub node_is_up($) {
   my ($nodeid) = @_;

   # the local node is always up
   return 1
      if $nodeid eq $NODE;

   # nothing known about this node
   my $node = $NODE{$nodeid}
      or return;

   # known node: up only if it currently has a transport
   $node->{transport} ? 1 : 0
}
292    
293 root 1.106 =item @nodes = up_nodes
294 root 1.3
295 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
296     the node itself).
297 root 1.3
298     =cut
299    
sub up_nodes() {
   my @ids;

   # a node counts as connected when its object has a transport
   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
303    
304 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
305 root 1.3
306 root 1.27 Registers a callback that is called each time a node goes up (a connection
307     is established) or down (the connection is lost).
308 root 1.3
309     Node up messages can only be followed by node down messages for the same
310     node, and vice versa.
311    
312 root 1.71 Note that monitoring a node is usually better done by monitoring its node
313 root 1.27 port. This function is mainly of interest to modules that are concerned
314     about the network topology and low-level connection handling.
315    
316     Callbacks I<must not> block and I<should not> send any messages.
317    
318     The function returns an optional guard which can be used to unregister
319 root 1.3 the monitoring callback again.
320    
321 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
322     or go up (and down).
323    
324     newnode $_, 1 for up_nodes;
325     mon_nodes \&newnode;
326    
327 root 1.3 =cut
328    
329     our %MON_NODES;
330    
sub mon_nodes($) {
   my ($cb) = @_;

   # callbacks are keyed by their numified reference
   my $key = $cb+0;

   $MON_NODES{$key} = $cb;

   # no guard is created in void context
   return
      unless defined wantarray;

   Guard::guard { delete $MON_NODES{$key} }
}
339    
# logs an up/down event for the given node object and passes the node ID,
# the up flag and the reason to every callback in %MON_NODES. an exception
# in one callback is logged and does not stop the remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";
   AE::log 7 => "$node->{id} is $state";

   foreach my $cb (values %MON_NODES) {
      next
         if eval { $cb->($node->{id}, $up, @reason); 1 };

      AE::log die => $@;
   }
}
350    
351     #############################################################################
352 root 1.1 # self node code
353    
# removes a local port and calls its monitors with the kill reason (the
# remaining arguments). a port that dies with a reason but without any
# monitor is reported via AE::log.
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   if (!$mon && @_) {
      AE::log die => "unmonitored local port $port died with reason: @_";
   }

   $_->(@_) for values %$mon;
}
367    
# registers a monitor callback for a local port. the first argument is
# ignored. if the port does not exist, the callback is called right away
# with a no_such_port reason instead.
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   $LMON{$port}{$cb+0} = $cb;
}
374    
# removes a monitor callback from a local port, if the port has any
# monitors. the first argument is ignored.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   exists $LMON{$port}
      and delete $LMON{$port}{$cb+0}
}
379    
# dies when $SECURE is set, i.e. when remote execution is disabled
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
384    
385 root 1.107 our %NODE_REQ;
386    
# requests understood by the node port: maps the request tag (the first
# element of a message sent to the empty port ID) to its handler. handlers
# are called with the remaining message elements, and $SRCNODE is used as
# the ID of the sending node.
%NODE_REQ = (
   # "mproto" - monitoring protocol

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      # only a weak reference, so the monitor does not keep the node object alive
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
      _monitor undef, $portid, $node->{rmon}{$portid} = sub {
         # the node object might be gone already - test the weak reference
         # before dereferencing it, so it is not autovivified into a hash
         $node
            or return;

         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node->{transport};
      };
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },
   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      &snd
   },
   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my $method = shift;
      snd @_, exists $NODE_REQ{$method};
   },

   # random utilities
   eval => sub {
      # executes arbitrary code sent by the remote node, so it is
      # refused when $SECURE is set
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
442    
# the node port
AnyEvent::MP::Node::Self->new ($NODE); # registers itself in %NODE

# messages to the empty port ID are node requests: the first element is the
# request tag, looked up in %NODE_REQ. unknown tags are resolved as function
# names via load_func (unless forbidden by _secure_check) and cached there.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag };
      &$handler
   };

   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};
451    
452 root 1.107 our $MPROTO = 1;
453 root 1.84
454     # tell everybody who connects our mproto
455     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
456 root 1.107 $_[0]{local_greeting}{mproto} = $MPROTO;
457 root 1.84 };
458    
459 root 1.69 #############################################################################
460 root 1.71 # seed management, try to keep connections to all seeds at all times
461 root 1.69
462 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
463     our %NODE_SEED; # map node ID to seed ID
464 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
465     our $SEED_WATCHER;
466 root 1.71 our $SEED_RETRY;
467 root 1.69
# starts a connection attempt to the given seed address ("host:port"),
# unless %SEED_CONNECT already has an entry for it. croaks when the
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      my ($transport) = @_;

      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      my $remote = $transport->{remote_node};

      # if we connect to ourselves, nuke this seed, but make sure we act like a seed
      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;
         # also start global service, if not running
         # we need to check here in addition to the mon_nodes below
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd $remote, "g_slave"
            unless $transport->{remote_greeting}{global};
      }
   };

   # trailing callback: forget the connector again
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   );
}
505    
# starts connection attempts to all seeds that are neither being connected
# to nor known to be up, and re-arms the retry timer with a randomised,
# doubling delay capped at the configured monitor_timeout
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};
      next if defined $SEED_NODE{$seed} && node_is_up $SEED_NODE{$seed};

      push @seeds, $seed;
   }

   if (!@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for @seeds;

      my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $max_retry
         if $SEED_RETRY > $max_retry;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
528    
# resets the retry delay and schedules seed_all in one second, unless a
# seed timer is already pending
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER
      or $SEED_WATCHER = AE::timer 1, 0, \&seed_all;
}
533    
534     # sets new seed list, starts connecting
sub set_seeds(@) {
   # forget everything about the old seeds
   %SEED_CONNECT = ();
   %NODE_SEED    = ();
   %SEED_NODE    = ();

   # every new seed starts out without a known node ID
   $SEED_NODE{$_} = undef
      for @_;

   seed_all;
}
544    
# node monitor for seed nodes
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # only nodes known to be seeds are of interest
   return unless exists $NODE_SEED{$nodeid};

   if (!$is_up) {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   } else {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $nodeid, "g_slave"
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   }
};
558    
559     #############################################################################
560 root 1.107 # keepalive code - used to keep connections to certain nodes alive
561     # only used by global code atm., but ought to be exposed somehow.
562    
563     our $KEEPALIVE_RETRY;
564     our $KEEPALIVE_WATCHER;
565     our %KEEPALIVE; # we want to keep these nodes alive
566     our %KEEPALIVE_DOWN; # nodes that are down currently
567    
# tries to connect to every node in %KEEPALIVE_DOWN and re-arms the retry
# timer with a randomised, doubling delay capped at the configured
# monitor_timeout
sub keepalive_all {
   my @down = keys %KEEPALIVE_DOWN;

   AE::log 9 => "keepalive: trying to establish connections with: "
                . (join " ", @down)
                . ".";

   for my $nodeid (@down) {
      (add_node $nodeid)->connect;
   }

   my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand;
   $KEEPALIVE_RETRY = $max_retry
      if $KEEPALIVE_RETRY > $max_retry;

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
582    
# resets the retry delay and schedules keepalive_all in one second, unless
# a keepalive timer is already pending
sub keepalive_again {
   $KEEPALIVE_RETRY = 1;

   $KEEPALIVE_WATCHER
      or $KEEPALIVE_WATCHER = AE::timer 1, 0, \&keepalive_all;
}
587    
# adds one keepalive reference for the given node ID. on the first
# reference, a node that is not up is marked as down and a connection
# attempt is scheduled.
sub keepalive_add {
   my ($nodeid) = @_;

   # already registered, or registered now and connected anyway
   return if $KEEPALIVE{$nodeid}++;
   return if node_is_up $nodeid;

   undef $KEEPALIVE_DOWN{$nodeid};
   keepalive_again;
}
595    
# drops one keepalive reference for the given node ID. when the last
# reference is gone, the node is removed from %KEEPALIVE and
# %KEEPALIVE_DOWN, and the retry timer is cancelled if no node is left to
# reconnect to.
sub keepalive_del {
   my ($nodeid) = @_;

   # an unbalanced call must not create a negative reference count, which
   # would make the node look registered (exists $KEEPALIVE{...}) forever
   return unless exists $KEEPALIVE{$nodeid};

   # still referenced by somebody else
   return if --$KEEPALIVE{$nodeid};

   delete $KEEPALIVE     {$nodeid};
   delete $KEEPALIVE_DOWN{$nodeid};

   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}
605    
# node monitor for nodes registered with keepalive_add
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $KEEPALIVE{$nodeid};

   if (!$is_up) {
      # lost the connection, try to connect again
      undef $KEEPALIVE_DOWN{$nodeid};
      keepalive_again;
   } else {
      # connected: nothing to retry for this node anymore
      delete $KEEPALIVE_DOWN{$nodeid};

      undef $KEEPALIVE_WATCHER
         unless %KEEPALIVE_DOWN;
   }
};
620    
621     #############################################################################
622 root 1.71 # talk with/to global nodes
623    
624 root 1.72 # protocol messages:
625     #
626 root 1.73 # sent by all slave nodes (slave to master)
627     # g_slave database - make other global node master of the sender
628 root 1.72 #
629 root 1.73 # sent by any node to global nodes
630     # g_set database - set whole database
631 root 1.89 # g_upd family set del - update single family
632 root 1.73 # g_del family key - delete key from database
633     # g_get family key reply... - send reply with data
634     #
635     # sent by global nodes
636     # g_global - node became global, similar to global=1 greeting
637     #
638     # database families
639     # "'l" -> node -> listeners
640     # "'g" -> node -> undef
641     # ...
642 root 1.72 #
643    
644 root 1.73 # used on all nodes:
645 root 1.88 our $MASTER; # the global node we bind ourselves to
646 root 1.78 our $MASTER_MON;
647     our %LOCAL_DB; # this node database
648    
649     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
650 root 1.71
651 root 1.84 our $GPROTO = 1;
652    
653     # tell everybody who connects our gproto
654     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
655     $_[0]{local_greeting}{gproto} = $GPROTO;
656     };
657    
658 root 1.73 #############################################################################
659     # master selection
660    
661     # master requests
662     our %GLOBAL_REQ; # $id => \@req
663 root 1.71
# remembers a request (an array reference holding the message) under the
# given ID and sends it to the current master, if there is one. does
# nothing when a request with this ID is already registered.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd $MASTER, @$req;
}
674 root 1.71
# forgets the request registered under the given ID
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
678    
679 root 1.88 #################################
680     # master rpc
681    
682     our %GLOBAL_RES;
683     our $GLOBAL_RES_ID = "a";
684    
# registers a request for the master: the last argument is the reply
# callback, the others form the request, to which a freshly allocated
# request ID is appended
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
690    
# reply to a global_call request: the first element is the request ID, the
# rest is passed to the callback registered for it
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del $id;

   my $reply_cb = delete $GLOBAL_RES{$id}
      or return;

   # called with the remaining @_
   &$reply_cb
};
698    
699     #################################
700    
# registers a g_find request for the given node ID
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
704 root 1.71
705 root 1.73 # reply for g_find started in Node.pm
# reply to a g_find request: drops the pending request and, if the node is
# known, hands the second message element to its connect_to method
$NODE_REQ{g_found} = sub {
   my ($nodeid, $target) = @_;

   global_req_del "g_find $nodeid";

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($target);
};
713    
# makes the given node our master: sends it a g_slave message with our
# local database, followed by all pending requests
sub master_set {
   ($MASTER) = @_;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   foreach my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
723 root 1.71
# Selects a master node among the seed nodes.
#
# If one of the nodes in %NODE_SEED is up, it becomes the master right away.
# Otherwise a node monitor is stored in $MASTER_MON that waits for a seed
# node to come up.
# NOTE(review): when a seed is found in the loop below, the function returns
# without storing a new monitor in $MASTER_MON - confirm this is intended.
724 root 1.72 sub master_search {
725 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
726     for (keys %NODE_SEED) {
727 root 1.72 if (node_is_up $_) {
728     master_set $_;
729     return;
730 root 1.71 }
731 root 1.72 }
732 root 1.71
# no seed is up: wait for the first seed node to come up
733 root 1.78 $MASTER_MON = mon_nodes sub {
734 root 1.72 return unless $_[1]; # we are only interested in node-ups
735     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
736 root 1.71
737 root 1.72 master_set $_[0];
738 root 1.71
# replace the monitor by one that restarts the search once the master goes down
739 root 1.78 $MASTER_MON = mon_nodes sub {
740 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
741     undef $MASTER;
742     master_search ();
743     }
744 root 1.71 };
745 root 1.72 };
746 root 1.71 }
747    
748 root 1.73 # other node wants to make us the master
# placeholder handler: loads AnyEvent::MP::Global and then calls whatever
# handler is registered for g_slave at that point, with the same arguments.
# presumably the Global module installs the real handler - not visible here.
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
756    
757 root 1.73 #############################################################################
758 root 1.79 # local database operations
759 root 1.71
760 root 1.79 # local database management
761 root 1.88
# Sets the key $subkey in the given family of the local database to the
# optional third argument and, if a master is known, sends the change to it.
#
# In non-void context a guard is returned that deletes the key again when
# it is destroyed.
sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$subkey} = $_[2];
      snd $MASTER, g_upd => $family => { $subkey => $_[2] }
         if defined $MASTER;
#   }

   # db_del is defined after this sub and no predeclaration is visible, so
   # it is called with explicit parentheses: a bare "db_del $family ..."
   # would be open to being parsed as an indirect method call on $family
   defined wantarray
      and Guard::guard { db_del ($family, $subkey) }
}
781    
# deletes the given keys from a family of the local database and, if a
# master is known, tells it about the deletion
sub db_del($@) {
   my ($family, @keys) = @_;

   delete @{ $LOCAL_DB{$family} }{@keys};

   defined $MASTER
      and snd $MASTER, g_upd => $family => undef, \@keys;
}
789    
790 root 1.88 # database query
791    
# the three query functions register a request (g_db_family, g_db_keys or
# g_db_values) for the given family via global_call, with $cb as the
# reply callback

sub db_family {
   my $family = shift;
   my $cb     = shift;

   global_call (g_db_family => $family, $cb);
}

sub db_keys {
   my $family = shift;
   my $cb     = shift;

   global_call (g_db_keys => $family, $cb);
}

sub db_values {
   my $family = shift;
   my $cb     = shift;

   global_call (g_db_values => $family, $cb);
}
806    
807 root 1.88 # database monitoring
808 root 1.80
809 root 1.81 our %LOCAL_MON; # f, reply
810     our %MON_DB; # f, k, value
811 root 1.80
# Registers $cb as monitor for the given database family. In non-void
# context a guard is returned that unregisters the callback again; when
# the last callback of a family is removed, monitoring of the family stops.
sub db_mon($@) {
   my ($family, $cb) = @_;

   # callbacks are keyed by their numified reference
   my $key = $cb+0;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$key} = sub { };

      AE::postpone {
         # guard might have gone away already
         exists $LOCAL_MON{$family}{$key}
            or return;

         # set actual callback
         $LOCAL_MON{$family}{$key} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$key} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   }

   # no guard is created in void context
   return
      unless defined wantarray;

   Guard::guard {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$key};

      # other callbacks still monitor this family
      return if %$mon;

      global_req_del "mon1 $family";

      # no global_req, because we don't care if we are not connected
      snd $MASTER, g_mon0 => $family
         if $MASTER;

      delete $LOCAL_MON{$family};
      delete $MON_DB{$family};
   }
}
851    
852 root 1.82 # full update
# full update of a monitored family; only accepted from the master.
# the family's monitors are called with the database and the lists of
# added, changed and deleted keys.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $ndb) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed, @deleted);

   # add or replace keys
   while (my ($key, $value) = each %$ndb) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $value;
   }

   # delete keys that are no longer present
   for my $key (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$key};
      push @deleted, $key;
   }

   $_->($db, \@added, \@changed, \@deleted)
      for values %{ $LOCAL_MON{$family} };
};
877    
878 root 1.82 # incremental update
# incremental update of a monitored family; only accepted from the master.
# $set maps keys to their new values, $del lists the keys to delete. the
# family's monitors are called with the database and the lists of added,
# changed and deleted keys.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   while (my ($key, $value) = each %$set) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $value;
   }

   delete @$db{@$del};

   $_->($db, \@added, \@changed, $del)
      for values %{ $LOCAL_MON{$family} };
};
899 root 1.80
900 root 1.69 #############################################################################
901     # configure
902    
# returns the second element of POSIX::uname, the node (host) name
sub nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
907    
# Resolves a comma-separated list of transport descriptors into addresses.
#
# Returns a condvar that receives the list of resulting "host:port" strings,
# ordered by the position of their descriptor (and, within a descriptor, by
# the order the addresses were found), without duplicates.
#
# A descriptor consisting only of digits (or nothing) is taken as a port on
# the local node name. The host "*" stands for the addresses of all local
# interfaces, the port "*" is replaced by 0.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res;

   # runs once every lookup has finished: sort by priority, drop duplicates
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      $t = length $t ? nodename . ":$t" : nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         # collects the packed addresses of all local interfaces
         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for $_ ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;

            if (!defined $pid) {
               # fork failed: enumerate in this process instead of
               # silently ending up without any address
               close $r;
               close $w;

               @addr = $get_addr->();
            } elsif ($pid == 0) {
               # child: send the length-prefixed addresses to the parent
               close $r;
               syswrite $w, pack "(C/a*)*", $get_addr->();
               require POSIX;
               POSIX::_exit (0);
            } else {
               close $w;

               my $addr;

               # read until the child closes its end of the pipe
               1 while sysread $r, $addr, 1024, length $addr;

               @addr = unpack "(C/a*)*", $addr;

               # reap the child, so it does not stay around as a zombie
               waitpid $pid, 0;
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
1017    
# Configures the local node and starts networking.
#
# Takes key/value pairs; with an odd number of arguments, the first one is
# taken as the profile name. The steps, in order:
#  - find the profile (default: the node name) and store it in $CONFIG
#  - compute the node ID and re-register the self node object under it
#  - create a listener for every resolved bind address
#  - publish the listeners in the "'l" database family
#  - resolve and set the seeds, then start the master search
#  - load the configured services and evaluate the "eval" configuration key
#
# _resolve is undefined and configure is replaced by an empty sub before the
# services are loaded.
1018     sub configure(@) {
1019     unshift @_, "profile" if @_ & 1;
1020     my (%kv) = @_;
1021    
1022     my $profile = delete $kv{profile};
1023    
1024 root 1.81 $profile = nodename
1025 root 1.69 unless defined $profile;
1026    
1027     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
1028    
1029 root 1.104 $SECURE = $CONFIG->{secure};
1030 root 1.83
# without a configured nodeid, the node ID is the profile name followed by "/"
1031 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
1032 root 1.69
1033     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
1034    
1035 root 1.106 my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure
1036    
1037 root 1.72 $NODE = $node;
1038 root 1.77
# %n is replaced by the node name, %u and a trailing "/" get $RUNIQ inserted
1039 root 1.81 $NODE =~ s/%n/nodename/ge;
1040    
1041     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
1042 root 1.77 # nodes with randomised node names do not need randomised port names
1043     $UNIQ = "";
1044     }
1045 root 1.69
# register the self node object under its final ID
1046 root 1.106 $node_obj->{id} = $NODE;
1047     $NODE{$NODE} = $node_obj;
1048 root 1.69
1049     my $seeds = $CONFIG->{seeds};
1050     my $binds = $CONFIG->{binds};
1051    
1052     $binds ||= ["*"];
1053    
1054 root 1.98 AE::log 8 => "node $NODE starting up.";
1055 root 1.69
1056 root 1.85 $BINDS = [];
1057     %BINDS = ();
1058 root 1.69
# every bind descriptor is resolved, and a listener is created for each result
1059     for (map _resolve $_, @$binds) {
1060     for my $bind ($_->recv) {
1061     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1062     or Carp::croak "$bind: unparsable local bind address";
1063    
1064     my $listener = AnyEvent::MP::Transport::mp_server
1065     $host,
1066     $port,
1067     prepare => sub {
# $bind is overwritten with the host and port passed to this callback
1068     my (undef, $host, $port) = @_;
1069     $bind = AnyEvent::Socket::format_hostport $host, $port;
1070     0
1071     },
1072     ;
1073 root 1.85 $BINDS{$bind} = $listener;
1074     push @$BINDS, $bind;
1075 root 1.69 }
1076     }
1077    
1078 root 1.85 db_set "'l" => $NODE => $BINDS;
1079 root 1.73
1080 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
1081 root 1.69
1082     # connect to all seednodes
1083     set_seeds map $_->recv, map _resolve $_, @$seeds;
1084    
1085 root 1.73 master_search;
1086    
1087 root 1.103 # save gobs of memory
1088     undef &_resolve;
1089     *configure = sub (@){ };
1090    
# a service is an array reference (function name plus arguments), a module
# name ending in "::" or a function name
1091 root 1.69 for (@{ $CONFIG->{services} }) {
1092     if (ref) {
1093     my ($func, @args) = @$_;
1094     (load_func $func)->(@args);
1095     } elsif (s/::$//) {
1096     eval "require $_";
1097     die $@ if $@;
1098     } else {
1099     (load_func $_)->();
1100     }
1101     }
1102 root 1.102
# the "eval" configuration key is evaluated as perl code
1103     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1104     die "$@" if $@;
1105 root 1.69 }
1106    
1107 root 1.1 =back
1108    
1109 root 1.101 =head1 LOGGING
1110    
1111     AnyEvent::MP::Kernel logs high-level information about the current node,
1112     when nodes go up and down, and most runtime errors. It also logs some
1113     debugging and trace messages about network maintenance, such as seed
1114     connections and global node management.
1115    
1116 root 1.1 =head1 SEE ALSO
1117    
1118     L<AnyEvent::MP>.
1119    
1120     =head1 AUTHOR
1121    
1122     Marc Lehmann <schmorp@schmorp.de>
1123     http://home.schmorp.de/
1124    
1125     =cut
1126    
1127     1
1128