ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.71
Committed: Tue Feb 28 18:37:24 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.70: +261 -69 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 );
49
50 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
51
52 This value is called with an error or warning message, when e.g. a
53 connection could not be created, authorisation failed and so on.
54
55 It I<must not> block or send messages -queue it and use an idle watcher if
56 you need to do any of these things.
57
58 C<$level> should be C<0> for messages to be logged always, C<1> for
59 unexpected messages and errors, C<2> for warnings, C<7> for messages about
60 node connectivity and services, C<8> for debugging messages and C<9> for
61 tracing messages.
62
63 The default simply logs the message to STDERR.
64
65 =item @AnyEvent::MP::Kernel::WARN
66
67 All code references in this array are called for every log message, from
68 the default C<$WARN> handler. This is an easy way to tie into the log
69 messages without disturbing others.
70
71 =cut
72
73 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
74 our @WARN;
75 our $WARN = sub {
76 &$_ for @WARN;
77
78 return if $WARNLEVEL < $_[0];
79
80 my ($level, $msg) = @_;
81
82 $msg =~ s/\n$//;
83
84 printf STDERR "%s <%d> %s\n",
85 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
86 $level,
87 $msg;
88 };
89
90 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
91
92 The maximum level at which warning messages will be printed to STDERR by
93 the default warn handler.
94
95 =cut
96
97 sub load_func($) {
98 my $func = $_[0];
99
100 unless (defined &$func) {
101 my $pkg = $func;
102 do {
103 $pkg =~ s/::[^:]+$//
104 or return sub { die "unable to resolve function '$func'" };
105
106 local $@;
107 unless (eval "require $pkg; 1") {
108 my $error = $@;
109 $error =~ /^Can't locate .*.pm in \@INC \(/
110 or return sub { die $error };
111 }
112 } until defined &$func;
113 }
114
115 \&$func
116 }
117
118 sub nonce($) {
119 my $nonce;
120
121 if (open my $fh, "</dev/urandom") {
122 sysread $fh, $nonce, $_[0];
123 } else {
124 # shit...
125 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
126 }
127
128 $nonce
129 }
130
131 sub alnumbits($) {
132 my $data = $_[0];
133
134 if (eval "use Math::GMP 2.05; 1") {
135 $data = Math::GMP::get_str_gmp (
136 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
137 62
138 );
139 } else {
140 $data = MIME::Base64::encode_base64 $data, "";
141 $data =~ s/=//;
142 $data =~ s/x/x0/g;
143 $data =~ s/\//x1/g;
144 $data =~ s/\+/x2/g;
145 }
146
147 $data
148 }
149
150 sub gen_uniq {
151 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
152 }
153
154 our $CONFIG; # this node's configuration
155
156 our $RUNIQ; # remote uniq value
157 our $UNIQ; # per-process/node unique cookie
158 our $NODE;
159 our $ID = "a";
160
161 our %NODE; # node id to transport mapping, or "undef", for local node
162 our (%PORT, %PORT_DATA); # local ports
163
164 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
165 our %LMON; # monitored _local_ ports
166
167 our $GLOBAL; # true if node is a global ("directory") node
168 our %LISTENER;
169 our $LISTENER; # our listeners, as arrayref
170
171 our $SRCNODE; # holds the sending node during _inject
172
173 sub _init_names {
174 $RUNIQ = alnumbits nonce 96/8;
175 $UNIQ = gen_uniq;
176 $NODE = "anon/$RUNIQ";
177 }
178
179 _init_names;
180
181 sub NODE() {
182 $NODE
183 }
184
185 sub node_of($) {
186 my ($node, undef) = split /#/, $_[0], 2;
187
188 $node
189 }
190
191 BEGIN {
192 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
193 ? sub () { 1 }
194 : sub () { 0 };
195 }
196
197 our $DELAY_TIMER;
198 our @DELAY_QUEUE;
199
200 sub _delay_run {
201 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
202 }
203
204 sub delay($) {
205 push @DELAY_QUEUE, shift;
206 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
207 }
208
209 sub _inject {
210 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
211 &{ $PORT{+shift} or return };
212 }
213
214 # this function adds a node-ref, so you can send stuff to it
215 # it is basically the central routing component.
216 sub add_node {
217 my ($node) = @_;
218
219 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
220 }
221
222 sub snd(@) {
223 my ($nodeid, $portid) = split /#/, shift, 2;
224
225 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
226
227 defined $nodeid #d#UGLY
228 or Carp::croak "'undef' is not a valid node ID/port ID";
229
230 ($NODE{$nodeid} || add_node $nodeid)
231 ->{send} (["$portid", @_]);
232 }
233
234 =item $is_local = port_is_local $port
235
236 Returns true iff the port is a local port.
237
238 =cut
239
240 sub port_is_local($) {
241 my ($nodeid, undef) = split /#/, $_[0], 2;
242
243 $NODE{$nodeid} == $NODE{""}
244 }
245
246 =item snd_to_func $node, $func, @args
247
248 Expects a node ID and a name of a function. Asynchronously tries to call
249 this function with the given arguments on that node.
250
251 This function can be used to implement C<spawn>-like interfaces.
252
253 =cut
254
255 sub snd_to_func($$;@) {
256 my $nodeid = shift;
257
258 # on $NODE, we artificially delay... (for spawn)
259 # this is very ugly - maybe we should simply delay ALL messages,
260 # to avoid deep recursion issues. but that's so... slow...
261 $AnyEvent::MP::Node::Self::DELAY = 1
262 if $nodeid ne $NODE;
263
264 defined $nodeid #d#UGLY
265 or Carp::croak "'undef' is not a valid node ID/port ID";
266
267 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
268 }
269
270 =item snd_on $node, @msg
271
272 Executes C<snd> with the given C<@msg> (which must include the destination
273 port) on the given node.
274
275 =cut
276
277 sub snd_on($@) {
278 my $node = shift;
279 snd $node, snd => @_;
280 }
281
282 =item eval_on $node, $string[, @reply]
283
284 Evaluates the given string as Perl expression on the given node. When
285 @reply is specified, then it is used to construct a reply message with
286 C<"$@"> and any results from the eval appended.
287
288 =cut
289
290 sub eval_on($$;@) {
291 my $node = shift;
292 snd $node, eval => @_;
293 }
294
295 sub kil(@) {
296 my ($nodeid, $portid) = split /#/, shift, 2;
297
298 length $portid
299 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
300
301 ($NODE{$nodeid} || add_node $nodeid)
302 ->kill ("$portid", @_);
303 }
304
305 #############################################################################
306 # node monitoring and info
307
308 =item node_is_known $nodeid
309
310 #TODO#
311 Returns true iff the given node is currently known to this node.
312
313 =cut
314
315 sub node_is_known($) {
316 exists $NODE{$_[0]}
317 }
318
319 =item node_is_up $nodeid
320
321 Returns true if the given node is "up", that is, the kernel thinks it has
322 a working connection to it.
323
324 If the node is known (to this local node) but not currently connected,
325 returns C<0>. If the node is not known, returns C<undef>.
326
327 =cut
328
329 sub node_is_up($) {
330 ($NODE{$_[0]} or return)->{transport}
331 ? 1 : 0
332 }
333
334 =item known_nodes
335
336 #TODO#
337 Returns the node IDs of all nodes currently known to this node, including
338 itself and nodes not currently connected.
339
340 =cut
341
342 sub known_nodes() {
343 map $_->{id}, values %NODE
344 }
345
346 =item up_nodes
347
348 Return the node IDs of all nodes that are currently connected (excluding
349 the node itself).
350
351 =cut
352
353 sub up_nodes() {
354 map $_->{id}, grep $_->{transport}, values %NODE
355 }
356
357 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
358
359 Registers a callback that is called each time a node goes up (a connection
360 is established) or down (the connection is lost).
361
362 Node up messages can only be followed by node down messages for the same
363 node, and vice versa.
364
365 Note that monitoring a node is usually better done by monitoring its node
366 port. This function is mainly of interest to modules that are concerned
367 about the network topology and low-level connection handling.
368
369 Callbacks I<must not> block and I<should not> send any messages.
370
371 The function returns an optional guard which can be used to unregister
372 the monitoring callback again.
373
374 Example: make sure you call function C<newnode> for all nodes that are up
375 or go up (and down).
376
377 newnode $_, 1 for up_nodes;
378 mon_nodes \&newnode;
379
380 =cut
381
382 our %MON_NODES;
383
384 sub mon_nodes($) {
385 my ($cb) = @_;
386
387 $MON_NODES{$cb+0} = $cb;
388
389 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
390 }
391
392 sub _inject_nodeevent($$;@) {
393 my ($node, $up, @reason) = @_;
394
395 for my $cb (values %MON_NODES) {
396 eval { $cb->($node->{id}, $up, @reason); 1 }
397 or $WARN->(1, $@);
398 }
399
400 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
401 }
402
403 #############################################################################
404 # self node code
405
406 sub _kill {
407 my $port = shift;
408
409 delete $PORT{$port}
410 or return; # killing nonexistent ports is O.K.
411 delete $PORT_DATA{$port};
412
413 my $mon = delete $LMON{$port}
414 or !@_
415 or $WARN->(2, "unmonitored local port $port died with reason: @_");
416
417 $_->(@_) for values %$mon;
418 }
419
420 sub _monitor {
421 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
422 unless exists $PORT{$_[1]};
423
424 $LMON{$_[1]}{$_[2]+0} = $_[2];
425 }
426
427 sub _unmonitor {
428 delete $LMON{$_[1]}{$_[2]+0}
429 if exists $LMON{$_[1]};
430 }
431
432 our %node_req = (
433 # internal services
434
435 # monitoring
436 mon0 => sub { # stop monitoring a port for another node
437 my $portid = shift;
438 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
439 },
440 mon1 => sub { # start monitoring a port for another node
441 my $portid = shift;
442 Scalar::Util::weaken (my $node = $SRCNODE);
443 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
444 delete $node->{rmon}{$portid};
445 $node->send (["", kil0 => $portid, @_])
446 if $node && $node->{transport};
447 };
448 },
449 # another node has killed a monitored port
450 kil0 => sub {
451 my $cbs = delete $SRCNODE->{lmon}{+shift}
452 or return;
453
454 $_->(@_) for @$cbs;
455 },
456
457 # "public" services - not actually public
458
459 # another node wants to kill a local port
460 kil => \&_kill,
461
462 # relay message to another node / generic echo
463 snd => \&snd,
464 snd_multiple => sub {
465 snd @$_ for @_
466 },
467
468 # informational
469 info => sub {
470 snd @_, $NODE;
471 },
472 known_nodes => sub {
473 snd @_, known_nodes;
474 },
475 up_nodes => sub {
476 snd @_, up_nodes;
477 },
478
479 # random utilities
480 eval => sub {
481 my @res = do { package main; eval shift };
482 snd @_, "$@", @res if @_;
483 },
484 time => sub {
485 snd @_, AE::time;
486 },
487 devnull => sub {
488 #
489 },
490 "" => sub {
491 # empty messages are keepalives or similar devnull-applications
492 },
493 );
494
495 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
496 $PORT{""} = sub {
497 my $tag = shift;
498 eval { &{ $node_req{$tag} ||= load_func $tag } };
499 $WARN->(2, "error processing node message: $@") if $@;
500 };
501
502 #############################################################################
503 # seed management, try to keep connections to all seeds at all times
504
505 our %SEED_NODE; # seed ID => node ID|undef
506 our %NODE_SEED; # map node ID to seed ID
507 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
508 our $SEED_WATCHER;
509 our $SEED_RETRY;
510
511 sub seed_connect {
512 my ($seed) = @_;
513
514 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
515 or Carp::croak "$seed: unparsable seed address";
516
517 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
518
519 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
520 $host, $port,
521 on_greeted => sub {
522 # called after receiving remote greeting, learn remote node name
523
524 # we rely on untrusted data here (the remote node name) this is
525 # hopefully ok, as this can at most be used for DOSing, which is easy
526 # when you can do MITM anyway.
527
528 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
529 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
530 #d# require AnyEvent::MP::Global; # every seed becomes a global node currently
531 _become_global ();#d#
532 delete $SEED_NODE{$seed};
533 delete $NODE_SEED{$seed};
534 } else {
535 $SEED_NODE{$seed} = $_[0]{remote_node};
536 $NODE_SEED{$_[0]{remote_node}} = $seed;
537 }
538 },
539 on_destroy => sub {
540 delete $SEED_CONNECT{$seed};
541 },
542 sub {
543 $SEED_CONNECT{$seed} = 1;
544 }
545 ;
546 }
547
548 sub seed_all {
549 # my $next = List::Util::max 1,
550 # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
551 # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
552 # - rand;
553
554 my @seeds = grep {
555 !exists $SEED_CONNECT{$_}
556 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
557 } keys %SEED_NODE;
558
559 if (@seeds) {
560 # start connection attempt for every seed we are not connected to yet
561 seed_connect $_
562 for @seeds;
563
564 $SEED_RETRY = $SEED_RETRY * 2 + rand;
565 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
566 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
567
568 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
569
570 } else {
571 # all seeds connected or connecting, no need to restart timer
572 undef $SEED_WATCHER;
573 }
574 }
575
576 sub seed_again {
577 $SEED_RETRY = 1;
578 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
579 }
580
581 # sets new seed list, starts connecting
582 sub set_seeds(@) {
583 %SEED_NODE = ();
584 %NODE_SEED = ();
585 %SEED_CONNECT = ();
586
587 @SEED_NODE{@_} = ();
588
589 seed_again;#d#
590 seed_all;
591 }
592
593 mon_nodes sub {
594 # if we lost the connection to a seed node, make sure we are seeding
595 seed_again
596 if !$_[1] && exists $NODE_SEED{$_[0]};
597 };
598
599 #############################################################################
600 # talk with/to global nodes
601
602 our $NODE_ADDR; # node => [listeners] - contains global nodes or, for global nodes, all nodes
603 our $GLOBAL_ADDR;
604 our $GLOBAL_MON;
605 our $GLOBAL_TIMER;
606 our $MASTER; # the global node we use for lookups ourselves
607
608 # master requests
609 our %GLOBAL_REQ; # [$cb, \@req]
610
611 sub global_req {
612 my $cb = pop;
613
614 $GLOBAL_REQ{$cb+0} = [$cb, [@_]];
615
616 snd $MASTER, @_, $cb+0
617 if $MASTER;
618
619 defined wantarray && AnyEvent::Util::guard { delete $GLOBAL_REQ{$cb+0} }
620 }
621
622 sub global_master {
623 $MASTER = $_[0];
624
625 snd $MASTER, "global_slave";
626 # ask master for list of global nodes (already done by global_reg)
627 # snd $MASTER, "global_db_get";
628
629 # send queued requests
630 while (my ($k, $v) = each %GLOBAL_REQ) {
631 snd $MASTER, @{$v->[1]}, $k;
632 }
633 }
634
635 $node_req{global_reply} = sub {
636 my $id = shift;
637
638 my $cb = delete $GLOBAL_REQ{$id}
639 or return;
640
641 $cb->[0]->(@_);
642 };
643
644 {#d#
645 # global code
646
647 our %SLAVE;
648
649 # other node wants to make us the master
650 $node_req{global_slave} = sub {
651 undef $SLAVE{$SRCNODE->{id}};
652 snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR;
653 };
654
655 $node_req{global_find} = sub {
656 my ($node, $id) = @_;
657
658 snd $SRCNODE->{id}, global_reply => $id, $NODE_ADDR->{$node};
659 };
660
661 # delete slaves on node-down
662 mon_nodes sub {
663 return if $_[1];
664
665 delete $SLAVE{$_[0]};
666 };
667
668 }#d#
669
670 # reply with global db
671 $node_req{global_db_get} = sub {
672 snd $SRCNODE->{id}, global_db_set => $GLOBAL_ADDR;
673 };
674
675 $node_req{global_db_set} = sub {
676 $GLOBAL_ADDR = shift;
677 $GLOBAL_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
678 (add_node $_)->connect
679 for keys %$GLOBAL_ADDR;
680 };
681 };
682
683 sub global_search {
684 if ($GLOBAL) {
685 $MASTER = $NODE;
686 undef $GLOBAL_MON;
687 } else {
688 for (keys %NODE_SEED, keys %$GLOBAL_ADDR) {
689 if (node_is_up $_) {
690 global_master $_;
691 return;
692 }
693 }
694
695 $GLOBAL_MON = mon_nodes sub {
696 return unless $_[1]; # we are only interested in node-ups
697 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
698
699 global_master $_[0];
700
701 $GLOBAL_MON = mon_nodes sub {
702 if ($_[0] eq $MASTER && !$_[1]) {
703 undef $MASTER;
704 global_search ();
705 }
706 };
707 };
708 }
709 }
710
711 # register remote node listeners
712 $node_req{global_reg} = sub {
713 if ($GLOBAL) {
714 $NODE_ADDR->{$SRCNODE->{id}} = $_[0];
715 } else {
716 # should not happen normally, except when there are bad races
717 $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
718 }
719 };
720
721 # also used
722 $node_req{global_add} = sub {
723 my ($node, $listener) = @_;
724
725 # no-op if we already know the info
726 return
727 if (join "\x00", @$listener) eq (join "\x00", @{ $GLOBAL_ADDR->{$node} });
728
729 $GLOBAL_ADDR->{$node} = $listener;
730
731 # let us be a good citizen and register (also connects)
732 snd $node, global_reg => $LISTENER;
733
734 # if we are global, we update all our slaves
735 our %SLAVE; # ugh, will be in AnyEvent::MP::Global
736 snd $_, global_add => $node, $listener
737 for keys %SLAVE;
738 };
739
740 $node_req{global_del} = sub {
741 my ($node) = @_;
742
743 delete $GLOBAL_ADDR->{$node};
744 };
745
746 # tell every global node our listeners
747 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
748 snd $_[0]{remote_node}, global_reg => $LISTENER
749 if $_[0]{remote_greeting}{global};
750 };
751
752 sub _become_global { #d# hack
753 *_become_global = sub { };
754
755 warn "becoming global\n";#d#
756
757 $GLOBAL = 1;
758 global_search;
759
760 $GLOBAL_ADDR->{$NODE} = $LISTENER;
761
762 $GLOBAL_MON = mon_nodes sub {
763 return if $_[1];
764
765 delete $NODE_ADDR->{$_[0]};
766
767 if (delete $GLOBAL_ADDR->{$_[0]}) {
768 # if the node is global, tell our slaves
769
770 warn "global_del $_[0]\n";#d#
771 our %SLAVE; # ugh, will be in AnyEvent::MP::Global
772 snd $_, global_del => $_[0]
773 for keys %SLAVE;
774 }
775 };
776
777 # tell everybody who connects that we are a master
778 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
779 $_[0]{local_greeting}{global} = 1;
780 };
781
782 # tell every global node that connects that we are global too
783 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
784 snd $_[0], global_add => $NODE, $LISTENER
785 if $_[0]{remote_greeting}{global};
786 };
787
788 # tell everybody else that we are global now
789 snd $_ => global_add => $NODE, $LISTENER
790 for up_nodes;
791 }
792
793 global_search;
794
795 #############################################################################
796 # configure
797
798 sub _nodename {
799 require POSIX;
800 (POSIX::uname ())[1]
801 }
802
803 sub _resolve($) {
804 my ($nodeid) = @_;
805
806 my $cv = AE::cv;
807 my @res;
808
809 $cv->begin (sub {
810 my %seen;
811 my @refs;
812 for (sort { $a->[0] <=> $b->[0] } @res) {
813 push @refs, $_->[1] unless $seen{$_->[1]}++
814 }
815 shift->send (@refs);
816 });
817
818 my $idx;
819 for my $t (split /,/, $nodeid) {
820 my $pri = ++$idx;
821
822 $t = length $t ? _nodename . ":$t" : _nodename
823 if $t =~ /^\d*$/;
824
825 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
826 or Carp::croak "$t: unparsable transport descriptor";
827
828 $port = "0" if $port eq "*";
829
830 if ($host eq "*") {
831 $cv->begin;
832 # use fork_call, as Net::Interface is big, and we need it rarely.
833 require AnyEvent::Util;
834 AnyEvent::Util::fork_call (
835 sub {
836 my @addr;
837
838 require Net::Interface;
839
840 for my $if (Net::Interface->interfaces) {
841 # we statically lower-prioritise ipv6 here, TODO :()
842 for $_ ($if->address (Net::Interface::AF_INET ())) {
843 next if /^\x7f/; # skip localhost etc.
844 push @addr, $_;
845 }
846 for ($if->address (Net::Interface::AF_INET6 ())) {
847 #next if $if->scope ($_) <= 2;
848 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
849 push @addr, $_;
850 }
851
852 }
853 @addr
854 }, sub {
855 for my $ip (@_) {
856 push @res, [
857 $pri += 1e-5,
858 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
859 ];
860 }
861 $cv->end;
862 }
863 );
864 } else {
865 $cv->begin;
866 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
867 for (@_) {
868 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
869 push @res, [
870 $pri += 1e-5,
871 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
872 ];
873 }
874 $cv->end;
875 };
876 }
877 }
878
879 $cv->end;
880
881 $cv
882 }
883
884 sub configure(@) {
885 unshift @_, "profile" if @_ & 1;
886 my (%kv) = @_;
887
888 delete $NODE{$NODE}; # we do not support doing stuff before configure
889 _init_names;
890
891 my $profile = delete $kv{profile};
892
893 $profile = _nodename
894 unless defined $profile;
895
896 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
897
898 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
899
900 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
901
902 $NODE = $node
903 unless $node eq "anon/";
904
905 $NODE{$NODE} = $NODE{""};
906 $NODE{$NODE}{id} = $NODE;
907
908 my $seeds = $CONFIG->{seeds};
909 my $binds = $CONFIG->{binds};
910
911 $binds ||= ["*"];
912
913 $WARN->(8, "node $NODE starting up.");
914
915 $LISTENER = [];
916 %LISTENER = ();
917
918 for (map _resolve $_, @$binds) {
919 for my $bind ($_->recv) {
920 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
921 or Carp::croak "$bind: unparsable local bind address";
922
923 my $listener = AnyEvent::MP::Transport::mp_server
924 $host,
925 $port,
926 prepare => sub {
927 my (undef, $host, $port) = @_;
928 $bind = AnyEvent::Socket::format_hostport $host, $port;
929 0
930 },
931 ;
932 $LISTENER{$bind} = $listener;
933 push @$LISTENER, $bind;
934 }
935 }
936
937 $WARN->(8, "node listens on [@$LISTENER].");
938
939 # the global service is mandatory currently
940 #require AnyEvent::MP::Global;
941
942 # connect to all seednodes
943 set_seeds map $_->recv, map _resolve $_, @$seeds;
944
945 if ($NODE eq "atha") {;#d#
946 my $w; $w = AE::timer 4, 0, sub { undef $w; _become_global };
947 }
948
949 for (@{ $CONFIG->{services} }) {
950 if (ref) {
951 my ($func, @args) = @$_;
952 (load_func $func)->(@args);
953 } elsif (s/::$//) {
954 eval "require $_";
955 die $@ if $@;
956 } else {
957 (load_func $_)->();
958 }
959 }
960 }
961
962 =back
963
964 =head1 SEE ALSO
965
966 L<AnyEvent::MP>.
967
968 =head1 AUTHOR
969
970 Marc Lehmann <schmorp@schmorp.de>
971 http://home.schmorp.de/
972
973 =cut
974
975 1
976