ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.88
Committed: Thu Mar 8 21:37:51 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.87: +50 -9 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
# Highest level the default handler still prints; taken from the
# environment when PERL_ANYEVENT_MP_WARNLEVEL exists, otherwise 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks, invoked by the default handler for every message.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # every hook sees every message (same @_), regardless of $WARNLEVEL
   foreach (@WARN) {
      &$_;
   }

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # at most one trailing newline is stripped, printf adds it back
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
# Resolves a fully qualified function name to a code reference, loading
# the providing module on demand.
#
# If the function is not yet defined, trailing "::name" components are
# stripped one by one and the remaining package is require'd, until the
# function becomes defined.
#
# Never dies itself: on failure it returns a code reference that dies with
# the reason when called (either "unable to resolve function ..." or the
# error raised while loading a module).
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # $func is used as a symbolic reference by design

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, so anything that
         # is not a plain package name must never get there.
         $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module just means "try the parent package", any
            # other load error is reported to the caller of the result
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119    
# the 62 characters used for textual nonces and unique IDs
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random octets
sub nonce($) {
   my ($len) = @_;

   my $octets = "";
   $octets .= chr (rand 256)
      for 1 .. $len;

   $octets
}

# returns a string of $_[0] random characters out of [0-9A-Za-z]
sub nonce62($) {
   my ($len) = @_;

   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $len;

   $chars
}

# builds a 9 character ID: five characters derived from the process ID and
# the current event loop time, followed by four random characters
sub gen_uniq {
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      int ($now        ) % 62,
      int ($now * 100  ) % 62,
      int ($now * 10000) % 62,
   );

   my $prefix = join "", @alnum[@digits];

   $prefix . nonce62 (4)
}
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
# (re-)generates $UNIQ, $RUNIQ and the default anonymous node ID
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
   # at the end to avoid clashes
   $RUNIQ = nonce62 (10);
   substr ($RUNIQ, -1) = uc substr $RUNIQ, -1;

   $NODE = "anon/$RUNIQ";
}
172    
173 root 1.69 _init_names;
174 root 1.64
# accessor for the node ID of the local node
sub NODE() {
   return $NODE;
}
178    
# returns the node ID part of a port ID, i.e. everything before the
# first "#"
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
184    
# TRACE is a compile-time constant so that tracing code can be optimised
# away when PERL_ANYEVENT_MP_TRACE is not set.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190 root 1.1
191 root 1.42 our $DELAY_TIMER;
192     our @DELAY_QUEUE;
193    
# runs queued callbacks in order until the queue is empty (callbacks may
# queue more), then drops the timer so the next delay call re-arms it
sub _delay_run {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
}
197    
# queues a callback to be run from a zero-delay timer, starting that timer
# unless one is already pending
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
202    
# delivers a message (port ID followed by the message elements) to a local
# port; messages for unknown ports are silently dropped
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $handler = $PORT{+shift}
      or return;

   # the port callback receives the remaining @_
   &$handler
}
207    
# central routing primitive: returns the node object for a node ID, creating
# and caching a remote node object in %NODE on first use, so that stuff can
# be sent to it
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
215    
# sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";#d#
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # {send} holds a code reference, this is not a method call
   $node->{send}->(["$portid", @_]);
}
227    
228 root 1.17 =item $is_local = port_is_local $port
229    
230     Returns true iff the port is a local port.
231    
232     =cut
233    
# a port is local when its node ID maps to the same node object as the
# local node (which is registered under the empty node ID)
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $NODE{$nodeid} == $NODE{""};
}
239    
240 root 1.18 =item snd_to_func $node, $func, @args
241 root 1.11
242 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
243 root 1.11 this function with the given arguments on that node.
244    
245 root 1.20 This function can be used to implement C<spawn>-like interfaces.
246 root 1.11
247     =cut
248    
# sends a message to the node port ("") of the given node, which makes that
# node call the named function with the given arguments
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT $NODE, which
   # does not obviously match the comment above - confirm.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["", @call]);
}
263    
264 root 1.18 =item snd_on $node, @msg
265    
266     Executes C<snd> with the given C<@msg> (which must include the destination
267     port) on the given node.
268    
269     =cut
270    
# asks the node port of $node to run snd with the given message
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, snd => @msg;
}
275    
276 root 1.29 =item eval_on $node, $string[, @reply]
277 root 1.18
278 root 1.29 Evaluates the given string as Perl expression on the given node. When
279     @reply is specified, then it is used to construct a reply message with
280     C<"$@"> and any results from the eval appended.
281 root 1.18
282     =cut
283    
# asks the node port of $node to evaluate a string, optionally followed by
# a reply message prefix
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd $node, eval => @request;
}
288    
# kills the given port ("nodeid#portid"), passing any further arguments on
# as the kill reason; node ports (empty port ID) cannot be killed
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
298    
299     #############################################################################
300 root 1.6 # node monitoring and info
301 root 1.3
302 root 1.21 =item node_is_known $nodeid
303 root 1.13
304 root 1.71 #TODO#
305     Returns true iff the given node is currently known to this node.
306 root 1.13
307     =cut
308    
# true iff %NODE has an entry for the given node ID
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
312    
313 root 1.21 =item node_is_up $nodeid
314 root 1.13
315     Returns true if the given node is "up", that is, the kernel thinks it has
316     a working connection to it.
317    
318 root 1.69 If the node is known (to this local node) but not currently connected,
319     returns C<0>. If the node is not known, returns C<undef>.
320 root 1.13
321     =cut
322    
# 1 if the node has a transport, 0 if it is known but has none, and
# nothing at all if the node is unknown
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
327    
328 root 1.3 =item up_nodes
329    
330 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
331     the node itself).
332 root 1.3
333     =cut
334    
# IDs of all node objects that currently have a transport
sub up_nodes() {
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}
338    
339 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340 root 1.3
341 root 1.27 Registers a callback that is called each time a node goes up (a connection
342     is established) or down (the connection is lost).
343 root 1.3
344     Node up messages can only be followed by node down messages for the same
345     node, and vice versa.
346    
347 root 1.71 Note that monitoring a node is usually better done by monitoring its node
348 root 1.27 port. This function is mainly of interest to modules that are concerned
349     about the network topology and low-level connection handling.
350    
351     Callbacks I<must not> block and I<should not> send any messages.
352    
353     The function returns an optional guard which can be used to unregister
354 root 1.3 the monitoring callback again.
355    
356 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
357     or go up (and down).
358    
359     newnode $_, 1 for up_nodes;
360     mon_nodes \&newnode;
361    
362 root 1.3 =cut
363    
# registered node up/down callbacks, keyed by the callback's address
our %MON_NODES;

