ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.97
Committed: Wed Mar 21 23:48:39 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.96: +11 -12 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del
49 db_mon db_family db_keys db_values
50 );
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages -queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
75 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
76 our @WARN;
77 our $WARN = sub {
78 &$_ for @WARN;
79
80 return if $WARNLEVEL < $_[0];
81
82 my ($level, $msg) = @_;
83
84 $msg =~ s/\n$//;
85
86 printf STDERR "%s <%d> %s\n",
87 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
88 $level,
89 $msg;
90 };
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
99 sub load_func($) {
100 my $func = $_[0];
101
102 unless (defined &$func) {
103 my $pkg = $func;
104 do {
105 $pkg =~ s/::[^:]+$//
106 or return sub { die "unable to resolve function '$func'" };
107
108 local $@;
109 unless (eval "require $pkg; 1") {
110 my $error = $@;
111 $error =~ /^Can't locate .*.pm in \@INC \(/
112 or return sub { die $error };
113 }
114 } until defined &$func;
115 }
116
117 \&$func
118 }
119
120 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
121
122 sub nonce($) {
123 join "", map chr rand 256, 1 .. $_[0]
124 }
125
126 sub nonce62($) {
127 join "", map $alnum[rand 62], 1 .. $_[0]
128 }
129
130 sub gen_uniq {
131 my $now = AE::now;
132 (join "",
133 map $alnum[$_],
134 $$ / 62 % 62,
135 $$ % 62,
136 (int $now ) % 62,
137 (int $now * 100) % 62,
138 (int $now * 10000) % 62,
139 ) . nonce62 4;
140 }
141
142 our $CONFIG; # this node's configuration
143 our $SECURE = sub { 1 };
144
145 our $RUNIQ; # remote uniq value
146 our $UNIQ; # per-process/node unique cookie
147 our $NODE;
148 our $ID = "a";
149
150 our %NODE; # node id to transport mapping, or "undef", for local node
151 our (%PORT, %PORT_DATA); # local ports
152
153 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 our %LMON; # monitored _local_ ports
155
156 our $GLOBAL; # true if node is a global ("directory") node
157 our %BINDS;
158 our $BINDS; # our listeners, as arrayref
159
160 our $SRCNODE; # holds the sending node _object_ during _inject
161
162 sub _init_names {
163 # ~54 bits, for local port names, lowercase $ID appended
164 $UNIQ = gen_uniq;
165
166 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
167 $RUNIQ = nonce62 10;
168 $RUNIQ =~ s/(.)$/\U$1/;
169
170 $NODE = "anon/$RUNIQ";
171 }
172
173 _init_names;
174
175 sub NODE() {
176 $NODE
177 }
178
179 sub node_of($) {
180 my ($node, undef) = split /#/, $_[0], 2;
181
182 $node
183 }
184
185 BEGIN {
186 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
187 ? sub () { 1 }
188 : sub () { 0 };
189 }
190
191 our $DELAY_TIMER;
192 our @DELAY_QUEUE;
193
194 our $delay_run = sub {
195 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
196 };
197
198 sub delay($) {
199 push @DELAY_QUEUE, shift;
200 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
201 }
202
203 =item $AnyEvent::MP::Kernel::SRCNODE
204
205 During execution of a message callback, this variable contains the node ID
206 of the origin node.
207
208 The main use of this variable is for debugging output - there are probably
209 very few other cases where you need to know the source node ID.
210
211 =cut
212
213 sub _inject {
214 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
215 &{ $PORT{+shift} or return };
216 }
217
218 # this function adds a node-ref, so you can send stuff to it
219 # it is basically the central routing component.
220 sub add_node {
221 my ($node) = @_;
222
223 length $node
224 or Carp::croak "'undef' or the empty string are not valid node/port IDs";
225
226 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
227 }
228
229 sub snd(@) {
230 my ($nodeid, $portid) = split /#/, shift, 2;
231
232 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
233
234 ($NODE{$nodeid} || add_node $nodeid)
235 ->{send} (["$portid", @_]);
236 }
237
238 =item $is_local = port_is_local $port
239
240 Returns true iff the port is a local port.
241
242 =cut
243
244 sub port_is_local($) {
245 my ($nodeid, undef) = split /#/, $_[0], 2;
246
247 $NODE{$nodeid} == $NODE{""}
248 }
249
250 =item snd_to_func $node, $func, @args
251
252 Expects a node ID and a name of a function. Asynchronously tries to call
253 this function with the given arguments on that node.
254
255 This function can be used to implement C<spawn>-like interfaces.
256
257 =cut
258
259 sub snd_to_func($$;@) {
260 my $nodeid = shift;
261
262 # on $NODE, we artificially delay... (for spawn)
263 # this is very ugly - maybe we should simply delay ALL messages,
264 # to avoid deep recursion issues. but that's so... slow...
265 $AnyEvent::MP::Node::Self::DELAY = 1
266 if $nodeid ne $NODE;
267
268 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
269 }
270
271 =item snd_on $node, @msg
272
273 Executes C<snd> with the given C<@msg> (which must include the destination
274 port) on the given node.
275
276 =cut
277
278 sub snd_on($@) {
279 my $node = shift;
280 snd $node, snd => @_;
281 }
282
283 =item eval_on $node, $string[, @reply]
284
285 Evaluates the given string as Perl expression on the given node. When
286 @reply is specified, then it is used to construct a reply message with
287 C<"$@"> and any results from the eval appended.
288
289 =cut
290
291 sub eval_on($$;@) {
292 my $node = shift;
293 snd $node, eval => @_;
294 }
295
296 sub kil(@) {
297 my ($nodeid, $portid) = split /#/, shift, 2;
298
299 length $portid
300 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
301
302 ($NODE{$nodeid} || add_node $nodeid)
303 ->kill ("$portid", @_);
304 }
305
306 #############################################################################
307 # node monitoring and info
308
309 =item node_is_up $nodeid
310
311 Returns true if the given node is "up", that is, the kernel thinks it has
312 a working connection to it.
313
314 If the node is up, returns C<1>. If the node is currently connecting or
315 otherwise known but not connected, returns C<0>. If nothing is known about
316 the node, returns C<undef>.
317
318 =cut
319
320 sub node_is_up($) {
321 ($NODE{$_[0]} or return)->{transport}
322 ? 1 : 0
323 }
324
325 =item up_nodes
326
327 Return the node IDs of all nodes that are currently connected (excluding
328 the node itself).
329
330 =cut
331
332 sub up_nodes() {
333 map $_->{id}, grep $_->{transport}, values %NODE
334 }
335
336 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
337
338 Registers a callback that is called each time a node goes up (a connection
339 is established) or down (the connection is lost).
340
341 Node up messages can only be followed by node down messages for the same
342 node, and vice versa.
343
344 Note that monitoring a node is usually better done by monitoring its node
345 port. This function is mainly of interest to modules that are concerned
346 about the network topology and low-level connection handling.
347
348 Callbacks I<must not> block and I<should not> send any messages.
349
350 The function returns an optional guard which can be used to unregister
351 the monitoring callback again.
352
353 Example: make sure you call function C<newnode> for all nodes that are up
354 or go up (and down).
355
356 newnode $_, 1 for up_nodes;
357 mon_nodes \&newnode;
358
359 =cut
360
361 our %MON_NODES;
362
363 sub mon_nodes($) {
364 my ($cb) = @_;
365
366 $MON_NODES{$cb+0} = $cb;
367
368 defined wantarray
369 and Guard::guard { delete $MON_NODES{$cb+0} }
370 }
371
372 sub _inject_nodeevent($$;@) {
373 my ($node, $up, @reason) = @_;
374
375 for my $cb (values %MON_NODES) {
376 eval { $cb->($node->{id}, $up, @reason); 1 }
377 or $WARN->(1, $@);
378 }
379
380 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
381 }
382
383 #############################################################################
384 # self node code
385
386 sub _kill {
387 my $port = shift;
388
389 delete $PORT{$port}
390 or return; # killing nonexistent ports is O.K.
391 delete $PORT_DATA{$port};
392
393 my $mon = delete $LMON{$port}
394 or !@_
395 or $WARN->(2, "unmonitored local port $port died with reason: @_");
396
397 $_->(@_) for values %$mon;
398 }
399
400 sub _monitor {
401 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
402 unless exists $PORT{$_[1]};
403
404 $LMON{$_[1]}{$_[2]+0} = $_[2];
405 }
406
407 sub _unmonitor {
408 delete $LMON{$_[1]}{$_[2]+0}
409 if exists $LMON{$_[1]};
410 }
411
412 sub _secure_check {
413 &$SECURE
414 or die "remote execution attempt by insecure node\n";
415 }
416
417 our %NODE_REQ = (
418 # internal services
419
420 # monitoring
421 mon0 => sub { # stop monitoring a port for another node
422 my $portid = shift;
423 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
424 },
425 mon1 => sub { # start monitoring a port for another node
426 my $portid = shift;
427 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
428 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
429 delete $node->{rmon}{$portid};
430 $node->send (["", kil0 => $portid, @_])
431 if $node && $node->{transport};
432 };
433 },
434 # another node has killed a monitored port
435 kil0 => sub {
436 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
437 or return;
438
439 $_->(@_) for @$cbs;
440 },
441
442 # "public" services - not actually public
443
444 # another node wants to kill a local port
445 kil => \&_kill,
446
447 # relay message to another node / generic echo
448 snd => sub {
449 &_secure_check;
450 &snd
451 },
452
453 # random utilities
454 eval => sub {
455 &_secure_check;
456 my @res = do { package main; eval shift };
457 snd @_, "$@", @res if @_;
458 },
459 time => sub {
460 &_secure_check;
461 snd @_, AE::now;
462 },
463 devnull => sub {
464 #
465 },
466 "" => sub {
467 # empty messages are keepalives or similar devnull-applications
468 },
469 );
470
471 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
472 $PORT{""} = sub {
473 my $tag = shift;
474 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
475 $WARN->(2, "error processing node message from $SRCNODE: $@") if $@;
476 };
477
478 our $NPROTO = 1;
479
480 # tell everybody who connects our nproto
481 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
482 $_[0]{local_greeting}{nproto} = $NPROTO;
483 };
484
485 #############################################################################
486 # seed management, try to keep connections to all seeds at all times
487
488 our %SEED_NODE; # seed ID => node ID|undef
489 our %NODE_SEED; # map node ID to seed ID
490 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
491 our $SEED_WATCHER;
492 our $SEED_RETRY;
493
494 sub seed_connect {
495 my ($seed) = @_;
496
497 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
498 or Carp::croak "$seed: unparsable seed address";
499
500 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
501
502 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
503 $host, $port,
504 on_greeted => sub {
505 # called after receiving remote greeting, learn remote node name
506
507 # we rely on untrusted data here (the remote node name) this is
508 # hopefully ok, as this can at most be used for DOSing, which is easy
509 # when you can do MITM anyway.
510
511 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
512 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
513 require AnyEvent::MP::Global; # every seed becomes a global node currently
514 delete $SEED_NODE{$seed};
515 } else {
516 $SEED_NODE{$seed} = $_[0]{remote_node};
517 $NODE_SEED{$_[0]{remote_node}} = $seed;
518 # also start global service, if not running
519 # we need to check here in addition to the mon_nodes below
520 # because we might only learn late that a node is a seed
521 # and then we might already be connected
522 snd $_[0]{remote_node}, "g_slave"
523 unless $_[0]{remote_greeting}{global};
524 }
525 },
526 sub {
527 delete $SEED_CONNECT{$seed};
528 }
529 ;
530 }
531
532 sub seed_all {
533 my @seeds = grep
534 !exists $SEED_CONNECT{$_}
535 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
536 keys %SEED_NODE;
537
538 if (@seeds) {
539 # start connection attempt for every seed we are not connected to yet
540 seed_connect $_
541 for @seeds;
542
543 $SEED_RETRY = $SEED_RETRY * 2 + rand;
544 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
545 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
546
547 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
548
549 } else {
550 # all seeds connected or connecting, no need to restart timer
551 undef $SEED_WATCHER;
552 }
553 }
554
555 sub seed_again {
556 $SEED_RETRY = 1;
557 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
558 }
559
560 # sets new seed list, starts connecting
561 sub set_seeds(@) {
562 %SEED_NODE = ();
563 %NODE_SEED = ();
564 %SEED_CONNECT = ();
565
566 @SEED_NODE{@_} = ();
567
568 seed_all;
569 }
570
571 mon_nodes sub {
572 return unless exists $NODE_SEED{$_[0]};
573
574 if ($_[1]) {
575 # each time a connection to a seed node goes up, make
576 # sure it runs the global service.
577 snd $_[0], "g_slave"
578 unless $NODE{$_[0]}{transport}{remote_greeting}{global};
579 } else {
580 # if we lost the connection to a seed node, make sure we are seeding
581 seed_again;
582 }
583 };
584
585 #############################################################################
586 # talk with/to global nodes
587
588 # protocol messages:
589 #
590 # sent by all slave nodes (slave to master)
591 # g_slave database - make other global node master of the sender
592 #
593 # sent by any node to global nodes
594 # g_set database - set whole database
595 # g_upd family set del - update single family
596 # g_del family key - delete key from database
597 # g_get family key reply... - send reply with data
598 #
599 # send by global nodes
600 # g_global - node became global, similar to global=1 greeting
601 #
602 # database families
603 # "'l" -> node -> listeners
604 # "'g" -> node -> undef
605 # ...
606 #
607
608 # used on all nodes:
609 our $MASTER; # the global node we bind ourselves to
610 our $MASTER_MON;
611 our %LOCAL_DB; # this node database
612
613 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
614
615 our $GPROTO = 1;
616
617 # tell everybody who connects our nproto
618 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
619 $_[0]{local_greeting}{gproto} = $GPROTO;
620 };
621
622 #############################################################################
623 # master selection
624
625 # master requests
626 our %GLOBAL_REQ; # $id => \@req
627
628 sub global_req_add {
629 my ($id, $req) = @_;
630
631 return if exists $GLOBAL_REQ{$id};
632
633 $GLOBAL_REQ{$id} = $req;
634
635 snd $MASTER, @$req
636 if $MASTER;
637 }
638
639 sub global_req_del {
640 delete $GLOBAL_REQ{$_[0]};
641 }
642
643 #################################
644 # master rpc
645
646 our %GLOBAL_RES;
647 our $GLOBAL_RES_ID = "a";
648
649 sub global_call {
650 my $id = ++$GLOBAL_RES_ID;
651 $GLOBAL_RES{$id} = pop;
652 global_req_add $id, [@_, $id];
653 }
654
655 $NODE_REQ{g_reply} = sub {
656 my $id = shift;
657 global_req_del $id;
658 my $cb = delete $GLOBAL_RES{$id}
659 or return;
660 &$cb
661 };
662
663 #################################
664
665 sub g_find {
666 global_req_add "g_find $_[0]", [g_find => $_[0]];
667 }
668
669 # reply for g_find started in Node.pm
670 $NODE_REQ{g_found} = sub {
671 global_req_del "g_find $_[0]";
672
673 my $node = $NODE{$_[0]} or return;
674
675 $node->connect_to ($_[1]);
676 };
677
678 sub master_set {
679 $MASTER = $_[0];
680
681 snd $MASTER, g_slave => \%LOCAL_DB;
682
683 # (re-)send queued requests
684 snd $MASTER, @$_
685 for values %GLOBAL_REQ;
686 }
687
688 sub master_search {
689 #TODO: should also look for other global nodes, but we don't know them #d#
690 for (keys %NODE_SEED) {
691 if (node_is_up $_) {
692 master_set $_;
693 return;
694 }
695 }
696
697 $MASTER_MON = mon_nodes sub {
698 return unless $_[1]; # we are only interested in node-ups
699 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
700
701 master_set $_[0];
702
703 $MASTER_MON = mon_nodes sub {
704 if ($_[0] eq $MASTER && !$_[1]) {
705 undef $MASTER;
706 master_search ();
707 }
708 };
709 };
710 }
711
712 # other node wants to make us the master
713 $NODE_REQ{g_slave} = sub {
714 my ($db) = @_;
715
716 # load global module and redo the request
717 require AnyEvent::MP::Global;
718 &{ $NODE_REQ{g_slave} }
719 };
720
721 #############################################################################
722 # local database operations
723
724 # local database management
725
726 sub db_set($$;$) {
727 my ($family, $subkey) = @_;
728
729 # if (ref $_[1]) {
730 # # bulk
731 # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
732 # $LOCAL_DB{$_[0]} = $_[1];
733 # snd $MASTER, g_upd => $_[0] => $_[1], \@del
734 # if defined $MASTER;
735 # } else {
736 # single-key
737 $LOCAL_DB{$family}{$subkey} = $_[2];
738 snd $MASTER, g_upd => $family => { $subkey => $_[2] }
739 if defined $MASTER;
740 # }
741
742 defined wantarray
743 and Guard::guard { db_del $family => $subkey }
744 }
745
746 sub db_del($@) {
747 my $family = shift;
748
749 delete @{ $LOCAL_DB{$family} }{@_};
750 snd $MASTER, g_upd => $family => undef, \@_
751 if defined $MASTER;
752 }
753
754 # database query
755
756 sub db_family {
757 my ($family, $cb) = @_;
758 global_call g_db_family => $family, $cb;
759 }
760
761 sub db_keys {
762 my ($family, $cb) = @_;
763 global_call g_db_keys => $family, $cb;
764 }
765
766 sub db_values {
767 my ($family, $cb) = @_;
768 global_call g_db_values => $family, $cb;
769 }
770
771 # database monitoring
772
773 our %LOCAL_MON; # f, reply
774 our %MON_DB; # f, k, value
775
776 sub db_mon($@) {
777 my ($family, $cb) = @_;
778
779 if (my $db = $MON_DB{$family}) {
780 # we already monitor, so create a "dummy" change event
781 # this is postponed, which might be too late (we could process
782 # change events), so disable the callback at first
783 $LOCAL_MON{$family}{$cb+0} = sub { };
784 AE::postpone {
785 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
786
787 # set actual callback
788 $LOCAL_MON{$family}{$cb+0} = $cb;
789 $cb->($db, [keys %$db]);
790 };
791 } else {
792 # new monitor, request chg1 from upstream
793 $LOCAL_MON{$family}{$cb+0} = $cb;
794 global_req_add "mon1 $family" => [g_mon1 => $family];
795 $MON_DB{$family} = {};
796 }
797
798 defined wantarray
799 and Guard::guard {
800 my $mon = $LOCAL_MON{$family};
801 delete $mon->{$cb+0};
802
803 unless (%$mon) {
804 global_req_del "mon1 $family";
805
806 # no global_req, because we don't care if we are not connected
807 snd $MASTER, g_mon0 => $family
808 if $MASTER;
809
810 delete $LOCAL_MON{$family};
811 delete $MON_DB{$family};
812 }
813 }
814 }
815
816 # full update
817 $NODE_REQ{g_chg1} = sub {
818 return unless $SRCNODE eq $MASTER;
819 my ($f, $ndb) = @_;
820
821 my $db = $MON_DB{$f};
822 my (@a, @c, @d);
823
824 # add or replace keys
825 while (my ($k, $v) = each %$ndb) {
826 exists $db->{$k}
827 ? push @c, $k
828 : push @a, $k;
829 $db->{$k} = $v;
830 }
831
832 # delete keys that are no longer present
833 for (grep !exists $ndb->{$_}, keys %$db) {
834 delete $db->{$_};
835 push @d, $_;
836 }
837
838 $_->($db, \@a, \@c, \@d)
839 for values %{ $LOCAL_MON{$_[0]} };
840 };
841
842 # incremental update
843 $NODE_REQ{g_chg2} = sub {
844 return unless $SRCNODE eq $MASTER;
845 my ($family, $set, $del) = @_;
846
847 my $db = $MON_DB{$family};
848
849 my (@a, @c);
850
851 while (my ($k, $v) = each %$set) {
852 exists $db->{$k}
853 ? push @c, $k
854 : push @a, $k;
855 $db->{$k} = $v;
856 }
857
858 delete @$db{@$del};
859
860 $_->($db, \@a, \@c, $del)
861 for values %{ $LOCAL_MON{$family} };
862 };
863
864 #############################################################################
865 # configure
866
867 sub nodename {
868 require POSIX;
869 (POSIX::uname ())[1]
870 }
871
872 sub _resolve($) {
873 my ($nodeid) = @_;
874
875 my $cv = AE::cv;
876 my @res;
877
878 $cv->begin (sub {
879 my %seen;
880 my @refs;
881 for (sort { $a->[0] <=> $b->[0] } @res) {
882 push @refs, $_->[1] unless $seen{$_->[1]}++
883 }
884 shift->send (@refs);
885 });
886
887 my $idx;
888 for my $t (split /,/, $nodeid) {
889 my $pri = ++$idx;
890
891 $t = length $t ? nodename . ":$t" : nodename
892 if $t =~ /^\d*$/;
893
894 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
895 or Carp::croak "$t: unparsable transport descriptor";
896
897 $port = "0" if $port eq "*";
898
899 if ($host eq "*") {
900 $cv->begin;
901 # use fork_call, as Net::Interface is big, and we need it rarely.
902 require AnyEvent::Util;
903 AnyEvent::Util::fork_call (
904 sub {
905 my @addr;
906
907 require Net::Interface;
908
909 for my $if (Net::Interface->interfaces) {
910 # we statically lower-prioritise ipv6 here, TODO :()
911 for $_ ($if->address (Net::Interface::AF_INET ())) {
912 next if /^\x7f/; # skip localhost etc.
913 push @addr, $_;
914 }
915 for ($if->address (Net::Interface::AF_INET6 ())) {
916 #next if $if->scope ($_) <= 2;
917 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
918 push @addr, $_;
919 }
920
921 }
922 @addr
923 }, sub {
924 for my $ip (@_) {
925 push @res, [
926 $pri += 1e-5,
927 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
928 ];
929 }
930 $cv->end;
931 }
932 );
933 } else {
934 $cv->begin;
935 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
936 for (@_) {
937 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
938 push @res, [
939 $pri += 1e-5,
940 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
941 ];
942 }
943 $cv->end;
944 };
945 }
946 }
947
948 $cv->end;
949
950 $cv
951 }
952
953 sub configure(@) {
954 unshift @_, "profile" if @_ & 1;
955 my (%kv) = @_;
956
957 delete $NODE{$NODE}; # we do not support doing stuff before configure
958 _init_names;
959
960 my $profile = delete $kv{profile};
961
962 $profile = nodename
963 unless defined $profile;
964
965 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
966
967 if (exists $CONFIG->{secure}) {
968 $SECURE = eval $CONFIG->{secure} ? "sub { 0 }" : "sub { 1 }";
969 }
970
971 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
972
973 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
974
975 $NODE = $node;
976
977 $NODE =~ s/%n/nodename/ge;
978
979 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
980 # nodes with randomised node names do not need randomised port names
981 $UNIQ = "";
982 }
983
984 $NODE{$NODE} = $NODE{""};
985 $NODE{$NODE}{id} = $NODE;
986
987 my $seeds = $CONFIG->{seeds};
988 my $binds = $CONFIG->{binds};
989
990 $binds ||= ["*"];
991
992 $WARN->(8, "node $NODE starting up.");
993
994 $BINDS = [];
995 %BINDS = ();
996
997 for (map _resolve $_, @$binds) {
998 for my $bind ($_->recv) {
999 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1000 or Carp::croak "$bind: unparsable local bind address";
1001
1002 my $listener = AnyEvent::MP::Transport::mp_server
1003 $host,
1004 $port,
1005 prepare => sub {
1006 my (undef, $host, $port) = @_;
1007 $bind = AnyEvent::Socket::format_hostport $host, $port;
1008 0
1009 },
1010 ;
1011 $BINDS{$bind} = $listener;
1012 push @$BINDS, $bind;
1013 }
1014 }
1015
1016 db_set "'l" => $NODE => $BINDS;
1017
1018 $WARN->(8, "node listens on [@$BINDS].");
1019
1020 # connect to all seednodes
1021 set_seeds map $_->recv, map _resolve $_, @$seeds;
1022
1023 master_search;
1024
1025 for (@{ $CONFIG->{services} }) {
1026 if (ref) {
1027 my ($func, @args) = @$_;
1028 (load_func $func)->(@args);
1029 } elsif (s/::$//) {
1030 eval "require $_";
1031 die $@ if $@;
1032 } else {
1033 (load_func $_)->();
1034 }
1035 }
1036 }
1037
1038 =back
1039
1040 =head1 SEE ALSO
1041
1042 L<AnyEvent::MP>.
1043
1044 =head1 AUTHOR
1045
1046 Marc Lehmann <schmorp@schmorp.de>
1047 http://home.schmorp.de/
1048
1049 =cut
1050
1051 1
1052