ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.106
Committed: Fri Mar 23 21:16:25 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.105: +58 -46 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 $AnyEvent::MP::Kernel::SRCNODE # contains msg origin node id, for debugging
10
11 snd_to_func $node, $func, @args # send msg to function
12 snd_on $node, @msg # snd message again (relay)
13 eval_on $node, $string[, @reply] # execute perl code on another node
14
15 node_is_up $nodeid # return true if a node is connected
16 @nodes = up_nodes # return a list of all connected nodes
17 $guard = mon_nodes $callback->($node, $is_up, @reason) # connections up/downs
18
19 =head1 DESCRIPTION
20
21 This module implements most of the inner workings of AnyEvent::MP. It
22 offers mostly lower-level functions that deal with network connectivity
23 and special requests.
24
25 You normally interface with AnyEvent::MP through a higher level interface
26 such as L<AnyEvent::MP> and L<Coro::MP>, although there is nothing wrong
27 with using the functions from this module.
28
29 =head1 GLOBALS AND FUNCTIONS
30
31 =over 4
32
33 =cut
34
35 package AnyEvent::MP::Kernel;
36
37 use common::sense;
38 use Carp ();
39
40 use AnyEvent ();
41 use Guard ();
42
43 use AnyEvent::MP::Node;
44 use AnyEvent::MP::Transport;
45
46 use base "Exporter";
47
# Public API; AnyEvent::MP and Coro::MP re-export exactly this list.
our @EXPORT_API = (
    qw(NODE $NODE),
    qw(configure),
    qw(node_of port_is_local),
    qw(snd kil),
    qw(db_set db_del),
    qw(db_mon db_family db_keys db_values),
);

# Importable on request: kernel internals first, then the public API.
our @EXPORT_OK = (
    qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
    qw(add_node load_func),
    @EXPORT_API,
);

# Imported by default by a plain "use AnyEvent::MP::Kernel".
our @EXPORT = (
    qw(snd_to_func snd_on eval_on),
    qw(port_is_local),
    qw(up_nodes mon_nodes node_is_up),
);
72
# Resolve a fully qualified function name ("Some::Package::func") to a code
# reference, loading the containing module on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until the function shows up.  A missing module
# file is not an error (the next shorter prefix is tried); any other load
# error is reported.
#
# Never dies itself: on failure it returns a code reference that dies with
# the error message when it is called.
sub load_func($) {
    my $func = $_[0];

    unless (defined &$func) {
        my $pkg = $func;
        do {
            # strip the last name component; nothing left to strip means
            # there is no package we could load any more
            $pkg =~ s/::[^:]+$//
                or return sub { die "unable to resolve function '$func'" };

            # $pkg is interpolated into a string eval below, and function
            # names can originate from other nodes - so only accept things
            # that look like a package name, never arbitrary code.
            $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
                or return sub { die "unable to resolve function '$func'" };

            local $@;
            unless (eval "require $pkg; 1") {
                my $error = $@;
                # "module file not found" just means: try the parent package
                $error =~ /^Can't locate .*\.pm in \@INC \(/
                    or return sub { die $error };
            }
        } until defined &$func;
    }

    \&$func
}
93
# the 62 characters used for printable nonces and generated identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random octets (each 0..255)
sub nonce($) {
    my ($count) = @_;

    my $octets = "";
    $octets .= chr rand 256
        for 1 .. $count;

    $octets
}

# returns a string of $_[0] random characters taken from @alnum
sub nonce62($) {
    my ($count) = @_;

    my $chars = "";
    $chars .= $alnum[rand 62]
        for 1 .. $count;

    $chars
}
103
our $CONFIG; # configuration of this node
our $SECURE; # when true, _secure_check refuses remote execution requests

our $RUNIQ;  # unique value used for remote port names
our $UNIQ;   # unique cookie of this process/node
our $NODE;   # ID of the local node
our $ID = "a"; # presumably the counter appended to generated port names - TODO confirm

our %NODE;   # maps node IDs to node objects (the local node is in here, too)
our (%PORT, %PORT_DATA); # ports living on this node

our %RMON;   # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;   # monitors on _local_ ports

our $GLOBAL; # true if this node is a global ("directory") node
our %BINDS;
our $BINDS;  # arrayref of the endpoints we listen on

