ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.94
Committed: Wed Mar 14 23:36:33 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.93: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg
49 db_mon db_family db_keys db_values
50 );
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages - queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
# Highest level the default handler still prints; the environment variable
# PERL_ANYEVENT_MP_WARNLEVEL overrides the built-in default of 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks, invoked for every message regardless of its level.
our @WARN;

# Default log handler: $WARN->($level, $msg).
our $WARN = sub {
   # each hook is called with exactly the arguments we received
   for my $hook (@WARN) {
      &$hook;
   }

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # a single trailing newline is dropped, the format below adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
# Resolve a fully qualified function name ("Some::Package::func") to a code
# reference, loading the defining module on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until the function exists. This never dies
# itself: on failure it returns a code reference that dies when called,
# either with "unable to resolve function '...'" or with the error the
# module produced while loading.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # the name is interpolated into a string eval below, so only
         # accept something that is syntactically a package name
         $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         # the file "require $pkg" will look for
         (my $file = "$pkg.pm") =~ s{::}{/}g;

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # only "this very module does not exist" lets us try the next
            # shorter prefix - a module that exists but fails to load (for
            # example because one of its own dependencies is missing) is
            # reported with its real error instead of being hidden
            $error =~ /^Can't locate \Q$file\E in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119
# the 62 characters used for all textual identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random characters with code points 0..255
sub nonce($) {
   my $str = "";
   $str .= chr rand 256
      for 1 .. $_[0];
   $str
}

# returns a string of $_[0] random characters taken from @alnum
sub nonce62($) {
   my $str = "";
   $str .= $alnum[rand 62]
      for 1 .. $_[0];
   $str
}
129
# Builds a nine character identifier: five characters derived from the
# process id and the current event loop time, followed by four random
# characters from nonce62.
sub gen_uniq {
   my $now = AE::now;

   # every entry is an index into @alnum
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 (4)
}
141
our $CONFIG;             # configuration of this node
our $SECURE = sub { 1 }; # predicate called by _secure_check with the sender's node id

our $RUNIQ;              # remote uniq value
our $UNIQ;               # per-process/node unique cookie
our $NODE;               # id of this node
our $ID = "a";

our %NODE;               # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

our %RMON;               # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;               # monitored _local_ ports

our $GLOBAL;             # true if node is a global ("directory") node
our %BINDS;
our $BINDS;              # our listeners, as arrayref

our $SRCNODE;            # holds the sending node _object_ during _inject
161
# (Re-)creates the random parts of the node identity: $UNIQ, $RUNIQ and an
# anonymous default node id derived from $RUNIQ.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   ($RUNIQ = nonce62 (10)) =~ s/(.)$/\U$1/;

   $NODE = "anon/$RUNIQ";
}

