ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.105
Committed: Fri Mar 23 13:46:03 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.104: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28    
29 root 1.79 use AnyEvent ();
30     use Guard ();
31 root 1.1
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37 root 1.71 our @EXPORT_OK = qw(
38     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
39     );
40    
41 root 1.1 our @EXPORT = qw(
42 root 1.21 add_node load_func snd_to_func snd_on eval_on
43 root 1.1
44 root 1.34 NODE $NODE node_of snd kil port_is_local
45     configure
46 root 1.46 up_nodes mon_nodes node_is_up
47 root 1.97 db_set db_del
48 root 1.84 db_mon db_family db_keys db_values
49 root 1.1 );
50    
# Resolves a fully qualified function name to a code reference, loading
# the package that provides it on demand.
#
# $_[0] - function name, e.g. "Some::Package::func".
#
# Returns a code reference. If the function cannot be found, or a module
# fails to load for a reason other than "file not found", the returned
# code reference dies with the corresponding error when it is called -
# this function itself does not die.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component; when nothing is left to strip,
         # no enclosing package could provide the function
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module file only means "try the parent package";
            # any other load failure is passed on to the caller. the dot
            # before "pm" is escaped so only a real ".pm" suffix matches.
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
71    
72 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
73    
# Returns a string of $_[0] characters, each chosen via rand from the
# code points 0..255.
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}
77    
# Returns a string of $_[0] characters, each picked via rand from the 62
# entries of @alnum (digits, uppercase and lowercase letters).
sub nonce62($) {
   my $chars = "";

   $chars .= $alnum[rand 62]
      for 1 .. $_[0];

   $chars
}
81    
# Builds a 9 character identifier: five @alnum characters derived from the
# process ID and the current AE::now time, followed by four random
# characters from nonce62.
sub gen_uniq {
   my $now = AE::now;

   # each entry is an index into @alnum
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now * 100  ) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 4;
}
93    
94 root 1.20 our $CONFIG; # this node's configuration
95 root 1.104 our $SECURE;
96 root 1.21
97 root 1.64 our $RUNIQ; # remote uniq value
98     our $UNIQ; # per-process/node unique cookie
99     our $NODE;
100     our $ID = "a";
101 root 1.1
102     our %NODE; # node id to transport mapping, or "undef", for local node
103     our (%PORT, %PORT_DATA); # local ports
104    
105 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
106 root 1.1 our %LMON; # monitored _local_ ports
107    
108 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
109 root 1.85 our %BINDS;
110     our $BINDS; # our listeners, as arrayref
111 root 1.1
112 root 1.76 our $SRCNODE; # holds the node ID of the sending node during _inject
113 root 1.1
# (Re-)initialises the naming globals $UNIQ, $RUNIQ and $NODE.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
   # at the end to avoid clashes
   my $id = nonce62 10;
   $RUNIQ = substr ($id, 0, 9) . uc (substr $id, 9);

   $NODE = "anon/$RUNIQ";
}
124    
125 root 1.69 _init_names;
126 root 1.64
# Returns the current value of $NODE, the ID of the local node.
sub NODE() {
   return $NODE;
}
130    
# Returns the node part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $nodeid
}
136    
# Defines the compile-time constant TRACE: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
142 root 1.1
143 root 1.42 our $DELAY_TIMER;
144     our @DELAY_QUEUE;
145    
# Timer callback: runs the queued callbacks in FIFO order until the queue
# is empty, then drops the timer.
our $delay_run = sub {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   undef $DELAY_TIMER
};