# Origin of the message currently being dispatched.
# NOTE(review): the handlers in this file use it as a node ID (hash key into
# %NODE), while the older comment here called it a node _object_ - confirm.
our $SRCNODE;
123
# set up identifiers that are usable before (or without) any networking
{
    # ~54 bits, for local port names (a lowercase $ID gets appended):
    # two pid-derived digits, three time-derived digits, four random ones
    my $now = AE::now;
    my @digits = (
        $$ / 62 % 62,
        $$ % 62,
        (int $now) % 62,
        (int $now * 100) % 62,
        (int $now * 10000) % 62,
    );

    $UNIQ = join "", @alnum[@digits], nonce62 4;

    # ~59 bits, for remote port names: one character longer than $UNIQ and
    # forced to uppercase at the end so the two can never clash
    $RUNIQ = nonce62 10;
    $RUNIQ =~ s/(.)$/\U$1/;

    # the node ID stays empty until configure assigns the real one
    $NODE = "";
}
145
# returns the current ID of the local node
sub NODE() {
    return $NODE;
}
149
# extracts the node ID (everything before the first "#") from a port ID
sub node_of($) {
    (split /#/, $_[0], 2)[0]
}
155
BEGIN {
    # TRACE is a compile-time constant, driven by PERL_ANYEVENT_MP_TRACE,
    # so the tracing statements can be optimised away when it is off
    if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
        *TRACE = sub () { 1 };
    } else {
        *TRACE = sub () { 0 };
    }
}
161
our $DELAY_TIMER; # timer watcher, only set while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run

# runs all queued callbacks in order, then drops the timer
our $delay_run = sub {
    while (my $job = shift @DELAY_QUEUE) {
        $job->();
    }

    undef $DELAY_TIMER;
};

