ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.79
Committed: Sat Mar 3 11:38:43 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.78: +30 -52 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29    
30 root 1.79 use AnyEvent ();
31     use Guard ();
32 root 1.1
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.71 our @EXPORT_OK = qw(
39     %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     );
41    
42 root 1.1 our @EXPORT = qw(
43 root 1.21 add_node load_func snd_to_func snd_on eval_on
44 root 1.1
45 root 1.34 NODE $NODE node_of snd kil port_is_local
46     configure
47 root 1.46 up_nodes mon_nodes node_is_up
48 root 1.79 db_set db_del db_reg
49 root 1.1 );
50    
51 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
52 root 1.1
53 root 1.27 This value is called with an error or warning message, when e.g. a
54     connection could not be created, authorisation failed and so on.
55    
56     It I<must not> block or send messages - queue it and use an idle watcher if
57     you need to do any of these things.
58 root 1.1
59 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
60 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
61     node connectivity and services, C<8> for debugging messages and C<9> for
62     tracing messages.
63    
64 root 1.1 The default simply logs the message to STDERR.
65    
66 root 1.44 =item @AnyEvent::MP::Kernel::WARN
67    
68     All code references in this array are called for every log message, from
69     the default C<$WARN> handler. This is an easy way to tie into the log
70     messages without disturbing others.
71    
72 root 1.1 =cut
73    
# Highest level the default handler still prints; the environment variable
# PERL_ANYEVENT_MP_WARNLEVEL overrides the built-in default of 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks; the default handler calls each one for every message,
# before the level filter is applied.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # "&$_" without parentheses hands the very same @_ to every hook
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $WARNLEVEL < $level;

   # drop one trailing newline, the format below supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
