ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.112
Committed: Tue Oct 2 13:58:07 2012 UTC (11 years, 8 months ago) by root
Branch: MAIN
Changes since 1.111: +18 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
48 # for re-export in AnyEvent::MP and Coro::MP
49 our @EXPORT_API = qw(
50 NODE $NODE
51 configure
52 node_of port_is_local
53 snd kil
54 db_set db_del
55 db_mon db_family db_keys db_values
56 );
57
58 our @EXPORT_OK = (
59 # these are internal
60 qw(
61 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
62 add_node load_func
63 ),
64 @EXPORT_API,
65 );
66
67 our @EXPORT = qw(
68 snd_to_func snd_on eval_on
69 port_is_local
70 up_nodes mon_nodes node_is_up
71 );
72
# Resolve a fully qualified function name ("Some::Pkg::func") into a code
# reference, loading the module that provides it on demand.
#
# When the function is not defined yet, successively shorter package
# prefixes of the name are require'd ("Some::Pkg", then "Some") until the
# function exists.
#
# This function never throws: on failure it returns a stub that dies with
# the reason when it is called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # the name may originate from a remote node, so only accept
         # well-formed package names and never compile it as perl source
         $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         (my $file = "$pkg.pm") =~ s{::}{/}g;

         local $@;
         unless (eval { require $file; 1 }) {
            my $error = $@;
            # only "this very module does not exist" means "try the parent
            # package" - a missing dependency or a compile error inside an
            # existing module is reported as-is
            $error =~ /^Can't locate \Q$file\E in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
93
# the 62 characters used for textual nonces and unique IDs
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random octets (uses perl's builtin rand)
sub nonce($) {
   my ($len) = @_;

   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $len;

   $octets
}

# returns a string of $_[0] random characters out of [0-9A-Za-z]
sub nonce62($) {
   my ($len) = @_;

   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $len;

   $chars
}
103
104 our $CONFIG; # this node's configuration
105 our $SECURE;
106
107 our $RUNIQ; # remote uniq value
108 our $UNIQ; # per-process/node unique cookie
109 our $NODE;
110 our $ID = "a";
111
112 our %NODE; # node id to transport mapping, or "undef", for local node
113 our (%PORT, %PORT_DATA); # local ports
114
115 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
116 our %LMON; # monitored _local_ ports
117
118 #our $GLOBAL; # true if node is a global ("directory") node
119 our %BINDS;
120 our $BINDS; # our listeners, as arrayref
121
122 our $SRCNODE; # holds the sending node _object_ during _inject
123 our $GLOBAL; # true when this is a global node (only set by AnyEvent::MP::Global)
124
# initialise names for non-networked operation
{
   my $now = AE::now;

   # ~54 bits, for local port names, lowercase $ID appended:
   # five characters derived from the pid and the current time, followed by
   # four random ones
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now ) % 62,
      (int $now * 100) % 62,
      (int $now * 10000) % 62,
   );

   $UNIQ = (join "", map $alnum[$_], @digits) . nonce62 4;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/;

   # no node ID until configure has been called
   $NODE = "";
}
146
# returns the current value of $NODE
sub NODE() {
   return $NODE;
}
150
# returns the node part of a port ID, i.e. everything before the first "#"
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
156
BEGIN {
   # TRACE is a constant sub: true iff PERL_ANYEVENT_MP_TRACE was set to a
   # true value when this module was compiled
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
162
our $DELAY_TIMER; # the pending timer, if any
our @DELAY_QUEUE; # callbacks waiting to be run

# timer callback: runs the queued callbacks in order until the queue is
# empty (or a false entry is found), then releases the timer
our $delay_run = sub {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   undef $DELAY_TIMER
};

# queues a callback for execution from a zero-delay timer
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
174
175 =item $AnyEvent::MP::Kernel::SRCNODE
176
177 During execution of a message callback, this variable contains the node ID
178 of the origin node.
179
180 The main use of this variable is for debugging output - there are probably
181 very few other cases where you need to know the source node ID.
182
183 =cut
184
# delivers a message to a local port: the first element is the port ID, the
# remaining elements are passed to the port callback. messages for
# nonexistent ports are silently dropped.
sub _inject {
   warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $port_cb = $PORT{+shift}
      or return;

   &$port_cb
}
189
# returns the node object for the given node ID, creating a remote node
# object when the node is not known yet, so you can send stuff to it.
# it is basically the central routing component.
sub add_node {
   my ($node) = @_;

   return $NODE{$node}
      if $NODE{$node};

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   # registers itself in %NODE
   AnyEvent::MP::Node::Remote->new ($node)
}
203
# sends a message to the given port ("nodeid#portid"), creating the node
# object on demand
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["$portid", @_]);
}
212
# true if the node part of the given port ID equals $NODE
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $nodeid eq $NODE
}
218
219 =item snd_to_func $node, $func, @args
220
221 Expects a node ID and a name of a function. Asynchronously tries to call
222 this function with the given arguments on that node.
223
224 This function can be used to implement C<spawn>-like interfaces.
225
226 =cut
227
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # request an artificial delay (for spawn).
   # this is very ugly - maybe ALL messages should simply be delayed, to
   # avoid deep recursion issues, but that would be slow.
   # NOTE(review): the flag is set when the target is NOT this node, while
   # the historic comment here spoke of delaying "on $NODE" - confirm
   # against AnyEvent::MP::Node::Self.
   $AnyEvent::MP::Node::Self::DELAY = 1
      unless $nodeid eq $NODE;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   # the empty port ID addresses the node port
   $node->{send} (["", @call]);
}
239
240 =item snd_on $node, @msg
241
242 Executes C<snd> with the given C<@msg> (which must include the destination
243 port) on the given node.
244
245 =cut
246
sub snd_on($@) {
   my ($node, @msg) = @_;

   # ask the node to execute "snd @msg" itself
   snd $node, snd => @msg;
}
251
252 =item eval_on $node, $string[, @reply]
253
254 Evaluates the given string as Perl expression on the given node. When
255 @reply is specified, then it is used to construct a reply message with
256 C<"$@"> and any results from the eval appended.
257
258 =cut
259
sub eval_on($$;@) {
   my ($node, @request) = @_;

   # @request is the string to evaluate, optionally followed by the reply
   snd $node, eval => @request;
}
264
# kills the given port ("nodeid#portid"), passing on any further arguments
# as kill reason. the node port itself cannot be killed.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @_);
}
274
275 #############################################################################
276 # node monitoring and info
277
278 =item $bool = node_is_up $nodeid
279
280 Returns true if the given node is "up", that is, the kernel thinks it has
281 a working connection to it.
282
283 More precisely, if the node is up, returns C<1>. If the node is currently
284 connecting or otherwise known but not connected, returns C<0>. If nothing
285 is known about the node, returns C<undef>.
286
287 =cut
288
sub node_is_up($) {
   my ($nodeid) = @_;

   # the local node is always up
   return 1
      if $nodeid eq $NODE;

   # unknown node: undef/empty list
   my $node = $NODE{$nodeid}
      or return;

   $node->{transport} ? 1 : 0
}
293
294 =item @nodes = up_nodes
295
296 Return the node IDs of all nodes that are currently connected (excluding
297 the node itself).
298
299 =cut
300
sub up_nodes() {
   my @ids;

   # a node counts as up when it has a transport
   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
304
305 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
306
307 Registers a callback that is called each time a node goes up (a connection
308 is established) or down (the connection is lost).
309
310 Node up messages can only be followed by node down messages for the same
311 node, and vice versa.
312
313 Note that monitoring a node is usually better done by monitoring its node
314 port. This function is mainly of interest to modules that are concerned
315 about the network topology and low-level connection handling.
316
317 Callbacks I<must not> block and I<should not> send any messages.
318
319 The function returns an optional guard which can be used to unregister
320 the monitoring callback again.
321
322 Example: make sure you call function C<newnode> for all nodes that are up
323 or go up (and down).
324
325 newnode $_, 1 for up_nodes;
326 mon_nodes \&newnode;
327
328 =cut
329
our %MON_NODES; # registered callbacks, keyed by their address

sub mon_nodes($) {
   my ($cb) = @_;
   my $key = $cb+0;

   $MON_NODES{$key} = $cb;

   # only create a guard when the caller actually wants one
   return
      unless defined wantarray;

   Guard::guard { delete $MON_NODES{$key} }
}
340
# logs a node up/down event and reports it to all mon_nodes callbacks
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   my $state = $up ? "up." : "down (@reason).";
   AE::log 7 => "$node->{id} is " . $state;

   for my $cb (values %MON_NODES) {
      # exceptions in callbacks are logged at "die" level
      eval { $cb->($node->{id}, $up, @reason); 1 }
         or AE::log die => $@;
   }
}
351
352 #############################################################################
353 # self node code
354
# kills a local port: removes it and its data, then passes the kill reason
# (the remaining arguments) to all of its monitors
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return
      unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a kill with a reason on a port nobody monitors is logged at "die" level
   if (!$mon && @_) {
      AE::log die => "unmonitored local port $port died with reason: @_";
   }

   $_->(@_) for values %$mon;
}
368
# registers a monitor callback for a local port. the first argument is
# unused. monitoring a nonexistent port invokes the callback immediately
# with a no_such_port reason.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   unless (exists $PORT{$portid}) {
      return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
   }

   $LMON{$portid}{$cb+0} = $cb;
}
375
# removes a monitor callback from a local port. the first argument is unused.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   return
      unless exists $LMON{$portid};

   delete $LMON{$portid}{$cb+0};
}
380
# dies when $SECURE is set
sub _secure_check {
   die "remote execution not allowed\n"
      if $SECURE;
}
385
# requests understood by the node port, tag => handler
our %NODE_REQ;

