ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.110
Committed: Sat Mar 24 16:50:15 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.109: +19 -11 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
# public API symbols; AnyEvent::MP and Coro::MP re-export this list
our @EXPORT_API = (
   qw(NODE $NODE),
   qw(configure),
   qw(node_of port_is_local),
   qw(snd kil),
   qw(db_set db_del),
   qw(db_mon db_family db_keys db_values),
);

# importable on request: the internal symbols first, then the public API
our @EXPORT_OK = (
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func),
   @EXPORT_API,
);

# exported by default
our @EXPORT = (
   qw(snd_to_func snd_on eval_on),
   qw(port_is_local),
   qw(up_nodes mon_nodes node_is_up),
);
72
# Resolves a fully-qualified function name to a code reference, loading
# the package that provides it on demand.
#
# Starting with the package part of the name, each enclosing namespace is
# require'd in turn (Foo::Bar::baz tries Foo::Bar, then Foo) until the
# function becomes defined.
#
# This function itself does not throw: on failure it returns a code
# reference that dies with the reason once it is called.
sub load_func($) {
   my $func = $_[0];

   # the function is looked up by name, which must keep working no matter
   # which strictures are in effect for the surrounding file
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below and the name may
         # have been supplied by another node, so accept nothing but a
         # plain package name
         $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module only means that the parent namespace is
            # tried next, every other error is handed to the caller
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
93
# the 62 characters used for printable identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random octets (any byte value may occur)
sub nonce($) {
   my ($len) = @_;

   my $octets = "";
   $octets .= chr (rand 256)
      for 1 .. $len;

   $octets
}

# returns a string of $_[0] random characters taken from @alnum
sub nonce62($) {
   my ($len) = @_;

   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $len;

   $chars
}
103
104 our $CONFIG; # this node's configuration
105 our $SECURE;
106
107 our $RUNIQ; # remote uniq value
108 our $UNIQ; # per-process/node unique cookie
109 our $NODE;
110 our $ID = "a";
111
112 our %NODE; # node id to transport mapping, or "undef", for local node
113 our (%PORT, %PORT_DATA); # local ports
114
115 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 our %LMON; # monitored _local_ ports
117
118 our $GLOBAL; # true if node is a global ("directory") node
119 our %BINDS;
120 our $BINDS; # our listeners, as arrayref
121
122 our $SRCNODE; # holds the node ID of the sending node during _inject
123
# initialise names for non-networked operation
{
   # ~54 bits, for local port names, lowercase $ID appended:
   # five characters derived from the pid and the current time,
   # followed by four random ones
   my $now = AE::now;

   my @idx = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now * 100  ) % 62,
      (int $now * 10000) % 62,
   );

   $UNIQ = join "", @alnum[@idx], nonce62 (4);

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 (10);
   substr $RUNIQ, -1, 1, uc substr $RUNIQ, -1;

   $NODE = "";
}
145
# returns the node ID of the local node
sub NODE() {
   return $NODE;
}
149
# extracts the node ID from a port ID, i.e. everything before the first "#"
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
155
BEGIN {
   # TRACE is a constant sub (empty prototype) that is fixed at compile
   # time from the PERL_ANYEVENT_MP_TRACE environment variable
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
161
our $DELAY_TIMER; # zero-delay timer, only set while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run by $delay_run

# runs the queued callbacks in order, then drops the timer so that
# the next call to delay arms a new one
our $delay_run = sub {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   undef $DELAY_TIMER
};

# queues a callback for execution from a zero-delay timer
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
173
174 =item $AnyEvent::MP::Kernel::SRCNODE
175
176 During execution of a message callback, this variable contains the node ID
177 of the origin node.
178
179 The main use of this variable is for debugging output - there are probably
180 very few other cases where you need to know the source node ID.
181
182 =cut
183
# delivers a message (port ID followed by the message contents) to the
# callback registered for that local port; messages for unknown ports
# are silently dropped
sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $portid = shift;
   my $cb     = $PORT{$portid}
      or return;

   &$cb; # called with the remaining @_
}
188
# returns the node object for the given node ID, creating a remote node
# object when the ID is not known yet. this is what makes it possible to
# send to a node - it is basically the central routing component.
sub add_node {
   my ($node) = @_;

   return $NODE{$node}
      if $NODE{$node};

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   # registers itself in %NODE
   AnyEvent::MP::Node::Remote->new ($node)
}
202
# sends a message: the first argument is the destination port ID
# ("nodeid#portid"), the remaining arguments are the message itself
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   # unknown nodes get a node object on first use
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
211
# returns true if the node part of the given port ID is the local node ID
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid eq $NODE;
}
217
218 =item snd_to_func $node, $func, @args
219
220 Expects a node ID and a name of a function. Asynchronously tries to call
221 this function with the given arguments on that node.
222
223 This function can be used to implement C<spawn>-like interfaces.
224
225 =cut
226
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the destination is NOT $NODE,
   # although the comment above talks about $NODE - confirm against
   # AnyEvent::MP::Node::Self before touching this.
   $AnyEvent::MP::Node::Self::DELAY = 1
      unless $nodeid eq $NODE;

   # a message to the node port (""), naming the function and its arguments
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["", @call]);
}
238
239 =item snd_on $node, @msg
240
241 Executes C<snd> with the given C<@msg> (which must include the destination
242 port) on the given node.
243
244 =cut
245
# asks the node port of $node to execute its "snd" request with @msg
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
250
251 =item eval_on $node, $string[, @reply]
252
253 Evaluates the given string as Perl expression on the given node. When
254 @reply is specified, then it is used to construct a reply message with
255 C<"$@"> and any results from the eval appended.
256
257 =cut
258
# asks the node port of $node to execute its "eval" request with the
# given string and optional reply message
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
263
# kills the given port ("nodeid#portid"), the remaining arguments are the
# kill reason. croaks when asked to kill a node port (empty port part).
sub kil(@) {
   my ($dest, @reason) = @_;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @reason);
}
273
274 #############################################################################
275 # node monitoring and info
276
277 =item $bool = node_is_up $nodeid
278
279 Returns true if the given node is "up", that is, the kernel thinks it has
280 a working connection to it.
281
282 More precisely, if the node is up, returns C<1>. If the node is currently
283 connecting or otherwise known but not connected, returns C<0>. If nothing
284 is known about the node, returns C<undef>.
285
286 =cut
287
sub node_is_up($) {
   my ($nodeid) = @_;

   # the local node is always up
   return 1
      if $nodeid eq $NODE;

   # nothing known about this node
   my $node = $NODE{$nodeid}
      or return;

   # known nodes are up exactly when they have a transport
   $node->{transport} ? 1 : 0
}
292
293 =item @nodes = up_nodes
294
295 Return the node IDs of all nodes that are currently connected (excluding
296 the node itself).
297
298 =cut
299
sub up_nodes() {
   my @up;

   # a node counts as connected when it has a transport
   for my $node (values %NODE) {
      push @up, $node->{id}
         if $node->{transport};
   }

   @up
}
303
304 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
305
306 Registers a callback that is called each time a node goes up (a connection
307 is established) or down (the connection is lost).
308
309 Node up messages can only be followed by node down messages for the same
310 node, and vice versa.
311
312 Note that monitoring a node is usually better done by monitoring its node
313 port. This function is mainly of interest to modules that are concerned
314 about the network topology and low-level connection handling.
315
316 Callbacks I<must not> block and I<should not> send any messages.
317
318 The function returns an optional guard which can be used to unregister
319 the monitoring callback again.
320
321 Example: make sure you call function C<newnode> for all nodes that are up
322 or go up (and down).
323
324 newnode $_, 1 for up_nodes;
325 mon_nodes \&newnode;
326
327 =cut
328
our %MON_NODES; # registered callbacks, keyed by the numeric value of the code reference

sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # callers that ignore the result do not get (and cannot use) a guard
   return
      unless defined wantarray;

   # destroying the guard unregisters the callback again
   Guard::guard { delete $MON_NODES{$cb+0} }
}
339
# logs a node up/down event and reports it to all mon_nodes callbacks.
# a callback that throws is logged and does not stop the others.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";
   AE::log 7 => "$node->{id} is " . $state;

   for my $cb (values %MON_NODES) {
      next
         if eval { $cb->($node->{id}, $up, @reason); 1 };

      AE::log die => $@;
   }
}
350
351 #############################################################################
352 # self node code
353
# removes a local port and calls its monitors with the kill reason
# (the arguments following the port ID)
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port that dies with a reason while nobody monitors it gets logged
   AE::log die => "unmonitored local port $port died with reason: @_"
      if !$mon && @_;

   $_->(@_)
      for values %{ $mon || {} };
}
367
# registers callback $_[2] as monitor of the local port $_[1] ($_[0] is
# not used). for a nonexistent port the callback is invoked immediately
# with a no_such_port reason instead.
sub _monitor {
   my (undef, $port, $cb) = @_;

   exists $PORT{$port}
      or return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port");

   $LMON{$port}{$cb+0} = $cb;
}
374
# removes callback $_[2] from the monitors of local port $_[1] ($_[0] is
# not used), doing nothing for ports without monitors
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   delete $LMON{$port}{$cb+0}
      if exists $LMON{$port};
}
379
# dies when $SECURE is set, which is how requests that execute code on
# behalf of other nodes are refused
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
384
# requests understood by the node port, keyed by message tag. each handler
# is called with the remaining message elements in @_.
our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};
      _unmonitor (undef, $portid, $cb);
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # only keep a weak reference to the node object inside the callback
      my $node = $NODE{$SRCNODE};
      Scalar::Util::weaken ($node);

      # when the port dies, tell the monitoring node about it (kil0)
      my $cb = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $cb;
      _monitor (undef, $portid, $cb);
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      $_->(@_)
         for @$cbs;
   },

   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      snd (@_);
   },

   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my ($method, @reply) = @_;

      snd (@reply, exists $NODE_REQ{$method});
   },

   # random utilities

   # evaluate a string in package main and optionally reply with
   # "$@" and the results
   eval => sub {
      _secure_check ();
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res)
         if @_;
   },

   # reply with the current event loop time
   time => sub {
      snd (@_, AE::now);
   },

   # ignore the message
   devnull => sub { },

   # empty messages are keepalives or similar devnull-applications
   "" => sub { },
);
442
# the node object for the local node
AnyEvent::MP::Node::Self->new ($NODE); # registers itself in %NODE