# Queues the given callback and makes sure a zero-delay timer that runs
# $delay_run exists.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
154    
155 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
156    
157     During execution of a message callback, this variable contains the node ID
158     of the origin node.
159    
160     The main use of this variable is for debugging output - there are probably
161     very few other cases where you need to know the source node ID.
162    
163     =cut
164    
# Delivers a message to a local port: the first argument is the port ID,
# the remaining arguments are handed to the port callback registered in
# %PORT. Messages for unknown ports are silently dropped.
sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $port = shift;
   my $cb   = $PORT{$port}
      or return;

   # &-call without parentheses: the callback sees the remaining @_
   &$cb
}
169    
# Central routing helper: returns the node object registered in %NODE for
# the given node ID, creating (and registering) an
# AnyEvent::MP::Node::Remote object when none exists yet.
# Croaks when the ID is undef or the empty string.
sub add_node {
   my ($node) = @_;

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
180    
# Sends a message to a port: the first argument is a port ID
# ("nodeid#portid"), the remaining arguments form the message. The message
# is handed to the {send} callback of the node object, which is created on
# demand via add_node.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
189    
190 root 1.17 =item $is_local = port_is_local $port
191    
192     Returns true iff the port is a local port.
193    
194     =cut
195    
# True when the node part of the given port ID maps to the same node
# object as the local node ($NODE{""}).
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   return $NODE{$nodeid} == $NODE{""};
}
201    
202 root 1.18 =item snd_to_func $node, $func, @args
203 root 1.11
204 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
205 root 1.11 this function with the given arguments on that node.
206    
207 root 1.20 This function can be used to implement C<spawn>-like interfaces.
208 root 1.11
209     =cut
210    
# Sends (function name, arguments...) to the node port ("") of the given
# node.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # force delayed delivery in the self node (for spawn). this is very
   # ugly - delaying ALL messages would avoid deep recursion issues, but
   # is slow.
   # NOTE(review): the historic comment here spoke of delaying "on $NODE",
   # yet the flag is set when the target is NOT $NODE - confirm intent.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @_]);
}
222    
223 root 1.18 =item snd_on $node, @msg
224    
225     Executes C<snd> with the given C<@msg> (which must include the destination
226     port) on the given node.
227    
228     =cut
229    
# Asks the given node to execute "snd" with @msg by sending it a "snd"
# request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
234    
235 root 1.29 =item eval_on $node, $string[, @reply]
236 root 1.18
237 root 1.29 Evaluates the given string as Perl expression on the given node. When
238     @reply is specified, then it is used to construct a reply message with
239     C<"$@"> and any results from the eval appended.
240 root 1.18
241     =cut
242    
# Asks the given node to evaluate a string by sending it an "eval"
# request; the remaining arguments (string and optional reply message) are
# passed along unchanged.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
247    
# Kills the port given as first argument ("nodeid#portid"); further
# arguments are passed on as the kill reason. Croaks when the ID has no
# port part, as node ports must not be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
257    
258     #############################################################################
259 root 1.6 # node monitoring and info
260 root 1.3
261 root 1.21 =item node_is_up $nodeid
262 root 1.13
263     Returns true if the given node is "up", that is, the kernel thinks it has
264     a working connection to it.
265    
266 root 1.95 If the node is up, returns C<1>. If the node is currently connecting or
267     otherwise known but not connected, returns C<0>. If nothing is known about
268     the node, returns C<undef>.
269 root 1.13
270     =cut
271    
# Returns 1 when the node has a transport, 0 when it is known but has
# none, and nothing (undef) when the node ID is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
276    
277 root 1.3 =item up_nodes
278    
279 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
280     the node itself).
281 root 1.3
282     =cut
283    
# Returns the IDs of all node objects in %NODE that currently have a
# transport.
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
287    
288 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
289 root 1.3
290 root 1.27 Registers a callback that is called each time a node goes up (a connection
291     is established) or down (the connection is lost).
292 root 1.3
293     Node up messages can only be followed by node down messages for the same
294     node, and vice versa.
295    
296 root 1.71 Note that monitoring a node is usually better done by monitoring its node
297 root 1.27 port. This function is mainly of interest to modules that are concerned
298     about the network topology and low-level connection handling.
299    
300     Callbacks I<must not> block and I<should not> send any messages.
301    
302     The function returns an optional guard which can be used to unregister
303 root 1.3 the monitoring callback again.
304    
305 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
306     or go up (and down).
307    
308     newnode $_, 1 for up_nodes;
309     mon_nodes \&newnode;
310    
311 root 1.3 =cut
312    
313     our %MON_NODES;
314    
# Registers $cb in %MON_NODES, keyed by its numeric reference value. In
# non-void context a guard is returned that removes the registration when
# it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   defined wantarray
      and Guard::guard { delete $MON_NODES{$key} }
}
323    
# Notifies all mon_nodes callbacks that $node went up ($up true) or down,
# then logs the state change. A dying callback is logged and does not
# prevent the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         AE::log (die => $@);
      }
   }

   my $state = $up ? "up." : "down (@reason).";

   AE::log (7 => "$node->{id} is " . $state);
}
334    
335     #############################################################################
336 root 1.1 # self node code
337    
# Kills a local port: removes it from %PORT and %PORT_DATA and calls all
# of its local monitors with the kill reason (the remaining arguments).
# A port that dies with a reason but has no monitors is logged.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   if (!$mon && @_) {
      AE::log (die => "unmonitored local port $port died with reason: @_");
   }

   $_->(@_) for values %$mon;
}
351    
# Registers callback $_[2] as monitor of local port $_[1] (the first
# argument is ignored). When the port does not exist, the callback is
# invoked immediately with a "no_such_port" reason instead.
sub _monitor {
   my (undef, $port, $cb) = @_;

   unless (exists $PORT{$port}) {
      return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port");
   }

   $LMON{$port}{$cb+0} = $cb;
}
358    
# Removes callback $_[2] from the monitors of local port $_[1] (the first
# argument is ignored). Does nothing when the port has no monitors.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   delete $LMON{$port}{$cb+0}
      if exists $LMON{$port};
}
363    
# Dies when $SECURE is true; used to reject requests that would execute
# code on behalf of a remote node.
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
368    
# Request handlers of the node port: maps the tag (first element of a
# message sent to port "") to the code that handles it. Handlers are
# called with the remaining message elements in @_.
our %NODE_REQ = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};

      _unmonitor (undef, $portid, $cb);
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # weak reference to the requesting node, used by the callback below
      my $node = $NODE{$SRCNODE};
      Scalar::Util::weaken ($node);

      my $on_kill = sub {
         delete $node->{rmon}{$portid};

         # tell the remote node, but only while it is still connected
         if ($node && $node->{transport}) {
            $node->send (["", kil0 => $portid, @_]);
         }
      };

      $node->{rmon}{$portid} = $on_kill;

      _monitor (undef, $portid, $on_kill);
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => sub {
      _secure_check ();
      snd (@_)
   },

   # random utilities

   # evaluate the first argument as perl code in package main and, when a
   # reply message was given, send "$@" and the results back
   eval => sub {
      _secure_check ();
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },

   # reply with the current AE::now time
   time => sub {
      _secure_check ();
      snd (@_, AE::now);
   },

   devnull => sub {
      #
   },

   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
