ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.92
Committed: Wed Mar 14 22:59:58 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.91: +12 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
# Highest level the default handler still prints; can be preset through
# the PERL_ANYEVENT_MP_WARNLEVEL environment variable.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks, each one is invoked with ($level, $msg).
our @WARN;

# Default log handler: runs all hooks, then prints the message to STDERR
# when its level does not exceed $WARNLEVEL.
our $WARN = sub {
   # hooks see every message, regardless of $WARNLEVEL (called with our @_)
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # strip one trailing newline, printf adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
# Resolves the fully qualified function name $func to a code reference.
#
# If the function is not defined yet, successively shorter package
# prefixes of the name are require'd until it becomes defined.
#
# Never dies itself: on failure it returns a code reference that dies
# when called - either with "unable to resolve function" (no package
# prefix left to try) or with the error raised while loading a module.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component, give up when none is left
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module file just means "try the next shorter prefix",
            # anything else is a real error. the dot before "pm" is escaped
            # so that it only matches a literal ".pm" suffix.
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119    
# the 62 characters used for textual identifiers
my @alnum = ("0" .. "9", "A" .. "Z", "a" .. "z");

# returns a string of $_[0] random characters with codes 0..255
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $_[0];
   $octets
}

# returns a string of $_[0] random characters taken from @alnum
sub nonce62($) {
   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $_[0];
   $chars
}
129    
# Builds a nine character identifier: five @alnum characters derived from
# the process id and the current event loop time, followed by four random
# @alnum characters.
sub gen_uniq {
   my $now = AE::now;

   # each entry is an index into @alnum
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now ) % 62,
      (int $now * 100) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 4
}
141    
our $CONFIG;             # this node's configuration
our $SECURE = sub { 1 }; # called with a node id by _secure_check, true allows the request

our $RUNIQ;              # remote uniq value
our $UNIQ;               # per-process/node unique cookie
our $NODE;               # id of the local node
our $ID = "a";

our %NODE;               # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

our %RMON;               # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;               # monitored _local_ ports

our $GLOBAL;             # true if node is a global ("directory") node
our %BINDS;              # bind address => listener
our $BINDS;              # our listeners, as arrayref

our $SRCNODE;            # holds the sending node _object_ during _inject
161 root 1.1
# (Re-)generates the random identifiers of this process and derives the
# default anonymous node id from them.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
   # at the end to avoid clashes
   $RUNIQ = nonce62 (10);
   $RUNIQ = substr ($RUNIQ, 0, -1) . uc (substr ($RUNIQ, -1));

   $NODE = "anon/$RUNIQ";
}

