ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.95
Committed: Wed Mar 21 00:14:25 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.94: +3 -13 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
# symbols that are only exported on request: the node and port tables and
# the ID cookies
our @EXPORT_OK = qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);

# symbols that are exported by default
our @EXPORT = (
   # routing and remote execution helpers
   qw(add_node load_func snd_to_func snd_on eval_on),

   # basic node/port operations
   qw(NODE $NODE node_of snd kil port_is_local),
   qw(configure),

   # node monitoring
   qw(up_nodes mon_nodes node_is_up),

   # local database access
   qw(db_set db_del db_reg),
   qw(db_mon db_family db_keys db_values),
);
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages - queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
# highest level the default handler still prints; taken from the
# environment whenever the variable exists (even if empty or zero)
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# additional log hooks, called for every message by the default handler
our @WARN;

# the default log handler, called as $WARN->($level, $msg)
our $WARN = sub {
   # hooks see every message, regardless of the level; the ampersand call
   # hands our own @_ to each hook unchanged
   for my $hook (@WARN) {
      &$hook;
   }

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # the format below supplies the trailing newline itself
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
# Resolves a fully qualified function name (e.g. "My::Module::func") into a
# code reference, loading modules on demand.
#
# When the function is not defined yet, successively shorter package
# prefixes of the name ("My::Module", then "My") are require'd until the
# function shows up. A prefix whose module file cannot be located is
# skipped, any other load error ends the search.
#
# This function never dies itself: on failure, it returns a code reference
# that dies with the reason once it is called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, and function names
         # can originate from messages sent by other nodes, so anything that
         # is not a plain Package::Name must never reach the eval.
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module file just means we have to try a shorter prefix
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
119
# the 62 characters used for textual IDs and nonces
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of the given length made of random bytes (0..255)
sub nonce($) {
   my ($length) = @_;

   join "", map { chr rand 256 } 1 .. $length
}

# returns a string of the given length made of random characters out of
# [0-9A-Za-z]
sub nonce62($) {
   my ($length) = @_;

   join "", map { $alnum[rand 62] } 1 .. $length
}
129
# Builds a nine character ID: two characters derived from the process ID,
# three from the current event loop time (at second, 1/100s and 1/10000s
# resolution) and four random ones, all taken from @alnum.
sub gen_uniq {
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 (4)
}
141
our $CONFIG;             # this node's configuration, set by configure
our $SECURE = sub { 1 }; # called with a node ID, true if that node may request remote execution

our $RUNIQ;      # remote uniq value
our $UNIQ;       # per-process/node unique cookie
our $NODE;       # the ID of the local node
our $ID = "a";

our %NODE;       # node id to transport mapping, or "undef", for local node
our %PORT;       # local ports: port ID => callback
our %PORT_DATA;  # local ports: additional per-port data

our %RMON;       # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;       # monitored _local_ ports ($LMON{portid}{cbid} == cb)

our $GLOBAL;     # true if node is a global ("directory") node
our %BINDS;      # bind address => listener
our $BINDS;      # our listeners, as arrayref

our $SRCNODE;    # holds the sending node _object_ during _inject
161
# (re-)initialises $UNIQ and $RUNIQ and resets $NODE to an anonymous node ID
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names: one character longer than $UNIQ and
   # with the last character uppercased, to avoid clashes
   my $remote = nonce62 (10);
   $remote =~ s/(.)$/\U$1/;
   $RUNIQ = $remote;

   $NODE = "anon/$RUNIQ";
}

