ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.96
Committed: Wed Mar 21 15:22:16 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.95: +19 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
# Highest level the default handler still prints. The environment variable
# PERL_ANYEVENT_MP_WARNLEVEL, when present, replaces the built-in 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks, each called for every message with ($level, $msg).
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # hooks run first and see every message, regardless of its level
   &$_ for @WARN;

   my ($level, $msg) = @_;

   unless ($WARNLEVEL < $level) {
      # drop one trailing newline, the format below supplies its own
      $msg =~ s/\n$//;

      my $stamp = POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time;

      printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
   }
};
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
# Resolves a fully qualified function name ("Some::Package::func") to a
# code reference, loading modules on demand.
#
# If the function is not defined yet, the trailing name component is
# stripped repeatedly and the remaining package is require'd, until the
# function becomes defined.
#
# Never dies itself: when resolution fails, the returned code reference
# dies when called - either with "unable to resolve function '...'" or
# with the error the module load produced.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # require by file name: no string eval of the (possibly remote
         # supplied) name, and we know exactly which file was asked for
         (my $file = "$pkg.pm") =~ s{::}{/}g;

         local $@;
         unless (eval { require $file; 1 }) {
            my $error = $@;
            # only "this very module does not exist" means "keep looking".
            # any other failure (syntax error, missing dependency of an
            # existing module, ...) is a real error and is reported as such.
            $error =~ /^Can't locate \Q$file\E in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119    
# the 62 characters used for printable random identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random octets (each 0..255)
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $_[0];
   $octets
}

# returns a string of $_[0] random characters taken from @alnum
sub nonce62($) {
   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $_[0];
   $chars
}

# builds a 9 character identifier: two characters derived from the
# process id, three from the current event loop time (seconds, 1/100s,
# 1/10000s), followed by four random characters.
sub gen_uniq {
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   (join "", map $alnum[$_], @digits) . nonce62 4
}
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the node ID of the sending node during _inject
161 root 1.1
# (re-)initialises $UNIQ, $RUNIQ and the anonymous default $NODE name
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   substr $RUNIQ, -1, 1, uc substr $RUNIQ, -1;

   $NODE = "anon/$RUNIQ";
}
172    
173 root 1.69 _init_names;
174 root 1.64
# accessor for the node ID of the local node
sub NODE() {
   return $NODE;
}
178    
# returns the node part of a port ID, i.e. everything before the first "#"
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
184    
# TRACE is a compile-time constant: 1 when the PERL_ANYEVENT_MP_TRACE
# environment variable is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190 root 1.1
our $DELAY_TIMER; # pending timer watcher, or undef when nothing is queued
our @DELAY_QUEUE; # code references waiting to be run

# timer callback: runs queued callbacks until the queue is empty, then
# drops the timer.
sub _delay_run {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
}

# queues a code reference and makes sure a zero-delay timer is pending
# that will run the queue.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;
   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
