ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.83
Committed: Sat Mar 3 20:35:10 2012 UTC (12 years, 4 months ago) by root
Branch: MAIN
Changes since 1.82: +14 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.1 );
50    
51 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
52 root 1.1
53 root 1.27 This value is called with an error or warning message, when e.g. a
54     connection could not be created, authorisation failed and so on.
55    
56     It I<must not> block or send messages -queue it and use an idle watcher if
57     you need to do any of these things.
58 root 1.1
59 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
60 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
61     node connectivity and services, C<8> for debugging messages and C<9> for
62     tracing messages.
63    
64 root 1.1 The default simply logs the message to STDERR.
65    
66 root 1.44 =item @AnyEvent::MP::Kernel::WARN
67    
68     All code references in this array are called for every log message, from
69     the default C<$WARN> handler. This is an easy way to tie into the log
70     messages without disturbing others.
71    
72 root 1.1 =cut
73    
74 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
75 root 1.44 our @WARN;
76     our $WARN = sub {
77     &$_ for @WARN;
78 root 1.29
79     return if $WARNLEVEL < $_[0];
80    
81 root 1.16 my ($level, $msg) = @_;
82    
83 root 1.1 $msg =~ s/\n$//;
84 root 1.16
85     printf STDERR "%s <%d> %s\n",
86     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
87     $level,
88     $msg;
89 root 1.1 };
90    
91 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
92    
93     The maximum level at which warning messages will be printed to STDERR by
94     the default warn handler.
95    
96     =cut
97    
98 root 1.6 sub load_func($) {
99     my $func = $_[0];
100    
101     unless (defined &$func) {
102     my $pkg = $func;
103     do {
104     $pkg =~ s/::[^:]+$//
105 root 1.63 or return sub { die "unable to resolve function '$func'" };
106 root 1.60
107     local $@;
108 root 1.61 unless (eval "require $pkg; 1") {
109     my $error = $@;
110     $error =~ /^Can't locate .*.pm in \@INC \(/
111     or return sub { die $error };
112     }
113 root 1.6 } until defined &$func;
114     }
115    
116     \&$func
117     }
118    
119 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
120    
121 root 1.1 sub nonce($) {
122 root 1.78 join "", map chr rand 256, 1 .. $_[0]
123 root 1.1 }
124    
125 root 1.78 sub nonce62($) {
126     join "", map $alnum[rand 62], 1 .. $_[0]
127 root 1.1 }
128    
129     sub gen_uniq {
130 root 1.78 my $now = AE::now;
131     (join "",
132     map $alnum[$_],
133     $$ / 62 % 62,
134     $$ % 62,
135     (int $now ) % 62,
136     (int $now * 100) % 62,
137     (int $now * 10000) % 62,
138     ) . nonce62 4;
139 root 1.1 }
140    
141 root 1.20 our $CONFIG; # this node's configuration
142 root 1.83 our $SECURE = sub { 1 };
143 root 1.21
144 root 1.64 our $RUNIQ; # remote uniq value
145     our $UNIQ; # per-process/node unique cookie
146     our $NODE;
147     our $ID = "a";
148 root 1.1
149     our %NODE; # node id to transport mapping, or "undef", for local node
150     our (%PORT, %PORT_DATA); # local ports
151    
152 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
153 root 1.1 our %LMON; # monitored _local_ ports
154    
155 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
156 root 1.1 our %LISTENER;
157 root 1.21 our $LISTENER; # our listeners, as arrayref
158 root 1.1
159 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
160 root 1.1
161 root 1.69 sub _init_names {
162 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
163     $UNIQ = gen_uniq;
164    
165     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
166     $RUNIQ = nonce62 10;
167     $RUNIQ =~ s/(.)$/\U$1/;
168    
169     $NODE = "anon/$RUNIQ";
170 root 1.64 }
171    
172 root 1.69 _init_names;
173 root 1.64
174 root 1.1 sub NODE() {
175     $NODE
176     }
177    
178     sub node_of($) {
179 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
180 root 1.1
181 root 1.21 $node
182 root 1.1 }
183    
184 root 1.17 BEGIN {
185     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
186     ? sub () { 1 }
187     : sub () { 0 };
188     }
189 root 1.1
190 root 1.42 our $DELAY_TIMER;
191     our @DELAY_QUEUE;
192    
193     sub _delay_run {
194 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
195 root 1.42 }
196    
197     sub delay($) {
198     push @DELAY_QUEUE, shift;
199     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
200     }
201    
202 root 1.1 sub _inject {
203 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
204 root 1.1 &{ $PORT{+shift} or return };
205     }
206    
207 root 1.20 # this function adds a node-ref, so you can send stuff to it
208     # it is basically the central routing component.
209 root 1.1 sub add_node {
210 root 1.21 my ($node) = @_;
211 root 1.1
212 root 1.71 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
213 root 1.13 }
214    
215 root 1.1 sub snd(@) {
216 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
217 root 1.1
218 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
219 root 1.1
220 root 1.49 defined $nodeid #d#UGLY
221     or Carp::croak "'undef' is not a valid node ID/port ID";
222    
223 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
224 root 1.2 ->{send} (["$portid", @_]);
225 root 1.1 }
226    
227 root 1.17 =item $is_local = port_is_local $port
228    
229     Returns true iff the port is a local port.
230    
231     =cut
232    
233     sub port_is_local($) {
234 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
235 root 1.17
236 root 1.21 $NODE{$nodeid} == $NODE{""}
237 root 1.17 }
238    
239 root 1.18 =item snd_to_func $node, $func, @args
240 root 1.11
241 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
242 root 1.11 this function with the given arguments on that node.
243    
244 root 1.20 This function can be used to implement C<spawn>-like interfaces.
245 root 1.11
246     =cut
247    
248 root 1.18 sub snd_to_func($$;@) {
249 root 1.21 my $nodeid = shift;
250 root 1.11
251 root 1.41 # on $NODE, we artificially delay... (for spawn)
252     # this is very ugly - maybe we should simply delay ALL messages,
253     # to avoid deep recursion issues. but that's so... slow...
254 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
255     if $nodeid ne $NODE;
256    
257 root 1.49 defined $nodeid #d#UGLY
258     or Carp::croak "'undef' is not a valid node ID/port ID";
259    
260 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
261 root 1.11 }
262    
263 root 1.18 =item snd_on $node, @msg
264    
265     Executes C<snd> with the given C<@msg> (which must include the destination
266     port) on the given node.
267    
268     =cut
269    
270     sub snd_on($@) {
271     my $node = shift;
272     snd $node, snd => @_;
273     }
274    
275 root 1.29 =item eval_on $node, $string[, @reply]
276 root 1.18
277 root 1.29 Evaluates the given string as Perl expression on the given node. When
278     @reply is specified, then it is used to construct a reply message with
279     C<"$@"> and any results from the eval appended.
280 root 1.18
281     =cut
282    
283 root 1.29 sub eval_on($$;@) {
284 root 1.18 my $node = shift;
285     snd $node, eval => @_;
286     }
287    
288 root 1.1 sub kil(@) {
289 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
290 root 1.1
291     length $portid
292 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
293 root 1.1
294 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
295 root 1.1 ->kill ("$portid", @_);
296     }
297    
298     #############################################################################
299 root 1.6 # node monitoring and info
300 root 1.3
301 root 1.21 =item node_is_known $nodeid
302 root 1.13
303 root 1.71 #TODO#
304     Returns true iff the given node is currently known to this node.
305 root 1.13
306     =cut
307    
308     sub node_is_known($) {
309     exists $NODE{$_[0]}
310     }
311    
312 root 1.21 =item node_is_up $nodeid
313 root 1.13
314     Returns true if the given node is "up", that is, the kernel thinks it has
315     a working connection to it.
316    
317 root 1.69 If the node is known (to this local node) but not currently connected,
318     returns C<0>. If the node is not known, returns C<undef>.
319 root 1.13
320     =cut
321    
322     sub node_is_up($) {
323     ($NODE{$_[0]} or return)->{transport}
324     ? 1 : 0
325     }
326    
327 root 1.3 =item up_nodes
328    
329 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
330     the node itself).
331 root 1.3
332     =cut
333    
334 root 1.49 sub up_nodes() {
335 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
336 root 1.3 }
337    
338 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
339 root 1.3
340 root 1.27 Registers a callback that is called each time a node goes up (a connection
341     is established) or down (the connection is lost).
342 root 1.3
343     Node up messages can only be followed by node down messages for the same
344     node, and vice versa.
345    
346 root 1.71 Note that monitoring a node is usually better done by monitoring its node
347 root 1.27 port. This function is mainly of interest to modules that are concerned
348     about the network topology and low-level connection handling.
349    
350     Callbacks I<must not> block and I<should not> send any messages.
351    
352     The function returns an optional guard which can be used to unregister
353 root 1.3 the monitoring callback again.
354    
355 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
356     or go up (and down).
357    
358     newnode $_, 1 for up_nodes;
359     mon_nodes \&newnode;
360    
361 root 1.3 =cut
362    
363     our %MON_NODES;
364    
365     sub mon_nodes($) {
366     my ($cb) = @_;
367    
368     $MON_NODES{$cb+0} = $cb;
369    
370 root 1.79 defined wantarray && Guard::guard { delete $MON_NODES{$cb+0} }
371 root 1.3 }
372    
373     sub _inject_nodeevent($$;@) {
374 root 1.16 my ($node, $up, @reason) = @_;
375 root 1.3
376     for my $cb (values %MON_NODES) {
377 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
378 root 1.16 or $WARN->(1, $@);
379 root 1.3 }
380 root 1.16
381 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
382 root 1.3 }
383    
384     #############################################################################
385 root 1.1 # self node code
386    
387 root 1.67 sub _kill {
388     my $port = shift;
389    
390     delete $PORT{$port}
391     or return; # killing nonexistent ports is O.K.
392     delete $PORT_DATA{$port};
393    
394     my $mon = delete $LMON{$port}
395     or !@_
396     or $WARN->(2, "unmonitored local port $port died with reason: @_");
397    
398     $_->(@_) for values %$mon;
399     }
400    
401     sub _monitor {
402     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
403     unless exists $PORT{$_[1]};
404    
405     $LMON{$_[1]}{$_[2]+0} = $_[2];
406     }
407    
408     sub _unmonitor {
409 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
410     if exists $LMON{$_[1]};
411 root 1.67 }
412    
413 root 1.83 sub _secure_check {
414     $SECURE->($SRCNODE->{id})
415     or die "remote execution attempt by insecure node\n";
416     }
417    
418 root 1.79 our %NODE_REQ = (
419 root 1.1 # internal services
420    
421     # monitoring
422 root 1.65 mon0 => sub { # stop monitoring a port for another node
423 root 1.1 my $portid = shift;
424 root 1.67 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
425 root 1.1 },
426 root 1.65 mon1 => sub { # start monitoring a port for another node
427 root 1.1 my $portid = shift;
428 root 1.67 Scalar::Util::weaken (my $node = $SRCNODE);
429     _monitor undef, $portid, $node->{rmon}{$portid} = sub {
430 root 1.58 delete $node->{rmon}{$portid};
431 root 1.65 $node->send (["", kil0 => $portid, @_])
432 root 1.59 if $node && $node->{transport};
433 root 1.67 };
434 root 1.1 },
435 root 1.65 # another node has killed a monitored port
436     kil0 => sub {
437 root 1.1 my $cbs = delete $SRCNODE->{lmon}{+shift}
438     or return;
439    
440     $_->(@_) for @$cbs;
441     },
442    
443 root 1.18 # "public" services - not actually public
444 root 1.1
445 root 1.65 # another node wants to kill a local port
446 root 1.66 kil => \&_kill,
447 root 1.65
448 root 1.1 # relay message to another node / generic echo
449 root 1.15 snd => \&snd,
450 root 1.27 snd_multiple => sub {
451 root 1.1 snd @$_ for @_
452     },
453    
454 root 1.30 # random utilities
455 root 1.1 eval => sub {
456 root 1.83 &_secure_check;
457 root 1.50 my @res = do { package main; eval shift };
458 root 1.1 snd @_, "$@", @res if @_;
459     },
460     time => sub {
461 root 1.76 snd @_, AE::now;
462 root 1.1 },
463     devnull => sub {
464     #
465     },
466 root 1.15 "" => sub {
467 root 1.27 # empty messages are keepalives or similar devnull-applications
468 root 1.15 },
469 root 1.1 );
470    
471 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
472 root 1.1 $PORT{""} = sub {
473     my $tag = shift;
474 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
475     $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
476 root 1.1 };
477    
478 root 1.69 #############################################################################
479 root 1.71 # seed management, try to keep connections to all seeds at all times
480 root 1.69
481 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
482     our %NODE_SEED; # map node ID to seed ID
483 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
484     our $SEED_WATCHER;
485 root 1.71 our $SEED_RETRY;
486 root 1.69
487     sub seed_connect {
488     my ($seed) = @_;
489    
490     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
491     or Carp::croak "$seed: unparsable seed address";
492    
493     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
494    
495 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
496     $host, $port,
497     on_greeted => sub {
498     # called after receiving remote greeting, learn remote node name
499    
500     # we rely on untrusted data here (the remote node name) this is
501     # hopefully ok, as this can at most be used for DOSing, which is easy
502     # when you can do MITM anyway.
503    
504     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
505     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
506 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
507 root 1.71 delete $SEED_NODE{$seed};
508     delete $NODE_SEED{$seed};
509     } else {
510     $SEED_NODE{$seed} = $_[0]{remote_node};
511     $NODE_SEED{$_[0]{remote_node}} = $seed;
512     }
513     },
514     on_destroy => sub {
515     delete $SEED_CONNECT{$seed};
516     },
517 root 1.69 sub {
518     $SEED_CONNECT{$seed} = 1;
519 root 1.71 }
520 root 1.69 ;
521     }
522    
523     sub seed_all {
524     my @seeds = grep {
525     !exists $SEED_CONNECT{$_}
526     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
527     } keys %SEED_NODE;
528    
529     if (@seeds) {
530 root 1.70 # start connection attempt for every seed we are not connected to yet
531 root 1.69 seed_connect $_
532     for @seeds;
533 root 1.71
534     $SEED_RETRY = $SEED_RETRY * 2 + rand;
535     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
536     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
537    
538     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
539    
540 root 1.69 } else {
541 root 1.71 # all seeds connected or connecting, no need to restart timer
542 root 1.69 undef $SEED_WATCHER;
543     }
544     }
545    
546     sub seed_again {
547 root 1.71 $SEED_RETRY = 1;
548     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
549 root 1.69 }
550    
551     # sets new seed list, starts connecting
552     sub set_seeds(@) {
553     %SEED_NODE = ();
554 root 1.71 %NODE_SEED = ();
555     %SEED_CONNECT = ();
556    
557 root 1.69 @SEED_NODE{@_} = ();
558    
559 root 1.71 seed_again;#d#
560     seed_all;
561     }
562    
563     mon_nodes sub {
564     # if we lost the connection to a seed node, make sure we are seeding
565     seed_again
566     if !$_[1] && exists $NODE_SEED{$_[0]};
567     };
568    
569     #############################################################################
570     # talk with/to global nodes
571    
572 root 1.72 # protocol messages:
573     #
574 root 1.73 # sent by all slave nodes (slave to master)
575     # g_slave database - make other global node master of the sender
576 root 1.72 #
577 root 1.73 # sent by any node to global nodes
578     # g_set database - set whole database
579     # g_add family key val - add/replace key to database
580     # g_del family key - delete key from database
581     # g_get family key reply... - send reply with data
582     #
583     # send by global nodes
584     # g_global - node became global, similar to global=1 greeting
585     #
586     # database families
587     # "'l" -> node -> listeners
588     # "'g" -> node -> undef
589     # ...
590 root 1.72 #
591    
592 root 1.73 # used on all nodes:
593 root 1.78 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
594     our $MASTER_MON;
595     our %LOCAL_DB; # this node database
596    
597     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
598 root 1.71
599 root 1.73 #############################################################################
600     # master selection
601    
602     # master requests
603     our %GLOBAL_REQ; # $id => \@req
604 root 1.71
605 root 1.74 sub global_req_add {
606 root 1.80 my ($id, $req) = @_;
607 root 1.71
608 root 1.74 return if exists $GLOBAL_REQ{$id};
609    
610 root 1.80 $GLOBAL_REQ{$id} = $req;
611 root 1.71
612 root 1.80 snd $MASTER, @$req
613 root 1.73 if $MASTER;
614 root 1.74 }
615 root 1.71
616 root 1.74 sub global_req_del {
617     delete $GLOBAL_REQ{$_[0]};
618     }
619    
620     sub g_find {
621 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
622 root 1.73 }
623 root 1.71
624 root 1.73 # reply for g_find started in Node.pm
625 root 1.79 $NODE_REQ{g_found} = sub {
626 root 1.74 global_req_del "g_find $_[0]";
627    
628 root 1.73 my $node = $NODE{$_[0]} or return;
629 root 1.71
630 root 1.79 $node->connect_to ($_[1]);
631 root 1.71 };
632    
633 root 1.73 sub master_set {
634     $MASTER = $_[0];
635 root 1.71
636 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
637 root 1.71
638 root 1.73 # (re-)send queued requests
639     snd $MASTER, @$_
640     for values %GLOBAL_REQ;
641     }
642 root 1.71
643 root 1.72 sub master_search {
644 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
645     for (keys %NODE_SEED) {
646 root 1.72 if (node_is_up $_) {
647     master_set $_;
648     return;
649 root 1.71 }
650 root 1.72 }
651 root 1.71
652 root 1.78 $MASTER_MON = mon_nodes sub {
653 root 1.72 return unless $_[1]; # we are only interested in node-ups
654     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
655 root 1.71
656 root 1.72 master_set $_[0];
657 root 1.71
658 root 1.78 $MASTER_MON = mon_nodes sub {
659 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
660     undef $MASTER;
661     master_search ();
662     }
663 root 1.71 };
664 root 1.72 };
665 root 1.71 }
666    
667 root 1.73 # other node wants to make us the master
668 root 1.79 $NODE_REQ{g_slave} = sub {
669 root 1.73 my ($db) = @_;
670    
671 root 1.80 # load global module and redo the request
672 root 1.73 require AnyEvent::MP::Global;
673 root 1.80 &{ $NODE_REQ{g_slave} }
674 root 1.71 };
675    
676 root 1.73 #############################################################################
677 root 1.79 # local database operations
678 root 1.71
679 root 1.79 # local database management
680     sub db_set($$$) {
681     $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
682     snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
683     if defined $MASTER;
684     }
685    
686     sub db_del($$) {
687     delete $LOCAL_DB{$_[0]}{$_[1]};
688     snd $MASTER, g_del => $_[0] => $_[1]
689     if defined $MASTER;
690     }
691    
692     sub db_reg($$;$) {
693     my ($family, $key) = @_;
694     &db_set;
695     Guard::guard { db_del $family => $key }
696     }
697 root 1.71
698 root 1.80 sub db_keys($$$) {
699     #d#
700     }
701    
702     #d# db_values
703     #d# db_family
704     #d# db_key
705    
706 root 1.81 our %LOCAL_MON; # f, reply
707     our %MON_DB; # f, k, value
708 root 1.80
709 root 1.81 sub g_chg {
710     my $f = shift;
711 root 1.80
712 root 1.81 $#_ ? $MON_DB{$f}{$_[0]} = $_[1]
713     : delete $MON_DB{$f}{$_[0]};
714 root 1.80
715 root 1.81 &{ $_->[0] }
716     for values %{ $LOCAL_MON{$f} };
717     }
718    
719     sub db_mon($@) {
720     my ($family, @reply) = @_;
721 root 1.80
722 root 1.81 my $reply = \@reply;
723     my $id = $reply + 0;
724    
725     if (%{ $LOCAL_MON{$family} }) {
726     # if we already monitor this thingy, generate
727     # create events for all of them
728     while (my ($key, $value) = each %{ $MON_DB{$family} }) {
729     $reply->[0]->($key, $value);
730     }
731     } else {
732     # new monitor, request chg1 from upstream
733     global_req_add "mon1 $family" => [g_mon1 => $family];
734     $MON_DB{$family} = {};
735     }
736    
737     $LOCAL_MON{$family}{$id} = \@reply;
738 root 1.80
739     Guard::guard {
740 root 1.81 my $mon = $LOCAL_MON{$family};
741     delete $mon->{$id};
742    
743     unless (%$mon) {
744     global_req_del "mon1 $family";
745 root 1.80
746     # no global_req, because we don't care if we are not connected
747 root 1.81 snd $MASTER, g_mon0 => $family
748 root 1.80 if $MASTER;
749    
750 root 1.81 delete $LOCAL_MON{$family};
751     delete $MON_DB{$family};
752 root 1.80 }
753     }
754     }
755    
756 root 1.82 # full update
757 root 1.80 $NODE_REQ{g_chg1} = sub {
758 root 1.81 my ($f, $db) = @_;
759    
760 root 1.82 # add or replace keys
761     while (my ($k, $v) = each %$db) {
762     g_chg $f, $k, $v;
763     }
764 root 1.81
765 root 1.82 # delete keys that are no longer present
766     for (keys %{ $MON_DB{$f} }) {
767 root 1.81 g_chg $f, $_
768     unless exists $db->{$_};
769     }
770 root 1.80 };
771    
772 root 1.82 # incremental update
773 root 1.81 $NODE_REQ{g_chg2} = \&g_chg;
774 root 1.80
775 root 1.69 #############################################################################
776     # configure
777    
778 root 1.81 sub nodename {
779 root 1.69 require POSIX;
780     (POSIX::uname ())[1]
781     }
782    
783     sub _resolve($) {
784     my ($nodeid) = @_;
785    
786     my $cv = AE::cv;
787     my @res;
788    
789     $cv->begin (sub {
790     my %seen;
791     my @refs;
792     for (sort { $a->[0] <=> $b->[0] } @res) {
793     push @refs, $_->[1] unless $seen{$_->[1]}++
794     }
795     shift->send (@refs);
796     });
797    
798     my $idx;
799     for my $t (split /,/, $nodeid) {
800     my $pri = ++$idx;
801    
802 root 1.81 $t = length $t ? nodename . ":$t" : nodename
803 root 1.69 if $t =~ /^\d*$/;
804    
805     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
806     or Carp::croak "$t: unparsable transport descriptor";
807    
808     $port = "0" if $port eq "*";
809    
810     if ($host eq "*") {
811     $cv->begin;
812     # use fork_call, as Net::Interface is big, and we need it rarely.
813     require AnyEvent::Util;
814     AnyEvent::Util::fork_call (
815     sub {
816     my @addr;
817    
818     require Net::Interface;
819    
820     for my $if (Net::Interface->interfaces) {
821     # we statically lower-prioritise ipv6 here, TODO :()
822     for $_ ($if->address (Net::Interface::AF_INET ())) {
823     next if /^\x7f/; # skip localhost etc.
824     push @addr, $_;
825     }
826     for ($if->address (Net::Interface::AF_INET6 ())) {
827     #next if $if->scope ($_) <= 2;
828     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
829     push @addr, $_;
830     }
831    
832     }
833     @addr
834     }, sub {
835     for my $ip (@_) {
836     push @res, [
837     $pri += 1e-5,
838     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
839     ];
840     }
841     $cv->end;
842     }
843     );
844     } else {
845     $cv->begin;
846     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
847     for (@_) {
848     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
849     push @res, [
850     $pri += 1e-5,
851     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
852     ];
853     }
854     $cv->end;
855     };
856     }
857     }
858    
859     $cv->end;
860    
861     $cv
862     }
863    
864     sub configure(@) {
865     unshift @_, "profile" if @_ & 1;
866     my (%kv) = @_;
867    
868     delete $NODE{$NODE}; # we do not support doing stuff before configure
869     _init_names;
870    
871     my $profile = delete $kv{profile};
872    
873 root 1.81 $profile = nodename
874 root 1.69 unless defined $profile;
875    
876     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
877    
878 root 1.83 if (exists $CONFIG->{secure}) {
879     my $pass = !$CONFIG->{secure};
880     $SECURE = sub { $pass };
881     }
882    
883 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
884 root 1.69
885     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
886    
887 root 1.72 $NODE = $node;
888 root 1.77
889 root 1.81 $NODE =~ s/%n/nodename/ge;
890    
891     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
892 root 1.77 # nodes with randomised node names do not need randomised port names
893     $UNIQ = "";
894     }
895 root 1.69
896     $NODE{$NODE} = $NODE{""};
897     $NODE{$NODE}{id} = $NODE;
898    
899     my $seeds = $CONFIG->{seeds};
900     my $binds = $CONFIG->{binds};
901    
902     $binds ||= ["*"];
903    
904     $WARN->(8, "node $NODE starting up.");
905    
906     $LISTENER = [];
907     %LISTENER = ();
908    
909     for (map _resolve $_, @$binds) {
910     for my $bind ($_->recv) {
911     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
912     or Carp::croak "$bind: unparsable local bind address";
913    
914     my $listener = AnyEvent::MP::Transport::mp_server
915     $host,
916     $port,
917     prepare => sub {
918     my (undef, $host, $port) = @_;
919     $bind = AnyEvent::Socket::format_hostport $host, $port;
920     0
921     },
922     ;
923     $LISTENER{$bind} = $listener;
924     push @$LISTENER, $bind;
925     }
926     }
927    
928 root 1.79 db_set "'l" => $NODE => $LISTENER;
929 root 1.73
930 root 1.69 $WARN->(8, "node listens on [@$LISTENER].");
931    
932     # connect to all seednodes
933     set_seeds map $_->recv, map _resolve $_, @$seeds;
934    
935 root 1.73 master_search;
936    
937 root 1.71 if ($NODE eq "atha") {;#d#
938 root 1.72 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
939 root 1.71 }
940    
941 root 1.69 for (@{ $CONFIG->{services} }) {
942     if (ref) {
943     my ($func, @args) = @$_;
944     (load_func $func)->(@args);
945     } elsif (s/::$//) {
946     eval "require $_";
947     die $@ if $@;
948     } else {
949     (load_func $_)->();
950     }
951     }
952     }
953    
954 root 1.1 =back
955    
956     =head1 SEE ALSO
957    
958     L<AnyEvent::MP>.
959    
960     =head1 AUTHOR
961    
962     Marc Lehmann <schmorp@schmorp.de>
963     http://home.schmorp.de/
964    
965     =cut
966    
967     1
968