ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.96
Committed: Wed Mar 21 15:22:16 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.95: +19 -14 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
# symbols that are only exported on request
our @EXPORT_OK = qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);

# symbols exported by default, grouped by purpose
our @EXPORT = (
   # low-level routing helpers
   qw(add_node load_func snd_to_func snd_on eval_on),
   # basic messaging
   qw(NODE $NODE node_of snd kil port_is_local),
   # startup
   qw(configure),
   # connectivity
   qw(up_nodes mon_nodes node_is_up),
   # local database updates
   qw(db_set db_del db_reg),
   # database queries and monitoring
   qw(db_mon db_family db_keys db_values),
);
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages - queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
# highest level the default handler still prints; the environment overrides the default of 5
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# extra log hooks, each called with ($level, $msg) for every message
our @WARN;

# default log handler: run the hooks, then print to STDERR if the level permits
our $WARN = sub {
   # hooks run first and see every message, whatever its level
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # we add our own newline below, so drop one trailing newline
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
# load_func $funcname
#
# Resolves a fully-qualified function name ("Some::Package::func") into a
# code reference. If the function is not defined yet, successively shorter
# package prefixes are require'd until it is.
#
# Never dies itself: on failure it returns a code reference that dies with
# the reason when called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last "::component" to get the next candidate package
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # the name is interpolated into a string eval below, so refuse
         # anything that is not a plain package name
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         (my $file = "$pkg.pm") =~ s{::}{/}g;

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # only "this very module does not exist" means "try the parent
            # package" - anything else (compile errors, a missing
            # dependency of the module) is reported to the caller
            $error =~ /^Can't locate \Q$file\E in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119
# the 62 characters used for textual identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# nonce $len - returns $len random characters with code points 0..255
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $_[0];
   $octets
}

# nonce62 $len - returns $len random characters drawn from @alnum
sub nonce62($) {
   my $text = "";
   $text .= $alnum[rand 62]
      for 1 .. $_[0];
   $text
}
129
# gen_uniq - builds a nine character identifier: five characters derived
# from the process ID and the current event loop time, followed by four
# random characters.
sub gen_uniq {
   my $now = AE::now ();

   # each value indexes into @alnum (integer modulus keeps them in 0..61)
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now) % 62,
      (int ($now * 100)) % 62,
      (int ($now * 10000)) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 (4)
}
141
our $CONFIG;               # this node's configuration
our $SECURE = sub { 1 };   # called by _secure_check; a false result rejects the request

our $RUNIQ;                # remote uniq value
our $UNIQ;                 # per-process/node unique cookie
our $NODE;                 # ID of the local node
our $ID = "a";

our %NODE;                 # node id to transport mapping, or "undef", for local node
our %PORT;                 # local ports
our %PORT_DATA;            # deleted together with the %PORT entry in _kill

our %RMON;                 # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;                 # monitored _local_ ports

our $GLOBAL;               # true if node is a global ("directory") node
our %BINDS;                # bind address => listener
our $BINDS;                # our listeners, as arrayref

# set during _inject to identify the sender. NOTE(review): the original
# comment calls this the node _object_, while the POD and the code in this
# file use it like a node ID ($NODE{$SRCNODE}, $SRCNODE eq $MASTER) - confirm.
our $SRCNODE;
161
# _init_names - (re-)generates the random parts of node and port names and
# gives the node a provisional anonymous ID.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 (10);
   substr ($RUNIQ, -1, 1) = uc substr ($RUNIQ, -1, 1);

   $NODE = "anon/$RUNIQ";
}

# names must exist as soon as the module is loaded
_init_names ();
174
# NODE - returns the ID of the local node ($NODE)
sub NODE() {
   return $NODE;
}
178
# node_of $port - returns the node ID part of a port ID, i.e. everything
# before the first "#" (undef for an empty port ID)
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
184
# TRACE is a compile-time constant, true if PERL_ANYEVENT_MP_TRACE is set to
# a true value, so the tracing code can be optimised away otherwise
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190
our $DELAY_TIMER; # timer watcher, only set while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run

