ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.100
Committed: Thu Mar 22 01:24:26 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.99: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.97 db_set db_del
49 root 1.84 db_mon db_family db_keys db_values
50 root 1.1 );
51    
# Resolves a fully qualified function name ("Some::Pkg::func") into a
# code reference, loading the defining package on demand.
#
# If the function is not yet defined, successively shorter package
# prefixes of the name are require'd until the function shows up.
#
# Always returns a code reference. On failure it is a stub that dies when
# called: with "unable to resolve function ..." when no package provides
# the function, or with the original load error when a package exists but
# fails to load.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component - when nothing is left to strip
         # there is no package left that could provide the function
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a module file that does not exist just means we retry with
            # the parent package - every other error is reported.
            # (the dot before "pm" is escaped so only real ".pm" matches)
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
72    
73 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
74    
# Returns a string of $_[0] random octets (each character is chr of a
# rand value below 256).
sub nonce($) {
   my ($octets) = @_;

   my $nonce = "";
   $nonce .= chr rand 256
      for 1 .. $octets;

   $nonce
}
78    
# Returns a string of $_[0] random characters, each picked from the 62
# entries of @alnum (digits, uppercase and lowercase letters).
sub nonce62($) {
   my ($chars) = @_;

   my $nonce = "";
   $nonce .= $alnum[rand 62]
      for 1 .. $chars;

   $nonce
}
82    
# Generates a nine character cookie: five @alnum characters derived from
# the process ID and the current event loop time, followed by four random
# @alnum characters.
sub gen_uniq {
   my $now = AE::now;

   # each value is reduced modulo 62 so it indexes into @alnum
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now ) % 62,
      (int $now * 100) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 4;
}
94    
95 root 1.20 our $CONFIG; # this node's configuration
96 root 1.83 our $SECURE = sub { 1 };
97 root 1.21
98 root 1.64 our $RUNIQ; # remote uniq value
99     our $UNIQ; # per-process/node unique cookie
100     our $NODE;
101     our $ID = "a";
102 root 1.1
103     our %NODE; # node id to transport mapping, or "undef", for local node
104     our (%PORT, %PORT_DATA); # local ports
105    
106 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
107 root 1.1 our %LMON; # monitored _local_ ports
108    
109 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
110 root 1.85 our %BINDS;
111     our $BINDS; # our listeners, as arrayref
112 root 1.1
113 root 1.76 our $SRCNODE; # holds the node ID of the sending node during _inject
114 root 1.1
# (Re-)initialises $UNIQ, $RUNIQ and $NODE with fresh random values; the
# node ID becomes "anon/$RUNIQ".
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
   # at the end to avoid clashes
   my $remote = nonce62 10;
   $remote =~ s/(.)$/\U$1/;
   $RUNIQ = $remote;

   $NODE = "anon/$RUNIQ";
}
125    
126 root 1.69 _init_names;
127 root 1.64
# Returns the current value of $NODE.
sub NODE() {
   return $NODE;
}
131    
# Returns the part of the given port ID that precedes the first "#",
# or the whole string when it contains no "#".
sub node_of($) {
   my ($port) = @_;

   my ($nodeid) = split /#/, $port, 2;

   $nodeid
}
137    
BEGIN {
   # TRACE is installed at compile time as an empty-prototype sub that
   # returns 1 or 0, depending on the PERL_ANYEVENT_MP_TRACE environment
   # variable.
   my $enabled = $ENV{PERL_ANYEVENT_MP_TRACE} ? 1 : 0;

   *TRACE = $enabled
      ? sub () { 1 }
      : sub () { 0 };
}
143 root 1.1
our $DELAY_TIMER; # timer that drains the queue, only set while work is pending
our @DELAY_QUEUE; # callbacks waiting to be run

# runs queued callbacks in order until the queue is empty, then drops the timer
our $delay_run = sub {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
};

