ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.97
Committed: Wed Mar 21 23:48:39 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.96: +11 -12 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.97 db_set db_del
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
# Highest level the default handler still prints; the environment variable
# PERL_ANYEVENT_MP_WARNLEVEL overrides the built-in default of 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks; each one is called for every message.
our @WARN;

# Default log handler, invoked as $WARN->($level, $msg).
our $WARN = sub {
   # hooks run first and unconditionally, sharing our argument list
   &$_ for @WARN;

   return if $WARNLEVEL < $_[0];

   my ($level, $msg) = @_;

   # a single trailing newline is dropped, printf adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time;

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
# Resolve a fully qualified function name ("Some::Pkg::func") to a code
# reference, loading modules on demand.
#
# If the function is not defined yet, trailing "::component"s are stripped
# from the name one at a time and the remaining prefix is require'd, until
# the function becomes defined.
#
# Always returns a code reference. On failure the returned sub dies when
# called: with "unable to resolve function '...'" when no package prefix is
# left to try, or with the original require error when loading failed for a
# reason other than "Can't locate ....pm in @INC".
#
# NOTE: $pkg is interpolated into a string eval, so $func must come from a
# trusted source (the node port handler runs _secure_check before calling
# this function).
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module file just means "try the next shorter prefix";
            # the dot before "pm" is escaped so only real ".pm" names match
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119    
# the 62 characters used for printable nonces: digits, upper-, lowercase
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# Return a string of $_[0] random characters with code points 0..255.
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $_[0];
   $octets
}