202    
203 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
204    
205     During execution of a message callback, this variable contains the node ID
206     of the origin node.
207    
208     The main use of this variable is for debugging output - there are probably
209     very few other cases where you need to know the source node ID.
210    
211     =cut
212    
# delivers a message: the first element is the local port name, the
# remaining elements are passed to the port callback. Messages for
# unknown ports are silently dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $port_cb = $PORT{+shift}
      or return;

   &$port_cb;
}
217    
218 root 1.20 # this function adds a node-ref, so you can send stuff to it
219     # it is basically the central routing component.
# Returns the node object for the given node ID from %NODE, creating and
# registering a remote node object if none is known yet.
#
# Croaks when the node ID is undef or the empty string.
sub add_node {
   my ($node) = @_;

   length $node
      or Carp::croak "'undef' or the empty string are not valid node/port IDs";

   # explicit method call instead of the ambiguous "new Class ARGS" form
   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
228    
# sends a message: the first argument is the destination port ID
# ("nodeid#portid"), the rest is the message payload.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";#d#
   }

   # unknown nodes get a node object on first use
   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["$portid", @_]);
}
237    
238 root 1.17 =item $is_local = port_is_local $port
239    
240     Returns true iff the port is a local port.
241    
242     =cut
243    
# true when the node part of the port ID maps to the same node object as
# the local node ($NODE{""})
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $NODE{$parts[0]} == $NODE{""}
}
249    
250 root 1.18 =item snd_to_func $node, $func, @args
251 root 1.11
252 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
253 root 1.11 this function with the given arguments on that node.
254    
255 root 1.20 This function can be used to implement C<spawn>-like interfaces.
256 root 1.11
257     =cut
258    
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT $NODE, which
   # reads like the opposite of the comment above - confirm the intent.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   # the empty port name addresses the node port, which dispatches on
   # the function name
   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["", @call]);
}
270    
271 root 1.18 =item snd_on $node, @msg
272    
273     Executes C<snd> with the given C<@msg> (which must include the destination
274     port) on the given node.
275    
276     =cut
277    
sub snd_on($@) {
   my ($node, @msg) = @_;

   # ask the node port of $node to run "snd" with our message
   snd $node, snd => @msg;
}
282    
283 root 1.29 =item eval_on $node, $string[, @reply]
284 root 1.18
285 root 1.29 Evaluates the given string as a Perl expression on the given node. When
286     @reply is specified, then it is used to construct a reply message with
287     C<"$@"> and any results from the eval appended.
288 root 1.18
289     =cut
290    
sub eval_on($$;@) {
   my ($node, @request) = @_;

   # @request is the string to evaluate, optionally followed by the reply message
   snd $node, eval => @request;
}
295    
# kills the port given as first argument ("nodeid#portid"), any further
# arguments are passed on as kill reason. Croaks when the port part is empty.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @_);
}
305    
306     #############################################################################
307 root 1.6 # node monitoring and info
308 root 1.3
309 root 1.21 =item node_is_up $nodeid
310 root 1.13
311     Returns true if the given node is "up", that is, the kernel thinks it has
312     a working connection to it.
313    
314 root 1.95 If the node is up, returns C<1>. If the node is currently connecting or
315     otherwise known but not connected, returns C<0>. If nothing is known about
316     the node, returns C<undef>.
317 root 1.13
318     =cut
319    
sub node_is_up($) {
   # nothing known about this node: return nothing (undef)
   my $node = $NODE{$_[0]}
      or return;

   # known node: up exactly when it has a transport
   $node->{transport} ? 1 : 0
}
324    
325 root 1.3 =item up_nodes
326    
327 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
328     the node itself).
329 root 1.3
330     =cut
331    
sub up_nodes() {
   my @ids;

   # a node counts as connected when its node object has a transport
   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
335    
336 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
337 root 1.3
338 root 1.27 Registers a callback that is called each time a node goes up (a connection
339     is established) or down (the connection is lost).
340 root 1.3
341     Node up messages can only be followed by node down messages for the same
342     node, and vice versa.
343    
344 root 1.71 Note that monitoring a node is usually better done by monitoring its node
345 root 1.27 port. This function is mainly of interest to modules that are concerned
346     about the network topology and low-level connection handling.
347    
348     Callbacks I<must not> block and I<should not> send any messages.
349    
350     The function returns an optional guard which can be used to unregister
351 root 1.3 the monitoring callback again.
352    
353 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
354     or go up (and down).
355    
356     newnode $_, 1 for up_nodes;
357     mon_nodes \&newnode;
358    
359 root 1.3 =cut
360    
# registered node monitoring callbacks, keyed by the numeric value
# (address) of the code reference
our %MON_NODES;

sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # only callers that ask for a return value get a guard object
   defined wantarray
      and Guard::guard {
         delete $MON_NODES{$cb+0}
      }
}
371    
# notifies all mon_nodes callbacks that $node went up or down and logs
# the event at level 7. Exceptions thrown by callbacks are logged at
# level 1 and do not stop the remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
382    
383     #############################################################################
384 root 1.1 # self node code
385    
# kills the local port given as first argument, the remaining arguments
# are the kill reason that is handed to every monitor of the port.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port that dies with a reason while nobody watches deserves a warning
   if (!$mon && @_) {
      $WARN->(2, "unmonitored local port $port died with reason: @_");
   }

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
399    
# registers the callback (third argument) as monitor of the local port
# (second argument). The first argument is not used. When the port does
# not exist, the callback is invoked immediately with a no_such_port reason.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   unless (exists $PORT{$portid}) {
      return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
   }

   $LMON{$portid}{$cb+0} = $cb;
}
406    
# removes the callback (third argument) from the monitors of the local
# port (second argument). The first argument is not used.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   # the exists check avoids creating an empty monitor table for the port
   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