# Queues the given callback and makes sure a zero-delay timer that runs
# $delay_run exists.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER = AE::timer 0, 0, $delay_run
      unless $DELAY_TIMER;
}
155    
156 root 1.96 =item $AnyEvent::MP::Kernel::SRCNODE
157    
158     During execution of a message callback, this variable contains the node ID
159     of the origin node.
160    
161     The main use of this variable is for debugging output - there are probably
162     very few other cases where you need to know the source node ID.
163    
164     =cut
165    
# Delivers a message to a local port: the first argument is the port name,
# the remaining arguments are passed to the port callback in @_. Messages
# for ports missing from %PORT are silently dropped.
sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $cb = $PORT{+shift}
      or return;

   &$cb
}
170    
171 root 1.20 # this function adds a node-ref, so you can send stuff to it
172     # it is basically the central routing component.
# Returns the node object for the given node ID, creating (and registering
# in %NODE) an AnyEvent::MP::Node::Remote object when none exists yet.
# Croaks on undef or the empty string.
sub add_node {
   my ($node) = @_;

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
181    
# Sends a message to a port. The first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message. The node
# object is looked up in %NODE (and created via add_node if missing) and
# its {send} callback is invoked with [portid, message...].
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send}->(["$portid", @_]);
}
190    
191 root 1.17 =item $is_local = port_is_local $port
192    
193     Returns true iff the port is a local port.
194    
195     =cut
196    
# True when the node part of the given port ID maps to the same %NODE
# entry as the local node (which is registered under the key "").
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   my $self = $NODE{""};

   $NODE{$nodeid} == $self
}
202    
203 root 1.18 =item snd_to_func $node, $func, @args
204 root 1.11
205 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
206 root 1.11 this function with the given arguments on that node.
207    
208 root 1.20 This function can be used to implement C<spawn>-like interfaces.
209 root 1.11
210     =cut
211    
# Sends [ "", $func, @args ] to the given node, i.e. a request to the
# node port (port "") of that node.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # artificial delay (for spawn). this is very ugly - maybe all messages
   # should simply be delayed to avoid deep recursion issues, but that
   # would be slow.
   # NOTE(review): the historical comment speaks of delaying "on $NODE",
   # while the flag is set when the target differs from $NODE - confirm
   # against AnyEvent::MP::Node::Self.
   $AnyEvent::MP::Node::Self::DELAY = 1
      unless $nodeid eq $NODE;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send}->(["", @call]);
}
223    
224 root 1.18 =item snd_on $node, @msg
225    
226     Executes C<snd> with the given C<@msg> (which must include the destination
227     port) on the given node.
228    
229     =cut
230    
# Asks the given node to run its "snd" node request with @msg.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, snd => @msg;
}
235    
236 root 1.29 =item eval_on $node, $string[, @reply]
237 root 1.18
238 root 1.29 Evaluates the given string as Perl expression on the given node. When
239     @reply is specified, then it is used to construct a reply message with
240     C<"$@"> and any results from the eval appended.
241 root 1.18
242     =cut
243    
# Asks the given node to run its "eval" node request with the string and
# optional reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd $node, eval => @request;
}
248    
# Kills the port given by the port ID in the first argument, passing the
# remaining arguments as the reason to the node object's kill method.
# Croaks when the port ID has no port part (i.e. designates a node port).
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @_);
}
258    
259     #############################################################################
260 root 1.6 # node monitoring and info
261 root 1.3
262 root 1.21 =item node_is_up $nodeid
263 root 1.13
264     Returns true if the given node is "up", that is, the kernel thinks it has
265     a working connection to it.
266    
267 root 1.95 If the node is up, returns C<1>. If the node is currently connecting or
268     otherwise known but not connected, returns C<0>. If nothing is known about
269     the node, returns C<undef>.
270 root 1.13
271     =cut
272    
# 1 when the node is known and has a transport, 0 when it is known but
# has none, and nothing (undef/empty list) when it is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
277    
278 root 1.3 =item up_nodes
279    
280 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
281     the node itself).
282 root 1.3
283     =cut
284    
# Collects the {id} of every node object in %NODE that has a transport.
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
288    
289 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
290 root 1.3
291 root 1.27 Registers a callback that is called each time a node goes up (a connection
292     is established) or down (the connection is lost).
293 root 1.3
294     Node up messages can only be followed by node down messages for the same
295     node, and vice versa.
296    
297 root 1.71 Note that monitoring a node is usually better done by monitoring its node
298 root 1.27 port. This function is mainly of interest to modules that are concerned
299     about the network topology and low-level connection handling.
300    
301     Callbacks I<must not> block and I<should not> send any messages.
302    
303     The function returns an optional guard which can be used to unregister
304 root 1.3 the monitoring callback again.
305    
306 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
307     or go up (and down).
308    
309     newnode $_, 1 for up_nodes;
310     mon_nodes \&newnode;
311    
312 root 1.3 =cut
313    
our %MON_NODES; # registered node monitors, keyed by the callback's numeric address