# registers $cb for node up/down events; when the result is used, it is a
# guard that unregisters the callback again once it is destroyed
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # in void context nobody could hold on to a guard, so don't create one
   defined wantarray
      or return;

   Guard::guard (sub { delete $MON_NODES{$cb+0} })
}
373    
# reports a node up/down event to every mon_nodes callback and logs it at
# level 7; exceptions thrown by callbacks are logged at level 1
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };
      $WARN->(1, $@) unless $ok;
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
384    
385     #############################################################################
386 root 1.1 # self node code
387    
# removes a local port and its data, then invokes all its monitors with the
# kill reason; a reason nobody monitors is logged at level 2
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   delete $PORT{$port}
      or return;
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
401    
# registers a monitor callback for a local port; for a nonexistent port the
# callback is invoked right away with a no_such_port reason instead
# (the first argument is ignored)
sub _monitor {
   my (undef, $portid, $cb) = @_;

   exists $PORT{$portid}
      or return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");

   $LMON{$portid}{$cb+0} = $cb;
}
408    
# removes a monitor callback from a local port again
# (the first argument is ignored)
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   # avoid autovivifying an entry for ports nobody monitors
   return unless exists $LMON{$portid};

   delete $LMON{$portid}{$cb+0}
}
413    
# dies unless the node currently injecting a message ($SRCNODE) is accepted
# by $SECURE or is the local node itself
sub _secure_check {
   my $srcid = $SRCNODE->{id};

   return 1 if $SECURE->($srcid);
   return 1 if $srcid eq $NODE;

   die "remote execution attempt by insecure node\n";
}
419    
# Request handlers of the node port (""), keyed by request tag. Handlers
# are called with the remaining message elements in @_, while $SRCNODE
# holds the node object of the sender.
our %NODE_REQ = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;
      _unmonitor (undef, $portid, delete $SRCNODE->{rmon}{$portid});
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # only a weak reference, so the closure does not keep the node alive
      Scalar::Util::weaken (my $node = $SRCNODE);

      my $notify = $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      _monitor (undef, $portid, $notify);
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # is the remote node considered secure?
#   secure => sub {
#      #TODO#
#   },

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd # called with the handler's own @_
   },

   # random utilities
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },
   devnull => sub {
      # deliberately ignores everything
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
478    
# the local node is reachable both under its node ID and the empty node ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: dispatches on the first message element. Tags without a
# handler in %NODE_REQ are resolved as function names (after a security
# check) and cached there. Errors are logged, never propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func ($tag) };
      &$handler
   };

   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};
