ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.86
Committed: Sun Mar 4 18:48:27 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.85: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
# Highest level the default handler still prints: taken from the environment
# variable PERL_ANYEVENT_MP_WARNLEVEL when it exists, otherwise 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;

# Additional listeners; each one is called with the handler's argument list.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # listeners see every message, regardless of $WARNLEVEL
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # strip one trailing newline, the format below supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time;

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
# Resolves a fully qualified function name ("Some::Package::func") into a
# code reference, loading modules on demand.
#
# If the function is not yet defined, the package part is shortened one
# "::component" at a time and each candidate is tried with "require" until
# the function exists. The function never dies itself - on failure it
# returns a code reference that dies when called:
#
# - with "unable to resolve function '...'" when no candidate is left,
# - with the original error when a module was found but failed to load.
#
# NOTE(review): $pkg is interpolated into a string eval; the only caller
# visible in this file that handles remote data runs _secure_check first.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # only a missing module file lets us go on to the parent package,
            # every other load error is reported (the dot before "pm" is
            # escaped so it matches a literal ".pm" only)
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119    
# the 62 alphanumeric characters used for textual nonces and uniq values
my @alnum = (0 .. 9, "A" .. "Z", "a" .. "z");

# returns a string of $_[0] random octets (character codes 0..255)
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}