%NODE_REQ = (
   # "mproto" - monitoring protocol

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};
      _unmonitor undef, $portid, $cb;
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      my $node = $NODE{$SRCNODE};
      Scalar::Util::weaken ($node);

      # when the port dies, forget the monitor and tell the other node
      # (as long as it is still connected)
      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;
      _monitor undef, $portid, $on_kill;
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # another node wants to kill a local port
   kil1 => \&_kill,

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => sub {
      snd @_
   },

   # ask if a node supports the given request, only works for fixed tags
   can => sub {
      my ($method, @reply) = @_;

      snd @reply, exists $NODE_REQ{$method};
   },

   # random utilities

   # evaluate a string in package main, optionally replying with "$@" and
   # the results
   eval => sub {
      _secure_check ();
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },

   # reply with the current time
   time => sub {
      snd @_, AE::now;
   },

   devnull => sub {
      #
   },

   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
443
# the node port
AnyEvent::MP::Node::Self->new ($NODE); # registers itself in %NODE

# the port with the empty ID dispatches on the first message element: known
# tags are looked up in %NODE_REQ, unknown ones are resolved as function
# names (subject to _secure_check) and remembered in %NODE_REQ
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag };
      &$handler
   };

   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};
452
our $MPROTO = 1;

# tell everybody who connects our mproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{mproto} = $MPROTO;
};
459
460 #############################################################################
461 # seed management, try to keep connections to all seeds at all times
462
463 our %SEED_NODE; # seed ID => node ID|undef
464 our %NODE_SEED; # map node ID to seed ID
465 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
466 our $SEED_WATCHER;
467 our $SEED_RETRY;
468 our %GLOBAL_NODE; # global => undef
469
# starts a connection attempt to the given seed address, unless a connector
# for it already exists
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   # called after receiving the remote greeting, learns the remote node name.
   #
   # this relies on untrusted data (the remote node name), which is
   # hopefully ok, as it can at most be used for DOSing, which is easy
   # when you can do MITM anyway.
   my $on_greeted = sub {
      my $remote = $_[0]{remote_node};

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # we connected to ourselves: nuke this seed, but make sure we act like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed} = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start the global service, in case it isn't running.
         # since we probably switch connections, maybe this is not needed here?
         snd $remote, "g_slave";
      }
   };

   # second callback handed to mp_connect: forgets the connector
   # (presumably invoked once the connector is done - confirm in Transport)
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   );
}
505
# tries to connect to every seed that is neither connected nor connecting,
# and retries with doubling interval (capped at monitor_timeout)
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up $nodeid;

      push @seeds, $seed;
   }

   unless (@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
      return;
   }

   # start connection attempt for every seed we are not connected to yet
   seed_connect $_
      for @seeds;

   my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $SEED_RETRY *= 2;
   $SEED_RETRY = $max_retry
      if $SEED_RETRY > $max_retry;

   $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
}
528
# restarts seeding with a short, randomised retry interval, unless a seed
# timer is already pending
sub seed_again {
   $SEED_RETRY = (1 + rand) * 0.6;

   $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all
      unless $SEED_WATCHER;
}
533
# sets new seed list, starts connecting
sub set_seeds(@) {
   # forget everything about the old seeds
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # the node IDs of the new seeds are not known yet
   $SEED_NODE{$_} = undef
      for @_;

   seed_all;
}
544
# normal nodes only record global node connections
$NODE_REQ{g_global} = sub {
   $GLOBAL_NODE{$SRCNODE} = undef;
};

mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # nodes that went down are no longer known as global nodes
   delete $GLOBAL_NODE{$nodeid}
      unless $is_up;

   # the rest is only of interest for seed nodes
   return
      unless exists $NODE_SEED{$nodeid};

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd $nodeid, "g_slave";
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again;
   }
};
565
566 #############################################################################
567 # keepalive code - used to keep connections to certain nodes alive
568 # only used by global code atm., but ought to be exposed somehow.
569 #TODO: should probably be done directly by node objects
570
571 our $KEEPALIVE_RETRY;
572 our $KEEPALIVE_WATCHER;
573 our %KEEPALIVE; # we want to keep these nodes alive
574 our %KEEPALIVE_DOWN; # nodes that are down currently
575
# tries to connect to all keepalive nodes that are down and schedules the
# next attempt, doubling the interval (capped at monitor_timeout)
sub keepalive_all {
   my @down = keys %KEEPALIVE_DOWN;

   AE::log 9 => "keepalive: trying to establish connections with: "
                . (join " ", @down)
                . ".";

   (add_node $_)->connect
      for @down;

   my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $KEEPALIVE_RETRY *= 2;
   $KEEPALIVE_RETRY = $max_retry
      if $KEEPALIVE_RETRY > $max_retry;

   $KEEPALIVE_WATCHER = AE::timer $KEEPALIVE_RETRY, 0, \&keepalive_all;
}
590
# resets the retry interval to a short, randomised value and immediately
# tries to connect
sub keepalive_again {
   $KEEPALIVE_RETRY = (1 + rand) * 0.3;

   keepalive_all;
}
595
# requests that the connection to the given node is kept alive. requests
# are counted, only the first one for a node has any effect.
sub keepalive_add {
   my ($nodeid) = @_;

   return if $KEEPALIVE{$nodeid}++;
   return if node_is_up $nodeid;

   # not connected: remember it as down and start connecting
   $KEEPALIVE_DOWN{$nodeid} = undef;
   keepalive_again;
}
603
# drops one keepalive request for the given node. when the last one is
# gone, the node is forgotten, and the timer is stopped if nothing else is
# down.
sub keepalive_del {
   my ($nodeid) = @_;

   return if --$KEEPALIVE{$nodeid};

   delete $KEEPALIVE{$nodeid};
   delete $KEEPALIVE_DOWN{$nodeid};

   undef $KEEPALIVE_WATCHER
      unless %KEEPALIVE_DOWN;
}
613
# track up/down state of the nodes we want to keep alive
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   return
      unless exists $KEEPALIVE{$nodeid};

   if ($is_up) {
      delete $KEEPALIVE_DOWN{$nodeid};

      # nothing left to connect to, so stop the timer
      undef $KEEPALIVE_WATCHER
         unless %KEEPALIVE_DOWN;
   } else {
      # lost the connection, try to connect again
      $KEEPALIVE_DOWN{$nodeid} = undef;
      keepalive_again;
   }
};
628
629 #############################################################################
630 # talk with/to global nodes
631
632 # protocol messages:
633 #
634 # sent by global nodes
635 # g_global - global nodes send this to all others
636 #
637 # database protocol
638 # g_slave database - make other global node master of the sender
639 # g_set database - global node's database to other global nodes
640 # g_upd family set del - update single family (any to global)
641 #
642 # slave <-> global protocol
643 # g_find node - query addresses for node (slave to global)
644 # g_found node binds - node addresses (global to slave)
645 # g_db_family family id - send g_reply with data (global to slave)
646 # g_db_keys family id - send g_reply with data (global to slave)
647 # g_db_values family id - send g_reply with data (global to slave)
648 # g_reply id result - result of any query (global to slave)
649 # g_mon1 family - start to monitor family, replies with g_chg1
650 # g_mon0 family - stop monitoring family
651 # g_chg1 family hash - initial value of family when starting to monitor
652 # g_chg2 family set del - like g_upd, but for monitoring only
653 #
654 # internal database families:
655 # "'l" -> node -> listeners
656 # "'g" -> node -> undef
657 # ...
658 #
659
660 # used on all nodes:
661 our $MASTER; # the global node we bind ourselves to
662 our $MASTER_MON;
663 our %LOCAL_DB; # this node database
664
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
671
672 #############################################################################
673 # master selection
674
# master requests
our %GLOBAL_REQ; # $id => \@req