# Return a string of $_[0] random characters drawn from @alnum.
sub nonce62($) {
   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $_[0];
   $chars
}
129    
# Build a nine character identifier: five base-62 digits derived from the
# process ID and the current event loop time, followed by four random
# alphanumeric characters.
sub gen_uniq {
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 4;
}
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
# (Re-)initialise $UNIQ, $RUNIQ and the anonymous default node ID in $NODE.
sub _init_names {
   # local port names are built from this (~54 bits), with a lowercase $ID
   # appended
   $UNIQ = gen_uniq;

   # remote port names are built from this (~59 bits): it is one character
   # longer than $UNIQ and ends in an uppercased character to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/;

   $NODE = "anon/$RUNIQ";
}
172    
173 root 1.69 _init_names;
174 root 1.64
# Return the ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE
}
178    
# Return the node ID part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid
}
184    
BEGIN {
   # TRACE is a constant sub fixed at compile time: 1 when the environment
   # variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190 root 1.1
our $DELAY_TIMER; # pending timer while @DELAY_QUEUE is being drained
our @DELAY_QUEUE; # callbacks waiting to be run

# Run queued callbacks in FIFO order; once the queue is empty, drop the
# timer and return.
our $delay_run = sub {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
};

# Queue a callback and make sure a zero-delay timer is armed to run it.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
202    
203 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
204    
205     During execution of a message callback, this variable contains the node ID
206     of the origin node.
207    
208     The main use of this variable is for debugging output - there are probably
209     very few other cases where you need to know the source node ID.
210    
211     =cut
212    
# Deliver a message to a local port: the first argument is the port ID, the
# rest is passed to the port callback. Messages for ports that do not exist
# are silently dropped.
sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n"
      if TRACE && @_;#d#

   my $port_cb = $PORT{+shift}
      or return;

   # called with the remaining @_
   &$port_cb
}
217    
218 root 1.20 # this function adds a node-ref, so you can send stuff to it
219     # it is basically the central routing component.
# Return the node object registered for the given node ID, creating and
# registering a remote node object if none exists. Croaks on an undefined
# or empty node ID.
sub add_node {
   my ($node) = @_;

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
228    
# Send a message to a port: the first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n"
      if TRACE && @_;#d#

   # unknown nodes get a node object on first use
   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["$portid", @_]);
}
237    
238 root 1.17 =item $is_local = port_is_local $port
239    
240     Returns true iff the port is a local port.
241    
242     =cut
243    
# True when the node part of the port ID maps to the same node object as
# the local node (registered under the empty node ID).
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $NODE{$nodeid} == $NODE{""}
}
249    
250 root 1.18 =item snd_to_func $node, $func, @args
251 root 1.11
252 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
253 root 1.11 this function with the given arguments on that node.
254    
255 root 1.20 This function can be used to implement C<spawn>-like interfaces.
256 root 1.11
257     =cut
258    
# Send a message to the node port (port ID "") of the given node; the
# remaining arguments form the message.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is *not* $NODE, which
   # reads opposite to the sentence above - confirm the intended condition.
   unless ($nodeid eq $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["", @_]);
}
270    
271 root 1.18 =item snd_on $node, @msg
272    
273     Executes C<snd> with the given C<@msg> (which must include the destination
274     port) on the given node.
275    
276     =cut
277    
# Ask the given node to execute "snd" with @msg.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, snd => @msg;
}
282    
283 root 1.29 =item eval_on $node, $string[, @reply]
284 root 1.18
285 root 1.29 Evaluates the given string as Perl expression on the given node. When
286     @reply is specified, then it is used to construct a reply message with
287     C<"$@"> and any results from the eval appended.
288 root 1.18
289     =cut
290    
# Ask the given node to evaluate a string; any further arguments are passed
# along as the reply specification.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd $node, eval => @request;
}
295    
# Kill the given port; the remaining arguments are passed on as the kill
# reason. Croaks when the port part of the ID is empty.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @_);
}
305    
306     #############################################################################
307 root 1.6 # node monitoring and info
308 root 1.3
309 root 1.21 =item node_is_up $nodeid
310 root 1.13
311     Returns true if the given node is "up", that is, the kernel thinks it has
312     a working connection to it.
313    
314 root 1.95 If the node is up, returns C<1>. If the node is currently connecting or
315     otherwise known but not connected, returns C<0>. If nothing is known about
316     the node, returns C<undef>.
317 root 1.13
318     =cut
319    
# 1 if the node has a transport, 0 if it is known but has none, and
# nothing (undef/empty list) if the node ID is unknown.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
324    
325 root 1.3 =item up_nodes
326    
327 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
328     the node itself).
329 root 1.3
330     =cut
331    
# IDs of all registered nodes that currently have a transport.
sub up_nodes() {
   map { $_->{id} }
      grep { $_->{transport} }
         values %NODE
}
335    
336 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
337 root 1.3
338 root 1.27 Registers a callback that is called each time a node goes up (a connection
339     is established) or down (the connection is lost).
340 root 1.3
341     Node up messages can only be followed by node down messages for the same
342     node, and vice versa.
343    
344 root 1.71 Note that monitoring a node is usually better done by monitoring its node
345 root 1.27 port. This function is mainly of interest to modules that are concerned
346     about the network topology and low-level connection handling.
347    
348     Callbacks I<must not> block and I<should not> send any messages.
349    
350     The function returns an optional guard which can be used to unregister
351 root 1.3 the monitoring callback again.
352    
353 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
354     or go up (and down).
355    
356     newnode $_, 1 for up_nodes;
357     mon_nodes \&newnode;
358    
359 root 1.3 =cut
360    
# registered node monitors, keyed by the numeric value of the callback ref
our %MON_NODES;

# Register $cb as node up/down monitor. In non-void context a guard is
# returned that unregisters the callback when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   my $id = $cb + 0;

   $MON_NODES{$id} = $cb;

   defined wantarray
      and Guard::guard { delete $MON_NODES{$id} }
}
371    
# Tell all node monitors that $node went up or down, then log the event at
# level 7. A dying monitor callback is logged at level 1 and does not stop
# the remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
382    
383     #############################################################################
384 root 1.1 # self node code
385    
# Destroy a local port: remove its callback and data, then call all local
# monitors with the kill reason (the remaining arguments).
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port dying with a reason while nobody monitors it is worth a warning
   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