485    
# version of the node protocol spoken by this module
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
492    
493 root 1.69 #############################################################################
494 root 1.71 # seed management, try to keep connections to all seeds at all times
495 root 1.69
496 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
497     our %NODE_SEED; # map node ID to seed ID
498 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
499     our $SEED_WATCHER;
500 root 1.71 our $SEED_RETRY;
501 root 1.69
# starts a connection attempt to the given seed address ("host:port"),
# unless %SEED_CONNECT already has an entry for it; croaks on unparsable
# addresses
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name
         my ($transport) = @_;
         my $remote = $transport->{remote_node};

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            # if we connect to ourselves, nuke this seed, but make sure we
            # act like a seed
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
            # NOTE(review): %NODE_SEED is keyed by node ID elsewhere, so
            # deleting by seed address looks like a no-op - confirm.
            delete $NODE_SEED{$seed};
         } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;
         }
      },
      on_destroy => sub {
         delete $SEED_CONNECT{$seed};
      },
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
537    
# starts connection attempts to all seeds that are neither connected nor
# being connected to, and schedules a retry with growing, capped delay
sub seed_all {
   my @pending;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};
      next if defined $SEED_NODE{$seed} && node_is_up ($SEED_NODE{$seed});

      push @pending, $seed;
   }

   if (!@pending) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @pending;

      # roughly double the delay each round, limited by the monitor timeout
      my $limit = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $limit;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
560    
# resets the retry delay and makes sure seeding is attempted again in about
# a second, unless a seed timer is already pending
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER = AE::timer 1, 0, \&seed_all
      unless $SEED_WATCHER;
}
565    
# sets new seed list, starts connecting
# (all previous seed state is forgotten first)
sub set_seeds(@) {
   my @seeds = @_;

   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out with an unknown node ID
   $SEED_NODE{$_} = undef
      for @seeds;

   seed_again ();#d#
   seed_all ();
}
577    
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # if we lost the connection to a seed node, make sure we are seeding
   seed_again ()
      if !$is_up && exists $NODE_SEED{$nodeid};
};
583    
584     #############################################################################
585     # talk with/to global nodes
586    
587 root 1.72 # protocol messages:
588     #
589 root 1.73 # sent by all slave nodes (slave to master)
590     # g_slave database - make other global node master of the sender
591 root 1.72 #
592 root 1.73 # sent by any node to global nodes
593     # g_set database - set whole database
594     # g_add family key val - add/replace key to database
595     # g_del family key - delete key from database
596     # g_get family key reply... - send reply with data
597     #
598     # sent by global nodes
599     # g_global - node became global, similar to global=1 greeting
600     #
601     # database families
602     # "'l" -> node -> listeners
603     # "'g" -> node -> undef
604     # ...
605 root 1.72 #
606    
607 root 1.73 # used on all nodes:
608 root 1.88 our $MASTER; # the global node we bind ourselves to
609 root 1.78 our $MASTER_MON;
610     our %LOCAL_DB; # this node database
611    
612     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
613 root 1.71
# version of the global node protocol spoken by this module
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
620    
621 root 1.73 #############################################################################
622     # master selection
623    
624     # master requests
625     our %GLOBAL_REQ; # $id => \@req
626 root 1.71
# remembers request $req (an array reference) under $id so that master_set
# can (re-)send it, and sends it right away if a master is known; does
# nothing if a request with that ID is already queued
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   if ($MASTER) {
      snd $MASTER, @$req;
   }
}
637 root 1.71
# forgets the queued request with the given ID, returning it
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id}
}
641    
642 root 1.88 #################################
643     # master rpc
644    
our %GLOBAL_RES;          # call ID => reply callback
our $GLOBAL_RES_ID = "a"; # last call ID handed out