# make sure the names are valid as soon as the module is loaded
_init_names ();
174 root 1.64
# returns the id of the local node (the current value of $NODE)
sub NODE() {
   return $NODE;
}
178    
# returns the node id part of a port id, i.e. everything before the first "#"
sub node_of($) {
   my ($nodeid) = split /#/, shift, 2;

   return $nodeid;
}
184    
BEGIN {
   # TRACE is a constant sub whose value is fixed at compile time from
   # the PERL_ANYEVENT_MP_TRACE environment variable
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190 root 1.1
our $DELAY_TIMER; # pending timer, while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run

# Runs all queued callbacks in order, including ones queued while running,
# and clears $DELAY_TIMER once the queue is empty.
sub _delay_run {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
}
197    
# queues $cb for _delay_run and arms the zero-delay timer if none is pending
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
202    
# Delivers a message to a local port: the first argument selects the
# handler in %PORT, which is then called with the remaining arguments.
# Messages for ports without handler are dropped silently.
sub _inject {
   if (TRACE && @_) {#d#
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";
   }

   my $handler = $PORT{+shift}
      or return;

   # call with the current (already shifted) @_
   &$handler;
}
207    
208 root 1.20 # this function adds a node-ref, so you can send stuff to it
209     # it is basically the central routing component.
# returns the node object registered for $nodeid in %NODE, creating and
# registering a remote node object first if there is none yet
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
215    
# Sends a message: the first argument is the destination port id
# ("nodeid#portid"), the remaining arguments form the message. The message
# is handed to the {send} callback of the destination node object, which
# is created on demand.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {#d#
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node $nodeid;

   # the port id is always sent as a string
   $node->{send} (["$portid", @_]);
}
227    
228 root 1.17 =item $is_local = port_is_local $port
229    
230     Returns true iff the port is a local port.
231    
232     =cut
233    
# true iff the node part of the port id maps to the same node object as
# the local node ($NODE{""})
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   $NODE{$nodeid} == $NODE{""}
}
239    
240 root 1.18 =item snd_to_func $node, $func, @args
241 root 1.11
242 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
243 root 1.11 this function with the given arguments on that node.
244    
245 root 1.20 This function can be used to implement C<spawn>-like interfaces.
246 root 1.11
247     =cut
248    
# Sends ($func, @args) to the node port ("") of $nodeid, where it is
# processed as a node request.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment above talks about $NODE, but the flag is
   # set when the destination is NOT $NODE - confirm which one is intended.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["", @_]);
}
263    
264 root 1.18 =item snd_on $node, @msg
265    
266     Executes C<snd> with the given C<@msg> (which must include the destination
267     port) on the given node.
268    
269     =cut
270    
# asks $node to execute snd with @msg, by sending it a "snd" node request
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, snd => @msg;
}
275    
276 root 1.29 =item eval_on $node, $string[, @reply]
277 root 1.18
278 root 1.29 Evaluates the given string as a Perl expression on the given node. When
279     @reply is specified, then it is used to construct a reply message with
280     C<"$@"> and any results from the eval appended.
281 root 1.18
282     =cut
283    
# sends an "eval" node request with the string (and optional reply
# message) to $node
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd $node, eval => @request;
}
288    
# Kills the port given as first argument, the remaining arguments are
# passed on as kill reason. Croaks when the port id has no port part, as
# node ports cannot be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @_);
}
298    
299     #############################################################################
300 root 1.6 # node monitoring and info
301 root 1.3
302 root 1.21 =item node_is_known $nodeid
303 root 1.13
304 root 1.71 #TODO#
305     Returns true iff the given node is currently known to this node.
306 root 1.13
307     =cut
308    
# true iff %NODE has an entry for the given node id
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
312    
313 root 1.21 =item node_is_up $nodeid
314 root 1.13
315     Returns true if the given node is "up", that is, the kernel thinks it has
316     a working connection to it.
317    
318 root 1.69 If the node is known (to this local node) but not currently connected,
319     returns C<0>. If the node is not known, returns C<undef>.
320 root 1.13
321     =cut
322    
# 1 if the node has a transport, 0 if it is known without one,
# undef/empty list if the node is not known at all
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
327    
328 root 1.3 =item up_nodes
329    
330 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
331     the node itself).
332 root 1.3
333     =cut
334    
# returns the {id} of every node object in %NODE that has a transport
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
338    
339 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340 root 1.3
341 root 1.27 Registers a callback that is called each time a node goes up (a connection
342     is established) or down (the connection is lost).
343 root 1.3
344     Node up messages can only be followed by node down messages for the same
345     node, and vice versa.
346    
347 root 1.71 Note that monitoring a node is usually better done by monitoring its node
348 root 1.27 port. This function is mainly of interest to modules that are concerned
349     about the network topology and low-level connection handling.
350    
351     Callbacks I<must not> block and I<should not> send any messages.
352    
353     The function returns an optional guard which can be used to unregister
354 root 1.3 the monitoring callback again.
355    
356 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
357     or go up (and down).
358    
359     newnode $_, 1 for up_nodes;
360     mon_nodes \&newnode;
361    
362 root 1.3 =cut
363    
our %MON_NODES; # numified callback reference => callback

# Registers $cb as node monitor. When the result is used, a guard object
# is returned that unregisters the callback again on destruction.
sub mon_nodes($) {
   my ($cb) = @_;
   my $slot = $cb + 0; # the numified reference serves as registry key

   $MON_NODES{$slot} = $cb;

   # only create the guard when the caller actually wants it
   defined wantarray
      and Guard::guard { delete $MON_NODES{$slot} }
}
374    
# Reports an up/down event for $node to all node monitors and logs it at
# level 7. Exceptions thrown by monitors are logged at level 1 and do not
# keep the remaining monitors from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $watcher (values %MON_NODES) {
      next if eval { $watcher->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
385    
386     #############################################################################
387 root 1.1 # self node code
388    
# Destroys the local port $port: removes its handler and data and calls
# all of its monitors with the kill reason (the remaining arguments).
# A kill with a reason for a port without monitors is logged at level 2.
sub _kill {
   my $port = shift;

   return # killing nonexistent ports is O.K.
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   if (!$mon && @_) {
      $WARN->(2, "unmonitored local port $port died with reason: @_");
   }

   $_->(@_) for values %$mon;
}
402    
# Registers callback $_[2] as monitor of local port $_[1] (the first
# argument is ignored). For nonexistent ports the callback is invoked
# immediately with a no_such_port reason instead.
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   $LMON{$port}{$cb+0} = $cb;
}
409    
# Removes callback $_[2] from the monitors of local port $_[1] (the first
# argument is ignored); does nothing for ports without monitors.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   exists $LMON{$port}
      and delete $LMON{$port}{$cb+0};
}
414    
# Dies unless the node currently sending ($SRCNODE) is accepted by the
# $SECURE callback or is the local node itself.
sub _secure_check {
   my $peer = $SRCNODE->{id};

   die "remote execution attempt by insecure node\n"
      unless $SECURE->($peer) || $peer eq $NODE;
}
420    
# Handlers for requests sent to the node port, indexed by request name.
# Each handler is called with the remaining elements of the message.
our %NODE_REQ = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE->{id}}{rmon}{$portid};

      _unmonitor undef, $portid, $cb;
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # the callback below must not keep the node object alive
      my $node = $NODE{$SRCNODE->{id}};
      Scalar::Util::weaken ($node);

      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;

      _monitor undef, $portid, $on_kill;
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # is the remote node considered secure?
   # secure => sub {
   #    #TODO#
   # },

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities

   # evaluate a string in package main, optionally send back "$@" and
   # the results. no lexicals are declared before the eval, so the
   # evaluated code sees the same scope as before.
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },

   # reply with the current event loop time
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },

   devnull => sub {
      #
   },

   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