# give the node usable names right away, before configure is called
_init_names ();
174
# returns the node ID of the local node (the current value of $NODE)
sub NODE() {
   return $NODE;
}
178
# returns the node ID part of a port ID, that is, everything before the
# first "#"
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
184
BEGIN {
   # TRACE is defined at compile time as a constant sub (empty prototype),
   # true iff PERL_ANYEVENT_MP_TRACE is set to a true value
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
190
our $DELAY_TIMER; # timer watcher running _delay_run, undef while nothing is pending
our @DELAY_QUEUE; # callbacks waiting to be run

# runs all queued callbacks in order, including the ones queued while
# running, then releases the timer
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   return undef $DELAY_TIMER;
}
197
# queues the given callback and makes sure a zero-delay timer is armed that
# will run the queue via _delay_run
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
202
# Delivers a message to a local port: the first element of @_ is the port
# ID, the remaining elements are passed to the port callback. Messages to
# unknown ports are silently dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $port_cb = $PORT{+shift}
      or return;

   # ampersand call: the callback gets the remaining @_ as-is
   &$port_cb;
}
207
# Returns the node object for the given node ID, creating and registering
# (in %NODE) a remote node object when the node is not known yet. This adds
# a node-ref, so you can send stuff to it - it is basically the central
# routing component.
#
# Croaks when $node is undef or the empty string.
sub add_node {
   my ($node) = @_;

   length $node
      or Carp::croak "'undef' or the empty string are not valid node/port IDs";

   # direct method call rather than the ambiguous indirect object syntax
   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
218
# Sends a message: the first argument is the destination port ID
# ("nodeid#portid"), the remaining arguments form the message. The node
# object is created via add_node when it is not known yet.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";#d#
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port ID is stringified, so a missing port part becomes ""
   $node->{send} (["$portid", @_]);
}
227
228 =item $is_local = port_is_local $port
229
230 Returns true iff the port is a local port.
231
232 =cut
233
# true iff the node part of the given port ID maps to the same node object
# as the local node, which is registered under the empty ID
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   # numeric comparison of the two references, i.e. an identity test
   return $NODE{$parts[0]} == $NODE{""};
}
239
240 =item snd_to_func $node, $func, @args
241
242 Expects a node ID and a name of a function. Asynchronously tries to call
243 this function with the given arguments on that node.
244
245 This function can be used to implement C<spawn>-like interfaces.
246
247 =cut
248
# Sends the function name and arguments to the node port (port ID "") of
# the given node, creating the node object when it is not known yet.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is *not* $NODE, which
   # does not obviously match the sentence above - confirm the intent.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["", @_]);
}
260
261 =item snd_on $node, @msg
262
263 Executes C<snd> with the given C<@msg> (which must include the destination
264 port) on the given node.
265
266 =cut
267
# asks the given node to execute snd with @msg, by sending it a "snd"
# request
sub snd_on($@) {
   my $node = shift;

   snd ($node, snd => @_);
}
272
273 =item eval_on $node, $string[, @reply]
274
275 Evaluates the given string as Perl expression on the given node. When
276 @reply is specified, then it is used to construct a reply message with
277 C<"$@"> and any results from the eval appended.
278
279 =cut
280
# asks the given node to evaluate a string, by sending it an "eval" request
# consisting of the string and the optional reply message
sub eval_on($$;@) {
   my $node = shift;

   snd ($node, eval => @_);
}
285
# Kills the port given as first argument ("nodeid#portid"), passing the
# remaining arguments as kill reason to the kill method of the node object.
# Croaks when the port part is missing, as node ports cannot be killed.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   length $portid
      or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
295
296 #############################################################################
297 # node monitoring and info
298
299 =item node_is_up $nodeid
300
301 Returns true if the given node is "up", that is, the kernel thinks it has
302 a working connection to it.
303
304 If the node is up, returns C<1>. If the node is currently connecting or
305 otherwise known but not connected, returns C<0>. If nothing is known about
306 the node, returns C<undef>.
307
308 =cut
309
# 1 if the node has a transport, 0 if it is known but has none, and
# nothing (undef) if the node is not known at all
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
314
315 =item up_nodes
316
317 Return the node IDs of all nodes that are currently connected (excluding
318 the node itself).
319
320 =cut
321
# returns the IDs of all node objects in %NODE that currently have a
# transport
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
325
326 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
327
328 Registers a callback that is called each time a node goes up (a connection
329 is established) or down (the connection is lost).
330
331 Node up messages can only be followed by node down messages for the same
332 node, and vice versa.
333
334 Note that monitoring a node is usually better done by monitoring its node
335 port. This function is mainly of interest to modules that are concerned
336 about the network topology and low-level connection handling.
337
338 Callbacks I<must not> block and I<should not> send any messages.
339
340 The function returns an optional guard which can be used to unregister
341 the monitoring callback again.
342
343 Example: make sure you call function C<newnode> for all nodes that are up
344 or go up (and down).
345
346 newnode $_, 1 for up_nodes;
347 mon_nodes \&newnode;
348
349 =cut
350
our %MON_NODES; # registered node monitors, keyed by the numeric value of the callback reference

