ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.104
Committed: Fri Mar 23 13:44:01 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.103: +5 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28
29 use AnyEvent ();
30 use Guard ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our @EXPORT_OK = qw(
38 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
39 );
40
41 our @EXPORT = qw(
42 add_node load_func snd_to_func snd_on eval_on
43
44 NODE $NODE node_of snd kil port_is_local
45 configure
46 up_nodes mon_nodes node_is_up
47 db_set db_del
48 db_mon db_family db_keys db_values
49 );
50
51 sub load_func($) {
52 my $func = $_[0];
53
54 unless (defined &$func) {
55 my $pkg = $func;
56 do {
57 $pkg =~ s/::[^:]+$//
58 or return sub { die "unable to resolve function '$func'" };
59
60 local $@;
61 unless (eval "require $pkg; 1") {
62 my $error = $@;
63 $error =~ /^Can't locate .*.pm in \@INC \(/
64 or return sub { die $error };
65 }
66 } until defined &$func;
67 }
68
69 \&$func
70 }
71
72 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
73
74 sub nonce($) {
75 join "", map chr rand 256, 1 .. $_[0]
76 }
77
78 sub nonce62($) {
79 join "", map $alnum[rand 62], 1 .. $_[0]
80 }
81
82 sub gen_uniq {
83 my $now = AE::now;
84 (join "",
85 map $alnum[$_],
86 $$ / 62 % 62,
87 $$ % 62,
88 (int $now ) % 62,
89 (int $now * 100) % 62,
90 (int $now * 10000) % 62,
91 ) . nonce62 4;
92 }
93
94 our $CONFIG; # this node's configuration
95 our $SECURE;
96
97 our $RUNIQ; # remote uniq value
98 our $UNIQ; # per-process/node unique cookie
99 our $NODE;
100 our $ID = "a";
101
102 our %NODE; # node id to transport mapping, or "undef", for local node
103 our (%PORT, %PORT_DATA); # local ports
104
105 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
106 our %LMON; # monitored _local_ ports
107
108 our $GLOBAL; # true if node is a global ("directory") node
109 our %BINDS;
110 our $BINDS; # our listeners, as arrayref
111
112 our $SRCNODE; # holds the sending node _object_ during _inject
113
114 sub _init_names {
115 # ~54 bits, for local port names, lowercase $ID appended
116 $UNIQ = gen_uniq;
117
118 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
119 $RUNIQ = nonce62 10;
120 $RUNIQ =~ s/(.)$/\U$1/;
121
122 $NODE = "anon/$RUNIQ";
123 }
124
125 _init_names;
126
127 sub NODE() {
128 $NODE
129 }
130
131 sub node_of($) {
132 my ($node, undef) = split /#/, $_[0], 2;
133
134 $node
135 }
136
137 BEGIN {
138 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
139 ? sub () { 1 }
140 : sub () { 0 };
141 }
142
143 our $DELAY_TIMER;
144 our @DELAY_QUEUE;
145
146 our $delay_run = sub {
147 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
148 };
149
150 sub delay($) {
151 push @DELAY_QUEUE, shift;
152 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
153 }
154
155 =item $AnyEvent::MP::Kernel::SRCNODE
156
157 During execution of a message callback, this variable contains the node ID
158 of the origin node.
159
160 The main use of this variable is for debugging output - there are probably
161 very few other cases where you need to know the source node ID.
162
163 =cut
164
165 sub _inject {
166 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
167 &{ $PORT{+shift} or return };
168 }
169
170 # this function adds a node-ref, so you can send stuff to it
171 # it is basically the central routing component.
172 sub add_node {
173 my ($node) = @_;
174
175 length $node
176 or Carp::croak "'undef' or the empty string are not valid node/port IDs";
177
178 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
179 }
180
181 sub snd(@) {
182 my ($nodeid, $portid) = split /#/, shift, 2;
183
184 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
185
186 ($NODE{$nodeid} || add_node $nodeid)
187 ->{send} (["$portid", @_]);
188 }
189
190 =item $is_local = port_is_local $port
191
192 Returns true iff the port is a local port.
193
194 =cut
195
196 sub port_is_local($) {
197 my ($nodeid, undef) = split /#/, $_[0], 2;
198
199 $NODE{$nodeid} == $NODE{""}
200 }
201
202 =item snd_to_func $node, $func, @args
203
204 Expects a node ID and a name of a function. Asynchronously tries to call
205 this function with the given arguments on that node.
206
207 This function can be used to implement C<spawn>-like interfaces.
208
209 =cut
210
211 sub snd_to_func($$;@) {
212 my $nodeid = shift;
213
214 # on $NODE, we artificially delay... (for spawn)
215 # this is very ugly - maybe we should simply delay ALL messages,
216 # to avoid deep recursion issues. but that's so... slow...
217 $AnyEvent::MP::Node::Self::DELAY = 1
218 if $nodeid ne $NODE;
219
220 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
221 }
222
223 =item snd_on $node, @msg
224
225 Executes C<snd> with the given C<@msg> (which must include the destination
226 port) on the given node.
227
228 =cut
229
230 sub snd_on($@) {
231 my $node = shift;
232 snd $node, snd => @_;
233 }
234
235 =item eval_on $node, $string[, @reply]
236
237 Evaluates the given string as Perl expression on the given node. When
238 @reply is specified, then it is used to construct a reply message with
239 C<"$@"> and any results from the eval appended.
240
241 =cut
242
243 sub eval_on($$;@) {
244 my $node = shift;
245 snd $node, eval => @_;
246 }
247
248 sub kil(@) {
249 my ($nodeid, $portid) = split /#/, shift, 2;
250
251 length $portid
252 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
253
254 ($NODE{$nodeid} || add_node $nodeid)
255 ->kill ("$portid", @_);
256 }
257
258 #############################################################################
259 # node monitoring and info
260
261 =item node_is_up $nodeid
262
263 Returns true if the given node is "up", that is, the kernel thinks it has
264 a working connection to it.
265
266 If the node is up, returns C<1>. If the node is currently connecting or
267 otherwise known but not connected, returns C<0>. If nothing is known about
268 the node, returns C<undef>.
269
270 =cut
271
272 sub node_is_up($) {
273 ($NODE{$_[0]} or return)->{transport}
274 ? 1 : 0
275 }
276
277 =item up_nodes
278
279 Return the node IDs of all nodes that are currently connected (excluding
280 the node itself).
281
282 =cut
283
284 sub up_nodes() {
285 map $_->{id}, grep $_->{transport}, values %NODE
286 }
287
288 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
289
290 Registers a callback that is called each time a node goes up (a connection
291 is established) or down (the connection is lost).
292
293 Node up messages can only be followed by node down messages for the same
294 node, and vice versa.
295
296 Note that monitoring a node is usually better done by monitoring its node
297 port. This function is mainly of interest to modules that are concerned
298 about the network topology and low-level connection handling.
299
300 Callbacks I<must not> block and I<should not> send any messages.
301
302 The function returns an optional guard which can be used to unregister
303 the monitoring callback again.
304
305 Example: make sure you call function C<newnode> for all nodes that are up
306 or go up (and down).
307
308 newnode $_, 1 for up_nodes;
309 mon_nodes \&newnode;
310
311 =cut
312
313 our %MON_NODES;
314
315 sub mon_nodes($) {
316 my ($cb) = @_;
317
318 $MON_NODES{$cb+0} = $cb;
319
320 defined wantarray
321 and Guard::guard { delete $MON_NODES{$cb+0} }
322 }
323
324 sub _inject_nodeevent($$;@) {
325 my ($node, $up, @reason) = @_;
326
327 for my $cb (values %MON_NODES) {
328 eval { $cb->($node->{id}, $up, @reason); 1 }
329 or AE::log die => $@;
330 }
331
332 AE::log 7 => "$node->{id} is " . ($up ? "up." : "down (@reason).");
333 }
334
335 #############################################################################
336 # self node code
337
338 sub _kill {
339 my $port = shift;
340
341 delete $PORT{$port}
342 or return; # killing nonexistent ports is O.K.
343 delete $PORT_DATA{$port};
344
345 my $mon = delete $LMON{$port}
346 or !@_
347 or AE::log die => "unmonitored local port $port died with reason: @_";
348
349 $_->(@_) for values %$mon;
350 }
351
352 sub _monitor {
353 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
354 unless exists $PORT{$_[1]};
355
356 $LMON{$_[1]}{$_[2]+0} = $_[2];
357 }
358
359 sub _unmonitor {
360 delete $LMON{$_[1]}{$_[2]+0}
361 if exists $LMON{$_[1]};
362 }
363
364 sub _secure_check {
365 $SECURE
366 and die "secure mode caught remote execution attempt\n";
367 }
368
369 our %NODE_REQ = (
370 # internal services
371
372 # monitoring
373 mon0 => sub { # stop monitoring a port for another node
374 my $portid = shift;
375 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
376 },
377 mon1 => sub { # start monitoring a port for another node
378 my $portid = shift;
379 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
380 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
381 delete $node->{rmon}{$portid};
382 $node->send (["", kil0 => $portid, @_])
383 if $node && $node->{transport};
384 };
385 },
386 # another node has killed a monitored port
387 kil0 => sub {
388 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
389 or return;
390
391 $_->(@_) for @$cbs;
392 },
393
394 # "public" services - not actually public
395
396 # another node wants to kill a local port
397 kil => \&_kill,
398
399 # relay message to another node / generic echo
400 snd => sub {
401 &_secure_check;
402 &snd
403 },
404
405 # random utilities
406 eval => sub {
407 &_secure_check;
408 my @res = do { package main; eval shift };
409 snd @_, "$@", @res if @_;
410 },
411 time => sub {
412 &_secure_check;
413 snd @_, AE::now;
414 },
415 devnull => sub {
416 #
417 },
418 "" => sub {
419 # empty messages are keepalives or similar devnull-applications
420 },
421 );
422
423 # the node port
424 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
425 $PORT{""} = sub {
426 my $tag = shift;
427 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
428 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
429 };
430
431 our $NPROTO = 1;
432
433 # tell everybody who connects our nproto
434 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
435 $_[0]{local_greeting}{nproto} = $NPROTO;
436 };
437
438 #############################################################################
439 # seed management, try to keep connections to all seeds at all times
440
441 our %SEED_NODE; # seed ID => node ID|undef
442 our %NODE_SEED; # map node ID to seed ID
443 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
444 our $SEED_WATCHER;
445 our $SEED_RETRY;
446
447 sub seed_connect {
448 my ($seed) = @_;
449
450 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
451 or Carp::croak "$seed: unparsable seed address";
452
453 AE::log 9 => "trying connect to seed node $seed.";
454
455 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
456 $host, $port,
457 on_greeted => sub {
458 # called after receiving remote greeting, learn remote node name
459
460 # we rely on untrusted data here (the remote node name) this is
461 # hopefully ok, as this can at most be used for DOSing, which is easy
462 # when you can do MITM anyway.
463
464 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
465 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
466 require AnyEvent::MP::Global; # every seed becomes a global node currently
467 delete $SEED_NODE{$seed};
468 } else {
469 $SEED_NODE{$seed} = $_[0]{remote_node};
470 $NODE_SEED{$_[0]{remote_node}} = $seed;
471 # also start global service, if not running
472 # we need to check here in addition to the mon_nodes below
473 # because we might only learn late that a node is a seed
474 # and then we might already be connected
475 snd $_[0]{remote_node}, "g_slave"
476 unless $_[0]{remote_greeting}{global};
477 }
478 },
479 sub {
480 delete $SEED_CONNECT{$seed};
481 }
482 ;
483 }
484
485 sub seed_all {
486 my @seeds = grep
487 !exists $SEED_CONNECT{$_}
488 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
489 keys %SEED_NODE;
490
491 if (@seeds) {
492 # start connection attempt for every seed we are not connected to yet
493 seed_connect $_
494 for @seeds;
495
496 $SEED_RETRY = $SEED_RETRY * 2 + rand;
497 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
498 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
499
500 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
501
502 } else {
503 # all seeds connected or connecting, no need to restart timer
504 undef $SEED_WATCHER;
505 }
506 }
507
508 sub seed_again {
509 $SEED_RETRY = 1;
510 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
511 }
512
513 # sets new seed list, starts connecting
514 sub set_seeds(@) {
515 %SEED_NODE = ();
516 %NODE_SEED = ();
517 %SEED_CONNECT = ();
518
519 @SEED_NODE{@_} = ();
520
521 seed_all;
522 }
523
524 mon_nodes sub {
525 return unless exists $NODE_SEED{$_[0]};
526
527 if ($_[1]) {
528 # each time a connection to a seed node goes up, make
529 # sure it runs the global service.
530 snd $_[0], "g_slave"
531 unless $NODE{$_[0]}{transport}{remote_greeting}{global};
532 } else {
533 # if we lost the connection to a seed node, make sure we are seeding
534 seed_again;
535 }
536 };
537
538 #############################################################################
539 # talk with/to global nodes
540
541 # protocol messages:
542 #
543 # sent by all slave nodes (slave to master)
544 # g_slave database - make other global node master of the sender
545 #
546 # sent by any node to global nodes
547 # g_set database - set whole database
548 # g_upd family set del - update single family
549 # g_del family key - delete key from database
550 # g_get family key reply... - send reply with data
551 #
552 # send by global nodes
553 # g_global - node became global, similar to global=1 greeting
554 #
555 # database families
556 # "'l" -> node -> listeners
557 # "'g" -> node -> undef
558 # ...
559 #
560
561 # used on all nodes:
562 our $MASTER; # the global node we bind ourselves to
563 our $MASTER_MON;
564 our %LOCAL_DB; # this node database
565
566 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
567
568 our $GPROTO = 1;
569
570 # tell everybody who connects our nproto
571 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
572 $_[0]{local_greeting}{gproto} = $GPROTO;
573 };
574
575 #############################################################################
576 # master selection
577
578 # master requests
579 our %GLOBAL_REQ; # $id => \@req
580
581 sub global_req_add {
582 my ($id, $req) = @_;
583
584 return if exists $GLOBAL_REQ{$id};
585
586 $GLOBAL_REQ{$id} = $req;
587
588 snd $MASTER, @$req
589 if $MASTER;
590 }
591
592 sub global_req_del {
593 delete $GLOBAL_REQ{$_[0]};
594 }
595
596 #################################
597 # master rpc
598
599 our %GLOBAL_RES;
600 our $GLOBAL_RES_ID = "a";
601
602 sub global_call {
603 my $id = ++$GLOBAL_RES_ID;
604 $GLOBAL_RES{$id} = pop;
605 global_req_add $id, [@_, $id];
606 }
607
608 $NODE_REQ{g_reply} = sub {
609 my $id = shift;
610 global_req_del $id;
611 my $cb = delete $GLOBAL_RES{$id}
612 or return;
613 &$cb
614 };
615
616 #################################
617
618 sub g_find {
619 global_req_add "g_find $_[0]", [g_find => $_[0]];
620 }
621
622 # reply for g_find started in Node.pm
623 $NODE_REQ{g_found} = sub {
624 global_req_del "g_find $_[0]";
625
626 my $node = $NODE{$_[0]} or return;
627
628 $node->connect_to ($_[1]);
629 };
630
631 sub master_set {
632 $MASTER = $_[0];
633
634 snd $MASTER, g_slave => \%LOCAL_DB;
635
636 # (re-)send queued requests
637 snd $MASTER, @$_
638 for values %GLOBAL_REQ;
639 }
640
641 sub master_search {
642 #TODO: should also look for other global nodes, but we don't know them #d#
643 for (keys %NODE_SEED) {
644 if (node_is_up $_) {
645 master_set $_;
646 return;
647 }
648 }
649
650 $MASTER_MON = mon_nodes sub {
651 return unless $_[1]; # we are only interested in node-ups
652 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
653
654 master_set $_[0];
655
656 $MASTER_MON = mon_nodes sub {
657 if ($_[0] eq $MASTER && !$_[1]) {
658 undef $MASTER;
659 master_search ();
660 }
661 };
662 };
663 }
664
665 # other node wants to make us the master
666 $NODE_REQ{g_slave} = sub {
667 my ($db) = @_;
668
669 # load global module and redo the request
670 require AnyEvent::MP::Global;
671 &{ $NODE_REQ{g_slave} }
672 };
673
674 #############################################################################
675 # local database operations
676
677 # local database management
678
679 sub db_set($$;$) {
680 my ($family, $subkey) = @_;
681
682 # if (ref $_[1]) {
683 # # bulk
684 # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
685 # $LOCAL_DB{$_[0]} = $_[1];
686 # snd $MASTER, g_upd => $_[0] => $_[1], \@del
687 # if defined $MASTER;
688 # } else {
689 # single-key
690 $LOCAL_DB{$family}{$subkey} = $_[2];
691 snd $MASTER, g_upd => $family => { $subkey => $_[2] }
692 if defined $MASTER;
693 # }
694
695 defined wantarray
696 and Guard::guard { db_del $family => $subkey }
697 }
698
699 sub db_del($@) {
700 my $family = shift;
701
702 delete @{ $LOCAL_DB{$family} }{@_};
703 snd $MASTER, g_upd => $family => undef, \@_
704 if defined $MASTER;
705 }
706
707 # database query
708
709 sub db_family {
710 my ($family, $cb) = @_;
711 global_call g_db_family => $family, $cb;
712 }
713
714 sub db_keys {
715 my ($family, $cb) = @_;
716 global_call g_db_keys => $family, $cb;
717 }
718
719 sub db_values {
720 my ($family, $cb) = @_;
721 global_call g_db_values => $family, $cb;
722 }
723
724 # database monitoring
725
726 our %LOCAL_MON; # f, reply
727 our %MON_DB; # f, k, value
728
729 sub db_mon($@) {
730 my ($family, $cb) = @_;
731
732 if (my $db = $MON_DB{$family}) {
733 # we already monitor, so create a "dummy" change event
734 # this is postponed, which might be too late (we could process
735 # change events), so disable the callback at first
736 $LOCAL_MON{$family}{$cb+0} = sub { };
737 AE::postpone {
738 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
739
740 # set actual callback
741 $LOCAL_MON{$family}{$cb+0} = $cb;
742 $cb->($db, [keys %$db]);
743 };
744 } else {
745 # new monitor, request chg1 from upstream
746 $LOCAL_MON{$family}{$cb+0} = $cb;
747 global_req_add "mon1 $family" => [g_mon1 => $family];
748 $MON_DB{$family} = {};
749 }
750
751 defined wantarray
752 and Guard::guard {
753 my $mon = $LOCAL_MON{$family};
754 delete $mon->{$cb+0};
755
756 unless (%$mon) {
757 global_req_del "mon1 $family";
758
759 # no global_req, because we don't care if we are not connected
760 snd $MASTER, g_mon0 => $family
761 if $MASTER;
762
763 delete $LOCAL_MON{$family};
764 delete $MON_DB{$family};
765 }
766 }
767 }
768
769 # full update
770 $NODE_REQ{g_chg1} = sub {
771 return unless $SRCNODE eq $MASTER;
772 my ($f, $ndb) = @_;
773
774 my $db = $MON_DB{$f};
775 my (@a, @c, @d);
776
777 # add or replace keys
778 while (my ($k, $v) = each %$ndb) {
779 exists $db->{$k}
780 ? push @c, $k
781 : push @a, $k;
782 $db->{$k} = $v;
783 }
784
785 # delete keys that are no longer present
786 for (grep !exists $ndb->{$_}, keys %$db) {
787 delete $db->{$_};
788 push @d, $_;
789 }
790
791 $_->($db, \@a, \@c, \@d)
792 for values %{ $LOCAL_MON{$_[0]} };
793 };
794
795 # incremental update
796 $NODE_REQ{g_chg2} = sub {
797 return unless $SRCNODE eq $MASTER;
798 my ($family, $set, $del) = @_;
799
800 my $db = $MON_DB{$family};
801
802 my (@a, @c);
803
804 while (my ($k, $v) = each %$set) {
805 exists $db->{$k}
806 ? push @c, $k
807 : push @a, $k;
808 $db->{$k} = $v;
809 }
810
811 delete @$db{@$del};
812
813 $_->($db, \@a, \@c, $del)
814 for values %{ $LOCAL_MON{$family} };
815 };
816
817 #############################################################################
818 # configure
819
820 sub nodename {
821 require POSIX;
822 (POSIX::uname ())[1]
823 }
824
825 sub _resolve($) {
826 my ($nodeid) = @_;
827
828 my $cv = AE::cv;
829 my @res;
830
831 $cv->begin (sub {
832 my %seen;
833 my @refs;
834 for (sort { $a->[0] <=> $b->[0] } @res) {
835 push @refs, $_->[1] unless $seen{$_->[1]}++
836 }
837 shift->send (@refs);
838 });
839
840 my $idx;
841 for my $t (split /,/, $nodeid) {
842 my $pri = ++$idx;
843
844 $t = length $t ? nodename . ":$t" : nodename
845 if $t =~ /^\d*$/;
846
847 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
848 or Carp::croak "$t: unparsable transport descriptor";
849
850 $port = "0" if $port eq "*";
851
852 if ($host eq "*") {
853 $cv->begin;
854
855 my $get_addr = sub {
856 my @addr;
857
858 require Net::Interface;
859
860 # Net::Interface hangs on some systems, so hope for the best
861 local $SIG{ALRM} = 'DEFAULT';
862 alarm 2;
863
864 for my $if (Net::Interface->interfaces) {
865 # we statically lower-prioritise ipv6 here, TODO :()
866 for $_ ($if->address (Net::Interface::AF_INET ())) {
867 next if /^\x7f/; # skip localhost etc.
868 push @addr, $_;
869 }
870 for ($if->address (Net::Interface::AF_INET6 ())) {
871 #next if $if->scope ($_) <= 2;
872 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
873 push @addr, $_;
874 }
875 }
876
877 alarm 0;
878
879 @addr
880 };
881
882 my @addr;
883
884 if (AnyEvent::WIN32) {
885 @addr = $get_addr->();
886 } else {
887 # use a child process, as Net::Interface is big, and we need it only once.
888
889 pipe my $r, my $w
890 or die "pipe: $!";
891
892 if (fork eq 0) {
893 close $r;
894 syswrite $w, pack "(C/a*)*", $get_addr->();
895 require POSIX;
896 POSIX::_exit (0);
897 } else {
898 close $w;
899
900 my $addr;
901
902 1 while sysread $r, $addr, 1024, length $addr;
903
904 @addr = unpack "(C/a*)*", $addr;
905 }
906 }
907
908 for my $ip (@addr) {
909 push @res, [
910 $pri += 1e-5,
911 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
912 ];
913 }
914 $cv->end;
915 } else {
916 $cv->begin;
917 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
918 for (@_) {
919 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
920 push @res, [
921 $pri += 1e-5,
922 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
923 ];
924 }
925 $cv->end;
926 };
927 }
928 }
929
930 $cv->end;
931
932 $cv
933 }
934
935 sub configure(@) {
936 unshift @_, "profile" if @_ & 1;
937 my (%kv) = @_;
938
939 delete $NODE{$NODE}; # we do not support doing stuff before configure
940 _init_names;
941
942 my $profile = delete $kv{profile};
943
944 $profile = nodename
945 unless defined $profile;
946
947 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
948
949 $SECURE = $CONFIG->{secure};
950
951 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
952
953 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
954
955 $NODE = $node;
956
957 $NODE =~ s/%n/nodename/ge;
958
959 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
960 # nodes with randomised node names do not need randomised port names
961 $UNIQ = "";
962 }
963
964 $NODE{$NODE} = $NODE{""};
965 $NODE{$NODE}{id} = $NODE;
966
967 my $seeds = $CONFIG->{seeds};
968 my $binds = $CONFIG->{binds};
969
970 $binds ||= ["*"];
971
972 AE::log 8 => "node $NODE starting up.";
973
974 $BINDS = [];
975 %BINDS = ();
976
977 for (map _resolve $_, @$binds) {
978 for my $bind ($_->recv) {
979 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
980 or Carp::croak "$bind: unparsable local bind address";
981
982 my $listener = AnyEvent::MP::Transport::mp_server
983 $host,
984 $port,
985 prepare => sub {
986 my (undef, $host, $port) = @_;
987 $bind = AnyEvent::Socket::format_hostport $host, $port;
988 0
989 },
990 ;
991 $BINDS{$bind} = $listener;
992 push @$BINDS, $bind;
993 }
994 }
995
996 db_set "'l" => $NODE => $BINDS;
997
998 AE::log 8 => "node listens on [@$BINDS].";
999
1000 # connect to all seednodes
1001 set_seeds map $_->recv, map _resolve $_, @$seeds;
1002
1003 master_search;
1004
1005 # save gobs of memory
1006 undef &_resolve;
1007 *configure = sub (@){ };
1008
1009 for (@{ $CONFIG->{services} }) {
1010 if (ref) {
1011 my ($func, @args) = @$_;
1012 (load_func $func)->(@args);
1013 } elsif (s/::$//) {
1014 eval "require $_";
1015 die $@ if $@;
1016 } else {
1017 (load_func $_)->();
1018 }
1019 }
1020
1021 eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
1022 die "$@" if $@;
1023 }
1024
1025 =back
1026
1027 =head1 LOGGING
1028
1029 AnyEvent::MP::Kernel logs high-level information about the current node,
1030 when nodes go up and down, and most runtime errors. It also logs some
1031 debugging and trace messages about network maintainance, such as seed
1032 connections and global node management.
1033
1034 =head1 SEE ALSO
1035
1036 L<AnyEvent::MP>.
1037
1038 =head1 AUTHOR
1039
1040 Marc Lehmann <schmorp@schmorp.de>
1041 http://home.schmorp.de/
1042
1043 =cut
1044
1045 1
1046