ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.109
Committed: Sat Mar 24 13:05:40 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.108: +7 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9 root 1.106 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10    
11     snd_to_func $node, $func, @args # send msg to function
12     snd_on $node, @msg # snd message again (relay)
13     eval_on $node, $string[, @reply] # execute perl code on another node
14    
15     node_is_up $nodeid # return true if a node is connected
16     @nodes = up_nodes # return a list of all connected nodes
17     $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18    
19 root 1.1 =head1 DESCRIPTION
20    
21 root 1.106 This module implements most of the inner workings of AnyEvent::MP. It
22     offers mostly lower-level functions that deal with network connectivity
23     and special requests.
24    
25     You normally interface with AnyEvent::MP through a higher level interface
26     such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27     with using the functions from this module.
28 root 1.3
29     =head1 GLOBALS AND FUNCTIONS
30 root 1.1
31     =over 4
32    
33     =cut
34    
35     package AnyEvent::MP::Kernel;
36    
37     use common::sense;
38     use Carp ();
39    
40 root 1.79 use AnyEvent ();
41     use Guard ();
42 root 1.1
43     use AnyEvent::MP::Node;
44     use AnyEvent::MP::Transport;
45    
46     use base "Exporter";
47    
48 root 1.106 # for re-export in AnyEvent::MP and Coro::MP
49     our @EXPORT_API = qw(
50     NODE $NODE
51     configure
52     node_of port_is_local
53     snd kil
54     db_set db_del
55     db_mon db_family db_keys db_values
56     );
57    
58     our @EXPORT_OK = (
59     # these are internal
60     qw(
61     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62     add_node load_func
63     ),
64     @EXPORT_API,
65 root 1.71 );
66    
67 root 1.1 our @EXPORT = qw(
68 root 1.106 snd_to_func snd_on eval_on
69     port_is_local
70 root 1.46 up_nodes mon_nodes node_is_up
71 root 1.1 );
72    
# Resolves a fully qualified function name ("Some::Package::func") to a
# code reference, loading the defining module on demand.
#
# If the function is not defined yet, successively shorter package
# prefixes of the name are "require"d until the function becomes defined.
#
# This function never dies itself: when the name cannot be resolved, or
# when loading a module fails for any reason other than the module not
# being found, a code reference is returned that dies with the error
# message once it is called.
#
# NOTE(review): the package name is interpolated into a string eval. Callers
# that pass names received from other nodes rely on _secure_check (or on
# trusting the peer) - the name itself is not validated here.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component, give up when nothing is left to strip
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a module that simply does not exist means "try the parent package",
            # everything else (syntax errors etc.) is reported to the caller
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
93    
# the 62 alphanumeric characters used for printable random identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string consisting of $_[0] random octets
sub nonce($) {
   my ($len) = @_;

   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $len;

   $octets
}

# returns a string consisting of $_[0] random characters taken from @alnum
sub nonce62($) {
   my ($len) = @_;

   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $len;

   $chars
}
103    
104 root 1.20 our $CONFIG; # this node's configuration
105 root 1.104 our $SECURE;
106 root 1.21
107 root 1.64 our $RUNIQ; # remote uniq value
108     our $UNIQ; # per-process/node unique cookie
109     our $NODE;
110     our $ID = "a";
111 root 1.1
112     our %NODE; # node id to transport mapping, or "undef", for local node
113     our (%PORT, %PORT_DATA); # local ports
114    
115 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 root 1.1 our %LMON; # monitored _local_ ports
117    
118 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
119 root 1.85 our %BINDS;
120     our $BINDS; # our listeners, as arrayref
121 root 1.1
122 root 1.76 our $SRCNODE; # holds the ID of the sending node during _inject (used as a key into %NODE)
123 root 1.1
# initialise names for non-networked operation
{
   my $now = AE::now;

   # ~54 bits, for local port names, lowercase $ID appended:
   # two characters derived from the process id, three from the current
   # time at increasing resolution, followed by four random ones
   my @index = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   $UNIQ = join "", @alnum[@index], nonce62 (4);

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 (10);
   substr $RUNIQ, -1, 1, uc substr $RUNIQ, -1;

   # the node has no name until configure assigns one
   $NODE = "";
}
145    
# returns the node ID of the local node
sub NODE() {
   return $NODE;
}