479    
# the local node is reachable both under its own id and under ""
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: dispatches node requests by their first element. Unknown
# requests are resolved to functions with load_func (after a security
# check) and cached in %NODE_REQ. Errors are logged, never propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag };

      # called with the current (already shifted) @_
      &$handler;
   };

   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};
486    
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($target) = @_; # presumably the transport object - confirm in Transport.pm

   $target->{local_greeting}{nproto} = $NPROTO;
};
493    
494 root 1.69 #############################################################################
495 root 1.71 # seed management, try to keep connections to all seeds at all times
496 root 1.69
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;

# Starts a connection attempt to the seed address $seed, unless
# %SEED_CONNECT already has a true entry for it. Croaks when the address
# cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      my $remote = $_[0]{remote_node};

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
         # NOTE(review): %NODE_SEED is documented as keyed by node ID, yet
         # the seed is used as key here - confirm this is intended.
         delete $NODE_SEED{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;
      }
   };

   # forget the connection attempt, so the seed can be retried
   my $on_destroy = sub {
      delete $SEED_CONNECT{$seed};
   };

   # final callback: mark the seed as connected
   my $on_connect = sub {
      $SEED_CONNECT{$seed} = 1;
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => $on_greeted,
      on_destroy => $on_destroy,
      $on_connect
   ;
}
538    
# Checks every seed that has no connection (attempt) registered in
# %SEED_CONNECT:
#
# - when the node behind the seed is known and up, it is asked to run the
#   global service, unless its greeting already announced that;
# - otherwise a connection attempt to the seed is started.
#
# While there are seeds to connect to, the function re-arms itself with
# an increasing, randomised delay that is capped at the configured
# monitor_timeout.
sub seed_all {
   my @seeds;

   for my $seed (grep !exists $SEED_CONNECT{$_}, keys %SEED_NODE) {
      # %SEED_NODE maps the seed address to the node id learned from the
      # greeting, %NODE and snd work on node ids, not on seed addresses
      my $nodeid = $SEED_NODE{$seed};

      if (defined $nodeid && node_is_up $nodeid) {
         # node is up, make sure it's running the global service
         snd $nodeid, "g_slave"
            unless $NODE{$nodeid}{transport}{remote_greeting}{global};
      } else {
         # else node is down, we need to seed
         push @seeds, $seed;
      }
   }

   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for @seeds;

      # back off, but never wait longer than monitor_timeout
      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
569    
# resets the retry interval and schedules seed_all in one second, unless
# a seed timer is already pending
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER = AE::timer (1, 0, \&seed_all)
      unless $SEED_WATCHER;
}
574    
575     # sets new seed list, starts connecting
# replaces the seed list by the given seed addresses, forgetting all
# previous seed state, and starts connecting
sub set_seeds(@) {
   my @seeds = @_;

   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out without a known node id
   %SEED_NODE = map +($_ => undef), @seeds;

   seed_again ();#d#
   seed_all ();
}
586    
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # if we lost the connection to a seed node, make sure we are seeding
   return if $is_up;
   return unless exists $NODE_SEED{$nodeid};

   seed_again ();
};
592    
593     #############################################################################
594     # talk with/to global nodes
595    
596 root 1.72 # protocol messages:
597     #
598 root 1.73 # sent by all slave nodes (slave to master)
599     # g_slave database - make other global node master of the sender
600 root 1.72 #
601 root 1.73 # sent by any node to global nodes
602     # g_set database - set whole database
603 root 1.89 # g_upd family set del - update single family
604 root 1.73 # g_del family key - delete key from database
605     # g_get family key reply... - send reply with data
606     #
607     # sent by global nodes
608     # g_global - node became global, similar to global=1 greeting
609     #
610     # database families
611     # "'l" -> node -> listeners
612     # "'g" -> node -> undef
613     # ...
614 root 1.72 #
615    
616 root 1.73 # used on all nodes:
our $MASTER;     # the global node we bind ourselves to
our $MASTER_MON; # node monitor guard used by master_search
our %LOCAL_DB;   # this node database