# queues a callback to be run from a zero-delay timer
sub delay($) {
    my ($job) = @_;

    push @DELAY_QUEUE, $job;
    $DELAY_TIMER ||= AE::timer(0, 0, $delay_run);
}
173
174 =item $AnyEvent::MP::Kernel::SRCNODE
175
176 During execution of a message callback, this variable contains the node ID
177 of the origin node.
178
179 The main use of this variable is for debugging output - there are probably
180 very few other cases where you need to know the source node ID.
181
182 =cut
183
# Delivers a message (port ID followed by the message contents) to the
# matching local port callback; messages to unknown ports are dropped.
sub _inject {
    if (TRACE && @_) { #d#
        my $json = eval { JSON::XS->new->encode(\@_) };
        warn "RCV $SRCNODE -> " . $json . "\n";
    }

    my $port_cb = $PORT{+shift}
        or return;

    # called with the remaining @_, i.e. the message without the port ID
    &$port_cb;
}
188
# Central routing component: returns the node object for the given node ID,
# creating (and remembering) a remote node object if it is not known yet.
# Croaks on undef or empty IDs.
sub add_node {
    my ($node) = @_;

    Carp::croak "'undef' or the empty string are not valid node/port IDs"
        unless length $node;

    $NODE{$node} ||= AnyEvent::MP::Node::Remote->new($node)
}
199
# Sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the rest is the message.
sub snd(@) {
    my $dest = shift;
    my ($nodeid, $portid) = split /#/, $dest, 2;

    if (TRACE && @_) { #d#
        my $json = eval { JSON::XS->new->encode([$portid, @_]) };
        warn "SND $nodeid <- " . $json . "\n";
    }

    # unknown nodes are created on the fly
    my $node = $NODE{$nodeid} || add_node($nodeid);

    $node->{send}->(["$portid", @_]);
}
208
# true if the node part of the given port ID is the local node ID
sub port_is_local($) {
    (split /#/, $_[0], 2)[0] eq $NODE
}
214
215 =item snd_to_func $node, $func, @args
216
217 Expects a node ID and a name of a function. Asynchronously tries to call
218 this function with the given arguments on that node.
219
220 This function can be used to implement C<spawn>-like interfaces.
221
222 =cut
223
sub snd_to_func($$;@) {
    my ($nodeid, @call) = @_;

    # Arm the self node's DELAY flag whenever the target is not the local
    # node.  The artificial delay exists for spawn; delaying ALL messages
    # would avoid deep recursion issues as well, but is considered too slow.
    # NOTE(review): the historical comment here talked about delaying "on
    # $NODE", which reads like the opposite of this condition - confirm
    # against AnyEvent::MP::Node::Self before touching it.
    unless ($nodeid eq $NODE) {
        $AnyEvent::MP::Node::Self::DELAY = 1;
    }

    # a message to the node port (empty port ID) requests the function call
    my $node = $NODE{$nodeid} || add_node($nodeid);

    $node->{send}->(["", @call]);
}
235
236 =item snd_on $node, @msg
237
238 Executes C<snd> with the given C<@msg> (which must include the destination
239 port) on the given node.
240
241 =cut
242
sub snd_on($@) {
    my ($node, @msg) = @_;

    # ask the node port of $node to run snd on @msg
    snd($node, "snd", @msg);
}
247
248 =item eval_on $node, $string[, @reply]
249
250 Evaluates the given string as Perl expression on the given node. When
251 @reply is specified, then it is used to construct a reply message with
252 C<"$@"> and any results from the eval appended.
253
254 =cut
255
sub eval_on($$;@) {
    my ($node, @request) = @_;

    # @request is the code string, optionally followed by the reply message
    snd($node, "eval", @request);
}
260
# Kills the given port ("nodeid#portid"); the remaining arguments are the
# kill reason.  Node ports (empty port part) cannot be killed.
sub kil(@) {
    my $dest = shift;
    my ($nodeid, $portid) = split /#/, $dest, 2;

    Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
        unless length $portid;

    my $node = $NODE{$nodeid} || add_node($nodeid);

    $node->kill("$portid", @_);
}
270
271 #############################################################################
272 # node monitoring and info
273
274 =item $bool = node_is_up $nodeid
275
276 Returns true if the given node is "up", that is, the kernel thinks it has
277 a working connection to it.
278
279 If the node is up, returns C<1>. If the node is currently connecting or
280 otherwise known but not connected, returns C<0>. If nothing is known about
281 the node, returns C<undef>.
282
283 =cut
284
sub node_is_up($) {
    # unknown node: return nothing (undef in scalar context)
    my $node = $NODE{$_[0]}
        or return;

    # known node: up if and only if it has a transport
    $node->{transport} ? 1 : 0
}
289
290 =item @nodes = up_nodes
291
292 Return the node IDs of all nodes that are currently connected (excluding
293 the node itself).
294
295 =cut
296
sub up_nodes() {
    # nodes with a transport are the connected ones
    map { $_->{id} }
        grep { $_->{transport} }
            values %NODE
}
300
301 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
302
303 Registers a callback that is called each time a node goes up (a connection
304 is established) or down (the connection is lost).
305
306 Node up messages can only be followed by node down messages for the same
307 node, and vice versa.
308
309 Note that monitoring a node is usually better done by monitoring its node
310 port. This function is mainly of interest to modules that are concerned
311 about the network topology and low-level connection handling.
312
313 Callbacks I<must not> block and I<should not> send any messages.
314
315 The function returns an optional guard which can be used to unregister
316 the monitoring callback again.
317
318 Example: make sure you call function C<newnode> for all nodes that are up
319 or go up (and down).
320
321 newnode $_, 1 for up_nodes;
322 mon_nodes \&newnode;
323
324 =cut
325
our %MON_NODES; # registered node monitors, keyed by callback address

sub mon_nodes($) {
    my ($cb) = @_;

    my $key = $cb + 0;
    $MON_NODES{$key} = $cb;

    # nobody wants the guard, so the callback stays registered
    return
        unless defined wantarray;

    # destroying the guard unregisters the callback
    Guard::guard { delete $MON_NODES{$key} }
}
336
# Tells all node monitors that $node went up or down, then logs the event.
# A dying monitor callback is logged and does not affect the others.
sub _inject_nodeevent($$;@) {
    my ($node, $up, @reason) = @_;

    for my $monitor (values %MON_NODES) {
        my $ok = eval {
            $monitor->($node->{id}, $up, @reason);
            1
        };

        AE::log(die => $@)
            unless $ok;
    }

    my $state = $up ? "up." : "down (@reason).";

    AE::log(7 => "$node->{id} is " . $state);
}
347
348 #############################################################################
349 # self node code
350
# Kills a local port: arguments are the port ID followed by the kill reason.
sub _kill {
    my $port = shift;

    # killing nonexistent ports is O.K.
    delete $PORT{$port}
        or return;
    delete $PORT_DATA{$port};

    my $mon = delete $LMON{$port};

    # a port dying with a reason while nobody monitors it gets logged
    if (!$mon && @_) {
        AE::log(die => "unmonitored local port $port died with reason: @_");
    }

    # pass the reason on to every monitor of the port
    for my $monitor (values %$mon) {
        $monitor->(@_);
    }
}
364
# Registers callback $_[2] as monitor of local port $_[1] ($_[0] is unused).
# If the port does not exist, the callback is invoked right away with a
# no_such_port reason instead.
sub _monitor {
    my (undef, $portid, $cb) = @_;

    unless (exists $PORT{$portid}) {
        return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
    }

    $LMON{$portid}{$cb + 0} = $cb;
}
371
# Removes monitor callback $_[2] from local port $_[1] ($_[0] is unused).
sub _unmonitor {
    my (undef, $portid, $cb) = @_;

    # checking first avoids creating an empty entry for unmonitored ports
    if (exists $LMON{$portid}) {
        delete $LMON{$portid}{$cb + 0};
    }
}
376
# dies when $SECURE is set, i.e. when remote execution is not allowed
sub _secure_check {
    die "remote execution not allowed\n"
        if $SECURE;
}
381
# Requests understood by the node port: tag => handler.  Handlers are called
# with the message contents following the tag.
our %NODE_REQ = (
    # --- internal services: monitoring ---

    # another node no longer wants to monitor one of our ports
    mon0 => sub {
        my $portid = shift;

        my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};
        _unmonitor(undef, $portid, $cb);
    },

    # another node starts monitoring one of our ports
    mon1 => sub {
        my $portid = shift;

        # weak, so the monitor callback does not keep the node object alive
        my $node = $NODE{$SRCNODE};
        Scalar::Util::weaken($node);

        # when the port dies: forget the monitor and forward the reason,
        # provided the node is still connected
        my $notify = sub {
            delete $node->{rmon}{$portid};
            $node->send(["", kil0 => $portid, @_])
                if $node && $node->{transport};
        };

        $node->{rmon}{$portid} = $notify;
        _monitor(undef, $portid, $notify);
    },

    # another node has killed a monitored port
    kil0 => sub {
        my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
            or return;

        for my $cb (@$cbs) {
            $cb->(@_);
        }
    },

    # --- "public" services - not actually public ---

    # another node wants to kill a local port
    kil => \&_kill,

    # relay message to another node / generic echo
    snd => sub {
        _secure_check();
        snd(@_)
    },

    # --- random utilities ---

    # SECURITY: evaluates a code string received from another node, by
    # design; _secure_check is the only thing guarding it.
    eval => sub {
        _secure_check();
        my @res = do { package main; eval shift };

        # anything left in @_ is the reply message to append results to
        if (@_) {
            snd(@_, "$@", @res);
        }
    },

    # appends the current AE::now value to the reply message
    time => sub {
        _secure_check();
        snd(@_, AE::now());
    },

    # accepts anything, does nothing
    devnull => sub {
    },

    # empty messages are keepalives or similar devnull-applications
    "" => sub {
    },
);
435
# the local node object, and the node port (the port with the empty ID)
$NODE{$NODE} = AnyEvent::MP::Node::Self->new($NODE);

