ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.92
Committed: Wed Mar 14 22:59:58 2012 UTC (12 years, 4 months ago) by root
Branch: MAIN
Changes since 1.91: +12 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
# symbols that are only exported on explicit request
our @EXPORT_OK = (
   qw(%NODE %PORT %PORT_DATA),
   qw($UNIQ $RUNIQ $ID),
);

# symbols exported by default, grouped by purpose
our @EXPORT = (
   # remote function calls and low-level routing
   qw(add_node load_func snd_to_func snd_on eval_on),

   # basic node/port primitives
   qw(NODE $NODE node_of snd kil port_is_local),
   qw(configure),

   # node monitoring
   qw(up_nodes mon_nodes node_is_up),

   # local database modification
   qw(db_set db_del db_reg),

   # database queries and monitoring
   qw(db_mon db_family db_keys db_values),
);
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages - queue the message and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
# maximum level that the default handler still prints; the environment
# variable wins whenever it exists at all (even when empty)
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# additional log listeners, each called as $listener->($level, $msg)
our @WARN;

# default log handler: notifies all listeners, then prints to STDERR
our $WARN = sub {
   # listeners see every message, regardless of $WARNLEVEL
   foreach (@WARN) {
      &$_;
   }

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # strip one trailing newline, printf adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
# load_func $funcname
#
# Resolves a fully qualified function name ("Some::Package::func") to a code
# reference, loading modules on demand: the trailing name component is
# stripped repeatedly and each resulting prefix is tried as a module name
# until the function becomes defined.
#
# Always returns a code reference. When the function cannot be resolved, or
# a module fails to load for a reason other than "not found", the returned
# code reference dies with the corresponding message when called.
sub load_func($) {
   my $func = $_[0];

   # the function is looked up by name, which requires symbolic references
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, so refuse anything
         # that is not a plain package name - otherwise a crafted function
         # name could run arbitrary code here
         $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module just means we try the next shorter prefix,
            # any other failure is reported to the caller
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119
# the 62 characters used for printable unique IDs
my @alnum = (0 .. 9, "A" .. "Z", "a" .. "z");

# returns a string of $_[0] random octets
sub nonce($) {
   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $_[0];
   $octets
}

# returns a string of $_[0] random characters out of @alnum
sub nonce62($) {
   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $_[0];
   $chars
}
129
# builds a 9 character identifier: two characters derived from the process
# ID, three derived from the current event loop time (seconds, 1/100 and
# 1/10000 seconds), followed by four random characters
sub gen_uniq {
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now * 100  ) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 (4);
}
141
our $CONFIG;               # configuration of this node
our $SECURE = sub { 1 };   # called with a node ID, true means the node may execute code here

our ($RUNIQ, $UNIQ);       # remote uniq value / per-process (per-node) unique cookie
our $NODE;                 # ID of the local node
our $ID = "a";             # presumably a running suffix for local port names - confirm against AnyEvent::MP

our %NODE;                 # node ID => node object, "" refers to the local node
our (%PORT, %PORT_DATA);   # local ports

our %RMON;                 # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;                 # monitored _local_ ports, port ID => { cb address => cb }

our $GLOBAL;               # true if this node is a global ("directory") node
our (%BINDS, $BINDS);      # our listeners: bind address => listener, and the addresses as arrayref

our $SRCNODE;              # the sending node _object_ while a message is injected
161
# (re-)initialises $UNIQ, $RUNIQ and a default anonymous $NODE name
sub _init_names {
   # ~54 bits, used for local port names (a lowercase $ID is appended)
   $UNIQ = gen_uniq;

   # ~59 bits, used for remote port names: one character longer than $UNIQ
   # and with the last character uppercased, so the two can never clash
   my $random = nonce62 (10);
   $RUNIQ = (substr $random, 0, -1) . uc substr $random, -1;

   $NODE = "anon/$RUNIQ";
}

_init_names ();
174
# returns the ID of the local node
sub NODE() {
   return $NODE;
}
178
# returns the node ID part of a port ID, i.e. everything before the first "#"
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
184
# TRACE is a compile-time constant, true when message tracing was requested
# through the environment
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190
our $DELAY_TIMER; # timer watcher, only defined while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run

# runs queued callbacks in order (including ones queued while running) and
# drops the timer once the queue is empty
sub _delay_run {
   for (;;) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
}
197
# queues a callback to be run from a zero-delay timer, starting that timer
# unless it is already pending
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
202
# delivers a message (port ID followed by the message elements) to a local
# port; messages for ports that do not exist are silently dropped
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # call with the remaining @_, i.e. the message without the port ID
   &$handler;
}
207
# the central routing component: makes sure a node object exists for the
# given node ID (creating a remote node object when needed) and returns it,
# so that messages can be sent to it
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
215
# snd $port, @msg
#
# sends @msg to the given port ("nodeid#portid"), creating the node object
# for the destination node on demand
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port ID is stringified, so a missing port part becomes ""
   $node->{send} (["$portid", @_]);
}
227
228 =item $is_local = port_is_local $port
229
230 Returns true iff the port is a local port.
231
232 =cut
233
# true iff the node part of the port ID maps to the same node object as the
# local node (which is registered under "")
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   my $local = $NODE{""};

   $NODE{$nodeid} == $local
}
239
240 =item snd_to_func $node, $func, @args
241
242 Expects a node ID and a name of a function. Asynchronously tries to call
243 this function with the given arguments on that node.
244
245 This function can be used to implement C<spawn>-like interfaces.
246
247 =cut
248
# snd_to_func $nodeid, $func, @args
#
# sends a message to the node port ("") of the given node, asking it to call
# the named function with the given arguments
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT the local node,
   # which looks inverted relative to the comment above - confirm.
   $AnyEvent::MP::Node::Self::DELAY = 1
      unless $nodeid eq $NODE;

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @call]);
}
263
264 =item snd_on $node, @msg
265
266 Executes C<snd> with the given C<@msg> (which must include the destination
267 port) on the given node.
268
269 =cut
270
# asks the given node to execute snd with @msg, by sending it a "snd" node
# request
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, snd => @msg;
}
275
276 =item eval_on $node, $string[, @reply]
277
278 Evaluates the given string as Perl expression on the given node. When
279 @reply is specified, then it is used to construct a reply message with
280 C<"$@"> and any results from the eval appended.
281
282 =cut
283
# asks the given node to evaluate a string, by sending it an "eval" node
# request consisting of the string and the optional reply message
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd $node, eval => @request;
}
288
# kil $port, @reason
#
# kills the given port by delegating to the kill method of its node object.
# Croaks when the port ID has no port part, as node ports cannot be killed.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
298
299 #############################################################################
300 # node monitoring and info
301
302 =item node_is_known $nodeid
303
304 #TODO#
305 Returns true iff the given node is currently known to this node.
306
307 =cut
308
# true iff there is an entry for the node ID in %NODE
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
312
313 =item node_is_up $nodeid
314
315 Returns true if the given node is "up", that is, the kernel thinks it has
316 a working connection to it.
317
318 If the node is known (to this local node) but not currently connected,
319 returns C<0>. If the node is not known, returns C<undef>.
320
321 =cut
322
# returns 1 when the node has a transport, 0 when it is known but has none,
# and nothing (undef in scalar context) when the node is not known
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
327
328 =item up_nodes
329
330 Return the node IDs of all nodes that are currently connected (excluding
331 the node itself).
332
333 =cut
334
# returns the IDs of all node objects that currently have a transport
sub up_nodes() {
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}
338
339 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340
341 Registers a callback that is called each time a node goes up (a connection
342 is established) or down (the connection is lost).
343
344 Node up messages can only be followed by node down messages for the same
345 node, and vice versa.
346
347 Note that monitoring a node is usually better done by monitoring its node
348 port. This function is mainly of interest to modules that are concerned
349 about the network topology and low-level connection handling.
350
351 Callbacks I<must not> block and I<should not> send any messages.
352
353 The function returns an optional guard which can be used to unregister
354 the monitoring callback again.
355
356 Example: make sure you call function C<newnode> for all nodes that are up
357 or go up (and down).
358
359 newnode $_, 1 for up_nodes;
360 mon_nodes \&newnode;
361
362 =cut
363
our %MON_NODES; # address of callback => callback

