ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.89
Committed: Fri Mar 9 17:05:26 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.88: +51 -22 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
# Highest message level the default handler still prints; can be preset
# through the PERL_ANYEVENT_MP_WARNLEVEL environment variable.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks - every one of them sees every message, whatever
# its level.
our @WARN;

# Default log handler, called as $WARN->($level, $msg): runs the hooks,
# then prints sufficiently important messages to STDERR, prefixed with a
# local timestamp and the level.
our $WARN = sub {
   # "&$_" (no parens) hands our own @_ to the hook
   for (@WARN) {
      &$_;
   }

   return if $WARNLEVEL < $_[0];

   my ($level, $msg) = @_;

   # the format below supplies the line terminator itself
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
# Resolves a fully qualified function name to a code reference, loading the
# package that provides it on demand.
#
#   $func - function name, such as "Some::Module::function"
#
# Starting with the longest package prefix, each candidate package is
# require'd until the function becomes defined. This function never dies
# itself: when resolution fails, the returned code reference dies with the
# reason once it is called.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # function names are looked up symbolically

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # the name can originate from another node and ends up inside a
         # string eval, so refuse anything that is not a plain package name
         $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module only means that we try the next shorter
            # prefix - any other failure (e.g. a syntax error) is reported
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119    
# the 62 characters used by nonce62 and gen_uniq: digits, upper, lower case
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# Returns a string of $_[0] random octets (any byte value).
sub nonce($) {
   my ($length) = @_;

   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $length;

   $octets
}

# Returns a string of $_[0] random characters taken from @alnum.
sub nonce62($) {
   my ($length) = @_;

   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $length;

   $chars
}
129    
# Returns a nine character alphanumeric string: two characters derived from
# the process ID, three derived from the current event loop time (seconds,
# 1/100s and 1/10000s, each modulo 62) and four random characters.
sub gen_uniq {
   my $now = AE::now ();

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now * 100  ) % 62,
      (int $now * 10000) % 62,
   );

   join ("", map $alnum[$_], @digits) . nonce62 (4)
}
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
# (Re-)creates the random identity of this node: $UNIQ, $RUNIQ and the
# anonymous default node ID derived from $RUNIQ.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   ($RUNIQ = nonce62 (10)) =~ s/(.)$/\U$1/;

   $NODE = "anon/$RUNIQ";
}

# give the node a usable identity as soon as the module is loaded
_init_names ();
174 root 1.64
# Returns the current value of $NODE, the node ID of the local node.
sub NODE() {
   return $NODE;
}
178    
# Returns the node ID part of a port ID, i.e. everything before the first
# "#" (the whole string when there is no "#").
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
184    
# Compile-time constant: 1 when PERL_ANYEVENT_MP_TRACE is set to a true
# value, 0 otherwise. It guards the message tracing "warn" statements.
use constant TRACE => $ENV{PERL_ANYEVENT_MP_TRACE} ? 1 : 0;
190 root 1.1
our $DELAY_TIMER; # timer that will run the queue, undef while nothing is scheduled
our @DELAY_QUEUE; # callbacks waiting to be run

# Runs queued callbacks in order until the queue is empty (callbacks queued
# while running are run as well), then clears the timer.
sub _delay_run {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   undef $DELAY_TIMER
}

