ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.95
Committed: Wed Mar 21 00:14:25 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.94: +3 -13 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67 root 1.44 =item @AnyEvent::MP::Kernel::WARN
68    
69     All code references in this array are called for every log message, from
70     the default C<$WARN> handler. This is an easy way to tie into the log
71     messages without disturbing others.
72    
73 root 1.1 =cut
74    
75 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
76 root 1.44 our @WARN;
# Default log handler: runs every hook in @WARN, then prints the message to
# STDERR unless its level is above $WARNLEVEL.
our $WARN = sub {
   # hooks see every message, regardless of level; "&$hook;" hands each
   # hook this handler's own @_
   for my $hook (@WARN) {
      &$hook;
   }

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # strip one trailing newline, the format below adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91    
92 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93    
94     The maximum level at which warning messages will be printed to STDERR by
95     the default warn handler.
96    
97     =cut
98    
# Resolves a fully qualified function name ("Some::Pkg::func") to a code
# reference, loading modules on demand.
#
# If the function is not yet defined, successively shorter package prefixes
# of the name are require'd until the function appears. Always returns a
# code reference: when the function cannot be resolved, or a module fails
# to load for a reason other than "not found in @INC", the returned code
# reference dies with the corresponding error when called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last "::component"; nothing left to strip means we ran
         # out of candidate packages
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, so only accept
         # things that look like a package name
         $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a module that simply does not exist is fine (we try the next
            # shorter prefix), any other load error is reported
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119    
120 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
121    
# Returns a string of $_[0] random characters, each with a code in 0..255.
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}
125    
# Returns a string of $_[0] random characters drawn from @alnum (0-9A-Za-z).
sub nonce62($) {
   my $chars = "";

   $chars .= $alnum[rand 62]
      for 1 .. $_[0];

   $chars
}
129    
# Builds a 9 character identifier: two characters derived from the process
# id, three derived from the current event loop time (AE::now) and four
# random characters, all taken from @alnum.
sub gen_uniq {
   my $now = AE::now;

   my @idx = (
      $$ / 62 % 62,
      $$ % 62,
      int ($now)         % 62,
      int ($now * 100)   % 62,
      int ($now * 10000) % 62,
   );

   my $prefix = join "", @alnum[@idx];

   $prefix . nonce62 (4)
}
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.83 our $SECURE = sub { 1 };
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.85 our %BINDS;
158     our $BINDS; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
# (Re-)initialises $UNIQ, $RUNIQ and the default anonymous $NODE name.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
   # at the end to avoid clashes
   $RUNIQ = nonce62 (10);
   substr ($RUNIQ, -1) = uc substr $RUNIQ, -1;

   $NODE = "anon/$RUNIQ";
}
172    
173 root 1.69 _init_names;
174 root 1.64
# Returns the ID of the local node.
sub NODE() {
   return $NODE;
}
178    
# Returns the node ID part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   (split /#/, $_[0], 2)[0]
}
184    
# Compile-time constant TRACE: 1 if $ENV{PERL_ANYEVENT_MP_TRACE} is true,
# 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190 root 1.1
191 root 1.42 our $DELAY_TIMER;
192     our @DELAY_QUEUE;
193    
# Timer callback: runs queued callbacks in order until the queue is empty
# (including ones queued while running), then drops the timer.
sub _delay_run {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   undef $DELAY_TIMER
}
197    
# Queues a callback for _delay_run and makes sure a zero-delay timer is
# pending to run it.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
202    
# Delivers a message (port ID followed by the message elements) to the
# handler registered in %PORT; messages for unknown ports are dropped.
sub _inject {
   if (TRACE && @_) { #d#
      my $dump = eval { JSON::XS->new->encode (\@_) };
      warn "RCV $SRCNODE->{id} -> " . $dump . "\n";
   }

   my $handler = $PORT{+shift}
      or return;

   # called without parens so the handler receives the remaining @_
   &$handler
}
207    
208 root 1.20 # this function adds a node-ref, so you can send stuff to it
209     # it is basically the central routing component.
# Returns the node object for the given node ID, creating a new
# AnyEvent::MP::Node::Remote and storing it in %NODE if there is none yet.
# Croaks on undef or the empty string.
sub add_node {
   my ($node) = @_;

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
218    
# Sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the rest is the message. The node object is created on
# demand and the message is passed to its "send" code reference.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) { #d#
      my $dump = eval { JSON::XS->new->encode ([$portid, @_]) };
      warn "SND $nodeid <- " . $dump . "\n";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
227    
228 root 1.17 =item $is_local = port_is_local $port
229    
230     Returns true iff the port is a local port.
231    
232     =cut
233    
# True when the node part of the port ID maps to the same node object as
# the local node ($NODE{""}).
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
239    
240 root 1.18 =item snd_to_func $node, $func, @args
241 root 1.11
242 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
243 root 1.11 this function with the given arguments on that node.
244    
245 root 1.20 This function can be used to implement C<spawn>-like interfaces.
246 root 1.11
247     =cut
248    
# Sends a node message (empty port ID) consisting of the remaining
# arguments - function name and its arguments - to the given node.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT $NODE - confirm
   # against AnyEvent::MP::Node::Self that this is the intended condition.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @call]);
}
260    
261 root 1.18 =item snd_on $node, @msg
262    
263     Executes C<snd> with the given C<@msg> (which must include the destination
264     port) on the given node.
265    
266     =cut
267    
# Asks the given node to execute "snd" with the remaining arguments.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
272    
273 root 1.29 =item eval_on $node, $string[, @reply]
274 root 1.18
275 root 1.29 Evaluates the given string as Perl expression on the given node. When
276     @reply is specified, then it is used to construct a reply message with
277     C<"$@"> and any results from the eval appended.
278 root 1.18
279     =cut
280    
# Asks the given node to eval a string; any further arguments form the
# reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
285    
# Kills the given port; further arguments are the kill reason. Croaks if
# the port ID has no port part (i.e. it names a node port).
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
295    
296     #############################################################################
297 root 1.6 # node monitoring and info
298 root 1.3
299 root 1.21 =item node_is_up $nodeid
300 root 1.13
301     Returns true if the given node is "up", that is, the kernel thinks it has
302     a working connection to it.
303    
304 root 1.95 If the node is up, returns C<1>. If the node is currently connecting or
305     otherwise known but not connected, returns C<0>. If nothing is known about
306     the node, returns C<undef>.
307 root 1.13
308     =cut
309    
# Returns 1 if the node has a transport, 0 if it is known but has none,
# and nothing (undef/empty list) if the node is unknown.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
314    
315 root 1.3 =item up_nodes
316    
317 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
318     the node itself).
319 root 1.3
320     =cut
321    
# Returns the IDs of all nodes in %NODE that currently have a transport.
sub up_nodes() {
   my @up;

   for my $node (values %NODE) {
      push @up, $node->{id}
         if $node->{transport};
   }

   @up
}
325    
326 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
327 root 1.3
328 root 1.27 Registers a callback that is called each time a node goes up (a connection
329     is established) or down (the connection is lost).
330 root 1.3
331     Node up messages can only be followed by node down messages for the same
332     node, and vice versa.
333    
334 root 1.71 Note that monitoring a node is usually better done by monitoring its node
335 root 1.27 port. This function is mainly of interest to modules that are concerned
336     about the network topology and low-level connection handling.
337    
338     Callbacks I<must not> block and I<should not> send any messages.
339    
340     The function returns an optional guard which can be used to unregister
341 root 1.3 the monitoring callback again.
342    
343 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
344     or go up (and down).
345    
346     newnode $_, 1 for up_nodes;
347     mon_nodes \&newnode;
348    
349 root 1.3 =cut
350    
351     our %MON_NODES;
352    
# Registers a node up/down callback in %MON_NODES, keyed by the numeric
# value of the code reference. In non-void context a guard is returned that
# removes the registration again.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;
   $MON_NODES{$key} = $cb;

   return unless defined wantarray;

   Guard::guard { delete $MON_NODES{$key} }
}
361    
# Reports a node up/down event to all mon_nodes callbacks and logs it.
# Exceptions thrown by callbacks are logged at level 1 and otherwise ignored.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
372    
373     #############################################################################
374 root 1.1 # self node code
375    
# Removes a local port and its data, then calls all its monitors with the
# kill reason (the remaining arguments). If nobody monitors the port and a
# reason was given, a level 2 warning is logged instead.
sub _kill {
   my $port = shift;

   return # killing nonexistent ports is O.K.
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   unless ($mon) {
      $WARN->(2, "unmonitored local port $port died with reason: @_")
         if @_;
      return;
   }

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
389    
# Registers a monitor callback (third argument) for the local port given as
# second argument; the first argument is unused. If the port does not
# exist, the callback is called immediately with a no_such_port reason.
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   $LMON{$port}{$cb+0} = $cb;
}
396    
# Removes a monitor callback (third argument) from the local port given as
# second argument; the first argument is unused.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   # do not create an entry for ports that have no monitors at all
   return unless exists $LMON{$port};

   delete $LMON{$port}{$cb+0};
}
401    
# Dies unless $SECURE accepts the node that sent the current message
# ($SRCNODE), or that node is the local node itself.
sub _secure_check {
   my $src = $SRCNODE->{id};

   die "remote execution attempt by insecure node\n"
      unless $SECURE->($src) || $src eq $NODE;
}
407    
# Built-in node requests: maps the tag of a node message to its handler.
# Handlers are called with the remaining message elements in @_, with
# $SRCNODE set to the sending node.
our %NODE_REQ = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my $portid = shift;
      my $cb = delete $NODE{$SRCNODE->{id}}{rmon}{$portid};
      _unmonitor undef, $portid, $cb;
   },

   # start monitoring a port for another node
   mon1 => sub {
      my $portid = shift;

      # only keep a weak reference inside the callback
      my $node = $NODE{$SRCNODE->{id}};
      Scalar::Util::weaken ($node);

      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;
      _monitor undef, $portid, $on_kill;
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{+shift}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # is the remote node considered secure?
   #secure => sub {
   #   #TODO#
   #},

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities

   # evaluate a string in package main, optionally replying with "$@" and
   # the results
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res
         if @_;
   },

   # reply with the current AE::now
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },

   devnull => sub {
      # ignore
   },

   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