90    
91 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
92    
93     The maximum level at which warning messages will be printed to STDERR by
94     the default warn handler.
95    
96     =cut
97    
# Resolves a fully qualified function name ("Some::Package::func") to a code
# reference, loading modules on demand.
#
# If the function is not defined yet, the trailing "::component" is stripped
# repeatedly and each remaining prefix is require'd until the function shows up.
#
# Never dies itself. On failure it returns a code reference that dies when
# called: with "unable to resolve function '...'" when no candidate package is
# left (or the candidate is not a legal package name), or with the module's own
# error when a module exists but fails to load.
sub load_func($) {
   my $func = $_[0];

   # the function is looked up by name
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # SECURITY: $pkg is interpolated into a string eval below and the
         # function name may come from another node, so anything that is not a
         # plain package name must be refused before it can run as code.
         $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module just means "try the next shorter prefix",
            # any other load error is reported to the caller
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
118    
119 root 1.78 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
120    
# Returns a string of $_[0] random characters, each with an ordinal in 0..255.
# Uses perl's rand, one call per character.
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}
124    
# Returns a string of $_[0] random characters drawn from @alnum (0-9, A-Z, a-z).
sub nonce62($) {
   my $str = "";

   $str .= $alnum[rand 62]
      for 1 .. $_[0];

   $str
}
128    
# Builds a nine character identifier: two base-62 digits derived from the
# process ID, three derived from the current event loop time (seconds, 1/100s,
# 1/10000s, each modulo 62), followed by four random alphanumeric characters.
sub gen_uniq {
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now * 100  ) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 4;
}
140    
141 root 1.20 our $CONFIG; # this node's configuration
142 root 1.21
143 root 1.64 our $RUNIQ; # remote uniq value
144     our $UNIQ; # per-process/node unique cookie
145     our $NODE;
146     our $ID = "a";
147 root 1.1
148     our %NODE; # node id to transport mapping, or "undef", for local node
149     our (%PORT, %PORT_DATA); # local ports
150    
151 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
152 root 1.1 our %LMON; # monitored _local_ ports
153    
154 root 1.71 our $GLOBAL; # true if node is a global ("directory") node
155 root 1.1 our %LISTENER;
156 root 1.21 our $LISTENER; # our listeners, as arrayref
157 root 1.1
158 root 1.76 our $SRCNODE; # holds the sending node _object_ during _inject
159 root 1.1
# (Re-)initialises $UNIQ, $RUNIQ and the default anonymous node name $NODE.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at
   # the end to avoid clashes
   my $remote = nonce62 10;
   $remote =~ s/(.)$/\U$1/;
   $RUNIQ = $remote;

   $NODE = "anon/$RUNIQ";
}
170    
171 root 1.69 _init_names;
172 root 1.64
# Returns the node ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
176    
# Returns the node ID part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   (split /#/, $_[0], 2)[0]
}
182    
# TRACE is a compile-time constant: 1 when PERL_ANYEVENT_MP_TRACE is set to a
# true value in the environment, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
188 root 1.1
189 root 1.42 our $DELAY_TIMER;
190     our @DELAY_QUEUE;
191    
# Timer callback: runs queued callbacks in FIFO order until the queue is
# empty, then clears $DELAY_TIMER and returns.
sub _delay_run {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
}
195    
# Queues a callback for _delay_run and arms the zero-delay timer unless one is
# already pending.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
200    
# Delivers a message to a local port: the first element of @_ is the port ID,
# the remaining elements are passed to the port callback. Messages for unknown
# ports are silently dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $handler = $PORT{+shift}
      or return;

   # "&$handler;" forwards the remaining @_ unchanged
   &$handler;
}
205    
206 root 1.20 # this function adds a node-ref, so you can send stuff to it
207     # it is basically the central routing component.
# Returns the node object registered for the given node ID, creating and
# registering an AnyEvent::MP::Node::Remote object if none exists yet.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
213    
# Sends a message: the first argument is the destination port ID
# ("nodeid#portid"), the rest is the message. The node object is created on
# demand and the message is handed to its {send} callback.
# Croaks when the destination is undefined.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
225    
226 root 1.17 =item $is_local = port_is_local $port
227    
228     Returns true iff the port is a local port.
229    
230     =cut
231    
# True iff the node part of the given port ID maps to the same node object as
# the local node (registered under the empty node ID).
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
237    
238 root 1.18 =item snd_to_func $node, $func, @args
239 root 1.11
240 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
241 root 1.11 this function with the given arguments on that node.
242    
243 root 1.20 This function can be used to implement C<spawn>-like interfaces.
244 root 1.11
245     =cut
246    
# Asks the given node to call a function: sends the remaining arguments
# (function name plus arguments) to that node's node port (empty port ID).
# Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so that a bad call neither compares undef against $NODE
   # nor leaves the DELAY flag set behind after croaking
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment above talks about $NODE, yet the flag is set
   # when the target differs from $NODE - confirm against Node::Self.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node ($nodeid))->{send} (["", @_]);
}
261    
262 root 1.18 =item snd_on $node, @msg
263    
264     Executes C<snd> with the given C<@msg> (which must include the destination
265     port) on the given node.
266    
267     =cut
268    
# Sends a "snd" request with the given message to the node port of $node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
273    
274 root 1.29 =item eval_on $node, $string[, @reply]
275 root 1.18
276 root 1.29 Evaluates the given string as Perl expression on the given node. When
277     @reply is specified, then it is used to construct a reply message with
278     C<"$@"> and any results from the eval appended.
279 root 1.18
280     =cut
281    
# Sends an "eval" request (string to evaluate plus optional reply message) to
# the node port of $node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
286    
# Kills the given port ("nodeid#portid"); the remaining arguments are passed
# on to the node object's kill method. Killing a node port (empty port ID) is
# refused with an exception.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
296    
297     #############################################################################
298 root 1.6 # node monitoring and info
299 root 1.3
300 root 1.21 =item node_is_known $nodeid
301 root 1.13
302 root 1.71 #TODO#
303     Returns true iff the given node is currently known to this node.
304 root 1.13
305     =cut
306    
# True iff an entry for the given node ID exists in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
310    
311 root 1.21 =item node_is_up $nodeid
312 root 1.13
313     Returns true if the given node is "up", that is, the kernel thinks it has
314     a working connection to it.
315    
316 root 1.69 If the node is known (to this local node) but not currently connected,
317     returns C<0>. If the node is not known, returns C<undef>.
318 root 1.13
319     =cut
320    
# Returns 1 if the node has a transport, 0 if it is known but has none, and
# nothing (undef in scalar context) if the node is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
325    
326 root 1.3 =item known_nodes
327    
328 root 1.71 #TODO#
329 root 1.26 Returns the node IDs of all nodes currently known to this node, including
330     itself and nodes not currently connected.
331 root 1.3
332     =cut
333    
# Returns the {id} of every node object stored in %NODE.
sub known_nodes() {
   map { $_->{id} } values %NODE
}
337    
338     =item up_nodes
339    
340 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
341     the node itself).
342 root 1.3
343     =cut
344    
# Returns the {id} of every node object in %NODE that currently has a transport.
sub up_nodes() {
   map { $_->{transport} ? $_->{id} : () } values %NODE
}
348    
349 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
350 root 1.3
351 root 1.27 Registers a callback that is called each time a node goes up (a connection
352     is established) or down (the connection is lost).
353 root 1.3
354     Node up messages can only be followed by node down messages for the same
355     node, and vice versa.
356    
357 root 1.71 Note that monitoring a node is usually better done by monitoring its node
358 root 1.27 port. This function is mainly of interest to modules that are concerned
359     about the network topology and low-level connection handling.
360    
361     Callbacks I<must not> block and I<should not> send any messages.
362    
363     The function returns an optional guard which can be used to unregister
364 root 1.3 the monitoring callback again.
365    
366 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
367     or go up (and down).
368    
369     newnode $_, 1 for up_nodes;
370     mon_nodes \&newnode;
371    
372 root 1.3 =cut
373    
374     our %MON_NODES;
375    
# Registers $cb in %MON_NODES, keyed by the numeric value of the reference.
# In non-void context a guard is returned that unregisters the callback when
# it is destroyed; in void context the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   return unless defined wantarray;

   Guard::guard { delete $MON_NODES{$cb+0} }
}
383    
# Notifies all node monitors registered via mon_nodes that $node went up or
# down. Exceptions thrown by callbacks are logged at level 1, the event itself
# is logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
394    
395     #############################################################################
396 root 1.1 # self node code
397    
# Removes a local port and its data, then calls every local monitor of that
# port with the kill reason (the remaining arguments). A port that dies with
# a reason but without any monitor is logged at level 2.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
411    
# Registers the callback (third argument) as monitor of the local port (second
# argument); the first argument is ignored. If the port does not exist, the
# callback is invoked immediately with a no_such_port reason instead.
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   $LMON{$port}{$cb+0} = $cb;
}
418    
# Removes the callback (third argument) from the monitors of the local port
# (second argument); the first argument is ignored. Does nothing when the
# port has no monitors.
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   delete $LMON{$port}{$cb+0}
      if exists $LMON{$port};
}
423    
# Request handlers of the node port, keyed by message tag. $SRCNODE holds the
# node object of the sender while a handler runs.
our %NODE_REQ = (
   # --- internal services: monitoring ---

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;

      _unmonitor (undef, $portid, delete $SRCNODE->{rmon}{$portid});
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # weak, so the monitor callback does not keep the node object alive
      Scalar::Util::weaken (my $node = $SRCNODE);

      # forwards the kill reason to the monitoring node while it is connected
      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;

      _monitor (undef, $portid, $on_kill);
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # --- "public" services - not actually public ---

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd @$_ for @_
   },

   # --- random utilities ---

   # evaluate a string in package main; when a reply message is given, send
   # it back with "$@" and the results appended
   eval => sub {
      #d#SECURE
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },

   # reply with the current event loop time
   time => sub {
      snd @_, AE::now;
   },

   # swallow the message
   devnull => sub {
      #
   },

   # empty messages are keepalives or similar devnull-applications
   "" => sub {
   },
);
476    
# The local node object is reachable both under its node ID and under the
# empty node ID.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port (empty port ID): the first message element selects the handler
# in %NODE_REQ. Unknown tags are resolved as function names via load_func and
# cached. Errors raised while processing are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   #d#SECURE (load_func)
   eval {
      my $handler = $NODE_REQ{$tag} ||= load_func ($tag);
      # forwards the remaining message elements unchanged
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
484    
485 root 1.69 #############################################################################
486 root 1.71 # seed management, try to keep connections to all seeds at all times
487 root 1.69
488 root 1.71 our %SEED_NODE; # seed ID => node ID|undef
489     our %NODE_SEED; # map node ID to seed ID
490 root 1.69 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
491     our $SEED_WATCHER;
492 root 1.71 our $SEED_RETRY;
493 root 1.69
# Starts a connection attempt to the given seed address ("host:port") unless
# %SEED_CONNECT already holds an entry for it. Croaks if the address cannot
# be parsed.
#
# %SEED_CONNECT{$seed} holds the connector while connecting, is set to 1 by
# the connect callback and removed again by on_destroy.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.
         my $remote = $_[0]{remote_node};

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
            # %NODE_SEED is keyed by node ID, so the entry to drop is the one
            # for the remote node, not one named after the seed address
            delete $NODE_SEED{$remote};
         } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;
         }
      },
      on_destroy => sub {
         delete $SEED_CONNECT{$seed};
      },
      sub {
         $SEED_CONNECT{$seed} = 1;
      }
   ;
}
529    
# Starts connection attempts to all seeds that are neither connecting nor
# connected, and re-arms the retry timer with a growing interval (doubled plus
# a random fraction, capped at the configured monitor_timeout). Stops the
# timer once nothing is left to do.
sub seed_all {
   my @pending;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};
      next if defined $SEED_NODE{$seed} && node_is_up ($SEED_NODE{$seed});

      push @pending, $seed;
   }

   if (@pending) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @pending;

      my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $max
         if $SEED_RETRY > $max;

      $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