# Registers $cb as node up/down monitor. In non-void context, returns a
# guard object that removes the registration again when destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # no guard is created when the caller ignores the result
   return unless defined wantarray;

   Guard::guard { delete $MON_NODES{$cb+0} }
}
324    
# Calls every registered node monitor with ($nodeid, $up, @reason) and
# logs the state change. An exception inside a monitor is logged and does
# not prevent the remaining monitors from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         AE::log die => $@;
      }
   }

   my $state = $up ? "up" : "down";

   AE::log 7 => "$node->{id} is $state (@reason).";
}
335    
336     #############################################################################
337 root 1.1 # self node code
338    
# Removes the local port named by the first argument (the remaining
# arguments are the kill reason) and calls all of its monitors with the
# reason. A port dying with a reason but without monitors is logged.
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   AE::log die => "unmonitored local port $port died with reason: @_"
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
352    
# Registers a monitor callback (third argument) for the local port given
# as second argument; the first argument is not used. When the port does
# not exist, the callback is invoked at once with a no_such_port reason.
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   $LMON{$port}{$cb+0} = $cb;
}
359    
# Removes the monitor callback (third argument) from the local port given
# as second argument, if that port has any monitors; the first argument
# is not used.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   exists $LMON{$port}
      and delete $LMON{$port}{$cb+0}
}
364    
# Dies unless the $SECURE callback (called with the current @_) returns
# a true value.
sub _secure_check {
   die "remote execution attempt by insecure node\n"
      unless &$SECURE;
}
369    
# Dispatch table for requests sent to the node port (port ""): maps the
# request tag to its handler, which receives the rest of the message in @_.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
      _monitor undef, $portid, $node->{rmon}{$portid} = sub {
         # the node object is only weakly referenced and might be gone by
         # now - check before touching it, otherwise the delete below would
         # autovivify a fresh hash in $node
         $node
            or return;

         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node->{transport};
      };
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities
   eval => sub {
      &_secure_check;
      # NOTE: this evaluates a string received in a message as perl code
      # (in package main) - _secure_check above is the only protection.
      my @res = do { package main; eval shift };
      # the remaining arguments, if any, form the reply message prefix
      snd @_, "$@", @res if @_;
   },
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
423    
# the local node is reachable both via its node ID and via the empty string
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: the first message element selects the handler from
# %NODE_REQ. unknown tags are resolved with load_func (after passing
# _secure_check) and the result is cached in %NODE_REQ.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag };
      &$handler
   };

   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};