399    
# Register a monitor callback (third argument) for the local port given as
# second argument; the first argument is ignored. If the port does not
# exist, the callback is called immediately with a "no_such_port" reason.
sub _monitor {
   my (undef, $port, $cb) = @_;

   exists $PORT{$port}
      or return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port");

   $LMON{$port}{$cb+0} = $cb;
}
406    
# Remove a monitor callback (third argument) from the local port given as
# second argument; the first argument is ignored.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   # do not create an entry for ports that are not monitored at all
   exists $LMON{$port}
      and delete $LMON{$port}{$cb+0}
}
411    
# Die unless the $SECURE callback, called with our own arguments, returns
# a true value.
sub _secure_check {
   &$SECURE
      or die "remote execution attempt by insecure node\n";
}
416    
# Handlers for messages sent to the node port, keyed by message tag.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};
      _unmonitor undef, $portid, $cb;
   },
   mon1 => sub { # start monitoring a port for another node
      my ($portid) = @_;

      # weak, so the monitor callback does not keep the node object alive
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});

      my $cb = $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      _monitor undef, $portid, $cb;
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities
   eval => sub {
      &_secure_check;
      # NOTE: evaluates a string taken from the message as Perl code, the
      # _secure_check above is the only gate
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
470    
# the local node object is registered under "" as well as under its node ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port (port ID ""): the first message element selects a handler
# from %NODE_REQ. Unknown tags are resolved via load_func (after a security
# check) and cached. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag };
      # called with the remaining @_
      &$handler
   };

   $WARN->(2, "error processing node message from $SRCNODE: $@") if $@;
};

our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
484    
485 root 1.69 #############################################################################
486 root 1.71 # seed management, try to keep connections to all seeds at all times
487 root 1.69
488 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
489     our %NODE_SEED; # map node ID to seed ID
490 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
491     our $SEED_WATCHER;
492 root 1.71 our $SEED_RETRY;
493 root 1.69
# Start a connection attempt to the seed address $seed ("host:port"),
# unless %SEED_CONNECT already has a true entry for it. Croaks if the
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name
         my ($transport) = @_;
         my $remote = $transport->{remote_node};

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
         } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;
            # also start global service, if not running
            # we need to check here in addition to the mon_nodes below
            # because we might only learn late that a node is a seed
            # and then we might already be connected
            snd $remote, "g_slave"
               unless $transport->{remote_greeting}{global};
         }
      },
      sub {
         # final callback: forget this connection attempt
         delete $SEED_CONNECT{$seed};
      }
   ;
}
531    
# Try to connect to every seed that has no connection attempt in progress
# and whose node is not up. While such seeds exist, re-arm the retry timer
# with a growing, capped interval.
sub seed_all {
   my @seeds = grep {
      !exists $SEED_CONNECT{$_}
         && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
   } keys %SEED_NODE;

   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for @seeds;

      # roughly double the interval each round, capped at monitor_timeout
      my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $max
         if $SEED_RETRY > $max;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
554    
# Reset the retry interval to one second and, unless a retry timer is
# already pending, arm one that calls seed_all after one second.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
}
559    
560     # sets new seed list, starts connecting
# Replace the seed list with the given seed addresses, forgetting all
# previous seed state, and start connecting.
sub set_seeds(@) {
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out without a known node ID
   %SEED_NODE = map +($_ => undef), @_;

   seed_all;
}
570    
# watch seed nodes going up and down
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # only nodes known to be seeds are of interest here
   return unless exists $NODE_SEED{$nodeid};

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $nodeid, "g_slave"
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   }
};
584    
585     #############################################################################
586     # talk with/to global nodes
587    
588 root 1.72 # protocol messages:
589     #
590 root 1.73 # sent by all slave nodes (slave to master)
591     # g_slave database - make other global node master of the sender
592 root 1.72 #
593 root 1.73 # sent by any node to global nodes
594     # g_set database - set whole database
595 root 1.89 # g_upd family set del - update single family
596 root 1.73 # g_del family key - delete key from database
597     # g_get family key reply... - send reply with data
598     #
599     # sent by global nodes
600     # g_global - node became global, similar to global=1 greeting
601     #
602     # database families
603     # "'l" -> node -> listeners
604     # "'g" -> node -> undef
605     # ...
606 root 1.72 #
607    
608 root 1.73 # used on all nodes:
609 root 1.88 our $MASTER; # the global node we bind ourselves to
610 root 1.78 our $MASTER_MON;
611     our %LOCAL_DB; # this node database
612    
613     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
614 root 1.71
615 root 1.84 our $GPROTO = 1;
616    
617     # tell everybody who connects our gproto
618     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
619     $_[0]{local_greeting}{gproto} = $GPROTO;
620     };
621    
622 root 1.73 #############################################################################
623     # master selection
624    
625     # master requests
626     our %GLOBAL_REQ; # $id => \@req
627 root 1.71
# Remember request $req (an array ref holding a message) under $id and send
# it to the master if there is one. Does nothing when a request with this
# ID is already registered.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   snd $MASTER, @$req
      if $MASTER;
}
638 root 1.71
# Forget the queued master request registered under the given ID.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
642    
643 root 1.88 #################################
644     # master rpc
645    
our %GLOBAL_RES;          # request ID => reply callback
our $GLOBAL_RES_ID = "a"; # last request ID handed out (string increment)