# _delay_run - runs queued callbacks in order until the queue is empty, then
# drops the timer
sub _delay_run {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
}

# delay $cb - queues $cb to be called (without arguments) from a timer
# callback instead of immediately
sub delay($) {
   my $job = shift;

   push @DELAY_QUEUE, $job;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
202
203 =item $AnyEvent::MP::Kernel::SRCNODE
204
205 During execution of a message callback, this variable contains the node ID
206 of the origin node.
207
208 The main use of this variable is for debugging output - there are probably
209 very few other cases where you need to know the source node ID.
210
211 =cut
212
# _inject $portid, @msg - delivers a message to the local port $portid;
# messages to ports that do not exist are silently dropped
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n"; #d#
   }

   my $receiver = $PORT{+shift}
      or return;

   # the remaining @_ is the message proper
   &$receiver;
}
217
# add_node $nodeid
#
# Adds a node-ref, so you can send stuff to it - it is basically the central
# routing component. Returns the existing %NODE entry for $nodeid, or creates
# (and remembers) a new AnyEvent::MP::Node::Remote object.
#
# Croaks if $nodeid is undef or the empty string.
sub add_node {
   my ($node) = @_;

   length $node
      or Carp::croak ("'undef' or the empty string are not valid node/port IDs");

   # direct method call instead of the indirect "new Class $arg" form, whose
   # parse depends on which subs happen to be visible at compile time
   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
228
# snd $port, @msg - sends @msg to the given port; a node entry is created
# on demand when the target node is not known yet
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n"; #d#
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # {send} holds a plain code reference, this is not a method call
   $node->{send}->(["$portid", @_]);
}
237
238 =item $is_local = port_is_local $port
239
240 Returns true iff the port is a local port.
241
242 =cut
243
# compares the %NODE entry of the port's node with the entry for the local
# node, which is stored under the empty key
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $NODE{$nodeid} == $NODE{""};
}
249
250 =item snd_to_func $node, $func, @args
251
252 Expects a node ID and a name of a function. Asynchronously tries to call
253 this function with the given arguments on that node.
254
255 This function can be used to implement C<spawn>-like interfaces.
256
257 =cut
258
# sends ["", $func, @args] to the node, i.e. a request to its node port
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when $nodeid differs from $NODE, which
   # reads like the opposite of what the comment above says - confirm.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["", @_]);
}
270
271 =item snd_on $node, @msg
272
273 Executes C<snd> with the given C<@msg> (which must include the destination
274 port) on the given node.
275
276 =cut
277
# implemented by sending a "snd" request to the node port of $node
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
282
283 =item eval_on $node, $string[, @reply]
284
285 Evaluates the given string as a Perl expression on the given node. When
286 @reply is specified, then it is used to construct a reply message with
287 C<"$@"> and any results from the eval appended.
288
289 =cut
290
# implemented by sending an "eval" request to the node port of $node
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
295
# kil $port, @reason - kills the given port by calling ->kill on its node
# object; croaks when the port ID has no port part (a node port)
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
305
306 #############################################################################
307 # node monitoring and info
308
309 =item node_is_up $nodeid
310
311 Returns true if the given node is "up", that is, the kernel thinks it has
312 a working connection to it.
313
314 If the node is up, returns C<1>. If the node is currently connecting or
315 otherwise known but not connected, returns C<0>. If nothing is known about
316 the node, returns C<undef>.
317
318 =cut
319
# 1 if the node has a transport, 0 if it is known but has none, and
# undef/empty list if there is no %NODE entry at all
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
324
325 =item up_nodes
326
327 Return the node IDs of all nodes that are currently connected (excluding
328 the node itself).
329
330 =cut
331
# the {id} of every %NODE entry that currently has a transport
sub up_nodes() {
   map { $_->{id} }
      grep { $_->{transport} }
         values %NODE
}
335
336 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
337
338 Registers a callback that is called each time a node goes up (a connection
339 is established) or down (the connection is lost).
340
341 Node up messages can only be followed by node down messages for the same
342 node, and vice versa.
343
344 Note that monitoring a node is usually better done by monitoring its node
345 port. This function is mainly of interest to modules that are concerned
346 about the network topology and low-level connection handling.
347
348 Callbacks I<must not> block and I<should not> send any messages.
349
350 The function returns an optional guard which can be used to unregister
351 the monitoring callback again.
352
353 Example: make sure you call function C<newnode> for all nodes that are up
354 or go up (and down).
355
356 newnode $_, 1 for up_nodes;
357 mon_nodes \&newnode;
358
359 =cut
360
# registered node up/down callbacks, keyed by the callback's numeric address
our %MON_NODES;