# returns the node ID part of a port ID, that is, everything before the
# first "#" (undef for the empty string)
sub node_of($) {
   my ($portid) = @_;

   my ($nodeid) = split /#/, $portid, 2;

   return $nodeid;
}
155    
# TRACE is a compile-time constant that is true when the environment
# variable PERL_ANYEVENT_MP_TRACE is set to a true value
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
161 root 1.1
our $DELAY_TIMER; # zero-delay timer, only defined while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run by $delay_run

# runs the queued callbacks in the order they were queued, including any
# that are queued while running, then releases the timer
our $delay_run = sub {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
};

# queues a callback and makes sure a zero-delay timer is armed to run the queue
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer (0, 0, $delay_run);
}
173    
174 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
175    
176     During execution of a message callback, this variable contains the node ID
177     of the origin node.
178    
179     The main use of this variable is for debugging output - there are probably
180     very few other cases where you need to know the source node ID.
181    
182     =cut
183    
# delivers a message locally: the first element names the local port, the
# remaining elements are passed to the callback registered for it in %PORT.
# messages for ports without a callback are dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n"; #d#
   }

   my $port_cb = $PORT{+shift}
      or return;

   # called with the current @_, i.e. the message without the port name
   &$port_cb;
}
188    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# returns the node object already known for the given node ID, or creates
# a new remote node object for it. croaks on undef or empty node IDs that
# are not known yet.
sub add_node {
   my ($nodeid) = @_;

   return $NODE{$nodeid}
      if $NODE{$nodeid};

   length $nodeid
      or Carp::croak "'undef' or the empty string are not valid node/port IDs";

   # registers itself in %NODE
   AnyEvent::MP::Node::Remote->new ($nodeid)
}
202    
# sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the rest is the message. the node object is created
# on demand, and a missing port part is sent as the empty string.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
211    
# returns true if the node part of the given port ID is the local node ID
sub port_is_local($) {
   my ($portid) = @_;

   my ($nodeid) = split /#/, $portid, 2;

   return $nodeid eq $NODE;
}
217    
218 root 1.18 =item snd_to_func $node, $func, @args
219 root 1.11
220 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
221 root 1.11 this function with the given arguments on that node.
222    
223 root 1.20 This function can be used to implement C<spawn>-like interfaces.
224 root 1.11
225     =cut
226    
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT the local node,
   # while the comment above talks about $NODE itself - confirm against
   # AnyEvent::MP::Node::Self before touching this.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   # the remaining arguments (function name and arguments) go to the node port ("")
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @_]);
}
238    
239 root 1.18 =item snd_on $node, @msg
240    
241     Executes C<snd> with the given C<@msg> (which must include the destination
242     port) on the given node.
243    
244     =cut
245    
sub snd_on($@) {
   my ($node, @msg) = @_;

   # ask the node to execute "snd @msg" on our behalf
   snd ($node, snd => @msg);
}
250    
251 root 1.29 =item eval_on $node, $string[, @reply]
252 root 1.18
253 root 1.29 Evaluates the given string as Perl expression on the given node. When
254     @reply is specified, then it is used to construct a reply message with
255     C<"$@"> and any results from the eval appended.
256 root 1.18
257     =cut
258    
sub eval_on($$;@) {
   my ($node, @request) = @_;

   # @request is the string to evaluate, optionally followed by the reply message
   snd ($node, eval => @request);
}
263    
# kills the given port ("nodeid#portid"), passing any further arguments on
# as kill reason. croaks when the port part is missing or empty.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
273    
274     #############################################################################
275 root 1.6 # node monitoring and info
276 root 1.3
277 root 1.106 =item $bool = node_is_up $nodeid
278 root 1.13
279     Returns true if the given node is "up", that is, the kernel thinks it has
280     a working connection to it.
281    
282 root 1.107 More precisely, if the node is up, returns C<1>. If the node is currently
283     connecting or otherwise known but not connected, returns C<0>. If nothing
284     is known about the node, returns C<undef>.
285 root 1.13
286     =cut
287    
sub node_is_up($) {
   my ($nodeid) = @_;

   # the local node is always up
   return 1
      if $nodeid eq $NODE;

   # nothing known about this node
   my $node = $NODE{$nodeid}
      or return;

   $node->{transport} ? 1 : 0
}
292    
293 root 1.106 =item @nodes = up_nodes
294 root 1.3
295 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
296     the node itself).
297 root 1.3
298     =cut
299    
sub up_nodes() {
   my @up;

   # only nodes that currently have a transport count as connected
   for my $node (values %NODE) {
      push @up, $node->{id}
         if $node->{transport};
   }

   @up
}
303    
304 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
305 root 1.3
306 root 1.27 Registers a callback that is called each time a node goes up (a connection
307     is established) or down (the connection is lost).
308 root 1.3
309     Node up messages can only be followed by node down messages for the same
310     node, and vice versa.
311    
312 root 1.71 Note that monitoring a node is usually better done by monitoring its node
313 root 1.27 port. This function is mainly of interest to modules that are concerned
314     about the network topology and low-level connection handling.
315    
316     Callbacks I<must not> block and I<should not> send any messages.
317    
318     The function returns an optional guard which can be used to unregister
319 root 1.3 the monitoring callback again.
320    
321 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
322     or go up (and down).
323    
324     newnode $_, 1 for up_nodes;
325     mon_nodes \&newnode;
326    
327 root 1.3 =cut
328    
our %MON_NODES; # registered node monitors, keyed by the numeric value of the callback reference

sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   # only create a guard when the caller actually wants one
   defined wantarray
      and Guard::guard (sub { delete $MON_NODES{$key} })
}
339    
# logs a node up/down event and reports it to all callbacks registered
# via mon_nodes. exceptions thrown by callbacks are logged and do not
# keep the remaining callbacks from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";

   AE::log (7 => "$node->{id} is " . $state);

   for my $cb (values %MON_NODES) {
      eval { $cb->($node->{id}, $up, @reason); 1 }
         or AE::log (die => $@);
   }
}
350    
351     #############################################################################
352 root 1.1 # self node code
353    
# kills a local port: removes its callback and data and calls all local
# monitors with the kill reason (the remaining arguments).
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port that dies with a reason while nobody monitors it gets logged
   AE::log (die => "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
367    
# registers the callback (third argument) as monitor for the local port
# (second argument). if the port does not exist, the callback is called
# immediately with a no_such_port reason instead. the first argument is
# not used.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid")
      unless exists $PORT{$portid};

   $LMON{$portid}{$cb+0} = $cb;
}
374    
# removes the monitor callback (third argument) from the local port
# (second argument), if the port is monitored at all. the first argument
# is not used.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   return
      unless exists $LMON{$portid};

   delete $LMON{$portid}{$cb+0};
}
379    
# dies when the node runs in secure mode ($SECURE is true)
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
384    
# requests understood by the node port, indexed by message tag. handlers
# are called with the remainder of the message, while $SRCNODE identifies
# the sending node.
our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};

      _unmonitor (undef, $portid, $cb);
   },
   mon1 => sub { # start monitoring a port for another node
      my ($portid) = @_;

      # weak, so the monitor callback does not keep the node object alive
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});

      my $cb = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $cb;

      _monitor (undef, $portid, $cb);
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },
   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      &snd
   },
   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my $method = shift;

      # the remaining arguments form the reply message
      snd (@_, exists $NODE_REQ{$method});
   },

   # random utilities
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      # a reply is only sent when a reply message was given
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::now);
   },
   devnull => sub {
      # ignores everything
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
442    
# the node port
AnyEvent::MP::Node::Self->new ($NODE); # registers itself in %NODE

# messages to the node port are dispatched on their first element: tags
# without an entry in %NODE_REQ are resolved via load_func (unless secure
# mode forbids this) and the result is cached in %NODE_REQ.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func ($tag) };

      # called with the rest of the message as @_
      &$handler;
   };

   AE::log (die => "error processing node message from $SRCNODE: $@")
      if $@;
};
451    
our $MPROTO = 1;