# queues the request given by all arguments but the last for the master,
# with a fresh call ID appended; the last argument is the callback that
# receives the reply
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}

# reply to a global_call: the call ID followed by the results
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id}
      or return;

   # the callback receives the remaining @_
   &$cb
};
661    
662     #################################
663    
# queues a g_find request for the given node ID (at most one per node ID)
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}

# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
676    
# makes the given node our master: sends it our local database and all
# requests that are still queued
sub master_set {
   ($MASTER) = @_;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   foreach my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
686 root 1.71
# picks a master: the first seed node that is up, or else the first seed
# node that comes up later; once a master is set, losing it restarts the
# search
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   # no seed is up yet - wait for one
   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      # this replaces the monitor above: from now on watch for master loss
      $MASTER_MON = mon_nodes (sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      });
   });
}
710    
# other node wants to make us the master
#
# This is only a stub: it loads AnyEvent::MP::Global, which is expected to
# install the real g_slave handler, and then re-dispatches the request.
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};

   # called with the original @_
   &$handler
};
719    
720 root 1.73 #############################################################################
721 root 1.79 # local database operations
722 root 1.71
723 root 1.79 # local database management
724 root 1.88
# sets a key in the local database and tells the master, if there is one
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

   $LOCAL_DB{$family}{$key} = $value;

   defined $MASTER
      and snd $MASTER, g_add => $family => $key => $value;
}
730    
# deletes a key from the local database and tells the master, if there is
# one
sub db_del($$) {
   my ($family, $key) = @_;

   delete $LOCAL_DB{$family}{$key};

   defined $MASTER
      and snd $MASTER, g_del => $family => $key;
}
736    
# like db_set, but returns a guard that deletes the key again when it is
# destroyed
sub db_reg($$;$) {
   # forward the unmodified @_, bypassing db_set's prototype
   &db_set;

   my ($family, $key) = @_;

   Guard::guard (sub { db_del ($family => $key) })
}
742 root 1.71
743 root 1.88 # database query
744    
# The three query functions below take a family name and a callback and
# issue the corresponding global call; the callback receives the reply.

sub db_family {
   global_call (g_db_family => @_[0, 1]);
}

sub db_keys {
   global_call (g_db_keys => @_[0, 1]);
}

