ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.75
Committed: Thu Mar 1 18:48:02 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.74: +0 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.1 );
49    
50 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
51 root 1.1
52 root 1.27 This value is called with an error or warning message, when e.g. a
53     connection could not be created, authorisation failed and so on.
54    
55     It I<must not> block or send messages - queue the message and use an idle
56     watcher if you need to do any of these things.
57 root 1.1
58 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
59 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
60     node connectivity and services, C<8> for debugging messages and C<9> for
61     tracing messages.
62    
63 root 1.1 The default simply logs the message to STDERR.
64    
65 root 1.44 =item @AnyEvent::MP::Kernel::WARN
66    
67     All code references in this array are called for every log message, from
68     the default C<$WARN> handler. This is an easy way to tie into the log
69     messages without disturbing others.
70    
71 root 1.1 =cut
72    
# Highest level the default $WARN handler still prints; taken from the
# PERL_ANYEVENT_MP_WARNLEVEL environment variable when that is set.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks, invoked by the default handler for every message.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # hooks run first and see every message, whatever its level
   &$_ for @WARN;

   my ($level, $text) = @_;

   return if $WARNLEVEL < $level;

   # drop one trailing newline, the format below supplies its own
   $text =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $text;
};
89    
90 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
91    
92     The maximum level at which warning messages will be printed to STDERR by
93     the default warn handler.
94    
95     =cut
96    
# Resolves a fully qualified function name to a code reference, loading
# modules on demand.
#
# When the function is not defined yet, successively shorter package
# prefixes of the name are require'd until it becomes defined. This function
# never dies itself: on failure it returns a code reference that dies when
# called, either with a generic "unable to resolve" message or with the
# error raised while loading a module.
#
# NOTE(review): the package name is interpolated into a string eval, and the
# node port handler passes message tags to this function - only safe as long
# as peers are trusted.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # $func is a symbolic name by design

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component; nothing left to strip means failure
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module only means "try the parent package"; anything
            # else (e.g. a compile error) is handed to the caller
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
117    
# Returns a string of exactly $_[0] random octets.
#
# Octets are read from /dev/urandom when it can be opened; whatever could
# not be read from there (device missing, read error, short read) is filled
# in from perl's rand, which is a much weaker source.
sub nonce($) {
   my $want  = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      while (length ($nonce) < $want) {
         # append at the current end of $nonce; sysread may return less
         # than requested
         my $got = sysread $fh, $nonce, $want - length ($nonce), length ($nonce);
         last unless $got; # error or EOF, use the fallback for the rest
      }
   }

   my $missing = $want - length ($nonce);
   $nonce .= join "", map +(chr rand 256), 1 .. $missing
      if $missing > 0;

   $nonce
}
130    
# Encodes a binary string into a string consisting only of [A-Za-z0-9].
#
# With Math::GMP (>= 2.05) available, the octets are treated as one big
# number and written in base 62. Otherwise base64 is used, with the padding
# removed and the non-alphanumeric characters escaped through "x":
# "x" => "x0", "/" => "x1", "+" => "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                  (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                  62
              );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with one OR two "=", so remove all of them
      $data =~ s/=+\z//;
      # escape "x" first, it is the escape character for the other two
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
149    
# Builds an alphanumeric cookie from the low 16 bits of the process ID, the
# low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $packed = pack "nna*", $$ & 0xffff, time & 0xffff, nonce (2);

   alnumbits ($packed)
}
153    
# configuration of this node
our $CONFIG;

our $RUNIQ;     # remote uniq value
our $UNIQ;      # per-process/node unique cookie
our $NODE;
our $ID = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our (%PORT, %PORT_DATA);

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

# true if node is a global ("directory") node
our $GLOBAL;
our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
172    
# (Re)generates the random identifiers of this process: $RUNIQ from 12
# random octets, $UNIQ via gen_uniq, and the anonymous default node ID
# "anon/$RUNIQ".
sub _init_names {
   $RUNIQ = alnumbits (nonce (96 / 8));
   $UNIQ  = gen_uniq ();
   $NODE  = "anon/$RUNIQ";
}