# Queues the given callback and makes sure a zero-delay timer is pending
# that will run the queue.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
202    
# Delivers a message to a local port: the first argument is the port ID,
# the remaining ones are passed on to the port callback. Messages for
# ports without a callback in %PORT are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # no parens: the handler receives what is left of our @_
   &$handler
}
207    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
# Returns the existing entry of %NODE, or creates, registers and returns a
# new AnyEvent::MP::Node::Remote object for the given node ID.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
215    
# Sends a message: the first argument is the destination port ID
# ("nodeid#portid"), the rest is the message. The node object is looked up
# in %NODE (and created on demand), then its "send" callback is invoked with
# [portid, message...]. Croaks when the destination is undef.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
227    
228 root 1.17 =item $is_local = port_is_local $port
229    
230     Returns true iff the port is a local port.
231    
232     =cut
233    
# True when the node part of the given port ID maps to the same node object
# as the local node (which is registered under the empty node ID).
sub port_is_local($) {
   my $nodeid = node_of ($_[0]);

   $NODE{$nodeid} == $NODE{""}
}
239    
240 root 1.18 =item snd_to_func $node, $func, @args
241 root 1.11
242 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
243 root 1.11 this function with the given arguments on that node.
244    
245 root 1.20 This function can be used to implement C<spawn>-like interfaces.
246 root 1.11
247     =cut
248    
# Sends the remaining arguments to the node port (port "") of the given
# node; the first of them is expected to be a function name. Croaks when
# the node ID is undef.
sub snd_to_func($$;@) {
   my ($nodeid, @msg) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the test below is "ne", i.e. the flag is set for nodes
   # other than $NODE, which does not match the comment above - confirm.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["", @msg]);
}
263    
264 root 1.18 =item snd_on $node, @msg
265    
266     Executes C<snd> with the given C<@msg> (which must include the destination
267     port) on the given node.
268    
269     =cut
270    
# Asks the given node to run "snd" with @msg by sending it a "snd" request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
275    
276 root 1.29 =item eval_on $node, $string[, @reply]
277 root 1.18
278 root 1.29 Evaluates the given string as Perl expression on the given node. When
279     @reply is specified, then it is used to construct a reply message with
280     C<"$@"> and any results from the eval appended.
281 root 1.18
282     =cut
283    
# Asks the given node to evaluate a string by sending it an "eval" request;
# the arguments after the node are the string and the optional reply.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
288    
# Kills the given port: the first argument is the port ID, the rest is the
# kill reason handed to the node object's "kill" method. Croaks when the
# port ID has no port part, as node ports cannot be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
298    
299     #############################################################################
300 root 1.6 # node monitoring and info
301 root 1.3
302 root 1.21 =item node_is_known $nodeid
303 root 1.13
304 root 1.71 #TODO#
305     Returns true iff the given node is currently known to this node.
306 root 1.13
307     =cut
308    
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
312    
313 root 1.21 =item node_is_up $nodeid
314 root 1.13
315     Returns true if the given node is "up", that is, the kernel thinks it has
316     a working connection to it.
317    
318 root 1.69 If the node is known (to this local node) but not currently connected,
319     returns C<0>. If the node is not known, returns C<undef>.
320 root 1.13
321     =cut
322    
# Returns 1 when the node object of the given node ID has a transport, 0
# when it has none, and nothing (undef) when the node is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
327    
328 root 1.3 =item up_nodes
329    
330 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
331     the node itself).
332 root 1.3
333     =cut
334    
# Returns the IDs of all node objects in %NODE that currently have a
# transport.
sub up_nodes() {
   my @up;

   for my $node (values %NODE) {
      push @up, $node->{id}
         if $node->{transport};
   }

   @up
}
338    
339 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340 root 1.3
341 root 1.27 Registers a callback that is called each time a node goes up (a connection
342     is established) or down (the connection is lost).
343 root 1.3
344     Node up messages can only be followed by node down messages for the same
345     node, and vice versa.
346    
347 root 1.71 Note that monitoring a node is usually better done by monitoring its node
348 root 1.27 port. This function is mainly of interest to modules that are concerned
349     about the network topology and low-level connection handling.
350    
351     Callbacks I<must not> block and I<should not> send any messages.
352    
353     The function returns an optional guard which can be used to unregister
354 root 1.3 the monitoring callback again.
355    
356 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
357     or go up (and down).
358    
359     newnode $_, 1 for up_nodes;
360     mon_nodes \&newnode;
361    
362 root 1.3 =cut
363    
# registered node monitors, keyed by the numeric address of the callback
our %MON_NODES;