our %GLOBAL_DB;  # all local databases, merged - empty on non-global nodes

our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($target) = @_; # presumably the transport object - confirm in Transport.pm

   $target->{local_greeting}{gproto} = $GPROTO;
};
629    
630 root 1.73 #############################################################################
631     # master selection
632    
633     # master requests
our %GLOBAL_REQ; # $id => \@req

# Registers request $req (an array reference) under $id and sends it to
# the current master, if there is one. Registered requests are sent again
# by master_set. An already registered id is left untouched.
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd $MASTER, @$req;
}
646 root 1.71
# forgets the request registered under the given id and returns it
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
650    
651 root 1.88 #################################
652     # master rpc
653    
our %GLOBAL_RES;          # call id => reply callback
our $GLOBAL_RES_ID = "a"; # last call id handed out

# Issues a request to the master: the last argument is the reply callback,
# everything before it is the request, to which a fresh call id is
# appended.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add $id, [@_, $id];
}
662    
# reply to a global_call: retire the request and pass the remaining
# arguments to the callback registered for the call id
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   my $cb = delete $GLOBAL_RES{$id};
   global_req_del $id;

   return unless $cb;

   # called with the current (already shifted) @_
   &$cb
};
670    
671     #################################
672    
# queues a g_find request for the given node id with the master
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", [g_find => $nodeid];
}
676 root 1.71
677 root 1.73 # reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   # the request has been answered
   global_req_del "g_find $nodeid";

   # ignore answers for nodes we do not know (anymore)
   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
685    
# makes the given node our master: registers with it, passing our local
# database, and sends it all requests that are still registered
sub master_set {
   ($MASTER) = @_;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
695 root 1.71
# Selects a master: the first seed node that is up is used directly.
# Otherwise the function waits for a seed node to come up, makes it the
# master and restarts the search as soon as that master goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up $nodeid;

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set $nodeid;

      # from now on, only watch for the master going down
      $MASTER_MON = mon_nodes sub {
         my ($downid, $still_up) = @_;

         return if $still_up || $downid ne $MASTER;

         undef $MASTER;
         master_search ();
      };
   };
}
719    
720 root 1.73 # other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request - this relies on the module
   # installing its own g_slave handler when loaded
   require AnyEvent::MP::Global;

   # called with the current @_
   &{ $NODE_REQ{g_slave} }
};
728    
729 root 1.73 #############################################################################
730 root 1.79 # local database operations
731 root 1.71
732 root 1.79 # local database management
733 root 1.88
# sets $key in $family of the local database to $value (undef when
# omitted) and forwards the change to the master, if there is one
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$key} = $value;

      defined $MASTER
         and snd $MASTER, g_upd => $family => { $key => $value };
