ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.74
Committed: Thu Mar 1 18:31:42 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.73: +14 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.1 );
49    
50 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
51 root 1.1
52 root 1.27 This value is called with an error or warning message, when e.g. a
53     connection could not be created, authorisation failed and so on.
54    
55     It I<must not> block or send messages - queue it and use an idle watcher if
56     you need to do any of these things.
57 root 1.1
58 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
59 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
60     node connectivity and services, C<8> for debugging messages and C<9> for
61     tracing messages.
62    
63 root 1.1 The default simply logs the message to STDERR.
64    
65 root 1.44 =item @AnyEvent::MP::Kernel::WARN
66    
67     All code references in this array are called for every log message, from
68     the default C<$WARN> handler. This is an easy way to tie into the log
69     messages without disturbing others.
70    
71 root 1.1 =cut
72    
73 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
74 root 1.44 our @WARN;
# Default log handler, called as $WARN->($level, $msg).
#
# Every code reference in @WARN is invoked first (with the same arguments,
# regardless of level); the message is then printed to STDERR with a local
# timestamp unless its level exceeds $WARNLEVEL.
our $WARN = sub {
   # taps share this handler's @_
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $WARNLEVEL < $level;

   # drop one trailing newline, the format below supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime (time));

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
89    
90 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
91    
92     The maximum level at which warning messages will be printed to STDERR by
93     the default warn handler.
94    
95     =cut
96    
# Resolves a fully qualified function name ("Some::Package::func") to a code
# reference, loading modules on demand.
#
# If the function is not yet defined, successively shorter package prefixes
# of the name are require'd until the function exists. Always returns a code
# reference: when the name cannot be resolved, or a module exists but fails
# to load, the returned sub dies with the corresponding error when called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;

      do {
         # strip the last name component; running out of components means
         # no enclosing package could supply the function
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # require by file name inside a block eval: the name may originate
         # from a received message, so it must never be compiled as Perl
         # source the way a string eval would
         (my $file = "$pkg.pm") =~ s{::}{/}g;

         local $@;
         unless (eval { require $file; 1 }) {
            my $error = $@;
            # a missing module just means "try the next shorter prefix",
            # any other failure is reported to whoever calls the result
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
117    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that device can be opened and
# delivers the full amount; otherwise they are generated with rand() as a
# (much weaker) fallback.
sub nonce($) {
   my $len = $_[0];
   my $nonce;

   # three-argument open; the handle is closed when $fh goes out of scope
   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $got = sysread $fh, $nonce, $len;

      # only trust the device if it returned everything that was asked for
      return $nonce
         if defined $got && $got == $len;
   }

   # device unavailable or short read
   join "", map +(chr rand 256), 1 .. $len
}
130    
# Encodes an octet string into a string consisting of ASCII letters and
# digits only.
#
# With Math::GMP 2.05 or newer available, the octets are read as one big
# hexadecimal number and rendered in base 62. Otherwise the data is base64
# encoded, the padding is removed and the three non-alphanumeric-safe
# symbols are escaped: "x" becomes "x0", "/" becomes "x1", "+" becomes "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of them have to go
      $data =~ s/=+\z//;
      # escape the escape character first so the mapping stays unambiguous
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
149    
# Builds a short alphanumeric cookie from the low 16 bits of the process ID,
# the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time () & 0xffff;

   alnumbits (pack "nna*", $pid16, $time16, nonce (2))
}
153    
154 root 1.20 our $CONFIG; # this node's configuration
155 root 1.21
156 root 1.64 our $RUNIQ; # remote uniq value
157     our $UNIQ; # per-process/node unique cookie
158     our $NODE;
159     our $ID = "a";
160 root 1.1
161     our %NODE; # node id to transport mapping, or "undef", for local node
162     our (%PORT, %PORT_DATA); # local ports
163    
164 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
165 root 1.1 our %LMON; # monitored _local_ ports
166    
167 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
168 root 1.1 our %LISTENER;
169 root 1.21 our $LISTENER; # our listeners, as arrayref
170 root 1.1
171     our $SRCNODE; # holds the sending node during _inject
172    
# (Re-)generates the random identity of this process: $RUNIQ from 12 random
# octets, $UNIQ via gen_uniq, and the default anonymous node ID derived
# from $RUNIQ.
sub _init_names {
   $RUNIQ = alnumbits (nonce (96 / 8));
   $UNIQ  = gen_uniq ();
   $NODE  = "anon/$RUNIQ";
}
178    
179 root 1.69 _init_names;
180 root 1.64
# Returns the current node ID of this node.
sub NODE() {
   return $NODE
}
184    
# Returns the node ID part of a port ID, i.e. everything before the first "#"
# (the whole string if it contains no "#").
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid
}
190    
# Defines the compile-time constant TRACE: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
196 root 1.1
197 root 1.42 our $DELAY_TIMER;
198     our @DELAY_QUEUE;
199    
# Timer callback: runs the queued callbacks in order (including ones queued
# while running) and clears $DELAY_TIMER once the queue is empty.
sub _delay_run {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
}
203    
# Queues a callback for later execution by _delay_run and makes sure a
# zero-delay timer that runs the queue is armed.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
208    
# Delivers a message to a local port: the first argument is the port ID, the
# remaining arguments are passed to the port's callback. Messages to ports
# that do not exist are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $port = shift;

   my $cb = $PORT{$port}
      or return;

   # called with the remaining @_ of this sub
   &$cb;
}
213    
214 root 1.20 # this function adds a node-ref, so you can send stuff to it
215     # it is basically the central routing component.
# Returns the node object registered for the given node ID in %NODE,
# creating and registering a remote node object first if there is none.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
221    
# snd $port, @msg
#
# Splits the destination into node ID and port ID and hands the message to
# the send function of the responsible node object (creating the node object
# if necessary). Croaks when no node ID can be extracted.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port ID is stringified, a missing port ID becomes ""
   $node->{send} (["$portid", @_]);
}
233    
234 root 1.17 =item $is_local = port_is_local $port
235    
236     Returns true iff the port is a local port.
237    
238     =cut
239    
# Compares the node object registered for the port's node ID with the
# object registered for the local node (key "" in %NODE).
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   my $owner = $NODE{$nodeid};

   $owner == $NODE{""}
}
245    
246 root 1.18 =item snd_to_func $node, $func, @args
247 root 1.11
248 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
249 root 1.11 this function with the given arguments on that node.
250    
251 root 1.20 This function can be used to implement C<spawn>-like interfaces.
252 root 1.11
253     =cut
254    
# snd_to_func $nodeid, $func, @args
#
# Sends a message to the node port ("") of the given node; the message is
# the function name followed by its arguments. Croaks if the node ID is
# undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate before the node ID is used for anything, so an invalid call
   # has no side effects
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment talks about $NODE, but the flag is set when
   # the target differs from $NODE - confirm which is intended.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node ($nodeid))->{send} (["", @_]);
}
269    
270 root 1.18 =item snd_on $node, @msg
271    
272     Executes C<snd> with the given C<@msg> (which must include the destination
273     port) on the given node.
274    
275     =cut
276    
# Asks the given node to execute snd with the given message, by sending it
# a "snd" request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
281    
282 root 1.29 =item eval_on $node, $string[, @reply]
283 root 1.18
284 root 1.29 Evaluates the given string as a Perl expression on the given node. When
285     @reply is specified, then it is used to construct a reply message with
286     C<"$@"> and any results from the eval appended.
287 root 1.18
288     =cut
289    
# Asks the given node to evaluate a string by sending it an "eval" request;
# the string and the optional reply message are passed along unchanged.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
294    
# kil $port, @reason
#
# Forwards a kill request for the given port to the responsible node object
# (creating it if necessary). Croaks when the port ID part is empty, i.e.
# when the target is a node port.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
304    
305     #############################################################################
306 root 1.6 # node monitoring and info
307 root 1.3
308 root 1.21 =item node_is_known $nodeid
309 root 1.13
310 root 1.71 #TODO#
311     Returns true iff the given node is currently known to this node.
312 root 1.13
313     =cut
314    
# True iff %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
318    
319 root 1.21 =item node_is_up $nodeid
320 root 1.13
321     Returns true if the given node is "up", that is, the kernel thinks it has
322     a working connection to it.
323    
324 root 1.69 If the node is known (to this local node) but not currently connected,
325     returns C<0>. If the node is not known, returns C<undef>.
326 root 1.13
327     =cut
328    
# Returns 1 if the node object for the given ID has a transport, 0 if it is
# known but has none, and nothing (undef/empty list) if it is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
333    
334 root 1.3 =item known_nodes
335    
336 root 1.71 #TODO#
337 root 1.26 Returns the node IDs of all nodes currently known to this node, including
338     itself and nodes not currently connected.
339 root 1.3
340     =cut
341    
# Returns the id field of every node object stored in %NODE.
sub known_nodes() {
   map { $_->{id} } values %NODE
}
345    
346     =item up_nodes
347    
348 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
349     the node itself).
350 root 1.3
351     =cut
352    
# Returns the id field of every node object in %NODE that has a transport.
sub up_nodes() {
   map { $_->{id} }
      grep { $_->{transport} }
         values %NODE
}
356    
357 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
358 root 1.3
359 root 1.27 Registers a callback that is called each time a node goes up (a connection
360     is established) or down (the connection is lost).
361 root 1.3
362     Node up messages can only be followed by node down messages for the same
363     node, and vice versa.
364    
365 root 1.71 Note that monitoring a node is usually better done by monitoring its node
366 root 1.27 port. This function is mainly of interest to modules that are concerned
367     about the network topology and low-level connection handling.
368    
369     Callbacks I<must not> block and I<should not> send any messages.
370    
371     The function returns an optional guard which can be used to unregister
372 root 1.3 the monitoring callback again.
373    
374 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
375     or go up (and down).
376    
377     newnode $_, 1 for up_nodes;
378     mon_nodes \&newnode;
379    
380 root 1.3 =cut
381    
382     our %MON_NODES;
383    
# Registers a node up/down callback in %MON_NODES, keyed by the numeric
# value of the code reference. In non-void context returns a guard that
# unregisters the callback again.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # nobody can hold on to a guard in void context
   return unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