422    
# the node port: register the local node object under both "" and $NODE,
# and install the handler for port "", which dispatches on the message tag.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);
$PORT{""} = sub {
   my $tag = shift;

   eval {
      # unknown tags are resolved as function names (and cached), but only
      # if remote execution is allowed
      my $handler = $NODE_REQ{$tag} ||= do { _secure_check (); load_func ($tag) };

      # &-call without parentheses: the handler sees the remaining @_
      &$handler
   };

   AE::log (die => "error processing node message from $SRCNODE: $@") if $@;
};
430    
431 root 1.84 our $NPROTO = 1;
432    
433     # tell everybody who connects our nproto
434     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
435     $_[0]{local_greeting}{nproto} = $NPROTO;
436     };
437    
438 root 1.69 #############################################################################
439 root 1.71 # seed management, try to keep connections to all seeds at all times
440 root 1.69
441 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
442     our %NODE_SEED; # map node ID to seed ID
443 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
444     our $SEED_WATCHER;
445 root 1.71 our $SEED_RETRY;
446 root 1.69
# Starts a connection attempt to the seed address $seed ("host:port")
# unless %SEED_CONNECT already has a true entry for it. Croaks when the
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   AE::log (9 => "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      my $remote = $_[0]{remote_node};

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # we connected to ourselves: nuke this seed, but make sure we
         # act like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}    = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start global service, if not running
         # we need to check here in addition to the mon_nodes callback
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd ($remote, "g_slave")
            unless $_[0]{remote_greeting}{global};
      }
   };

   # trailing callback of mp_connect: forget the connection attempt
   # NOTE(review): presumably invoked when the connector is done/destroyed -
   # confirm against AnyEvent::MP::Transport
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   ;
}
484    
# Starts connection attempts to all seeds that are neither being connected
# to nor known to be up, and re-arms $SEED_WATCHER with an increasing,
# randomised retry interval capped at the configured monitor_timeout. When
# no such seed is left, the watcher is dropped.
sub seed_all {
   my @seeds = grep {
      my $nodeid = $SEED_NODE{$_};

      !exists $SEED_CONNECT{$_}
         && !(defined $nodeid && node_is_up ($nodeid))
   } keys %SEED_NODE;

   # all seeds connected or connecting, no need to restart timer
   return undef $SEED_WATCHER
      unless @seeds;

   # start connection attempt for every seed we are not connected to yet
   seed_connect ($_)
      for @seeds;

   my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $SEED_RETRY = $SEED_RETRY * 2 + rand;
   $SEED_RETRY = $max
      if $SEED_RETRY > $max;

   $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
}
507    
# Resets the retry interval to one second and, unless a seed watcher is
# already pending, schedules seed_all to run in one second.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
512    
# Replaces the seed list with the given seed addresses, forgetting all
# previous seed state, and starts connecting to the new seeds.
sub set_seeds(@) {
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out without a known node ID
   %SEED_NODE = map +($_ => undef), @_;

   seed_all ();
}
523    
# Watch node up/down events of nodes known to be seeds.
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $NODE_SEED{$nodeid};

   unless ($is_up) {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
      return;
   }

   # each time a connection to a seed node goes up, make
   # sure it runs the global service.
   snd ($nodeid, "g_slave")
      unless $NODE{$nodeid}{transport}{remote_greeting}{global};
});
537    
538     #############################################################################
539     # talk with/to global nodes
540    
541 root 1.72 # protocol messages:
542     #
543 root 1.73 # sent by all slave nodes (slave to master)
544     # g_slave database - make other global node master of the sender
545 root 1.72 #
546 root 1.73 # sent by any node to global nodes
547     # g_set database - set whole database
548 root 1.89 # g_upd family set del - update single family
549 root 1.73 # g_del family key - delete key from database
550     # g_get family key reply... - send reply with data
551     #
552     # sent by global nodes
553     # g_global - node became global, similar to global=1 greeting
554     #
555     # database families
556     # "'l" -> node -> listeners
557     # "'g" -> node -> undef
558     # ...
559 root 1.72 #
560    
561 root 1.73 # used on all nodes:
562 root 1.88 our $MASTER; # the global node we bind ourselves to
563 root 1.78 our $MASTER_MON;
564     our %LOCAL_DB; # this node database
565    
566     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
567 root 1.71
568 root 1.84 our $GPROTO = 1;
569    
570     # tell everybody who connects our gproto
571     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
572     $_[0]{local_greeting}{gproto} = $GPROTO;
573     };
574    
575 root 1.73 #############################################################################
576     # master selection
577    
578     # master requests
579     our %GLOBAL_REQ; # $id => \@req
580 root 1.71
# Queues request $req (an array reference holding a message) under $id
# unless a request with that ID is already queued, and sends it to the
# current master, if there is one. Queued requests are re-sent by
# master_set.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   snd ($MASTER, @$req)
      if $MASTER;
}
591 root 1.71
# Removes the queued request with the given ID, returning it (if any).
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
595    
596 root 1.88 #################################
597     # master rpc
598    
599     our %GLOBAL_RES;
600     our $GLOBAL_RES_ID = "a";
601    
# Issues a request to the master node: all arguments but the last form the
# request, the last one is the callback that the g_reply handler invokes
# with the reply. A fresh request ID is appended to the request.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
607    
# reply to a global_call request: the first element is the request ID, the
# rest is passed to the callback registered for it (if any).
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id}
      or return;

   # &-call without parentheses: the callback sees the remaining @_
   &$cb
};
615    
616     #################################
617    
# Queues a "g_find" request for the given node ID with the master node.
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
621 root 1.71
# reply for g_find started in Node.pm: drop the queued request and, when
# the node is still known, pass the second message element to its
# connect_to method.
# NOTE(review): the second element is presumably the list of addresses the
# node listens on - confirm against AnyEvent::MP::Node.
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
630    
# Makes the given node our master: announces ourselves with a "g_slave"
# message carrying the local database, then sends all queued requests.
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
640 root 1.71
# Selects a master node: the first seed node that is up is used
# immediately. Otherwise a node monitor waits for a seed node to come up,
# makes it the master, and is then replaced by a monitor that restarts the
# search once the master goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      # assigning the new guard releases the monitor we are running in
      $MASTER_MON = mon_nodes (sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      });
   });
}
664    
# other node wants to make us the master: load the global module and redo
# the request with whatever handler is registered for g_slave afterwards.
# NOTE(review): presumably AnyEvent::MP::Global installs its own g_slave
# handler when loaded - otherwise this stub would call itself endlessly.
$NODE_REQ{g_slave} = sub {
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};

   # &-call without parentheses: the handler sees our @_
   &$handler
};
673    
674 root 1.73 #############################################################################
675 root 1.79 # local database operations
676 root 1.71
677 root 1.79 # local database management
678 root 1.88
# Sets key $subkey of family $family in the local database to the third
# argument (undef when omitted) and, when a master node is known, sends it
# the corresponding "g_upd" update.
#
# In non-void context a guard is returned that deletes the key again (via
# db_del) when it is destroyed.
sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$subkey} = $_[2];
      snd $MASTER, g_upd => $family => { $subkey => $_[2] }
         if defined $MASTER;
