ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.103
Committed: Fri Mar 23 03:24:41 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.102: +61 -29 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28    
29 root 1.79 use AnyEvent ();
30     use Guard ();
31 root 1.1
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37 root 1.71 our @EXPORT_OK = qw(
38     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
39     );
40    
41 root 1.1 our @EXPORT = qw(
42 root 1.21 add_node load_func snd_to_func snd_on eval_on
43 root 1.1
44 root 1.34 NODE $NODE node_of snd kil port_is_local
45     configure
46 root 1.46 up_nodes mon_nodes node_is_up
47 root 1.97 db_set db_del
48 root 1.84 db_mon db_family db_keys db_values
49 root 1.1 );
50    
51 root 1.6 sub load_func($) {
52     my $func = $_[0];
53    
54     unless (defined &$func) {
55     my $pkg = $func;
56     do {
57     $pkg =~ s/::[^:]+$//
58 root 1.63 or return sub { die "unable to resolve function '$func'" };
59 root 1.60
60     local $@;
61 root 1.61 unless (eval "require $pkg; 1") {
62     my $error = $@;
63     $error =~ /^Can't locate .*.pm in \@INC \(/
64     or return sub { die $error };
65     }
66 root 1.6 } until defined &$func;
67     }
68    
69     \&$func
70     }
71    
72 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
73    
74 root 1.1 sub nonce($) {
75 root 1.78 join "", map chr rand 256, 1 .. $_[0]
76 root 1.1 }
77    
78 root 1.78 sub nonce62($) {
79     join "", map $alnum[rand 62], 1 .. $_[0]
80 root 1.1 }
81    
82     sub gen_uniq {
83 root 1.78 my $now = AE::now;
84     (join "",
85     map $alnum[$_],
86     $$ / 62 % 62,
87     $$ % 62,
88     (int $now ) % 62,
89     (int $now * 100) % 62,
90     (int $now * 10000) % 62,
91     ) . nonce62 4;
92 root 1.1 }
93    
94 root 1.20 our $CONFIG; # this node's configuration
95 root 1.83 our $SECURE = sub { 1 };
96 root 1.21
97 root 1.64 our $RUNIQ; # remote uniq value
98     our $UNIQ; # per-process/node unique cookie
99     our $NODE;
100     our $ID = "a";
101 root 1.1
102     our %NODE; # node id to transport mapping, or "undef", for local node
103     our (%PORT, %PORT_DATA); # local ports
104    
105 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
106 root 1.1 our %LMON; # monitored _local_ ports
107    
108 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
109 root 1.85 our %BINDS;
110     our $BINDS; # our listeners, as arrayref
111 root 1.1
112 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
113 root 1.1
114 root 1.69 sub _init_names {
115 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
116     $UNIQ = gen_uniq;
117    
118     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
119     $RUNIQ = nonce62 10;
120     $RUNIQ =~ s/(.)$/\U$1/;
121    
122     $NODE = "anon/$RUNIQ";
123 root 1.64 }
124    
125 root 1.69 _init_names;
126 root 1.64
127 root 1.1 sub NODE() {
128     $NODE
129     }
130    
131     sub node_of($) {
132 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
133 root 1.1
134 root 1.21 $node
135 root 1.1 }
136    
137 root 1.17 BEGIN {
138     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
139     ? sub () { 1 }
140     : sub () { 0 };
141     }
142 root 1.1
143 root 1.42 our $DELAY_TIMER;
144     our @DELAY_QUEUE;
145    
146 root 1.97 our $delay_run = sub {
147 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
148 root 1.97 };
149 root 1.42
150     sub delay($) {
151     push @DELAY_QUEUE, shift;
152 root 1.97 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
153 root 1.42 }
154    
155 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
156    
157     During execution of a message callback, this variable contains the node ID
158     of the origin node.
159    
160     The main use of this variable is for debugging output - there are probably
161     very few other cases where you need to know the source node ID.
162    
163     =cut
164    
165 root 1.1 sub _inject {
166 root 1.96 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
167 root 1.1 &{ $PORT{+shift} or return };
168     }
169    
170 root 1.20 # this function adds a node-ref, so you can send stuff to it
171     # it is basically the central routing component.
172 root 1.1 sub add_node {
173 root 1.21 my ($node) = @_;
174 root 1.1
175 root 1.94 length $node
176     or Carp::croak "'undef' or the empty string are not valid node/port IDs";
177 root 1.93
178 root 1.71 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
179 root 1.13 }
180    
181 root 1.1 sub snd(@) {
182 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
183 root 1.1
184 root 1.86 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
185 root 1.1
186 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
187 root 1.2 ->{send} (["$portid", @_]);
188 root 1.1 }
189    
190 root 1.17 =item $is_local = port_is_local $port
191    
192     Returns true iff the port is a local port.
193    
194     =cut
195    
196     sub port_is_local($) {
197 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
198 root 1.17
199 root 1.21 $NODE{$nodeid} == $NODE{""}
200 root 1.17 }
201    
202 root 1.18 =item snd_to_func $node, $func, @args
203 root 1.11
204 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
205 root 1.11 this function with the given arguments on that node.
206    
207 root 1.20 This function can be used to implement C<spawn>-like interfaces.
208 root 1.11
209     =cut
210    
211 root 1.18 sub snd_to_func($$;@) {
212 root 1.21 my $nodeid = shift;
213 root 1.11
214 root 1.41 # on $NODE, we artificially delay... (for spawn)
215     # this is very ugly - maybe we should simply delay ALL messages,
216     # to avoid deep recursion issues. but that's so... slow...
217 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
218     if $nodeid ne $NODE;
219    
220 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
221 root 1.11 }
222    
223 root 1.18 =item snd_on $node, @msg
224    
225     Executes C<snd> with the given C<@msg> (which must include the destination
226     port) on the given node.
227    
228     =cut
229    
230     sub snd_on($@) {
231     my $node = shift;
232     snd $node, snd => @_;
233     }
234    
235 root 1.29 =item eval_on $node, $string[, @reply]
236 root 1.18
237 root 1.29 Evaluates the given string as Perl expression on the given node. When
238     @reply is specified, then it is used to construct a reply message with
239     C<"$@"> and any results from the eval appended.
240 root 1.18
241     =cut
242    
243 root 1.29 sub eval_on($$;@) {
244 root 1.18 my $node = shift;
245     snd $node, eval => @_;
246     }
247    
248 root 1.1 sub kil(@) {
249 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
250 root 1.1
251     length $portid
252 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
253 root 1.1
254 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
255 root 1.1 ->kill ("$portid", @_);
256     }
257    
258     #############################################################################
259 root 1.6 # node monitoring and info
260 root 1.3
261 root 1.21 =item node_is_up $nodeid
262 root 1.13
263     Returns true if the given node is "up", that is, the kernel thinks it has
264     a working connection to it.
265    
266 root 1.95 If the node is up, returns C<1>. If the node is currently connecting or
267     otherwise known but not connected, returns C<0>. If nothing is known about
268     the node, returns C<undef>.
269 root 1.13
270     =cut
271    
272     sub node_is_up($) {
273     ($NODE{$_[0]} or return)->{transport}
274     ? 1 : 0
275     }
276    
277 root 1.3 =item up_nodes
278    
279 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
280     the node itself).
281 root 1.3
282     =cut
283    
284 root 1.49 sub up_nodes() {
285 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
286 root 1.3 }
287    
288 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
289 root 1.3
290 root 1.27 Registers a callback that is called each time a node goes up (a connection
291     is established) or down (the connection is lost).
292 root 1.3
293     Node up messages can only be followed by node down messages for the same
294     node, and vice versa.
295    
296 root 1.71 Note that monitoring a node is usually better done by monitoring its node
297 root 1.27 port. This function is mainly of interest to modules that are concerned
298     about the network topology and low-level connection handling.
299    
300     Callbacks I<must not> block and I<should not> send any messages.
301    
302     The function returns an optional guard which can be used to unregister
303 root 1.3 the monitoring callback again.
304    
305 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
306     or go up (and down).
307    
308     newnode $_, 1 for up_nodes;
309     mon_nodes \&newnode;
310    
311 root 1.3 =cut
312    
313     our %MON_NODES;
314    
315     sub mon_nodes($) {
316     my ($cb) = @_;
317    
318     $MON_NODES{$cb+0} = $cb;
319    
320 root 1.90 defined wantarray
321     and Guard::guard { delete $MON_NODES{$cb+0} }
322 root 1.3 }
323    
324     sub _inject_nodeevent($$;@) {
325 root 1.16 my ($node, $up, @reason) = @_;
326 root 1.3
327     for my $cb (values %MON_NODES) {
328 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
329 root 1.98 or AE::log die => $@;
330 root 1.3 }
331 root 1.16
332 root 1.102 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
333 root 1.3 }
334    
335     #############################################################################
336 root 1.1 # self node code
337    
338 root 1.67 sub _kill {
339     my $port = shift;
340    
341     delete $PORT{$port}
342     or return; # killing nonexistent ports is O.K.
343     delete $PORT_DATA{$port};
344    
345     my $mon = delete $LMON{$port}
346     or !@_
347 root 1.98 or AE::log die => "unmonitored local port $port died with reason: @_";
348 root 1.67
349     $_->(@_) for values %$mon;
350     }
351    
352     sub _monitor {
353     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
354     unless exists $PORT{$_[1]};
355    
356     $LMON{$_[1]}{$_[2]+0} = $_[2];
357     }
358    
359     sub _unmonitor {
360 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
361     if exists $LMON{$_[1]};
362 root 1.67 }
363    
364 root 1.83 sub _secure_check {
365 root 1.96 &$SECURE
366 root 1.83 or die "remote execution attempt by insecure node\n";
367     }
368    
369 root 1.79 our %NODE_REQ = (
370 root 1.1 # internal services
371    
372     # monitoring
373 root 1.65 mon0 => sub { # stop monitoring a port for another node
374 root 1.1 my $portid = shift;
375 root 1.96 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
376 root 1.1 },
377 root 1.65 mon1 => sub { # start monitoring a port for another node
378 root 1.1 my $portid = shift;
379 root 1.96 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
380 root 1.67 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
381 root 1.58 delete $node->{rmon}{$portid};
382 root 1.65 $node->send (["", kil0 => $portid, @_])
383 root 1.59 if $node && $node->{transport};
384 root 1.67 };
385 root 1.1 },
386 root 1.65 # another node has killed a monitored port
387     kil0 => sub {
388 root 1.96 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
389 root 1.1 or return;
390    
391     $_->(@_) for @$cbs;
392     },
393    
394 root 1.18 # "public" services - not actually public
395 root 1.1
396 root 1.65 # another node wants to kill a local port
397 root 1.66 kil => \&_kill,
398 root 1.65
399 root 1.1 # relay message to another node / generic echo
400 root 1.88 snd => sub {
401     &_secure_check;
402     &snd
403 root 1.1 },
404    
405 root 1.30 # random utilities
406 root 1.1 eval => sub {
407 root 1.83 &_secure_check;
408 root 1.50 my @res = do { package main; eval shift };
409 root 1.1 snd @_, "$@", @res if @_;
410     },
411     time => sub {
412 root 1.88 &_secure_check;
413 root 1.76 snd @_, AE::now;
414 root 1.1 },
415     devnull => sub {
416     #
417     },
418 root 1.15 "" => sub {
419 root 1.27 # empty messages are keepalives or similar devnull-applications
420 root 1.15 },
421 root 1.1 );
422    
423 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
424 root 1.1 $PORT{""} = sub {
425     my $tag = shift;
426 root 1.83 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
427 root 1.98 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
428 root 1.1 };
429    
430 root 1.84 our $NPROTO = 1;
431    
432     # tell everybody who connects our nproto
433     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
434     $_[0]{local_greeting}{nproto} = $NPROTO;
435     };
436    
437 root 1.69 #############################################################################
438 root 1.71 # seed management, try to keep connections to all seeds at all times
439 root 1.69
440 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
441     our %NODE_SEED; # map node ID to seed ID
442 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
443     our $SEED_WATCHER;
444 root 1.71 our $SEED_RETRY;
445 root 1.69
446     sub seed_connect {
447     my ($seed) = @_;
448    
449     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
450     or Carp::croak "$seed: unparsable seed address";
451    
452 root 1.98 AE::log 9 => "trying connect to seed node $seed.";
453 root 1.69
454 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
455     $host, $port,
456     on_greeted => sub {
457     # called after receiving remote greeting, learn remote node name
458    
459     # we rely on untrusted data here (the remote node name) this is
460     # hopefully ok, as this can at most be used for DOSing, which is easy
461     # when you can do MITM anyway.
462    
463     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
464     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
465 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
466 root 1.71 delete $SEED_NODE{$seed};
467     } else {
468     $SEED_NODE{$seed} = $_[0]{remote_node};
469     $NODE_SEED{$_[0]{remote_node}} = $seed;
470 root 1.93 # also start global service, if not running
471     # we need to check here in addition to the mon_nodes below
472     # because we might only learn late that a node is a seed
473     # and then we might already be connected
474     snd $_[0]{remote_node}, "g_slave"
475     unless $_[0]{remote_greeting}{global};
476 root 1.71 }
477     },
478 root 1.93 sub {
479 root 1.71 delete $SEED_CONNECT{$seed};
480     }
481 root 1.69 ;
482     }
483    
484     sub seed_all {
485 root 1.93 my @seeds = grep
486     !exists $SEED_CONNECT{$_}
487     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
488     keys %SEED_NODE;
489 root 1.69
490     if (@seeds) {
491 root 1.70 # start connection attempt for every seed we are not connected to yet
492 root 1.69 seed_connect $_
493     for @seeds;
494 root 1.71
495     $SEED_RETRY = $SEED_RETRY * 2 + rand;
496     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
497     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
498    
499     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
500    
501 root 1.69 } else {
502 root 1.71 # all seeds connected or connecting, no need to restart timer
503 root 1.69 undef $SEED_WATCHER;
504     }
505     }
506    
507     sub seed_again {
508 root 1.71 $SEED_RETRY = 1;
509     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
510 root 1.69 }
511    
512     # sets new seed list, starts connecting
513     sub set_seeds(@) {
514     %SEED_NODE = ();
515 root 1.71 %NODE_SEED = ();
516     %SEED_CONNECT = ();
517    
518 root 1.69 @SEED_NODE{@_} = ();
519    
520 root 1.71 seed_all;
521     }
522    
523     mon_nodes sub {
524 root 1.93 return unless exists $NODE_SEED{$_[0]};
525    
526     if ($_[1]) {
527     # each time a connection to a seed node goes up, make
528     # sure it runs the global service.
529     snd $_[0], "g_slave"
530     unless $NODE{$_[0]}{transport}{remote_greeting}{global};
531     } else {
532     # if we lost the connection to a seed node, make sure we are seeding
533     seed_again;
534     }
535 root 1.71 };
536    
537     #############################################################################
538     # talk with/to global nodes
539    
540 root 1.72 # protocol messages:
541     #
542 root 1.73 # sent by all slave nodes (slave to master)
543     # g_slave database - make other global node master of the sender
544 root 1.72 #
545 root 1.73 # sent by any node to global nodes
546     # g_set database - set whole database
547 root 1.89 # g_upd family set del - update single family
548 root 1.73 # g_del family key - delete key from database
549     # g_get family key reply... - send reply with data
550     #
551     # send by global nodes
552     # g_global - node became global, similar to global=1 greeting
553     #
554     # database families
555     # "'l" -> node -> listeners
556     # "'g" -> node -> undef
557     # ...
558 root 1.72 #
559    
560 root 1.73 # used on all nodes:
561 root 1.88 our $MASTER; # the global node we bind ourselves to
562 root 1.78 our $MASTER_MON;
563     our %LOCAL_DB; # this node database
564    
565     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
566 root 1.71
567 root 1.84 our $GPROTO = 1;
568    
569     # tell everybody who connects our nproto
570     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
571     $_[0]{local_greeting}{gproto} = $GPROTO;
572     };
573    
574 root 1.73 #############################################################################
575     # master selection
576    
577     # master requests
578     our %GLOBAL_REQ; # $id => \@req
579 root 1.71
580 root 1.74 sub global_req_add {
581 root 1.80 my ($id, $req) = @_;
582 root 1.71
583 root 1.74 return if exists $GLOBAL_REQ{$id};
584    
585 root 1.80 $GLOBAL_REQ{$id} = $req;
586 root 1.71
587 root 1.80 snd $MASTER, @$req
588 root 1.73 if $MASTER;
589 root 1.74 }
590 root 1.71
591 root 1.74 sub global_req_del {
592     delete $GLOBAL_REQ{$_[0]};
593     }
594    
595 root 1.88 #################################
596     # master rpc
597    
598     our %GLOBAL_RES;
599     our $GLOBAL_RES_ID = "a";
600    
601     sub global_call {
602     my $id = ++$GLOBAL_RES_ID;
603     $GLOBAL_RES{$id} = pop;
604     global_req_add $id, [@_, $id];
605     }
606    
607     $NODE_REQ{g_reply} = sub {
608     my $id = shift;
609     global_req_del $id;
610     my $cb = delete $GLOBAL_RES{$id}
611     or return;
612     &$cb
613     };
614    
615     #################################
616    
617 root 1.74 sub g_find {
618 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
619 root 1.73 }
620 root 1.71
621 root 1.73 # reply for g_find started in Node.pm
622 root 1.79 $NODE_REQ{g_found} = sub {
623 root 1.74 global_req_del "g_find $_[0]";
624    
625 root 1.73 my $node = $NODE{$_[0]} or return;
626 root 1.71
627 root 1.79 $node->connect_to ($_[1]);
628 root 1.71 };
629    
630 root 1.73 sub master_set {
631     $MASTER = $_[0];
632 root 1.71
633 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
634 root 1.71
635 root 1.73 # (re-)send queued requests
636     snd $MASTER, @$_
637     for values %GLOBAL_REQ;
638     }
639 root 1.71
640 root 1.72 sub master_search {
641 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
642     for (keys %NODE_SEED) {
643 root 1.72 if (node_is_up $_) {
644     master_set $_;
645     return;
646 root 1.71 }
647 root 1.72 }
648 root 1.71
649 root 1.78 $MASTER_MON = mon_nodes sub {
650 root 1.72 return unless $_[1]; # we are only interested in node-ups
651     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
652 root 1.71
653 root 1.72 master_set $_[0];
654 root 1.71
655 root 1.78 $MASTER_MON = mon_nodes sub {
656 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
657     undef $MASTER;
658     master_search ();
659     }
660 root 1.71 };
661 root 1.72 };
662 root 1.71 }
663    
664 root 1.73 # other node wants to make us the master
665 root 1.79 $NODE_REQ{g_slave} = sub {
666 root 1.73 my ($db) = @_;
667    
668 root 1.80 # load global module and redo the request
669 root 1.73 require AnyEvent::MP::Global;
670 root 1.80 &{ $NODE_REQ{g_slave} }
671 root 1.71 };
672    
673 root 1.73 #############################################################################
674 root 1.79 # local database operations
675 root 1.71
676 root 1.79 # local database management
677 root 1.88
678 root 1.87 sub db_set($$;$) {
679 root 1.97 my ($family, $subkey) = @_;
680    
681 root 1.89 # if (ref $_[1]) {
682     # # bulk
683     # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
684     # $LOCAL_DB{$_[0]} = $_[1];
685     # snd $MASTER, g_upd => $_[0] => $_[1], \@del
686     # if defined $MASTER;
687     # } else {
688     # single-key
689 root 1.97 $LOCAL_DB{$family}{$subkey} = $_[2];
690     snd $MASTER, g_upd => $family => { $subkey => $_[2] }
691 root 1.89 if defined $MASTER;
692     # }
693 root 1.97
694     defined wantarray
695     and Guard::guard { db_del $family => $subkey }
696 root 1.79 }
697    
698 root 1.89 sub db_del($@) {
699     my $family = shift;
700    
701     delete @{ $LOCAL_DB{$family} }{@_};
702     snd $MASTER, g_upd => $family => undef, \@_
703 root 1.79 if defined $MASTER;
704     }
705    
706 root 1.88 # database query
707    
708     sub db_family {
709     my ($family, $cb) = @_;
710     global_call g_db_family => $family, $cb;
711     }
712    
713     sub db_keys {
714     my ($family, $cb) = @_;
715     global_call g_db_keys => $family, $cb;
716     }
717    
718     sub db_values {
719     my ($family, $cb) = @_;
720     global_call g_db_values => $family, $cb;
721 root 1.80 }
722    
723 root 1.88 # database monitoring
724 root 1.80
725 root 1.81 our %LOCAL_MON; # f, reply
726     our %MON_DB; # f, k, value
727 root 1.80
728 root 1.81 sub db_mon($@) {
729 root 1.84 my ($family, $cb) = @_;
730 root 1.81
731 root 1.84 if (my $db = $MON_DB{$family}) {
732 root 1.89 # we already monitor, so create a "dummy" change event
733     # this is postponed, which might be too late (we could process
734     # change events), so disable the callback at first
735     $LOCAL_MON{$family}{$cb+0} = sub { };
736     AE::postpone {
737     return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
738    
739     # set actual callback
740     $LOCAL_MON{$family}{$cb+0} = $cb;
741     $cb->($db, [keys %$db]);
742     };
743 root 1.81 } else {
744     # new monitor, request chg1 from upstream
745 root 1.89 $LOCAL_MON{$family}{$cb+0} = $cb;
746 root 1.81 global_req_add "mon1 $family" => [g_mon1 => $family];
747     $MON_DB{$family} = {};
748     }
749    
750 root 1.90 defined wantarray
751     and Guard::guard {
752     my $mon = $LOCAL_MON{$family};
753     delete $mon->{$cb+0};
754    
755     unless (%$mon) {
756     global_req_del "mon1 $family";
757    
758     # no global_req, because we don't care if we are not connected
759     snd $MASTER, g_mon0 => $family
760     if $MASTER;
761 root 1.80
762 root 1.90 delete $LOCAL_MON{$family};
763     delete $MON_DB{$family};
764     }
765 root 1.80 }
766     }
767    
768 root 1.82 # full update
769 root 1.80 $NODE_REQ{g_chg1} = sub {
770 root 1.96 return unless $SRCNODE eq $MASTER;
771 root 1.84 my ($f, $ndb) = @_;
772    
773     my $db = $MON_DB{$f};
774 root 1.89 my (@a, @c, @d);
775 root 1.81
776 root 1.82 # add or replace keys
777 root 1.84 while (my ($k, $v) = each %$ndb) {
778 root 1.89 exists $db->{$k}
779     ? push @c, $k
780     : push @a, $k;
781 root 1.84 $db->{$k} = $v;
782 root 1.82 }
783 root 1.81
784 root 1.82 # delete keys that are no longer present
785 root 1.84 for (grep !exists $ndb->{$_}, keys %$db) {
786     delete $db->{$_};
787 root 1.89 push @d, $_;
788 root 1.81 }
789 root 1.84
790 root 1.89 $_->($db, \@a, \@c, \@d)
791 root 1.84 for values %{ $LOCAL_MON{$_[0]} };
792 root 1.80 };
793    
794 root 1.82 # incremental update
795 root 1.84 $NODE_REQ{g_chg2} = sub {
796 root 1.96 return unless $SRCNODE eq $MASTER;
797 root 1.89 my ($family, $set, $del) = @_;
798    
799     my $db = $MON_DB{$family};
800 root 1.84
801 root 1.89 my (@a, @c);
802 root 1.84
803 root 1.89 while (my ($k, $v) = each %$set) {
804     exists $db->{$k}
805     ? push @c, $k
806     : push @a, $k;
807     $db->{$k} = $v;
808     }
809    
810     delete @$db{@$del};
811    
812     $_->($db, \@a, \@c, $del)
813     for values %{ $LOCAL_MON{$family} };
814 root 1.84 };
815 root 1.80
816 root 1.69 #############################################################################
817     # configure
818    
819 root 1.81 sub nodename {
820 root 1.69 require POSIX;
821     (POSIX::uname ())[1]
822     }
823    
824     sub _resolve($) {
825     my ($nodeid) = @_;
826    
827     my $cv = AE::cv;
828     my @res;
829    
830     $cv->begin (sub {
831     my %seen;
832     my @refs;
833     for (sort { $a->[0] <=> $b->[0] } @res) {
834     push @refs, $_->[1] unless $seen{$_->[1]}++
835     }
836     shift->send (@refs);
837     });
838    
839     my $idx;
840     for my $t (split /,/, $nodeid) {
841     my $pri = ++$idx;
842    
843 root 1.81 $t = length $t ? nodename . ":$t" : nodename
844 root 1.69 if $t =~ /^\d*$/;
845    
846     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
847     or Carp::croak "$t: unparsable transport descriptor";
848    
849     $port = "0" if $port eq "*";
850    
851     if ($host eq "*") {
852     $cv->begin;
853    
854 root 1.103 my $get_addr = sub {
855     my @addr;
856    
857     require Net::Interface;
858    
859     # Net::Interface hangs on some systems, so hope for the best
860     local $SIG{ALRM} = 'DEFAULT';
861     alarm 2;
862    
863     for my $if (Net::Interface->interfaces) {
864     # we statically lower-prioritise ipv6 here, TODO :()
865     for $_ ($if->address (Net::Interface::AF_INET ())) {
866     next if /^\x7f/; # skip localhost etc.
867     push @addr, $_;
868 root 1.69 }
869 root 1.103 for ($if->address (Net::Interface::AF_INET6 ())) {
870     #next if $if->scope ($_) <= 2;
871     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
872     push @addr, $_;
873 root 1.69 }
874     }
875 root 1.103
876     alarm 0;
877    
878     @addr
879     };
880    
881     my @addr;
882    
883     if (AnyEvent::WIN32) {
884     @addr = $get_addr->();
885     } else {
886     # use a child process, as Net::Interface is big, and we need it only once.
887    
888     pipe my $r, my $w
889     or die "pipe: $!";
890    
891     if (fork eq 0) {
892     close $r;
893     syswrite $w, pack "(C/a*)*", $get_addr->();
894     require POSIX;
895     POSIX::_exit (0);
896     } else {
897     close $w;
898    
899     my $addr;
900    
901     1 while sysread $r, $addr, 1024, length $addr;
902    
903     @addr = unpack "(C/a*)*", $addr;
904     }
905     }
906    
907     for my $ip (@addr) {
908     push @res, [
909     $pri += 1e-5,
910     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
911     ];
912     }
913     $cv->end;
914 root 1.69 } else {
915     $cv->begin;
916     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
917     for (@_) {
918     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
919     push @res, [
920     $pri += 1e-5,
921     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
922     ];
923     }
924     $cv->end;
925     };
926     }
927     }
928    
929     $cv->end;
930    
931     $cv
932     }
933    
934     sub configure(@) {
935     unshift @_, "profile" if @_ & 1;
936     my (%kv) = @_;
937    
938     delete $NODE{$NODE}; # we do not support doing stuff before configure
939     _init_names;
940    
941     my $profile = delete $kv{profile};
942    
943 root 1.81 $profile = nodename
944 root 1.69 unless defined $profile;
945    
946     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
947    
948 root 1.83 if (exists $CONFIG->{secure}) {
949 root 1.100 $SECURE = eval +($CONFIG->{secure} ? "sub { 0 }" : "sub { 1 }");
950 root 1.83 }
951    
952 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
953 root 1.69
954     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
955    
956 root 1.72 $NODE = $node;
957 root 1.77
958 root 1.81 $NODE =~ s/%n/nodename/ge;
959    
960     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
961 root 1.77 # nodes with randomised node names do not need randomised port names
962     $UNIQ = "";
963     }
964 root 1.69
965     $NODE{$NODE} = $NODE{""};
966     $NODE{$NODE}{id} = $NODE;
967    
968     my $seeds = $CONFIG->{seeds};
969     my $binds = $CONFIG->{binds};
970    
971     $binds ||= ["*"];
972    
973 root 1.98 AE::log 8 => "node $NODE starting up.";
974 root 1.69
975 root 1.85 $BINDS = [];
976     %BINDS = ();
977 root 1.69
978     for (map _resolve $_, @$binds) {
979     for my $bind ($_->recv) {
980     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
981     or Carp::croak "$bind: unparsable local bind address";
982    
983     my $listener = AnyEvent::MP::Transport::mp_server
984     $host,
985     $port,
986     prepare => sub {
987     my (undef, $host, $port) = @_;
988     $bind = AnyEvent::Socket::format_hostport $host, $port;
989     0
990     },
991     ;
992 root 1.85 $BINDS{$bind} = $listener;
993     push @$BINDS, $bind;
994 root 1.69 }
995     }
996    
997 root 1.85 db_set "'l" => $NODE => $BINDS;
998 root 1.73
999 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
1000 root 1.69
1001     # connect to all seednodes
1002     set_seeds map $_->recv, map _resolve $_, @$seeds;
1003    
1004 root 1.73 master_search;
1005    
1006 root 1.103 # save gobs of memory
1007     undef &_resolve;
1008     *configure = sub (@){ };
1009    
1010 root 1.69 for (@{ $CONFIG->{services} }) {
1011     if (ref) {
1012     my ($func, @args) = @$_;
1013     (load_func $func)->(@args);
1014     } elsif (s/::$//) {
1015     eval "require $_";
1016     die $@ if $@;
1017     } else {
1018     (load_func $_)->();
1019     }
1020     }
1021 root 1.102
1022     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1023     die "$@" if $@;
1024 root 1.69 }
1025    
1026 root 1.1 =back
1027    
1028 root 1.101 =head1 LOGGING
1029    
1030     AnyEvent::MP::Kernel logs high-level information about the current node,
1031     when nodes go up and down, and most runtime errors. It also logs some
1032     debugging and trace messages about network maintainance, such as seed
1033     connections and global node management.
1034    
1035 root 1.1 =head1 SEE ALSO
1036    
1037     L<AnyEvent::MP>.
1038    
1039     =head1 AUTHOR
1040    
1041     Marc Lehmann <schmorp@schmorp.de>
1042     http://home.schmorp.de/
1043    
1044     =cut
1045    
1046     1
1047