# registers a node up/down callback; in non-void context a guard is returned
# that unregisters the callback again when destroyed
sub mon_nodes($) {
   my ($cb) = @_;

   # the numeric value (address) of the code reference identifies it
   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   defined wantarray
      and Guard::guard { delete $MON_NODES{$key} }
}
374
# _inject_nodeevent $node, $is_up, @reason
#
# tells every mon_nodes callback that the node went up or down, then logs
# the event at level 7. Exceptions thrown by a callback are logged at
# level 1 and do not keep the remaining callbacks from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
385
386 #############################################################################
387 # self node code
388
# _kill $portid, @reason
#
# removes a local port and its data and passes the kill reason to all
# monitors of the port. A warning is logged when a port that nobody monitors
# dies with a reason.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
402
# _monitor undef, $portid, $cb
#
# registers $cb as monitor of a local port. When the port does not exist,
# the callback is invoked immediately with a no_such_port reason instead.
# The first argument is not used.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid")
      unless exists $PORT{$portid};

   $LMON{$portid}{$cb+0} = $cb;
}
409
# _unmonitor undef, $portid, $cb
#
# removes $cb from the monitors of a local port, if the port has any
# monitors at all. The first argument is not used.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
414
# dies unless the node that sent the current message passes the $SECURE
# check or is the local node itself
sub _secure_check {
   my $sender = $SRCNODE->{id};

   return 1 if $SECURE->($sender);
   return 1 if $sender eq $NODE;

   die "remote execution attempt by insecure node\n";
}
420
# handlers for messages sent to the node port, indexed by message tag; each
# handler is called with the rest of the message while $SRCNODE is the
# sending node object
our %NODE_REQ = (
   ###########################################################################
   # internal services

   # monitoring: the sending node stops monitoring one of our ports
   mon0 => sub {
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE->{id}}{rmon}{$portid};

      _unmonitor (undef, $portid, $cb);
   },

   # monitoring: the sending node starts monitoring one of our ports
   mon1 => sub {
      my ($portid) = @_;

      # only keep a weak reference to the node object inside the callback
      my $node = $NODE{$SRCNODE->{id}};
      Scalar::Util::weaken ($node);

      # when the port dies, forget the monitor and tell the other node,
      # provided it is still connected
      my $relay = $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      _monitor (undef, $portid, $relay);
   },

   # a port on the sending node that we monitor has been killed
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{$portid}
         or return;

      # the rest of the message is the kill reason
      $_->(@_) for @$cbs;
   },

   ###########################################################################
   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # is the remote node considered secure?
   # secure => sub {
   #    #TODO#
   # },

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   ###########################################################################
   # random utilities

   # evaluate a string in package main and optionally send back "$@" and
   # the results, appended to the reply message
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },

   # append our current event loop time to the given reply message
   time => sub {
      &_secure_check;
      snd @_, AE::now;
   },

   # swallow the message
   devnull => sub {
      #
   },

   # empty messages are keepalives or similar devnull-applications
   "" => sub {
   },
);
479
# the local node object is registered both under its ID and under ""
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: the first message element selects the handler. Tags without
# a handler in %NODE_REQ are resolved with load_func (after the security
# check) and remembered in %NODE_REQ. Errors are logged, not propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func ($tag) };

      # call with the current @_, i.e. the message without its tag
      &$handler;
   };

   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};