391    
# Notifies all node monitors registered in %MON_NODES that the given node
# object went up or down, then logs the state change at level 7.
# Exceptions thrown by a monitor callback are logged at level 1 and do not
# stop the remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
402    
403     #############################################################################
404 root 1.1 # self node code
405    
# _kill $port, @reason
#
# Removes a local port together with its port data and calls every monitor
# registered for it in %LMON with the kill reason. A port that dies with a
# reason but without any monitors is logged at level 2.
sub _kill {
   my $port = shift;

   return unless delete $PORT{$port}; # killing nonexistent ports is O.K.

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
419    
# _monitor $ignored, $port, $cb
#
# Registers $cb as a monitor of the local port in %LMON. If the port does
# not exist, $cb is called right away with a no_such_port reason instead.
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   $LMON{$port}{$cb+0} = $cb;
}
426    
# _unmonitor $ignored, $port, $cb
#
# Removes the monitor $cb of the local port from %LMON; does nothing when
# the port has no monitors.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   delete $LMON{$port}{$cb+0}
      if exists $LMON{$port};
}
431    
# Request handlers of the node port: maps the tag (first element) of a
# message sent to the node port to the code handling it. The handler is
# called with the remaining message elements; $SRCNODE holds the sending
# node while a handler runs.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      _unmonitor (undef, $portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      Scalar::Util::weaken (my $node = $SRCNODE);
      _monitor (undef, $portid, $node->{rmon}{$portid} = sub {
         # $node is only a weak reference: once the node object is gone
         # there is nothing left to clean up or notify, and touching
         # $node->{...} would merely autovivify a throw-away hash
         $node
            or return;

         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      });
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # evaluates the received string in package main; this is the
      # documented purpose of eval_on, so peers must be trusted
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
494    
# The local node object is registered under both "" and the current node ID.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port (""): dispatches on the message tag. Tags without an entry in
# %node_req are resolved through load_func and cached there. Errors raised by
# a handler are logged at level 2 instead of being propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # called with the remaining message elements in @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
501    
502 root 1.69 #############################################################################
503 root 1.71 # seed management, try to keep connections to all seeds at all times
504 root 1.69
505 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
506     our %NODE_SEED; # map node ID to seed ID
507 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
508     our $SEED_WATCHER;
509 root 1.71 our $SEED_RETRY;
510 root 1.69
# Starts a connection attempt to the given seed address ("host:port") unless
# %SEED_CONNECT already holds an entry for it. Croaks if the address cannot
# be parsed.
#
# %SEED_CONNECT tracks the attempt: it holds the value returned by
# mp_connect, is set to 1 by the final callback, and is cleared in the
# on_destroy callback.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         my $remote = $_[0]{remote_node};

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
            # %NODE_SEED is keyed by node ID, not by seed address
            delete $NODE_SEED{$remote};
         } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;
         }
      },
      on_destroy => sub {
         delete $SEED_CONNECT{$seed};
      },
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
546    
# Starts connection attempts to every seed that has neither a connection
# attempt pending nor a node that is up, and re-arms $SEED_WATCHER with a
# randomised, roughly doubling delay capped at the configured
# monitor_timeout. When no seed needs attention, the watcher is cancelled.
sub seed_all {
#   my $next = List::Util::max 1,
#      $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
#      * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
#      - rand;

   my @seeds;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @seeds, $seed;
   }

   unless (@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
      return;
   }

   # start connection attempt for every seed we are not connected to yet
   seed_connect ($_)
      for @seeds;

   $SEED_RETRY = $SEED_RETRY * 2 + rand ();
   $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
      if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);
}
574    
# Resets the retry interval to one second and, unless a seed timer is
# already pending, schedules seed_all to run in one second.
sub seed_again {
   $SEED_RETRY = 1;

   return $SEED_WATCHER
      if $SEED_WATCHER;

   $SEED_WATCHER = AE::timer (1, 0, \&seed_all);
}
579    
580     # sets new seed list, starts connecting
# Replaces the seed list: forgets all seed state, registers the given seed
# addresses (with no known node ID yet) and starts connecting.
sub set_seeds(@) {
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   $SEED_NODE{$_} = undef
      for @_;

   seed_again ();#d#
   seed_all ();
}
591    
# if we lost the connection to a seed node, make sure we are seeding
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   seed_again ()
      if !$is_up && exists $NODE_SEED{$nodeid};
});
597    
598     #############################################################################
599     # talk with/to global nodes
600    
601 root 1.72 # protocol messages:
602     #
603 root 1.73 # sent by all slave nodes (slave to master)
604     # g_slave database - make other global node master of the sender
605 root 1.72 #
606 root 1.73 # sent by any node to global nodes
607     # g_set database - set whole database
608     # g_add family key val - add/replace key to database
609     # g_del family key - delete key from database
610     # g_get family key reply... - send reply with data
611     #
612     # sent by global nodes
613     # g_global - node became global, similar to global=1 greeting
614     #
615     # database families
616     # "'l" -> node -> listeners
617     # "'g" -> node -> undef
618     # ...
619 root 1.72 #
620    
621 root 1.73 # used on all nodes:
622     our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
623 root 1.71 our $GLOBAL_MON;
624     our $GLOBAL_TIMER;
625 root 1.73 our %LOCAL_DB; # this node database
626 root 1.71
627 root 1.73 # used on global nodes:
628     our %GLOBAL_DB; # all local databases, merged
629     our %LOCAL_DBS; # local databases of all global nodes
630     our %GLOBAL_DBS; # global db
631     our %GLOBAL_SLAVE; # nodes that are our slaves
632 root 1.71
633 root 1.73 #our $sv_ne_json = JSON::XS->new->canonical;
634     #
635     #sub sv_ne($$) {
636     # $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
637     #}
638 root 1.71
639 root 1.73 # should be only on globals, but...
# g_global: the sending node announces that it became a global node.
# Only acted upon when this node is global itself: the sender's data is
# cleared via g_clr, its greeting is marked global, it is removed from our
# slaves, and our local database is sent to it.
$node_req{g_global} = sub {
   return unless $GLOBAL;

   &g_clr ($SRCNODE->{id});
   $SRCNODE->{transport}{remote_greeting}{global} = 1;
   delete $GLOBAL_SLAVE{$SRCNODE->{id}};

   snd ($SRCNODE->{id}, g_set => \%LOCAL_DB);
};
648 root 1.71
# Returns the IDs listed in the "'g" family of %GLOBAL_DB that are not this
# node and are currently up.
sub other_globals() {
   grep { $_ ne $NODE && node_is_up ($_) }
      keys %{ $GLOBAL_DB{"'g"} }
}
652    
653 root 1.73 # local database management
# ldb_set $family, $key[, $value]
#
# Sets (three arguments) or deletes (two arguments) an entry in the local
# database %LOCAL_DB and forwards the change to the master node as g_add or
# g_del, if a master is currently defined.
sub ldb_set($$;$) {
   # debugging aid, only active when tracing like the other trace output
   warn "ldb_set<@_>\n" if TRACE;#d#

   if (@_ > 2) {
      $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
      snd ($MASTER, g_add => $_[0] => $_[1] => $_[2])
         if defined $MASTER;
   } else {
      delete $LOCAL_DB{$_[0]}{$_[1]};
      snd ($MASTER, g_del => $_[0] => $_[1])
         if defined $MASTER;
   }
}
666    
667 root 1.73 #############################################################################
668     # master selection
669    
670     # master requests
671     our %GLOBAL_REQ; # $id => \@req
672 root 1.71
# global_req_add $id, @request
#
# Remembers a request for the master under $id (unless one with that ID is
# already pending) and sends it right away when a master is set.
sub global_req_add {
   my ($id, @req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = \@req;

   snd ($MASTER, @req)
      if $MASTER;
}
683 root 1.71
# Forgets the pending master request with the given ID.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
687    
# Queues a g_find request for the given node ID with the master.
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", g_find => $nodeid);
}
691 root 1.71
692 root 1.73 # reply for g_find started in Node.pm
# g_found $nodeid, $listeners: reply for g_find started in Node.pm.
# Completes the pending request and, if listeners were reported and the node
# is known, (re)starts connecting to it with the reported listeners
# temporarily placed into the "'l" family of %GLOBAL_DB.
$node_req{g_found} = sub {
   my ($nodeid, $listeners) = @_;

   global_req_del ("g_find $nodeid");

   @$listeners or return; # d'oh

   my $node = $NODE{$nodeid} or return;

   # only visible until this handler returns
   local $GLOBAL_DB{"'l"}{$nodeid} = $listeners; #d# UGLY

   delete $node->{connect_w};
   $node->connect;
};
705    
# Makes the given node our master: announces ourselves as its slave,
# sending our local database, then (re-)sends all pending requests.
sub master_set {
   $MASTER = $_[0];

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
715 root 1.71
# Picks a master node. If a known seed node is up it becomes the master
# immediately. Otherwise a node monitor waits for a seed node to come up,
# makes it the master and replaces itself with a monitor that restarts the
# search when that master goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $GLOBAL_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up; # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set ($nodeid);

      # from now on only watch for the master going down
      $GLOBAL_MON = mon_nodes (sub {
         my ($nodeid, $is_up) = @_;

         return unless $nodeid eq $MASTER && !$is_up;

         undef $MASTER;
         master_search ();
      });
   });
}
739    
740 root 1.73 # other node wants to make us the master
# g_slave $db: another node wants to make us its master.
# This is only a stub: it loads AnyEvent::MP::Global and then re-dispatches
# the request, with unchanged arguments, to whatever handler is registered
# for g_slave afterwards.
# NOTE(review): presumably AnyEvent::MP::Global replaces this entry - if it
# did not, this handler would call itself forever; confirm.
$node_req{g_slave} = sub {
   # debugging aid, only active when tracing like the other trace output
   warn "slave1\n" if TRACE;#d#

   require AnyEvent::MP::Global;
   &{ $node_req{g_slave} };
};
749    
750 root 1.73 #$node_req{g_reply} = sub {
751     # my $id = shift;
752     #
753     # my $cb = delete $GLOBAL_REQ{$id}
754     # or return;
755     #
756     # $cb->[0]->(@_);
757     #};
758 root 1.71
759 root 1.73 #############################################################################
760 root 1.71
761 root 1.73 #############################################################################
762 root 1.71
763 root 1.73 # $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
764 root 1.71
765 root 1.69 #############################################################################
766     # configure
767    
# Returns the node name field reported by POSIX::uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
772    
# _resolve $descriptor
#
# Resolves a comma separated list of transport descriptors into "host:port"
# strings. Returns a condvar which is sent the resulting addresses, ordered
# by the position of their descriptor in the list and with duplicates
# removed.
#
# A descriptor consisting of digits only (or nothing) is taken as a port on
# the local node name. A host of "*" expands to the addresses of the local
# interfaces (via Net::Interface in a forked child), anything else is
# resolved with resolve_sockaddr. Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "host:port"] pairs

   # group callback, runs once every begin has been matched by an end
   $cv->begin (sub {
      my ($done) = @_;

      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];
         push @refs, $addr unless $seen{$addr}++;
      }

      $done->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor sort behind those of earlier descriptors
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length ($t) ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  push @addr,
                     grep !/^\x7f/, # skip localhost etc.
                        $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  push @addr,
                     grep /^[\x20-\x3f\xfc\xfd]/, # global unicast, site-local unicast
                        $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   # matches the initial begin above
   $cv->end;

   $cv
}
853    
# configure [$profile,] key => value...
#
# (Re-)initialises this node: generates fresh unique names, looks up the
# configuration profile (default: the local node name), derives the node ID
# (a trailing "/" is completed with $RUNIQ), creates listeners for all
# binds (default "*"), publishes them in the local database, starts seeding
# and the master search, and finally starts the configured services.
#
# Croaks on an empty/false node ID and on unparsable bind addresses; dies
# if a service module cannot be loaded.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;
   $NODE =~ s%/$%/$RUNIQ%;

   # the local node object is reachable under its new ID as well
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # all resolves are started first, then waited for in order
   for my $cv (map _resolve ($_), @$binds) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address actually passed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   ldb_set ("'l" => $NODE => $LISTENER);

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   #require AnyEvent::MP::Global;

   # connect to all seednodes
   set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   master_search ();

   # NOTE(review): debugging hook keyed on a hard-coded node ID, marked for
   # removal by the author (#d#) - left in place unchanged.
   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer (4, 0, sub { undef $w; require AnyEvent::MP::Global });#d#
   }

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ((my $module = $service) =~ s/::$//) {
         # "Module::" loads a module; work on a copy so the configuration
         # entry keeps its trailing "::"
         eval "require $module";
         die $@ if $@;
      } else {
         # plain function name, called without arguments
         (load_func ($service))->();
      }
   }
}
935    
936 root 1.1 =back
937    
938     =head1 SEE ALSO
939    
940     L<AnyEvent::MP>.
941    
942     =head1 AUTHOR
943    
944     Marc Lehmann <schmorp@schmorp.de>
945     http://home.schmorp.de/
946    
947     =cut
948    
949     1
950