# Queue a request for the master: the last argument is the reply callback,
# the arguments before it form the message, to which a fresh request ID is
# appended.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add $id, [@_, $id];
}
654    
# reply to a global_call: the first element is the request ID, the rest is
# handed to the callback registered for that ID
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del $id;

   my $reply_cb = delete $GLOBAL_RES{$id}
      or return;

   # called with the remaining @_
   &$reply_cb
};
662    
663     #################################
664    
# Queue a g_find request for the given node ID (at most one per node ID).
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", [g_find => $nodeid];
}
668 root 1.71
669 root 1.73 # reply for g_find started in Node.pm
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del "g_find $nodeid";

   # replies for nodes we do not know (anymore) are ignored
   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
677    
# Make the given node our master: send it our local database via g_slave
# and then all queued requests.
sub master_set {
   ($MASTER) = @_;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
687 root 1.71
# Pick a master: use the first seed node found to be up, otherwise wait for
# one to come up. Once a master was chosen that way, watch it and start
# over when it goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up $nodeid;

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set $nodeid;

      # from now on only watch for the master going down
      $MASTER_MON = mon_nodes sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
711    
712 root 1.73 # other node wants to make us the master
# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   # NOTE(review): this presumably relies on AnyEvent::MP::Global replacing
   # $NODE_REQ{g_slave} when loaded - otherwise the call below would recurse
   # into this very handler; confirm against that module.
   require AnyEvent::MP::Global;

   # called with our own, unmodified @_
   &{ $NODE_REQ{g_slave} }
};
720    
721 root 1.73 #############################################################################
722 root 1.79 # local database operations
723 root 1.71
724 root 1.79 # local database management
725 root 1.88
# Set $LOCAL_DB{$family}{$subkey} to the optional third argument and, if a
# master is known, send it the change as a g_upd message.
#
# In non-void context a guard is returned that deletes the key again (via
# db_del) when it is destroyed.
sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$subkey} = $_[2];
      snd $MASTER, g_upd => $family => { $subkey => $_[2] }
         if defined $MASTER;