# registers a node up/down callback; unless called in void context, returns
# a guard that unregisters the callback again
sub mon_nodes($) {
   my ($cb) = @_;
   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   # nobody to hand a guard to
   return unless defined wantarray;

   Guard::guard { delete $MON_NODES{$key} }
}
361
# Tells all node monitors that $node went up or down and logs the event at
# level 7. A dying monitor is logged at level 1 and does not keep the other
# monitors from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $monitor (values %MON_NODES) {
      next if eval { $monitor->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
372
373 #############################################################################
374 # self node code
375
# Kills a local port: the first argument is the port ID, the remaining
# arguments are the kill reason, which is passed to all monitors of the
# port.
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port that dies with a reason while nobody is watching gets logged
   if (!$mon && @_) {
      $WARN->(2, "unmonitored local port $port died with reason: @_");
   }

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
389
# Registers a monitor callback (third argument) for the local port given as
# second argument; the first argument is not used. When the port does not
# exist, the callback is invoked right away with a no_such_port error.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid")
      unless exists $PORT{$portid};

   $LMON{$portid}{$cb+0} = $cb;
}
396
# removes the monitor callback (third argument) from the local port given
# as second argument; the first argument is not used
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   # the existence check avoids creating an entry for unmonitored ports
   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
401
# dies unless the node that sent the current message ($SRCNODE) is accepted
# by $SECURE or is the local node itself
sub _secure_check {
   my $sender = $SRCNODE->{id};

   $SECURE->($sender)
      || $sender eq $NODE
      || die "remote execution attempt by insecure node\n";
}
407
# Dispatch table for requests sent to the node port (the port with the
# empty ID): maps the request tag to its handler, which receives the rest
# of the message in @_. $SRCNODE holds the sending node object while a
# handler runs.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      _unmonitor (undef, $portid, delete $NODE{$SRCNODE->{id}}{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      Scalar::Util::weaken (my $node = $NODE{$SRCNODE->{id}});
      _monitor (undef, $portid, $node->{rmon}{$portid} = sub {
         # the node object is only referenced weakly and might be gone by
         # now, so check before dereferencing (which would re-create it)
         $node
            or return;

         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node->{transport};
      });
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $NODE{$SRCNODE->{id}}{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # is the remote node considered secure?
#   secure => sub {
#      #TODO#
#   },

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities
   eval => sub {
      &_secure_check;
      # SECURITY: this evaluates a string received from another node as
      # perl code - _secure_check above is the only protection.
      my @res = do { package main; eval shift };
      # the remaining elements, if any, form the reply message prefix
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      &_secure_check;
      snd (@_, AE::now);
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
466
# the local node object is registered under both the empty ID and its node
# ID (direct method call rather than the indirect object syntax)
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port (empty port ID): dispatches a request by its tag, the first
# message element. Tags missing from %NODE_REQ are treated as function
# names: after a security check they are resolved with load_func and cached
# in %NODE_REQ. Errors are logged at level 2, never propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $NODE_REQ{$tag} ||= do { &_secure_check; load_func ($tag) };

      # ampersand call: the handler gets the remaining @_ as-is
      &$handler;
   };
   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};
473
# the nproto value of this implementation
our $NPROTO = 1;

# tell everybody who connects our nproto, as part of the local greeting
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
480
481 #############################################################################
482 # seed management, try to keep connections to all seeds at all times
483
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER; # timer for the next seeding attempt, see seed_all
our $SEED_RETRY;   # current retry interval in seconds, see seed_all