#   }
}
748    
# deletes the given keys from $family of the local database and forwards
# the deletion to the master, if there is one
sub db_del($@) {
   my $family = shift;

   delete @{ $LOCAL_DB{$family} }{@_};

   defined $MASTER
      and snd $MASTER, g_upd => $family => undef, \@_;
}
756    
# like db_set, but returns a guard that deletes the key again when it is
# destroyed
sub db_reg($$;$) {
   my ($family, $key) = @_;

   # call db_set with our own @_, so an omitted value stays omitted
   &db_set;

   Guard::guard { db_del $family, $key }
}
762 root 1.71
763 root 1.88 # database query
764    
# issues a g_db_family call for $family, the reply is passed to $cb
sub db_family {
   global_call g_db_family => @_[0, 1];
}
769    
# issues a g_db_keys call for $family, the reply is passed to $cb
sub db_keys {
   global_call g_db_keys => @_[0, 1];
}
774    
# issues a g_db_values call for $family, the reply is passed to $cb
sub db_values {
   global_call g_db_values => @_[0, 1];
}
779    
780 root 1.88 # database monitoring
781 root 1.80
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# Registers $cb as monitor for database family $family. When the result
# is used, a guard is returned that unregisters the monitor again; the
# last monitor of a family also cancels the upstream subscription.
sub db_mon($@) {
   my ($family, $cb) = @_;
   my $slot = $cb + 0; # the numified reference serves as registry key

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$slot} = sub { };

      AE::postpone {
         # guard might have gone away already
         return unless exists $LOCAL_MON{$family}{$slot};

         # set actual callback
         $LOCAL_MON{$family}{$slot} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$slot} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$slot};

         # other monitors left for this family? then keep the subscription
         return if %$mon;

         global_req_del "mon1 $family";

         # no global_req, because we don't care if we are not connected
         snd $MASTER, g_mon0 => $family
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      }
}
824    
825 root 1.82 # full update
# full update: $ndb is the complete new content of family $f. The local
# copy is brought in sync and every monitor of the family is called with
# the copy and the lists of added, changed and deleted keys.
$NODE_REQ{g_chg1} = sub {
   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@added, @changed, @deleted);

   # add or replace keys
   while (my ($k, $v) = each %$ndb) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $v;
   }

   # delete keys that are no longer present
   @deleted = grep !exists $ndb->{$_}, keys %$db;
   delete @$db{@deleted};

   for my $cb (values %{ $LOCAL_MON{$f} }) {
      $cb->($db, \@added, \@changed, \@deleted);
   }
};
849    
850 root 1.82 # incremental update
# incremental update: $set holds new or changed key/value pairs and $del
# the keys to delete from family $family. Every monitor of the family is
# called with the local copy and the lists of added, changed and deleted
# keys.
$NODE_REQ{g_chg2} = sub {
   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};

   my (@added, @changed);

   while (my ($k, $v) = each %$set) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $v;
   }

   delete @$db{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, $del);
   }
};
870 root 1.80
871 root 1.69 #############################################################################
872     # configure
873    
# returns the second element of POSIX::uname, the node name of this host
sub nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
878    
# Resolves a comma separated list of transport descriptors into addresses.
#
# Returns a condition variable that receives the resulting "host:port"
# strings, without duplicates, ordered by the position of their descriptor
# in the list. Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, formatted address] pairs

   # the callback of the outermost begin delivers the final result
   $cv->begin (sub {
      my %seen;
      my @sorted = sort { $a->[0] <=> $b->[0] } @res;

      shift->send (grep !$seen{$_}++, map $_->[1], @sorted);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      # an empty descriptor or a bare port number refers to this host
      if ($t =~ /^\d*$/) {
         $t = length $t ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      # one begin per descriptor, matched by the end in its callback
      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # skip localhost etc.
                  push @addr, grep !/^\x7f/,
                     $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  # global unicast, site-local unicast
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                     $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            }, sub {
               for my $ip (@_) {
                  # keep addresses of one descriptor in order, after its index
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr $target->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $addr, $service
               ];
            }
            $cv->end;
         };
      }
   }

   # matches the outermost begin
   $cv->end;

   $cv
}
959    
# Configures and starts the local node.
#
# Takes key/value pairs; with an odd number of arguments the first one is
# taken as the profile name. The profile (default: the nodename) is looked
# up with AnyEvent::MP::Config::find_profile and then determines the node
# id, the listeners, the seeds and the services to start.
#
# Blocks while the bind and seed addresses are resolved and croaks on
# illegal node ids or unparsable bind addresses.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my %kv = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   if (exists $CONFIG->{secure}) {
      # $SECURE reports the negation of the configured "secure" setting
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   # %n expands to the nodename
   ($NODE = $node) =~ s/%n/nodename ()/ge;

   # a trailing slash and %u expand to the random $RUNIQ
   # nodes with randomised node names do not need randomised port names
   $UNIQ = ""
      if $NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g;

   # re-register the self node under its final id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all resolves first, then wait for them one by one
   my @bind_cvs = map _resolve $_, @$binds;

   for my $cv (@bind_cvs) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # remember the address as reported to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   # publish our listeners in the local database
   db_set "'l" => $NODE => $BINDS;

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search ();

   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   # services are either [function, args...], "Module::" or "function"
   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func $func)->(@args);
      } elsif ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func $service)->();
      }
   }
}
1049    
1050 root 1.1 =back
1051    
1052     =head1 SEE ALSO
1053    
1054     L<AnyEvent::MP>.
1055    
1056     =head1 AUTHOR
1057    
1058     Marc Lehmann <schmorp@schmorp.de>
1059     http://home.schmorp.de/
1060    
1061     =cut
1062    
1063     1
1064