ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.91
Committed: Sun Mar 11 22:34:21 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.90: +3 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages -queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
75 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
76 root 1.44 our @WARN;
77     our $WARN = sub {
78     &$_ for @WARN;
79 root 1.29
80     return if $WARNLEVEL < $_[0];
81    
82 root 1.16 my ($level, $msg) = @_;
83    
84 root 1.1 $msg =~ s/\n$//;
85 root 1.16
86     printf STDERR "%s <%d> %s\n",
87     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
88     $level,
89     $msg;
90 root 1.1 };
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
99 root 1.6 sub load_func($) {
100     my $func = $_[0];
101    
102     unless (defined &$func) {
103     my $pkg = $func;
104     do {
105     $pkg =~ s/::[^:]+$//
106 root 1.63 or return sub { die "unable to resolve function '$func'" };
107 root 1.60
108     local $@;
109 root 1.61 unless (eval "require $pkg; 1") {
110     my $error = $@;
111     $error =~ /^Can't locate .*.pm in \@INC \(/
112     or return sub { die $error };
113     }
114 root 1.6 } until defined &$func;
115     }
116    
117     \&$func
118     }
119    
120 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
121    
122 root 1.1 sub nonce($) {
123 root 1.78 join "", map chr rand 256, 1 .. $_[0]
124 root 1.1 }
125    
126 root 1.78 sub nonce62($) {
127     join "", map $alnum[rand 62], 1 .. $_[0]
128 root 1.1 }
129    
130     sub gen_uniq {
131 root 1.78 my $now = AE::now;
132     (join "",
133     map $alnum[$_],
134     $$ / 62 % 62,
135     $$ % 62,
136     (int $now ) % 62,
137     (int $now * 100) % 62,
138     (int $now * 10000) % 62,
139     ) . nonce62 4;
140 root 1.1 }
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
162 root 1.69 sub _init_names {
163 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
164     $UNIQ = gen_uniq;
165    
166     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
167     $RUNIQ = nonce62 10;
168     $RUNIQ =~ s/(.)$/\U$1/;
169    
170     $NODE = "anon/$RUNIQ";
171 root 1.64 }
172    
173 root 1.69 _init_names;
174 root 1.64
175 root 1.1 sub NODE() {
176     $NODE
177     }
178    
179     sub node_of($) {
180 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
181 root 1.1
182 root 1.21 $node
183 root 1.1 }
184    
185 root 1.17 BEGIN {
186     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
187     ? sub () { 1 }
188     : sub () { 0 };
189     }
190 root 1.1
191 root 1.42 our $DELAY_TIMER;
192     our @DELAY_QUEUE;
193    
194     sub _delay_run {
195 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
196 root 1.42 }
197    
198     sub delay($) {
199     push @DELAY_QUEUE, shift;
200     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
201     }
202    
203 root 1.1 sub _inject {
204 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
205 root 1.1 &{ $PORT{+shift} or return };
206     }
207    
208 root 1.20 # this function adds a node-ref, so you can send stuff to it
209     # it is basically the central routing component.
210 root 1.1 sub add_node {
211 root 1.21 my ($node) = @_;
212 root 1.1
213 root 1.71 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
214 root 1.13 }
215    
216 root 1.1 sub snd(@) {
217 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
218 root 1.1
219 root 1.86 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
220 root 1.1
221 root 1.49 defined $nodeid #d#UGLY
222     or Carp::croak "'undef' is not a valid node ID/port ID";
223    
224 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
225 root 1.2 ->{send} (["$portid", @_]);
226 root 1.1 }
227    
228 root 1.17 =item $is_local = port_is_local $port
229    
230     Returns true iff the port is a local port.
231    
232     =cut
233    
234     sub port_is_local($) {
235 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
236 root 1.17
237 root 1.21 $NODE{$nodeid} == $NODE{""}
238 root 1.17 }
239    
240 root 1.18 =item snd_to_func $node, $func, @args
241 root 1.11
242 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
243 root 1.11 this function with the given arguments on that node.
244    
245 root 1.20 This function can be used to implement C<spawn>-like interfaces.
246 root 1.11
247     =cut
248    
249 root 1.18 sub snd_to_func($$;@) {
250 root 1.21 my $nodeid = shift;
251 root 1.11
252 root 1.41 # on $NODE, we artificially delay... (for spawn)
253     # this is very ugly - maybe we should simply delay ALL messages,
254     # to avoid deep recursion issues. but that's so... slow...
255 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
256     if $nodeid ne $NODE;
257    
258 root 1.49 defined $nodeid #d#UGLY
259     or Carp::croak "'undef' is not a valid node ID/port ID";
260    
261 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
262 root 1.11 }
263    
264 root 1.18 =item snd_on $node, @msg
265    
266     Executes C<snd> with the given C<@msg> (which must include the destination
267     port) on the given node.
268    
269     =cut
270    
271     sub snd_on($@) {
272     my $node = shift;
273     snd $node, snd => @_;
274     }
275    
276 root 1.29 =item eval_on $node, $string[, @reply]
277 root 1.18
278 root 1.29 Evaluates the given string as Perl expression on the given node. When
279     @reply is specified, then it is used to construct a reply message with
280     C<"$@"> and any results from the eval appended.
281 root 1.18
282     =cut
283    
284 root 1.29 sub eval_on($$;@) {
285 root 1.18 my $node = shift;
286     snd $node, eval => @_;
287     }
288    
289 root 1.1 sub kil(@) {
290 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
291 root 1.1
292     length $portid
293 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
294 root 1.1
295 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
296 root 1.1 ->kill ("$portid", @_);
297     }
298    
299     #############################################################################
300 root 1.6 # node monitoring and info
301 root 1.3
302 root 1.21 =item node_is_known $nodeid
303 root 1.13
304 root 1.71 #TODO#
305     Returns true iff the given node is currently known to this node.
306 root 1.13
307     =cut
308    
309     sub node_is_known($) {
310     exists $NODE{$_[0]}
311     }
312    
313 root 1.21 =item node_is_up $nodeid
314 root 1.13
315     Returns true if the given node is "up", that is, the kernel thinks it has
316     a working connection to it.
317    
318 root 1.69 If the node is known (to this local node) but not currently connected,
319     returns C<0>. If the node is not known, returns C<undef>.
320 root 1.13
321     =cut
322    
323     sub node_is_up($) {
324     ($NODE{$_[0]} or return)->{transport}
325     ? 1 : 0
326     }
327    
328 root 1.3 =item up_nodes
329    
330 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
331     the node itself).
332 root 1.3
333     =cut
334    
335 root 1.49 sub up_nodes() {
336 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
337 root 1.3 }
338    
339 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340 root 1.3
341 root 1.27 Registers a callback that is called each time a node goes up (a connection
342     is established) or down (the connection is lost).
343 root 1.3
344     Node up messages can only be followed by node down messages for the same
345     node, and vice versa.
346    
347 root 1.71 Note that monitoring a node is usually better done by monitoring its node
348 root 1.27 port. This function is mainly of interest to modules that are concerned
349     about the network topology and low-level connection handling.
350    
351     Callbacks I<must not> block and I<should not> send any messages.
352    
353     The function returns an optional guard which can be used to unregister
354 root 1.3 the monitoring callback again.
355    
356 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
357     or go up (and down).
358    
359     newnode $_, 1 for up_nodes;
360     mon_nodes \&newnode;
361    
362 root 1.3 =cut
363    
364     our %MON_NODES;
365    
366     sub mon_nodes($) {
367     my ($cb) = @_;
368    
369     $MON_NODES{$cb+0} = $cb;
370    
371 root 1.90 defined wantarray
372     and Guard::guard { delete $MON_NODES{$cb+0} }
373 root 1.3 }
374    
375     sub _inject_nodeevent($$;@) {
376 root 1.16 my ($node, $up, @reason) = @_;
377 root 1.3
378     for my $cb (values %MON_NODES) {
379 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
380 root 1.16 or $WARN->(1, $@);
381 root 1.3 }
382 root 1.16
383 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
384 root 1.3 }
385    
386     #############################################################################
387 root 1.1 # self node code
388    
389 root 1.67 sub _kill {
390     my $port = shift;
391    
392     delete $PORT{$port}
393     or return; # killing nonexistent ports is O.K.
394     delete $PORT_DATA{$port};
395    
396     my $mon = delete $LMON{$port}
397     or !@_
398     or $WARN->(2, "unmonitored local port $port died with reason: @_");
399    
400     $_->(@_) for values %$mon;
401     }
402    
403     sub _monitor {
404     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
405     unless exists $PORT{$_[1]};
406    
407     $LMON{$_[1]}{$_[2]+0} = $_[2];
408     }
409    
410     sub _unmonitor {
411 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
412     if exists $LMON{$_[1]};
413 root 1.67 }
414    
415 root 1.83 sub _secure_check {
416     $SECURE->($SRCNODE->{id})
417 root 1.88 or $SRCNODE->{id} eq $NODE
418 root 1.83 or die "remote execution attempt by insecure node\n";
419     }
420    
421 root 1.79 our %NODE_REQ = (
422 root 1.1 # internal services
423    
424     # monitoring
425 root 1.65 mon0 => sub { # stop monitoring a port for another node
426 root 1.1 my $portid = shift;
427 root 1.91 _unmonitor undef, $portid, delete $NODE{$SRCNODE->{id}}{rmon}{$portid};
428 root 1.1 },
429 root 1.65 mon1 => sub { # start monitoring a port for another node
430 root 1.1 my $portid = shift;
431 root 1.91 Scalar::Util::weaken (my $node = $NODE{$SRCNODE->{id}});
432 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
433 root 1.58 delete $node->{rmon}{$portid};
434 root 1.65 $node->send (["", kil0 => $portid, @_])
435 root 1.59 if $node && $node->{transport};
436 root 1.67 };
437 root 1.1 },
438 root 1.65 # another node has killed a monitored port
439     kil0 => sub {
440 root 1.91 my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{+shift}
441 root 1.1 or return;
442    
443     $_->(@_) for @$cbs;
444     },
445    
446 root 1.18 # "public" services - not actually public
447 root 1.1
448 root 1.65 # another node wants to kill a local port
449 root 1.66 kil => \&_kill,
450 root 1.65
451 root 1.88 # is the remote node considered secure?
452     # secure => sub {
453     # #TODO#
454     # },
455    
456 root 1.1 # relay message to another node / generic echo
457 root 1.88 snd => sub {
458     &_secure_check;
459     &snd
460 root 1.1 },
461    
462 root 1.30 # random utilities
463 root 1.1 eval => sub {
464 root 1.83 &_secure_check;
465 root 1.50 my @res = do { package main; eval shift };
466 root 1.1 snd @_, "$@", @res if @_;
467     },
468     time => sub {
469 root 1.88 &_secure_check;
470 root 1.76 snd @_, AE::now;
471 root 1.1 },
472     devnull => sub {
473     #
474     },
475 root 1.15 "" => sub {
476 root 1.27 # empty messages are keepalives or similar devnull-applications
477 root 1.15 },
478 root 1.1 );
479    
480 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
481 root 1.1 $PORT{""} = sub {
482     my $tag = shift;
483 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
484     $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
485 root 1.1 };
486    
487 root 1.84 our $NPROTO = 1;
488    
489     # tell everybody who connects our nproto
490     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
491     $_[0]{local_greeting}{nproto} = $NPROTO;
492     };
493    
494 root 1.69 #############################################################################
495 root 1.71 # seed management, try to keep connections to all seeds at all times
496 root 1.69
497 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
498     our %NODE_SEED; # map node ID to seed ID
499 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
500     our $SEED_WATCHER;
501 root 1.71 our $SEED_RETRY;
502 root 1.69
503     sub seed_connect {
504     my ($seed) = @_;
505    
506     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
507     or Carp::croak "$seed: unparsable seed address";
508    
509     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
510    
511 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
512     $host, $port,
513     on_greeted => sub {
514     # called after receiving remote greeting, learn remote node name
515    
516     # we rely on untrusted data here (the remote node name) this is
517     # hopefully ok, as this can at most be used for DOSing, which is easy
518     # when you can do MITM anyway.
519    
520     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
521     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
522 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
523 root 1.71 delete $SEED_NODE{$seed};
524     delete $NODE_SEED{$seed};
525     } else {
526     $SEED_NODE{$seed} = $_[0]{remote_node};
527     $NODE_SEED{$_[0]{remote_node}} = $seed;
528     }
529     },
530     on_destroy => sub {
531     delete $SEED_CONNECT{$seed};
532     },
533 root 1.69 sub {
534     $SEED_CONNECT{$seed} = 1;
535 root 1.71 }
536 root 1.69 ;
537     }
538    
539     sub seed_all {
540     my @seeds = grep {
541     !exists $SEED_CONNECT{$_}
542     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
543     } keys %SEED_NODE;
544    
545     if (@seeds) {
546 root 1.70 # start connection attempt for every seed we are not connected to yet
547 root 1.69 seed_connect $_
548     for @seeds;
549 root 1.71
550     $SEED_RETRY = $SEED_RETRY * 2 + rand;
551     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
552     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
553    
554     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
555    
556 root 1.69 } else {
557 root 1.71 # all seeds connected or connecting, no need to restart timer
558 root 1.69 undef $SEED_WATCHER;
559     }
560     }
561    
562     sub seed_again {
563 root 1.71 $SEED_RETRY = 1;
564     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
565 root 1.69 }
566    
567     # sets new seed list, starts connecting
568     sub set_seeds(@) {
569     %SEED_NODE = ();
570 root 1.71 %NODE_SEED = ();
571     %SEED_CONNECT = ();
572    
573 root 1.69 @SEED_NODE{@_} = ();
574    
575 root 1.71 seed_again;#d#
576     seed_all;
577     }
578    
579     mon_nodes sub {
580     # if we lost the connection to a seed node, make sure we are seeding
581     seed_again
582     if !$_[1] && exists $NODE_SEED{$_[0]};
583     };
584    
585     #############################################################################
586     # talk with/to global nodes
587    
588 root 1.72 # protocol messages:
589     #
590 root 1.73 # sent by all slave nodes (slave to master)
591     # g_slave database - make other global node master of the sender
592 root 1.72 #
593 root 1.73 # sent by any node to global nodes
594     # g_set database - set whole database
595 root 1.89 # g_upd family set del - update single family
596 root 1.73 # g_del family key - delete key from database
597     # g_get family key reply... - send reply with data
598     #
599     # send by global nodes
600     # g_global - node became global, similar to global=1 greeting
601     #
602     # database families
603     # "'l" -> node -> listeners
604     # "'g" -> node -> undef
605     # ...
606 root 1.72 #
607    
608 root 1.73 # used on all nodes:
609 root 1.88 our $MASTER; # the global node we bind ourselves to
610 root 1.78 our $MASTER_MON;
611     our %LOCAL_DB; # this node database
612    
613     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
614 root 1.71
615 root 1.84 our $GPROTO = 1;
616    
617     # tell everybody who connects our nproto
618     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
619     $_[0]{local_greeting}{gproto} = $GPROTO;
620     };
621    
622 root 1.73 #############################################################################
623     # master selection
624    
625     # master requests
626     our %GLOBAL_REQ; # $id => \@req
627 root 1.71
628 root 1.74 sub global_req_add {
629 root 1.80 my ($id, $req) = @_;
630 root 1.71
631 root 1.74 return if exists $GLOBAL_REQ{$id};
632    
633 root 1.80 $GLOBAL_REQ{$id} = $req;
634 root 1.71
635 root 1.80 snd $MASTER, @$req
636 root 1.73 if $MASTER;
637 root 1.74 }
638 root 1.71
639 root 1.74 sub global_req_del {
640     delete $GLOBAL_REQ{$_[0]};
641     }
642    
643 root 1.88 #################################
644     # master rpc
645    
646     our %GLOBAL_RES;
647     our $GLOBAL_RES_ID = "a";
648    
649     sub global_call {
650     my $id = ++$GLOBAL_RES_ID;
651     $GLOBAL_RES{$id} = pop;
652     global_req_add $id, [@_, $id];
653     }
654    
655     $NODE_REQ{g_reply} = sub {
656     my $id = shift;
657     global_req_del $id;
658     my $cb = delete $GLOBAL_RES{$id}
659     or return;
660     &$cb
661     };
662    
663     #################################
664    
665 root 1.74 sub g_find {
666 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
667 root 1.73 }
668 root 1.71
669 root 1.73 # reply for g_find started in Node.pm
670 root 1.79 $NODE_REQ{g_found} = sub {
671 root 1.74 global_req_del "g_find $_[0]";
672    
673 root 1.73 my $node = $NODE{$_[0]} or return;
674 root 1.71
675 root 1.79 $node->connect_to ($_[1]);
676 root 1.71 };
677    
678 root 1.73 sub master_set {
679     $MASTER = $_[0];
680 root 1.71
681 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
682 root 1.71
683 root 1.73 # (re-)send queued requests
684     snd $MASTER, @$_
685     for values %GLOBAL_REQ;
686     }
687 root 1.71
688 root 1.72 sub master_search {
689 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
690     for (keys %NODE_SEED) {
691 root 1.72 if (node_is_up $_) {
692     master_set $_;
693     return;
694 root 1.71 }
695 root 1.72 }
696 root 1.71
697 root 1.78 $MASTER_MON = mon_nodes sub {
698 root 1.72 return unless $_[1]; # we are only interested in node-ups
699     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
700 root 1.71
701 root 1.72 master_set $_[0];
702 root 1.71
703 root 1.78 $MASTER_MON = mon_nodes sub {
704 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
705     undef $MASTER;
706     master_search ();
707     }
708 root 1.71 };
709 root 1.72 };
710 root 1.71 }
711    
712 root 1.73 # other node wants to make us the master
713 root 1.79 $NODE_REQ{g_slave} = sub {
714 root 1.73 my ($db) = @_;
715    
716 root 1.80 # load global module and redo the request
717 root 1.73 require AnyEvent::MP::Global;
718 root 1.80 &{ $NODE_REQ{g_slave} }
719 root 1.71 };
720    
721 root 1.73 #############################################################################
722 root 1.79 # local database operations
723 root 1.71
724 root 1.79 # local database management
725 root 1.88
726 root 1.87 sub db_set($$;$) {
727 root 1.89 # if (ref $_[1]) {
728     # # bulk
729     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
730     # $LOCAL_DB{$_[0]} = $_[1];
731     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
732     # if defined $MASTER;
733     # } else {
734     # single-key
735     $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
736     snd $MASTER, g_upd => $_[0] => { $_[1] => $_[2] }
737     if defined $MASTER;
738     # }
739 root 1.79 }
740    
741 root 1.89 sub db_del($@) {
742     my $family = shift;
743    
744     delete @{ $LOCAL_DB{$family} }{@_};
745     snd $MASTER, g_upd => $family => undef, \@_
746 root 1.79 if defined $MASTER;
747     }
748    
749     sub db_reg($$;$) {
750     my ($family, $key) = @_;
751     &db_set;
752     Guard::guard { db_del $family => $key }
753     }
754 root 1.71
755 root 1.88 # database query
756    
757     sub db_family {
758     my ($family, $cb) = @_;
759     global_call g_db_family => $family, $cb;
760     }
761    
762     sub db_keys {
763     my ($family, $cb) = @_;
764     global_call g_db_keys => $family, $cb;
765     }
766    
767     sub db_values {
768     my ($family, $cb) = @_;
769     global_call g_db_values => $family, $cb;
770 root 1.80 }
771    
772 root 1.88 # database monitoring
773 root 1.80
774 root 1.81 our %LOCAL_MON; # f, reply
775     our %MON_DB; # f, k, value
776 root 1.80
777 root 1.81 sub db_mon($@) {
778 root 1.84 my ($family, $cb) = @_;
779 root 1.81
780 root 1.84 if (my $db = $MON_DB{$family}) {
781 root 1.89 # we already monitor, so create a "dummy" change event
782     # this is postponed, which might be too late (we could process
783     # change events), so disable the callback at first
784     $LOCAL_MON{$family}{$cb+0} = sub { };
785     AE::postpone {
786     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
787    
788     # set actual callback
789     $LOCAL_MON{$family}{$cb+0} = $cb;
790     $cb->($db, [keys %$db]);
791     };
792 root 1.81 } else {
793     # new monitor, request chg1 from upstream
794 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
795 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
796     $MON_DB{$family} = {};
797     }
798    
799 root 1.90 defined wantarray
800     and Guard::guard {
801     my $mon = $LOCAL_MON{$family};
802     delete $mon->{$cb+0};
803    
804     unless (%$mon) {
805     global_req_del "mon1 $family";
806    
807     # no global_req, because we don't care if we are not connected
808     snd $MASTER, g_mon0 => $family
809     if $MASTER;
810 root 1.80
811 root 1.90 delete $LOCAL_MON{$family};
812     delete $MON_DB{$family};
813     }
814 root 1.80 }
815     }
816    
817 root 1.82 # full update
818 root 1.80 $NODE_REQ{g_chg1} = sub {
819 root 1.84 my ($f, $ndb) = @_;
820    
821     my $db = $MON_DB{$f};
822 root 1.89 my (@a, @c, @d);
823 root 1.81
824 root 1.82 # add or replace keys
825 root 1.84 while (my ($k, $v) = each %$ndb) {
826 root 1.89 exists $db->{$k}
827     ? push @c, $k
828     : push @a, $k;
829 root 1.84 $db->{$k} = $v;
830 root 1.82 }
831 root 1.81
832 root 1.82 # delete keys that are no longer present
833 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
834     delete $db->{$_};
835 root 1.89 push @d, $_;
836 root 1.81 }
837 root 1.84
838 root 1.89 $_->($db, \@a, \@c, \@d)
839 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
840 root 1.80 };
841    
842 root 1.82 # incremental update
843 root 1.84 $NODE_REQ{g_chg2} = sub {
844 root 1.89 my ($family, $set, $del) = @_;
845    
846     my $db = $MON_DB{$family};
847 root 1.84
848 root 1.89 my (@a, @c);
849 root 1.84
850 root 1.89 while (my ($k, $v) = each %$set) {
851     exists $db->{$k}
852     ? push @c, $k
853     : push @a, $k;
854     $db->{$k} = $v;
855     }
856    
857     delete @$db{@$del};
858    
859     $_->($db, \@a, \@c, $del)
860     for values %{ $LOCAL_MON{$family} };
861 root 1.84 };
862 root 1.80
863 root 1.69 #############################################################################
864     # configure
865    
866 root 1.81 sub nodename {
867 root 1.69 require POSIX;
868     (POSIX::uname ())[1]
869     }
870    
871     sub _resolve($) {
872     my ($nodeid) = @_;
873    
874     my $cv = AE::cv;
875     my @res;
876    
877     $cv->begin (sub {
878     my %seen;
879     my @refs;
880     for (sort { $a->[0] <=> $b->[0] } @res) {
881     push @refs, $_->[1] unless $seen{$_->[1]}++
882     }
883     shift->send (@refs);
884     });
885    
886     my $idx;
887     for my $t (split /,/, $nodeid) {
888     my $pri = ++$idx;
889    
890 root 1.81 $t = length $t ? nodename . ":$t" : nodename
891 root 1.69 if $t =~ /^\d*$/;
892    
893     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
894     or Carp::croak "$t: unparsable transport descriptor";
895    
896     $port = "0" if $port eq "*";
897    
898     if ($host eq "*") {
899     $cv->begin;
900     # use fork_call, as Net::Interface is big, and we need it rarely.
901     require AnyEvent::Util;
902     AnyEvent::Util::fork_call (
903     sub {
904     my @addr;
905    
906     require Net::Interface;
907    
908     for my $if (Net::Interface->interfaces) {
909     # we statically lower-prioritise ipv6 here, TODO :()
910     for $_ ($if->address (Net::Interface::AF_INET ())) {
911     next if /^\x7f/; # skip localhost etc.
912     push @addr, $_;
913     }
914     for ($if->address (Net::Interface::AF_INET6 ())) {
915     #next if $if->scope ($_) <= 2;
916     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
917     push @addr, $_;
918     }
919    
920     }
921     @addr
922     }, sub {
923     for my $ip (@_) {
924     push @res, [
925     $pri += 1e-5,
926     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
927     ];
928     }
929     $cv->end;
930     }
931     );
932     } else {
933     $cv->begin;
934     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
935     for (@_) {
936     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
937     push @res, [
938     $pri += 1e-5,
939     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
940     ];
941     }
942     $cv->end;
943     };
944     }
945     }
946    
947     $cv->end;
948    
949     $cv
950     }
951    
952     sub configure(@) {
953     unshift @_, "profile" if @_ & 1;
954     my (%kv) = @_;
955    
956     delete $NODE{$NODE}; # we do not support doing stuff before configure
957     _init_names;
958    
959     my $profile = delete $kv{profile};
960    
961 root 1.81 $profile = nodename
962 root 1.69 unless defined $profile;
963    
964     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
965    
966 root 1.83 if (exists $CONFIG->{secure}) {
967     my $pass = !$CONFIG->{secure};
968     $SECURE = sub { $pass };
969     }
970    
971 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
972 root 1.69
973     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
974    
975 root 1.72 $NODE = $node;
976 root 1.77
977 root 1.81 $NODE =~ s/%n/nodename/ge;
978    
979     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
980 root 1.77 # nodes with randomised node names do not need randomised port names
981     $UNIQ = "";
982     }
983 root 1.69
984     $NODE{$NODE} = $NODE{""};
985     $NODE{$NODE}{id} = $NODE;
986    
987     my $seeds = $CONFIG->{seeds};
988     my $binds = $CONFIG->{binds};
989    
990     $binds ||= ["*"];
991    
992     $WARN->(8, "node $NODE starting up.");
993    
994 root 1.85 $BINDS = [];
995     %BINDS = ();
996 root 1.69
997     for (map _resolve $_, @$binds) {
998     for my $bind ($_->recv) {
999     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1000     or Carp::croak "$bind: unparsable local bind address";
1001    
1002     my $listener = AnyEvent::MP::Transport::mp_server
1003     $host,
1004     $port,
1005     prepare => sub {
1006     my (undef, $host, $port) = @_;
1007     $bind = AnyEvent::Socket::format_hostport $host, $port;
1008     0
1009     },
1010     ;
1011 root 1.85 $BINDS{$bind} = $listener;
1012     push @$BINDS, $bind;
1013 root 1.69 }
1014     }
1015    
1016 root 1.85 db_set "'l" => $NODE => $BINDS;
1017 root 1.73
1018 root 1.85 $WARN->(8, "node listens on [@$BINDS].");
1019 root 1.69
1020     # connect to all seednodes
1021     set_seeds map $_->recv, map _resolve $_, @$seeds;
1022    
1023 root 1.73 master_search;
1024    
1025 root 1.71 if ($NODE eq "atha") {;#d#
1026 root 1.72 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
1027 root 1.71 }
1028    
1029 root 1.69 for (@{ $CONFIG->{services} }) {
1030     if (ref) {
1031     my ($func, @args) = @$_;
1032     (load_func $func)->(@args);
1033     } elsif (s/::$//) {
1034     eval "require $_";
1035     die $@ if $@;
1036     } else {
1037     (load_func $_)->();
1038     }
1039     }
1040     }
1041    
1042 root 1.1 =back
1043    
1044     =head1 SEE ALSO
1045    
1046     L<AnyEvent::MP>.
1047    
1048     =head1 AUTHOR
1049    
1050     Marc Lehmann <schmorp@schmorp.de>
1051     http://home.schmorp.de/
1052    
1053     =cut
1054    
1055     1
1056