430    
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
437    
438 root 1.69 #############################################################################
439 root 1.71 # seed management, try to keep connections to all seeds at all times
440 root 1.69
441 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
442     our %NODE_SEED; # map node ID to seed ID
443 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
444     our $SEED_WATCHER;
445 root 1.71 our $SEED_RETRY;
446 root 1.69
# Starts a connection attempt to the given seed address ("host:port"),
# unless %SEED_CONNECT already has a true entry for it. Croaks when the
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name
         my ($transport) = @_;
         my $remote = $transport->{remote_node};

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         if ($remote ne $AnyEvent::MP::Kernel::NODE) {
            $SEED_NODE{$seed} = $remote;
            $NODE_SEED{$remote} = $seed;
            # also start global service, if not running
            # we need to check here in addition to the mon_nodes below
            # because we might only learn late that a node is a seed
            # and then we might already be connected
            snd $remote, "g_slave"
               unless $transport->{remote_greeting}{global};
         } else {
            # we connected to ourselves: nuke this seed, but make sure we
            # act like a seed
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
         }
      },
      sub {
         # forget the connector, so seed_all can retry this seed
         delete $SEED_CONNECT{$seed};
      }
   ;
}
484    
# Starts connection attempts to every seed that has neither a pending
# connect nor an "up" node, and re-arms the retry timer. The retry
# interval roughly doubles each round (plus a random fraction) and is
# capped at the configured monitor_timeout.
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up $nodeid;

      push @seeds, $seed;
   }

   if (!@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      for my $seed (@seeds) {
         seed_connect $seed;
      }

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
507    
# Resets the retry interval to 1 and schedules seed_all in one second,
# unless a seed timer is already active.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER = AE::timer 1, 0, \&seed_all
      unless $SEED_WATCHER;
}
512    
513     # sets new seed list, starts connecting
# Replaces the seed list with the given seed addresses: all previous seed
# state (including pending connects) is dropped, every new seed is entered
# into %SEED_NODE with an undefined node ID, then seed_all is run.
sub set_seeds(@) {
   %SEED_NODE = ();
   %NODE_SEED = ();
   %SEED_CONNECT = ();

   $SEED_NODE{$_} = undef
      for @_;

   seed_all;
}
523    
# watch up/down events of nodes that are known to be seed nodes
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $NODE_SEED{$nodeid};

   if (!$is_up) {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   } else {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $nodeid, "g_slave"
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   }
};
537    
538     #############################################################################
539     # talk with/to global nodes
540    
541 root 1.72 # protocol messages:
542     #
543 root 1.73 # sent by all slave nodes (slave to master)
544     # g_slave database - make other global node master of the sender
545 root 1.72 #
546 root 1.73 # sent by any node to global nodes
547     # g_set database - set whole database
548 root 1.89 # g_upd family set del - update single family
549 root 1.73 # g_del family key - delete key from database
550     # g_get family key reply... - send reply with data
551     #
552     # sent by global nodes
553     # g_global - node became global, similar to global=1 greeting
554     #
555     # database families
556     # "'l" -> node -> listeners
557     # "'g" -> node -> undef
558     # ...
559 root 1.72 #
560    
561 root 1.73 # used on all nodes:
562 root 1.88 our $MASTER; # the global node we bind ourselves to
563 root 1.78 our $MASTER_MON;
564     our %LOCAL_DB; # this node database
565    
566     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
567 root 1.71
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
574    
575 root 1.73 #############################################################################
576     # master selection
577    
578     # master requests
579     our %GLOBAL_REQ; # $id => \@req
580 root 1.71
# Registers the request $req (an array reference holding a message for the
# master) under $id, unless a request with that id is already registered,
# and sends it right away when a master is known. Registered requests are
# (re-)sent by master_set.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd $MASTER, @$req;
}
591 root 1.71
# Forgets the request registered under the given id.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
595    
596 root 1.88 #################################
597     # master rpc
598    
our %GLOBAL_RES;          # rpc id => reply callback
our $GLOBAL_RES_ID = "a"; # last rpc id used, advanced by string increment