486
# version of the node protocol
our $NPROTO = 1;

# announce our nproto in the greeting sent to everybody who connects
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
493
494 #############################################################################
495 # seed management, try to keep connections to all seeds at all times
496
our %SEED_NODE;    # seed ID => node ID, or undef while the node ID is not known
our %NODE_SEED;    # node ID => seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting

our ($SEED_WATCHER, $SEED_RETRY); # retry timer and the current retry interval
502
# starts a connection attempt to the given seed address, unless one is
# already registered in %SEED_CONNECT. Croaks on unparsable addresses.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,

      # called after receiving the remote greeting, which is where we learn
      # the remote node name.
      #
      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      on_greeted => sub {
         my $remote = $_[0]{remote_node};

         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            # we connected to ourselves: nuke this seed, but make sure we
            # act like a seed (every seed becomes a global node currently)
            require AnyEvent::MP::Global;
            delete $SEED_NODE{$seed};
            delete $NODE_SEED{$seed};
         } else {
            $SEED_NODE{$seed}   = $remote;
            $NODE_SEED{$remote} = $seed;
         }
      },

      # the connector/transport is gone, the seed may be tried again
      on_destroy => sub {
         delete $SEED_CONNECT{$seed};
      },

      # final callback: mark the seed as connected
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
538
# seed_all
#
# Goes through all seeds without a registered connection (attempt). Seeds
# whose node is known and up are asked to act as global node unless their
# greeting already says so; all other seeds are (re-)connected.
#
# While seeds are missing, a retry timer is (re-)armed. Each round sets the
# interval to twice the previous one plus a random fraction, capped at the
# configured monitor_timeout. Without missing seeds the timer is dropped.
sub seed_all {
   my @seeds;

   for my $seed (grep !exists $SEED_CONNECT{$_}, keys %SEED_NODE) {
      # %SEED_NODE is keyed by seed address, while %NODE and snd need the
      # node ID learned from the greeting
      my $nodeid = $SEED_NODE{$seed};

      if (defined $nodeid && node_is_up $nodeid) {
         # node is up, make sure it's running the global service
         snd $nodeid, "g_slave"
            unless $NODE{$nodeid}{transport}{remote_greeting}{global};
      } else {
         # else node is down, we need to seed
         push @seeds, $seed;
      }
   }

   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for @seeds;

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
569
# resets the retry interval to one second and makes sure that a seeding
# round is scheduled
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
574
# replaces the seed list by the given seed addresses, forgetting everything
# known about the old seeds, and starts connecting
sub set_seeds(@) {
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # all seeds start out with an unknown node ID
   %SEED_NODE = map +($_ => undef), @_;

   seed_again ();#d#
   seed_all ();
}
586
# if we lost the connection to a seed node, make sure we are seeding
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   return if $is_up;
   return unless exists $NODE_SEED{$nodeid};

   seed_again ();
};
592
593 #############################################################################
594 # talk with/to global nodes
595
596 # protocol messages:
597 #
598 # sent by all slave nodes (slave to master)
599 # g_slave database - make other global node master of the sender
600 #
601 # sent by any node to global nodes
602 # g_set database - set whole database
603 # g_upd family set del - update single family
604 # g_del family key - delete key from database
605 # g_get family key reply... - send reply with data
606 #
607 # sent by global nodes
608 # g_global - node became global, similar to global=1 greeting
609 #
610 # database families
611 # "'l" -> node -> listeners
612 # "'g" -> node -> undef
613 # ...
614 #
615
# used on all nodes:
our ($MASTER, $MASTER_MON); # the global node we bind ourselves to, and the node monitor guard used to find it
our %LOCAL_DB;              # the database of this node, family => { key => value }