# tell everybody who connects our mproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{mproto} = $MPROTO;
};
458    
459 root 1.69 #############################################################################
460 root 1.71 # seed management, try to keep connections to all seeds at all times
461 root 1.69
462 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
463     our %NODE_SEED; # map node ID to seed ID
464 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
465     our $SEED_WATCHER;
466 root 1.71 our $SEED_RETRY;
467 root 1.69
# starts a connection attempt to the given seed address ("host:port"),
# unless an entry for this seed already exists in %SEED_CONNECT. croaks
# when the address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   AE::log (9 => "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      my ($transport) = @_;

      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      my $remote = $transport->{remote_node};

      # if we connect to ourselves, nuke this seed, but make sure we act like a seed
      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start global service, if not running
         # we need to check here in addition to the mon_nodes below
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd ($remote, "g_slave")
            unless $transport->{remote_greeting}{global};
      }
   };

   # forgets the connector again - presumably invoked by mp_connect once
   # the connection attempt is over, confirm in AnyEvent::MP::Transport
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   );
}
505    
# tries to connect to all seeds that are neither being connected to nor
# known to be up, and re-arms itself with an increasing, randomised retry
# interval (limited by the monitor_timeout setting) as long as such
# seeds exist.
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @seeds, $seed;
   }

   unless (@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
      return;
   }

   # start connection attempt for every seed we are not connected to yet
   seed_connect ($_)
      for @seeds;

   my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $SEED_RETRY = $SEED_RETRY * 2 + rand;
   $SEED_RETRY = $max_retry
      if $SEED_RETRY > $max_retry;

   $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);
}
528    
# resets the retry interval and makes sure seed_all runs again in one
# second, unless a seed timer is already pending
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
533    
# sets new seed list, starts connecting
# all state about previous seeds and pending connection attempts is dropped
sub set_seeds(@) {
   my @seeds = @_;

   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # the node ID behind each seed is not known yet
   %SEED_NODE = map +($_ => undef), @seeds;

   seed_all ();
}
544    
# keep an eye on seed nodes: nodes that are not known to be seeds are ignored
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $NODE_SEED{$nodeid};

   unless ($is_up) {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
      return;
   }

   # each time a connection to a seed node goes up, make
   # sure it runs the global service.
   snd ($nodeid, "g_slave")
      unless $NODE{$nodeid}{transport}{remote_greeting}{global};
});
558    
559     #############################################################################
560 root 1.107 # keepalive code - used to keep connections to certain nodes alive
561     # only used by global code atm., but ought to be exposed somehow.
562 root 1.109 #TODO: should probably be done directly by node objects
563 root 1.107
564     our $KEEPALIVE_RETRY;
565     our $KEEPALIVE_WATCHER;
566     our %KEEPALIVE; # we want to keep these nodes alive
567     our %KEEPALIVE_DOWN; # nodes that are down currently
568    
# asks all nodes in %KEEPALIVE_DOWN to connect and re-arms itself with an
# increasing, randomised retry interval (limited by the monitor_timeout
# setting)
sub keepalive_all {
   AE::log (
      9 => "keepalive: trying to establish connections with: "
           . (join " ", keys %KEEPALIVE_DOWN)
           . "."
   );

   for (keys %KEEPALIVE_DOWN) {
      my $node = add_node ($_);
      $node->connect;
   }

   my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand;
   $KEEPALIVE_RETRY = $max_retry
      if $KEEPALIVE_RETRY > $max_retry;

   $KEEPALIVE_WATCHER = AE::timer ($KEEPALIVE_RETRY, 0, \&keepalive_all);
}
583    
# resets the retry interval and immediately starts a new round of
# connection attempts
sub keepalive_again {
   $KEEPALIVE_RETRY = 1;

   keepalive_all ();
}
588    
# requests that the connection to the given node is kept alive. requests
# are counted, only the first one for a node has any effect.
sub keepalive_add {
   my ($nodeid) = @_;

   # already kept alive, just count the request
   return if $KEEPALIVE{$nodeid}++;

   return if node_is_up ($nodeid);

   # not connected: remember the node as down and start connecting
   undef $KEEPALIVE_DOWN{$nodeid};
   keepalive_again ();
}
596    
# withdraws one keepalive request for the given node. once the last
# request is withdrawn, the node is no longer kept alive, and the retry
# timer is stopped when no kept-alive node is down anymore.
#
# calls without a matching keepalive_add are ignored: decrementing the
# count of an unknown node would create a negative entry, which makes the
# node look kept alive to the node monitor and causes the next
# keepalive_add to return early without doing anything.
sub keepalive_del {
   my ($nodeid) = @_;

   return unless exists $KEEPALIVE{$nodeid};
   return if --$KEEPALIVE{$nodeid} > 0;

   delete $KEEPALIVE     {$nodeid};
   delete $KEEPALIVE_DOWN{$nodeid};

   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}
