ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.78
Committed: Fri Mar 2 19:19:21 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.77: +29 -47 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37 root 1.71 our @EXPORT_OK = qw(
38     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
39     );
40    
41 root 1.1 our @EXPORT = qw(
42 root 1.21 add_node load_func snd_to_func snd_on eval_on
43 root 1.1
44 root 1.34 NODE $NODE node_of snd kil port_is_local
45     configure
46 root 1.46 up_nodes mon_nodes node_is_up
47 root 1.1 );
48    
49 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
50 root 1.1
51 root 1.27 This value is called with an error or warning message, when e.g. a
52     connection could not be created, authorisation failed and so on.
53    
54     It I<must not> block or send messages -queue it and use an idle watcher if
55     you need to do any of these things.
56 root 1.1
57 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
58 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
59     node connectivity and services, C<8> for debugging messages and C<9> for
60     tracing messages.
61    
62 root 1.1 The default simply logs the message to STDERR.
63    
64 root 1.44 =item @AnyEvent::MP::Kernel::WARN
65    
66     All code references in this array are called for every log message, from
67     the default C<$WARN> handler. This is an easy way to tie into the log
68     messages without disturbing others.
69    
70 root 1.1 =cut
71    
72 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
73 root 1.44 our @WARN;
74     our $WARN = sub {
75     &$_ for @WARN;
76 root 1.29
77     return if $WARNLEVEL < $_[0];
78    
79 root 1.16 my ($level, $msg) = @_;
80    
81 root 1.1 $msg =~ s/\n$//;
82 root 1.16
83     printf STDERR "%s <%d> %s\n",
84     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
85     $level,
86     $msg;
87 root 1.1 };
88    
89 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
90    
91     The maximum level at which warning messages will be printed to STDERR by
92     the default warn handler.
93    
94     =cut
95    
96 root 1.6 sub load_func($) {
97     my $func = $_[0];
98    
99     unless (defined &$func) {
100     my $pkg = $func;
101     do {
102     $pkg =~ s/::[^:]+$//
103 root 1.63 or return sub { die "unable to resolve function '$func'" };
104 root 1.60
105     local $@;
106 root 1.61 unless (eval "require $pkg; 1") {
107     my $error = $@;
108     $error =~ /^Can't locate .*.pm in \@INC \(/
109     or return sub { die $error };
110     }
111 root 1.6 } until defined &$func;
112     }
113    
114     \&$func
115     }
116    
117 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
118    
119 root 1.1 sub nonce($) {
120 root 1.78 join "", map chr rand 256, 1 .. $_[0]
121 root 1.1 }
122    
123 root 1.78 sub nonce62($) {
124     join "", map $alnum[rand 62], 1 .. $_[0]
125 root 1.1 }
126    
127     sub gen_uniq {
128 root 1.78 my $now = AE::now;
129     (join "",
130     map $alnum[$_],
131     $$ / 62 % 62,
132     $$ % 62,
133     (int $now ) % 62,
134     (int $now * 100) % 62,
135     (int $now * 10000) % 62,
136     ) . nonce62 4;
137 root 1.1 }
138    
139 root 1.20 our $CONFIG; # this node's configuration
140 root 1.21
141 root 1.64 our $RUNIQ; # remote uniq value
142     our $UNIQ; # per-process/node unique cookie
143     our $NODE;
144     our $ID = "a";
145 root 1.1
146     our %NODE; # node id to transport mapping, or "undef", for local node
147     our (%PORT, %PORT_DATA); # local ports
148    
149 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
150 root 1.1 our %LMON; # monitored _local_ ports
151    
152 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
153 root 1.1 our %LISTENER;
154 root 1.21 our $LISTENER; # our listeners, as arrayref
155 root 1.1
156 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
157 root 1.1
158 root 1.69 sub _init_names {
159 root 1.78 # ~54 bits, for local port names, lowercase $ID appended
160     $UNIQ = gen_uniq;
161    
162     # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
163     $RUNIQ = nonce62 10;
164     $RUNIQ =~ s/(.)$/\U$1/;
165    
166     $NODE = "anon/$RUNIQ";
167 root 1.64 }
168    
169 root 1.69 _init_names;
170 root 1.64
171 root 1.1 sub NODE() {
172     $NODE
173     }
174    
175     sub node_of($) {
176 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
177 root 1.1
178 root 1.21 $node
179 root 1.1 }
180    
181 root 1.17 BEGIN {
182     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
183     ? sub () { 1 }
184     : sub () { 0 };
185     }
186 root 1.1
187 root 1.42 our $DELAY_TIMER;
188     our @DELAY_QUEUE;
189    
190     sub _delay_run {
191 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
192 root 1.42 }
193    
194     sub delay($) {
195     push @DELAY_QUEUE, shift;
196     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
197     }
198    
199 root 1.1 sub _inject {
200 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
201 root 1.1 &{ $PORT{+shift} or return };
202     }
203    
204 root 1.20 # this function adds a node-ref, so you can send stuff to it
205     # it is basically the central routing component.
206 root 1.1 sub add_node {
207 root 1.21 my ($node) = @_;
208 root 1.1
209 root 1.71 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
210 root 1.13 }
211    
212 root 1.1 sub snd(@) {
213 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
214 root 1.1
215 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
216 root 1.1
217 root 1.49 defined $nodeid #d#UGLY
218     or Carp::croak "'undef' is not a valid node ID/port ID";
219    
220 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
221 root 1.2 ->{send} (["$portid", @_]);
222 root 1.1 }
223    
224 root 1.17 =item $is_local = port_is_local $port
225    
226     Returns true iff the port is a local port.
227    
228     =cut
229    
230     sub port_is_local($) {
231 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
232 root 1.17
233 root 1.21 $NODE{$nodeid} == $NODE{""}
234 root 1.17 }
235    
236 root 1.18 =item snd_to_func $node, $func, @args
237 root 1.11
238 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
239 root 1.11 this function with the given arguments on that node.
240    
241 root 1.20 This function can be used to implement C<spawn>-like interfaces.
242 root 1.11
243     =cut
244    
245 root 1.18 sub snd_to_func($$;@) {
246 root 1.21 my $nodeid = shift;
247 root 1.11
248 root 1.41 # on $NODE, we artificially delay... (for spawn)
249     # this is very ugly - maybe we should simply delay ALL messages,
250     # to avoid deep recursion issues. but that's so... slow...
251 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
252     if $nodeid ne $NODE;
253    
254 root 1.49 defined $nodeid #d#UGLY
255     or Carp::croak "'undef' is not a valid node ID/port ID";
256    
257 root 1.71 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
258 root 1.11 }
259    
260 root 1.18 =item snd_on $node, @msg
261    
262     Executes C<snd> with the given C<@msg> (which must include the destination
263     port) on the given node.
264    
265     =cut
266    
267     sub snd_on($@) {
268     my $node = shift;
269     snd $node, snd => @_;
270     }
271    
272 root 1.29 =item eval_on $node, $string[, @reply]
273 root 1.18
274 root 1.29 Evaluates the given string as Perl expression on the given node. When
275     @reply is specified, then it is used to construct a reply message with
276     C<"$@"> and any results from the eval appended.
277 root 1.18
278     =cut
279    
280 root 1.29 sub eval_on($$;@) {
281 root 1.18 my $node = shift;
282     snd $node, eval => @_;
283     }
284    
285 root 1.1 sub kil(@) {
286 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
287 root 1.1
288     length $portid
289 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
290 root 1.1
291 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
292 root 1.1 ->kill ("$portid", @_);
293     }
294    
295     #############################################################################
296 root 1.6 # node monitoring and info
297 root 1.3
298 root 1.21 =item node_is_known $nodeid
299 root 1.13
300 root 1.71 #TODO#
301     Returns true iff the given node is currently known to this node.
302 root 1.13
303     =cut
304    
305     sub node_is_known($) {
306     exists $NODE{$_[0]}
307     }
308    
309 root 1.21 =item node_is_up $nodeid
310 root 1.13
311     Returns true if the given node is "up", that is, the kernel thinks it has
312     a working connection to it.
313    
314 root 1.69 If the node is known (to this local node) but not currently connected,
315     returns C<0>. If the node is not known, returns C<undef>.
316 root 1.13
317     =cut
318    
319     sub node_is_up($) {
320     ($NODE{$_[0]} or return)->{transport}
321     ? 1 : 0
322     }
323    
324 root 1.3 =item known_nodes
325    
326 root 1.71 #TODO#
327 root 1.26 Returns the node IDs of all nodes currently known to this node, including
328     itself and nodes not currently connected.
329 root 1.3
330     =cut
331    
332 root 1.49 sub known_nodes() {
333 root 1.26 map $_->{id}, values %NODE
334 root 1.3 }
335    
336     =item up_nodes
337    
338 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
339     the node itself).
340 root 1.3
341     =cut
342    
343 root 1.49 sub up_nodes() {
344 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
345 root 1.3 }
346    
347 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
348 root 1.3
349 root 1.27 Registers a callback that is called each time a node goes up (a connection
350     is established) or down (the connection is lost).
351 root 1.3
352     Node up messages can only be followed by node down messages for the same
353     node, and vice versa.
354    
355 root 1.71 Note that monitoring a node is usually better done by monitoring its node
356 root 1.27 port. This function is mainly of interest to modules that are concerned
357     about the network topology and low-level connection handling.
358    
359     Callbacks I<must not> block and I<should not> send any messages.
360    
361     The function returns an optional guard which can be used to unregister
362 root 1.3 the monitoring callback again.
363    
364 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
365     or go up (and down).
366    
367     newnode $_, 1 for up_nodes;
368     mon_nodes \&newnode;
369    
370 root 1.3 =cut
371    
372     our %MON_NODES;
373    
374     sub mon_nodes($) {
375     my ($cb) = @_;
376    
377     $MON_NODES{$cb+0} = $cb;
378    
379 root 1.62 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
380 root 1.3 }
381    
382     sub _inject_nodeevent($$;@) {
383 root 1.16 my ($node, $up, @reason) = @_;
384 root 1.3
385     for my $cb (values %MON_NODES) {
386 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
387 root 1.16 or $WARN->(1, $@);
388 root 1.3 }
389 root 1.16
390 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
391 root 1.3 }
392    
393     #############################################################################
394 root 1.1 # self node code
395    
396 root 1.67 sub _kill {
397     my $port = shift;
398    
399     delete $PORT{$port}
400     or return; # killing nonexistent ports is O.K.
401     delete $PORT_DATA{$port};
402    
403     my $mon = delete $LMON{$port}
404     or !@_
405     or $WARN->(2, "unmonitored local port $port died with reason: @_");
406    
407     $_->(@_) for values %$mon;
408     }
409    
410     sub _monitor {
411     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
412     unless exists $PORT{$_[1]};
413    
414     $LMON{$_[1]}{$_[2]+0} = $_[2];
415     }
416    
417     sub _unmonitor {
418 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
419     if exists $LMON{$_[1]};
420 root 1.67 }
421    
422 root 1.1 our %node_req = (
423     # internal services
424    
425     # monitoring
426 root 1.65 mon0 => sub { # stop monitoring a port for another node
427 root 1.1 my $portid = shift;
428 root 1.67 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
429 root 1.1 },
430 root 1.65 mon1 => sub { # start monitoring a port for another node
431 root 1.1 my $portid = shift;
432 root 1.67 Scalar::Util::weaken (my $node = $SRCNODE);
433     _monitor undef, $portid, $node->{rmon}{$portid} = sub {
434 root 1.58 delete $node->{rmon}{$portid};
435 root 1.65 $node->send (["", kil0 => $portid, @_])
436 root 1.59 if $node && $node->{transport};
437 root 1.67 };
438 root 1.1 },
439 root 1.65 # another node has killed a monitored port
440     kil0 => sub {
441 root 1.1 my $cbs = delete $SRCNODE->{lmon}{+shift}
442     or return;
443    
444     $_->(@_) for @$cbs;
445     },
446    
447 root 1.18 # "public" services - not actually public
448 root 1.1
449 root 1.65 # another node wants to kill a local port
450 root 1.66 kil => \&_kill,
451 root 1.65
452 root 1.1 # relay message to another node / generic echo
453 root 1.15 snd => \&snd,
454 root 1.27 snd_multiple => sub {
455 root 1.1 snd @$_ for @_
456     },
457    
458 root 1.30 # random utilities
459 root 1.1 eval => sub {
460 root 1.76 #d#SECURE
461 root 1.50 my @res = do { package main; eval shift };
462 root 1.1 snd @_, "$@", @res if @_;
463     },
464     time => sub {
465 root 1.76 snd @_, AE::now;
466 root 1.1 },
467     devnull => sub {
468     #
469     },
470 root 1.15 "" => sub {
471 root 1.27 # empty messages are keepalives or similar devnull-applications
472 root 1.15 },
473 root 1.1 );
474    
475 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
476 root 1.1 $PORT{""} = sub {
477     my $tag = shift;
478 root 1.76 #d#SECURE (load_func)
479 root 1.1 eval { &{ $node_req{$tag} ||= load_func $tag } };
480 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
481 root 1.1 };
482    
483 root 1.69 #############################################################################
484 root 1.71 # seed management, try to keep connections to all seeds at all times
485 root 1.69
486 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
487     our %NODE_SEED; # map node ID to seed ID
488 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
489     our $SEED_WATCHER;
490 root 1.71 our $SEED_RETRY;
491 root 1.69
492     sub seed_connect {
493     my ($seed) = @_;
494    
495     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
496     or Carp::croak "$seed: unparsable seed address";
497    
498     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
499    
500 root 1.71 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
501     $host, $port,
502     on_greeted => sub {
503     # called after receiving remote greeting, learn remote node name
504    
505     # we rely on untrusted data here (the remote node name) this is
506     # hopefully ok, as this can at most be used for DOSing, which is easy
507     # when you can do MITM anyway.
508    
509     # if we connect to ourselves, nuke this seed, but make sure we act like a seed
510     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
511 root 1.72 require AnyEvent::MP::Global; # every seed becomes a global node currently
512 root 1.71 delete $SEED_NODE{$seed};
513     delete $NODE_SEED{$seed};
514     } else {
515     $SEED_NODE{$seed} = $_[0]{remote_node};
516     $NODE_SEED{$_[0]{remote_node}} = $seed;
517     }
518     },
519     on_destroy => sub {
520     delete $SEED_CONNECT{$seed};
521     },
522 root 1.69 sub {
523     $SEED_CONNECT{$seed} = 1;
524 root 1.71 }
525 root 1.69 ;
526     }
527    
528     sub seed_all {
529     # my $next = List::Util::max 1,
530     # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
531     # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
532     # - rand;
533    
534     my @seeds = grep {
535     !exists $SEED_CONNECT{$_}
536     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
537     } keys %SEED_NODE;
538    
539     if (@seeds) {
540 root 1.70 # start connection attempt for every seed we are not connected to yet
541 root 1.69 seed_connect $_
542     for @seeds;
543 root 1.71
544     $SEED_RETRY = $SEED_RETRY * 2 + rand;
545     $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
546     if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
547    
548     $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
549    
550 root 1.69 } else {
551 root 1.71 # all seeds connected or connecting, no need to restart timer
552 root 1.69 undef $SEED_WATCHER;
553     }
554     }
555    
556     sub seed_again {
557 root 1.71 $SEED_RETRY = 1;
558     $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
559 root 1.69 }
560    
561     # sets new seed list, starts connecting
562     sub set_seeds(@) {
563     %SEED_NODE = ();
564 root 1.71 %NODE_SEED = ();
565     %SEED_CONNECT = ();
566    
567 root 1.69 @SEED_NODE{@_} = ();
568    
569 root 1.71 seed_again;#d#
570     seed_all;
571     }
572    
573     mon_nodes sub {
574     # if we lost the connection to a seed node, make sure we are seeding
575     seed_again
576     if !$_[1] && exists $NODE_SEED{$_[0]};
577     };
578    
579     #############################################################################
580     # talk with/to global nodes
581    
582 root 1.72 # protocol messages:
583     #
584 root 1.73 # sent by all slave nodes (slave to master)
585     # g_slave database - make other global node master of the sender
586 root 1.72 #
587 root 1.73 # sent by any node to global nodes
588     # g_set database - set whole database
589     # g_add family key val - add/replace key to database
590     # g_del family key - delete key from database
591     # g_get family key reply... - send reply with data
592     #
593     # send by global nodes
594     # g_global - node became global, similar to global=1 greeting
595     #
596     # database families
597     # "'l" -> node -> listeners
598     # "'g" -> node -> undef
599     # ...
600 root 1.72 #
601    
602 root 1.73 # used on all nodes:
603 root 1.78 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
604     our $MASTER_MON;
605     our %LOCAL_DB; # this node database
606    
607     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
608 root 1.71
609 root 1.73 #our $sv_ne_json = JSON::XS->new->canonical;
610     #
611     #sub sv_ne($$) {
612     # $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
613     #}
614 root 1.71
615 root 1.73 # local database management
616     sub ldb_set($$;$) {
617     warn "ldb_set<@_>\n";#d#
618     if (@_ > 2) {
619     $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
620     snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
621     if defined $MASTER;
622     } else {
623     delete $LOCAL_DB{$_[0]}{$_[1]};
624     snd $MASTER, g_del => $_[0] => $_[1]
625     if defined $MASTER;
626 root 1.71 }
627     }
628    
629 root 1.73 #############################################################################
630     # master selection
631    
632     # master requests
633     our %GLOBAL_REQ; # $id => \@req
634 root 1.71
635 root 1.74 sub global_req_add {
636     my $id = shift;
637 root 1.71
638 root 1.74 return if exists $GLOBAL_REQ{$id};
639    
640     $GLOBAL_REQ{$id} = [@_];
641 root 1.71
642 root 1.73 snd $MASTER, @_
643     if $MASTER;
644 root 1.74 }
645 root 1.71
646 root 1.74 sub global_req_del {
647     delete $GLOBAL_REQ{$_[0]};
648     }
649    
650     sub g_find {
651     global_req_add "g_find $_[0]", g_find => $_[0];
652 root 1.73 }
653 root 1.71
654 root 1.73 # reply for g_find started in Node.pm
655     $node_req{g_found} = sub {
656 root 1.74 global_req_del "g_find $_[0]";
657    
658 root 1.73 @{ $_[1] } or return; # d'oh
659 root 1.72
660 root 1.73 my $node = $NODE{$_[0]} or return;
661 root 1.71
662 root 1.73 local $GLOBAL_DB{"'l"}{$_[0]} = $_[1]; #d# UGLY
663     $node->connect;
664 root 1.71 };
665    
666 root 1.73 sub master_set {
667     $MASTER = $_[0];
668 root 1.71
669 root 1.73 snd $MASTER, g_slave => \%LOCAL_DB;
670 root 1.71
671 root 1.73 # (re-)send queued requests
672     snd $MASTER, @$_
673     for values %GLOBAL_REQ;
674     }
675 root 1.71
676 root 1.72 sub master_search {
677 root 1.73 #TODO: should also look for other global nodes, but we don't know them #d#
678     for (keys %NODE_SEED) {
679 root 1.72 if (node_is_up $_) {
680     master_set $_;
681     return;
682 root 1.71 }
683 root 1.72 }
684 root 1.71
685 root 1.78 $MASTER_MON = mon_nodes sub {
686 root 1.72 return unless $_[1]; # we are only interested in node-ups
687     return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
688 root 1.71
689 root 1.72 master_set $_[0];
690 root 1.71
691 root 1.78 $MASTER_MON = mon_nodes sub {
692 root 1.72 if ($_[0] eq $MASTER && !$_[1]) {
693     undef $MASTER;
694     master_search ();
695     }
696 root 1.71 };
697 root 1.72 };
698 root 1.71 }
699    
700 root 1.73 # other node wants to make us the master
701     $node_req{g_slave} = sub {
702     my ($db) = @_;
703    
704     warn "slave1\n";#d#
705 root 1.71
706 root 1.73 require AnyEvent::MP::Global;
707     &{ $node_req{g_slave} };
708 root 1.71 };
709    
710 root 1.73 #$node_req{g_reply} = sub {
711     # my $id = shift;
712     #
713     # my $cb = delete $GLOBAL_REQ{$id}
714     # or return;
715     #
716     # $cb->[0]->(@_);
717     #};
718 root 1.71
719 root 1.73 #############################################################################
720 root 1.71
721 root 1.73 #############################################################################
722 root 1.71
723 root 1.73 # $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
724 root 1.71
725 root 1.69 #############################################################################
726     # configure
727    
728     sub _nodename {
729     require POSIX;
730     (POSIX::uname ())[1]
731     }
732    
733     sub _resolve($) {
734     my ($nodeid) = @_;
735    
736     my $cv = AE::cv;
737     my @res;
738    
739     $cv->begin (sub {
740     my %seen;
741     my @refs;
742     for (sort { $a->[0] <=> $b->[0] } @res) {
743     push @refs, $_->[1] unless $seen{$_->[1]}++
744     }
745     shift->send (@refs);
746     });
747    
748     my $idx;
749     for my $t (split /,/, $nodeid) {
750     my $pri = ++$idx;
751    
752     $t = length $t ? _nodename . ":$t" : _nodename
753     if $t =~ /^\d*$/;
754    
755     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
756     or Carp::croak "$t: unparsable transport descriptor";
757    
758     $port = "0" if $port eq "*";
759    
760     if ($host eq "*") {
761     $cv->begin;
762     # use fork_call, as Net::Interface is big, and we need it rarely.
763     require AnyEvent::Util;
764     AnyEvent::Util::fork_call (
765     sub {
766     my @addr;
767    
768     require Net::Interface;
769    
770     for my $if (Net::Interface->interfaces) {
771     # we statically lower-prioritise ipv6 here, TODO :()
772     for $_ ($if->address (Net::Interface::AF_INET ())) {
773     next if /^\x7f/; # skip localhost etc.
774     push @addr, $_;
775     }
776     for ($if->address (Net::Interface::AF_INET6 ())) {
777     #next if $if->scope ($_) <= 2;
778     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
779     push @addr, $_;
780     }
781    
782     }
783     @addr
784     }, sub {
785     for my $ip (@_) {
786     push @res, [
787     $pri += 1e-5,
788     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
789     ];
790     }
791     $cv->end;
792     }
793     );
794     } else {
795     $cv->begin;
796     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
797     for (@_) {
798     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
799     push @res, [
800     $pri += 1e-5,
801     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
802     ];
803     }
804     $cv->end;
805     };
806     }
807     }
808    
809     $cv->end;
810    
811     $cv
812     }
813    
814     sub configure(@) {
815     unshift @_, "profile" if @_ & 1;
816     my (%kv) = @_;
817    
818     delete $NODE{$NODE}; # we do not support doing stuff before configure
819     _init_names;
820    
821     my $profile = delete $kv{profile};
822    
823     $profile = _nodename
824     unless defined $profile;
825    
826     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
827    
828 root 1.72 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
829 root 1.69
830     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
831    
832 root 1.72 $NODE = $node;
833 root 1.77
834     if ($NODE =~ s%/$%/$RUNIQ%) {
835     # nodes with randomised node names do not need randomised port names
836     $UNIQ = "";
837     }
838 root 1.69
839     $NODE{$NODE} = $NODE{""};
840     $NODE{$NODE}{id} = $NODE;
841    
842     my $seeds = $CONFIG->{seeds};
843     my $binds = $CONFIG->{binds};
844    
845     $binds ||= ["*"];
846    
847     $WARN->(8, "node $NODE starting up.");
848    
849     $LISTENER = [];
850     %LISTENER = ();
851    
852     for (map _resolve $_, @$binds) {
853     for my $bind ($_->recv) {
854     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
855     or Carp::croak "$bind: unparsable local bind address";
856    
857     my $listener = AnyEvent::MP::Transport::mp_server
858     $host,
859     $port,
860     prepare => sub {
861     my (undef, $host, $port) = @_;
862     $bind = AnyEvent::Socket::format_hostport $host, $port;
863     0
864     },
865     ;
866     $LISTENER{$bind} = $listener;
867     push @$LISTENER, $bind;
868     }
869     }
870    
871 root 1.73 ldb_set "'l" => $NODE => $LISTENER;
872    
873 root 1.69 $WARN->(8, "node listens on [@$LISTENER].");
874    
875     # the global service is mandatory currently
876 root 1.71 #require AnyEvent::MP::Global;
877 root 1.69
878     # connect to all seednodes
879     set_seeds map $_->recv, map _resolve $_, @$seeds;
880    
881 root 1.73 master_search;
882    
883 root 1.71 if ($NODE eq "atha") {;#d#
884 root 1.72 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
885 root 1.71 }
886    
887 root 1.69 for (@{ $CONFIG->{services} }) {
888     if (ref) {
889     my ($func, @args) = @$_;
890     (load_func $func)->(@args);
891     } elsif (s/::$//) {
892     eval "require $_";
893     die $@ if $@;
894     } else {
895     (load_func $_)->();
896     }
897     }
898     }
899    
900 root 1.1 =back
901    
902     =head1 SEE ALSO
903    
904     L<AnyEvent::MP>.
905    
906     =head1 AUTHOR
907    
908     Marc Lehmann <schmorp@schmorp.de>
909     http://home.schmorp.de/
910    
911     =cut
912    
913     1
914