# the node port: the first message element selects the request handler.
# tags without a handler in %NODE_REQ are resolved as function names via
# load_func (unless remote execution is forbidden) and cached in %NODE_REQ.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do {
         _secure_check ();
         load_func ($tag)
      };

      &$handler; # called with the remaining @_
   };

   AE::log die => "error processing node message from $SRCNODE: $@"
      if $@;
};

our $MPROTO = 1;

# tell everybody who connects our mproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{mproto} = $MPROTO;
};
458
459 #############################################################################
460 # seed management, try to keep connections to all seeds at all times
461
462 our %SEED_NODE; # seed ID => node ID|undef
463 our %NODE_SEED; # map node ID to seed ID
464 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
465 our $SEED_WATCHER;
466 our $SEED_RETRY;
467
# starts a connection attempt to the given seed address ("host:port"),
# unless one is already recorded in %SEED_CONNECT. croaks when the
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   # called after receiving remote greeting, learn remote node name
   #
   # we rely on untrusted data here (the remote node name) this is
   # hopefully ok, as this can at most be used for DOSing, which is easy
   # when you can do MITM anyway.
   my $on_greeted = sub {
      my ($transport) = @_;
      my $remote = $transport->{remote_node};

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start global service, if not running
         # we need to check here in addition to the mon_nodes below
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd ($remote, "g_slave")
            unless $transport->{remote_greeting}{global};
      }
   };

   # trailing callback of mp_connect - presumably called once the attempt
   # is over (TODO confirm in AnyEvent::MP::Transport); it forgets the
   # connector so that the seed can be tried again
   my $forget = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $forget,
   );
}
505
# tries to connect to every seed that is neither being connected to nor
# known to be up, and re-arms the retry timer while such seeds exist
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      # connection attempt already in progress
      next if exists $SEED_CONNECT{$seed};

      # node behind this seed is known and up
      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @seeds, $seed;
   }

   if (!@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @seeds;

      # roughly double the retry interval, limited by monitor_timeout
      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
528
# resets the retry interval and makes sure seed_all runs in one second,
# unless a seed timer is pending already
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER = AE::timer 1, 0, \&seed_all
      unless $SEED_WATCHER;
}
533
# sets new seed list, starts connecting
sub set_seeds(@) {
   # forget everything about the previous seeds
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # node IDs of the new seeds are not known yet
   $SEED_NODE{$_} = undef
      for @_;

   seed_all ();
}
544
# watch the connections to seed nodes
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   # other nodes are of no interest here
   return
      unless exists $NODE_SEED{$nodeid};

   if (!$is_up) {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
   } else {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd ($nodeid, "g_slave")
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   }
});
558
559 #############################################################################
560 # keepalive code - used to keep connections to certain nodes alive
561 # only used by global code atm., but ought to be exposed somehow.
562 #TODO: should probably be done directly by node objects
563
564 our $KEEPALIVE_RETRY;
565 our $KEEPALIVE_WATCHER;
566 our %KEEPALIVE; # we want to keep these nodes alive
567 our %KEEPALIVE_DOWN; # nodes that are down currently
568
# tries to connect to all keepalive nodes that are currently down and
# schedules the next attempt
sub keepalive_all {
   my @down = keys %KEEPALIVE_DOWN;

   AE::log 9 => "keepalive: trying to establish connections with: "
                . (join " ", @down)
                . ".";

   for my $nodeid (@down) {
      my $node = add_node ($nodeid);
      $node->connect;
   }

   # roughly double the retry interval, limited by monitor_timeout
   $KEEPALIVE_RETRY = $KEEPALIVE_RETRY * 2 + rand;
   $KEEPALIVE_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
      if $KEEPALIVE_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
583
# resets the retry interval and tries to connect immediately
sub keepalive_again {
   $KEEPALIVE_RETRY = 1;

   keepalive_all ();
}
588
# requests that the connection to the given node is kept alive. requests
# are counted, only the first one for a node has any effect.
sub keepalive_add {
   my ($nodeid) = @_;

   # somebody else asked for this node already
   $KEEPALIVE{$nodeid}++
      and return;

   node_is_up ($nodeid)
      and return;

   # not connected, so mark it as down and start connecting
   $KEEPALIVE_DOWN{$nodeid} = undef;
   keepalive_again ();
}
596
# drops one keepalive request for the given node. once the count reaches
# zero the node is forgotten, and the retry timer is stopped when no
# keepalive node is down anymore.
sub keepalive_del {
   my ($nodeid) = @_;

   --$KEEPALIVE{$nodeid}
      and return;

   delete $KEEPALIVE     {$nodeid};
   delete $KEEPALIVE_DOWN{$nodeid};

   %KEEPALIVE_DOWN
      or undef $KEEPALIVE_WATCHER;
}
606
# track the connection state of keepalive nodes
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   return
      unless exists $KEEPALIVE{$nodeid};

   if (!$is_up) {
      # lost the connection, try to connect again
      $KEEPALIVE_DOWN{$nodeid} = undef;
      keepalive_again ();
   } else {
      delete $KEEPALIVE_DOWN{$nodeid};

      # nothing left to reconnect to, so stop retrying
      %KEEPALIVE_DOWN
         or undef $KEEPALIVE_WATCHER;
   }
});
621
622 #############################################################################
623 # talk with/to global nodes
624
625 # protocol messages:
626 #
627 # sent by global nodes
628 # g_global - node became global, similar to global=1 greeting
629 #
630 # database protocol
631 # g_slave database - make other global node master of the sender
632 # g_set database - global node's database to other global nodes
633 # g_upd family set del - update single family (all to global)
634 #
635 # slave <-> global protocol
636 # g_find node - query addresses for node (slave to global)
637 # g_found node binds - node addresses (global to slave)
638 # g_db_family family id - send g_reply with data (global to slave)
639 # g_db_keys family id - send g_reply with data (global to slave)
640 # g_db_values family id - send g_reply with data (global to slave)
641 # g_reply id result - result of any query (global to slave)
642 # g_mon1 family - start to monitor family, replies with g_chg1
643 # g_mon0 family - stop monitoring family
644 # g_chg1 family hash - initial value of family when starting to monitor
645 # g_chg2 family set del - like g_upd, but for monitoring only
646 #
647 # internal database families
648 # "'l" -> node -> listeners
649 # "'g" -> node -> undef
650 # ...
651 #
652
653 # used on all nodes:
654 our $MASTER; # the global node we bind ourselves to
655 our $MASTER_MON;
656 our %LOCAL_DB; # this node database
657
658 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
659
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
666
667 #############################################################################
668 # master selection
669
670 # master requests
671 our %GLOBAL_REQ; # $id => \@req
672
# remembers request $req (an array reference) under $id and sends it to
# the master, if there is one. requests with an already known id are
# ignored. master_set (re-)sends whatever is still remembered.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd ($MASTER, @$req);
}
683
# forgets the request remembered under the given id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
687
688 #################################
689 # master rpc
690
our %GLOBAL_RES;          # request id => callback waiting for the g_reply
our $GLOBAL_RES_ID = "a"; # most recently used request id