# give the node a usable identity as soon as the module is loaded
_init_names ();
180 root 1.64
# Returns the node ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
184    
# Returns the node ID part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
190    
# Defines the constant TRACE at compile time: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
196 root 1.1
our $DELAY_TIMER; # pending timer, set while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Runs the queued callbacks in order until the queue is empty, then clears
# $DELAY_TIMER. Callbacks queued while this is running are run as well.
sub _delay_run {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
}
203    
# Queues a callback to be run from a zero-delay timer and starts that timer
# unless one is already pending.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
208    
# Delivers a message (port ID followed by the message elements) to the
# matching local port callback; messages for unknown ports are dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $portid = shift;
   my $cb = $PORT{$portid}
      or return;

   # call with the remaining @_ as arguments
   &$cb;
}
213    
# Returns the node object registered in %NODE for the given node ID,
# creating and registering an AnyEvent::MP::Node::Remote object first when
# there is none yet. snd, snd_to_func and kil route through it.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
221    
# Sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message. Croaks when
# the port ID is undefined.
sub snd(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   # unknown nodes are registered on the fly
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
233    
234 root 1.17 =item $is_local = port_is_local $port
235    
236     Returns true iff the port is a local port.
237    
238     =cut
239    
# True when the node part of the given port ID maps to the same node object
# as the local node entry $NODE{""}.
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   my $local = $NODE{""};

   $NODE{$nodeid} == $local
}
245    
246 root 1.18 =item snd_to_func $node, $func, @args
247 root 1.11
248 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
249 root 1.11 this function with the given arguments on that node.
250    
251 root 1.20 This function can be used to implement C<spawn>-like interfaces.
252 root 1.11
253     =cut
254    
# Sends a message to the node port (port ID "") of the given node; the
# message consists of the function name followed by its arguments.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the remark above talks about delaying on $NODE, while the
   # test below sets the flag for every node EXCEPT $NODE - confirm which of
   # the two is intended.
   unless ($nodeid eq $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @call]);
}
269    
270 root 1.18 =item snd_on $node, @msg
271    
272     Executes C<snd> with the given C<@msg> (which must include the destination
273     port) on the given node.
274    
275     =cut
276    
# Asks the given node to execute snd with @msg, by sending it a "snd"
# request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
281    
282 root 1.29 =item eval_on $node, $string[, @reply]
283 root 1.18
284 root 1.29 Evaluates the given string as Perl expression on the given node. When
285     @reply is specified, then it is used to construct a reply message with
286     C<"$@"> and any results from the eval appended.
287 root 1.18
288     =cut
289    
# Asks the given node to evaluate a string of Perl code, by sending it an
# "eval" request; any further arguments are passed along as reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
294    
# Kills the given port ("nodeid#portid"), passing the remaining arguments
# on as kill reason. Croaks when the port part is empty, as node ports
# cannot be killed.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
304    
305     #############################################################################
306 root 1.6 # node monitoring and info
307 root 1.3
308 root 1.21 =item node_is_known $nodeid
309 root 1.13
310 root 1.71 #TODO#
311     Returns true iff the given node is currently known to this node.
312 root 1.13
313     =cut
314    
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
318    
319 root 1.21 =item node_is_up $nodeid
320 root 1.13
321     Returns true if the given node is "up", that is, the kernel thinks it has
322     a working connection to it.
323    
324 root 1.69 If the node is known (to this local node) but not currently connected,
325     returns C<0>. If the node is not known, returns C<undef>.
326 root 1.13
327     =cut
328    
# Returns 1 when the node has a transport, 0 when the node is known but has
# none, and nothing (undef in scalar context) when the node is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
333    
334 root 1.3 =item known_nodes
335    
336 root 1.71 #TODO#
337 root 1.26 Returns the node IDs of all nodes currently known to this node, including
338     itself and nodes not currently connected.
339 root 1.3
340     =cut
341    
# Returns the {id} member of every node object stored in %NODE.
sub known_nodes() {
   map { $_->{id} } values %NODE
}
345    
346     =item up_nodes
347    
348 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
349     the node itself).
350 root 1.3
351     =cut
352    
# Returns the {id} member of every node object in %NODE that currently has
# a transport.
sub up_nodes() {
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}
356    
357 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
358 root 1.3
359 root 1.27 Registers a callback that is called each time a node goes up (a connection
360     is established) or down (the connection is lost).
361 root 1.3
362     Node up messages can only be followed by node down messages for the same
363     node, and vice versa.
364    
365 root 1.71 Note that monitoring a node is usually better done by monitoring its node
366 root 1.27 port. This function is mainly of interest to modules that are concerned
367     about the network topology and low-level connection handling.
368    
369     Callbacks I<must not> block and I<should not> send any messages.
370    
371     The function returns an optional guard which can be used to unregister
372 root 1.3 the monitoring callback again.
373    
374 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
375     or go up (and down).
376    
377     newnode $_, 1 for up_nodes;
378     mon_nodes \&newnode;
379    
380 root 1.3 =cut
381    
# registered node monitors, keyed by the numeric value of the callback
our %MON_NODES;