$PORT{""} = sub {
    my $tag = shift;

    eval {
        # Tags without a registered handler are treated as function names:
        # resolved via load_func (subject to the secure check) and cached
        # in %NODE_REQ for the next time.
        my $handler = $NODE_REQ{$tag} ||= do {
            &_secure_check;
            load_func $tag
        };

        # the handler gets the rest of the message as its @_
        &$handler;
    };

    AE::log(die => "error processing node message from $SRCNODE: $@")
        if $@;
};
443
our $NPROTO = 1; # node protocol version

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
    my ($transport) = @_;

    $transport->{local_greeting}{nproto} = $NPROTO;
};
450
451 #############################################################################
452 # seed management, try to keep connections to all seeds at all times
453
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER; # retry timer while some seed is not connected
our $SEED_RETRY;   # current retry interval

# Starts a connection attempt to the given seed ("host:port"), unless one
# is already registered in %SEED_CONNECT.  Croaks on unparsable addresses.
sub seed_connect {
    my ($seed) = @_;

    my @hostport = AnyEvent::Socket::parse_hostport($seed);
    @hostport
        or Carp::croak "$seed: unparsable seed address";
    my ($host, $port) = @hostport;

    AE::log(9 => "trying connect to seed node $seed.");

    $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
        $host, $port,
        on_greeted => sub {
            # Called after receiving the remote greeting, which is where we
            # learn the remote node name.  That name is untrusted data, but
            # it can at most be used for DOSing, which is easy anyway for
            # somebody who can do MITM.
            my ($transport) = @_;
            my $remote = $transport->{remote_node};

            if ($remote eq $AnyEvent::MP::Kernel::NODE) {
                # we connected to ourselves: nuke this seed, but make sure
                # we act like a seed (every seed becomes a global node
                # currently)
                require AnyEvent::MP::Global;
                delete $SEED_NODE{$seed};
            } else {
                $SEED_NODE{$seed}    = $remote;
                $NODE_SEED{$remote} = $seed;

                # Also start the global service there, if not running.  The
                # node monitor does the same, but we might only learn late
                # that a node is a seed, when we are already connected.
                snd($remote, "g_slave")
                    unless $transport->{remote_greeting}{global};
            }
        },
        # second callback: forget the connection attempt again
        sub {
            delete $SEED_CONNECT{$seed};
        },
    );
}
497
# Starts connection attempts to all seeds we are neither connected nor
# connecting to, and keeps retrying with a growing interval.
sub seed_all {
    my @seeds = grep {
        !exists $SEED_CONNECT{$_}
            && !(defined $SEED_NODE{$_} && node_is_up($SEED_NODE{$_}))
    } keys %SEED_NODE;

    # all seeds connected or connecting, no need to restart timer
    return undef $SEED_WATCHER
        unless @seeds;

    # start connection attempt for every seed we are not connected to yet
    for my $seed (@seeds) {
        seed_connect($seed);
    }

    # roughly double the retry interval (plus jitter), capped by the
    # configured monitor_timeout
    my $max_retry = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

    $SEED_RETRY = $SEED_RETRY * 2 + rand();
    $SEED_RETRY = $max_retry
        if $SEED_RETRY > $max_retry;

    $SEED_WATCHER = AE::timer($SEED_RETRY, 0, \&seed_all);
}
520
# resets the retry interval and makes sure seeding is scheduled
sub seed_again {
    $SEED_RETRY = 1;

    # an already pending retry timer is left alone
    $SEED_WATCHER ||= AE::timer(1, 0, \&seed_all);
}
525
# replaces the seed list by the given seeds and starts connecting to them
sub set_seeds(@) {
    # forget everything about the old seeds
    %SEED_NODE    = ();
    %NODE_SEED    = ();
    %SEED_CONNECT = ();

    # the node behind each new seed is not known yet
    $SEED_NODE{$_} = undef
        for @_;

    seed_all();
}
536
# watch the connections to seed nodes
mon_nodes(sub {
    # nodes that are not seeds are of no interest here
    return
        unless exists $NODE_SEED{$_[0]};

    if ($_[1]) {
        # each time a connection to a seed node goes up, make
        # sure it runs the global service.
        unless ($NODE{$_[0]}{transport}{remote_greeting}{global}) {
            snd($_[0], "g_slave");
        }
    } else {
        # if we lost the connection to a seed node, make sure we are seeding
        seed_again();
    }
});
550
551 #############################################################################
552 # talk with/to global nodes
553
554 # protocol messages:
555 #
556 # sent by all slave nodes (slave to master)
557 # g_slave database - make other global node master of the sender
558 #
559 # sent by any node to global nodes
560 # g_set database - set whole database
561 # g_upd family set del - update single family
562 # g_del family key - delete key from database
563 # g_get family key reply... - send reply with data
564 #
565 # sent by global nodes
566 # g_global - node became global, similar to global=1 greeting
567 #
568 # database families
569 # "'l" -> node -> listeners
570 # "'g" -> node -> undef
571 # ...
572 #
573
# used on all nodes:
our $MASTER;     # the global node we bind ourselves to
our $MASTER_MON; # guard of the node monitor used for master selection
our %LOCAL_DB;   # this node database