# mon_nodes $cb - registers $cb; unless called in void context, returns a
# guard object that unregisters the callback again when destroyed
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;
   $MON_NODES{$key} = $cb;

   # no guard is created when the caller ignores the result
   return unless defined wantarray;

   Guard::guard (sub { delete $MON_NODES{$key} })
}
371
# _inject_nodeevent $node, $up, @reason - tells all mon_nodes callbacks that
# the node object $node went up or down, then logs the event at level 7
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      # a dying callback is logged, the remaining ones are still called
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
382
383 #############################################################################
384 # self node code
385
# _kill $portid, @reason - removes a local port and its data and calls all
# local monitors of the port with @reason
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port dying with a reason while nobody watches it is worth a warning
   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
399
# _monitor $ignored, $portid, $cb - registers $cb as a monitor of the local
# port $portid; if the port does not exist, $cb is called immediately with a
# no_such_port reason instead
sub _monitor {
   my (undef, $port, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$port")
      unless exists $PORT{$port};

   # monitors are keyed by the numeric address of the callback
   $LMON{$port}{$cb + 0} = $cb;
}
406
# _unmonitor $ignored, $portid, $cb - removes a monitor registered with
# _monitor; does nothing when the port has no monitors
sub _unmonitor {
   my (undef, $port, $cb) = @_;

   delete $LMON{$port}{$cb + 0}
      if exists $LMON{$port};
}
411
# _secure_check - calls $SECURE with the current @_ and dies unless it
# returns a true value
sub _secure_check {
   die "remote execution attempt by insecure node\n"
      unless &$SECURE;
}
416
# request handlers of the node port (port ""), keyed by the message tag;
# each handler is called with the rest of the message in @_
our %NODE_REQ = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my $portid = shift;
      _unmonitor (undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid});
   },

   # start monitoring a port for another node
   mon1 => sub {
      my $portid = shift;

      # the callback must not keep the node object alive
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE});

      _monitor (undef, $portid, $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         # tell the remote node about the port's death, if it is still connected
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      });
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities

   # evaluate a string in package main, optionally replying with "$@" and the results
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },

   # reply with the current event loop time
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },

   # ignore the message
   devnull => sub {
      #
   },

   # empty messages are keepalives or similar devnull-applications
   "" => sub {
   },
);
470
# the local node is reachable both under its ID and under the empty string
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: dispatches [$tag, @args] messages to the %NODE_REQ handlers.
# Tags without a handler are treated as function names and resolved with
# load_func, but only after the security check has passed.
$PORT{""} = sub {
   my $tag = shift;

   # dynamically resolved functions are deliberately NOT stored in %NODE_REQ:
   # caching them (as "||=" would) lets later calls skip _secure_check, and
   # also keeps every error stub that load_func returns for bogus tags
   eval { &{ $NODE_REQ{$tag} || do { &_secure_check; load_func ($tag) } } };
   $WARN->(2, "error processing node message from $SRCNODE: $@") if $@;
};
477
# version number announced as "nproto" in our greeting
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
484
485 #############################################################################
486 # seed management, try to keep connections to all seeds at all times
487
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER; # retry timer, see seed_all
our $SEED_RETRY;   # current retry interval