606    
# track the connection state of kept-alive nodes, all other nodes are ignored
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $KEEPALIVE{$nodeid};

   unless ($is_up) {
      # lost the connection, try to connect again
      undef $KEEPALIVE_DOWN{$nodeid};
      keepalive_again ();
      return;
   }

   delete $KEEPALIVE_DOWN{$nodeid};

   # nothing left to reconnect to, so stop retrying
   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
});
621    
622     #############################################################################
623 root 1.71 # talk with/to global nodes
624    
625 root 1.72 # protocol messages:
626     #
627 root 1.73 # sent by all slave nodes (slave to master)
628     # g_slave database - make other global node master of the sender
629 root 1.72 #
630 root 1.73 # sent by any node to global nodes
631     # g_set database - set whole database
632 root 1.89 # g_upd family set del - update single family
633 root 1.73 # g_del family key - delete key from database
634     # g_get family key reply... - send reply with data
635     #
636     # sent by global nodes
637     # g_global - node became global, similar to global=1 greeting
638     #
639     # database families
640     # "'l" -> node -> listeners
641     # "'g" -> node -> undef
642     # ...
643 root 1.72 #
644    
645 root 1.73 # used on all nodes:
646 root 1.88 our $MASTER; # the global node we bind ourselves to
647 root 1.78 our $MASTER_MON;
648     our %LOCAL_DB; # this node database
649    
650     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
651 root 1.71
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
658    
659 root 1.73 #############################################################################
660     # master selection
661    
662     # master requests
663     our %GLOBAL_REQ; # $id => \@req
664 root 1.71
# remembers the request (an array reference holding a message for the
# master) under the given id and sends it to the current master, if any.
# does nothing if a request with this id is already registered.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   snd ($MASTER, @$req)
      if $MASTER;
}
675 root 1.71
# forgets the request registered under the given id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
679    
680 root 1.88 #################################
681     # master rpc
682    
our %GLOBAL_RES;          # request id => callback waiting for the reply
our $GLOBAL_RES_ID = "a"; # last request id handed out