# Registers $cb to be called on node up/down events. Unless called in void
# context, returns a guard object that unregisters the callback again.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # nobody wants the guard, so do not create one
   return unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
391    
# Reports a node up/down event: calls every registered node monitor with
# (node ID, up flag, reason...) and then logs the event at level 7. A
# monitor that dies is logged at level 1 and does not stop the others.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $monitor (values %MON_NODES) {
      unless (eval { $monitor->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
402    
403     #############################################################################
404 root 1.1 # self node code
405    
# Removes a local port (first argument) and tells its monitors; the
# remaining arguments are the kill reason. An unmonitored port dying with a
# non-empty reason is logged at level 2.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
419    
# Registers the callback (third argument) as monitor of the local port
# (second argument); the first argument is not used. When the port does not
# exist, the callback is invoked right away with a no_such_port reason.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   unless (exists $PORT{$portid}) {
      return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
   }

   $LMON{$portid}{$cb+0} = $cb;
}
426    
# Removes the monitor callback (third argument) from the local port (second
# argument); ports without any monitor are left untouched. The first
# argument is not used.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
431    
# Request handlers of the node port: maps the first element of a message
# sent to the node port to the code that handles it. $SRCNODE is the node
# the request came from.
our %node_req = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;

      _unmonitor (undef, $portid, delete $SRCNODE->{rmon}{$portid});
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # only keep a weak reference to the node inside the callback
      Scalar::Util::weaken (my $node = $SRCNODE);

      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;
      _monitor (undef, $portid, $on_kill);
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities

   # NOTE(review): runs a string received in a message through eval; only
   # acceptable as long as every peer is trusted.
   eval => sub {
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # ignores the message
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
494    
# The local node is reachable under its node ID and under the empty ID.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: dispatches a message on its first element, using
# %node_req and falling back to load_func (the result is cached in
# %node_req). Errors are logged at level 2 and not propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      # call with the remaining @_ as arguments
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
501    
502 root 1.69 #############################################################################
503 root 1.71 # seed management, try to keep connections to all seeds at all times
504 root 1.69
# seed ID => node ID|undef
our %SEED_NODE;
# map node ID to seed ID
our %NODE_SEED;
# $seed => transport_connector | 1=connected | 2=connecting
our %SEED_CONNECT;
# retry timer, while at least one seed is not connected
our $SEED_WATCHER;
# current retry interval, used by seed_all
our $SEED_RETRY;
510 root 1.69
# Starts a connection attempt to the given seed ("host:port") unless
# %SEED_CONNECT already has a true entry for it. Croaks when the seed
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak ("$seed: unparsable seed address");

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   #
   # we rely on untrusted data here (the remote node name) this is
   # hopefully ok, as this can at most be used for DOSing, which is easy
   # when you can do MITM anyway.
   my $on_greeted = sub {
      my $remote = $_[0]{remote_node};

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
         # NOTE(review): %NODE_SEED is keyed by node ID everywhere else, so
         # deleting by seed address looks like a no-op - confirm the intent.
         delete $NODE_SEED{$seed};
      } else {
         $SEED_NODE{$seed}    = $remote;
         $NODE_SEED{$remote}  = $seed;
      }
   };

   # forget the attempt, so that seed_all may try this seed again
   my $on_destroy = sub {
      delete $SEED_CONNECT{$seed};
   };

   # mark the seed as connected
   my $on_connect = sub {
      $SEED_CONNECT{$seed} = 1;
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      on_destroy => $on_destroy,
      $on_connect,
   );
}
546    
# Tries to connect to every seed that has neither a connection attempt in
# %SEED_CONNECT nor a node that is up, and re-arms the retry timer while
# there is at least one such seed. The retry interval roughly doubles on
# every round and is capped at the configured monitor_timeout.
sub seed_all {
   # my $next = List::Util::max 1,
   #    $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
   #    * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
   #    - rand;

   my @pending;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @pending, $seed;
   }

   if (!@pending) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @pending;

      my $limit = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $limit
         if $SEED_RETRY > $limit;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