# sends a request to the master with a fresh request id appended. the
# last argument is the callback that the g_reply handler invokes with
# the result.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;
   global_req_add ($id, [@_, $id]);
}
699
# result of a global_call: the request id followed by the result
$NODE_REQ{g_reply} = sub {
   my ($id, @result) = @_;

   global_req_del ($id);

   # replies nobody waits for are dropped
   my $cb = delete $GLOBAL_RES{$id}
      or return;

   $cb->(@result)
};
707
708 #################################
709
# queries the master for the addresses of the given node (see g_found)
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
713
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $binds) = @_;

   global_req_del ("g_find $nodeid");

   # the node might have been forgotten in the meantime
   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($binds);
};
722
# makes the given node our master: registers as its slave with our local
# database and passes on all requests that are still pending
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
732
# selects a master: a seed node that is already up is used immediately,
# otherwise the first seed node that comes up is. once a master is found
# it is watched, and the search starts over when it goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next
         unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set ($nodeid);

      # from now on, only wait for the master to go down
      $MASTER_MON = mon_nodes (sub {
         my ($changed, $still_up) = @_;

         if ($changed eq $MASTER && !$still_up) {
            undef $MASTER;
            master_search ();
         }
      });
   });
}
756
# other node wants to make us the master, so start the global service
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request - this relies on
   # AnyEvent::MP::Global installing its own g_slave handler while loading
   # (presumably it does; otherwise this would call itself again)
   require AnyEvent::MP::Global;

   &{ $NODE_REQ{g_slave} } # called with the unchanged @_
};
765
766 #############################################################################
767 # local database operations
768
769 # local database management
770
# Sets $subkey in $family of the local database to the (optional) third
# argument and tells the master, if there is one, about the change.
#
# In non-void context a guard object is returned that deletes the key
# again (via db_del) when it is destroyed.
sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$subkey} = $_[2];
      snd ($MASTER, g_upd => $family => { $subkey => $_[2] })
         if defined $MASTER;