our %GLOBAL_DB;  # all local databases, merged - empty on non-global nodes

our $GPROTO = 1; # global protocol version

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
    my ($transport) = @_;

    $transport->{local_greeting}{gproto} = $GPROTO;
};
587
588 #############################################################################
589 # master selection
590
# requests for the master, kept so they can be (re-)sent: $id => \@req
our %GLOBAL_REQ;

# Registers request $req (an array reference holding a message) under $id
# and sends it if we currently have a master.  IDs already registered are
# ignored.
sub global_req_add {
    my ($id, $req) = @_;

    return
        if exists $GLOBAL_REQ{$id};

    $GLOBAL_REQ{$id} = $req;

    if ($MASTER) {
        snd($MASTER, @$req);
    }
}
604
# forgets the request registered under the given ID
sub global_req_del {
    my ($id) = @_;

    delete $GLOBAL_REQ{$id};
}
608
609 #################################
610 # master rpc
611
our %GLOBAL_RES;          # reply callbacks of pending calls, by call ID
our $GLOBAL_RES_ID = "a"; # last call ID handed out (string increment)

# Calls the master: the last argument is the reply callback, everything
# before it is the request message, to which the call ID gets appended.
sub global_call {
    my $reply_cb = pop;

    my $id = ++$GLOBAL_RES_ID;
    $GLOBAL_RES{$id} = $reply_cb;

    global_req_add($id, [@_, $id]);
}
620
# reply to a global_call: call ID followed by the results
$NODE_REQ{g_reply} = sub {
    my $id = shift;

    # the request is answered, so it must not be re-sent
    global_req_del($id);

    my $reply_cb = delete $GLOBAL_RES{$id}
        or return;

    # the callback receives the results (the remaining @_)
    &$reply_cb
};
628
629 #################################
630
# asks the master to look up the given node (answered by g_found)
sub g_find {
    my ($nodeid) = @_;

    global_req_add("g_find $nodeid", [g_find => $nodeid]);
}
634
# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
    my ($nodeid, $found) = @_;

    global_req_del("g_find $nodeid");

    # the node might be gone by now
    my $node = $NODE{$nodeid}
        or return;

    $node->connect_to($found);
};
643
# makes the given node our master
sub master_set {
    ($MASTER) = @_;

    # register as slave, handing over our local database
    snd($MASTER, g_slave => \%LOCAL_DB);

    # (re-)send queued requests
    for my $req (values %GLOBAL_REQ) {
        snd($MASTER, @$req);
    }
}
653
# Picks a master: a seed node that is up right now, or else the first seed
# node that comes up later.
sub master_search {
    #TODO: should also look for other global nodes, but we don't know them #d#
    for my $nodeid (keys %NODE_SEED) {
        next
            unless node_is_up($nodeid);

        master_set($nodeid);
        return;
    }

    # no seed is up - wait for one
    $MASTER_MON = mon_nodes(sub {
        my ($nodeid, $is_up) = @_;

        return unless $is_up;               # we are only interested in node-ups
        return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

        master_set($nodeid);

        # from now on, watch the master instead and start over once it is gone
        $MASTER_MON = mon_nodes(sub {
            my ($nodeid, $is_up) = @_;

            if ($nodeid eq $MASTER && !$is_up) {
                undef $MASTER;
                master_search();
            }
        });
    });
}
677
# Another node wants to make us the master: load the global module and redo
# the request with the same arguments.
# NOTE(review): presumably loading AnyEvent::MP::Global installs its own
# g_slave handler - otherwise this stub would call itself - confirm.
$NODE_REQ{g_slave} = sub {
    require AnyEvent::MP::Global;

    my $handler = $NODE_REQ{g_slave};
    &$handler
};
686
687 #############################################################################
688 # local database operations
689
690 # local database management
691
# Sets key $subkey of family $family in the local database to the (optional)
# third argument and, when a master is known, sends it the update.
#
# When called in non-void context, returns a guard that deletes the key
# again as soon as it is destroyed.
#
# Only single keys are supported (a bulk mode was sketched once, but never
# enabled).
sub db_set($$;$) {
    my ($family, $subkey) = @_;

    $LOCAL_DB{$family}{$subkey} = $_[2];

    snd $MASTER, g_upd => $family => { $subkey => $_[2] }
        if defined $MASTER;

    # db_del is only defined further down in the file, so it must be called
    # with parentheses: "db_del $family => ..." would be taken for an
    # indirect-object method call at this point and fail to compile.
    defined wantarray
        and Guard::guard { db_del($family, $subkey) }
}
711
# Deletes the given keys of $family from the local database and, when a
# master is known, tells it about the deletion.
sub db_del($@) {
    my $family = shift;

    # @_ now holds just the keys
    delete @{ $LOCAL_DB{$family} }{@_};

    if (defined $MASTER) {
        snd($MASTER, g_upd => $family => undef, \@_);
    }
}
719
720 # database query
721
# sends a g_db_family request for $family to the master; $cb gets the reply
sub db_family {
    my ($family, $cb) = @_;

    global_call(g_db_family => $family, $cb);
}
726
# sends a g_db_keys request for $family to the master; $cb gets the reply
sub db_keys {
    my ($family, $cb) = @_;

    global_call(g_db_keys => $family, $cb);
}
731
# sends a g_db_values request for $family to the master; $cb gets the reply
sub db_values {
    my ($family, $cb) = @_;

    global_call(g_db_values => $family, $cb);
}
736
737 # database monitoring
738
our %LOCAL_MON; # family => { callback address => callback }
our %MON_DB;    # family => { key => value }, our copy of monitored families