411    
# dies unless the $SECURE callback, called with our own arguments,
# returns a true value
sub _secure_check {
   die "remote execution attempt by insecure node\n"
      unless &$SECURE;
}
416    
# requests understood by the node port, keyed by request name. each
# handler receives the remaining message elements in @_, and $SRCNODE
# is used as key into %NODE for the sending node.
our %NODE_REQ = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my $portid = shift;
      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};

      _unmonitor undef, $portid, $cb;
   },

   # start monitoring a port for another node
   mon1 => sub {
      my $portid = shift;

      # the callback below must not keep the node object alive
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});

      my $on_kill = sub {
         delete $node->{rmon}{$portid};

         # report the kill to the monitoring node, if it is still connected
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;
      _monitor undef, $portid, $on_kill;
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities

   # evaluate a string in package main, optionally reply with "$@" and the results
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },

   # reply with the current event loop time
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },

   # swallow the message
   devnull => sub {
      #
   },

   # empty messages are keepalives or similar devnull-applications
   "" => sub {
   },
);
470    
# the local node object, reachable under the empty ID and under its own ID
# (explicit method call instead of the ambiguous "new Class ARGS" form)
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: the first message element selects the request handler.
# names not found in %NODE_REQ are - after passing the secure check -
# resolved with load_func and cached in %NODE_REQ. errors raised while
# handling a request are logged at level 2 and do not propagate.
$PORT{""} = sub {
   my $tag = shift;
   eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
   $WARN->(2, "error processing node message from $SRCNODE: $@") if $@;
};
477    
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
484    
485 root 1.69 #############################################################################
486 root 1.71 # seed management, try to keep connections to all seeds at all times
487 root 1.69
488 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
489     our %NODE_SEED; # map node ID to seed ID
490 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
491     our $SEED_WATCHER;
492 root 1.71 our $SEED_RETRY;
493 root 1.69
# starts a connection attempt to the given seed address ("host:port"),
# unless one is already registered in %SEED_CONNECT. Croaks when the
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      my ($transport) = @_;
      my $remote = $transport->{remote_node};

      # we rely on untrusted data here (the remote node name). this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start global service, if not running
         # we need to check here in addition to the mon_nodes below
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd $remote, "g_slave"
            unless $transport->{remote_greeting}{global};
      }
   };

   # final callback passed to mp_connect: forget the connection attempt
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => $on_greeted,
      $on_done
   ;
}
531    
# tries to connect to every seed that is neither being connected to nor
# mapped to a node that is up, and re-arms the retry timer while such
# seeds exist.
sub seed_all {
   my @pending;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @pending, $seed;
   }

   if (@pending) {
      # start connection attempt for every seed we are not connected to yet
      for my $seed (@pending) {
         seed_connect $seed;
      }

      # roughly double the retry interval, capped at the monitor timeout
      $SEED_RETRY = $SEED_RETRY * 2 + rand;

      my $limit = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
      $SEED_RETRY = $limit
         if $SEED_RETRY > $limit;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
554    
# resets the retry interval to one second and schedules seed_all,
# unless a seed timer is already pending
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
}
559    
# sets new seed list, starts connecting
sub set_seeds(@) {
   # forget everything about the previous seed list
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every new seed starts out without a known node ID
   %SEED_NODE = map +($_ => undef), @_;

   seed_all;
}
570    
# keep an eye on the nodes that we know to be seed nodes
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   exists $NODE_SEED{$nodeid}
      or return;

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $nodeid, "g_slave"
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   }
};
584    
585     #############################################################################
586     # talk with/to global nodes
587    
588 root 1.72 # protocol messages:
589     #
590 root 1.73 # sent by all slave nodes (slave to master)
591     # g_slave database - make other global node master of the sender
592 root 1.72 #
593 root 1.73 # sent by any node to global nodes
594     # g_set database - set whole database
595 root 1.89 # g_upd family set del - update single family
596 root 1.73 # g_del family key - delete key from database
597     # g_get family key reply... - send reply with data
598     #
599     # sent by global nodes
600     # g_global - node became global, similar to global=1 greeting
601     #
602     # database families
603     # "'l" -> node -> listeners
604     # "'g" -> node -> undef
605     # ...
606 root 1.72 #
607    
608 root 1.73 # used on all nodes:
609 root 1.88 our $MASTER; # the global node we bind ourselves to
610 root 1.78 our $MASTER_MON;
611     our %LOCAL_DB; # this node database
612    
613     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
614 root 1.71
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
621    
622 root 1.73 #############################################################################
623     # master selection
624    
625     # master requests
626     our %GLOBAL_REQ; # $id => \@req
627 root 1.71
# remembers request $req (an array reference) under $id and sends it to
# the master, if there is one. Does nothing when $id is already queued.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   if ($MASTER) {
      snd $MASTER, @$req;
   }
}