# seed_connect $seed - starts a connection attempt to the seed address
# $seed ("host:port") unless one is already in progress; croaks if the
# address cannot be parsed
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak ("$seed: unparsable seed address");

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.
         my $remote = $_[0]{remote_node};

         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            # we connected to ourselves: nuke this seed, but make sure we act like a seed
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
         } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;

            # also start global service, if not running
            # we need to check here in addition to the mon_nodes below
            # because we might only learn late that a node is a seed
            # and then we might already be connected
            snd ($remote, "g_slave")
               unless $_[0]{remote_greeting}{global};
         }
      },
      sub {
         # the attempt is over, allow seed_all to start a new one
         delete $SEED_CONNECT{$seed};
      },
   );
}
531
# seed_all - starts connection attempts to all seeds that are neither being
# connected to nor up, and re-arms the retry timer with a growing interval
# while any such seed exists
sub seed_all {
   my @seeds = grep {
      !exists $SEED_CONNECT{$_}
         && !(defined $SEED_NODE{$_} && node_is_up ($SEED_NODE{$_}))
   } keys %SEED_NODE;

   unless (@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
      return;
   }

   # start connection attempt for every seed we are not connected to yet
   seed_connect ($_)
      for @seeds;

   # roughly double the interval each round, capped at monitor_timeout
   my $limit = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $SEED_RETRY = $SEED_RETRY * 2 + rand;
   $SEED_RETRY = $limit
      if $SEED_RETRY > $limit;

   $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);
}
554
# seed_again - resets the retry interval and makes sure seed_all runs in
# about a second, unless a retry timer is already pending
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER = AE::timer (1, 0, \&seed_all)
      unless $SEED_WATCHER;
}
559
# set_seeds @seeds - sets new seed list, starts connecting; everything
# known about the previous seeds is forgotten
sub set_seeds(@) {
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # node IDs of the seeds are unknown until they have greeted us
   $SEED_NODE{$_} = undef
      for @_;

   seed_all ();
}
570
# keep an eye on the seed nodes
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   # other nodes are none of our business
   return unless exists $NODE_SEED{$nodeid};

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd ($nodeid, "g_slave")
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
   }
});
584
585 #############################################################################
586 # talk with/to global nodes
587
588 # protocol messages:
589 #
590 # sent by all slave nodes (slave to master)
591 # g_slave database - make other global node master of the sender
592 #
593 # sent by any node to global nodes
594 # g_set database - set whole database
595 # g_upd family set del - update single family
596 # g_del family key - delete key from database
597 # g_get family key reply... - send reply with data
598 #
599 # sent by global nodes
600 # g_global - node became global, similar to global=1 greeting
601 #
602 # database families
603 # "'l" -> node -> listeners
604 # "'g" -> node -> undef
605 # ...
606 #
607
# used on all nodes:
our $MASTER;     # the global node we bind ourselves to
our $MASTER_MON; # guard of the node monitor used while searching/watching the master
our %LOCAL_DB;   # this node database

our %GLOBAL_DB;  # all local databases, merged - empty on non-global nodes

# version number announced as "gproto" in our greeting
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
621
622 #############################################################################
623 # master selection
624
# master requests
our %GLOBAL_REQ; # $id => \@req

# global_req_add $id, \@req - remembers the request under $id and sends it
# to the master if we have one (master_set re-sends remembered requests);
# does nothing if a request with this id is already registered
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   if ($MASTER) {
      snd ($MASTER, @$req);
   }
}
638
# global_req_del $id - forgets the request registered under $id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
642
643 #################################
644 # master rpc
645
our %GLOBAL_RES;          # $id => callback waiting for the g_reply
our $GLOBAL_RES_ID = "a"; # last id handed out (string increment)

