ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.76
Committed: Thu Mar 1 19:37:59 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.75: +10 -30 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.1 );
49    
50 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
51 root 1.1
52 root 1.27 This value is called with an error or warning message, when e.g. a
53     connection could not be created, authorisation failed and so on.
54    
55     It I<must not> block or send messages -queue it and use an idle watcher if
56     you need to do any of these things.
57 root 1.1
58 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
59 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
60     node connectivity and services, C<8> for debugging messages and C<9> for
61     tracing messages.
62    
63 root 1.1 The default simply logs the message to STDERR.
64    
65 root 1.44 =item @AnyEvent::MP::Kernel::WARN
66    
67     All code references in this array are called for every log message, from
68     the default C<$WARN> handler. This is an easy way to tie into the log
69     messages without disturbing others.
70    
71 root 1.1 =cut
72    
73 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
74 root 1.44 our @WARN;
75     our $WARN = sub {
76     &$_ for @WARN;
77 root 1.29
78     return if $WARNLEVEL < $_[0];
79    
80 root 1.16 my ($level, $msg) = @_;
81    
82 root 1.1 $msg =~ s/\n$//;
83 root 1.16
84     printf STDERR "%s <%d> %s\n",
85     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
86     $level,
87     $msg;
88 root 1.1 };
89    
90 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
91    
92     The maximum level at which warning messages will be printed to STDERR by
93     the default warn handler.
94    
95     =cut
96    
97 root 1.6 sub load_func($) {
98     my $func = $_[0];
99    
100     unless (defined &$func) {
101     my $pkg = $func;
102     do {
103     $pkg =~ s/::[^:]+$//
104 root 1.63 or return sub { die "unable to resolve function '$func'" };
105 root 1.60
106     local $@;
107 root 1.61 unless (eval "require $pkg; 1") {
108     my $error = $@;
109     $error =~ /^Can't locate .*.pm in \@INC \(/
110     or return sub { die $error };
111     }
112 root 1.6 } until defined &$func;
113     }
114    
115     \&$func
116     }
117    
118 root 1.1 sub nonce($) {
119     my $nonce;
120    
121     if (open my $fh, "</dev/urandom") {
122     sysread $fh, $nonce, $_[0];
123     } else {
124     # shit...
125     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
126     }
127    
128     $nonce
129     }
130    
131 root 1.76 # binary to alphanumeric
132 root 1.21 sub alnumbits($) {
133 root 1.76 (my $data = MIME::Base64::encode_base64 $_[0], "") =~ y%+/=%-_%d;
134 root 1.1 $data
135     }
136    
137     sub gen_uniq {
138 root 1.76 # 48 bits happen to fit perfectly into 8 chars
139     # we add one more char to get 54 bits.
140     substr +(alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 3), 0, 9
141 root 1.1 }
142    
143 root 1.20 our $CONFIG; # this node's configuration
144 root 1.21
145 root 1.64 our $RUNIQ; # remote uniq value
146     our $UNIQ; # per-process/node unique cookie
147     our $NODE;
148     our $ID = "a";
149 root 1.1
150     our %NODE; # node id to transport mapping, or "undef", for local node
151     our (%PORT, %PORT_DATA); # local ports
152    
153 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 root 1.1 our %LMON; # monitored _local_ ports
155    
156 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
157 root 1.1 our %LISTENER;
158 root 1.21 our $LISTENER; # our listeners, as arrayref
159 root 1.1
160 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
161 root 1.1
162 root 1.69 sub _init_names {
163 root 1.64 $UNIQ = gen_uniq;
164 root 1.76 $RUNIQ = substr +(alnumbits nonce 64/8), 0, 10; # for remote port names &c, should be longer than $UNIQ
165 root 1.64 $NODE = "anon/$RUNIQ";
166     }
167    
168 root 1.69 _init_names;
169 root 1.64
170 root 1.1 sub NODE() {
171     $NODE
172     }
173    
174     sub node_of($) {
175 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
176 root 1.1
177 root 1.21 $node
178 root 1.1 }
179    
180 root 1.17 BEGIN {
181     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
182     ? sub () { 1 }
183     : sub () { 0 };
184     }
185 root 1.1
186 root 1.42 our $DELAY_TIMER;
187     our @DELAY_QUEUE;
188    
189     sub _delay_run {
190 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
191 root 1.42 }
192    
193     sub delay($) {
194     push @DELAY_QUEUE, shift;
195     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
196     }
197    
198 root 1.1 sub _inject {
199 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
200 root 1.1 &{ $PORT{+shift} or return };
201     }
202    
203 root 1.20 # this function adds a node-ref, so you can send stuff to it
204     # it is basically the central routing component.
205 root 1.1 sub add_node {
206 root 1.21 my ($node) = @_;
207 root 1.1
208 root 1.71 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
209 root 1.13 }
210    
211 root 1.1 sub snd(@) {
212 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
213 root 1.1
214 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
215 root 1.1
216 root 1.49 defined $nodeid #d#UGLY
217     or Carp::croak "'undef' is not a valid node ID/port ID";
218    
219 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
220 root 1.2 ->{send} (["$portid", @_]);
221 root 1.1 }
222    
223 root 1.17 =item $is_local = port_is_local $port
224    
225     Returns true iff the port is a local port.
226    
227     =cut
228    
229     sub port_is_local($) {
230 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
231 root 1.17
232 root 1.21 $NODE{$nodeid} == $NODE{""}
233 root 1.17 }
234    
235 root 1.18 =item snd_to_func $node, $func, @args
236 root 1.11
237 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
238 root 1.11 this function with the given arguments on that node.
239    
240 root 1.20 This function can be used to implement C<spawn>-like interfaces.
241 root 1.11
242     =cut
243    
244 root 1.18 sub snd_to_func($$;@) {
245 root 1.21 my $nodeid = shift;
246 root 1.11
247 root 1.41 # on $NODE, we artificially delay... (for spawn)
248     # this is very ugly - maybe we should simply delay ALL messages,
249     # to avoid deep recursion issues. but that's so... slow...
250 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
251     if $nodeid ne $NODE;
252    
253 root 1.49 defined $nodeid #d#UGLY
254     or Carp::croak "'undef' is not a valid node ID/port ID";
255    
256 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
257 root 1.11 }
258    
259 root 1.18 =item snd_on $node, @msg
260    
261     Executes C<snd> with the given C<@msg> (which must include the destination
262     port) on the given node.
263    
264     =cut
265    
266     sub snd_on($@) {
267     my $node = shift;
268     snd $node, snd => @_;
269     }
270    
271 root 1.29 =item eval_on $node, $string[, @reply]
272 root 1.18
273 root 1.29 Evaluates the given string as Perl expression on the given node. When
274     @reply is specified, then it is used to construct a reply message with
275     C<"$@"> and any results from the eval appended.
276 root 1.18
277     =cut
278    
279 root 1.29 sub eval_on($$;@) {
280 root 1.18 my $node = shift;
281     snd $node, eval => @_;
282     }
283    
284 root 1.1 sub kil(@) {
285 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
286 root 1.1
287     length $portid
288 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
289 root 1.1
290 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
291 root 1.1 ->kill ("$portid", @_);
292     }
293    
294     #############################################################################
295 root 1.6 # node monitoring and info
296 root 1.3
297 root 1.21 =item node_is_known $nodeid
298 root 1.13
299 root 1.71 #TODO#
300     Returns true iff the given node is currently known to this node.
301 root 1.13
302     =cut
303    
304     sub node_is_known($) {
305     exists $NODE{$_[0]}
306     }
307    
308 root 1.21 =item node_is_up $nodeid
309 root 1.13
310     Returns true if the given node is "up", that is, the kernel thinks it has
311     a working connection to it.
312    
313 root 1.69 If the node is known (to this local node) but not currently connected,
314     returns C<0>. If the node is not known, returns C<undef>.
315 root 1.13
316     =cut
317    
318     sub node_is_up($) {
319     ($NODE{$_[0]} or return)->{transport}
320     ? 1 : 0
321     }
322    
323 root 1.3 =item known_nodes
324    
325 root 1.71 #TODO#
326 root 1.26 Returns the node IDs of all nodes currently known to this node, including
327     itself and nodes not currently connected.
328 root 1.3
329     =cut
330    
331 root 1.49 sub known_nodes() {
332 root 1.26 map $_->{id}, values %NODE
333 root 1.3 }
334    
335     =item up_nodes
336    
337 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
338     the node itself).
339 root 1.3
340     =cut
341    
342 root 1.49 sub up_nodes() {
343 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
344 root 1.3 }
345    
346 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
347 root 1.3
348 root 1.27 Registers a callback that is called each time a node goes up (a connection
349     is established) or down (the connection is lost).
350 root 1.3
351     Node up messages can only be followed by node down messages for the same
352     node, and vice versa.
353    
354 root 1.71 Note that monitoring a node is usually better done by monitoring its node
355 root 1.27 port. This function is mainly of interest to modules that are concerned
356     about the network topology and low-level connection handling.
357    
358     Callbacks I<must not> block and I<should not> send any messages.
359    
360     The function returns an optional guard which can be used to unregister
361 root 1.3 the monitoring callback again.
362    
363 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
364     or go up (and down).
365    
366     newnode $_, 1 for up_nodes;
367     mon_nodes \&newnode;
368    
369 root 1.3 =cut
370    
371     our %MON_NODES;
372    
373     sub mon_nodes($) {
374     my ($cb) = @_;
375    
376     $MON_NODES{$cb+0} = $cb;
377    
378 root 1.62 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
379 root 1.3 }
380    
381     sub _inject_nodeevent($$;@) {
382 root 1.16 my ($node, $up, @reason) = @_;
383 root 1.3
384     for my $cb (values %MON_NODES) {
385 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
386 root 1.16 or $WARN->(1, $@);
387 root 1.3 }
388 root 1.16
389 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
390 root 1.3 }
391    
392     #############################################################################
393 root 1.1 # self node code
394    
395 root 1.67 sub _kill {
396     my $port = shift;
397    
398     delete $PORT{$port}
399     or return; # killing nonexistent ports is O.K.
400     delete $PORT_DATA{$port};
401    
402     my $mon = delete $LMON{$port}
403     or !@_
404     or $WARN->(2, "unmonitored local port $port died with reason: @_");
405    
406     $_->(@_) for values %$mon;
407     }
408    
409     sub _monitor {
410     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
411     unless exists $PORT{$_[1]};
412    
413     $LMON{$_[1]}{$_[2]+0} = $_[2];
414     }
415    
416     sub _unmonitor {
417 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
418     if exists $LMON{$_[1]};
419 root 1.67 }
420    
421 root 1.1 our %node_req = (
422     # internal services
423    
424     # monitoring
425 root 1.65 mon0 => sub { # stop monitoring a port for another node
426 root 1.1 my $portid = shift;
427 root 1.67 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
428 root 1.1 },
429 root 1.65 mon1 => sub { # start monitoring a port for another node
430 root 1.1 my $portid = shift;
431 root 1.67 Scalar::Util::weaken (my $node = $SRCNODE);
432     _monitor undef, $portid, $node->{rmon}{$portid} = sub {
433 root 1.58 delete $node->{rmon}{$portid};
434 root 1.65 $node->send (["", kil0 => $portid, @_])
435 root 1.59 if $node && $node->{transport};
436 root 1.67 };
437 root 1.1 },
438 root 1.65 # another node has killed a monitored port
439     kil0 => sub {
440 root 1.1 my $cbs = delete $SRCNODE->{lmon}{+shift}
441     or return;
442    
443     $_->(@_) for @$cbs;
444     },
445    
446 root 1.18 # "public" services - not actually public
447 root 1.1
448 root 1.65 # another node wants to kill a local port
449 root 1.66 kil => \&_kill,
450 root 1.65
451 root 1.1 # relay message to another node / generic echo
452 root 1.15 snd => \&snd,
453 root 1.27 snd_multiple => sub {
454 root 1.1 snd @$_ for @_
455     },
456    
457 root 1.30 # random utilities
458 root 1.1 eval => sub {
459 root 1.76 #d#SECURE
460 root 1.50 my @res = do { package main; eval shift };
461 root 1.1 snd @_, "$@", @res if @_;
462     },
463     time => sub {
464 root 1.76 snd @_, AE::now;
465 root 1.1 },
466     devnull => sub {
467     #
468     },
469 root 1.15 "" => sub {
470 root 1.27 # empty messages are keepalives or similar devnull-applications
471 root 1.15 },
472 root 1.1 );
473    
474 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
475 root 1.1 $PORT{""} = sub {
476     my $tag = shift;
477 root 1.76 #d#SECURE (load_func)
478 root 1.1 eval { &{ $node_req{$tag} ||= load_func $tag } };
479 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
480 root 1.1 };
481    
482 root 1.69 #############################################################################
483 root 1.71 # seed management, try to keep connections to all seeds at all times
484 root 1.69
485 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
486     our %NODE_SEED; # map node ID to seed ID
487 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
488     our $SEED_WATCHER;
489 root 1.71 our $SEED_RETRY;
490 root 1.69
491     sub seed_connect {
492     my ($seed) = @_;
493    
494     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
495     or Carp::croak "$seed: unparsable seed address";
496    
497     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
498    
499 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
500     $host, $port,
501     on_greeted => sub {
502     # called after receiving remote greeting, learn remote node name
503    
504     # we rely on untrusted data here (the remote node name) this is
505     # hopefully ok, as this can at most be used for DOSing, which is easy
506     # when you can do MITM anyway.
507    
508     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
509     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
510 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
511 root 1.71 delete $SEED_NODE{$seed};
512     delete $NODE_SEED{$seed};
513     } else {
514     $SEED_NODE{$seed} = $_[0]{remote_node};
515     $NODE_SEED{$_[0]{remote_node}} = $seed;
516     }
517     },
518     on_destroy => sub {
519     delete $SEED_CONNECT{$seed};
520     },
521 root 1.69 sub {
522     $SEED_CONNECT{$seed} = 1;
523 root 1.71 }
524 root 1.69 ;
525     }
526    
527     sub seed_all {
528     # my $next = List::Util::max 1,
529     # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
530     # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
531     # - rand;
532    
533     my @seeds = grep {
534     !exists $SEED_CONNECT{$_}
535     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
536     } keys %SEED_NODE;
537    
538     if (@seeds) {
539 root 1.70 # start connection attempt for every seed we are not connected to yet
540 root 1.69 seed_connect $_
541     for @seeds;
542 root 1.71
543     $SEED_RETRY = $SEED_RETRY * 2 + rand;
544     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
545     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
546    
547     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
548    
549 root 1.69 } else {
550 root 1.71 # all seeds connected or connecting, no need to restart timer
551 root 1.69 undef $SEED_WATCHER;
552     }
553     }
554    
555     sub seed_again {
556 root 1.71 $SEED_RETRY = 1;
557     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
558 root 1.69 }
559    
560     # sets new seed list, starts connecting
561     sub set_seeds(@) {
562     %SEED_NODE = ();
563 root 1.71 %NODE_SEED = ();
564     %SEED_CONNECT = ();
565    
566 root 1.69 @SEED_NODE{@_} = ();
567    
568 root 1.71 seed_again;#d#
569     seed_all;
570     }
571    
572     mon_nodes sub {
573     # if we lost the connection to a seed node, make sure we are seeding
574     seed_again
575     if !$_[1] && exists $NODE_SEED{$_[0]};
576     };
577    
578     #############################################################################
579     # talk with/to global nodes
580    
581 root 1.72 # protocol messages:
582     #
583 root 1.73 # sent by all slave nodes (slave to master)
584     # g_slave database - make other global node master of the sender
585 root 1.72 #
586 root 1.73 # sent by any node to global nodes
587     # g_set database - set whole database
588     # g_add family key val - add/replace key to database
589     # g_del family key - delete key from database
590     # g_get family key reply... - send reply with data
591     #
592     # send by global nodes
593     # g_global - node became global, similar to global=1 greeting
594     #
595     # database families
596     # "'l" -> node -> listeners
597     # "'g" -> node -> undef
598     # ...
599 root 1.72 #
600    
601 root 1.73 # used on all nodes:
602     our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
603 root 1.71 our $GLOBAL_MON;
604     our $GLOBAL_TIMER;
605 root 1.73 our %LOCAL_DB; # this node database
606 root 1.71
607 root 1.73 # used on global nodes:
608     our %GLOBAL_DB; # all local databases, merged
609     our %LOCAL_DBS; # local databases of all global nodes
610     our %GLOBAL_DBS; # global db
611     our %GLOBAL_SLAVE; # nodes that are our slaves
612 root 1.71
613 root 1.73 #our $sv_ne_json = JSON::XS->new->canonical;
614     #
615     #sub sv_ne($$) {
616     # $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
617     #}
618 root 1.71
619 root 1.73 # should be only on globals, but...
620     $node_req{g_global} = sub {
621     if ($GLOBAL) {
622     &g_clr ($SRCNODE->{id});
623     $SRCNODE->{transport}{remote_greeting}{global} = 1;
624     delete $GLOBAL_SLAVE{$SRCNODE->{id}};
625     snd $SRCNODE->{id}, g_set => \%LOCAL_DB;
626     }
627     };
628 root 1.71
629 root 1.73 sub other_globals() {
630     grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
631 root 1.71 }
632    
633 root 1.73 # local database management
634     sub ldb_set($$;$) {
635     warn "ldb_set<@_>\n";#d#
636     if (@_ > 2) {
637     $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
638     snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
639     if defined $MASTER;
640     } else {
641     delete $LOCAL_DB{$_[0]}{$_[1]};
642     snd $MASTER, g_del => $_[0] => $_[1]
643     if defined $MASTER;
644 root 1.71 }
645     }
646    
647 root 1.73 #############################################################################
648     # master selection
649    
650     # master requests
651     our %GLOBAL_REQ; # $id => \@req
652 root 1.71
653 root 1.74 sub global_req_add {
654     my $id = shift;
655 root 1.71
656 root 1.74 return if exists $GLOBAL_REQ{$id};
657    
658     $GLOBAL_REQ{$id} = [@_];
659 root 1.71
660 root 1.73 snd $MASTER, @_
661     if $MASTER;
662 root 1.74 }
663 root 1.71
664 root 1.74 sub global_req_del {
665     delete $GLOBAL_REQ{$_[0]};
666     }
667    
668     sub g_find {
669     global_req_add "g_find $_[0]", g_find => $_[0];
670 root 1.73 }
671 root 1.71
672 root 1.73 # reply for g_find started in Node.pm
673     $node_req{g_found} = sub {
674 root 1.74 global_req_del "g_find $_[0]";
675    
676 root 1.73 @{ $_[1] } or return; # d'oh
677 root 1.72
678 root 1.73 my $node = $NODE{$_[0]} or return;
679 root 1.71
680 root 1.73 local $GLOBAL_DB{"'l"}{$_[0]} = $_[1]; #d# UGLY
681     $node->connect;
682 root 1.71 };
683    
684 root 1.73 sub master_set {
685     $MASTER = $_[0];
686 root 1.71
687 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
688 root 1.71
689 root 1.73 # (re-)send queued requests
690     snd $MASTER, @$_
691     for values %GLOBAL_REQ;
692     }
693 root 1.71
694 root 1.72 sub master_search {
695 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
696     for (keys %NODE_SEED) {
697 root 1.72 if (node_is_up $_) {
698     master_set $_;
699     return;
700 root 1.71 }
701 root 1.72 }
702 root 1.71
703 root 1.72 $GLOBAL_MON = mon_nodes sub {
704     return unless $_[1]; # we are only interested in node-ups
705     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
706 root 1.71
707 root 1.72 master_set $_[0];
708 root 1.71
709 root 1.72 $GLOBAL_MON = mon_nodes sub {
710     if ($_[0] eq $MASTER && !$_[1]) {
711     undef $MASTER;
712     master_search ();
713     }
714 root 1.71 };
715 root 1.72 };
716 root 1.71 }
717    
718 root 1.73 # other node wants to make us the master
719     $node_req{g_slave} = sub {
720     my ($db) = @_;
721    
722     warn "slave1\n";#d#
723 root 1.71
724 root 1.73 require AnyEvent::MP::Global;
725     &{ $node_req{g_slave} };
726 root 1.71 };
727    
728 root 1.73 #$node_req{g_reply} = sub {
729     # my $id = shift;
730     #
731     # my $cb = delete $GLOBAL_REQ{$id}
732     # or return;
733     #
734     # $cb->[0]->(@_);
735     #};
736 root 1.71
737 root 1.73 #############################################################################
738 root 1.71
739 root 1.73 #############################################################################
740 root 1.71
741 root 1.73 # $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
742 root 1.71
743 root 1.69 #############################################################################
744     # configure
745    
746     sub _nodename {
747     require POSIX;
748     (POSIX::uname ())[1]
749     }
750    
751     sub _resolve($) {
752     my ($nodeid) = @_;
753    
754     my $cv = AE::cv;
755     my @res;
756    
757     $cv->begin (sub {
758     my %seen;
759     my @refs;
760     for (sort { $a->[0] <=> $b->[0] } @res) {
761     push @refs, $_->[1] unless $seen{$_->[1]}++
762     }
763     shift->send (@refs);
764     });
765    
766     my $idx;
767     for my $t (split /,/, $nodeid) {
768     my $pri = ++$idx;
769    
770     $t = length $t ? _nodename . ":$t" : _nodename
771     if $t =~ /^\d*$/;
772    
773     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
774     or Carp::croak "$t: unparsable transport descriptor";
775    
776     $port = "0" if $port eq "*";
777    
778     if ($host eq "*") {
779     $cv->begin;
780     # use fork_call, as Net::Interface is big, and we need it rarely.
781     require AnyEvent::Util;
782     AnyEvent::Util::fork_call (
783     sub {
784     my @addr;
785    
786     require Net::Interface;
787    
788     for my $if (Net::Interface->interfaces) {
789     # we statically lower-prioritise ipv6 here, TODO :()
790     for $_ ($if->address (Net::Interface::AF_INET ())) {
791     next if /^\x7f/; # skip localhost etc.
792     push @addr, $_;
793     }
794     for ($if->address (Net::Interface::AF_INET6 ())) {
795     #next if $if->scope ($_) <= 2;
796     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
797     push @addr, $_;
798     }
799    
800     }
801     @addr
802     }, sub {
803     for my $ip (@_) {
804     push @res, [
805     $pri += 1e-5,
806     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
807     ];
808     }
809     $cv->end;
810     }
811     );
812     } else {
813     $cv->begin;
814     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
815     for (@_) {
816     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
817     push @res, [
818     $pri += 1e-5,
819     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
820     ];
821     }
822     $cv->end;
823     };
824     }
825     }
826    
827     $cv->end;
828    
829     $cv
830     }
831    
832     sub configure(@) {
833     unshift @_, "profile" if @_ & 1;
834     my (%kv) = @_;
835    
836     delete $NODE{$NODE}; # we do not support doing stuff before configure
837     _init_names;
838    
839     my $profile = delete $kv{profile};
840    
841     $profile = _nodename
842     unless defined $profile;
843    
844     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
845    
846 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
847 root 1.69
848     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
849    
850 root 1.72 $NODE = $node;
851     $NODE =~ s%/$%/$RUNIQ%;
852 root 1.69
853     $NODE{$NODE} = $NODE{""};
854     $NODE{$NODE}{id} = $NODE;
855    
856     my $seeds = $CONFIG->{seeds};
857     my $binds = $CONFIG->{binds};
858    
859     $binds ||= ["*"];
860    
861     $WARN->(8, "node $NODE starting up.");
862    
863     $LISTENER = [];
864     %LISTENER = ();
865    
866     for (map _resolve $_, @$binds) {
867     for my $bind ($_->recv) {
868     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
869     or Carp::croak "$bind: unparsable local bind address";
870    
871     my $listener = AnyEvent::MP::Transport::mp_server
872     $host,
873     $port,
874     prepare => sub {
875     my (undef, $host, $port) = @_;
876     $bind = AnyEvent::Socket::format_hostport $host, $port;
877     0
878     },
879     ;
880     $LISTENER{$bind} = $listener;
881     push @$LISTENER, $bind;
882     }
883     }
884    
885 root 1.73 ldb_set "'l" => $NODE => $LISTENER;
886    
887 root 1.69 $WARN->(8, "node listens on [@$LISTENER].");
888    
889     # the global service is mandatory currently
890 root 1.71 #require AnyEvent::MP::Global;
891 root 1.69
892     # connect to all seednodes
893     set_seeds map $_->recv, map _resolve $_, @$seeds;
894    
895 root 1.73 master_search;
896    
897 root 1.71 if ($NODE eq "atha") {;#d#
898 root 1.72 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
899 root 1.71 }
900    
901 root 1.69 for (@{ $CONFIG->{services} }) {
902     if (ref) {
903     my ($func, @args) = @$_;
904     (load_func $func)->(@args);
905     } elsif (s/::$//) {
906     eval "require $_";
907     die $@ if $@;
908     } else {
909     (load_func $_)->();
910     }
911     }
912     }
913    
914 root 1.1 =back
915    
916     =head1 SEE ALSO
917    
918     L<AnyEvent::MP>.
919    
920     =head1 AUTHOR
921    
922     Marc Lehmann <schmorp@schmorp.de>
923     http://home.schmorp.de/
924    
925     =cut
926    
927     1
928