# registers a request for the master: the last argument is the callback
# for the reply, everything before it is the request message, to which a
# fresh request id is appended.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
691    
# reply to a global_call request: the first element is the request id, the
# rest is handed to the callback registered for it (if still registered)
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id};
   return unless $cb;

   &$cb
};
699    
700     #################################
701    
# registers a g_find request for the given node ID with the master
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
705 root 1.71
# reply for g_find started in Node.pm
# arguments are the node ID that was searched for and the data to pass to
# the connect_to method of the corresponding node object
$NODE_REQ{g_found} = sub {
   my ($nodeid, $found) = @_;

   global_req_del ("g_find $nodeid");

   # ignore answers for nodes we no longer know about
   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($found);
};
714    
# makes the given node our master: sends it our local database and all
# requests that are currently registered
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
724 root 1.71
# selects a master: uses the first seed node that is up, or otherwise
# waits for a seed node to come up. once a master is set, a monitor
# restarts the search when the connection to the master is lost.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      # replaces (and thereby cancels) the monitor we are running in
      $MASTER_MON = mon_nodes (sub {
         my ($nodeid, $is_up) = @_;

         return if $is_up || $nodeid ne $MASTER;

         undef $MASTER;
         master_search ();
      });
   });
}
748    
# other node wants to make us the master
# NOTE(review): this relies on AnyEvent::MP::Global installing its own
# g_slave handler when loaded - otherwise the redo below would call this
# very handler again. confirm in AnyEvent::MP::Global.
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   require AnyEvent::MP::Global;

   # called with the unchanged @_ of the original request
   &{ $NODE_REQ{g_slave} }
};
757    
758 root 1.73 #############################################################################
759 root 1.79 # local database operations
760 root 1.71
761 root 1.79 # local database management
762 root 1.88
# sets $LOCAL_DB{$family}{$subkey} to the (optional) third argument and
# tells the master about the change, if there is one.
#
# when called in non-void context, returns a guard that deletes the key
# again (via db_del) when destroyed.
sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$subkey} = $_[2];
      snd ($MASTER, g_upd => $family => { $subkey => $_[2] })
         if defined $MASTER;
#   }

   # db_del is only defined further down, so it has to be called with
   # parentheses here: without them, perl parses "db_del $family => ..."
   # as the indirect object method call "$family->db_del", and the guard
   # would fail instead of deleting the key.
   defined wantarray
      and Guard::guard (sub { db_del ($family => $subkey) })
}
782    
# deletes the given keys from a family of the local database and tells
# the master about the keys that were actually removed, if there is one.
# keys that do not exist are ignored.
sub db_del($@) {
   my ($family, @keys) = @_;

   my @del = grep exists $LOCAL_DB{$family}{$_}, @keys;

   @del
      or return;

   delete $LOCAL_DB{$family}{$_}
      for @del;

   snd ($MASTER, g_upd => $family => undef, \@del)
      if defined $MASTER;
}
794    
795 root 1.88 # database query
796    
# the following three functions take a family name and a callback, and
# register the corresponding request for the master via global_call, with
# the callback receiving the reply

# requests g_db_family for the given family
sub db_family {
   global_call (g_db_family => @_[0, 1]);
}

# requests g_db_keys for the given family
sub db_keys {
   global_call (g_db_keys => @_[0, 1]);
}