#   }

   # db_del is only defined further down in this file, so it is not known
   # to the parser yet: without parentheses, "db_del $family => ..." would
   # be parsed as an indirect-object method call on $family instead of a
   # function call. the parentheses force a function call.
   defined wantarray
      and Guard::guard { db_del ($family => $subkey) }
}
698    
# Deletes the given keys from family $family of the local database and,
# when a master node is known, sends it the corresponding "g_upd" update.
sub db_del($@) {
   my ($family, @keys) = @_;

   delete @{ $LOCAL_DB{$family} }{@keys};

   snd ($MASTER, g_upd => $family => undef, \@keys)
      if defined $MASTER;
}
706    
707 root 1.88 # database query
708    
# Issues a "g_db_family" request for $family to the master node; $cb is
# called with the reply.
sub db_family {
   global_call (g_db_family => @_[0, 1]);
}
713    
# Issues a "g_db_keys" request for $family to the master node; $cb is
# called with the reply.
sub db_keys {
   global_call (g_db_keys => @_[0, 1]);
}
718    
# Issues a "g_db_values" request for $family to the master node; $cb is
# called with the reply.
sub db_values {
   global_call (g_db_values => @_[0, 1]);
}
723    
724 root 1.88 # database monitoring
725 root 1.80
726 root 1.81 our %LOCAL_MON; # f, reply
727     our %MON_DB; # f, k, value
728 root 1.80
# Registers $cb as monitor for database family $family. The callback is
# invoked with the monitored database and array references listing the
# affected keys (see the g_chg1/g_chg2 handlers).
#
# In non-void context a guard is returned that unregisters the callback
# when destroyed; when it was the last monitor of the family, monitoring
# of the family is stopped altogether.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   if (!$db) {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   } else {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };

      AE::postpone {
         # guard might have gone away already
         return unless exists $LOCAL_MON{$family}{$cb+0};

         # set actual callback
         $LOCAL_MON{$family}{$cb+0} = $cb;
         $cb->($db, [keys %$db]);
      };
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$cb+0};

         # other monitors of this family remain
         return if %$mon;

         global_req_del ("mon1 $family");

         # no global_req, because we don't care if we are not connected
         snd ($MASTER, g_mon0 => $family)
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      }
}
768    
# full update: the master sends the complete contents ($ndb) of family $f.
# The monitored copy is synchronised with it and all monitors are called
# with the database and the lists of added, changed and deleted keys.
# Messages not originating from the current master are ignored.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@added, @changed);

   # add or replace keys
   for my $k (keys %$ndb) {
      push @{ exists $db->{$k} ? \@changed : \@added }, $k;
      $db->{$k} = $ndb->{$k};
   }

   # delete keys that are no longer present
   my @deleted = grep !exists $ndb->{$_}, keys %$db;
   delete $db->{$_} for @deleted;

   $_->($db, \@added, \@changed, \@deleted)
      for values %{ $LOCAL_MON{$f} };
};
794    
# incremental update: the master sends the keys to set ($set, a hash) and
# the keys to delete ($del, an array) for family $family. The monitored
# copy is updated and all monitors are called with the database and the
# lists of added, changed and deleted keys.
# Messages not originating from the current master are ignored.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   for my $k (keys %$set) {
      push @{ exists $db->{$k} ? \@changed : \@added }, $k;
      $db->{$k} = $set->{$k};
   }

   delete @$db{@$del};

   $_->($db, \@added, \@changed, $del)
      for values %{ $LOCAL_MON{$family} };
};
816 root 1.80
817 root 1.69 #############################################################################
818     # configure
819    
# Returns the node name of this host, as reported by the second field of
# POSIX::uname.
sub nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
824    
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings.
#
# $nodeid - comma-separated descriptors. A descriptor that is empty or
#           consists of digits only is taken relative to the local node
#           name (as port number, if non-empty). A host of "*" expands to
#           the addresses of the local interfaces (via Net::Interface), a
#           port of "*" is replaced by "0".
#
# Returns a condvar; ->recv yields the resolved addresses, duplicates
# removed, ordered by descriptor position and then by resolution order.
# Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # final callback: sort by priority, drop duplicates, deliver
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      $t = length $t ? nodename () . ":$t" : nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         # returns the packed addresses of all local interfaces
         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;

            if (!defined $pid) {
               # fork failed: rather than silently ending up without any
               # address, do the lookup in this process
               close $r;
               close $w;

               @addr = $get_addr->();
            } elsif ($pid == 0) {
               # child: send the length-prefixed addresses to the parent
               close $r;
               syswrite $w, pack "(C/a*)*", $get_addr->();
               require POSIX;
               POSIX::_exit (0);
            } else {
               close $w;

               my $addr;

               # read until end of file, i.e. until the child is done
               1 while sysread $r, $addr, 1024, length $addr;

               close $r;

               # reap the child so it does not linger as a zombie
               waitpid $pid, 0;

               @addr = unpack "(C/a*)*", $addr;
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
934    
# Configures and starts the local node.
#
# Arguments are key => value pairs; with an odd number of arguments, the
# first one is taken as the value of the "profile" key. The profile (the
# node name by default) is looked up via AnyEvent::MP::Config::find_profile
# and the result becomes $CONFIG.
#
# Sets $SECURE and $NODE, creates listeners for all configured binds
# (default "*"), publishes them in database family "'l", starts connecting
# to the configured seeds, starts the master search, loads the configured
# services and finally evaluates the "eval" configuration value.
#
# Croaks on an illegal node ID or unparsable bind address, and dies when a
# service module or the "eval" code fails.
935     sub configure(@) {
936     unshift @_, "profile" if @_ & 1;
937     my (%kv) = @_;
938    
939     delete $NODE{$NODE}; # we do not support doing stuff before configure
940     _init_names;
941    
942     my $profile = delete $kv{profile};
943    
944 root 1.81 $profile = nodename
945 root 1.69 unless defined $profile;
946    
947     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
948    
949 root 1.104 $SECURE = $CONFIG->{secure};
950 root 1.83
# without an explicit nodeid, the node ID is the profile name followed by
# "/" - the trailing slash gets $RUNIQ appended further below
951 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
952 root 1.69
953     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
954    
955 root 1.72 $NODE = $node;
956 root 1.77
# %n expands to the node name, %u and a trailing "/" get $RUNIQ substituted/appended
957 root 1.81 $NODE =~ s/%n/nodename/ge;
958    
959     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
960 root 1.77 # nodes with randomised node names do not need randomised port names
961     $UNIQ = "";
962     }
963 root 1.69
# register the self node object under its new ID
964     $NODE{$NODE} = $NODE{""};
965     $NODE{$NODE}{id} = $NODE;
966    
967     my $seeds = $CONFIG->{seeds};
968     my $binds = $CONFIG->{binds};
969    
970     $binds ||= ["*"];
971    
972 root 1.98 AE::log 8 => "node $NODE starting up.";
973 root 1.69
974 root 1.85 $BINDS = [];
975     %BINDS = ();
976 root 1.69
# resolve all binds (->recv blocks until resolved) and create one listener per address
977     for (map _resolve $_, @$binds) {
978     for my $bind ($_->recv) {
979     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
980     or Carp::croak "$bind: unparsable local bind address";
981    
982     my $listener = AnyEvent::MP::Transport::mp_server
983     $host,
984     $port,
985     prepare => sub {
# replace $bind by the host/port passed to the prepare callback
# NOTE(review): presumably the address actually bound (e.g. for port 0) - confirm
986     my (undef, $host, $port) = @_;
987     $bind = AnyEvent::Socket::format_hostport $host, $port;
988     0
989     },
990     ;
991 root 1.85 $BINDS{$bind} = $listener;
992     push @$BINDS, $bind;
993 root 1.69 }
994     }
995    
# publish our listeners in the local database
996 root 1.85 db_set "'l" => $NODE => $BINDS;
997 root 1.73
998 root 1.98 AE::log 8 => "node listens on [@$BINDS].";
999 root 1.69
1000     # connect to all seednodes
1001     set_seeds map $_->recv, map _resolve $_, @$seeds;
1002    
1003 root 1.73 master_search;
1004    
# from here on, _resolve is gone and further configure calls are no-ops
1005 root 1.103 # save gobs of memory
1006     undef &_resolve;
1007     *configure = sub (@){ };
1008    
# services: array ref = function name plus arguments, "Name::" = module to load, anything else = function to call
1009 root 1.69 for (@{ $CONFIG->{services} }) {
1010     if (ref) {
1011     my ($func, @args) = @$_;
1012     (load_func $func)->(@args);
1013     } elsif (s/::$//) {
1014     eval "require $_";
1015     die $@ if $@;
1016     } else {
1017     (load_func $_)->();
1018     }
1019     }
1020 root 1.102
# run the "eval" configuration value as perl code; errors are fatal
1021     eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1022     die "$@" if $@;
1023 root 1.69 }
1024    
1025 root 1.1 =back
1026    
1027 root 1.101 =head1 LOGGING
1028    
1029     AnyEvent::MP::Kernel logs high-level information about the current node,
1030     when nodes go up and down, and most runtime errors. It also logs some
1031     debugging and trace messages about network maintenance, such as seed
1032     connections and global node management.
1033    
1034 root 1.1 =head1 SEE ALSO
1035    
1036     L<AnyEvent::MP>.
1037    
1038     =head1 AUTHOR
1039    
1040     Marc Lehmann <schmorp@schmorp.de>
1041     http://home.schmorp.de/
1042    
1043     =cut
1044    
1045     1
1046