# returns a string of $_[0] random characters taken from @alnum
sub nonce62($) {
   my $chars = "";

   $chars .= $alnum[rand 62]
      for 1 .. $_[0];

   $chars
}
129    
# Builds a 9 character id: five base-62 digits derived from the process id
# and the current event loop time, followed by four random characters.
sub gen_uniq {
   my $now = AE::now;

   # each value is reduced to an index into @alnum
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   join ("", @alnum[@digits]) . nonce62 (4)
}
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
# (Re-)initialises $UNIQ, $RUNIQ and the default anonymous node name $NODE.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   ($RUNIQ = nonce62 (10)) =~ s/(.)$/\U$1/;

   $NODE = "anon/$RUNIQ";
}
172    
173 root 1.69 _init_names;
174 root 1.64
# returns the node ID of the local node (the current value of $NODE)
sub NODE() {
   return $NODE;
}
178    
# returns the node ID part of a port ID, i.e. everything before the first "#"
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
184    
# TRACE is a compile-time constant: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190 root 1.1
191 root 1.42 our $DELAY_TIMER;
192     our @DELAY_QUEUE;
193    
# Timer callback: runs queued callbacks in order (including ones queued
# while running) and clears $DELAY_TIMER once the queue is drained.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   undef $DELAY_TIMER
}
197    
# Queues a callback for _delay_run and starts the zero-delay timer that
# drains the queue, unless one is already pending.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
202    
# Delivers a message (port ID followed by the message contents) to a local
# port; messages for ports not present in %PORT are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called with the remaining @_, i.e. the message without the port ID
   &$handler;
}
207    
# Returns the node object registered for the given node ID in %NODE,
# creating and registering a remote node object first if there is none.
# This is what makes a node ID "sendable", so it is basically the central
# routing component.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
215    
# snd $port, @msg
#
# Splits the destination into node ID and port ID at the first "#" and hands
# [$portid, @msg] to the "send" callback of the (possibly newly added) node.
# Croaks if the destination yields no node ID.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
227    
228 root 1.17 =item $is_local = port_is_local $port
229    
230     Returns true iff the port is a local port.
231    
232     =cut
233    
# true iff the node part of the port ID maps to the same node object as the
# local node entry $NODE{""}
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $NODE{$nodeid} == $NODE{""};
}
239    
240 root 1.18 =item snd_to_func $node, $func, @args
241 root 1.11
242 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
243 root 1.11 this function with the given arguments on that node.
244    
245 root 1.20 This function can be used to implement C<spawn>-like interfaces.
246 root 1.11
247     =cut
248    
# snd_to_func $nodeid, $func, @args
#
# Sends ["", $func, @args] to the node port of the given node, adding the
# node to %NODE first if necessary. Croaks when $nodeid is undef.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate before anything else, so a bad call neither compares undef
   # nor modifies the global delay flag below
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT $NODE - how
   # AnyEvent::MP::Node::Self consumes it is not visible here, confirm.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node ($nodeid))->{send} (["", @_]);
}
263    
264 root 1.18 =item snd_on $node, @msg
265    
266     Executes C<snd> with the given C<@msg> (which must include the destination
267     port) on the given node.
268    
269     =cut
270    
# asks the node port of $node to execute "snd" with the remaining arguments
sub snd_on($@) {
   snd (shift, snd => @_);
}
275    
276 root 1.29 =item eval_on $node, $string[, @reply]
277 root 1.18
278 root 1.29 Evaluates the given string as Perl expression on the given node. When
279     @reply is specified, then it is used to construct a reply message with
280     C<"$@"> and any results from the eval appended.
281 root 1.18
282     =cut
283    
# asks the node port of $node to execute "eval" with the string and the
# optional reply message given in the remaining arguments
sub eval_on($$;@) {
   snd (shift, eval => @_);
}
288    
# kil $port, @reason
#
# Forwards the kill request to the node object owning the port (adding the
# node if needed). Croaks when the port ID part is empty, as node ports
# cannot be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
298    
299     #############################################################################
300 root 1.6 # node monitoring and info
301 root 1.3
302 root 1.21 =item node_is_known $nodeid
303 root 1.13
304 root 1.71 #TODO#
305     Returns true iff the given node is currently known to this node.
306 root 1.13
307     =cut
308    
# true iff %NODE has an entry for the given node ID
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
312    
313 root 1.21 =item node_is_up $nodeid
314 root 1.13
315     Returns true if the given node is "up", that is, the kernel thinks it has
316     a working connection to it.
317    
318 root 1.69 If the node is known (to this local node) but not currently connected,
319     returns C<0>. If the node is not known, returns C<undef>.
320 root 1.13
321     =cut
322    
# 1 if the node has a transport, 0 if it is known without one, and
# undef/empty list if there is no (true) entry for it in %NODE
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
327    
328 root 1.3 =item up_nodes
329    
330 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
331     the node itself).
332 root 1.3
333     =cut
334    
# returns the {id} of every node object in %NODE that has a transport
sub up_nodes() {
   map { $_->{id} }
      grep { $_->{transport} }
         values %NODE
}
338    
339 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340 root 1.3
341 root 1.27 Registers a callback that is called each time a node goes up (a connection
342     is established) or down (the connection is lost).
343 root 1.3
344     Node up messages can only be followed by node down messages for the same
345     node, and vice versa.
346    
347 root 1.71 Note that monitoring a node is usually better done by monitoring its node
348 root 1.27 port. This function is mainly of interest to modules that are concerned
349     about the network topology and low-level connection handling.
350    
351     Callbacks I<must not> block and I<should not> send any messages.
352    
353     The function returns an optional guard which can be used to unregister
354 root 1.3 the monitoring callback again.
355    
356 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
357     or go up (and down).
358    
359     newnode $_, 1 for up_nodes;
360     mon_nodes \&newnode;
361    
362 root 1.3 =cut
363    
364     our %MON_NODES;
365    
# Registers $cb in %MON_NODES, keyed by its numeric reference address.
# In non-void context a guard is returned that removes the entry again.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb+0;
   $MON_NODES{$key} = $cb;

   # no guard is created when the caller ignores the result
   defined wantarray
      and Guard::guard { delete $MON_NODES{$cb+0} }
}
373    
# Notifies every registered node monitor about a node going up or down and
# logs the event at level 7. A dying callback is logged at level 1 and does
# not stop the remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
384    
385     #############################################################################
386 root 1.1 # self node code
387    
# _kill $portid, @reason
#
# Removes a local port together with its data and invokes all monitors
# registered for it with @reason. A port dying with a reason but without
# any monitor is logged at level 2.
sub _kill {
   my $port = shift;

   return # killing nonexistent ports is O.K.
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
401    
# _monitor undef, $portid, $cb
#
# Registers $cb as monitor of a local port. If the port does not exist, $cb
# is called immediately with a "no_such_port" reason instead. The first
# argument is not used here.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid")
      unless exists $PORT{$portid};

   $LMON{$portid}{$cb+0} = $cb;
}
408    
# _unmonitor undef, $portid, $cb
#
# Removes a monitor registered with _monitor; ports without any monitors
# are left alone, so no empty %LMON entry is created.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
413    
# dies unless the $SECURE callback accepts the node that sent the message
# currently being processed ($SRCNODE)
sub _secure_check {
   die "remote execution attempt by insecure node\n"
      unless $SECURE->($SRCNODE->{id});
}
418    
# Dispatch table for messages sent to the node port (port ID ""): maps the
# message tag to its handler, which is called with the rest of the message.
# $SRCNODE holds the sending node object while a handler runs.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      _unmonitor (undef, $portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      # the callback is stored inside the node object, so only keep a weak
      # reference to the node to avoid a reference cycle
      Scalar::Util::weaken (my $node = $SRCNODE);
      _monitor (undef, $portid, $node->{rmon}{$portid} = sub {
         # the node object may be gone by now - check before dereferencing,
         # otherwise an empty hash would be autovivified in its place
         $node
            or return;

         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node->{transport};
      });
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd @$_ for @_
   },

   # random utilities
   eval => sub {
      &_secure_check;
      # NOTE(review): string eval of data received from another node,
      # guarded only by the _secure_check above
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
471    
# the local node object is reachable both under "" and under its node ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: dispatches on the message tag via %NODE_REQ. Unknown tags
# are treated as function names, which are resolved with load_func (after
# passing the security check) and cached in %NODE_REQ. Errors are logged at
# level 2 and not propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      $NODE_REQ{$tag} ||= do { &_secure_check; load_func ($tag) };

      # called with the remaining @_, i.e. the message without its tag
      &{ $NODE_REQ{$tag} }
   };

   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};