552    
# Resets the retry interval to one second and schedules seed_all in one
# second, unless a seed timer is already pending.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
557    
558     # sets new seed list, starts connecting
# Replaces the seed list with the given seed addresses, forgetting all
# previous seed state, and starts connecting to them.
sub set_seeds(@) {
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out without a known node ID
   %SEED_NODE = map { $_ => undef } @_;

   seed_again ();#d#
   seed_all ();
}
569    
# if we lost the connection to a seed node, make sure we are seeding
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   seed_again ()
      if !$is_up && exists $NODE_SEED{$nodeid};
};
575    
576     #############################################################################
577     # talk with/to global nodes
578    
579 root 1.72 # protocol messages:
580     #
581 root 1.73 # sent by all slave nodes (slave to master)
582     # g_slave database - make other global node master of the sender
583 root 1.72 #
584 root 1.73 # sent by any node to global nodes
585     # g_set database - set whole database
586     # g_add family key val - add/replace key to database
587     # g_del family key - delete key from database
588     # g_get family key reply... - send reply with data
589     #
590     # sent by global nodes
591     # g_global - node became global, similar to global=1 greeting
592     #
593     # database families
594     # "'l" -> node -> listeners
595     # "'g" -> node -> undef
596     # ...
597 root 1.72 #
598    
599 root 1.73 # used on all nodes:
600 root 1.78 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
601     our $MASTER_MON;
602     our %LOCAL_DB; # this node database
603    
604     our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
605 root 1.71
606 root 1.73 #############################################################################
607     # master selection
608    
609     # master requests
610     our %GLOBAL_REQ; # $id => \@req
611 root 1.71
# Queues a request for the master node under $id. Does nothing if a request
# with that id is already queued. The request is sent right away when a
# master is known, and stays queued either way.
sub global_req_add {
   my ($id, @req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = \@req;

   snd ($MASTER, @req)
      if $MASTER;
}
622 root 1.71
# Removes the queued master request with the given id, if any.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
626    
# Queues a g_find request for the given node ID with the master node.
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", g_find => $nodeid);
}
630 root 1.71
631 root 1.73 # reply for g_find started in Node.pm
# reply for g_find started in Node.pm: drops the queued request and, if the
# node is still known, hands the second message element to its connect_to
# method (presumably the node's addresses - confirm against Node.pm).
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
639    
# Makes the given node our master: announces ourselves as its slave together
# with our local database, then (re-)sends all queued requests.
sub master_set {
   $MASTER = $_[0];

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
649 root 1.71
# Selects a master node: the first seed node that is up, if any. Otherwise
# waits for a seed node to come up. Once a master found that way goes down,
# the search starts over.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      # from now on watch for the master going down instead
      $MASTER_MON = mon_nodes sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
673    
674 root 1.73 # other node wants to make us the master
# other node wants to make us the master: load AnyEvent::MP::Global and
# re-dispatch to whatever handler is registered for g_slave afterwards.
# NOTE(review): this relies on AnyEvent::MP::Global replacing
# $NODE_REQ{g_slave}; otherwise the handler would call itself - confirm.
$NODE_REQ{g_slave} = sub {
   my ($db) = @_;

   warn "slave1\n";#d#

   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   # forwards @_ unchanged
   &$handler;
};
683    
684 root 1.73 #############################################################################
685 root 1.79 # local database operations
686 root 1.71
687 root 1.79 # local database management
# Stores $value under $family/$key in the local database and, when a master
# node is set, tells it about the new entry.
sub db_set($$$) {
   my ($family, $key, $value) = @_;

   $LOCAL_DB{$family}{$key} = $value;

   snd ($MASTER, g_add => $family => $key => $value)
      if defined $MASTER;
}
693    
# Deletes $family/$key from the local database and, when a master node is
# set, tells it about the deletion.
sub db_del($$) {
   my ($family, $key) = @_;

   delete $LOCAL_DB{$family}{$key};

   snd ($MASTER, g_del => $family => $key)
      if defined $MASTER;
}
699    
# Like db_set, but returns a guard that deletes the entry again when it is
# destroyed. The value is optional.
sub db_reg($$;$) {
   my ($family, $key) = @_;

   # "&db_set;" forwards our @_ as is, which also allows the omitted value
   &db_set;

   Guard::guard { db_del ($family, $key) }
}
705 root 1.71
706 root 1.69 #############################################################################
707     # configure
708    
# Returns the nodename field reported by uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
713    
# Resolves a comma separated list of transport descriptors into "host:port"
# strings. Returns a condvar that is sent the resolved addresses, ordered by
# the position of their descriptor in the list and with duplicates removed.
#
# An empty or all-digit descriptor stands for the local nodename (with the
# digits as port), a port of "*" becomes "0", and a host of "*" is expanded to
# the addresses of the local interfaces (via Net::Interface in a subprocess).
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the callbacks below

   # final callback, runs once every outstanding lookup has finished
   $cv->begin (sub {
      my %seen;
      my @refs =
         grep !$seen{$_}++,
         map $_->[1],
         sort { $a->[0] <=> $b->[0] } @res;

      $_[0]->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length $t ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()

                  # skip localhost etc.
                  push @addr, grep !/^\x7f/,
                     $if->address (Net::Interface::AF_INET ());

                  # global unicast, site-local unicast
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                     $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
794    
# Configures the local node: picks the profile, derives the node ID, binds
# the listeners, registers them in the local database, connects to the seed
# nodes, starts the master search and finally starts the configured services.
#
# Arguments are key/value pairs; with an odd number of arguments the first
# one is taken as the profile name. Blocks while bind and seed addresses are
# being resolved. Croaks on an illegal node ID or an unparsable bind address.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};
   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   $NODE = $node;

   if ($NODE =~ s%/$%/$RUNIQ%) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # re-register the local node object under its final name
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # all lookups are started first, the results are then awaited in order
   for my $resolver (map { _resolve ($_) } @$binds) {
      for my $bind ($resolver->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # record the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         ;

         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   db_set ("'l", $NODE, $LISTENER);

   $WARN->(8, "node listens on [@$LISTENER].");

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search ();

   # NOTE(review): marked as debugging code (#d#) in the original - confirm
   # whether this special case for a node named "atha" is still wanted.
   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   # a service is [function, args...], "Module::" (just loaded) or a function name
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }
}
877    
878 root 1.1 =back
879    
880     =head1 SEE ALSO
881    
882     L<AnyEvent::MP>.
883    
884     =head1 AUTHOR
885    
886     Marc Lehmann <schmorp@schmorp.de>
887     http://home.schmorp.de/
888    
889     =cut
890    
891     1
892