ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.90
Committed: Sat Mar 10 20:34:11 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.89: +16 -14 lines
Log Message:
cmt

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg
49 db_mon db_family db_keys db_values
50 );
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages -queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
75 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
76 our @WARN;
77 our $WARN = sub {
78 &$_ for @WARN;
79
80 return if $WARNLEVEL < $_[0];
81
82 my ($level, $msg) = @_;
83
84 $msg =~ s/\n$//;
85
86 printf STDERR "%s <%d> %s\n",
87 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
88 $level,
89 $msg;
90 };
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
99 sub load_func($) {
100 my $func = $_[0];
101
102 unless (defined &$func) {
103 my $pkg = $func;
104 do {
105 $pkg =~ s/::[^:]+$//
106 or return sub { die "unable to resolve function '$func'" };
107
108 local $@;
109 unless (eval "require $pkg; 1") {
110 my $error = $@;
111 $error =~ /^Can't locate .*.pm in \@INC \(/
112 or return sub { die $error };
113 }
114 } until defined &$func;
115 }
116
117 \&$func
118 }
119
120 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
121
122 sub nonce($) {
123 join "", map chr rand 256, 1 .. $_[0]
124 }
125
126 sub nonce62($) {
127 join "", map $alnum[rand 62], 1 .. $_[0]
128 }
129
130 sub gen_uniq {
131 my $now = AE::now;
132 (join "",
133 map $alnum[$_],
134 $$ / 62 % 62,
135 $$ % 62,
136 (int $now ) % 62,
137 (int $now * 100) % 62,
138 (int $now * 10000) % 62,
139 ) . nonce62 4;
140 }
141
142 our $CONFIG; # this node's configuration
143 our $SECURE = sub { 1 };
144
145 our $RUNIQ; # remote uniq value
146 our $UNIQ; # per-process/node unique cookie
147 our $NODE;
148 our $ID = "a";
149
150 our %NODE; # node id to transport mapping, or "undef", for local node
151 our (%PORT, %PORT_DATA); # local ports
152
153 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 our %LMON; # monitored _local_ ports
155
156 our $GLOBAL; # true if node is a global ("directory") node
157 our %BINDS;
158 our $BINDS; # our listeners, as arrayref
159
160 our $SRCNODE; # holds the sending node _object_ during _inject
161
162 sub _init_names {
163 # ~54 bits, for local port names, lowercase $ID appended
164 $UNIQ = gen_uniq;
165
166 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
167 $RUNIQ = nonce62 10;
168 $RUNIQ =~ s/(.)$/\U$1/;
169
170 $NODE = "anon/$RUNIQ";
171 }
172
173 _init_names;
174
175 sub NODE() {
176 $NODE
177 }
178
179 sub node_of($) {
180 my ($node, undef) = split /#/, $_[0], 2;
181
182 $node
183 }
184
185 BEGIN {
186 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
187 ? sub () { 1 }
188 : sub () { 0 };
189 }
190
191 our $DELAY_TIMER;
192 our @DELAY_QUEUE;
193
194 sub _delay_run {
195 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
196 }
197
198 sub delay($) {
199 push @DELAY_QUEUE, shift;
200 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
201 }
202
203 sub _inject {
204 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
205 &{ $PORT{+shift} or return };
206 }
207
208 # this function adds a node-ref, so you can send stuff to it
209 # it is basically the central routing component.
210 sub add_node {
211 my ($node) = @_;
212
213 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
214 }
215
216 sub snd(@) {
217 my ($nodeid, $portid) = split /#/, shift, 2;
218
219 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
220
221 defined $nodeid #d#UGLY
222 or Carp::croak "'undef' is not a valid node ID/port ID";
223
224 ($NODE{$nodeid} || add_node $nodeid)
225 ->{send} (["$portid", @_]);
226 }
227
228 =item $is_local = port_is_local $port
229
230 Returns true iff the port is a local port.
231
232 =cut
233
234 sub port_is_local($) {
235 my ($nodeid, undef) = split /#/, $_[0], 2;
236
237 $NODE{$nodeid} == $NODE{""}
238 }
239
240 =item snd_to_func $node, $func, @args
241
242 Expects a node ID and a name of a function. Asynchronously tries to call
243 this function with the given arguments on that node.
244
245 This function can be used to implement C<spawn>-like interfaces.
246
247 =cut
248
249 sub snd_to_func($$;@) {
250 my $nodeid = shift;
251
252 # on $NODE, we artificially delay... (for spawn)
253 # this is very ugly - maybe we should simply delay ALL messages,
254 # to avoid deep recursion issues. but that's so... slow...
255 $AnyEvent::MP::Node::Self::DELAY = 1
256 if $nodeid ne $NODE;
257
258 defined $nodeid #d#UGLY
259 or Carp::croak "'undef' is not a valid node ID/port ID";
260
261 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
262 }
263
264 =item snd_on $node, @msg
265
266 Executes C<snd> with the given C<@msg> (which must include the destination
267 port) on the given node.
268
269 =cut
270
271 sub snd_on($@) {
272 my $node = shift;
273 snd $node, snd => @_;
274 }
275
276 =item eval_on $node, $string[, @reply]
277
278 Evaluates the given string as Perl expression on the given node. When
279 @reply is specified, then it is used to construct a reply message with
280 C<"$@"> and any results from the eval appended.
281
282 =cut
283
284 sub eval_on($$;@) {
285 my $node = shift;
286 snd $node, eval => @_;
287 }
288
289 sub kil(@) {
290 my ($nodeid, $portid) = split /#/, shift, 2;
291
292 length $portid
293 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
294
295 ($NODE{$nodeid} || add_node $nodeid)
296 ->kill ("$portid", @_);
297 }
298
299 #############################################################################
300 # node monitoring and info
301
302 =item node_is_known $nodeid
303
304 #TODO#
305 Returns true iff the given node is currently known to this node.
306
307 =cut
308
309 sub node_is_known($) {
310 exists $NODE{$_[0]}
311 }
312
313 =item node_is_up $nodeid
314
315 Returns true if the given node is "up", that is, the kernel thinks it has
316 a working connection to it.
317
318 If the node is known (to this local node) but not currently connected,
319 returns C<0>. If the node is not known, returns C<undef>.
320
321 =cut
322
323 sub node_is_up($) {
324 ($NODE{$_[0]} or return)->{transport}
325 ? 1 : 0
326 }
327
328 =item up_nodes
329
330 Return the node IDs of all nodes that are currently connected (excluding
331 the node itself).
332
333 =cut
334
335 sub up_nodes() {
336 map $_->{id}, grep $_->{transport}, values %NODE
337 }
338
339 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340
341 Registers a callback that is called each time a node goes up (a connection
342 is established) or down (the connection is lost).
343
344 Node up messages can only be followed by node down messages for the same
345 node, and vice versa.
346
347 Note that monitoring a node is usually better done by monitoring its node
348 port. This function is mainly of interest to modules that are concerned
349 about the network topology and low-level connection handling.
350
351 Callbacks I<must not> block and I<should not> send any messages.
352
353 The function returns an optional guard which can be used to unregister
354 the monitoring callback again.
355
356 Example: make sure you call function C<newnode> for all nodes that are up
357 or go up (and down).
358
359 newnode $_, 1 for up_nodes;
360 mon_nodes \&newnode;
361
362 =cut
363
364 our %MON_NODES;
365
366 sub mon_nodes($) {
367 my ($cb) = @_;
368
369 $MON_NODES{$cb+0} = $cb;
370
371 defined wantarray
372 and Guard::guard { delete $MON_NODES{$cb+0} }
373 }
374
375 sub _inject_nodeevent($$;@) {
376 my ($node, $up, @reason) = @_;
377
378 for my $cb (values %MON_NODES) {
379 eval { $cb->($node->{id}, $up, @reason); 1 }
380 or $WARN->(1, $@);
381 }
382
383 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
384 }
385
386 #############################################################################
387 # self node code
388
389 sub _kill {
390 my $port = shift;
391
392 delete $PORT{$port}
393 or return; # killing nonexistent ports is O.K.
394 delete $PORT_DATA{$port};
395
396 my $mon = delete $LMON{$port}
397 or !@_
398 or $WARN->(2, "unmonitored local port $port died with reason: @_");
399
400 $_->(@_) for values %$mon;
401 }
402
403 sub _monitor {
404 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
405 unless exists $PORT{$_[1]};
406
407 $LMON{$_[1]}{$_[2]+0} = $_[2];
408 }
409
410 sub _unmonitor {
411 delete $LMON{$_[1]}{$_[2]+0}
412 if exists $LMON{$_[1]};
413 }
414
415 sub _secure_check {
416 $SECURE->($SRCNODE->{id})
417 or $SRCNODE->{id} eq $NODE
418 or die "remote execution attempt by insecure node\n";
419 }
420
421 our %NODE_REQ = (
422 # internal services
423
424 # monitoring
425 mon0 => sub { # stop monitoring a port for another node
426 my $portid = shift;
427 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
428 },
429 mon1 => sub { # start monitoring a port for another node
430 my $portid = shift;
431 Scalar::Util::weaken (my $node = $SRCNODE);
432 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
433 delete $node->{rmon}{$portid};
434 $node->send (["", kil0 => $portid, @_])
435 if $node && $node->{transport};
436 };
437 },
438 # another node has killed a monitored port
439 kil0 => sub {
440 my $cbs = delete $SRCNODE->{lmon}{+shift}
441 or return;
442
443 $_->(@_) for @$cbs;
444 },
445
446 # "public" services - not actually public
447
448 # another node wants to kill a local port
449 kil => \&_kill,
450
451 # is the remote node considered secure?
452 # secure => sub {
453 # #TODO#
454 # },
455
456 # relay message to another node / generic echo
457 snd => sub {
458 &_secure_check;
459 &snd
460 },
461
462 # random utilities
463 eval => sub {
464 &_secure_check;
465 my @res = do { package main; eval shift };
466 snd @_, "$@", @res if @_;
467 },
468 time => sub {
469 &_secure_check;
470 snd @_, AE::now;
471 },
472 devnull => sub {
473 #
474 },
475 "" => sub {
476 # empty messages are keepalives or similar devnull-applications
477 },
478 );
479
480 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
481 $PORT{""} = sub {
482 my $tag = shift;
483 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
484 $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
485 };
486
487 our $NPROTO = 1;
488
489 # tell everybody who connects our nproto
490 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
491 $_[0]{local_greeting}{nproto} = $NPROTO;
492 };
493
494 #############################################################################
495 # seed management, try to keep connections to all seeds at all times
496
497 our %SEED_NODE; # seed ID => node ID|undef
498 our %NODE_SEED; # map node ID to seed ID
499 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
500 our $SEED_WATCHER;
501 our $SEED_RETRY;
502
503 sub seed_connect {
504 my ($seed) = @_;
505
506 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
507 or Carp::croak "$seed: unparsable seed address";
508
509 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
510
511 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
512 $host, $port,
513 on_greeted => sub {
514 # called after receiving remote greeting, learn remote node name
515
516 # we rely on untrusted data here (the remote node name) this is
517 # hopefully ok, as this can at most be used for DOSing, which is easy
518 # when you can do MITM anyway.
519
520 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
521 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
522 require AnyEvent::MP::Global; # every seed becomes a global node currently
523 delete $SEED_NODE{$seed};
524 delete $NODE_SEED{$seed};
525 } else {
526 $SEED_NODE{$seed} = $_[0]{remote_node};
527 $NODE_SEED{$_[0]{remote_node}} = $seed;
528 }
529 },
530 on_destroy => sub {
531 delete $SEED_CONNECT{$seed};
532 },
533 sub {
534 $SEED_CONNECT{$seed} = 1;
535 }
536 ;
537 }
538
539 sub seed_all {
540 my @seeds = grep {
541 !exists $SEED_CONNECT{$_}
542 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
543 } keys %SEED_NODE;
544
545 if (@seeds) {
546 # start connection attempt for every seed we are not connected to yet
547 seed_connect $_
548 for @seeds;
549
550 $SEED_RETRY = $SEED_RETRY * 2 + rand;
551 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
552 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
553
554 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
555
556 } else {
557 # all seeds connected or connecting, no need to restart timer
558 undef $SEED_WATCHER;
559 }
560 }
561
562 sub seed_again {
563 $SEED_RETRY = 1;
564 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
565 }
566
567 # sets new seed list, starts connecting
568 sub set_seeds(@) {
569 %SEED_NODE = ();
570 %NODE_SEED = ();
571 %SEED_CONNECT = ();
572
573 @SEED_NODE{@_} = ();
574
575 seed_again;#d#
576 seed_all;
577 }
578
579 mon_nodes sub {
580 # if we lost the connection to a seed node, make sure we are seeding
581 seed_again
582 if !$_[1] && exists $NODE_SEED{$_[0]};
583 };
584
585 #############################################################################
586 # talk with/to global nodes
587
588 # protocol messages:
589 #
590 # sent by all slave nodes (slave to master)
591 # g_slave database - make other global node master of the sender
592 #
593 # sent by any node to global nodes
594 # g_set database - set whole database
595 # g_upd family set del - update single family
596 # g_del family key - delete key from database
597 # g_get family key reply... - send reply with data
598 #
599 # send by global nodes
600 # g_global - node became global, similar to global=1 greeting
601 #
602 # database families
603 # "'l" -> node -> listeners
604 # "'g" -> node -> undef
605 # ...
606 #
607
608 # used on all nodes:
609 our $MASTER; # the global node we bind ourselves to
610 our $MASTER_MON;
611 our %LOCAL_DB; # this node database
612
613 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
614
615 our $GPROTO = 1;
616
617 # tell everybody who connects our nproto
618 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
619 $_[0]{local_greeting}{gproto} = $GPROTO;
620 };
621
622 #############################################################################
623 # master selection
624
625 # master requests
626 our %GLOBAL_REQ; # $id => \@req
627
628 sub global_req_add {
629 my ($id, $req) = @_;
630
631 return if exists $GLOBAL_REQ{$id};
632
633 $GLOBAL_REQ{$id} = $req;
634
635 snd $MASTER, @$req
636 if $MASTER;
637 }
638
639 sub global_req_del {
640 delete $GLOBAL_REQ{$_[0]};
641 }
642
643 #################################
644 # master rpc
645
646 our %GLOBAL_RES;
647 our $GLOBAL_RES_ID = "a";
648
649 sub global_call {
650 my $id = ++$GLOBAL_RES_ID;
651 $GLOBAL_RES{$id} = pop;
652 global_req_add $id, [@_, $id];
653 }
654
655 $NODE_REQ{g_reply} = sub {
656 my $id = shift;
657 global_req_del $id;
658 my $cb = delete $GLOBAL_RES{$id}
659 or return;
660 &$cb
661 };
662
663 #################################
664
665 sub g_find {
666 global_req_add "g_find $_[0]", [g_find => $_[0]];
667 }
668
669 # reply for g_find started in Node.pm
670 $NODE_REQ{g_found} = sub {
671 global_req_del "g_find $_[0]";
672
673 my $node = $NODE{$_[0]} or return;
674
675 $node->connect_to ($_[1]);
676 };
677
678 sub master_set {
679 $MASTER = $_[0];
680
681 snd $MASTER, g_slave => \%LOCAL_DB;
682
683 # (re-)send queued requests
684 snd $MASTER, @$_
685 for values %GLOBAL_REQ;
686 }
687
688 sub master_search {
689 #TODO: should also look for other global nodes, but we don't know them #d#
690 for (keys %NODE_SEED) {
691 if (node_is_up $_) {
692 master_set $_;
693 return;
694 }
695 }
696
697 $MASTER_MON = mon_nodes sub {
698 return unless $_[1]; # we are only interested in node-ups
699 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
700
701 master_set $_[0];
702
703 $MASTER_MON = mon_nodes sub {
704 if ($_[0] eq $MASTER && !$_[1]) {
705 undef $MASTER;
706 master_search ();
707 }
708 };
709 };
710 }
711
712 # other node wants to make us the master
713 $NODE_REQ{g_slave} = sub {
714 my ($db) = @_;
715
716 # load global module and redo the request
717 require AnyEvent::MP::Global;
718 &{ $NODE_REQ{g_slave} }
719 };
720
721 #############################################################################
722 # local database operations
723
724 # local database management
725
726 sub db_set($$;$) {
727 # if (ref $_[1]) {
728 # # bulk
729 # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
730 # $LOCAL_DB{$_[0]} = $_[1];
731 # snd $MASTER, g_upd => $_[0] => $_[1], \@del
732 # if defined $MASTER;
733 # } else {
734 # single-key
735 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
736 snd $MASTER, g_upd => $_[0] => { $_[1] => $_[2] }
737 if defined $MASTER;
738 # }
739 }
740
741 sub db_del($@) {
742 my $family = shift;
743
744 delete @{ $LOCAL_DB{$family} }{@_};
745 snd $MASTER, g_upd => $family => undef, \@_
746 if defined $MASTER;
747 }
748
749 sub db_reg($$;$) {
750 my ($family, $key) = @_;
751 &db_set;
752 Guard::guard { db_del $family => $key }
753 }
754
755 # database query
756
757 sub db_family {
758 my ($family, $cb) = @_;
759 global_call g_db_family => $family, $cb;
760 }
761
762 sub db_keys {
763 my ($family, $cb) = @_;
764 global_call g_db_keys => $family, $cb;
765 }
766
767 sub db_values {
768 my ($family, $cb) = @_;
769 global_call g_db_values => $family, $cb;
770 }
771
772 # database monitoring
773
774 our %LOCAL_MON; # f, reply
775 our %MON_DB; # f, k, value
776
777 sub db_mon($@) {
778 my ($family, $cb) = @_;
779
780 if (my $db = $MON_DB{$family}) {
781 # we already monitor, so create a "dummy" change event
782 # this is postponed, which might be too late (we could process
783 # change events), so disable the callback at first
784 $LOCAL_MON{$family}{$cb+0} = sub { };
785 AE::postpone {
786 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
787
788 # set actual callback
789 $LOCAL_MON{$family}{$cb+0} = $cb;
790 $cb->($db, [keys %$db]);
791 };
792 } else {
793 # new monitor, request chg1 from upstream
794 $LOCAL_MON{$family}{$cb+0} = $cb;
795 global_req_add "mon1 $family" => [g_mon1 => $family];
796 $MON_DB{$family} = {};
797 }
798
799 defined wantarray
800 and Guard::guard {
801 my $mon = $LOCAL_MON{$family};
802 delete $mon->{$cb+0};
803
804 unless (%$mon) {
805 global_req_del "mon1 $family";
806
807 # no global_req, because we don't care if we are not connected
808 snd $MASTER, g_mon0 => $family
809 if $MASTER;
810
811 delete $LOCAL_MON{$family};
812 delete $MON_DB{$family};
813 }
814 }
815 }
816
817 # full update
818 $NODE_REQ{g_chg1} = sub {
819 my ($f, $ndb) = @_;
820
821 my $db = $MON_DB{$f};
822 my (@a, @c, @d);
823
824 # add or replace keys
825 while (my ($k, $v) = each %$ndb) {
826 exists $db->{$k}
827 ? push @c, $k
828 : push @a, $k;
829 $db->{$k} = $v;
830 }
831
832 # delete keys that are no longer present
833 for (grep !exists $ndb->{$_}, keys %$db) {
834 delete $db->{$_};
835 push @d, $_;
836 }
837
838 $_->($db, \@a, \@c, \@d)
839 for values %{ $LOCAL_MON{$_[0]} };
840 };
841
842 # incremental update
843 $NODE_REQ{g_chg2} = sub {
844 my ($family, $set, $del) = @_;
845
846 my $db = $MON_DB{$family};
847
848 my (@a, @c);
849
850 while (my ($k, $v) = each %$set) {
851 exists $db->{$k}
852 ? push @c, $k
853 : push @a, $k;
854 $db->{$k} = $v;
855 }
856
857 delete @$db{@$del};
858
859 $_->($db, \@a, \@c, $del)
860 for values %{ $LOCAL_MON{$family} };
861 };
862
863 #############################################################################
864 # configure
865
866 sub nodename {
867 require POSIX;
868 (POSIX::uname ())[1]
869 }
870
871 sub _resolve($) {
872 my ($nodeid) = @_;
873
874 my $cv = AE::cv;
875 my @res;
876
877 $cv->begin (sub {
878 my %seen;
879 my @refs;
880 for (sort { $a->[0] <=> $b->[0] } @res) {
881 push @refs, $_->[1] unless $seen{$_->[1]}++
882 }
883 shift->send (@refs);
884 });
885
886 my $idx;
887 for my $t (split /,/, $nodeid) {
888 my $pri = ++$idx;
889
890 $t = length $t ? nodename . ":$t" : nodename
891 if $t =~ /^\d*$/;
892
893 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
894 or Carp::croak "$t: unparsable transport descriptor";
895
896 $port = "0" if $port eq "*";
897
898 if ($host eq "*") {
899 $cv->begin;
900 # use fork_call, as Net::Interface is big, and we need it rarely.
901 require AnyEvent::Util;
902 AnyEvent::Util::fork_call (
903 sub {
904 my @addr;
905
906 require Net::Interface;
907
908 for my $if (Net::Interface->interfaces) {
909 # we statically lower-prioritise ipv6 here, TODO :()
910 for $_ ($if->address (Net::Interface::AF_INET ())) {
911 next if /^\x7f/; # skip localhost etc.
912 push @addr, $_;
913 }
914 for ($if->address (Net::Interface::AF_INET6 ())) {
915 #next if $if->scope ($_) <= 2;
916 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
917 push @addr, $_;
918 }
919
920 }
921 @addr
922 }, sub {
923 for my $ip (@_) {
924 push @res, [
925 $pri += 1e-5,
926 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
927 ];
928 }
929 $cv->end;
930 }
931 );
932 } else {
933 $cv->begin;
934 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
935 for (@_) {
936 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
937 push @res, [
938 $pri += 1e-5,
939 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
940 ];
941 }
942 $cv->end;
943 };
944 }
945 }
946
947 $cv->end;
948
949 $cv
950 }
951
952 sub configure(@) {
953 unshift @_, "profile" if @_ & 1;
954 my (%kv) = @_;
955
956 delete $NODE{$NODE}; # we do not support doing stuff before configure
957 _init_names;
958
959 my $profile = delete $kv{profile};
960
961 $profile = nodename
962 unless defined $profile;
963
964 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
965
966 if (exists $CONFIG->{secure}) {
967 my $pass = !$CONFIG->{secure};
968 $SECURE = sub { $pass };
969 }
970
971 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
972
973 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
974
975 $NODE = $node;
976
977 $NODE =~ s/%n/nodename/ge;
978
979 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
980 # nodes with randomised node names do not need randomised port names
981 $UNIQ = "";
982 }
983
984 $NODE{$NODE} = $NODE{""};
985 $NODE{$NODE}{id} = $NODE;
986
987 my $seeds = $CONFIG->{seeds};
988 my $binds = $CONFIG->{binds};
989
990 $binds ||= ["*"];
991
992 $WARN->(8, "node $NODE starting up.");
993
994 $BINDS = [];
995 %BINDS = ();
996
997 for (map _resolve $_, @$binds) {
998 for my $bind ($_->recv) {
999 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
1000 or Carp::croak "$bind: unparsable local bind address";
1001
1002 my $listener = AnyEvent::MP::Transport::mp_server
1003 $host,
1004 $port,
1005 prepare => sub {
1006 my (undef, $host, $port) = @_;
1007 $bind = AnyEvent::Socket::format_hostport $host, $port;
1008 0
1009 },
1010 ;
1011 $BINDS{$bind} = $listener;
1012 push @$BINDS, $bind;
1013 }
1014 }
1015
1016 db_set "'l" => $NODE => $BINDS;
1017
1018 $WARN->(8, "node listens on [@$BINDS].");
1019
1020 # connect to all seednodes
1021 set_seeds map $_->recv, map _resolve $_, @$seeds;
1022
1023 master_search;
1024
1025 if ($NODE eq "atha") {;#d#
1026 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
1027 }
1028
1029 for (@{ $CONFIG->{services} }) {
1030 if (ref) {
1031 my ($func, @args) = @$_;
1032 (load_func $func)->(@args);
1033 } elsif (s/::$//) {
1034 eval "require $_";
1035 die $@ if $@;
1036 } else {
1037 (load_func $_)->();
1038 }
1039 }
1040 }
1041
1042 =back
1043
1044 =head1 SEE ALSO
1045
1046 L<AnyEvent::MP>.
1047
1048 =head1 AUTHOR
1049
1050 Marc Lehmann <schmorp@schmorp.de>
1051 http://home.schmorp.de/
1052
1053 =cut
1054
1055 1
1056