# Sends a request to the master and arranges for the callback, which must
# be the last argument, to be called with the reply: the request message is
# the remaining arguments with a fresh rpc id appended.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;
   global_req_add $id, [@_, $id];
}
607    
# reply to a global_call: the first element is the rpc id, the rest is
# passed to the callback registered for it (if it still exists)
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   # the request has been answered, so it must not be re-sent
   global_req_del $id;

   my $cb = delete $GLOBAL_RES{$id}
      or return;

   &$cb
};
615    
616     #################################
617    
# Registers (and, if a master is known, sends) a g_find request for the
# given node ID.
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", [g_find => $nodeid];
}
621 root 1.71
622 root 1.73 # reply for g_find started in Node.pm
# answer to g_find: drop the pending request and, if the node is still
# known, hand the second message element to its connect_to method
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del "g_find $nodeid";

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
630    
# Makes the given node our master: sends it our local database via
# g_slave, followed by all requests currently registered in %GLOBAL_REQ.
sub master_set {
   ($MASTER) = @_;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
640 root 1.71
# Selects a master: the first seed node that is up is used directly.
# Otherwise a node monitor waits for a seed node to come up, makes it the
# master and then replaces itself with a monitor that restarts the search
# once the master goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up $nodeid;

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up; # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set $nodeid;

      $MASTER_MON = mon_nodes sub {
         my ($nodeid, $is_up) = @_;

         # only the master going down is of interest here
         return if $is_up || $nodeid ne $MASTER;

         undef $MASTER;
         master_search ();
      };
   };
}
664    
665 root 1.73 # other node wants to make us the master
# placeholder handler: loads AnyEvent::MP::Global and then re-dispatches
# the request (with unchanged @_) to whatever is stored in
# $NODE_REQ{g_slave} at that point.
# NOTE(review): presumably AnyEvent::MP::Global replaces this entry when
# loaded - otherwise this would recurse endlessly; confirm.
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   require AnyEvent::MP::Global;

   &{ $NODE_REQ{g_slave} }
};
673    
674 root 1.73 #############################################################################
675 root 1.79 # local database operations
676 root 1.71
677 root 1.79 # local database management
678 root 1.88
# db_set $family, $subkey[, $value]
#
# Stores $value under $subkey in the given family of the local database
# and, when a master is known, sends it a g_upd message with the change.
#
# In non-void context, returns a guard object that deletes the key again
# (via db_del) when destroyed.
sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$subkey} = $_[2];
      snd $MASTER, g_upd => $family => { $subkey => $_[2] }
         if defined $MASTER;
#   }

   # db_del is only declared further down in this file, so it must be
   # called with parentheses here: without them perl parses
   # "db_del $family ..." as the method call "$family->db_del".
   defined wantarray
      and Guard::guard { db_del ($family, $subkey) }
}
698    
# db_del $family, @keys
#
# Deletes the given keys from the family in the local database and, when
# a master is known, sends it a g_upd message listing the deleted keys.
sub db_del($@) {
   my ($family, @keys) = @_;

   delete @{ $LOCAL_DB{$family} }{@keys};

   defined $MASTER
      and snd $MASTER, g_upd => $family => undef, \@keys;
}
706    
707 root 1.88 # database query
708    
# Issues a g_db_family request for $family via global_call; $cb is called
# with the reply.
sub db_family {
   my ($family, $cb) = @_;

   global_call (g_db_family => $family, $cb);
}
713    
# Issues a g_db_keys request for $family via global_call; $cb is called
# with the reply.
sub db_keys {
   my ($family, $cb) = @_;

   global_call (g_db_keys => $family, $cb);
}
718    
# Issues a g_db_values request for $family via global_call; $cb is called
# with the reply.
sub db_values {
   my ($family, $cb) = @_;

   global_call (g_db_values => $family, $cb);
}
723    
724 root 1.88 # database monitoring
725 root 1.80
our %LOCAL_MON; # f, reply
our %MON_DB; # f, k, value