# forgets the request queued under the given id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
642    
643 root 1.88 #################################
644     # master rpc
645    
our %GLOBAL_RES;           # request id => reply callback
our $GLOBAL_RES_ID = "a";  # last request id handed out

# queues a request for the master: the last argument is the reply
# callback, everything before it is the request, to which a fresh
# request id is appended.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;
   global_req_add $id, [@_, $id];
}

# reply to a global_call: the first element is the request id, the
# remaining elements are passed to the stored callback.
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del $id;

   my $reply_cb = delete $GLOBAL_RES{$id}
      or return;

   &$reply_cb
};
662    
663     #################################
664    
# queues a g_find request for the given node ID
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", [g_find => $nodeid];
}

# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del "g_find $nodeid";

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
677    
# makes the given node our master: sends it our local database and all
# requests that are currently queued.
sub master_set {
   my ($master) = @_;

   $MASTER = $master;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
687 root 1.71
# picks a master: the first seed node that is up, or, failing that, the
# first seed node that comes up later. Once a master is chosen this way,
# its loss triggers a new search.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up $nodeid;

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      $is_up              or return; # we are only interested in node-ups
      $NODE_SEED{$nodeid} or return; # we are only interested in seed nodes

      master_set $nodeid;

      # from now on watch for the loss of the master instead
      $MASTER_MON = mon_nodes sub {
         my ($nodeid, $is_up) = @_;

         if ($nodeid eq $MASTER && !$is_up) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
711    
# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request with the same arguments.
   # presumably loading the module replaces this handler - verify
   # against AnyEvent::MP::Global.
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
720    
721 root 1.73 #############################################################################
722 root 1.79 # local database operations
723 root 1.71
724 root 1.79 # local database management
725 root 1.88
# sets $family/$key to $value in the local database and forwards the
# change to the master, if one is defined.
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$key} = $value;

      if (defined $MASTER) {
         snd $MASTER, g_upd => $family => { $key => $value };
      }
#   }
}

# deletes the given keys from $family in the local database and forwards
# the deletion to the master, if one is defined.
sub db_del($@) {
   my ($family, @keys) = @_;

   delete @{ $LOCAL_DB{$family} }{@keys};

   if (defined $MASTER) {
      snd $MASTER, g_upd => $family => undef, \@keys;
   }
}

# like db_set, but returns a guard that deletes the key again
sub db_reg($$;$) {
   my ($family, $key) = @_;

   db_set $_[0], $_[1], $_[2];

   Guard::guard { db_del $family => $key }
}
754 root 1.71
755 root 1.88 # database query
756    
# the three queries below take ($family, $cb) and differ only in the
# request name passed to global_call.

sub db_family {
   global_call g_db_family => @_[0, 1];
}

sub db_keys {
   global_call g_db_keys => @_[0, 1];
}