# Monitors database family $family: $cb is called with the family hash and
# the lists of changed keys.  In non-void context a guard is returned that
# removes the monitor again when destroyed.
sub db_mon($@) {
    my ($family, $cb) = @_;

    my $key = $cb + 0;
    my $db  = $MON_DB{$family};

    if ($db) {
        # We already monitor this family, so create a "dummy" change event
        # announcing all existing keys.  It is postponed, and real change
        # events could arrive before it runs, so a no-op callback is
        # installed until then.
        $LOCAL_MON{$family}{$key} = sub { };

        AE::postpone {
            # the guard might have gone away already
            return
                unless exists $LOCAL_MON{$family}{$key};

            # set actual callback
            $LOCAL_MON{$family}{$key} = $cb;
            $cb->($db, [keys %$db]);
        };
    } else {
        # new monitor, request chg1 from upstream
        $LOCAL_MON{$family}{$key} = $cb;
        global_req_add("mon1 $family" => [g_mon1 => $family]);
        $MON_DB{$family} = {};
    }

    return
        unless defined wantarray;

    Guard::guard {
        my $mon = $LOCAL_MON{$family};
        delete $mon->{$key};

        # that was the last monitor of the family: stop monitoring it
        if (!%$mon) {
            global_req_del("mon1 $family");

            # no global_req, because we don't care if we are not connected
            snd($MASTER, g_mon0 => $family)
                if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
        }
    }
}
781
# Full update: the master sends the complete contents of a family.  Our copy
# is brought in line with it and the monitors receive the lists of added,
# changed and deleted keys.
$NODE_REQ{g_chg1} = sub {
    # only our master may update us
    return
        unless $SRCNODE eq $MASTER;

    my ($family, $ndb) = @_;

    my $db = $MON_DB{$family};
    my (@added, @changed, @deleted);

    # add or replace keys
    while (my ($k, $v) = each %$ndb) {
        if (exists $db->{$k}) {
            push @changed, $k;
        } else {
            push @added, $k;
        }

        $db->{$k} = $v;
    }

    # delete keys that are no longer present
    for my $k (grep { !exists $ndb->{$_} } keys %$db) {
        delete $db->{$k};
        push @deleted, $k;
    }

    for my $monitor (values %{ $LOCAL_MON{$family} }) {
        $monitor->($db, \@added, \@changed, \@deleted);
    }
};
807
# Incremental update: the master sends the keys to set (as a hash) and the
# keys to delete (as an array) for one family.  Monitors receive the lists
# of added, changed and deleted keys.
$NODE_REQ{g_chg2} = sub {
    # only our master may update us
    return
        unless $SRCNODE eq $MASTER;

    my ($family, $set, $del) = @_;

    my $db = $MON_DB{$family};
    my (@added, @changed);

    while (my ($k, $v) = each %$set) {
        if (exists $db->{$k}) {
            push @changed, $k;
        } else {
            push @added, $k;
        }

        $db->{$k} = $v;
    }

    delete @$db{@$del};

    for my $monitor (values %{ $LOCAL_MON{$family} }) {
        $monitor->($db, \@added, \@changed, $del);
    }
};
829
830 #############################################################################
831 # configure
832
# returns the node name of this host, the second field of POSIX::uname
sub nodename {
    require POSIX;

    my @uname = POSIX::uname();

    $uname[1]
}
837
# Resolves a comma-separated list of transport descriptors into endpoints.
#
# A descriptor that is empty or consists of digits only stands for the local
# host (as reported by nodename), optionally with that port.  Everything else
# is a host[:port] specification, in which the host "*" stands for the
# addresses of all local interfaces and the port "*" for port 0.
#
# Returns a condvar whose ->recv yields the resulting "host:port" strings,
# ordered by descriptor position and with duplicates removed.  Croaks on
# unparsable descriptors.
sub _resolve($) {
    my ($nodeid) = @_;

    my $cv = AE::cv;
    my @res; # [priority, "host:port"] pairs

    # runs once everything has been resolved: sort by priority, drop dups
    $cv->begin(sub {
        my %seen;
        my @refs;

        for (sort { $a->[0] <=> $b->[0] } @res) {
            push @refs, $_->[1]
                unless $seen{$_->[1]}++;
        }

        shift->send(@refs);
    });

    my $idx;
    for my $t (split /,/, $nodeid) {
        # results of one descriptor get priorities just above its index
        my $pri = ++$idx;

        $t = length $t ? nodename() . ":$t" : nodename()
            if $t =~ /^\d*$/;

        my ($host, $port) = AnyEvent::Socket::parse_hostport($t, 0)
            or Carp::croak "$t: unparsable transport descriptor";

        $port = "0" if $port eq "*";

        if ($host eq "*") {
            $cv->begin;

            # collects the (packed) addresses of all local interfaces
            my $get_addr = sub {
                my @addr;

                require Net::Interface;

                # Net::Interface hangs on some systems, so hope for the best
                local $SIG{ALRM} = 'DEFAULT';
                alarm 2;

                for my $if (Net::Interface->interfaces) {
                    # we statically lower-prioritise ipv6 here, TODO :()
                    for my $ip4 ($if->address(Net::Interface::AF_INET())) {
                        next if $ip4 =~ /^\x7f/; # skip localhost etc.
                        push @addr, $ip4;
                    }

                    for my $ip6 ($if->address(Net::Interface::AF_INET6())) {
                        # (filtering on $if->scope was tried and disabled)
                        next unless $ip6 =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                        push @addr, $ip6;
                    }
                }

                alarm 0;

                @addr
            };

            my @addr;

            if (AnyEvent::WIN32) {
                @addr = $get_addr->();
            } else {
                # use a child process, as Net::Interface is big, and we need it only once.

                pipe(my $r, my $w)
                    or die "pipe: $!";

                my $pid = fork;

                if (!defined $pid) {
                    # fork failed: rather load Net::Interface into this
                    # process than silently end up without any address
                    close $r;
                    close $w;
                    @addr = $get_addr->();
                } elsif ($pid == 0) {
                    # child: send the length-prefixed addresses, then exit
                    # without running any cleanup code
                    close $r;
                    syswrite $w, pack "(C/a*)*", $get_addr->();
                    require POSIX;
                    POSIX::_exit(0);
                } else {
                    close $w;

                    # read until the child closes its end of the pipe
                    my $addr = "";
                    1 while sysread $r, $addr, 1024, length $addr;

                    @addr = unpack "(C/a*)*", $addr;

                    # reap the child, so it does not linger as a zombie
                    waitpid $pid, 0;
                }
            }

            for my $ip (@addr) {
                push @res, [
                    $pri += 1e-5,
                    AnyEvent::Socket::format_hostport(AnyEvent::Socket::format_address($ip), $port),
                ];
            }

            $cv->end;
        } else {
            $cv->begin;

            AnyEvent::Socket::resolve_sockaddr($host, $port, "tcp", 0, undef, sub {
                for (@_) {
                    my ($service, $host) = AnyEvent::Socket::unpack_sockaddr($_->[3]);
                    push @res, [
                        $pri += 1e-5,
                        AnyEvent::Socket::format_hostport(AnyEvent::Socket::format_address($host), $service),
                    ];
                }

                $cv->end;
            });
        }
    }

    $cv->end;

    $cv
}
947
# Configures and starts the node: loads the profile, fixes the node ID,
# creates the listeners, connects to the seeds, starts the configured
# services and finally runs the "eval" parameter.
#
# Arguments are key/value pairs; an odd number of arguments means that the
# first one is the profile name.  After a successful run, configure replaces
# itself by a no-op.
#
# NOTE: string evals run inside this function, so the names of its lexical
# variables and the use of $_ in the services loop are deliberately kept.
sub configure(@) {
    if (@_ % 2) {
        unshift @_, "profile";
    }

    my (%kv) = @_;

    my $profile = delete $kv{profile};

    defined $profile
        or $profile = nodename();

    $CONFIG = AnyEvent::MP::Config::find_profile($profile, %kv);

    $SECURE = $CONFIG->{secure};

    # without an explicit node ID, the profile name plus "/" is used
    my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

    Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
        unless $node;

    # re-register the local node object under its final ID
    my $node_obj = delete $NODE{$NODE}; # we do not support doing stuff before configure

    $NODE = $node;

    # %n expands to the host name
    $NODE =~ s/%n/nodename/ge;

    # a trailing "/" and every %u get the random $RUNIQ value
    if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
        # nodes with randomised node names do not need randomised port names
        $UNIQ = "";
    }

    $node_obj->{id} = $NODE;
    $NODE{$NODE}    = $node_obj;

    my $seeds = $CONFIG->{seeds};
    my $binds = $CONFIG->{binds};

    # listen on all interfaces by default
    $binds ||= ["*"];

    AE::log(8 => "node $NODE starting up.");

    $BINDS = [];
    %BINDS = ();

    # all binds are resolved in parallel, then one listener is created for
    # every resulting address
    for my $resolver (map { _resolve $_ } @$binds) {
        for my $bind ($resolver->recv) {
            my ($host, $port) = AnyEvent::Socket::parse_hostport($bind)
                or Carp::croak "$bind: unparsable local bind address";

            my $listener = AnyEvent::MP::Transport::mp_server(
                $host,
                $port,
                prepare => sub {
                    # remember the address we actually got
                    my (undef, $host, $port) = @_;
                    $bind = AnyEvent::Socket::format_hostport($host, $port);
                    0
                },
            );

            $BINDS{$bind} = $listener;
            push @$BINDS, $bind;
        }
    }

    # publish our listeners (void context, so no guard is created)
    db_set("'l" => $NODE => $BINDS);

    AE::log(8 => "node listens on [@$BINDS].");

    # connect to all seednodes
    set_seeds(map { $_->recv } map { _resolve $_ } @$seeds);

    master_search();

    # save gobs of memory
    undef &_resolve;
    *configure = sub (@){ };

    # services are either [function, args...], a module name ending in
    # "::", or a function name
    for (@{ $CONFIG->{services} }) {
        if (ref) {
            my ($func, @args) = @$_;
            (load_func $func)->(@args);
        } elsif (s/::$//) {
            eval "require $_";
            die $@ if $@;
        } else {
            (load_func $_)->();
        }
    }

    eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
    die "$@" if $@;
}
1036
1037 =back
1038
1039 =head1 LOGGING
1040
1041 AnyEvent::MP::Kernel logs high-level information about the current node,
1042 when nodes go up and down, and most runtime errors. It also logs some
1043 debugging and trace messages about network maintenance, such as seed
1044 connections and global node management.
1045
1046 =head1 SEE ALSO
1047
1048 L<AnyEvent::MP>.
1049
1050 =head1 AUTHOR
1051
1052 Marc Lehmann <schmorp@schmorp.de>
1053 http://home.schmorp.de/
1054
1055 =cut
1056
1057 1
1058