# global_call @req, $cb - registers @req, with a fresh id appended, as a
# master request; $cb is called by the g_reply handler for that id
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
654
# reply to a global_call: drop the pending request and pass the remaining
# arguments to the callback registered for this id
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $reply_cb = delete $GLOBAL_RES{$id}
      or return;

   # called with what is left in @_
   &$reply_cb
};
662
663 #################################
664
# g_find $nodeid - registers a g_find request for $nodeid with the master;
# the g_found handler processes the answer
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
668
# reply for g_find started in Node.pm: drop the pending request and pass the
# second argument to ->connect_to of the node object, if the node is known
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
677
# master_set $nodeid - makes $nodeid our master: sends it our local database
# and all requests that are still pending
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
687
# master_search - picks a seed node that is up as master. If none is up,
# waits for the first seed node to come up; once a master is found this way,
# watches it and restarts the search when it goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set ($nodeid);

      # replacing the guard unregisters the callback we are running in
      $MASTER_MON = mon_nodes (sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      });
   });
}
711
# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request with the same arguments.
   # NOTE(review): presumably AnyEvent::MP::Global installs its own g_slave
   # handler when loaded, otherwise this calls itself again - confirm.
   require AnyEvent::MP::Global;

   &{ $NODE_REQ{g_slave} }
};
720
721 #############################################################################
722 # local database operations
723
724 # local database management
725
# db_set $family, $key[, $value] - stores the value in the local database
# and, if we have a master, sends it the update
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

   # a bulk variant was started here but is disabled:
#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {

   # single-key
   $LOCAL_DB{$family}{$key} = $value;

   snd ($MASTER, g_upd => $family => { $key => $value })
      if defined $MASTER;