# Starts a connection attempt to the given seed ("host:port"), unless one
# is registered in %SEED_CONNECT already. Croaks on unparsable addresses.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # called after receiving remote greeting, learn remote node name
   #
   # we rely on untrusted data here (the remote node name) this is
   # hopefully ok, as this can at most be used for DOSing, which is easy
   # when you can do MITM anyway.
   my $on_greeted = sub {
      my ($transport) = @_;
      my $remote = $transport->{remote_node};

      # if we connect to ourselves, nuke this seed, but make sure we act like a seed
      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}   = $remote;
         $NODE_SEED{$remote} = $seed;
         # also start global service, if not running
         # we need to check here in addition to the mon_nodes below
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd ($remote, "g_slave")
            unless $transport->{remote_greeting}{global};
      }
   };

   # final callback argument of mp_connect: forgets the attempt, so that
   # seed_all can try again (presumably invoked when the attempt or the
   # connection ends - verify against AnyEvent::MP::Transport)
   my $forget = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => $on_greeted,
      $forget,
   );
}
527
# Starts connection attempts to all seeds that are neither connecting nor
# connected, and re-arms itself with a growing, randomised interval
# (capped at the configured monitor_timeout) while such seeds exist.
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      # a connection attempt is registered already
      next if exists $SEED_CONNECT{$seed};
      # the node behind this seed is known and up
      next if defined $SEED_NODE{$seed} && node_is_up ($SEED_NODE{$seed});

      push @seeds, $seed;
   }

   if (!@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @seeds;

      my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $max_retry
         if $SEED_RETRY > $max_retry;

      $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);
   }
}
550
# resets the retry interval to one second and, unless a seeding timer is
# pending already, schedules seed_all to run one second from now
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
555
# sets new seed list, starts connecting
sub set_seeds(@) {
   # forget everything we knew about the previous seeds
   %$_ = ()
      for \%SEED_NODE, \%NODE_SEED, \%SEED_CONNECT;

   # every new seed starts out without a known node ID
   $SEED_NODE{$_} = undef
      for @_;

   seed_all ();
}
566
# watch all nodes that are known to be seeds (registered in void context,
# so this monitor is never removed)
mon_nodes (sub {
   my ($nodeid, $is_up) = @_;

   exists $NODE_SEED{$nodeid}
      or return;

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd ($nodeid, "g_slave")
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
   }
});
580
581 #############################################################################
582 # talk with/to global nodes
583
584 # protocol messages:
585 #
586 # sent by all slave nodes (slave to master)
587 # g_slave database - make other global node master of the sender
588 #
589 # sent by any node to global nodes
590 # g_set database - set whole database
591 # g_upd family set del - update single family
592 # g_del family key - delete key from database
593 # g_get family key reply... - send reply with data
594 #
595 # sent by global nodes
596 # g_global - node became global, similar to global=1 greeting
597 #
598 # database families
599 # "'l" -> node -> listeners
600 # "'g" -> node -> undef
601 # ...
602 #
603
# used on all nodes:
our $MASTER;     # the global node we bind ourselves to
our $MASTER_MON; # guard for the node monitor installed by master_search
our %LOCAL_DB;   # this node database

our %GLOBAL_DB;  # all local databases, merged - empty on non-global nodes

# the gproto value of this implementation
our $GPROTO = 1;

# tell everybody who connects our gproto, as part of the local greeting
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
617
618 #############################################################################
619 # master selection
620
# master requests
our %GLOBAL_REQ; # $id => \@req

# Registers the request message $req under $id, unless a request with that
# ID is registered already, and sends it to the master if there is one.
# Registered requests are (re-)sent by master_set.
sub global_req_add {
   my ($id, $req) = @_;

   exists $GLOBAL_REQ{$id}
      and return;

   $GLOBAL_REQ{$id} = $req;

   snd ($MASTER, @$req)
      if $MASTER;
}
634
# forgets the request registered under the given ID, if any
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id}
}
638
639 #################################
640 # master rpc
641
our %GLOBAL_RES;          # request ID => reply callback
our $GLOBAL_RES_ID = "a"; # last request ID handed out