# names are needed as soon as the module is loaded
_init_names ();
174
# returns the id of the local node (the current value of $NODE)
sub NODE() {
   return $NODE;
}
178
# returns the node part of a port id, i.e. everything before the first "#"
sub node_of($) {
   # scalar assignment from the slice yields undef when split returns nothing
   my $nodeid = (split /#/, $_[0], 2)[0];

   $nodeid
}
184
# TRACE is a compile-time constant: 1 when PERL_ANYEVENT_MP_TRACE is set
# to a true value in the environment, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190
our $DELAY_TIMER; # timer that triggers _delay_run, undef while nothing is queued
our @DELAY_QUEUE; # code references waiting to be run

# Runs queued callbacks in order until the queue is empty (callbacks queued
# while running are picked up as well), then releases the timer.
sub _delay_run {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
}
197
# Queues a code reference for _delay_run and makes sure a zero-delay timer
# is pending to run the queue.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
202
# Delivers a message to a local port: the first element of @_ names the
# port, the remaining elements are passed to the callback stored in %PORT.
# Messages for ports without an entry in %PORT are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called with the (already shifted) @_ of this sub
   &$handler
}
207
# Returns the node object for the given node id, creating and registering a
# remote node object in %NODE first if none exists yet. Croaks on undef or
# the empty string. This is the central routing component: once a node
# object exists, messages can be sent to it.
sub add_node {
   my ($node) = @_;

   Carp::croak "'undef' or the empty string are not valid node/port IDs"
      unless length $node;

   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
218
# snd $port, @msg - hands [$portid, @msg] to the {send} callback of the node
# object responsible for the node part of $port (created on demand).
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port id is always sent as a string
   $node->{send} (["$portid", @_]);
}
227
228 =item $is_local = port_is_local $port
229
230 Returns true iff the port is a local port.
231
232 =cut
233
# True when the node object registered for the port's node part is the very
# same object as the one stored under the empty node id (the local node).
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
239
240 =item snd_to_func $node, $func, @args
241
242 Expects a node ID and a name of a function. Asynchronously tries to call
243 this function with the given arguments on that node.
244
245 This function can be used to implement C<spawn>-like interfaces.
246
247 =cut
248
# snd_to_func $nodeid, $func, @args - sends ["", $func, @args] to the given
# node, i.e. to the port with the empty name on that node.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the text above talks about the local node, while the
   # condition below is true for every node id other than $NODE - confirm
   # which of the two is intended.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @call]);
}
260
261 =item snd_on $node, @msg
262
263 Executes C<snd> with the given C<@msg> (which must include the destination
264 port) on the given node.
265
266 =cut
267
# sends the "snd" request, followed by @msg, to the given destination
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
272
273 =item eval_on $node, $string[, @reply]
274
275 Evaluates the given string as Perl expression on the given node. When
276 @reply is specified, then it is used to construct a reply message with
277 C<"$@"> and any results from the eval appended.
278
279 =cut
280
# sends the "eval" request with the code string and the optional reply
# message to the given destination
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
285
# kil $port, @reason - asks the node object responsible for $port to kill
# the port, passing on the reason. Croaks when the port part is empty.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
295
296 #############################################################################
297 # node monitoring and info
298
299 =item node_is_known $nodeid
300
301 #TODO#
302 Returns true iff the given node is currently known to this node.
303
304 =cut
305
# true iff %NODE has an entry for the given node id
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
309
310 =item node_is_up $nodeid
311
312 Returns true if the given node is "up", that is, the kernel thinks it has
313 a working connection to it.
314
315 If the node is known (to this local node) but not currently connected,
316 returns C<0>. If the node is not known, returns C<undef>.
317
318 =cut
319
# 1 if the node object has a true {transport} member, 0 if it has not, and
# undef/the empty list if there is no (true) entry for the node in %NODE
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
324
325 =item up_nodes
326
327 Return the node IDs of all nodes that are currently connected (excluding
328 the node itself).
329
330 =cut
331
# returns the {id} of every node object in %NODE that has a true {transport}
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
335
336 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
337
338 Registers a callback that is called each time a node goes up (a connection
339 is established) or down (the connection is lost).
340
341 Node up messages can only be followed by node down messages for the same
342 node, and vice versa.
343
344 Note that monitoring a node is usually better done by monitoring its node
345 port. This function is mainly of interest to modules that are concerned
346 about the network topology and low-level connection handling.
347
348 Callbacks I<must not> block and I<should not> send any messages.
349
350 The function returns an optional guard which can be used to unregister
351 the monitoring callback again.
352
353 Example: make sure you call function C<newnode> for all nodes that are up
354 or go up (and down).
355
356 newnode $_, 1 for up_nodes;
357 mon_nodes \&newnode;
358
359 =cut
360
# registered node monitor callbacks, keyed by the numeric value of the
# callback reference
our %MON_NODES;