574    
# Resets the retry interval to one second and schedules seed_all in one
# second, unless a retry timer is already pending.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
}
579    
# Replaces the seed list by the given seed addresses, dropping all seed
# bookkeeping, and starts connecting to the new seeds.
sub set_seeds(@) {
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out without a known node ID
   $SEED_NODE{$_} = undef
      for @_;

   seed_again ();#d#
   seed_all ();
}
591    
# if we lost the connection to a seed node, make sure we are seeding
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   return if $is_up;

   seed_again ()
      if exists $NODE_SEED{$nodeid};
});
597    
598     #############################################################################
599     # talk with/to global nodes
600    
601 root 1.72 # protocol messages:
602     #
603 root 1.73 # sent by all slave nodes (slave to master)
604     # g_slave database - make other global node master of the sender
605 root 1.72 #
606 root 1.73 # sent by any node to global nodes
607     # g_set database - set whole database
608     # g_add family key val - add/replace key to database
609     # g_del family key - delete key from database
610     # g_get family key reply... - send reply with data
611     #
612     # send by global nodes
613     # g_global - node became global, similar to global=1 greeting
614     #
615     # database families
616     # "'l" -> node -> listeners
617     # "'g" -> node -> undef
618     # ...
619 root 1.72 #
620    
# state used on all nodes:

# the global node we bind ourselves to, unless we are global ourselves
our $MASTER;
our $GLOBAL_MON;
our $GLOBAL_TIMER;
# this node database
our %LOCAL_DB;

# state used on global nodes:

# all local databases, merged
our %GLOBAL_DB;
# local databases of all global nodes
our %LOCAL_DBS;
# global db
our %GLOBAL_DBS;
# nodes that are our slaves
our %GLOBAL_SLAVE;
632 root 1.71
633 root 1.73 #our $sv_ne_json = JSON::XS->new->canonical;
634     #
635     #sub sv_ne($$) {
636     # $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
637     #}
638 root 1.71
# should be only on globals, but...
#
# g_global request: the sending node announces that it became a global
# node. Ignored unless this node is global itself.
$node_req{g_global} = sub {
   if ($GLOBAL) {
      my $sender = $SRCNODE->{id};

      # g_clr is not defined in this file, hence the & call form
      &g_clr ($sender);

      # treat the sender as if it had greeted with global=1
      $SRCNODE->{transport}{remote_greeting}{global} = 1;
      delete $GLOBAL_SLAVE{$sender};

      snd ($sender, g_set => \%LOCAL_DB);
   }
};
648 root 1.71
# Returns the IDs listed in the "'g" family of %GLOBAL_DB that are up,
# except the ID of this node.
sub other_globals() {
   grep { $_ ne $NODE && node_is_up ($_) } keys %{ $GLOBAL_DB{"'g"} }
}
652    
# local database management