# Sends a request to the master: the last argument is the reply callback,
# everything before it is the request. A fresh request ID is appended to
# the request, the g_reply handler uses it to find the callback again.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
650
# reply to a global_call: the first element is the request ID, the rest is
# passed to the callback registered for it
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id};
   $cb or return;

   # ampersand call: the callback gets the remaining @_ as-is
   &$cb
};
658
659 #################################
660
# registers (and, with a master, sends) a g_find request for the given node
# ID; the answer arrives as g_found
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
664
# reply for g_find started in Node.pm: the first element is the node ID,
# the second one is handed to connect_to of the node object, if the node
# is still known
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
673
# makes the given node our master: sends it our local database as g_slave
# request, followed by all registered global requests
sub master_set {
   $MASTER = $_[0];

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
683
# Selects a master: the first seed node that is up, or else the first seed
# node that comes up later. After a node was selected by the monitor, the
# monitor is replaced by one that restarts the search when the master goes
# down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes (sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;              # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid}; # we are only interested in seed nodes

      master_set ($nodeid);

      # from now on, only watch for the master going down
      $MASTER_MON = mon_nodes (sub {
         my ($nodeid, $is_up) = @_;

         return if $nodeid ne $MASTER || $is_up;

         undef $MASTER;
         master_search ();
      });
   });
}
707
# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   # load global module and redo the request with whatever handler is
   # registered afterwards
   # NOTE(review): this relies on AnyEvent::MP::Global replacing this very
   # handler - otherwise it would call itself endlessly; verify.
   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};

   # ampersand call: the handler gets our @_ as-is
   &$handler
};
716
717 #############################################################################
718 # local database operations
719
720 # local database management
721
# sets $key to $value (undef if missing) in the given family of the local
# database and, with a master, sends the change to it
sub db_set($$;$) {
   my ($family, $key, $value) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$key} = $value;

      snd ($MASTER, g_upd => $family => { $key => $value })
         if defined $MASTER;
#   }
}
736
# removes the given keys from the given family of the local database and,
# with a master, sends the deletion to it
sub db_del($@) {
   my $family = shift;

   # what remains in @_ is the list of keys
   delete @{ $LOCAL_DB{$family} }{@_};

   if (defined $MASTER) {
      snd ($MASTER, g_upd => $family => undef, \@_);
   }
}
744
# like db_set, but returns a guard that deletes the key again when it is
# destroyed
sub db_reg($$;$) {
   my ($family, $key) = @_;

   # ampersand call: db_set sees our @_ (family, key, optional value)
   &db_set;

   Guard::guard { db_del ($family, $key) }
}
750
751 # database query
752
# sends a g_db_family request for $family to the master; $cb is called
# with the reply
sub db_family {
   my ($family, $cb) = @_;

   global_call (g_db_family => $family, $cb);
}
757
# sends a g_db_keys request for $family to the master; $cb is called with
# the reply
sub db_keys {
   my ($family, $cb) = @_;

   global_call (g_db_keys => $family, $cb);
}
762
# sends a g_db_values request for $family to the master; $cb is called
# with the reply
sub db_values {
   my ($family, $cb) = @_;

   global_call (g_db_values => $family, $cb);
}
767
768 # database monitoring
769
our %LOCAL_MON; # family => { callback id => callback }
our %MON_DB;    # family => { key => value }, our copy of the monitored families