our %GLOBAL_DB;             # all local databases, merged - empty on non-global nodes
622
# version of the global protocol
our $GPROTO = 1;

# announce our gproto in the greeting sent to everybody who connects
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
629
630 #############################################################################
631 # master selection
632
# requests for the master that have not been answered yet, $id => \@req;
# master_set re-sends all of them when a new master is selected
our %GLOBAL_REQ;

# registers a request under the given ID (a no-op when the ID is already
# registered) and sends it right away when a master is known
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   snd $MASTER, @$req
      if $MASTER;
}
646
# forgets the request with the given ID, so that it is no longer re-sent
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
650
651 #################################
652 # master rpc
653
our %GLOBAL_RES;          # call ID => callback waiting for the g_reply
our $GLOBAL_RES_ID = "a"; # last call ID handed out, string-incremented

# global_call @request, $cb
#
# queues @request for the master with a fresh call ID appended, and
# remembers $cb under that ID for the reply
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
662
# reply to a global_call: the first element is the call ID, the rest is
# passed on to the callback registered for it
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   # answered, so do not re-send it to future masters
   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id}
      or return;

   # call with the remaining @_
   &$cb
};
670
671 #################################
672
# queues a g_find request for the given node ID, at most one per node ID
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
676
# reply for g_find started in Node.pm: marks the request as answered and
# hands the second message element to connect_to of the node object, if we
# still know the node
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
685
# makes the given node our master: sends it our local database with a
# g_slave request, followed by all requests still queued in %GLOBAL_REQ
sub master_set {
   ($MASTER) = @_;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
695
# selects a master: the first seed node that is currently up wins. When
# there is none, waits for a seed node to come up, makes it the master, and
# then watches it so that the search restarts when it goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up $nodeid;

      master_set $nodeid;
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set $nodeid;

      # replace this monitor by one that waits for the master to go down
      $MASTER_MON = mon_nodes sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
719
# other node wants to make us the master: load the global module and redo
# the request with whatever handler is registered for g_slave afterwards
# (presumably AnyEvent::MP::Global replaces this stub - confirm)
$NODE_REQ{g_slave} = sub {
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};

   # call with the same @_
   &$handler
};
728
729 #############################################################################
730 # local database operations
731
732 # local database management
733
# db_set $family, $key[, $value]
#
# sets a single key in the local database and, when a master is known,
# sends it the update
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$key} = $value;

      snd $MASTER, g_upd => $family => { $key => $value }
         if defined $MASTER;