# ldb_set $family, $key[, $value]
#
# With a value (even an undefined one), stores it under $family/$key in the
# local database %LOCAL_DB; without a value, deletes that key. When a master
# is set, the change is forwarded to it as g_add or g_del message.
sub ldb_set($$;$) {
   # trace through the regular log handler instead of writing to STDERR
   # unconditionally
   $WARN->(9, "ldb_set<@_>");

   my ($family, $key) = @_;

   if (@_ > 2) {
      $LOCAL_DB{$family}{$key} = $_[2];
      snd ($MASTER, g_add => $family => $key => $_[2])
         if defined $MASTER;
   } else {
      delete $LOCAL_DB{$family}{$key};
      snd ($MASTER, g_del => $family => $key)
         if defined $MASTER;
   }
}
666    
667 root 1.73 #############################################################################
668     # master selection
669    
# master requests
our %GLOBAL_REQ; # $id => \@req

# Remembers the request @req under $id, so master_set can (re-)send it, and
# sends it to the current master when there is one. Does nothing when a
# request with the same ID is already queued.
sub global_req_add {
   my ($id, @req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = \@req;

   snd ($MASTER, @req)
      if $MASTER;
}
683 root 1.71
# Forgets the queued master request with the given ID and returns it.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
687    
# Queues a g_find request for the given node ID under the request ID
# "g_find <nodeid>".
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", g_find => $nodeid);
}
691 root 1.71
# reply for g_find started in Node.pm
#
# Arguments are the node ID and an array reference; the latter is stored
# temporarily in the "'l" family of %GLOBAL_DB while connecting.
$node_req{g_found} = sub {
   my ($nodeid, $listeners) = @_;

   global_req_del ("g_find $nodeid");

   return unless @$listeners; # d'oh

   my $node = $NODE{$nodeid}
      or return;

   # only visible for the duration of the connect call
   local $GLOBAL_DB{"'l"}{$nodeid} = $listeners; #d# UGLY
   $node->connect;
};
703    
# Makes the given node our master: sends it a g_slave request carrying the
# local database, followed by all requests queued in %GLOBAL_REQ.
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
713 root 1.71
# Picks a master: the first seed node found to be up or, when there is
# none, the first seed node that comes up later. After a master was chosen
# that way, losing it restarts the search.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   # no seed node is up - wait for one
   $GLOBAL_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      # replaces (and thereby unregisters) the monitor we are running in
      $GLOBAL_MON = mon_nodes (sub {
         my ($nodeid, $is_up) = @_;

         return if $is_up || $nodeid ne $MASTER;

         undef $MASTER;
         master_search ();
      });
   });
}
737    
# other node wants to make us the master
#
# Placeholder g_slave handler: loads AnyEvent::MP::Global on first use and
# then forwards the request, with unchanged arguments, to the handler found
# in $node_req{g_slave} afterwards.
# NOTE(review): this presumes that loading AnyEvent::MP::Global replaces
# $node_req{g_slave} - the module is not part of this file. If it does not,
# the request fails with an error instead of recursing endlessly.
sub _g_slave_stub {
   $WARN->(9, "g_slave request received, loading AnyEvent::MP::Global");

   require AnyEvent::MP::Global;

   my $handler = $node_req{g_slave};

   die "AnyEvent::MP::Global did not install a g_slave handler\n"
      if !$handler || $handler == \&_g_slave_stub;

   # call with our own @_ as arguments
   &$handler;
}