# Registers $cb to be called on node up/down events. In non-void context, a
# guard object is returned that removes the registration when destroyed; in
# void context the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   defined wantarray
      && Guard::guard (sub { delete $MON_NODES{$cb+0} })
}
373    
# Tells all registered node monitors that $node went up or down, then logs
# the event at level 7. A monitor that dies is logged at level 1 and does
# not prevent the remaining monitors from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
384    
385     #############################################################################
386 root 1.1 # self node code
387    
# Removes a local port: the first argument is the port ID, the rest is the
# kill reason. Port callback, port data and monitors are deleted, then all
# monitors are called with the reason. A port that dies with a reason while
# nobody monitors it is logged at level 2.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
401    
# Registers a monitor callback (third argument) for the local port given as
# second argument; the first argument is not used. When the port does not
# exist, the callback is invoked immediately with a "no_such_port" reason.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid")
      unless exists $PORT{$portid};

   $LMON{$portid}{$cb+0} = $cb;
}
408    
# Removes a monitor callback registered with _monitor, using the same
# argument layout. Ports without any monitors are left alone.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
413    
# Dies unless the node that sent the message currently being processed
# ($SRCNODE) is accepted by the $SECURE callback or is the local node.
sub _secure_check {
   my $src_id = $SRCNODE->{id};

   my $trusted = $SECURE->($src_id) || $src_id eq $NODE;

   $trusted
      or die "remote execution attempt by insecure node\n";
}
419    
# Request handlers of the node port (port ""): maps the request tag (first
# message element) to the code that handles it. The remaining message
# elements arrive in @_, the sending node object is available in $SRCNODE.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      _unmonitor (undef, $portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      # weak, so the monitor callback does not keep the node object alive
      Scalar::Util::weaken (my $node = $SRCNODE);
      _monitor (undef, $portid, $node->{rmon}{$portid} = sub {
         # the node object might be gone by now - do not touch (and thereby
         # autovivify) it in that case
         $node or return;

         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node->{transport};
      });
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # is the remote node considered secure?
#   secure => sub {
#      #TODO#
#   },

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities
   eval => sub {
      &_secure_check;
      # first argument is the code, the rest an optional reply message
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      &_secure_check;
      snd (@_, AE::now ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
478    
# the local node is reachable both under its node ID and the empty node ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: dispatches on the first message element. Tags without an
# entry in %NODE_REQ are treated as function names and resolved through
# load_func, after _secure_check. The resolved function is deliberately not
# stored in %NODE_REQ: caching it would let later requests for the same tag
# skip the security check (and would make resolution failures permanent).
# Errors are logged at level 2 instead of being propagated.
$PORT{""} = sub {
   my $tag = shift;
   eval { &{ $NODE_REQ{$tag} || do { &_secure_check; load_func ($tag) } } };
   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};
485    
# value announced as "nproto" in our greeting
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
492    
493 root 1.69 #############################################################################
494 root 1.71 # seed management, try to keep connections to all seeds at all times
495 root 1.69
496 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
497     our %NODE_SEED; # map node ID to seed ID
498 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
499     our $SEED_WATCHER;
500 root 1.71 our $SEED_RETRY;
501 root 1.69
# Starts a connection attempt to the given seed ("host:port"), unless
# %SEED_CONNECT already has a true entry for it. The entry holds the value
# returned by mp_connect while connecting, becomes 1 once the connect
# callback runs and is removed by the on_destroy callback. Croaks when the
# seed address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak ("$seed: unparsable seed address");

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         my $remote = $_[0]{remote_node};

         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            # if we connect to ourselves, nuke this seed, but make sure we act like a seed
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
            # NOTE(review): %NODE_SEED is filled with node IDs as keys below,
            # so deleting by seed looks like a no-op - confirm the intent.
            delete $NODE_SEED{$seed};
         } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;
         }
      },
      on_destroy => sub {
         delete $SEED_CONNECT{$seed};
      },
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
537    
# Starts connection attempts to all seeds that have no entry in
# %SEED_CONNECT and whose node is not known to be up. While such seeds
# exist, the function re-arms itself with a retry interval that roughly
# doubles each time and is capped at the configured monitor_timeout.
sub seed_all {
   my @pending;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @pending, $seed;
   }

   if (!@pending) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @pending;

      $SEED_RETRY = $SEED_RETRY * 2 + rand;

      my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
      $SEED_RETRY = $max
         if $SEED_RETRY > $max;

      $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);
   }
}
560    
# Resets the retry interval to one second and, unless a seed timer is
# already pending, schedules seed_all to run in one second.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
565    
# sets new seed list, starts connecting
# All previous seed state is discarded; every given seed starts out without
# a known node ID.
sub set_seeds(@) {
   %SEED_NODE    = map +($_ => undef), @_;
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   seed_again ();#d#
   seed_all ();
}
577    
# permanent node monitor (void context, so no guard is created)
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   # if we lost the connection to a seed node, make sure we are seeding
   seed_again ()
      if !$is_up && exists $NODE_SEED{$nodeid};
});
583    
584     #############################################################################
585     # talk with/to global nodes
586    
587 root 1.72 # protocol messages:
588     #
589 root 1.73 # sent by all slave nodes (slave to master)
590     # g_slave database - make other global node master of the sender
591 root 1.72 #
592 root 1.73 # sent by any node to global nodes
593     # g_set database - set whole database
594 root 1.89 # g_upd family set del - update single family
595 root 1.73 # g_del family key - delete key from database
596     # g_get family key reply... - send reply with data
597     #
598     # send by global nodes
599     # g_global - node became global, similar to global=1 greeting
600     #
601     # database families
602     # "'l" -> node -> listeners
603     # "'g" -> node -> undef
604     # ...
605 root 1.72 #
606    
607 root 1.73 # used on all nodes:
608 root 1.88 our $MASTER; # the global node we bind ourselves to
609 root 1.78 our $MASTER_MON;
610     our %LOCAL_DB; # this node database
611    
612     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
613 root 1.71
# value announced as "gproto" in our greeting
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
620    
621 root 1.73 #############################################################################
622     # master selection
623    
624     # master requests
625     our %GLOBAL_REQ; # $id => \@req
626 root 1.71
# Registers the request $req (an array reference holding a message for the
# master) under $id, unless that ID is already registered. The request is
# sent right away when a master is set; it stays in %GLOBAL_REQ until
# removed with global_req_del.
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   snd ($MASTER, @$req)
      if $MASTER;
}
637 root 1.71
# Forgets the request registered under the given ID.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
641    
642 root 1.88 #################################
643     # master rpc
644    
our %GLOBAL_RES;          # request ID => reply callback
our $GLOBAL_RES_ID = "a"; # last request ID handed out, incremented with ++