#   }

   # db_del is defined further down in this file, so it has to be called
   # with parentheses here: without them (and without a prior declaration)
   # perl treats "db_del $family ..." as indirect object syntax, not as a
   # function call.
   defined wantarray
      and Guard::guard { db_del ($family => $subkey) }
}
790
# deletes the given subkeys from $family in the local database and tells
# the master, if there is one. keys that do not exist are ignored.
sub db_del($@) {
   my ($family, @subkeys) = @_;

   my @del = grep { exists $LOCAL_DB{$family}{$_} } @subkeys;

   @del
      or return;

   delete @{ $LOCAL_DB{$family} }{@del};

   defined $MASTER
      and snd ($MASTER, g_upd => $family => undef, \@del);
}
802
803 # database query
804
# sends a g_db_family query for $family to the master, $cb receives the reply
sub db_family {
   my ($family, $cb) = @_;

   global_call ("g_db_family", $family, $cb);
}
809
# sends a g_db_keys query for $family to the master, $cb receives the reply
sub db_keys {
   my ($family, $cb) = @_;

   global_call ("g_db_keys", $family, $cb);
}
814
# sends a g_db_values query for $family to the master, $cb receives the reply
sub db_values {
   my ($family, $cb) = @_;

   global_call ("g_db_values", $family, $cb);
}
819
820 # database monitoring
821
our %LOCAL_MON; # family => { callback id => callback }
our %MON_DB;    # family => { key => value }, our copy of monitored families