466    
467 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
# Handler for the node port (empty port ID): dispatches on the message tag.
# Unknown tags are - after a security check - resolved with load_func and
# cached in %NODE_REQ. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do {
         &_secure_check;
         load_func $tag
      };

      # called without parens so the handler gets the remaining @_
      &$handler
   };

   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@")
      if $@;
};
473    
474 root 1.84 our $NPROTO = 1;
475    
476     # tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
480    
481 root 1.69 #############################################################################
482 root 1.71 # seed management, try to keep connections to all seeds at all times
483 root 1.69
484 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
485     our %NODE_SEED; # map node ID to seed ID
486 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
487     our $SEED_WATCHER;
488 root 1.71 our $SEED_RETRY;
489 root 1.69
# Starts a connection attempt to the given seed address ("host:port"),
# unless one is already recorded in %SEED_CONNECT. Croaks if the address
# cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      my ($transport) = @_;

      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      my $remote = $transport->{remote_node};

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # if we connect to ourselves, nuke this seed, but make sure we act
         # like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start global service, if not running
         # we need to check here in addition to the mon_nodes below
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd $remote, "g_slave"
            unless $transport->{remote_greeting}{global};
      }
   };

   # final callback: forget the connection attempt again
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   );
}
527    
# Tries to connect to every seed that has neither a connection attempt in
# progress nor a node that is up, and re-arms $SEED_WATCHER with a growing,
# randomised delay capped at the configured monitor_timeout.
sub seed_all {
   my @pending;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up $nodeid;

      push @pending, $seed;
   }

   if (!@pending) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      for my $seed (@pending) {
         seed_connect ($seed);
      }

      my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $max
         if $SEED_RETRY > $max;

      $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);
   }
}
550    
# Resets the retry interval to 1 and schedules seed_all in one second,
# unless a seed timer is already pending.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
555    
556     # sets new seed list, starts connecting
# Replaces the seed list by the given addresses, forgets all previous seed
# state and starts connecting.
sub set_seeds(@) {
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out with an unknown (undef) node ID
   %SEED_NODE = map { $_ => undef } @_;

   seed_all ();
}
566    
# watch seed nodes going up and down
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # only nodes known to be seeds are of interest
   return unless exists $NODE_SEED{$nodeid};

   if (!$is_up) {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
   } else {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $nodeid, "g_slave"
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   }
};
580    
581     #############################################################################
582     # talk with/to global nodes
583    
584 root 1.72 # protocol messages:
585     #
586 root 1.73 # sent by all slave nodes (slave to master)
587     # g_slave database - make other global node master of the sender
588 root 1.72 #
589 root 1.73 # sent by any node to global nodes
590     # g_set database - set whole database
591 root 1.89 # g_upd family set del - update single family
592 root 1.73 # g_del family key - delete key from database
593     # g_get family key reply... - send reply with data
594     #
595     # sent by global nodes
596     # g_global - node became global, similar to global=1 greeting
597     #
598     # database families
599     # "'l" -> node -> listeners
600     # "'g" -> node -> undef
601     # ...
602 root 1.72 #
603    
604 root 1.73 # used on all nodes:
605 root 1.88 our $MASTER; # the global node we bind ourselves to
606 root 1.78 our $MASTER_MON;
607     our %LOCAL_DB; # this node database
608    
609     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
610 root 1.71
611 root 1.84 our $GPROTO = 1;
612    
613     # tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
617    
618 root 1.73 #############################################################################
619     # master selection
620    
621     # master requests
622     our %GLOBAL_REQ; # $id => \@req
623 root 1.71
# Records a request (array reference) for the master under the given id
# and sends it right away if a master is known. An id that is already
# recorded is left untouched.
sub global_req_add {
   my ($id, $request) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $request;

   $MASTER
      and snd $MASTER, @$request;
}
634 root 1.71
# Forgets the recorded master request with the given id.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
638    
639 root 1.88 #################################
640     # master rpc
641    
642     our %GLOBAL_RES;
643     our $GLOBAL_RES_ID = "a";
644    
# Sends a request to the master and registers the last argument as the
# callback for the reply; a fresh reply id is appended to the request.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add $id, [@_, $id];
}
650    
# reply to a global_call: first element is the reply id, the rest is
# passed to the registered callback
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del $id;

   my $cb = delete $GLOBAL_RES{$id};
   return unless $cb;

   $cb->(@_)
};
658    
659     #################################
660    
# Queues a g_find request for the given node ID with the master.
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", [g_find => $nodeid];
}
664 root 1.71
665 root 1.73 # reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $listeners) = @_;

   global_req_del "g_find $nodeid";

   # ignore replies for nodes we do not know (anymore)
   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($listeners);
};
673    
# Makes the given node our master: sends it our local database and all
# recorded requests.
sub master_set {
   $MASTER = $_[0];

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $request (values %GLOBAL_REQ) {
      snd $MASTER, @$request;
   }
}
683 root 1.71
# Picks a master: uses the first seed node that is up, otherwise waits for
# one to come up. Once a master is set, its loss triggers a new search.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up $nodeid;

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set $nodeid;

      # replace this monitor by one watching for the loss of the master
      $MASTER_MON = mon_nodes sub {
         my ($nodeid, $is_up) = @_;

         if ($nodeid eq $MASTER && !$is_up) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
707    
708 root 1.73 # other node wants to make us the master
# Placeholder handler: loads AnyEvent::MP::Global and re-dispatches the
# request to whatever handler is then installed under g_slave.
# NOTE(review): relies on AnyEvent::MP::Global replacing $NODE_REQ{g_slave};
# otherwise this would call itself again - confirm.
$NODE_REQ{g_slave} = sub {
   my ($db) = @_;

   # load global module and redo the request
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
716    
717 root 1.73 #############################################################################
718 root 1.79 # local database operations
719 root 1.71
720 root 1.79 # local database management
721 root 1.88
# Sets a single key of a family in the local database and, if a master is
# known, sends it the corresponding g_upd update.
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

   # bulk updates (a hash reference as second argument) are not implemented,
   # the disabled draft was:
   #   my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
   #   $LOCAL_DB{$_[0]} = $_[1];
   #   snd $MASTER, g_upd => $_[0] => $_[1], \@del
   #      if defined $MASTER;

   # single-key
   $LOCAL_DB{$family}{$key} = $value;

   snd $MASTER, g_upd => $family => { $key => $value }
      if defined $MASTER;
}
736    
# Deletes the given keys of a family from the local database and, if a
# master is known, sends it the corresponding g_upd update.
sub db_del($@) {
   my $family = shift;

   my $db = $LOCAL_DB{$family} ||= {};
   delete @$db{@_};

   defined $MASTER
      and snd $MASTER, g_upd => $family => undef, \@_;
}
744    
# Like db_set, but returns a guard that deletes the key again when destroyed.
sub db_reg($$;$) {
   my ($family, $key) = @_;

   # "&" form, so the argument list is passed on without applying
   # db_set's prototype
   &db_set (@_);

   Guard::guard { db_del $family => $key }
}
750 root 1.71
751 root 1.88 # database query
752    
# Sends a g_db_family request for $family to the master; $cb receives the reply.
sub db_family {
   global_call (g_db_family => @_[0, 1]);
}
757    
# Sends a g_db_keys request for $family to the master; $cb receives the reply.
sub db_keys {
   global_call (g_db_keys => @_[0, 1]);
}
762    
# Sends a g_db_values request for $family to the master; $cb receives the reply.
sub db_values {
   global_call (g_db_values => @_[0, 1]);
}
767    
768 root 1.88 # database monitoring
769 root 1.80
770 root 1.81 our %LOCAL_MON; # f, reply
771     our %MON_DB; # f, k, value
772 root 1.80
# Monitors a database family: $cb is called with the family database (hash
# reference) and array references listing the changed keys.
#
# If the family is already monitored, $cb is first called - postponed -
# with the current database and all its keys as "added" keys. Otherwise a
# g_mon1 request is queued for the master.
#
# In non-void context a guard is returned that unregisters the callback and,
# when it was the last one for the family, stops monitoring the family.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $key = $cb + 0;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$key} = sub { };
      AE::postpone {
         # guard might have gone away already - look at the family first,
         # so a family that was removed completely is not re-created as a
         # stale empty entry in %LOCAL_MON
         my $mon = $LOCAL_MON{$family}
            or return;
         return unless exists $mon->{$key};

         # set actual callback
         $mon->{$key} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$key} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$key};

         unless (%$mon) {
            global_req_del "mon1 $family";

            # no global_req, because we don't care if we are not connected
            snd $MASTER, g_mon0 => $family
               if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
         }
      }
}
812    
813 root 1.82 # full update
# Full update of a monitored family: replaces the contents of the local
# copy by the new database and calls every monitor with the copy and the
# lists of added, changed and deleted keys.
$NODE_REQ{g_chg1} = sub {
   my ($f, $ndb) = @_;

   # updates for families we do not (or no longer) monitor are ignored -
   # otherwise the callback loop below would create an empty
   # $LOCAL_MON{$f} entry that is never cleaned up
   my $db = $MON_DB{$f}
      or return;

   my (@a, @c, @d);

   # add or replace keys
   while (my ($k, $v) = each %$ndb) {
      exists $db->{$k}
         ? push @c, $k
         : push @a, $k;
      $db->{$k} = $v;
   }

   # delete keys that are no longer present
   for (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$_};
      push @d, $_;
   }

   $_->($db, \@a, \@c, \@d)
      for values %{ $LOCAL_MON{$f} || {} };
};
837    
838 root 1.82 # incremental update
# Incremental update of a monitored family: applies the keys to set (hash
# reference) and to delete (array reference) to the local copy and calls
# every monitor with the copy and the lists of added, changed and deleted
# keys.
$NODE_REQ{g_chg2} = sub {
   my ($family, $set, $del) = @_;

   # updates for families we do not (or no longer) monitor are ignored -
   # otherwise the callback loop below would create an empty
   # $LOCAL_MON{$family} entry that is never cleaned up
   my $db = $MON_DB{$family}
      or return;

   my (@a, @c);

   while (my ($k, $v) = each %$set) {
      exists $db->{$k}
         ? push @c, $k
         : push @a, $k;
      $db->{$k} = $v;
   }

   delete @$db{@$del};

   $_->($db, \@a, \@c, $del)
      for values %{ $LOCAL_MON{$family} || {} };
};
858 root 1.80
859 root 1.69 #############################################################################
860     # configure
861    
# Returns the second element of POSIX::uname, the node (host) name.
sub nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
866    
# Resolves a comma separated list of transport descriptors ("host:port",
# "port", "*:port", ...) asynchronously.
#
# Returns a condvar that receives the resulting "host:port" strings, without
# duplicates, ordered by descriptor and then by the order the addresses were
# found in. Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # this outermost begin is matched by the end at the bottom, so the
   # callback runs once all lookups started below have finished
   $cv->begin (sub {
      my ($done) = @_;

      my (%seen, @refs);
      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];
         next if $seen{$addr}++;
         push @refs, $addr;
      }

      $done->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      # an empty descriptor or a bare port number refers to this host
      if ($t =~ /^\d*$/) {
         $t = length $t ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr $target->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }
            $cv->end;
         };
      } else {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()

                  # skip localhost etc.
                  push @addr, grep !/^\x7f/,
                     $if->address (Net::Interface::AF_INET ());

                  # global unicast, site-local unicast
                  #next if $if->scope ($_) <= 2;
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                     $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      }
   }

   $cv->end;

   $cv
}
947    
# Configures the local node: looks up the profile, sets the node ID, binds
# the listeners, connects to the seeds, starts the master search and loads
# the configured services. A single leading argument is taken to be the
# profile name, everything else are key-value pairs.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my %kv = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   if (exists $CONFIG->{secure}) {
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   $NODE = $node;

   # %n is replaced by the node name, %u or a trailing slash by $RUNIQ
   $NODE =~ s/%n/nodename/ge;

   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # register the local node object under its final name
   my $self = $NODE{""};
   $NODE{$NODE} = $self;
   $self->{id}  = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all lookups first, then wait for them one by one
   my @lookups = map { _resolve $_ } @$binds;

   for my $lookup (@lookups) {
      for my $bind ($lookup->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address handed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set "'l" => $NODE => $BINDS;

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   my @seed_lookups = map { _resolve $_ } @$seeds;
   set_seeds map { $_->recv } @seed_lookups;

   master_search ();

   # services are either [function, args...], "Module::" or "function"
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         my $code = load_func $func;
         $code->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         my $code = load_func $_;
         $code->();
      }
   }
}
1033    
1034 root 1.1 =back
1035    
1036     =head1 SEE ALSO
1037    
1038     L<AnyEvent::MP>.
1039    
1040     =head1 AUTHOR
1041    
1042     Marc Lehmann <schmorp@schmorp.de>
1043     http://home.schmorp.de/
1044    
1045     =cut
1046    
1047     1
1048