# Queues a request for the master: the last argument is the reply callback,
# everything before it is the request. A fresh request ID is appended to the
# request, and the callback is stored under that ID for the g_reply handler.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
653    
# reply to a global_call: the first element is the request ID, the rest is
# passed on to the callback stored for it (if there still is one)
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id}
      or return;

   # no parens: the callback receives what is left of our @_
   &$cb
};
661    
662     #################################
663    
# Queues a "g_find" request for the given node ID with the master.
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
667 root 1.71
# reply for g_find started in Node.pm
# Drops the pending request and, when the node is known locally, passes the
# second message element on to the node's connect_to method.
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
676    
# Makes the given node our master: sends it our local database in a
# "g_slave" request, followed by all requests stored in %GLOBAL_REQ.
sub master_set {
   $MASTER = shift;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
686 root 1.71
# Selects a master: the first seed node found to be up is used. Otherwise
# a node monitor waits for a seed node to come up, makes it the master and
# replaces itself by a monitor that restarts the search once the master
# goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   # NOTE(review): when a seed is already up, we return without installing
   # a monitor here - confirm that losing this master is noticed elsewhere.
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set ($nodeid);

      # assigning the new guard drops the guard of this very monitor
      $MASTER_MON = mon_nodes (sub {
         my ($nodeid, $is_up) = @_;

         if ($nodeid eq $MASTER && !$is_up) {
            undef $MASTER;
            master_search ();
         }
      });
   });
}
710    
# other node wants to make us the master
# This is only a stub: it loads AnyEvent::MP::Global and then calls whatever
# handler is registered for g_slave afterwards with the original arguments
# (presumably the module installs the real handler - verify against
# AnyEvent::MP::Global).
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};

   # no parens: pass our own @_ along
   &$handler
};
719    
720 root 1.73 #############################################################################
721 root 1.79 # local database operations
722 root 1.71
723 root 1.79 # local database management
724 root 1.88
# Sets $key in $family of the local database to $value and, when a master
# is set, sends it the change as a "g_upd" request. Only single keys are
# supported (a bulk variant exists only as disabled code upstream).
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

   $LOCAL_DB{$family}{$key} = $value;

   snd ($MASTER, g_upd => $family => { $key => $value })
      if defined $MASTER;
}
739    
# Deletes the given keys from $family of the local database and, when a
# master is set, sends it the deletion as a "g_upd" request.
sub db_del($@) {
   # copy the keys: the list becomes part of a message that might be sent
   # later, so it must not alias our caller's variables
   my ($family, @keys) = @_;

   delete @{ $LOCAL_DB{$family} }{@keys};

   snd ($MASTER, g_upd => $family => undef, \@keys)
      if defined $MASTER;
}
747    
# Like db_set, but returns a guard object that deletes the key again when
# it is destroyed.
sub db_reg($$;$) {
   my ($family, $key) = @_;

   # no parens: db_set sees our own @_, including the optional value
   &db_set;

   Guard::guard (sub { db_del ($family => $key) })
}
753 root 1.71
754 root 1.88 # database query
755    
# Sends a "g_db_family" request for $family to the master; $cb is called
# with the reply.
sub db_family {
   my ($family, $cb) = @_;

   global_call ("g_db_family", $family, $cb);
}
760    
# Sends a "g_db_keys" request for $family to the master; $cb is called with
# the reply.
sub db_keys {
   my ($family, $cb) = @_;

   global_call ("g_db_keys", $family, $cb);
}
765    
# Sends a "g_db_values" request for $family to the master; $cb is called
# with the reply.
sub db_values {
   my ($family, $cb) = @_;

   global_call ("g_db_values", $family, $cb);
}
770    
771 root 1.88 # database monitoring
772 root 1.80
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# Monitors a database family: $cb is called as
# $cb->(\%family, \@added, \@changed, \@deleted) for every change. Returns
# a guard object; destroying it removes the monitor and, when it was the
# last one for the family, stops monitoring upstream.
sub db_mon($@) {
   my ($family, $cb) = @_;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };
      AE::postpone (sub {
         # guard might have gone away already - test the family first, so
         # a removed monitor table is not autovivified again
         return unless exists $LOCAL_MON{$family}
                    && exists $LOCAL_MON{$family}{$cb+0};

         # set actual callback
         $LOCAL_MON{$family}{$cb+0} = $cb;

         # everything counts as added; pass empty changed/deleted lists so
         # the callback sees the same argument layout as for real changes
         $cb->($db, [keys %$db], [], []);
      });
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   }

   Guard::guard (sub {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$cb+0};

      unless (%$mon) {
         global_req_del ("mon1 $family");

         # no global_req, because we don't care if we are not connected
         snd ($MASTER, g_mon0 => $family)
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      }
   })
}
814    
# full update
# Replaces the monitored copy of family $f with the contents of $ndb and
# calls every local monitor with the lists of added, changed and deleted
# keys.
$NODE_REQ{g_chg1} = sub {
   my ($f, $ndb) = @_;

   # ignore updates for families we do not (or no longer) monitor, instead
   # of autovivifying monitor state for them
   my $db = $MON_DB{$f}
      or return;

   my (@a, @c, @d);

   # add or replace keys
   while (my ($k, $v) = each %$ndb) {
      if (exists $db->{$k}) {
         push @c, $k;
      } else {
         push @a, $k;
      }

      $db->{$k} = $v;
   }

   # delete keys that are no longer present
   for (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$_};
      push @d, $_;
   }

   $_->($db, \@a, \@c, \@d)
      for values %{ $LOCAL_MON{$f} };
};
839    
# incremental update
# Applies the keys in $set and removes the keys listed in $del from the
# monitored copy of $family, then calls every local monitor with the lists
# of added, changed and deleted keys.
$NODE_REQ{g_chg2} = sub {
   my ($family, $set, $del) = @_;

   # ignore updates for families we do not (or no longer) monitor, instead
   # of autovivifying monitor state for them
   my $db = $MON_DB{$family}
      or return;

   my (@a, @c);

   while (my ($k, $v) = each %$set) {
      if (exists $db->{$k}) {
         push @c, $k;
      } else {
         push @a, $k;
      }

      $db->{$k} = $v;
   }

   delete @$db{@$del};

   $_->($db, \@a, \@c, $del)
      for values %{ $LOCAL_MON{$family} };
};
860 root 1.80
861 root 1.69 #############################################################################
862     # configure
863    
# Returns the second element of POSIX::uname, the node name of this host.
sub nodename {
   require POSIX;

   my (undef, $name) = POSIX::uname ();

   $name
}
868    
# Resolves a comma-separated list of transport descriptors into a list of
# "host:port" strings. Returns a condvar that receives the unique results,
# ordered by descriptor position first and result order second.
#
# A descriptor consisting only of digits (or nothing) is taken as a port
# on the local nodename; a host of "*" means the addresses of the local
# interfaces, a port of "*" is replaced by "0". Croaks on descriptors that
# cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "host:port"] pairs, filled by the callbacks below

   # runs after the last ->end: sort by priority, drop duplicates
   $cv->begin (sub {
      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @res;

      $_[0]->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length $t ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak ("$t: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               require Net::Interface;

               my @addr;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # skip localhost etc.
                  push @addr, grep !/^\x7f/,
                     $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  # global unicast, site-local unicast
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                     $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  $pri += 1e-5;
                  push @res, [
                     $pri,
                     AnyEvent::Socket::format_hostport (
                        AnyEvent::Socket::format_address ($ip), $port
                     ),
                  ];
               }

               $cv->end;
            },
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

               $pri += 1e-5;
               push @res, [
                  $pri,
                  AnyEvent::Socket::format_hostport (
                     AnyEvent::Socket::format_address ($addr), $service
                  ),
               ];
            }

            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