# Monitors a database family: $cb is called with the family copy and the
# lists of changed keys on every update (see the g_chg1/g_chg2 handlers).
# Unless called in void context, returns a guard that removes the monitor
# again.
sub db_mon($@) {
   my ($family, $cb) = @_;
   my $id = $cb + 0;

   my $db = $MON_DB{$family};

   if ($db) {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$id} = sub { };
      AE::postpone {
         return unless exists $LOCAL_MON{$family}{$id}; # guard might have gone away already

         # set actual callback
         $LOCAL_MON{$family}{$id} = $cb;
         $cb->($db, [keys %$db]);
      };
   } else {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$id} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   }

   # nobody to hand a guard to
   defined wantarray
      or return;

   Guard::guard {
      my $mon = $LOCAL_MON{$family};
      delete $mon->{$id};

      # other monitors of this family are still around
      return if %$mon;

      global_req_del ("mon1 $family");

      # no global_req, because we don't care if we are not connected
      snd ($MASTER, g_mon0 => $family)
         if $MASTER;

      delete $LOCAL_MON{$family};
      delete $MON_DB{$family};
   }
}
812
# full update: the second element is the complete new content of the
# family named by the first one. Our copy is brought in sync and every
# monitor is called with the copy and the added, changed and deleted keys.
$NODE_REQ{g_chg1} = sub {
   my ($family, $ndb) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed, @deleted);

   # add or replace keys
   for my $key (keys %$ndb) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $ndb->{$key};
   }

   # delete keys that are no longer present
   for my $key (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$key};
      push @deleted, $key;
   }

   for my $monitor (values %{ $LOCAL_MON{$family} }) {
      $monitor->($db, \@added, \@changed, \@deleted);
   }
};
837
# incremental update: $set holds the keys to add or replace, $del lists
# the keys to delete. Our copy is updated and every monitor is called with
# the copy and the added, changed and deleted keys.
$NODE_REQ{g_chg2} = sub {
   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};

   my (@added, @changed);

   for my $key (keys %$set) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $set->{$key};
   }

   delete @$db{@$del};

   for my $monitor (values %{ $LOCAL_MON{$family} }) {
      $monitor->($db, \@added, \@changed, $del);
   }
};
858
859 #############################################################################
860 # configure
861
# returns the second field reported by POSIX::uname, the node name of this
# host
sub nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
866
# Resolves a comma-separated list of transport descriptors into a list of
# "host:port" strings. Returns a condvar that receives the addresses,
# ordered by the position of their descriptor and with duplicates removed.
#
# An empty or all-digit descriptor means this host (nodename), with the
# digits as port. The host "*" means the addresses of all local
# interfaces, the port "*" becomes port 0.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # runs once all lookups have finished
   $cv->begin (sub {
      my ($done) = @_;

      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];

         push @refs, $addr
            unless $seen{$addr}++;
      }

      $done->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of the same descriptor get a slightly increasing priority,
      # so they keep their relative order
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length ($t) ? nodename () . ":$t" : nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         });
      } else {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()

                  # skip localhost etc.
                  push @addr, grep !/^\x7f/, $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  # global unicast, site-local unicast
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/, $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      }
   }

   $cv->end;

   $cv
}
947
# Configures the local node. Arguments are key-value pairs, with a lone
# leading argument being the profile name (which defaults to nodename).
#
# In order, this: loads the profile, installs the security predicate when
# "secure" is configured, determines the node ID (%n expands to nodename,
# %u and a trailing slash to $RUNIQ), creates listeners for all "binds"
# (default "*"), publishes them in the "'l" database family, connects to
# the "seeds", starts the master search and finally starts the "services".
#
# Croaks on an illegal node ID or unparsable bind address and dies when a
# service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   $profile = nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   if (exists $CONFIG->{secure}) {
      # "secure" being true means that no node passes the check
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   $NODE =~ s/%n/nodename/ge;

   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # re-register the local node object under its new ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   # all resolves are started first, then waited for in order
   for my $resolved (map _resolve ($_), @$binds) {
      for my $bind ($resolved->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address the listener was actually bound to
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set ("'l" => $NODE => $BINDS);

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   master_search ();

   # Services are either [function, args...], "Module::" (just load the
   # module) or a function name. A named loop variable and a copy are used
   # so the entries of $CONFIG->{services} are not modified in place, which
   # would turn "Module::" into a function name for any later user.
   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         load_func ($func)->(@args);
      } elsif ((my $pkg = $service) =~ s/::$//) {
         eval "require $pkg";
         die $@ if $@;
      } else {
         load_func ($service)->();
      }
   }
}
1033
1034 =back
1035
1036 =head1 SEE ALSO
1037
1038 L<AnyEvent::MP>.
1039
1040 =head1 AUTHOR
1041
1042 Marc Lehmann <schmorp@schmorp.de>
1043 http://home.schmorp.de/
1044
1045 =cut
1046
1047 1
1048