$node_req{g_slave} = \&_g_slave_stub;
747    
748 root 1.73 #$node_req{g_reply} = sub {
749     # my $id = shift;
750     #
751     # my $cb = delete $GLOBAL_REQ{$id}
752     # or return;
753     #
754     # $cb->[0]->(@_);
755     #};
756 root 1.71
757 root 1.73 #############################################################################
758 root 1.71
759 root 1.73 #############################################################################
760 root 1.71
761 root 1.73 # $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
762 root 1.71
763 root 1.69 #############################################################################
764     # configure
765    
# Returns the second element of POSIX::uname, the node name of this host.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
770    
# Resolves a comma separated list of transport descriptors. Returns a
# condvar that is sent the resulting "host:port" strings - ordered by
# position of their descriptor, duplicates removed - once every lookup has
# finished.
#
# A descriptor consisting only of digits (or nothing) is taken as a port on
# this host (_nodename), a port of "*" becomes "0", and a host of "*" is
# expanded to the addresses of the local interfaces via Net::Interface.
# Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "host:port"], filled in by the lookups

   # runs when the last lookup has ended
   $cv->begin (sub {
      my ($done) = @_;

      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @res;

      $done->send (@refs);
   });

   my $count = 0;

   for my $spec (split /,/, $nodeid) {
      # results of one descriptor share its integer part, the fraction keeps
      # their relative order
      my $pri = ++$count;

      if ($spec =~ /^\d*$/) {
         $spec = length $spec ? _nodename () . ":$spec" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, 0)
         or Carp::croak ("$spec: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $entry (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr $entry->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $addr, $service
               ];
            }
            $cv->end;
         };
      } else {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               # runs in the forked process
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()

                  # skip localhost etc.
                  push @addr, grep !/^\x7f/,
                              $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  # global unicast, site-local unicast
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                              $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               # runs in the parent, with the addresses found
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      }
   }

   # matches the initial begin
   $cv->end;

   $cv
}
851    
# configure [$profile,] key => value...
#
# (Re)initialises the local node: picks new unique names, looks up the
# profile (default: the host's node name) via
# AnyEvent::MP::Config::find_profile, sets the node ID, creates a listener
# for every resolved bind address (default "*"), publishes the listeners in
# the local database, starts seeding, starts the master search and finally
# loads/starts the configured services.
#
# Dies on an illegal node ID, unparsable bind addresses, or when a service
# fails to load.
sub configure(@) {
   # a single leading argument is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak ("$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n");

   # a trailing slash asks for a unique suffix
   $NODE = $node;
   $NODE =~ s%/$%/$RUNIQ%;

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolutions first, then wait for them one by one
   for my $cv (map { _resolve ($_) } @$binds) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the host/port passed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   ldb_set ("'l" => $NODE => $LISTENER);

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   #require AnyEvent::MP::Global;

   # connect to all seednodes
   set_seeds (map { $_->recv } map { _resolve ($_) } @$seeds);

   master_search ();

   # NOTE(review): leftover debugging aid, only active for a node named
   # "atha" - candidate for removal.
   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } else {
         # work on a copy: stripping the "::" in place would rewrite the
         # entry in $CONFIG and turn "Module::" into a function name
         my $name = $service;

         if ($name =~ s/::$//) {
            # "Module::" - just load the module
            eval "require $name";
            die $@ if $@;
         } else {
            # function name - load and call it
            (load_func ($name))->();
         }
      }
   }
}
933    
934 root 1.1 =back
935    
936     =head1 SEE ALSO
937    
938     L<AnyEvent::MP>.
939    
940     =head1 AUTHOR
941    
942     Marc Lehmann <schmorp@schmorp.de>
943     http://home.schmorp.de/
944    
945     =cut
946    
947     1
948