#   }
}
740
# db_del $family, @keys - removes the keys from the local database and, if
# we have a master, tells it about the deletion
sub db_del($@) {
   my $family = shift;

   # what is left in @_ are the keys
   delete $LOCAL_DB{$family}{$_}
      for @_;

   snd ($MASTER, g_upd => $family => undef, \@_)
      if defined $MASTER;
}
748
# db_reg $family, $key[, $value] - like db_set, but returns a guard that
# deletes the key again when it is destroyed
sub db_reg($$;$) {
   my ($family, $key) = @_;

   # db_set sees our @_ unchanged
   &db_set;

   Guard::guard (sub { db_del ($family => $key) })
}
754
755 # database query
756
# db_family $family, $cb - issues a g_db_family request for the family;
# $cb receives the reply
sub db_family {
   my ($family, $cb) = @_;

   global_call ("g_db_family", $family, $cb);
}
761
# db_keys $family, $cb - issues a g_db_keys request for the family;
# $cb receives the reply
sub db_keys {
   my ($family, $cb) = @_;

   global_call ("g_db_keys", $family, $cb);
}
766
# db_values $family, $cb - issues a g_db_values request for the family;
# $cb receives the reply
sub db_values {
   my ($family, $cb) = @_;

   global_call ("g_db_values", $family, $cb);
}
771
772 # database monitoring
773
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# db_mon $family, $cb - registers $cb as change monitor for the family.
# Unless called in void context, returns a guard that unregisters the
# callback again; when the last monitor of a family goes away, monitoring
# of the family is stopped altogether.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $key = $cb + 0;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$key} = sub { };

      AE::postpone (sub {
         return unless exists $LOCAL_MON{$family}{$key}; # guard might have gone away already

         # set actual callback
         $LOCAL_MON{$family}{$key} = $cb;
         $cb->($db, [keys %$db]);
      });
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$key} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   }

   return unless defined wantarray;

   Guard::guard (sub {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$key};

      # was this the last monitor of the family?
      unless (%$mon) {
         global_req_del ("mon1 $family");

         # no global_req, because we don't care if we are not connected
         snd ($MASTER, g_mon0 => $family)
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      }
   })
}
816
# full update: the master sent the complete contents of a family. Bring our
# copy in line and tell the monitors which keys were added, changed and
# deleted. Messages from nodes other than the master are ignored.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@a, @c, @d);

   # add or replace keys
   for my $k (keys %$ndb) {
      if (exists $db->{$k}) {
         push @c, $k;
      } else {
         push @a, $k;
      }

      $db->{$k} = $ndb->{$k};
   }

   # delete keys that are no longer present
   @d = grep { !exists $ndb->{$_} } keys %$db;
   delete @$db{@d};

   $_->($db, \@a, \@c, \@d)
      for values %{ $LOCAL_MON{$f} };
};
842
# incremental update: the master sent keys to set and keys to delete for a
# family. Apply them to our copy and tell the monitors. Messages from nodes
# other than the master are ignored.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};

   my (@a, @c);

   # sort the keys into added and changed ones while storing them
   for my $k (keys %$set) {
      if (exists $db->{$k}) {
         push @c, $k;
      } else {
         push @a, $k;
      }

      $db->{$k} = $set->{$k};
   }

   delete @$db{@$del};

   $_->($db, \@a, \@c, $del)
      for values %{ $LOCAL_MON{$family} };
};
864
865 #############################################################################
866 # configure
867
# nodename - returns the second element of POSIX::uname, the node name of
# this host
sub nodename {
   require POSIX;

   my (undef, $name) = POSIX::uname ();

   $name
}
872
# _resolve $spec - resolves a comma-separated list of transport descriptors
# into "host:port" strings. Returns a condvar; ->recv on it yields the
# addresses ordered by position in $spec, with duplicates removed.
#
# A descriptor that is empty or consists of digits only stands for the local
# node name (with the digits as port). The host "*" stands for the addresses
# of the local network interfaces. Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "host:port"] pairs, filled by the callbacks below

   # runs when all lookups have finished
   $cv->begin (sub {
      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];
         push @refs, $addr unless $seen{$addr}++;
      }

      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of the same descriptor get priorities just above this value
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = (length $t) ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak ("$t: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for ($if->address (Net::Interface::AF_INET ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }

                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }

               $cv->end;
            },
         );
      } else {
         $cv->begin;

         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }

            $cv->end;
         });
      }
   }

   # matches the initial ->begin above
   $cv->end;

   $cv
}
953
# configure [$profile, ]key => value...
#
# Configures and starts this node: looks up the profile (default: the node
# name of this host), works out the node ID, creates the listeners, starts
# connecting to the seed nodes, starts the search for a master and finally
# loads/calls the configured services.
#
# With an odd number of arguments, the first one is the profile name.
#
# Croaks on an illegal node ID or an unparsable bind address, and dies if a
# service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   $profile = nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   if (exists $CONFIG->{secure}) {
      # $SECURE must be a code reference, _secure_check calls it. The old
      #    eval $CONFIG->{secure} ? "sub { 0 }" : "sub { 1 }"
      # parsed as (eval ...) ? "..." : "..." and therefore stored a string.
      # A true "secure" setting makes the check fail, i.e. rejects requests.
      $SECURE = $CONFIG->{secure} ? sub { 0 } : sub { 1 };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak ("$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n");

   $NODE = $node;

   # %n stands for the node name of this host
   $NODE =~ s/%n/nodename ()/ge;

   # a trailing slash or %u is replaced by a random string
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # re-register the self node under its final ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # all lookups are started first, then waited for one by one
   for (map { _resolve ($_) } @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address passed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   # publish our listeners in the database
   db_set ("'l" => $NODE => $BINDS);

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds (map { $_->recv } map { _resolve ($_) } @$seeds);

   master_search ();

   # a service is either [function, args...], a "Module::" to load or the
   # name of a function to call
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func ($func))->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func ($_))->();
      }
   }
}

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann <schmorp@schmorp.de>
 http://home.schmorp.de/

=cut
1038
1039 =back
1040
1041 =head1 SEE ALSO
1042
1043 L<AnyEvent::MP>.
1044
1045 =head1 AUTHOR
1046
1047 Marc Lehmann <schmorp@schmorp.de>
1048 http://home.schmorp.de/
1049
1050 =cut
1051
1052 1
1053