# Registers $cb in %MON_NODES. Unless called in void context, returns a
# guard that removes the registration again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;
   my $key  = $cb + 0;

   $MON_NODES{$key} = $cb;

   defined wantarray
      and Guard::guard { delete $MON_NODES{$cb+0} }
}
371
# Reports a node going up or down: every callback in %MON_NODES is called
# with ($nodeid, $up, @reason), exceptions thrown by a callback are logged
# at level 1, and finally the event itself is logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
382
383 #############################################################################
384 # self node code
385
# _kill $port, @reason - removes a local port and its data, then calls all
# monitors registered for it with @reason. A port that had no monitors but
# was killed with a non-empty reason is logged at level 2.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
399
# _monitor $ignored, $portid, $cb - registers $cb in %LMON for a local
# port. If the port does not exist, $cb is called right away with a
# no_such_port reason instead and nothing is registered.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid")
      unless exists $PORT{$portid};

   $LMON{$portid}{$cb+0} = $cb;
}
406
# _unmonitor $ignored, $portid, $cb - removes the registration made by
# _monitor; does nothing when the port has no monitors at all.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
411
# Dies unless the node currently injecting a message ($SRCNODE) is either
# accepted by the $SECURE predicate or is the local node itself.
sub _secure_check {
   $SECURE->($SRCNODE->{id})
      || $SRCNODE->{id} eq $NODE
      || die "remote execution attempt by insecure node\n";
}
417
# Requests understood by the node port (the port with the empty name),
# keyed by request name. Each handler is called with the request arguments
# while $SRCNODE holds the sending node object.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my ($portid) = @_;
      my $cb = delete $NODE{$SRCNODE->{id}}{rmon}{$portid};

      _unmonitor (undef, $portid, $cb);
   },
   mon1 => sub { # start monitoring a port for another node
      my ($portid) = @_;

      # the monitor callback must not keep the node object alive
      my $node = $NODE{$SRCNODE->{id}};
      Scalar::Util::weaken ($node);

      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      # remember the callback first, so mon0 can find it again
      $node->{rmon}{$portid} = $on_kill;
      _monitor (undef, $portid, $on_kill);
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;
      my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # is the remote node considered secure?
   # secure => sub {
   #    #TODO#
   # },

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities
   eval => sub {
      &_secure_check;
      # NOTE(review): this string-evals code supplied by the sender, the
      # only protection is the _secure_check call above
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      &_secure_check;
      snd (@_, AE::now);
   },
   devnull => sub {
      # accepts anything, does nothing
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
476
# the object representing the local node is reachable both under the empty
# node id and under the current node id
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# Functions resolved through load_func, keyed by name. They are deliberately
# kept out of %NODE_REQ: entries in %NODE_REQ are callable by every node,
# while loaded functions must pass _secure_check on every single request.
my %LOADED_FUNC;

# The node port (the port with the empty name). The first message element
# names the request: built-in requests come from %NODE_REQ, everything else
# is treated as a function name and resolved with load_func - but only for
# senders that pass _secure_check. Errors are logged at level 2 and never
# propagate to the caller.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} || do {
         # checked on every request, not only when the function is first loaded
         &_secure_check;
         $LOADED_FUNC{$tag} ||= load_func ($tag)
      };

      # called with the remaining message elements in @_
      &$handler
   };
   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};
483
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
490
491 #############################################################################
492 # seed management, try to keep connections to all seeds at all times
493
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;

# Starts a connection attempt to the seed address "host:port", unless
# %SEED_CONNECT already has a true entry for it. Croaks when the address
# cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      my ($transport) = @_;
      my $remote = $transport->{remote_node};

      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.

      # if we connect to ourselves, nuke this seed, but make sure we act like a seed
      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
         return;
      }

      $SEED_NODE{$seed}   = $remote;
      $NODE_SEED{$remote} = $seed;

      # also start global service, if not running
      # we need to check here in addition to the mon_nodes below
      # because we might only learn late that a node is a seed
      # and then we might already be connected
      snd ($remote, "g_slave")
         unless $transport->{remote_greeting}{global};
   };

   # the final callback forgets the connection attempt again
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   );
}
537
# Starts connection attempts for all seeds that are neither being connected
# to nor known to be up, and re-arms the retry timer with a growing, capped
# interval. When there is nothing to do, the timer is dropped.
sub seed_all {
   my @seeds = grep {
      !exists $SEED_CONNECT{$_}
         && !(defined $SEED_NODE{$_} && node_is_up ($SEED_NODE{$_}))
   } keys %SEED_NODE;

   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      for my $seed (@seeds) {
         seed_connect ($seed);
      }

      # roughly double the interval each round, but never exceed monitor_timeout
      my $max = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $max
         if $SEED_RETRY > $max;

      $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
560
# resets the retry interval to one second and makes sure a seed_all run is
# scheduled
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
565
# sets new seed list, starts connecting
# (all previous seed state is discarded first)
sub set_seeds(@) {
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # node ids of the seeds are not known yet
   $SEED_NODE{$_} = undef
      for @_;

   seed_all ();
}
576
# permanent node monitor that only cares about seed nodes
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $NODE_SEED{$nodeid};

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd ($nodeid, "g_slave")
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
   }
});
590
591 #############################################################################
592 # talk with/to global nodes
593
594 # protocol messages:
595 #
596 # sent by all slave nodes (slave to master)
597 # g_slave database - make other global node master of the sender
598 #
599 # sent by any node to global nodes
600 # g_set database - set whole database
601 # g_upd family set del - update single family
602 # g_del family key - delete key from database
603 # g_get family key reply... - send reply with data
604 #
605 # sent by global nodes
606 # g_global - node became global, similar to global=1 greeting
607 #
608 # database families
609 # "'l" -> node -> listeners
610 # "'g" -> node -> undef
611 # ...
612 #
613
# used on all nodes:
our $MASTER;     # the global node we bind ourselves to
our $MASTER_MON;
our %LOCAL_DB;   # this node database