sub db_values {
   global_call (g_db_values => @_[0, 1]);
}
759    
760 root 1.88 # database monitoring
761 root 1.80
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# Monitors a database family. $cb is called with the family hash and an
# array reference of affected keys. Returns a guard; destroying it removes
# the callback and, when it was the last one for the family, the
# monitoring itself.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $known = $MON_DB{$family};

   if ($known) {
      # if we already monitor this thingy, generate
      # create events for all of them
      $cb->($known, [keys %$known]);
   } else {
      # new monitor, request chg1 from upstream
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   }

   $LOCAL_MON{$family}{$cb+0} = $cb;

   Guard::guard (sub {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$cb+0};

      # other callbacks still need the family
      return if %$mon;

      global_req_del ("mon1 $family");

      # no global_req, because we don't care if we are not connected
      snd $MASTER, g_mon0 => $family
         if $MASTER;

      delete $LOCAL_MON{$family};
      delete $MON_DB{$family};
   })
}
796    
# full update
#
# Called with a family name and a hash reference holding the complete new
# contents of that family. Brings the monitored copy in line with it and
# calls every db_mon callback of the family with the copy and an array
# reference of the keys that were set or deleted.
$NODE_REQ{g_chg1} = sub {
   my ($f, $ndb) = @_;

   # updates can arrive for families we do not monitor (anymore), e.g. while
   # a g_mon0 is still in flight - ignore them instead of autovivifying
   # entries in %LOCAL_MON
   my $db = $MON_DB{$f}
      or return;
   my @k;

   # add or replace keys
   while (my ($k, $v) = each %$ndb) {
      $db->{$k} = $v;
      push @k, $k;
   }

   # delete keys that are no longer present
   for (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$_};
      push @k, $_;
   }

   $_->($db, \@k)
      for values %{ $LOCAL_MON{$f} || {} };
};
819    
# incremental update
#
# Called with a family name, a key and optionally a value: with a value the
# key is set, without one it is deleted. Every db_mon callback of the family
# is then called with the monitored copy and the affected key.
$NODE_REQ{g_chg2} = sub {
   my ($f, $k) = @_;

   # updates can arrive for families we do not monitor (anymore), e.g. while
   # a g_mon0 is still in flight - ignore them instead of autovivifying
   # entries in %LOCAL_MON
   my $db = $MON_DB{$f}
      or return;

   @_ >= 3
      ? $db->{$k} = $_[2]
      : delete $db->{$k};

   $_->($db, [$k])
      for values %{ $LOCAL_MON{$f} || {} };
};
831 root 1.80
832 root 1.69 #############################################################################
833     # configure
834    
# returns the nodename field of uname(2)
sub nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
839    
# Resolves a comma separated list of transport descriptors ("host:port",
# "*:port", "port" or "") asynchronously. Returns a condvar that is sent the
# resulting "host:port" strings, ordered by the position of the descriptor
# they came from and with duplicates removed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, in completion order

   # runs once all lookups (and the initial begin) have ended
   $cv->begin (sub {
      my ($done) = @_;

      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];
         push @refs, $addr unless $seen{$addr}++;
      }

      $done->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      # a bare port number (or nothing at all) means "on this host"
      if ($t =~ /^\d*$/) {
         $t = length ($t) ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }
            $cv->end;
         });
      } else {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               require Net::Interface;

               my @addr;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  push @addr,
                     grep !/^\x7f/, # skip localhost etc.
                        $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  push @addr,
                     grep /^[\x20-\x3f\xfc\xfd]/, # global unicast, site-local unicast
                        $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      }
   }

   $cv->end;

   $cv
}
920    
# Configures the local node: configure [$profile, ] key => value...
#
# Looks up the profile (default: the nodename), derives the node ID from
# it, creates listeners for all binds, registers them in the database,
# starts connecting to the seed nodes, starts the master search and
# finally loads the configured services.
#
# Blocks while bind and seed addresses are resolved. Croaks on illegal node
# IDs and unparsable bind addresses, dies if a service module fails to load.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   $profile = nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   if (exists $CONFIG->{secure}) {
      # the constant answer of $SECURE is the negated "secure" setting
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   $NODE =~ s/%n/nodename ()/ge;

   # a trailing "/" and every %u are filled in with the random $RUNIQ
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all lookups first, then wait for them one after the other
   for (map { _resolve ($_) } @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set ("'l" => $NODE => $BINDS);

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds (map $_->recv, map { _resolve ($_) } @$seeds);

   master_search ();

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ((my $module = $service) =~ s/::$//) {
         # "Module::" - just load the module. The "::" is stripped from a
         # copy, so the service list in the profile is not modified.
         eval "require $module";
         die $@ if $@;
      } else {
         # plain function name, called without arguments
         (load_func ($service))->();
      }
   }
}
1010    
1011 root 1.1 =back
1012    
1013     =head1 SEE ALSO
1014    
1015     L<AnyEvent::MP>.
1016    
1017     =head1 AUTHOR
1018    
1019     Marc Lehmann <schmorp@schmorp.de>
1020     http://home.schmorp.de/
1021    
1022     =cut
1023    
1024     1
1025