949    
# Configures and starts the local node.
#
# Arguments are key/value pairs; with an odd number of arguments the first
# one is taken as the profile name. The profile defaults to the nodename.
# The remaining pairs are handed to AnyEvent::MP::Config::find_profile,
# whose result becomes $CONFIG.
#
# In order, this function: resets the node identity, determines the node ID
# ("%n" is replaced by the nodename, "%u" or a trailing "/" by a random
# string), creates listeners for all binds, publishes them in the local
# database, starts connecting to the seeds, starts the master search and
# finally loads the configured services.
#
# Croaks on an illegal node ID or an unparsable bind address and dies when
# a service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   $profile = nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   if (exists $CONFIG->{secure}) {
      # a true "secure" setting makes $SECURE reject every remote node
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak ("$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n");

   $NODE = $node;

   $NODE =~ s/%n/nodename ()/ge;

   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # re-register the self node under its final ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all resolvers first, then wait for them one by one
   for (map { _resolve ($_) } @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address we were actually bound to
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set ("'l" => $NODE => $BINDS);

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds (map { $_->recv } map { _resolve ($_) } @$seeds);

   master_search ();

   # services are either [function, args...], "Module::" or a function name
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func ($func))->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func ($_))->();
      }
   }
}
1039    
1040 root 1.1 =back
1041    
1042     =head1 SEE ALSO
1043    
1044     L<AnyEvent::MP>.
1045    
1046     =head1 AUTHOR
1047    
1048     Marc Lehmann <schmorp@schmorp.de>
1049     http://home.schmorp.de/
1050    
1051     =cut
1052    
1053     1
1054