our %GLOBAL_DB;  # all local databases, merged - empty on non-global nodes

our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
627
628 #############################################################################
629 # master selection
630
631 # master requests
our %GLOBAL_REQ; # $id => \@req

# Remembers request $req (an array reference) under $id and sends it to the
# master, if there is one. Does nothing when $id is already registered.
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   snd ($MASTER, @$req)
      if $MASTER;
}
644
# forgets the request registered under the given id
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
648
649 #################################
650 # master rpc
651
our %GLOBAL_RES;          # request id => reply callback
our $GLOBAL_RES_ID = "a"; # last request id handed out

# global_call @request, $cb - registers @request with a fresh id appended as
# a global request and remembers $cb under that id in %GLOBAL_RES.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
660
# reply to a request made with global_call: the first argument is the
# request id, the rest is passed to the callback stored for that id
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $reply_cb = delete $GLOBAL_RES{$id}
      or return;

   # called with the remaining arguments
   &$reply_cb
};
668
669 #################################
670
# registers a "g_find" request for the given node id
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
674
# reply for g_find started in Node.pm
# arguments: the node id and the value to hand to the node's connect_to
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
683
# Makes the given node the master: announces ourselves with our local
# database and sends all registered global requests to it.
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
693
# Picks a master: the first seed node that is up, if any. Otherwise waits
# for a seed node to come up, makes it the master and watches it - when the
# master goes down again, the search starts over.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      # from now on only watch for the master going down
      $MASTER_MON = mon_nodes (sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      });
   });
}
717
# other node wants to make us the master
# this is only a stub: it loads AnyEvent::MP::Global and then calls whatever
# handler is registered for g_slave at that point, with the same arguments
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler
};
726
727 #############################################################################
728 # local database operations
729
730 # local database management
731
# db_set $family, $key[, $value] - stores the value in the local database
# and, if a master is known, sends it the corresponding g_upd message.
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$key} = $value;
      snd ($MASTER, g_upd => $family => { $key => $value })
         if defined $MASTER;