# requests g_db_values for the given family
sub db_values {
   global_call (g_db_values => @_[0, 1]);
}
811    
812 root 1.88 # database monitoring
813 root 1.80
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# registers a callback that is told about changes to the given database
# family. when called in non-void context, returns a guard that removes
# the callback again, and stops monitoring the family altogether when it
# was the last callback for it.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $key = $cb + 0;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$key} = sub { };

      AE::postpone (sub {
         return unless exists $LOCAL_MON{$family}{$key}; # guard might have gone away already

         # set actual callback
         $LOCAL_MON{$family}{$key} = $cb;
         $cb->($db, [keys %$db]);
      });
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$key} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   }

   defined wantarray
      and Guard::guard (sub {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$key};

         # other callbacks still monitor this family
         return if %$mon;

         global_req_del ("mon1 $family");

         # no global_req, because we don't care if we are not connected
         snd ($MASTER, g_mon0 => $family)
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      })
}
856    
# full update
# replaces the monitored copy of a family with the given contents and tells
# the monitors which keys were added, changed and deleted. only accepted
# from the current master.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $new_db) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed, @deleted);

   # add or replace keys
   for my $key (keys %$new_db) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $new_db->{$key};
   }

   # delete keys that are no longer present
   for my $key (keys %$db) {
      next if exists $new_db->{$key};

      delete $db->{$key};
      push @deleted, $key;
   }

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, \@deleted);
   }
};
882    
# incremental update
# applies the given key/value pairs and key deletions to the monitored copy
# of a family and tells the monitors which keys were added, changed and
# deleted. only accepted from the current master.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   for my $key (keys %$set) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $set->{$key};
   }

   delete @$db{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, $del);
   }
};
904 root 1.80
905 root 1.69 #############################################################################
906     # configure
907    
# returns the node name of this host, as reported by POSIX::uname
sub nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
912    
# Resolves a comma-separated list of transport descriptors into
# "address:port" strings.
#
# A descriptor consisting only of digits (or nothing at all) is taken as a
# port on the local nodename, a port of "*" is replaced by "0", and a host
# of "*" is expanded to the addresses of the local interfaces as reported
# by Net::Interface. All other hosts are resolved via
# AnyEvent::Socket::resolve_sockaddr.
#
# Returns a condvar: ->recv returns the resulting strings, ordered by the
# position of their descriptor and with duplicates removed. Croaks on
# descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "address:port"]

   # runs once all lookups have finished
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      $t = length $t ? nodename . ":$t" : nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;

            if (!defined $pid) {
               # fork failed: rather than silently ending up without any
               # addresses, pay the memory price and ask in-process
               close $r;
               close $w;
               @addr = $get_addr->();
            } elsif ($pid == 0) {
               # child: send the addresses as length-prefixed strings and exit
               close $r;
               syswrite $w, pack "(C/a*)*", $get_addr->();
               require POSIX;
               POSIX::_exit (0);
            } else {
               close $w;

               my $addr = "";

               # read until the child closes its end of the pipe
               1 while sysread $r, $addr, 1024, length $addr;

               @addr = unpack "(C/a*)*", $addr;

               # reap the child, so it does not stay around as a zombie
               waitpid $pid, 0;
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
1022    
# Configures the node: looks up the profile, determines the node ID,
# creates the listeners, publishes them in the local database, starts
# connecting to the seed nodes and finally runs the configured services
# and the "eval" parameter.
#
# Takes key => value pairs; with an odd number of arguments, the first
# one is the profile name. The profile defaults to the nodename.
#
# Only the first call has any effect, as configure replaces itself by an
# empty function.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   $SECURE = $CONFIG->{secure};

   # without an explicit node ID, use the profile name plus a random suffix (see below)
   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure

   $NODE = $node;

   $NODE =~ s/%n/nodename/ge;

   # a trailing "/" is followed by, and "%u" is replaced by, $RUNIQ
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # re-register the local node object under its new ID
   $node_obj->{id} = $NODE;
   $NODE{$NODE} = $node_obj;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   AE::log (8 => "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all lookups first, then wait for them one after the other
   my @bind_cvs = map _resolve ($_), @$binds;

   for my $bind_cv (@bind_cvs) {
      for my $bind ($bind_cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address the listener was actually bound to
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set ("'l" => $NODE => $BINDS);

   AE::log (8 => "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds (map $_->recv, map _resolve ($_), @$seeds);
   master_search ();

   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   # a service is either [function, args...], a module name ending in
   # "::" (which is loaded), or a function name (which is called)
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         load_func ($func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         load_func ($_)->();
      }
   }

   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}
1110    
1111 root 1.1 =back
1112    
1113 root 1.101 =head1 LOGGING
1114    
1115     AnyEvent::MP::Kernel logs high-level information about the current node,
1116     when nodes go up and down, and most runtime errors. It also logs some
1117     debugging and trace messages about network maintenance, such as seed
1118     connections and global node management.
1119    
1120 root 1.1 =head1 SEE ALSO
1121    
1122     L<AnyEvent::MP>.
1123    
1124     =head1 AUTHOR
1125    
1126     Marc Lehmann <schmorp@schmorp.de>
1127     http://home.schmorp.de/
1128    
1129     =cut
1130    
1131     1
1132