#   }
}
748
# db_del $family, @keys
#
# deletes the given keys from a family of the local database and, when a
# master is known, sends it the deletion
sub db_del($@) {
   my $family = shift;

   my $local = $LOCAL_DB{$family};
   delete @$local{@_};

   snd $MASTER, g_upd => $family => undef, \@_
      if defined $MASTER;
}
756
# db_reg $family, $key[, $value]
#
# like db_set, but returns a guard that deletes the key again when destroyed
sub db_reg($$;$) {
   my ($family, $key) = @_;

   # pass on all arguments as they are, bypassing the prototype
   &db_set (@_);

   Guard::guard { db_del $family => $key }
}
762
763 # database query
764
# db_family $family, $cb - sends a g_db_family query for the family to the
# master, the reply is passed to $cb
sub db_family {
   my ($family, $cb) = @_;

   global_call (g_db_family => $family, $cb);
}
769
# db_keys $family, $cb - sends a g_db_keys query for the family to the
# master, the reply is passed to $cb
sub db_keys {
   my ($family, $cb) = @_;

   global_call (g_db_keys => $family, $cb);
}
774
# db_values $family, $cb - sends a g_db_values query for the family to the
# master, the reply is passed to $cb
sub db_values {
   my ($family, $cb) = @_;

   global_call (g_db_values => $family, $cb);
}
779
780 # database monitoring
781
our %LOCAL_MON; # family => { address of callback => callback }
our %MON_DB;    # family => { key => value }, our copy of each monitored family

