ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.93
Committed: Wed Mar 14 23:34:10 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.92: +25 -31 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages -queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
75 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
76 root 1.44 our @WARN;
77     our $WARN = sub {
78     &$_ for @WARN;
79 root 1.29
80     return if $WARNLEVEL < $_[0];
81    
82 root 1.16 my ($level, $msg) = @_;
83    
84 root 1.1 $msg =~ s/\n$//;
85 root 1.16
86     printf STDERR "%s <%d> %s\n",
87     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
88     $level,
89     $msg;
90 root 1.1 };
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
99 root 1.6 sub load_func($) {
100     my $func = $_[0];
101    
102     unless (defined &$func) {
103     my $pkg = $func;
104     do {
105     $pkg =~ s/::[^:]+$//
106 root 1.63 or return sub { die "unable to resolve function '$func'" };
107 root 1.60
108     local $@;
109 root 1.61 unless (eval "require $pkg; 1") {
110     my $error = $@;
111     $error =~ /^Can't locate .*.pm in \@INC \(/
112     or return sub { die $error };
113     }
114 root 1.6 } until defined &$func;
115     }
116    
117     \&$func
118     }
119    
120 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
121    
122 root 1.1 sub nonce($) {
123 root 1.78 join "", map chr rand 256, 1 .. $_[0]
124 root 1.1 }
125    
126 root 1.78 sub nonce62($) {
127     join "", map $alnum[rand 62], 1 .. $_[0]
128 root 1.1 }
129    
130     sub gen_uniq {
131 root 1.78 my $now = AE::now;
132     (join "",
133     map $alnum[$_],
134     $$ / 62 % 62,
135     $$ % 62,
136     (int $now ) % 62,
137     (int $now * 100) % 62,
138     (int $now * 10000) % 62,
139     ) . nonce62 4;
140 root 1.1 }
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
162 root 1.69 sub _init_names {
163 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
164     $UNIQ = gen_uniq;
165    
166     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
167     $RUNIQ = nonce62 10;
168     $RUNIQ =~ s/(.)$/\U$1/;
169    
170     $NODE = "anon/$RUNIQ";
171 root 1.64 }
172    
173 root 1.69 _init_names;
174 root 1.64
175 root 1.1 sub NODE() {
176     $NODE
177     }
178    
179     sub node_of($) {
180 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
181 root 1.1
182 root 1.21 $node
183 root 1.1 }
184    
185 root 1.17 BEGIN {
186     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
187     ? sub () { 1 }
188     : sub () { 0 };
189     }
190 root 1.1
191 root 1.42 our $DELAY_TIMER;
192     our @DELAY_QUEUE;
193    
194     sub _delay_run {
195 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
196 root 1.42 }
197    
198     sub delay($) {
199     push @DELAY_QUEUE, shift;
200     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
201     }
202    
203 root 1.1 sub _inject {
204 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
205 root 1.1 &{ $PORT{+shift} or return };
206     }
207    
208 root 1.20 # this function adds a node-ref, so you can send stuff to it
209     # it is basically the central routing component.
210 root 1.1 sub add_node {
211 root 1.21 my ($node) = @_;
212 root 1.1
213 root 1.93 defined $node #d#UGLY
214     or Carp::croak "'undef' is not a valid node ID/port ID";
215    
216 root 1.71 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
217 root 1.13 }
218    
219 root 1.1 sub snd(@) {
220 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
221 root 1.1
222 root 1.86 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
223 root 1.1
224 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
225 root 1.2 ->{send} (["$portid", @_]);
226 root 1.1 }
227    
228 root 1.17 =item $is_local = port_is_local $port
229    
230     Returns true iff the port is a local port.
231    
232     =cut
233    
234     sub port_is_local($) {
235 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
236 root 1.17
237 root 1.21 $NODE{$nodeid} == $NODE{""}
238 root 1.17 }
239    
240 root 1.18 =item snd_to_func $node, $func, @args
241 root 1.11
242 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
243 root 1.11 this function with the given arguments on that node.
244    
245 root 1.20 This function can be used to implement C<spawn>-like interfaces.
246 root 1.11
247     =cut
248    
249 root 1.18 sub snd_to_func($$;@) {
250 root 1.21 my $nodeid = shift;
251 root 1.11
252 root 1.41 # on $NODE, we artificially delay... (for spawn)
253     # this is very ugly - maybe we should simply delay ALL messages,
254     # to avoid deep recursion issues. but that's so... slow...
255 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
256     if $nodeid ne $NODE;
257    
258 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
259 root 1.11 }
260    
261 root 1.18 =item snd_on $node, @msg
262    
263     Executes C<snd> with the given C<@msg> (which must include the destination
264     port) on the given node.
265    
266     =cut
267    
268     sub snd_on($@) {
269     my $node = shift;
270     snd $node, snd => @_;
271     }
272    
273 root 1.29 =item eval_on $node, $string[, @reply]
274 root 1.18
275 root 1.29 Evaluates the given string as Perl expression on the given node. When
276     @reply is specified, then it is used to construct a reply message with
277     C<"$@"> and any results from the eval appended.
278 root 1.18
279     =cut
280    
281 root 1.29 sub eval_on($$;@) {
282 root 1.18 my $node = shift;
283     snd $node, eval => @_;
284     }
285    
286 root 1.1 sub kil(@) {
287 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
288 root 1.1
289     length $portid
290 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
291 root 1.1
292 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
293 root 1.1 ->kill ("$portid", @_);
294     }
295    
296     #############################################################################
297 root 1.6 # node monitoring and info
298 root 1.3
299 root 1.21 =item node_is_known $nodeid
300 root 1.13
301 root 1.71 #TODO#
302     Returns true iff the given node is currently known to this node.
303 root 1.13
304     =cut
305    
306     sub node_is_known($) {
307     exists $NODE{$_[0]}
308     }
309    
310 root 1.21 =item node_is_up $nodeid
311 root 1.13
312     Returns true if the given node is "up", that is, the kernel thinks it has
313     a working connection to it.
314    
315 root 1.69 If the node is known (to this local node) but not currently connected,
316     returns C<0>. If the node is not known, returns C<undef>.
317 root 1.13
318     =cut
319    
320     sub node_is_up($) {
321     ($NODE{$_[0]} or return)->{transport}
322     ? 1 : 0
323     }
324    
325 root 1.3 =item up_nodes
326    
327 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
328     the node itself).
329 root 1.3
330     =cut
331    
332 root 1.49 sub up_nodes() {
333 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
334 root 1.3 }
335    
336 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
337 root 1.3
338 root 1.27 Registers a callback that is called each time a node goes up (a connection
339     is established) or down (the connection is lost).
340 root 1.3
341     Node up messages can only be followed by node down messages for the same
342     node, and vice versa.
343    
344 root 1.71 Note that monitoring a node is usually better done by monitoring its node
345 root 1.27 port. This function is mainly of interest to modules that are concerned
346     about the network topology and low-level connection handling.
347    
348     Callbacks I<must not> block and I<should not> send any messages.
349    
350     The function returns an optional guard which can be used to unregister
351 root 1.3 the monitoring callback again.
352    
353 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
354     or go up (and down).
355    
356     newnode $_, 1 for up_nodes;
357     mon_nodes \&newnode;
358    
359 root 1.3 =cut
360    
361     our %MON_NODES;
362    
363     sub mon_nodes($) {
364     my ($cb) = @_;
365    
366     $MON_NODES{$cb+0} = $cb;
367    
368 root 1.90 defined wantarray
369     and Guard::guard { delete $MON_NODES{$cb+0} }
370 root 1.3 }
371    
372     sub _inject_nodeevent($$;@) {
373 root 1.16 my ($node, $up, @reason) = @_;
374 root 1.3
375     for my $cb (values %MON_NODES) {
376 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
377 root 1.16 or $WARN->(1, $@);
378 root 1.3 }
379 root 1.16
380 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
381 root 1.3 }
382    
383     #############################################################################
384 root 1.1 # self node code
385    
386 root 1.67 sub _kill {
387     my $port = shift;
388    
389     delete $PORT{$port}
390     or return; # killing nonexistent ports is O.K.
391     delete $PORT_DATA{$port};
392    
393     my $mon = delete $LMON{$port}
394     or !@_
395     or $WARN->(2, "unmonitored local port $port died with reason: @_");
396    
397     $_->(@_) for values %$mon;
398     }
399    
400     sub _monitor {
401     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
402     unless exists $PORT{$_[1]};
403    
404     $LMON{$_[1]}{$_[2]+0} = $_[2];
405     }
406    
407     sub _unmonitor {
408 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
409     if exists $LMON{$_[1]};
410 root 1.67 }
411    
412 root 1.83 sub _secure_check {
413     $SECURE->($SRCNODE->{id})
414 root 1.88 or $SRCNODE->{id} eq $NODE
415 root 1.83 or die "remote execution attempt by insecure node\n";
416     }
417    
418 root 1.79 our %NODE_REQ = (
419 root 1.1 # internal services
420    
421     # monitoring
422 root 1.65 mon0 => sub { # stop monitoring a port for another node
423 root 1.1 my $portid = shift;
424 root 1.91 _unmonitor undef, $portid, delete $NODE{$SRCNODE->{id}}{rmon}{$portid};
425 root 1.1 },
426 root 1.65 mon1 => sub { # start monitoring a port for another node
427 root 1.1 my $portid = shift;
428 root 1.91 Scalar::Util::weaken (my $node = $NODE{$SRCNODE->{id}});
429 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
430 root 1.58 delete $node->{rmon}{$portid};
431 root 1.65 $node->send (["", kil0 => $portid, @_])
432 root 1.59 if $node && $node->{transport};
433 root 1.67 };
434 root 1.1 },
435 root 1.65 # another node has killed a monitored port
436     kil0 => sub {
437 root 1.91 my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{+shift}
438 root 1.1 or return;
439    
440     $_->(@_) for @$cbs;
441     },
442    
443 root 1.18 # "public" services - not actually public
444 root 1.1
445 root 1.65 # another node wants to kill a local port
446 root 1.66 kil => \&_kill,
447 root 1.65
448 root 1.88 # is the remote node considered secure?
449     # secure => sub {
450     # #TODO#
451     # },
452    
453 root 1.1 # relay message to another node / generic echo
454 root 1.88 snd => sub {
455     &_secure_check;
456     &snd
457 root 1.1 },
458    
459 root 1.30 # random utilities
460 root 1.1 eval => sub {
461 root 1.83 &_secure_check;
462 root 1.50 my @res = do { package main; eval shift };
463 root 1.1 snd @_, "$@", @res if @_;
464     },
465     time => sub {
466 root 1.88 &_secure_check;
467 root 1.76 snd @_, AE::now;
468 root 1.1 },
469     devnull => sub {
470     #
471     },
472 root 1.15 "" => sub {
473 root 1.27 # empty messages are keepalives or similar devnull-applications
474 root 1.15 },
475 root 1.1 );
476    
477 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
478 root 1.1 $PORT{""} = sub {
479     my $tag = shift;
480 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
481     $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
482 root 1.1 };
483    
484 root 1.84 our $NPROTO = 1;
485    
486     # tell everybody who connects our nproto
487     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
488     $_[0]{local_greeting}{nproto} = $NPROTO;
489     };
490    
491 root 1.69 #############################################################################
492 root 1.71 # seed management, try to keep connections to all seeds at all times
493 root 1.69
494 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
495     our %NODE_SEED; # map node ID to seed ID
496 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
497     our $SEED_WATCHER;
498 root 1.71 our $SEED_RETRY;
499 root 1.69
500     sub seed_connect {
501     my ($seed) = @_;
502    
503     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
504     or Carp::croak "$seed: unparsable seed address";
505    
506     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
507    
508 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
509     $host, $port,
510     on_greeted => sub {
511     # called after receiving remote greeting, learn remote node name
512    
513     # we rely on untrusted data here (the remote node name) this is
514     # hopefully ok, as this can at most be used for DOSing, which is easy
515     # when you can do MITM anyway.
516    
517     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
518     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
519 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
520 root 1.71 delete $SEED_NODE{$seed};
521     } else {
522     $SEED_NODE{$seed} = $_[0]{remote_node};
523     $NODE_SEED{$_[0]{remote_node}} = $seed;
524 root 1.93 # also start global service, if not running
525     # we need to check here in addition to the mon_nodes below
526     # because we might only learn late that a node is a seed
527     # and then we might already be connected
528     snd $_[0]{remote_node}, "g_slave"
529     unless $_[0]{remote_greeting}{global};
530 root 1.71 }
531     },
532 root 1.93 sub {
533 root 1.71 delete $SEED_CONNECT{$seed};
534     }
535 root 1.69 ;
536     }
537    
538     sub seed_all {
539 root 1.93 my @seeds = grep
540     !exists $SEED_CONNECT{$_}
541     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
542     keys %SEED_NODE;
543 root 1.69
544     if (@seeds) {
545 root 1.70 # start connection attempt for every seed we are not connected to yet
546 root 1.69 seed_connect $_
547     for @seeds;
548 root 1.71
549     $SEED_RETRY = $SEED_RETRY * 2 + rand;
550     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
551     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
552    
553     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
554    
555 root 1.69 } else {
556 root 1.71 # all seeds connected or connecting, no need to restart timer
557 root 1.69 undef $SEED_WATCHER;
558     }
559     }
560    
561     sub seed_again {
562 root 1.71 $SEED_RETRY = 1;
563     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
564 root 1.69 }
565    
566     # sets new seed list, starts connecting
567     sub set_seeds(@) {
568     %SEED_NODE = ();
569 root 1.71 %NODE_SEED = ();
570     %SEED_CONNECT = ();
571    
572 root 1.69 @SEED_NODE{@_} = ();
573    
574 root 1.71 seed_all;
575     }
576    
577     mon_nodes sub {
578 root 1.93 return unless exists $NODE_SEED{$_[0]};
579    
580     if ($_[1]) {
581     # each time a connection to a seed node goes up, make
582     # sure it runs the global service.
583     snd $_[0], "g_slave"
584     unless $NODE{$_[0]}{transport}{remote_greeting}{global};
585     } else {
586     # if we lost the connection to a seed node, make sure we are seeding
587     seed_again;
588     }
589 root 1.71 };
590    
591     #############################################################################
592     # talk with/to global nodes
593    
594 root 1.72 # protocol messages:
595     #
596 root 1.73 # sent by all slave nodes (slave to master)
597     # g_slave database - make other global node master of the sender
598 root 1.72 #
599 root 1.73 # sent by any node to global nodes
600     # g_set database - set whole database
601 root 1.89 # g_upd family set del - update single family
602 root 1.73 # g_del family key - delete key from database
603     # g_get family key reply... - send reply with data
604     #
605     # send by global nodes
606     # g_global - node became global, similar to global=1 greeting
607     #
608     # database families
609     # "'l" -> node -> listeners
610     # "'g" -> node -> undef
611     # ...
612 root 1.72 #
613    
614 root 1.73 # used on all nodes:
615 root 1.88 our $MASTER; # the global node we bind ourselves to
616 root 1.78 our $MASTER_MON;
617     our %LOCAL_DB; # this node database
618    
619     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
620 root 1.71
621 root 1.84 our $GPROTO = 1;
622    
623     # tell everybody who connects our nproto
624     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
625     $_[0]{local_greeting}{gproto} = $GPROTO;
626     };
627    
628 root 1.73 #############################################################################
629     # master selection
630    
631     # master requests
632     our %GLOBAL_REQ; # $id => \@req
633 root 1.71
634 root 1.74 sub global_req_add {
635 root 1.80 my ($id, $req) = @_;
636 root 1.71
637 root 1.74 return if exists $GLOBAL_REQ{$id};
638    
639 root 1.80 $GLOBAL_REQ{$id} = $req;
640 root 1.71
641 root 1.80 snd $MASTER, @$req
642 root 1.73 if $MASTER;
643 root 1.74 }
644 root 1.71
645 root 1.74 sub global_req_del {
646     delete $GLOBAL_REQ{$_[0]};
647     }
648    
649 root 1.88 #################################
650     # master rpc
651    
652     our %GLOBAL_RES;
653     our $GLOBAL_RES_ID = "a";
654    
655     sub global_call {
656     my $id = ++$GLOBAL_RES_ID;
657     $GLOBAL_RES{$id} = pop;
658     global_req_add $id, [@_, $id];
659     }
660    
661     $NODE_REQ{g_reply} = sub {
662     my $id = shift;
663     global_req_del $id;
664     my $cb = delete $GLOBAL_RES{$id}
665     or return;
666     &$cb
667     };
668    
669     #################################
670    
671 root 1.74 sub g_find {
672 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
673 root 1.73 }
674 root 1.71
675 root 1.73 # reply for g_find started in Node.pm
676 root 1.79 $NODE_REQ{g_found} = sub {
677 root 1.74 global_req_del "g_find $_[0]";
678    
679 root 1.73 my $node = $NODE{$_[0]} or return;
680 root 1.71
681 root 1.79 $node->connect_to ($_[1]);
682 root 1.71 };
683    
684 root 1.73 sub master_set {
685     $MASTER = $_[0];
686 root 1.71
687 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
688 root 1.71
689 root 1.73 # (re-)send queued requests
690     snd $MASTER, @$_
691     for values %GLOBAL_REQ;
692     }
693 root 1.71
694 root 1.72 sub master_search {
695 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
696     for (keys %NODE_SEED) {
697 root 1.72 if (node_is_up $_) {
698     master_set $_;
699     return;
700 root 1.71 }
701 root 1.72 }
702 root 1.71
703 root 1.78 $MASTER_MON = mon_nodes sub {
704 root 1.72 return unless $_[1]; # we are only interested in node-ups
705     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
706 root 1.71
707 root 1.72 master_set $_[0];
708 root 1.71
709 root 1.78 $MASTER_MON = mon_nodes sub {
710 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
711     undef $MASTER;
712     master_search ();
713     }
714 root 1.71 };
715 root 1.72 };
716 root 1.71 }
717    
718 root 1.73 # other node wants to make us the master
719 root 1.79 $NODE_REQ{g_slave} = sub {
720 root 1.73 my ($db) = @_;
721    
722 root 1.80 # load global module and redo the request
723 root 1.73 require AnyEvent::MP::Global;
724 root 1.80 &{ $NODE_REQ{g_slave} }
725 root 1.71 };
726    
727 root 1.73 #############################################################################
728 root 1.79 # local database operations
729 root 1.71
730 root 1.79 # local database management
731 root 1.88
732 root 1.87 sub db_set($$;$) {
733 root 1.89 # if (ref $_[1]) {
734     # # bulk
735     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
736     # $LOCAL_DB{$_[0]} = $_[1];
737     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
738     # if defined $MASTER;
739     # } else {
740     # single-key
741     $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
742     snd $MASTER, g_upd => $_[0] => { $_[1] => $_[2] }
743     if defined $MASTER;
744     # }
745 root 1.79 }
746    
747 root 1.89 sub db_del($@) {
748     my $family = shift;
749    
750     delete @{ $LOCAL_DB{$family} }{@_};
751     snd $MASTER, g_upd => $family => undef, \@_
752 root 1.79 if defined $MASTER;
753     }
754    
755     sub db_reg($$;$) {
756     my ($family, $key) = @_;
757     &db_set;
758     Guard::guard { db_del $family => $key }
759     }
760 root 1.71
761 root 1.88 # database query
762    
763     sub db_family {
764     my ($family, $cb) = @_;
765     global_call g_db_family => $family, $cb;
766     }
767    
768     sub db_keys {
769     my ($family, $cb) = @_;
770     global_call g_db_keys => $family, $cb;
771     }
772    
773     sub db_values {
774     my ($family, $cb) = @_;
775     global_call g_db_values => $family, $cb;
776 root 1.80 }
777    
778 root 1.88 # database monitoring
779 root 1.80
780 root 1.81 our %LOCAL_MON; # f, reply
781     our %MON_DB; # f, k, value
782 root 1.80
783 root 1.81 sub db_mon($@) {
784 root 1.84 my ($family, $cb) = @_;
785 root 1.81
786 root 1.84 if (my $db = $MON_DB{$family}) {
787 root 1.89 # we already monitor, so create a "dummy" change event
788     # this is postponed, which might be too late (we could process
789     # change events), so disable the callback at first
790     $LOCAL_MON{$family}{$cb+0} = sub { };
791     AE::postpone {
792     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
793    
794     # set actual callback
795     $LOCAL_MON{$family}{$cb+0} = $cb;
796     $cb->($db, [keys %$db]);
797     };
798 root 1.81 } else {
799     # new monitor, request chg1 from upstream
800 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
801 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
802     $MON_DB{$family} = {};
803     }
804    
805 root 1.90 defined wantarray
806     and Guard::guard {
807     my $mon = $LOCAL_MON{$family};
808     delete $mon->{$cb+0};
809    
810     unless (%$mon) {
811     global_req_del "mon1 $family";
812    
813     # no global_req, because we don't care if we are not connected
814     snd $MASTER, g_mon0 => $family
815     if $MASTER;
816 root 1.80
817 root 1.90 delete $LOCAL_MON{$family};
818     delete $MON_DB{$family};
819     }
820 root 1.80 }
821     }
822    
823 root 1.82 # full update
824 root 1.80 $NODE_REQ{g_chg1} = sub {
825 root 1.84 my ($f, $ndb) = @_;
826    
827     my $db = $MON_DB{$f};
828 root 1.89 my (@a, @c, @d);
829 root 1.81
830 root 1.82 # add or replace keys
831 root 1.84 while (my ($k, $v) = each %$ndb) {
832 root 1.89 exists $db->{$k}
833     ? push @c, $k
834     : push @a, $k;
835 root 1.84 $db->{$k} = $v;
836 root 1.82 }
837 root 1.81
838 root 1.82 # delete keys that are no longer present
839 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
840     delete $db->{$_};
841 root 1.89 push @d, $_;
842 root 1.81 }
843 root 1.84
844 root 1.89 $_->($db, \@a, \@c, \@d)
845 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
846 root 1.80 };
847    
848 root 1.82 # incremental update
849 root 1.84 $NODE_REQ{g_chg2} = sub {
850 root 1.89 my ($family, $set, $del) = @_;
851    
852     my $db = $MON_DB{$family};
853 root 1.84
854 root 1.89 my (@a, @c);
855 root 1.84
856 root 1.89 while (my ($k, $v) = each %$set) {
857     exists $db->{$k}
858     ? push @c, $k
859     : push @a, $k;
860     $db->{$k} = $v;
861     }
862    
863     delete @$db{@$del};
864    
865     $_->($db, \@a, \@c, $del)
866     for values %{ $LOCAL_MON{$family} };
867 root 1.84 };
868 root 1.80
869 root 1.69 #############################################################################
870     # configure
871    
872 root 1.81 sub nodename {
873 root 1.69 require POSIX;
874     (POSIX::uname ())[1]
875     }
876    
877     sub _resolve($) {
878     my ($nodeid) = @_;
879    
880     my $cv = AE::cv;
881     my @res;
882    
883     $cv->begin (sub {
884     my %seen;
885     my @refs;
886     for (sort { $a->[0] <=> $b->[0] } @res) {
887     push @refs, $_->[1] unless $seen{$_->[1]}++
888     }
889     shift->send (@refs);
890     });
891    
892     my $idx;
893     for my $t (split /,/, $nodeid) {
894     my $pri = ++$idx;
895    
896 root 1.81 $t = length $t ? nodename . ":$t" : nodename
897 root 1.69 if $t =~ /^\d*$/;
898    
899     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
900     or Carp::croak "$t: unparsable transport descriptor";
901    
902     $port = "0" if $port eq "*";
903    
904     if ($host eq "*") {
905     $cv->begin;
906     # use fork_call, as Net::Interface is big, and we need it rarely.
907     require AnyEvent::Util;
908     AnyEvent::Util::fork_call (
909     sub {
910     my @addr;
911    
912     require Net::Interface;
913    
914     for my $if (Net::Interface->interfaces) {
915     # we statically lower-prioritise ipv6 here, TODO :()
916     for $_ ($if->address (Net::Interface::AF_INET ())) {
917     next if /^\x7f/; # skip localhost etc.
918     push @addr, $_;
919     }
920     for ($if->address (Net::Interface::AF_INET6 ())) {
921     #next if $if->scope ($_) <= 2;
922     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
923     push @addr, $_;
924     }
925    
926     }
927     @addr
928     }, sub {
929     for my $ip (@_) {
930     push @res, [
931     $pri += 1e-5,
932     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
933     ];
934     }
935     $cv->end;
936     }
937     );
938     } else {
939     $cv->begin;
940     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
941     for (@_) {
942     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
943     push @res, [
944     $pri += 1e-5,
945     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
946     ];
947     }
948     $cv->end;
949     };
950     }
951     }
952    
953     $cv->end;
954    
955     $cv
956     }
957    
958     sub configure(@) {
959     unshift @_, "profile" if @_ & 1;
960     my (%kv) = @_;
961    
962     delete $NODE{$NODE}; # we do not support doing stuff before configure
963     _init_names;
964    
965     my $profile = delete $kv{profile};
966    
967 root 1.81 $profile = nodename
968 root 1.69 unless defined $profile;
969    
970     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
971    
972 root 1.83 if (exists $CONFIG->{secure}) {
973     my $pass = !$CONFIG->{secure};
974     $SECURE = sub { $pass };
975     }
976    
977 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
978 root 1.69
979     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
980    
981 root 1.72 $NODE = $node;
982 root 1.77
983 root 1.81 $NODE =~ s/%n/nodename/ge;
984    
985     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
986 root 1.77 # nodes with randomised node names do not need randomised port names
987     $UNIQ = "";
988     }
989 root 1.69
990     $NODE{$NODE} = $NODE{""};
991     $NODE{$NODE}{id} = $NODE;
992    
993     my $seeds = $CONFIG->{seeds};
994     my $binds = $CONFIG->{binds};
995    
996     $binds ||= ["*"];
997    
998     $WARN->(8, "node $NODE starting up.");
999    
1000 root 1.85 $BINDS = [];
1001     %BINDS = ();
1002 root 1.69
1003     for (map _resolve $_, @$binds) {
1004     for my $bind ($_->recv) {
1005     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1006     or Carp::croak "$bind: unparsable local bind address";
1007    
1008     my $listener = AnyEvent::MP::Transport::mp_server
1009     $host,
1010     $port,
1011     prepare => sub {
1012     my (undef, $host, $port) = @_;
1013     $bind = AnyEvent::Socket::format_hostport $host, $port;
1014     0
1015     },
1016     ;
1017 root 1.85 $BINDS{$bind} = $listener;
1018     push @$BINDS, $bind;
1019 root 1.69 }
1020     }
1021    
1022 root 1.85 db_set "'l" => $NODE => $BINDS;
1023 root 1.73
1024 root 1.85 $WARN->(8, "node listens on [@$BINDS].");
1025 root 1.69
1026     # connect to all seednodes
1027     set_seeds map $_->recv, map _resolve $_, @$seeds;
1028    
1029 root 1.73 master_search;
1030    
1031 root 1.69 for (@{ $CONFIG->{services} }) {
1032     if (ref) {
1033     my ($func, @args) = @$_;
1034     (load_func $func)->(@args);
1035     } elsif (s/::$//) {
1036     eval "require $_";
1037     die $@ if $@;
1038     } else {
1039     (load_func $_)->();
1040     }
1041     }
1042     }
1043    
1044 root 1.1 =back
1045    
1046     =head1 SEE ALSO
1047    
1048     L<AnyEvent::MP>.
1049    
1050     =head1 AUTHOR
1051    
1052     Marc Lehmann <schmorp@schmorp.de>
1053     http://home.schmorp.de/
1054    
1055     =cut
1056    
1057     1
1058