# db_mon $family, $cb
#
# Registers $cb as monitor for the given database family. The first
# monitor of a family registers a g_mon1 request with the master. When the
# family is monitored already, $cb instead gets a postponed call with the
# current data and the list of all its keys.
#
# In non-void context, returns a guard object that unregisters $cb again
# and, when it was the last monitor of the family, stops monitoring it.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   if (!$db) {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   } else {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };
      AE::postpone {
         # guard might have gone away already
         return unless exists $LOCAL_MON{$family}{$cb+0};

         # set actual callback
         $LOCAL_MON{$family}{$cb+0} = $cb;
         $cb->($db, [keys %$db]);
      };
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$cb+0};

         # other monitors for this family remain, keep monitoring
         return if %$mon;

         global_req_del "mon1 $family";

         # no global_req, because we don't care if we are not connected
         snd $MASTER, g_mon0 => $family
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      }
}
768    
769 root 1.82 # full update
# full update of a monitored family: the message carries the family name
# and its complete new contents. only accepted from the current master.
# monitors are called with the database and the lists of added, changed
# and deleted keys.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $ndb) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed, @deleted);

   # add or replace keys
   while (my ($k, $v) = each %$ndb) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $v;
   }

   # delete keys that are no longer present
   for my $k (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$k};
      push @deleted, $k;
   }

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, \@deleted);
   }
};
794    
795 root 1.82 # incremental update
# incremental update of a monitored family: the message carries the family
# name, a hash of keys to set and an array of keys to delete. only accepted
# from the current master. monitors are called with the database, the lists
# of added and changed keys and the delete list as received.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   while (my ($k, $v) = each %$set) {
      if (exists $db->{$k}) {
         push @changed, $k;
      } else {
         push @added, $k;
      }

      $db->{$k} = $v;
   }

   delete @$db{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@added, \@changed, $del);
   }
};
816 root 1.80
817 root 1.69 #############################################################################
818     # configure
819    
# Returns the second element of POSIX::uname's result (the nodename field).
sub nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
824    
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings. Returns a condvar that receives the resulting
# strings, ordered by the position of their descriptor in the list and
# with duplicates removed.
#
# A descriptor consisting only of digits (or nothing) is taken as a port
# on the local nodename; a host of "*" stands for the addresses of all
# local interfaces, a port of "*" is replaced by "0".
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # the group callback runs once all lookups have finished
   $cv->begin (sub {
      my %seen;
      my @refs =
         grep !$seen{$_}++,
         map $_->[1],
         sort { $a->[0] <=> $b->[0] } @res;

      $_[0]->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor share the integer part of the priority
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length $t ? nodename . ":$t" : nodename;
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      } else {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for ($if->address (Net::Interface::AF_INET ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }
                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      }
   }

   $cv->end;

   $cv
}
905    
# configure [$profile, ] key => value...
#
# (Re-)configures this node. The arguments are key-value pairs; with an
# odd number of arguments the first one is taken as the profile name. The
# profile defaults to the nodename.
#
# In order, this resets the node names, loads the configuration via
# AnyEvent::MP::Config::find_profile, sets up the security policy,
# determines the node ID, starts a listener for every resolved bind
# address, publishes the listeners in the "'l" database family, starts
# connecting to the seeds, searches for a master and finally loads the
# configured services.
#
# Croaks on an illegal node ID or an unparsable bind address, and dies
# when a service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names;

   my $profile = delete $kv{profile};

   $profile = nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   if (exists $CONFIG->{secure}) {
      # _secure_check dies when $SECURE returns false, so a true "secure"
      # setting disables remote execution. plain closures suffice here,
      # there is no need to compile them from strings.
      $SECURE = $CONFIG->{secure} ? sub { 0 } : sub { 1 };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   # %n expands to the nodename, %u and a trailing slash to $RUNIQ
   $NODE =~ s/%n/nodename/ge;

   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # register the self node under its (new) node ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   AE::log 8 => "node $NODE starting up.";

   $BINDS = [];
   %BINDS = ();

   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # record the address passed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set "'l" => $NODE => $BINDS;

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search;

   # a service is either [function, args...], a module name ending in
   # "::" that is merely loaded, or the name of a function to call
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }
}
990    
991 root 1.1 =back
992    
993     =head1 SEE ALSO
994    
995     L<AnyEvent::MP>.
996    
997     =head1 AUTHOR
998    
999     Marc Lehmann <schmorp@schmorp.de>
1000     http://home.schmorp.de/
1001    
1002     =cut
1003    
1004     1
1005