#   }
}
746
# db_del $family, @keys - removes the keys from the local database and, if
# a master is known, sends it the corresponding g_upd message.
sub db_del($@) {
   my ($family, @keys) = @_;

   delete @{ $LOCAL_DB{$family} }{@keys};

   snd ($MASTER, g_upd => $family => undef, \@keys)
      if defined $MASTER;
}
754
# Like db_set, but returns a guard that deletes the key again (via db_del)
# when it is destroyed.
sub db_reg($$;$) {
   my ($family, $key) = @_[0, 1];

   # db_set sees exactly the arguments we were called with
   &db_set;

   Guard::guard { db_del ($family, $key) }
}
760
761 # database query
762
# db_family $family, $cb - issues a g_db_family global call
sub db_family {
   global_call (g_db_family => @_[0, 1]);
}
767
# db_keys $family, $cb - issues a g_db_keys global call
sub db_keys {
   global_call (g_db_keys => @_[0, 1]);
}
772
# db_values $family, $cb - issues a g_db_values global call
sub db_values {
   global_call (g_db_values => @_[0, 1]);
}
777
778 # database monitoring
779
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# db_mon $family, $cb - registers $cb as a monitor for a database family.
# The first monitor of a family registers a g_mon1 global request; further
# monitors get a postponed initial call with the data collected so far.
# Unless called in void context, a guard is returned that removes the
# monitor again and cleans up after the last monitor of a family.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   if ($db) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };
      AE::postpone {
         return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already

         # set actual callback
         $LOCAL_MON{$family}{$cb+0} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   }

   defined wantarray
      and Guard::guard {
         my $monitors = $LOCAL_MON{$family};
         delete $monitors->{$cb+0};

         # that was the last monitor for this family
         unless (%$monitors) {
            global_req_del ("mon1 $family");

            # no global_req, because we don't care if we are not connected
            snd ($MASTER, g_mon0 => $family)
               if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
         }
      }
}
822
# full update
# replaces the monitored copy of a family with the received database and
# calls every monitor with ($db, \@added, \@changed, \@deleted)
$NODE_REQ{g_chg1} = sub {
   my ($family, $new_db) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed, @deleted);

   # add or replace keys
   while (my ($key, $value) = each %$new_db) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $value;
   }

   # delete keys that are no longer present
   for my $key (grep !exists $new_db->{$_}, keys %$db) {
      delete $db->{$key};
      push @deleted, $key;
   }

   $_->($db, \@added, \@changed, \@deleted)
      for values %{ $LOCAL_MON{$family} };
};
847
# incremental update
# applies the received set/delete lists to the monitored copy of a family
# and calls every monitor with ($db, \@added, \@changed, $del)
$NODE_REQ{g_chg2} = sub {
   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   while (my ($key, $value) = each %$set) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $value;
   }

   delete @$db{@$del};

   for my $monitor (values %{ $LOCAL_MON{$family} }) {
      $monitor->($db, \@added, \@changed, $del);
   }
};
868
869 #############################################################################
870 # configure
871
# returns the second element of the list returned by POSIX::uname
sub nodename {
   require POSIX;

   my (undef, $name) = POSIX::uname ();

   $name
}
876
# Resolves a comma separated list of transport descriptors into "host:port"
# strings. Returns a condvar that will receive the resulting addresses,
# ordered by the position of their descriptor and with duplicates removed.
#
# A descriptor consisting only of digits (or nothing) is taken as a port on
# the local nodename, a port of "*" becomes "0", and a host of "*" stands
# for the addresses of the local interfaces.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, in no particular order

   # this runs once all lookups started below have finished
   $cv->begin (sub {
      my ($done) = @_;
      my (%seen, @uniq);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];

         push @uniq, $addr
            unless $seen{$addr}++;
      }

      $done->send (@uniq);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of this descriptor get priorities slightly above its index
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length ($t) ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }

            $cv->end;
         });

         next;
      }

      # use fork_call, as Net::Interface is big, and we need it rarely.
      require AnyEvent::Util;
      AnyEvent::Util::fork_call (
         sub {
            require Net::Interface;

            my @addr;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               # skip localhost etc.
               push @addr, grep !/^\x7f/,
                  $if->address (Net::Interface::AF_INET ());

               #next if $if->scope ($_) <= 2;
               # global unicast, site-local unicast
               push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                  $if->address (Net::Interface::AF_INET6 ());
            }

            @addr
         }, sub {
            for my $ip (@_) {
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
               ];
            }

            $cv->end;
         }
      );
   }

   $cv->end;

   $cv
}
957
# configure [$profile,] key => value... - (re-)initialises this node: looks
# up the configuration profile, determines the node id, starts listening on
# all configured binds, connects to the seed nodes, starts the search for a
# master and finally starts the configured services.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my %kv = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   if (exists $CONFIG->{secure}) {
      # with "secure" set, the $SECURE predicate rejects every node
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   # %n expands to the nodename
   ($NODE = $node) =~ s/%n/nodename ()/ge;

   # a trailing slash or %u is replaced by the random $RUNIQ
   # nodes with randomised node names do not need randomised port names
   $UNIQ = ""
      if $NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g;

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # start all lookups first, then collect their results in order
   my @lookups = map _resolve ($_), @$binds;

   for my $lookup (@lookups) {
      for my $bind ($lookup->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address we actually got bound to
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );

         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set ("'l", $NODE, $BINDS);

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   master_search ();

   # a service is either [function, args...], "Module::" or "function"
   # NOTE(review): s/::$// below edits the entries of $CONFIG->{services}
   # in place - confirm nothing reads the list again afterwards
   for (@{ $CONFIG->{services} }) {
      if (ref $_) {
         my ($func, @args) = @$_;
         (load_func ($func))->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func ($_))->();
      }
   }
}
1043
1044 =back
1045
1046 =head1 SEE ALSO
1047
1048 L<AnyEvent::MP>.
1049
1050 =head1 AUTHOR
1051
1052 Marc Lehmann <schmorp@schmorp.de>
1053 http://home.schmorp.de/
1054
1055 =cut
1056
1057 1
1058