sub db_values {
   global_call g_db_values => @_[0, 1];
}
771    
772 root 1.88 # database monitoring
773 root 1.80
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# registers $cb as monitor for database family $family. In non-void
# context a guard is returned that removes the monitor again.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   unless ($db) {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   } else {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };

      AE::postpone {
         # guard might have gone away already
         exists $LOCAL_MON{$family}{$cb+0}
            or return;

         # set actual callback
         $LOCAL_MON{$family}{$cb+0} = $cb;
         $cb->($db, [keys %$db]);
      };
   }

   defined wantarray
      and Guard::guard {
         my $watchers = $LOCAL_MON{$family};
         delete $watchers->{$cb+0};

         # other monitors of this family remain: keep the subscription
         return if %$watchers;

         global_req_del "mon1 $family";

         # no global_req, because we don't care if we are not connected
         snd $MASTER, g_mon0 => $family
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      }
}
816    
# full update: replaces the monitored copy of a family with the received
# one and tells the monitors which keys were added, changed and deleted.
# only accepted from the current master.
$NODE_REQ{g_chg1} = sub {
   $SRCNODE eq $MASTER
      or return;

   my ($family, $new) = @_;

   my $cache = $MON_DB{$family};
   my (@added, @changed, @deleted);

   # add or replace keys
   for my $key (keys %$new) {
      if (exists $cache->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $cache->{$key} = $new->{$key};
   }

   # delete keys that are no longer present
   for my $key (grep !exists $new->{$_}, keys %$cache) {
      delete $cache->{$key};
      push @deleted, $key;
   }

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($cache, \@added, \@changed, \@deleted);
   }
};
842    
# incremental update: $set maps keys to their new values, $del lists
# the keys to remove. only accepted from the current master.
$NODE_REQ{g_chg2} = sub {
   $SRCNODE eq $MASTER
      or return;

   my ($family, $set, $del) = @_;

   my $cache = $MON_DB{$family};
   my (@added, @changed);

   for my $key (keys %$set) {
      if (exists $cache->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $cache->{$key} = $set->{$key};
   }

   delete @$cache{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($cache, \@added, \@changed, $del);
   }
};
864 root 1.80
865 root 1.69 #############################################################################
866     # configure
867    
# returns the nodename field (second element) of POSIX::uname
sub nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
872    
# resolves a comma separated list of transport descriptors and returns
# a condvar. The condvar receives the resulting "host:port" strings,
# duplicates removed, in the order of the descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the resolvers

   # this callback runs when every begin has been matched by an end
   $cv->begin (sub {
      my ($done) = @_;

      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @res;

      $done->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor sort after those of the previous ones
      my $pri = ++$idx;

      # an empty or all-digit descriptor means "this host", optionally with port
      if ($t =~ /^\d*$/) {
         $t = length $t ? nodename . ":$t" : nodename;
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;

         # runs in the forked child: collect the addresses of all interfaces
         my $collect = sub {
            my @found;

            require Net::Interface;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for my $addr ($if->address (Net::Interface::AF_INET ())) {
                  next if $addr =~ /^\x7f/; # skip localhost etc.
                  push @found, $addr;
               }
               for my $addr ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless $addr =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @found, $addr;
               }

            }
            @found
         };

         # runs in the parent with the addresses found by the child
         my $store = sub {
            for my $ip (@_) {
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
               ];
            }
            $cv->end;
         };

         AnyEvent::Util::fork_call ($collect, $store);
      } else {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $target->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   # matches the initial begin above
   $cv->end;

   $cv
}
953    
# Configures the local node: selects the profile, derives the node ID,
# creates the listeners, registers them in the local database, starts
# seeding and the master search, and finally starts the configured
# services.
#
# Arguments are key => value pairs. With an odd number of arguments the
# first one is taken as the profile name. A missing profile defaults to
# the nodename.
#
# Croaks on an illegal (false) node ID or an unparsable bind address,
# and dies when a service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names;

   my $profile = delete $kv{profile};

   $profile = nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   if (exists $CONFIG->{secure}) {
      # $SECURE must be a code reference returning true when remote
      # execution is allowed. ("eval COND ? A : B" parses as
      # "(eval COND) ? A : B", which would eval the config value and
      # store a plain string here.)
      $SECURE = $CONFIG->{secure} ? sub { 0 } : sub { 1 };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   # %n expands to the nodename
   $NODE =~ s/%n/nodename/ge;

   # a trailing "/" gets $RUNIQ appended, %u is replaced by $RUNIQ
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set "'l" => $NODE => $BINDS;

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search;

   # a service is either [function, args...], "Module::" (load the
   # module) or a function name. the name is copied before the trailing
   # "::" is stripped, so the configuration itself stays untouched.
   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func $func)->(@args);
      } elsif ((my $module = $service) =~ s/::$//) {
         eval "require $module";
         die $@ if $@;
      } else {
         (load_func $service)->();
      }
   }
}
1038    
1039 root 1.1 =back
1040    
1041     =head1 SEE ALSO
1042    
1043     L<AnyEvent::MP>.
1044    
1045     =head1 AUTHOR
1046    
1047     Marc Lehmann <schmorp@schmorp.de>
1048     http://home.schmorp.de/
1049    
1050     =cut
1051    
1052     1
1053