ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.82
Committed: Sat Mar 3 19:55:56 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.81: +8 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.1 );
50    
51 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
52 root 1.1
53 root 1.27 This value is called with an error or warning message, when e.g. a
54     connection could not be created, authorisation failed and so on.
55    
56     It I<must not> block or send messages -queue it and use an idle watcher if
57     you need to do any of these things.
58 root 1.1
59 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
60 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
61     node connectivity and services, C<8> for debugging messages and C<9> for
62     tracing messages.
63    
64 root 1.1 The default simply logs the message to STDERR.
65    
66 root 1.44 =item @AnyEvent::MP::Kernel::WARN
67    
68     All code references in this array are called for every log message, from
69     the default C<$WARN> handler. This is an easy way to tie into the log
70     messages without disturbing others.
71    
72 root 1.1 =cut
73    
74 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
75 root 1.44 our @WARN;
76     our $WARN = sub {
77     &$_ for @WARN;
78 root 1.29
79     return if $WARNLEVEL < $_[0];
80    
81 root 1.16 my ($level, $msg) = @_;
82    
83 root 1.1 $msg =~ s/\n$//;
84 root 1.16
85     printf STDERR "%s <%d> %s\n",
86     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
87     $level,
88     $msg;
89 root 1.1 };
90    
91 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
92    
93     The maximum level at which warning messages will be printed to STDERR by
94     the default warn handler.
95    
96     =cut
97    
98 root 1.6 sub load_func($) {
99     my $func = $_[0];
100    
101     unless (defined &$func) {
102     my $pkg = $func;
103     do {
104     $pkg =~ s/::[^:]+$//
105 root 1.63 or return sub { die "unable to resolve function '$func'" };
106 root 1.60
107     local $@;
108 root 1.61 unless (eval "require $pkg; 1") {
109     my $error = $@;
110     $error =~ /^Can't locate .*.pm in \@INC \(/
111     or return sub { die $error };
112     }
113 root 1.6 } until defined &$func;
114     }
115    
116     \&$func
117     }
118    
119 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
120    
121 root 1.1 sub nonce($) {
122 root 1.78 join "", map chr rand 256, 1 .. $_[0]
123 root 1.1 }
124    
125 root 1.78 sub nonce62($) {
126     join "", map $alnum[rand 62], 1 .. $_[0]
127 root 1.1 }
128    
129     sub gen_uniq {
130 root 1.78 my $now = AE::now;
131     (join "",
132     map $alnum[$_],
133     $$ / 62 % 62,
134     $$ % 62,
135     (int $now ) % 62,
136     (int $now * 100) % 62,
137     (int $now * 10000) % 62,
138     ) . nonce62 4;
139 root 1.1 }
140    
141 root 1.20 our $CONFIG; # this node's configuration
142 root 1.21
143 root 1.64 our $RUNIQ; # remote uniq value
144     our $UNIQ; # per-process/node unique cookie
145     our $NODE;
146     our $ID = "a";
147 root 1.1
148     our %NODE; # node id to transport mapping, or "undef", for local node
149     our (%PORT, %PORT_DATA); # local ports
150    
151 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
152 root 1.1 our %LMON; # monitored _local_ ports
153    
154 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
155 root 1.1 our %LISTENER;
156 root 1.21 our $LISTENER; # our listeners, as arrayref
157 root 1.1
158 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
159 root 1.1
160 root 1.69 sub _init_names {
161 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
162     $UNIQ = gen_uniq;
163    
164     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
165     $RUNIQ = nonce62 10;
166     $RUNIQ =~ s/(.)$/\U$1/;
167    
168     $NODE = "anon/$RUNIQ";
169 root 1.64 }
170    
171 root 1.69 _init_names;
172 root 1.64
173 root 1.1 sub NODE() {
174     $NODE
175     }
176    
177     sub node_of($) {
178 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
179 root 1.1
180 root 1.21 $node
181 root 1.1 }
182    
183 root 1.17 BEGIN {
184     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
185     ? sub () { 1 }
186     : sub () { 0 };
187     }
188 root 1.1
189 root 1.42 our $DELAY_TIMER;
190     our @DELAY_QUEUE;
191    
192     sub _delay_run {
193 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
194 root 1.42 }
195    
196     sub delay($) {
197     push @DELAY_QUEUE, shift;
198     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
199     }
200    
201 root 1.1 sub _inject {
202 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
203 root 1.1 &{ $PORT{+shift} or return };
204     }
205    
206 root 1.20 # this function adds a node-ref, so you can send stuff to it
207     # it is basically the central routing component.
208 root 1.1 sub add_node {
209 root 1.21 my ($node) = @_;
210 root 1.1
211 root 1.71 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
212 root 1.13 }
213    
214 root 1.1 sub snd(@) {
215 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
216 root 1.1
217 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
218 root 1.1
219 root 1.49 defined $nodeid #d#UGLY
220     or Carp::croak "'undef' is not a valid node ID/port ID";
221    
222 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
223 root 1.2 ->{send} (["$portid", @_]);
224 root 1.1 }
225    
226 root 1.17 =item $is_local = port_is_local $port
227    
228     Returns true iff the port is a local port.
229    
230     =cut
231    
232     sub port_is_local($) {
233 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
234 root 1.17
235 root 1.21 $NODE{$nodeid} == $NODE{""}
236 root 1.17 }
237    
238 root 1.18 =item snd_to_func $node, $func, @args
239 root 1.11
240 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
241 root 1.11 this function with the given arguments on that node.
242    
243 root 1.20 This function can be used to implement C<spawn>-like interfaces.
244 root 1.11
245     =cut
246    
247 root 1.18 sub snd_to_func($$;@) {
248 root 1.21 my $nodeid = shift;
249 root 1.11
250 root 1.41 # on $NODE, we artificially delay... (for spawn)
251     # this is very ugly - maybe we should simply delay ALL messages,
252     # to avoid deep recursion issues. but that's so... slow...
253 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
254     if $nodeid ne $NODE;
255    
256 root 1.49 defined $nodeid #d#UGLY
257     or Carp::croak "'undef' is not a valid node ID/port ID";
258    
259 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
260 root 1.11 }
261    
262 root 1.18 =item snd_on $node, @msg
263    
264     Executes C<snd> with the given C<@msg> (which must include the destination
265     port) on the given node.
266    
267     =cut
268    
269     sub snd_on($@) {
270     my $node = shift;
271     snd $node, snd => @_;
272     }
273    
274 root 1.29 =item eval_on $node, $string[, @reply]
275 root 1.18
276 root 1.29 Evaluates the given string as Perl expression on the given node. When
277     @reply is specified, then it is used to construct a reply message with
278     C<"$@"> and any results from the eval appended.
279 root 1.18
280     =cut
281    
282 root 1.29 sub eval_on($$;@) {
283 root 1.18 my $node = shift;
284     snd $node, eval => @_;
285     }
286    
287 root 1.1 sub kil(@) {
288 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
289 root 1.1
290     length $portid
291 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
292 root 1.1
293 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
294 root 1.1 ->kill ("$portid", @_);
295     }
296    
297     #############################################################################
298 root 1.6 # node monitoring and info
299 root 1.3
300 root 1.21 =item node_is_known $nodeid
301 root 1.13
302 root 1.71 #TODO#
303     Returns true iff the given node is currently known to this node.
304 root 1.13
305     =cut
306    
307     sub node_is_known($) {
308     exists $NODE{$_[0]}
309     }
310    
311 root 1.21 =item node_is_up $nodeid
312 root 1.13
313     Returns true if the given node is "up", that is, the kernel thinks it has
314     a working connection to it.
315    
316 root 1.69 If the node is known (to this local node) but not currently connected,
317     returns C<0>. If the node is not known, returns C<undef>.
318 root 1.13
319     =cut
320    
321     sub node_is_up($) {
322     ($NODE{$_[0]} or return)->{transport}
323     ? 1 : 0
324     }
325    
326 root 1.3 =item up_nodes
327    
328 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
329     the node itself).
330 root 1.3
331     =cut
332    
333 root 1.49 sub up_nodes() {
334 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
335 root 1.3 }
336    
337 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
338 root 1.3
339 root 1.27 Registers a callback that is called each time a node goes up (a connection
340     is established) or down (the connection is lost).
341 root 1.3
342     Node up messages can only be followed by node down messages for the same
343     node, and vice versa.
344    
345 root 1.71 Note that monitoring a node is usually better done by monitoring its node
346 root 1.27 port. This function is mainly of interest to modules that are concerned
347     about the network topology and low-level connection handling.
348    
349     Callbacks I<must not> block and I<should not> send any messages.
350    
351     The function returns an optional guard which can be used to unregister
352 root 1.3 the monitoring callback again.
353    
354 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
355     or go up (and down).
356    
357     newnode $_, 1 for up_nodes;
358     mon_nodes \&newnode;
359    
360 root 1.3 =cut
361    
362     our %MON_NODES;
363    
364     sub mon_nodes($) {
365     my ($cb) = @_;
366    
367     $MON_NODES{$cb+0} = $cb;
368    
369 root 1.79 defined wantarray && Guard::guard { delete $MON_NODES{$cb+0} }
370 root 1.3 }
371    
372     sub _inject_nodeevent($$;@) {
373 root 1.16 my ($node, $up, @reason) = @_;
374 root 1.3
375     for my $cb (values %MON_NODES) {
376 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
377 root 1.16 or $WARN->(1, $@);
378 root 1.3 }
379 root 1.16
380 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
381 root 1.3 }
382    
383     #############################################################################
384 root 1.1 # self node code
385    
386 root 1.67 sub _kill {
387     my $port = shift;
388    
389     delete $PORT{$port}
390     or return; # killing nonexistent ports is O.K.
391     delete $PORT_DATA{$port};
392    
393     my $mon = delete $LMON{$port}
394     or !@_
395     or $WARN->(2, "unmonitored local port $port died with reason: @_");
396    
397     $_->(@_) for values %$mon;
398     }
399    
400     sub _monitor {
401     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
402     unless exists $PORT{$_[1]};
403    
404     $LMON{$_[1]}{$_[2]+0} = $_[2];
405     }
406    
407     sub _unmonitor {
408 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
409     if exists $LMON{$_[1]};
410 root 1.67 }
411    
412 root 1.79 our %NODE_REQ = (
413 root 1.1 # internal services
414    
415     # monitoring
416 root 1.65 mon0 => sub { # stop monitoring a port for another node
417 root 1.1 my $portid = shift;
418 root 1.67 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
419 root 1.1 },
420 root 1.65 mon1 => sub { # start monitoring a port for another node
421 root 1.1 my $portid = shift;
422 root 1.67 Scalar::Util::weaken (my $node = $SRCNODE);
423     _monitor undef, $portid, $node->{rmon}{$portid} = sub {
424 root 1.58 delete $node->{rmon}{$portid};
425 root 1.65 $node->send (["", kil0 => $portid, @_])
426 root 1.59 if $node && $node->{transport};
427 root 1.67 };
428 root 1.1 },
429 root 1.65 # another node has killed a monitored port
430     kil0 => sub {
431 root 1.1 my $cbs = delete $SRCNODE->{lmon}{+shift}
432     or return;
433    
434     $_->(@_) for @$cbs;
435     },
436    
437 root 1.18 # "public" services - not actually public
438 root 1.1
439 root 1.65 # another node wants to kill a local port
440 root 1.66 kil => \&_kill,
441 root 1.65
442 root 1.1 # relay message to another node / generic echo
443 root 1.15 snd => \&snd,
444 root 1.27 snd_multiple => sub {
445 root 1.1 snd @$_ for @_
446     },
447    
448 root 1.30 # random utilities
449 root 1.1 eval => sub {
450 root 1.76 #d#SECURE
451 root 1.50 my @res = do { package main; eval shift };
452 root 1.1 snd @_, "$@", @res if @_;
453     },
454     time => sub {
455 root 1.76 snd @_, AE::now;
456 root 1.1 },
457     devnull => sub {
458     #
459     },
460 root 1.15 "" => sub {
461 root 1.27 # empty messages are keepalives or similar devnull-applications
462 root 1.15 },
463 root 1.1 );
464    
465 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
466 root 1.1 $PORT{""} = sub {
467     my $tag = shift;
468 root 1.76 #d#SECURE (load_func)
469 root 1.79 eval { &{ $NODE_REQ{$tag} ||= load_func $tag } };
470 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
471 root 1.1 };
472    
473 root 1.69 #############################################################################
474 root 1.71 # seed management, try to keep connections to all seeds at all times
475 root 1.69
476 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
477     our %NODE_SEED; # map node ID to seed ID
478 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
479     our $SEED_WATCHER;
480 root 1.71 our $SEED_RETRY;
481 root 1.69
482     sub seed_connect {
483     my ($seed) = @_;
484    
485     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
486     or Carp::croak "$seed: unparsable seed address";
487    
488     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
489    
490 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
491     $host, $port,
492     on_greeted => sub {
493     # called after receiving remote greeting, learn remote node name
494    
495     # we rely on untrusted data here (the remote node name) this is
496     # hopefully ok, as this can at most be used for DOSing, which is easy
497     # when you can do MITM anyway.
498    
499     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
500     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
501 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
502 root 1.71 delete $SEED_NODE{$seed};
503     delete $NODE_SEED{$seed};
504     } else {
505     $SEED_NODE{$seed} = $_[0]{remote_node};
506     $NODE_SEED{$_[0]{remote_node}} = $seed;
507     }
508     },
509     on_destroy => sub {
510     delete $SEED_CONNECT{$seed};
511     },
512 root 1.69 sub {
513     $SEED_CONNECT{$seed} = 1;
514 root 1.71 }
515 root 1.69 ;
516     }
517    
518     sub seed_all {
519     my @seeds = grep {
520     !exists $SEED_CONNECT{$_}
521     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
522     } keys %SEED_NODE;
523    
524     if (@seeds) {
525 root 1.70 # start connection attempt for every seed we are not connected to yet
526 root 1.69 seed_connect $_
527     for @seeds;
528 root 1.71
529     $SEED_RETRY = $SEED_RETRY * 2 + rand;
530     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
531     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
532    
533     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
534    
535 root 1.69 } else {
536 root 1.71 # all seeds connected or connecting, no need to restart timer
537 root 1.69 undef $SEED_WATCHER;
538     }
539     }
540    
541     sub seed_again {
542 root 1.71 $SEED_RETRY = 1;
543     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
544 root 1.69 }
545    
546     # sets new seed list, starts connecting
547     sub set_seeds(@) {
548     %SEED_NODE = ();
549 root 1.71 %NODE_SEED = ();
550     %SEED_CONNECT = ();
551    
552 root 1.69 @SEED_NODE{@_} = ();
553    
554 root 1.71 seed_again;#d#
555     seed_all;
556     }
557    
558     mon_nodes sub {
559     # if we lost the connection to a seed node, make sure we are seeding
560     seed_again
561     if !$_[1] && exists $NODE_SEED{$_[0]};
562     };
563    
564     #############################################################################
565     # talk with/to global nodes
566    
567 root 1.72 # protocol messages:
568     #
569 root 1.73 # sent by all slave nodes (slave to master)
570     # g_slave database - make other global node master of the sender
571 root 1.72 #
572 root 1.73 # sent by any node to global nodes
573     # g_set database - set whole database
574     # g_add family key val - add/replace key to database
575     # g_del family key - delete key from database
576     # g_get family key reply... - send reply with data
577     #
578     # send by global nodes
579     # g_global - node became global, similar to global=1 greeting
580     #
581     # database families
582     # "'l" -> node -> listeners
583     # "'g" -> node -> undef
584     # ...
585 root 1.72 #
586    
587 root 1.73 # used on all nodes:
588 root 1.78 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
589     our $MASTER_MON;
590     our %LOCAL_DB; # this node database
591    
592     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
593 root 1.71
594 root 1.73 #############################################################################
595     # master selection
596    
597     # master requests
598     our %GLOBAL_REQ; # $id => \@req
599 root 1.71
600 root 1.74 sub global_req_add {
601 root 1.80 my ($id, $req) = @_;
602 root 1.71
603 root 1.74 return if exists $GLOBAL_REQ{$id};
604    
605 root 1.80 $GLOBAL_REQ{$id} = $req;
606 root 1.71
607 root 1.80 snd $MASTER, @$req
608 root 1.73 if $MASTER;
609 root 1.74 }
610 root 1.71
611 root 1.74 sub global_req_del {
612     delete $GLOBAL_REQ{$_[0]};
613     }
614    
615     sub g_find {
616 root 1.80 global_req_add "g_find $_[0]", [g_find => $_[0]];
617 root 1.73 }
618 root 1.71
619 root 1.73 # reply for g_find started in Node.pm
620 root 1.79 $NODE_REQ{g_found} = sub {
621 root 1.74 global_req_del "g_find $_[0]";
622    
623 root 1.73 my $node = $NODE{$_[0]} or return;
624 root 1.71
625 root 1.79 $node->connect_to ($_[1]);
626 root 1.71 };
627    
628 root 1.73 sub master_set {
629     $MASTER = $_[0];
630 root 1.71
631 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
632 root 1.71
633 root 1.73 # (re-)send queued requests
634     snd $MASTER, @$_
635     for values %GLOBAL_REQ;
636     }
637 root 1.71
638 root 1.72 sub master_search {
639 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
640     for (keys %NODE_SEED) {
641 root 1.72 if (node_is_up $_) {
642     master_set $_;
643     return;
644 root 1.71 }
645 root 1.72 }
646 root 1.71
647 root 1.78 $MASTER_MON = mon_nodes sub {
648 root 1.72 return unless $_[1]; # we are only interested in node-ups
649     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
650 root 1.71
651 root 1.72 master_set $_[0];
652 root 1.71
653 root 1.78 $MASTER_MON = mon_nodes sub {
654 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
655     undef $MASTER;
656     master_search ();
657     }
658 root 1.71 };
659 root 1.72 };
660 root 1.71 }
661    
662 root 1.73 # other node wants to make us the master
663 root 1.79 $NODE_REQ{g_slave} = sub {
664 root 1.73 my ($db) = @_;
665    
666 root 1.80 # load global module and redo the request
667 root 1.73 require AnyEvent::MP::Global;
668 root 1.80 &{ $NODE_REQ{g_slave} }
669 root 1.71 };
670    
671 root 1.73 #############################################################################
672 root 1.79 # local database operations
673 root 1.71
674 root 1.79 # local database management
675     sub db_set($$$) {
676     $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
677     snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
678     if defined $MASTER;
679     }
680    
681     sub db_del($$) {
682     delete $LOCAL_DB{$_[0]}{$_[1]};
683     snd $MASTER, g_del => $_[0] => $_[1]
684     if defined $MASTER;
685     }
686    
687     sub db_reg($$;$) {
688     my ($family, $key) = @_;
689     &db_set;
690     Guard::guard { db_del $family => $key }
691     }
692 root 1.71
693 root 1.80 sub db_keys($$$) {
694     #d#
695     }
696    
697     #d# db_values
698     #d# db_family
699     #d# db_key
700    
701 root 1.81 our %LOCAL_MON; # f, reply
702     our %MON_DB; # f, k, value
703 root 1.80
704 root 1.81 sub g_chg {
705     my $f = shift;
706 root 1.80
707 root 1.81 $#_ ? $MON_DB{$f}{$_[0]} = $_[1]
708     : delete $MON_DB{$f}{$_[0]};
709 root 1.80
710 root 1.81 &{ $_->[0] }
711     for values %{ $LOCAL_MON{$f} };
712     }
713    
714     sub db_mon($@) {
715     my ($family, @reply) = @_;
716 root 1.80
717 root 1.81 my $reply = \@reply;
718     my $id = $reply + 0;
719    
720     if (%{ $LOCAL_MON{$family} }) {
721     # if we already monitor this thingy, generate
722     # create events for all of them
723     while (my ($key, $value) = each %{ $MON_DB{$family} }) {
724     $reply->[0]->($key, $value);
725     }
726     } else {
727     # new monitor, request chg1 from upstream
728     global_req_add "mon1 $family" => [g_mon1 => $family];
729     $MON_DB{$family} = {};
730     }
731    
732     $LOCAL_MON{$family}{$id} = \@reply;
733 root 1.80
734     Guard::guard {
735 root 1.81 my $mon = $LOCAL_MON{$family};
736     delete $mon->{$id};
737    
738     unless (%$mon) {
739     global_req_del "mon1 $family";
740 root 1.80
741     # no global_req, because we don't care if we are not connected
742 root 1.81 snd $MASTER, g_mon0 => $family
743 root 1.80 if $MASTER;
744    
745 root 1.81 delete $LOCAL_MON{$family};
746     delete $MON_DB{$family};
747 root 1.80 }
748     }
749     }
750    
751 root 1.82 # full update
752 root 1.80 $NODE_REQ{g_chg1} = sub {
753 root 1.81 my ($f, $db) = @_;
754    
755 root 1.82 # add or replace keys
756     while (my ($k, $v) = each %$db) {
757     g_chg $f, $k, $v;
758     }
759 root 1.81
760 root 1.82 # delete keys that are no longer present
761     for (keys %{ $MON_DB{$f} }) {
762 root 1.81 g_chg $f, $_
763     unless exists $db->{$_};
764     }
765 root 1.80 };
766    
767 root 1.82 # incremental update
768 root 1.81 $NODE_REQ{g_chg2} = \&g_chg;
769 root 1.80
770 root 1.69 #############################################################################
771     # configure
772    
773 root 1.81 sub nodename {
774 root 1.69 require POSIX;
775     (POSIX::uname ())[1]
776     }
777    
778     sub _resolve($) {
779     my ($nodeid) = @_;
780    
781     my $cv = AE::cv;
782     my @res;
783    
784     $cv->begin (sub {
785     my %seen;
786     my @refs;
787     for (sort { $a->[0] <=> $b->[0] } @res) {
788     push @refs, $_->[1] unless $seen{$_->[1]}++
789     }
790     shift->send (@refs);
791     });
792    
793     my $idx;
794     for my $t (split /,/, $nodeid) {
795     my $pri = ++$idx;
796    
797 root 1.81 $t = length $t ? nodename . ":$t" : nodename
798 root 1.69 if $t =~ /^\d*$/;
799    
800     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
801     or Carp::croak "$t: unparsable transport descriptor";
802    
803     $port = "0" if $port eq "*";
804    
805     if ($host eq "*") {
806     $cv->begin;
807     # use fork_call, as Net::Interface is big, and we need it rarely.
808     require AnyEvent::Util;
809     AnyEvent::Util::fork_call (
810     sub {
811     my @addr;
812    
813     require Net::Interface;
814    
815     for my $if (Net::Interface->interfaces) {
816     # we statically lower-prioritise ipv6 here, TODO :()
817     for $_ ($if->address (Net::Interface::AF_INET ())) {
818     next if /^\x7f/; # skip localhost etc.
819     push @addr, $_;
820     }
821     for ($if->address (Net::Interface::AF_INET6 ())) {
822     #next if $if->scope ($_) <= 2;
823     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
824     push @addr, $_;
825     }
826    
827     }
828     @addr
829     }, sub {
830     for my $ip (@_) {
831     push @res, [
832     $pri += 1e-5,
833     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
834     ];
835     }
836     $cv->end;
837     }
838     );
839     } else {
840     $cv->begin;
841     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
842     for (@_) {
843     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
844     push @res, [
845     $pri += 1e-5,
846     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
847     ];
848     }
849     $cv->end;
850     };
851     }
852     }
853    
854     $cv->end;
855    
856     $cv
857     }
858    
859     sub configure(@) {
860     unshift @_, "profile" if @_ & 1;
861     my (%kv) = @_;
862    
863     delete $NODE{$NODE}; # we do not support doing stuff before configure
864     _init_names;
865    
866     my $profile = delete $kv{profile};
867    
868 root 1.81 $profile = nodename
869 root 1.69 unless defined $profile;
870    
871     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
872    
873 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
874 root 1.69
875     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
876    
877 root 1.72 $NODE = $node;
878 root 1.77
879 root 1.81 $NODE =~ s/%n/nodename/ge;
880    
881     if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
882 root 1.77 # nodes with randomised node names do not need randomised port names
883     $UNIQ = "";
884     }
885 root 1.69
886     $NODE{$NODE} = $NODE{""};
887     $NODE{$NODE}{id} = $NODE;
888    
889     my $seeds = $CONFIG->{seeds};
890     my $binds = $CONFIG->{binds};
891    
892     $binds ||= ["*"];
893    
894     $WARN->(8, "node $NODE starting up.");
895    
896     $LISTENER = [];
897     %LISTENER = ();
898    
899     for (map _resolve $_, @$binds) {
900     for my $bind ($_->recv) {
901     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
902     or Carp::croak "$bind: unparsable local bind address";
903    
904     my $listener = AnyEvent::MP::Transport::mp_server
905     $host,
906     $port,
907     prepare => sub {
908     my (undef, $host, $port) = @_;
909     $bind = AnyEvent::Socket::format_hostport $host, $port;
910     0
911     },
912     ;
913     $LISTENER{$bind} = $listener;
914     push @$LISTENER, $bind;
915     }
916     }
917    
918 root 1.79 db_set "'l" => $NODE => $LISTENER;
919 root 1.73
920 root 1.69 $WARN->(8, "node listens on [@$LISTENER].");
921    
922     # connect to all seednodes
923     set_seeds map $_->recv, map _resolve $_, @$seeds;
924    
925 root 1.73 master_search;
926    
927 root 1.71 if ($NODE eq "atha") {;#d#
928 root 1.72 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
929 root 1.71 }
930    
931 root 1.69 for (@{ $CONFIG->{services} }) {
932     if (ref) {
933     my ($func, @args) = @$_;
934     (load_func $func)->(@args);
935     } elsif (s/::$//) {
936     eval "require $_";
937     die $@ if $@;
938     } else {
939     (load_func $_)->();
940     }
941     }
942     }
943    
944 root 1.1 =back
945    
946     =head1 SEE ALSO
947    
948     L<AnyEvent::MP>.
949    
950     =head1 AUTHOR
951    
952     Marc Lehmann <schmorp@schmorp.de>
953     http://home.schmorp.de/
954    
955     =cut
956    
957     1
958