# registers $cb as monitor for $family. in non-void context a guard is
# returned that unregisters the callback when it is destroyed.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   if (!$db) {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add ("mon1 $family", [g_mon1 => $family]);
      $MON_DB{$family} = {};
   } else {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };

      AE::postpone {
         # guard might have gone away already
         exists $LOCAL_MON{$family}{$cb+0}
            or return;

         # set actual callback
         $LOCAL_MON{$family}{$cb+0} = $cb;
         $cb->($db, [keys %$db]);
      };
   }

   return
      unless defined wantarray;

   Guard::guard {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$cb+0};

      # other callbacks still monitor this family
      return
         if %$mon;

      global_req_del ("mon1 $family");

      # no global_req, because we don't care if we are not connected
      snd ($MASTER, g_mon0 => $family)
         if $MASTER;

      delete $LOCAL_MON{$family};
      delete $MON_DB{$family};
   }
}
864
# full update: the master sends the complete contents of a monitored
# family. our copy is updated in place and the monitors are called with
# the lists of added, changed and deleted keys.
$NODE_REQ{g_chg1} = sub {
   # only our master may update us
   return
      unless $SRCNODE eq $MASTER;

   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@a, @c);

   # add or replace keys
   for my $k (keys %$ndb) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $ndb->{$k};
   }

   # delete keys that are no longer present
   my @d = grep { !exists $ndb->{$_} } keys %$db;
   delete @$db{@d};

   for my $cb (values %{ $LOCAL_MON{$f} }) {
      $cb->($db, \@a, \@c, \@d);
   }
};
890
# incremental update: the master sends the keys to set (a hash) and the
# keys to delete (an array) for a monitored family. the monitors are
# called with the lists of added, changed and deleted keys.
$NODE_REQ{g_chg2} = sub {
   # only our master may update us
   return
      unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@a, @c);

   for my $k (keys %$set) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $set->{$k};
   }

   delete @$db{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@a, \@c, $del);
   }
};
912
913 #############################################################################
914 # configure
915
# returns the node name of this host, as reported by uname
sub nodename {
   require POSIX;

   my (undef, $host) = POSIX::uname ();

   $host
}
920
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings.
#
# Each descriptor is either empty or a port number (the local node name
# is used as host), or a host with optional port. The host "*" stands for
# the addresses of all local interfaces, the port "*" for port 0.
#
# Returns a condvar that receives the resolved addresses, ordered by
# descriptor position and with duplicates removed. Croaks on descriptors
# that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # runs once all resolve requests are done: sort by priority and
   # deliver every address once
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of earlier descriptors sort before those of later ones
      my $pri = ++$idx;

      # nothing, or just a port number: use our node name as host
      $t = length ($t) ? nodename () . ":$t" : nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         # collects the packed addresses of all local interfaces
         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for $_ ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;

            if (!defined $pid) {
               # fork failed: there is no child to read from, so continue
               # without interface addresses, but say why
               AE::log error => "unable to fork for interface address lookup: $!";
            } elsif ($pid == 0) {
               # child: write the length-prefixed addresses and exit at once
               close $r;
               syswrite $w, pack "(C/a*)*", $get_addr->();
               require POSIX;
               POSIX::_exit (0);
            } else {
               close $w;

               my $addr = "";

               # read until the child closes its end of the pipe
               1 while sysread $r, $addr, 1024, length $addr;

               @addr = unpack "(C/a*)*", $addr;

               # reap the child, so that it does not linger as a zombie
               waitpid $pid, 0;
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
1030
# configures this node: looks up the profile, fixes the node ID, creates
# the listeners, connects to the seed nodes, starts the configured
# services and finally evaluates the "eval" parameter of the profile.
#
# arguments are key => value pairs, a single leading argument is taken as
# the profile name. the function replaces itself by a dummy when done.
#
# note: the local variable names are part of the environment that the
# string eval at the end runs in, so think twice before renaming them.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   $SECURE = $CONFIG->{secure};

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure

   # expand %n to the node name, and %u or a trailing slash to a random string
   $NODE = $node;
   $NODE =~ s/%n/nodename ()/ge;

   # nodes with randomised node names do not need randomised port names
   $UNIQ = ""
      if $NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g;

   # re-register the local node object under its final ID
   $node_obj->{id} = $NODE;
   $NODE{$NODE}    = $node_obj;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   AE::log 8 => "node $NODE starting up.";

   $BINDS = [];
   %BINDS = ();

   # start resolving all binds first, then create one listener per address
   my @resolving = map { _resolve ($_) } @$binds;

   for my $cv (@resolving) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address the prepare callback is called with
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   # publish our listeners in the local database
   db_set ("'l", $NODE, $BINDS);

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   set_seeds (map { $_->recv } map { _resolve ($_) } @$seeds);
   master_search ();

   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   # a service is either [function, args...], a module name followed
   # by "::", or the name of a function to call without arguments
   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }

   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}
1118
1119 =back
1120
1121 =head1 LOGGING
1122
1123 AnyEvent::MP::Kernel logs high-level information about the current node,
1124 when nodes go up and down, and most runtime errors. It also logs some
1125 debugging and trace messages about network maintenance, such as seed
1126 connections and global node management.
1127
1128 =head1 SEE ALSO
1129
1130 L<AnyEvent::MP>.
1131
1132 =head1 AUTHOR
1133
1134 Marc Lehmann <schmorp@schmorp.de>
1135 http://home.schmorp.de/
1136
1137 =cut
1138
1139 1
1140