478    
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
485    
486 root 1.69 #############################################################################
487 root 1.71 # seed management, try to keep connections to all seeds at all times
488 root 1.69
489 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
490     our %NODE_SEED; # map node ID to seed ID
491 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
492     our $SEED_WATCHER;
493 root 1.71 our $SEED_RETRY;
494 root 1.69
# seed_connect $seed
#
# Starts a connection attempt to the seed address ("host:port") unless
# %SEED_CONNECT already has a true entry for it. Croaks when the address
# cannot be parsed. While the attempt is running, $SEED_CONNECT{$seed} holds
# the value returned by mp_connect, it is set to 1 by the connect callback
# and removed again by on_destroy.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently

            # %NODE_SEED is keyed by node ID, not by seed address, so the
            # reverse mapping has to be removed via the node this seed used
            # to resolve to (and only if it still points to this seed)
            my $old_node = delete $SEED_NODE{$seed};
            delete $NODE_SEED{$old_node}
               if defined $old_node
                  && exists $NODE_SEED{$old_node}
                  && $NODE_SEED{$old_node} eq $seed;
         } else {
            $SEED_NODE{$seed} = $_[0]{remote_node};
            $NODE_SEED{$_[0]{remote_node}} = $seed;
         }
      },
      on_destroy => sub {
         delete $SEED_CONNECT{$seed};
      },
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
530    
# Tries to connect to every seed that is neither connected nor connecting,
# and re-arms the retry timer with a randomised, doubling interval that is
# capped at the configured monitor_timeout. Stops the timer when there is
# nothing left to do.
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      # connection attempt pending or established
      next if exists $SEED_CONNECT{$seed};

      # we already know the node behind this seed and it is up
      next if defined $SEED_NODE{$seed} && node_is_up ($SEED_NODE{$seed});

      push @seeds, $seed;
   }

   if (!@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @seeds;

      my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $max;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
553    
# resets the retry interval to one second and makes sure seed_all is
# scheduled, unless a seed timer is already pending
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER
      or $SEED_WATCHER = AE::timer 1, 0, \&seed_all;
}
558    
# set_seeds @seed_addresses
#
# Replaces the seed list: forgets all seed state, registers the given
# addresses with a not-yet-known node (undef) and starts connecting.
sub set_seeds(@) {
   %$_ = ()
      for \%SEED_NODE, \%NODE_SEED, \%SEED_CONNECT;

   $SEED_NODE{$_} = undef
      for @_;

   seed_again ();#d#
   seed_all ();
}
570    
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   # if we lost the connection to a seed node, make sure we are seeding
   seed_again ()
      if !$is_up && exists $NODE_SEED{$nodeid};
});
576    
577     #############################################################################
578     # talk with/to global nodes
579    
580 root 1.72 # protocol messages:
581     #
582 root 1.73 # sent by all slave nodes (slave to master)
583     # g_slave database - make other global node master of the sender
584 root 1.72 #
585 root 1.73 # sent by any node to global nodes
586     # g_set database - set whole database
587     # g_add family key val - add/replace key to database
588     # g_del family key - delete key from database
589     # g_get family key reply... - send reply with data
590     #
591     # sent by global nodes
592     # g_global - node became global, similar to global=1 greeting
593     #
594     # database families
595     # "'l" -> node -> listeners
596     # "'g" -> node -> undef
597     # ...
598 root 1.72 #
599    
600 root 1.73 # used on all nodes:
601 root 1.78 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
602     our $MASTER_MON;
603     our %LOCAL_DB; # this node database
604    
605     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
606 root 1.71
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
613    
614 root 1.73 #############################################################################
615     # master selection
616    
617     # master requests
618     our %GLOBAL_REQ; # $id => \@req
619 root 1.71
# global_req_add $id, \@req
#
# Remembers a request for the master under $id (a no-op if that id is
# already queued) and sends it right away when a master is known. Queued
# requests are re-sent by master_set.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd ($MASTER, @$req);
}
630 root 1.71
# forgets the queued master request with the given id, returning it
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id}
}
634    
# queues a "g_find" request for the given node ID with the master
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
638 root 1.71
# reply for g_find started in Node.pm: drops the queued request and, if the
# node is (still) known, passes the second message element to its
# connect_to method
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
647    
# master_set $nodeid
#
# Makes the given node our master: sends it our local database and all
# requests queued in %GLOBAL_REQ.
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
657 root 1.71
# Selects a master: the first seed node that is up is used immediately,
# otherwise we wait for a seed node to come up. Once a master was found this
# way, it is watched and the search restarts when it goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up; # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set ($nodeid);

      # replace this monitor by one that waits for the master to go down
      $MASTER_MON = mon_nodes (sub {
         return unless $_[0] eq $MASTER && !$_[1];

         undef $MASTER;
         master_search ();
      });
   });
}
681    
# other node wants to make us the master: this placeholder loads the global
# module and then re-dispatches the unchanged request to whatever handler is
# registered for g_slave afterwards
# NOTE(review): relies on AnyEvent::MP::Global replacing $NODE_REQ{g_slave},
# otherwise this would call itself again - confirm.
$NODE_REQ{g_slave} = sub {
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
690    
691 root 1.73 #############################################################################
692 root 1.79 # local database operations
693 root 1.71
# local database management

# db_set $family, $key, $value
#
# Stores the value in the local database and forwards the change to the
# master, if one is set.
sub db_set($$$) {
   my ($family, $key, $value) = @_;

   $LOCAL_DB{$family}{$key} = $value;

   snd ($MASTER, g_add => $family, $key, $value)
      if defined $MASTER;
}
700    
# db_del $family, $key
#
# Removes the key from the local database and forwards the deletion to the
# master, if one is set.
sub db_del($$) {
   my ($family, $key) = @_;

   delete $LOCAL_DB{$family}{$key};

   snd ($MASTER, g_del => $family, $key)
      if defined $MASTER;
}
706    
# $guard = db_reg $family, $key[, $value]
#
# Like db_set, but returns a guard that deletes the key again when it is
# destroyed. The value is undef when not given.
sub db_reg($$;$) {
   my ($family, $key) = @_;

   db_set ($family, $key, $_[2]);

   Guard::guard { db_del ($family, $key) }
}
712 root 1.71
# placeholder, not implemented yet: returns nothing
sub db_keys($$$) {
   #d#
   return;
}
716    
717     #d# db_values
718     #d# db_family
719     #d# db_key
720    
721 root 1.81 our %LOCAL_MON; # f, reply
722     our %MON_DB; # f, k, value
723 root 1.80
# $guard = db_mon $family, $cb->($db, \@changed_keys)
#
# Monitors a database family. If the family is already being monitored, the
# callback is called at once with the current contents and all of its keys,
# otherwise a g_mon1 request is queued for the master. Destroying the
# returned guard removes the callback; when it was the last one for the
# family, monitoring is cancelled (g_mon0) and the cached data dropped.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   if ($db) {
      # if we already monitor this thingy, generate
      # create events for all of them
      $cb->($db, [keys %$db]);
   } else {
      # new monitor, request chg1 from upstream
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   }

   $LOCAL_MON{$family}{$cb+0} = $cb;

   Guard::guard {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$cb+0};

      # other callbacks still need this family
      return if %$mon;

      global_req_del ("mon1 $family");

      # no global_req, because we don't care if we are not connected
      snd ($MASTER, g_mon0 => $family)
         if $MASTER;

      delete $LOCAL_MON{$family};
      delete $MON_DB{$family};
   }
}
755    
# full update: ($family, \%new_contents) - makes the cached copy of the
# family equal to the new contents and reports every key that was set or
# removed to the family's monitor callbacks
$NODE_REQ{g_chg1} = sub {
   my ($family, $new) = @_;

   my $db = $MON_DB{$family};
   my @changed;

   # add or replace keys
   for my $key (keys %$new) {
      $db->{$key} = $new->{$key};
      push @changed, $key;
   }

   # delete keys that are no longer present
   my @gone = grep { !exists $new->{$_} } keys %$db;
   delete @$db{@gone};
   push @changed, @gone;

   $_->($db, \@changed)
      for values %{ $LOCAL_MON{$family} };
};
778    
# incremental update: ($family, $key, $value) sets a key, ($family, $key)
# without a third element deletes it; the monitor callbacks of the family
# are told about the one affected key
$NODE_REQ{g_chg2} = sub {
   my ($family, $key) = @_;

   my $db = $MON_DB{$family};

   if (@_ >= 3) {
      $db->{$key} = $_[2];
   } else {
      delete $db->{$key};
   }

   $_->($db, [$key])
      for values %{ $LOCAL_MON{$family} };
};
790 root 1.80
791 root 1.69 #############################################################################
792     # configure
793    
# returns the node name of this host as reported by uname
sub nodename {
   require POSIX;

   my (undef, $name) = POSIX::uname ();

   $name
}
798    
# $cv = _resolve $descriptors
#
# Resolves a comma separated list of transport descriptors ("host:port",
# "port", "*:port", ...) asynchronously. Returns a condvar that is sent the
# resulting "host:port" strings without duplicates, ordered by the position
# of their descriptor. Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the lookups

   # runs when the last outstanding lookup has finished
   $cv->begin (sub {
      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];
         next if $seen{$addr}++;
         push @refs, $addr;
      }

      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      # an empty or purely numeric descriptor refers to the local host name
      if ($t =~ /^\d*$/) {
         $t = length $t ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $target->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      } else {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  push @addr,
                     grep !/^\x7f/, # skip localhost etc.
                        $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  push @addr,
                     grep /^[\x20-\x3f\xfc\xfd]/, # global unicast, site-local unicast
                        $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      }
   }

   $cv->end;

   $cv
}
879    
# configure [$profile,] key => value...
#
# Configures the local node: looks up the profile, derives the node ID,
# creates listeners for all bind addresses, publishes them in the local
# database, starts connecting to the seeds, starts the master search and
# finally loads/runs the configured services. An odd number of arguments
# means that the first one is the profile name.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   # with a "secure" setting present, the security check passes iff it is false
   if (exists $CONFIG->{secure}) {
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # expand %n to the host's node name
   ($NODE = $node) =~ s/%n/nodename/ge;

   # a trailing slash and %u are replaced by the random $RUNIQ
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # re-register the self node under its new ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all lookups first, then wait for them one by one
   my @bind_lookups = map { _resolve ($_) } @$binds;

   for my $lookup (@bind_lookups) {
      for my $bind ($lookup->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set ("'l" => $NODE => $BINDS);

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   master_search ();

   # NOTE(review): debugging leftover (marked #d#) that only triggers for a
   # node named "atha" - kept unchanged, consider removing it
   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   # a service is [function, args...], a "Module::" name to load,
   # or the name of a function to call without arguments
   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
969    
970 root 1.1 =back
971    
972     =head1 SEE ALSO
973    
974     L<AnyEvent::MP>.
975    
976     =head1 AUTHOR
977    
978     Marc Lehmann <schmorp@schmorp.de>
979     http://home.schmorp.de/
980    
981     =cut
982    
983     1
984