# remembers a request under the given id (unless one with that id exists
# already) and sends it to the master, if there is one
sub global_req_add {
   my ($id, $req) = @_;

   return
      if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   if ($MASTER) {
      snd $MASTER, @$req;
   }
}
688
# forgets the request with the given id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
692
693 #################################
694 # master rpc
695
our %GLOBAL_RES;           # request id => reply callback
our $GLOBAL_RES_ID = "a";  # last request id used

# sends a request to the master; the last argument is the callback for the
# reply, and a fresh request id is appended to the request
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;
   global_req_add $id, [@_, $id];
}
704
# reply to a global_call: the first element is the request id, the rest is
# passed to the registered callback
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del $id;

   my $reply_cb = delete $GLOBAL_RES{$id}
      or return;

   &$reply_cb
};
712
713 #################################
714
# queues a g_find request for the given node
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", [g_find => $nodeid];
}
718
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   my ($nodeid, $binds) = @_;

   global_req_del "g_find $nodeid";

   # ignore replies for nodes we do not know (anymore)
   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($binds);
};
727
# makes the given node our master: watches it, sends it our local database
# and all queued requests
sub master_set {
   ($MASTER) = @_;
   AE::log 8 => "new master node: $MASTER.";

   # when the master goes down, look for a new one
   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return
         unless $nodeid eq $MASTER && !$is_up;

      undef $MASTER;
      master_search ();
   };

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
745
# selects a seed node that is up as master, or waits for one to come up
sub master_search {
   AE::log 9 => "starting search for master node.";

   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up $nodeid;

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      # we are only interested in node-ups of seed nodes
      return
         unless $is_up && $NODE_SEED{$nodeid};

      master_set $nodeid;
   };
}
764
# other node wants to make us the master, so start the global service
$NODE_REQ{g_slave} = sub {
   # load the global module, then redo the request with whatever handler
   # is registered for g_slave afterwards
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
771
772 #############################################################################
773 # local database operations
774
775 # canonical probably not needed
776 our $sv_eq_coder = JSON::XS->new->utf8->allow_nonref;
777
# are the two scalars equal? very very ugly and slow, need better way.
# references are compared by their JSON encoding, everything else as
# strings, with undef only being equal to undef.
sub sv_eq($$) {
   if (ref $_[0] || ref $_[1]) {
      my $lhs = JSON::XS::encode ($sv_eq_coder, $_[0]);
      my $rhs = JSON::XS::encode ($sv_eq_coder, $_[1]);

      return $lhs eq $rhs;
   }

   my ($lhs_defined, $rhs_defined) = (defined $_[0], defined $_[1]);

   $_[0] eq $_[1] && $lhs_defined == $rhs_defined
}
785
786 # local database management
787
# deletes the given keys from a family of the local database and tells the
# master about the keys that actually existed
sub db_del($@) {
   my ($family, @keys) = @_;

   my @del = grep exists $LOCAL_DB{$family}{$_}, @keys;

   return
      unless @del;

   delete $LOCAL_DB{$family}{$_}
      for @del;

   if (defined $MASTER) {
      snd $MASTER, g_upd => $family => undef, \@del;
   }
}
799
# sets a key in a family of the local database and tells the master about
# it, unless the stored value is already equal. in non-void context,
# returns a guard that deletes the key again.
sub db_set($$;$) {
   my ($family, $subkey, $value) = @_;

   my $unchanged = exists $LOCAL_DB{$family}{$subkey}
                   && sv_eq $LOCAL_DB{$family}{$subkey}, $value;

   unless ($unchanged) {
      $LOCAL_DB{$family}{$subkey} = $value;

      snd $MASTER, g_upd => $family => { $subkey => $value }
         if defined $MASTER;
   }

   return
      unless defined wantarray;

   Guard::guard { db_del $family => $subkey }
}
821
822 # database query
823
# sends a g_db_family query for the family, the reply goes to the callback
sub db_family {
   global_call (g_db_family => $_[0], $_[1]);
}
828
# sends a g_db_keys query for the family, the reply goes to the callback
sub db_keys {
   global_call (g_db_keys => $_[0], $_[1]);
}
833
# sends a g_db_values query for the family, the reply goes to the callback
sub db_values {
   global_call (g_db_values => $_[0], $_[1]);
}
838
839 # database monitoring
840
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# registers a callback for changes of a database family. in non-void
# context, returns a guard that unregisters it again.
sub db_mon($@) {
   my ($family, $cb) = @_;
   my $id = $cb+0;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event.
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$id} = sub { };

      AE::postpone {
         # guard might have gone away already
         return
            unless exists $LOCAL_MON{$family}{$id};

         # set actual callback
         $LOCAL_MON{$family}{$id} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$id} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   }

   return
      unless defined wantarray;

   Guard::guard {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$id};

      # other monitors for this family remain
      return
         if %$mon;

      global_req_del "mon1 $family";

      # no global_req, because we don't care if we are not connected
      snd $MASTER, g_mon0 => $family
         if $MASTER;

      delete $LOCAL_MON{$family};
      delete $MON_DB{$family};
   }
}
883
# full update: replaces the monitored copy of a family by the given hash
# and reports added, changed and deleted keys. only accepted from the master.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@a, @c, @d);

   # add or replace keys
   for my $k (keys %$ndb) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $ndb->{$k};
   }

   # delete keys that are no longer present
   for my $k (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$k};
      push @d, $k;
   }

   $_->($db, \@a, \@c, \@d)
      for values %{ $LOCAL_MON{$f} };
};
909
# incremental update: applies set and deleted keys to the monitored copy
# of a family and reports them. only accepted from the master.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@a, @c);

   # add or replace keys
   for my $k (keys %$set) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $set->{$k};
   }

   delete @$db{@$del};

   $_->($db, \@a, \@c, $del)
      for values %{ $LOCAL_MON{$family} };
};
931
932 #############################################################################
933 # configure
934
# returns the nodename field of uname
sub nodename {
   require POSIX;

   my (undef, $name) = POSIX::uname ();

   $name
}
939
# Resolves a comma-separated list of transport descriptors into formatted
# "host:port" strings.
#
# Each descriptor is either a port number or empty (meaning this host's
# nodename), or a host:port pair. The host "*" stands for the addresses of
# all local interfaces (found via Net::Interface, in a child process where
# possible), the port "*" for port 0.
#
# Returns a condvar that delivers the list of unique addresses, ordered by
# the position of the descriptor they came from.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, address] pairs

   # once everything is resolved, sort by priority and remove duplicates
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor get priorities slightly above its index
      my $pri = ++$idx;

      # a bare port number or the empty string refers to our nodename
      $t = length $t ? nodename . ":$t" : nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for $_ ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;

            # a failed fork used to be mistaken for the parent side and
            # silently resulted in an empty address list
            defined $pid
               or die "fork: $!";

            if ($pid == 0) {
               close $r;
               syswrite $w, pack "(C/a*)*", $get_addr->();
               require POSIX;
               POSIX::_exit (0);
            } else {
               close $w;

               my $addr = "";

               # read until the child closes its end of the pipe
               1 while sysread $r, $addr, 1024, length $addr;

               # reap the child, so it does not stay around as a zombie
               {
                  local $?;
                  waitpid $pid, 0;
               }

               @addr = unpack "(C/a*)*", $addr;
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
1049
our @POST_CONFIGURE; # hooks that have not been run yet

# not yet documented
#
# queues the given code block and runs all queued blocks as soon as $NODE
# is true - that is, immediately if it already is
sub post_configure(&) {
   defined wantarray
      and die "AnyEvent::MP::Kernel::post_configure must be called in void context";

   push @POST_CONFIGURE, @_;

   while ($NODE && @POST_CONFIGURE) {
      my $hook = shift @POST_CONFIGURE;
      $hook->();
   }
}
1059
# configures this node: determines the node ID, creates the listeners,
# starts seeding and master search, and starts the configured services.
# replaces itself by a no-op, so only the first call has any effect.
#
# note: the local variable names are visible to the configured eval string
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   $SECURE = $CONFIG->{secure};

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   # re-register the self node object under its new ID
   my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure

   $NODE = $node;

   # %n is the nodename, %u (or a trailing slash) requests a random suffix
   $NODE =~ s/%n/nodename/ge;

   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   $node_obj->{id} = $NODE;
   $NODE{$NODE} = $node_obj;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   AE::log 8 => "node $NODE starting up.";

   $BINDS = [];
   %BINDS = ();

   # start resolving all bind descriptors first, then create the listeners
   my @bind_cvs = map { _resolve $_ } @$binds;

   for my $bind_cv (@bind_cvs) {
      for my $bind ($bind_cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address actually bound to
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   AE::log 9 => "running post config hooks and init.";

   # might initialise Global, so need to do it before db_set
   post_configure { };

   db_set "'l" => $NODE => $BINDS;

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   my @seed_cvs = map { _resolve $_ } @$seeds;
   set_seeds map { $_->recv } @seed_cvs;
   master_search;

   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   AE::log 9 => "starting services.";

   for (@{ $CONFIG->{services} }) {
      if (ref) {
         # [function, arguments...]
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         # "Module::" just loads the module
         eval "require $_";
         die $@ if $@;
      } else {
         # function name
         (load_func $_)->();
      }
   }

   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}
1154
1155 =back
1156
1157 =head1 LOGGING
1158
1159 AnyEvent::MP::Kernel logs high-level information about the current node,
1160 when nodes go up and down, and most runtime errors. It also logs some
1161 debugging and trace messages about network maintenance, such as seed
1162 connections and global node management.
1163
1164 =head1 SEE ALSO
1165
1166 L<AnyEvent::MP>.
1167
1168 =head1 AUTHOR
1169
1170 Marc Lehmann <schmorp@schmorp.de>
1171 http://home.schmorp.de/
1172
1173 =cut
1174
1175 1
1176