#   }

   # db_del is defined further down in the file, so the call needs
   # parentheses: without them "db_del $family => ..." is compiled as the
   # indirect-object method call $family->db_del, and the guard would fail
   # instead of deleting the key.
   defined wantarray
      and Guard::guard { db_del ($family => $subkey) }
}
745    
# Delete the given keys from family $family of the local database and, if
# a master is known, send it the deletions as a g_upd message.
sub db_del($@) {
   my ($family, @keys) = @_;

   delete @{ $LOCAL_DB{$family} }{@keys};

   snd $MASTER, g_upd => $family => undef, \@keys
      if defined $MASTER;
}
753    
754 root 1.88 # database query
755    
# Issue a g_db_family request for $family; $cb is the reply callback.
sub db_family {
   my ($family, $cb) = @_;

   global_call "g_db_family", $family, $cb;
}
760    
# Issue a g_db_keys request for $family; $cb is the reply callback.
sub db_keys {
   my ($family, $cb) = @_;

   global_call "g_db_keys", $family, $cb;
}
765    
# Issue a g_db_values request for $family; $cb is the reply callback.
sub db_values {
   my ($family, $cb) = @_;

   global_call "g_db_values", $family, $cb;
}
770    
771 root 1.88 # database monitoring
772 root 1.80
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# Monitor database family $family with callback $cb. In non-void context a
# guard is returned that removes the callback again; removing the last
# callback of a family also stops monitoring that family.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $id = $cb + 0;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$id} = sub { };
      AE::postpone {
         return unless exists $LOCAL_MON{$family}{$id}; # guard might have gone away already

         # set actual callback
         $LOCAL_MON{$family}{$id} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$id} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$id};

         # last monitor of this family gone: stop monitoring it
         unless (%$mon) {
            global_req_del "mon1 $family";

            # no global_req, because we don't care if we are not connected
            snd $MASTER, g_mon0 => $family
               if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
         }
      }
}
815    
816 root 1.82 # full update
# full update: the master sends the complete contents of a family
$NODE_REQ{g_chg1} = sub {
   # only the current master may update us
   return unless $SRCNODE eq $MASTER;

   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@a, @c, @d); # added, changed and deleted keys

   # add or replace keys
   while (my ($k, $v) = each %$ndb) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $v;
   }

   # delete keys that are no longer present
   for my $k (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$k};
      push @d, $k;
   }

   $_->($db, \@a, \@c, \@d)
      for values %{ $LOCAL_MON{$f} };
};
841    
842 root 1.82 # incremental update
# incremental update: the master sends keys to set and keys to delete
$NODE_REQ{g_chg2} = sub {
   # only the current master may update us
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};

   my (@a, @c); # added and changed keys

   while (my ($k, $v) = each %$set) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $v;
   }

   delete @$db{@$del};

   $_->($db, \@a, \@c, $del)
      for values %{ $LOCAL_MON{$family} };
};
863 root 1.80
864 root 1.69 #############################################################################
865     # configure
866    
# Return the second field of POSIX::uname (the node name of this host).
sub nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
871    
# Resolve a comma-separated list of transport descriptors into "host:port"
# strings. Returns a condvar that receives the resulting addresses without
# duplicates, ordered by the position of their descriptor in the list.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # runs once every lookup below has finished
   $cv->begin (sub {
      my %seen;
      my @refs =
         grep !$seen{$_}++,
            map $_->[1],
               sort { $a->[0] <=> $b->[0] } @res;

      shift->send (@refs);
   });

   my $idx;
   for my $spec (split /,/, $nodeid) {
      my $pri = ++$idx;

      # an empty or all-digit descriptor is a port on the local node name
      if ($spec =~ /^\d*$/) {
         $spec = length $spec ? nodename . ":$spec" : nodename;
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $spec, 0
         or Carp::croak "$spec: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for $_ ($if->address (Net::Interface::AF_INET ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }
                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  # the tiny increment keeps results of one descriptor in order
                  $pri += 1e-5;
                  push @res, [
                     $pri,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $target->[3];
               $pri += 1e-5;
               push @res, [
                  $pri,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
952    
# Configure the local node and start it up.
#
# Arguments are key => value pairs; with an odd number of arguments the
# first one is taken as the profile name. The profile defaults to the node
# name of this host.
#
# The function looks up the profile configuration, sets up $SECURE and the
# node ID, creates a listener for every resolved bind address, publishes
# the listeners in the local database, connects to the seed nodes, starts
# the master search and finally loads the configured services.
#
# Croaks on an illegal node ID or an unparsable bind address, and dies when
# a service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names;

   my $profile = delete $kv{profile};

   $profile = nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   if (exists $CONFIG->{secure}) {
      # a true "secure" setting makes _secure_check fail, a false one lets
      # it pass. This must be a real code ref: "eval EXPR ? A : B" parses
      # as "(eval EXPR) ? A : B", which would run the config value as Perl
      # code and store a plain string.
      $SECURE = $CONFIG->{secure} ? sub { 0 } : sub { 1 };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   # %n expands to the node name of this host
   $NODE =~ s/%n/nodename/ge;

   # a trailing "/" and every %u get the random $RUNIQ value
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # record the address the listener is actually bound to
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set "'l" => $NODE => $BINDS;

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search;

   for (@{ $CONFIG->{services} }) {
      if (ref) {
         # [function name, arguments...]
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         # a name ending in "::" is a module to load
         eval "require $_";
         die $@ if $@;
      } else {
         # anything else is a function to call without arguments
         (load_func $_)->();
      }
   }
}
1037    
1038 root 1.1 =back
1039    
1040     =head1 SEE ALSO
1041    
1042     L<AnyEvent::MP>.
1043    
1044     =head1 AUTHOR
1045    
1046     Marc Lehmann <schmorp@schmorp.de>
1047     http://home.schmorp.de/
1048    
1049     =cut
1050    
1051     1
1052