# db_mon $family, $cb
#
# monitors a database family. The first monitor of a family requests the
# data from upstream; further monitors are fed from the existing copy. In
# non-void context a guard is returned that stops the monitoring.
sub db_mon($@) {
   my ($family, $cb) = @_;

   # the numeric value (address) of the code reference identifies it
   my $id = $cb + 0;

   if (my $db = $MON_DB{$family}) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$id} = sub { };

      AE::postpone {
         # guard might have gone away already
         return unless exists $LOCAL_MON{$family}{$id};

         # set actual callback
         $LOCAL_MON{$family}{$id} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$id} = $cb;
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$id};

         # other monitors of this family are still active
         return if %$mon;

         global_req_del "mon1 $family";

         # no global_req, because we don't care if we are not connected
         snd $MASTER, g_mon0 => $family
            if $MASTER;

         delete $LOCAL_MON{$family};
         delete $MON_DB{$family};
      }
}
824
# full update: the message carries the complete new contents of a family.
# Our copy is updated in place and every monitor of the family is called
# with the copy and the lists of added, changed and deleted keys.
$NODE_REQ{g_chg1} = sub {
   my ($f, $ndb) = @_;

   my $db = $MON_DB{$f};
   my (@a, @c, @d);

   # add or replace keys
   for my $k (keys %$ndb) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $ndb->{$k};
   }

   # delete keys that are no longer present
   @d = grep !exists $ndb->{$_}, keys %$db;
   delete @$db{@d};

   for my $cb (values %{ $LOCAL_MON{$f} }) {
      $cb->($db, \@a, \@c, \@d);
   }
};
849
# incremental update: the message carries a hash of keys to set and a list
# of keys to delete for a family. Our copy is updated in place and every
# monitor of the family is called with the copy, the added keys, the
# changed keys and the list of deleted keys as received.
$NODE_REQ{g_chg2} = sub {
   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};

   my (@a, @c);

   for my $k (keys %$set) {
      push @{ exists $db->{$k} ? \@c : \@a }, $k;
      $db->{$k} = $set->{$k};
   }

   delete @$db{@$del};

   for my $cb (values %{ $LOCAL_MON{$family} }) {
      $cb->($db, \@a, \@c, $del);
   }
};
870
871 #############################################################################
872 # configure
873
# returns the nodename field of uname
sub nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
878
# _resolve $spec
#
# asynchronously resolves a comma-separated list of transport descriptors
# and returns a condvar that receives the resulting "host:port" strings,
# without duplicates, ordered by their position in the list. An empty or
# numeric descriptor stands for the local nodename (with that port), a host
# of "*" stands for the addresses of all local interfaces, and a port of "*"
# is turned into port 0. Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # runs once all lookups have finished
   $cv->begin (sub {
      my ($done) = @_;

      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];
         push @refs, $addr
            unless $seen{$addr}++;
      }

      $done->send (@refs);
   });

   my $idx;

   for my $t (split /,/, $nodeid) {
      # results of one descriptor sort after those of earlier descriptors
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length $t ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;

         my $collect = sub {
            my @addr;

            require Net::Interface;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }

               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            @addr
         };

         AnyEvent::Util::fork_call ($collect, sub {
            for my $ip (@_) {
               my $addr = AnyEvent::Socket::format_hostport (
                  AnyEvent::Socket::format_address ($ip),
                  $port,
               );
               push @res, [$pri += 1e-5, $addr];
            }

            $cv->end;
         });
      } else {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $ip) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
               my $addr = AnyEvent::Socket::format_hostport (
                  AnyEvent::Socket::format_address ($ip),
                  $service,
               );
               push @res, [$pri += 1e-5, $addr];
            }

            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
959
# configure [$profile,] key => value...
#
# (re-)initialises this node: loads the configuration profile, determines
# the node ID, creates the listeners, registers them in the local database,
# starts seeding and the master search, and finally starts the configured
# services. Croaks on illegal node IDs and unparsable bind addresses.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};
   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   if (exists $CONFIG->{secure}) {
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   # %n is replaced by the nodename
   ($NODE = $node) =~ s/%n/nodename/ge;

   # a trailing slash and %u get the random $RUNIQ appended/substituted
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # register the local node object under its new ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all resolves first, then wait for them in order
   for my $resolved (map _resolve ($_), @$binds) {
      for my $bind ($resolved->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            # replace $bind by the address passed to the prepare callback
            prepare => sub {
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set "'l" => $NODE => $BINDS;

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve ($_), @$seeds;

   master_search ();

   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   # services are [function, args...] array refs, "Module::" names that
   # are just loaded, or names of functions to call without arguments
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
         next;
      }

      if (s/::$//) {
         eval "require $_";
         die $@ if $@;
         next;
      }

      (load_func $_)->();
   }
}
1049
1050 =back
1051
1052 =head1 SEE ALSO
1053
1054 L<AnyEvent::MP>.
1055
1056 =head1 AUTHOR
1057
1058 Marc Lehmann <schmorp@schmorp.de>
1059 http://home.schmorp.de/
1060
1061 =cut
1062
1063 1
1064