ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.83
Committed: Sat Mar 3 20:35:10 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.82: +14 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg
49 );
50
51 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
52
53 This value is called with an error or warning message, when e.g. a
54 connection could not be created, authorisation failed and so on.
55
56 It I<must not> block or send messages -queue it and use an idle watcher if
57 you need to do any of these things.
58
59 C<$level> should be C<0> for messages to be logged always, C<1> for
60 unexpected messages and errors, C<2> for warnings, C<7> for messages about
61 node connectivity and services, C<8> for debugging messages and C<9> for
62 tracing messages.
63
64 The default simply logs the message to STDERR.
65
66 =item @AnyEvent::MP::Kernel::WARN
67
68 All code references in this array are called for every log message, from
69 the default C<$WARN> handler. This is an easy way to tie into the log
70 messages without disturbing others.
71
72 =cut
73
74 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
75 our @WARN;
76 our $WARN = sub {
77 &$_ for @WARN;
78
79 return if $WARNLEVEL < $_[0];
80
81 my ($level, $msg) = @_;
82
83 $msg =~ s/\n$//;
84
85 printf STDERR "%s <%d> %s\n",
86 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
87 $level,
88 $msg;
89 };
90
91 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
92
93 The maximum level at which warning messages will be printed to STDERR by
94 the default warn handler.
95
96 =cut
97
98 sub load_func($) {
99 my $func = $_[0];
100
101 unless (defined &$func) {
102 my $pkg = $func;
103 do {
104 $pkg =~ s/::[^:]+$//
105 or return sub { die "unable to resolve function '$func'" };
106
107 local $@;
108 unless (eval "require $pkg; 1") {
109 my $error = $@;
110 $error =~ /^Can't locate .*.pm in \@INC \(/
111 or return sub { die $error };
112 }
113 } until defined &$func;
114 }
115
116 \&$func
117 }
118
119 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
120
121 sub nonce($) {
122 join "", map chr rand 256, 1 .. $_[0]
123 }
124
125 sub nonce62($) {
126 join "", map $alnum[rand 62], 1 .. $_[0]
127 }
128
129 sub gen_uniq {
130 my $now = AE::now;
131 (join "",
132 map $alnum[$_],
133 $$ / 62 % 62,
134 $$ % 62,
135 (int $now ) % 62,
136 (int $now * 100) % 62,
137 (int $now * 10000) % 62,
138 ) . nonce62 4;
139 }
140
141 our $CONFIG; # this node's configuration
142 our $SECURE = sub { 1 };
143
144 our $RUNIQ; # remote uniq value
145 our $UNIQ; # per-process/node unique cookie
146 our $NODE;
147 our $ID = "a";
148
149 our %NODE; # node id to transport mapping, or "undef", for local node
150 our (%PORT, %PORT_DATA); # local ports
151
152 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
153 our %LMON; # monitored _local_ ports
154
155 our $GLOBAL; # true if node is a global ("directory") node
156 our %LISTENER;
157 our $LISTENER; # our listeners, as arrayref
158
159 our $SRCNODE; # holds the sending node _object_ during _inject
160
161 sub _init_names {
162 # ~54 bits, for local port names, lowercase $ID appended
163 $UNIQ = gen_uniq;
164
165 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
166 $RUNIQ = nonce62 10;
167 $RUNIQ =~ s/(.)$/\U$1/;
168
169 $NODE = "anon/$RUNIQ";
170 }
171
172 _init_names;
173
174 sub NODE() {
175 $NODE
176 }
177
178 sub node_of($) {
179 my ($node, undef) = split /#/, $_[0], 2;
180
181 $node
182 }
183
184 BEGIN {
185 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
186 ? sub () { 1 }
187 : sub () { 0 };
188 }
189
190 our $DELAY_TIMER;
191 our @DELAY_QUEUE;
192
193 sub _delay_run {
194 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
195 }
196
197 sub delay($) {
198 push @DELAY_QUEUE, shift;
199 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
200 }
201
202 sub _inject {
203 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
204 &{ $PORT{+shift} or return };
205 }
206
207 # this function adds a node-ref, so you can send stuff to it
208 # it is basically the central routing component.
209 sub add_node {
210 my ($node) = @_;
211
212 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
213 }
214
215 sub snd(@) {
216 my ($nodeid, $portid) = split /#/, shift, 2;
217
218 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
219
220 defined $nodeid #d#UGLY
221 or Carp::croak "'undef' is not a valid node ID/port ID";
222
223 ($NODE{$nodeid} || add_node $nodeid)
224 ->{send} (["$portid", @_]);
225 }
226
227 =item $is_local = port_is_local $port
228
229 Returns true iff the port is a local port.
230
231 =cut
232
233 sub port_is_local($) {
234 my ($nodeid, undef) = split /#/, $_[0], 2;
235
236 $NODE{$nodeid} == $NODE{""}
237 }
238
239 =item snd_to_func $node, $func, @args
240
241 Expects a node ID and a name of a function. Asynchronously tries to call
242 this function with the given arguments on that node.
243
244 This function can be used to implement C<spawn>-like interfaces.
245
246 =cut
247
248 sub snd_to_func($$;@) {
249 my $nodeid = shift;
250
251 # on $NODE, we artificially delay... (for spawn)
252 # this is very ugly - maybe we should simply delay ALL messages,
253 # to avoid deep recursion issues. but that's so... slow...
254 $AnyEvent::MP::Node::Self::DELAY = 1
255 if $nodeid ne $NODE;
256
257 defined $nodeid #d#UGLY
258 or Carp::croak "'undef' is not a valid node ID/port ID";
259
260 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
261 }
262
263 =item snd_on $node, @msg
264
265 Executes C<snd> with the given C<@msg> (which must include the destination
266 port) on the given node.
267
268 =cut
269
270 sub snd_on($@) {
271 my $node = shift;
272 snd $node, snd => @_;
273 }
274
275 =item eval_on $node, $string[, @reply]
276
277 Evaluates the given string as Perl expression on the given node. When
278 @reply is specified, then it is used to construct a reply message with
279 C<"$@"> and any results from the eval appended.
280
281 =cut
282
283 sub eval_on($$;@) {
284 my $node = shift;
285 snd $node, eval => @_;
286 }
287
288 sub kil(@) {
289 my ($nodeid, $portid) = split /#/, shift, 2;
290
291 length $portid
292 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
293
294 ($NODE{$nodeid} || add_node $nodeid)
295 ->kill ("$portid", @_);
296 }
297
298 #############################################################################
299 # node monitoring and info
300
301 =item node_is_known $nodeid
302
303 #TODO#
304 Returns true iff the given node is currently known to this node.
305
306 =cut
307
308 sub node_is_known($) {
309 exists $NODE{$_[0]}
310 }
311
312 =item node_is_up $nodeid
313
314 Returns true if the given node is "up", that is, the kernel thinks it has
315 a working connection to it.
316
317 If the node is known (to this local node) but not currently connected,
318 returns C<0>. If the node is not known, returns C<undef>.
319
320 =cut
321
322 sub node_is_up($) {
323 ($NODE{$_[0]} or return)->{transport}
324 ? 1 : 0
325 }
326
327 =item up_nodes
328
329 Return the node IDs of all nodes that are currently connected (excluding
330 the node itself).
331
332 =cut
333
334 sub up_nodes() {
335 map $_->{id}, grep $_->{transport}, values %NODE
336 }
337
338 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
339
340 Registers a callback that is called each time a node goes up (a connection
341 is established) or down (the connection is lost).
342
343 Node up messages can only be followed by node down messages for the same
344 node, and vice versa.
345
346 Note that monitoring a node is usually better done by monitoring its node
347 port. This function is mainly of interest to modules that are concerned
348 about the network topology and low-level connection handling.
349
350 Callbacks I<must not> block and I<should not> send any messages.
351
352 The function returns an optional guard which can be used to unregister
353 the monitoring callback again.
354
355 Example: make sure you call function C<newnode> for all nodes that are up
356 or go up (and down).
357
358 newnode $_, 1 for up_nodes;
359 mon_nodes \&newnode;
360
361 =cut
362
363 our %MON_NODES;
364
365 sub mon_nodes($) {
366 my ($cb) = @_;
367
368 $MON_NODES{$cb+0} = $cb;
369
370 defined wantarray && Guard::guard { delete $MON_NODES{$cb+0} }
371 }
372
373 sub _inject_nodeevent($$;@) {
374 my ($node, $up, @reason) = @_;
375
376 for my $cb (values %MON_NODES) {
377 eval { $cb->($node->{id}, $up, @reason); 1 }
378 or $WARN->(1, $@);
379 }
380
381 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
382 }
383
384 #############################################################################
385 # self node code
386
387 sub _kill {
388 my $port = shift;
389
390 delete $PORT{$port}
391 or return; # killing nonexistent ports is O.K.
392 delete $PORT_DATA{$port};
393
394 my $mon = delete $LMON{$port}
395 or !@_
396 or $WARN->(2, "unmonitored local port $port died with reason: @_");
397
398 $_->(@_) for values %$mon;
399 }
400
401 sub _monitor {
402 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
403 unless exists $PORT{$_[1]};
404
405 $LMON{$_[1]}{$_[2]+0} = $_[2];
406 }
407
408 sub _unmonitor {
409 delete $LMON{$_[1]}{$_[2]+0}
410 if exists $LMON{$_[1]};
411 }
412
413 sub _secure_check {
414 $SECURE->($SRCNODE->{id})
415 or die "remote execution attempt by insecure node\n";
416 }
417
418 our %NODE_REQ = (
419 # internal services
420
421 # monitoring
422 mon0 => sub { # stop monitoring a port for another node
423 my $portid = shift;
424 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
425 },
426 mon1 => sub { # start monitoring a port for another node
427 my $portid = shift;
428 Scalar::Util::weaken (my $node = $SRCNODE);
429 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
430 delete $node->{rmon}{$portid};
431 $node->send (["", kil0 => $portid, @_])
432 if $node && $node->{transport};
433 };
434 },
435 # another node has killed a monitored port
436 kil0 => sub {
437 my $cbs = delete $SRCNODE->{lmon}{+shift}
438 or return;
439
440 $_->(@_) for @$cbs;
441 },
442
443 # "public" services - not actually public
444
445 # another node wants to kill a local port
446 kil => \&_kill,
447
448 # relay message to another node / generic echo
449 snd => \&snd,
450 snd_multiple => sub {
451 snd @$_ for @_
452 },
453
454 # random utilities
455 eval => sub {
456 &_secure_check;
457 my @res = do { package main; eval shift };
458 snd @_, "$@", @res if @_;
459 },
460 time => sub {
461 snd @_, AE::now;
462 },
463 devnull => sub {
464 #
465 },
466 "" => sub {
467 # empty messages are keepalives or similar devnull-applications
468 },
469 );
470
471 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
472 $PORT{""} = sub {
473 my $tag = shift;
474 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
475 $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
476 };
477
478 #############################################################################
479 # seed management, try to keep connections to all seeds at all times
480
481 our %SEED_NODE; # seed ID => node ID|undef
482 our %NODE_SEED; # map node ID to seed ID
483 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
484 our $SEED_WATCHER;
485 our $SEED_RETRY;
486
487 sub seed_connect {
488 my ($seed) = @_;
489
490 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
491 or Carp::croak "$seed: unparsable seed address";
492
493 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
494
495 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
496 $host, $port,
497 on_greeted => sub {
498 # called after receiving remote greeting, learn remote node name
499
500 # we rely on untrusted data here (the remote node name) this is
501 # hopefully ok, as this can at most be used for DOSing, which is easy
502 # when you can do MITM anyway.
503
504 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
505 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
506 require AnyEvent::MP::Global; # every seed becomes a global node currently
507 delete $SEED_NODE{$seed};
508 delete $NODE_SEED{$seed};
509 } else {
510 $SEED_NODE{$seed} = $_[0]{remote_node};
511 $NODE_SEED{$_[0]{remote_node}} = $seed;
512 }
513 },
514 on_destroy => sub {
515 delete $SEED_CONNECT{$seed};
516 },
517 sub {
518 $SEED_CONNECT{$seed} = 1;
519 }
520 ;
521 }
522
523 sub seed_all {
524 my @seeds = grep {
525 !exists $SEED_CONNECT{$_}
526 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
527 } keys %SEED_NODE;
528
529 if (@seeds) {
530 # start connection attempt for every seed we are not connected to yet
531 seed_connect $_
532 for @seeds;
533
534 $SEED_RETRY = $SEED_RETRY * 2 + rand;
535 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
536 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
537
538 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
539
540 } else {
541 # all seeds connected or connecting, no need to restart timer
542 undef $SEED_WATCHER;
543 }
544 }
545
546 sub seed_again {
547 $SEED_RETRY = 1;
548 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
549 }
550
551 # sets new seed list, starts connecting
552 sub set_seeds(@) {
553 %SEED_NODE = ();
554 %NODE_SEED = ();
555 %SEED_CONNECT = ();
556
557 @SEED_NODE{@_} = ();
558
559 seed_again;#d#
560 seed_all;
561 }
562
563 mon_nodes sub {
564 # if we lost the connection to a seed node, make sure we are seeding
565 seed_again
566 if !$_[1] && exists $NODE_SEED{$_[0]};
567 };
568
569 #############################################################################
570 # talk with/to global nodes
571
572 # protocol messages:
573 #
574 # sent by all slave nodes (slave to master)
575 # g_slave database - make other global node master of the sender
576 #
577 # sent by any node to global nodes
578 # g_set database - set whole database
579 # g_add family key val - add/replace key to database
580 # g_del family key - delete key from database
581 # g_get family key reply... - send reply with data
582 #
583 # send by global nodes
584 # g_global - node became global, similar to global=1 greeting
585 #
586 # database families
587 # "'l" -> node -> listeners
588 # "'g" -> node -> undef
589 # ...
590 #
591
592 # used on all nodes:
593 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
594 our $MASTER_MON;
595 our %LOCAL_DB; # this node database
596
597 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
598
599 #############################################################################
600 # master selection
601
602 # master requests
603 our %GLOBAL_REQ; # $id => \@req
604
605 sub global_req_add {
606 my ($id, $req) = @_;
607
608 return if exists $GLOBAL_REQ{$id};
609
610 $GLOBAL_REQ{$id} = $req;
611
612 snd $MASTER, @$req
613 if $MASTER;
614 }
615
616 sub global_req_del {
617 delete $GLOBAL_REQ{$_[0]};
618 }
619
620 sub g_find {
621 global_req_add "g_find $_[0]", [g_find => $_[0]];
622 }
623
624 # reply for g_find started in Node.pm
625 $NODE_REQ{g_found} = sub {
626 global_req_del "g_find $_[0]";
627
628 my $node = $NODE{$_[0]} or return;
629
630 $node->connect_to ($_[1]);
631 };
632
633 sub master_set {
634 $MASTER = $_[0];
635
636 snd $MASTER, g_slave => \%LOCAL_DB;
637
638 # (re-)send queued requests
639 snd $MASTER, @$_
640 for values %GLOBAL_REQ;
641 }
642
643 sub master_search {
644 #TODO: should also look for other global nodes, but we don't know them #d#
645 for (keys %NODE_SEED) {
646 if (node_is_up $_) {
647 master_set $_;
648 return;
649 }
650 }
651
652 $MASTER_MON = mon_nodes sub {
653 return unless $_[1]; # we are only interested in node-ups
654 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
655
656 master_set $_[0];
657
658 $MASTER_MON = mon_nodes sub {
659 if ($_[0] eq $MASTER && !$_[1]) {
660 undef $MASTER;
661 master_search ();
662 }
663 };
664 };
665 }
666
667 # other node wants to make us the master
668 $NODE_REQ{g_slave} = sub {
669 my ($db) = @_;
670
671 # load global module and redo the request
672 require AnyEvent::MP::Global;
673 &{ $NODE_REQ{g_slave} }
674 };
675
676 #############################################################################
677 # local database operations
678
679 # local database management
680 sub db_set($$$) {
681 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
682 snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
683 if defined $MASTER;
684 }
685
686 sub db_del($$) {
687 delete $LOCAL_DB{$_[0]}{$_[1]};
688 snd $MASTER, g_del => $_[0] => $_[1]
689 if defined $MASTER;
690 }
691
692 sub db_reg($$;$) {
693 my ($family, $key) = @_;
694 &db_set;
695 Guard::guard { db_del $family => $key }
696 }
697
698 sub db_keys($$$) {
699 #d#
700 }
701
702 #d# db_values
703 #d# db_family
704 #d# db_key
705
706 our %LOCAL_MON; # f, reply
707 our %MON_DB; # f, k, value
708
709 sub g_chg {
710 my $f = shift;
711
712 $#_ ? $MON_DB{$f}{$_[0]} = $_[1]
713 : delete $MON_DB{$f}{$_[0]};
714
715 &{ $_->[0] }
716 for values %{ $LOCAL_MON{$f} };
717 }
718
719 sub db_mon($@) {
720 my ($family, @reply) = @_;
721
722 my $reply = \@reply;
723 my $id = $reply + 0;
724
725 if (%{ $LOCAL_MON{$family} }) {
726 # if we already monitor this thingy, generate
727 # create events for all of them
728 while (my ($key, $value) = each %{ $MON_DB{$family} }) {
729 $reply->[0]->($key, $value);
730 }
731 } else {
732 # new monitor, request chg1 from upstream
733 global_req_add "mon1 $family" => [g_mon1 => $family];
734 $MON_DB{$family} = {};
735 }
736
737 $LOCAL_MON{$family}{$id} = \@reply;
738
739 Guard::guard {
740 my $mon = $LOCAL_MON{$family};
741 delete $mon->{$id};
742
743 unless (%$mon) {
744 global_req_del "mon1 $family";
745
746 # no global_req, because we don't care if we are not connected
747 snd $MASTER, g_mon0 => $family
748 if $MASTER;
749
750 delete $LOCAL_MON{$family};
751 delete $MON_DB{$family};
752 }
753 }
754 }
755
756 # full update
757 $NODE_REQ{g_chg1} = sub {
758 my ($f, $db) = @_;
759
760 # add or replace keys
761 while (my ($k, $v) = each %$db) {
762 g_chg $f, $k, $v;
763 }
764
765 # delete keys that are no longer present
766 for (keys %{ $MON_DB{$f} }) {
767 g_chg $f, $_
768 unless exists $db->{$_};
769 }
770 };
771
772 # incremental update
773 $NODE_REQ{g_chg2} = \&g_chg;
774
775 #############################################################################
776 # configure
777
778 sub nodename {
779 require POSIX;
780 (POSIX::uname ())[1]
781 }
782
783 sub _resolve($) {
784 my ($nodeid) = @_;
785
786 my $cv = AE::cv;
787 my @res;
788
789 $cv->begin (sub {
790 my %seen;
791 my @refs;
792 for (sort { $a->[0] <=> $b->[0] } @res) {
793 push @refs, $_->[1] unless $seen{$_->[1]}++
794 }
795 shift->send (@refs);
796 });
797
798 my $idx;
799 for my $t (split /,/, $nodeid) {
800 my $pri = ++$idx;
801
802 $t = length $t ? nodename . ":$t" : nodename
803 if $t =~ /^\d*$/;
804
805 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
806 or Carp::croak "$t: unparsable transport descriptor";
807
808 $port = "0" if $port eq "*";
809
810 if ($host eq "*") {
811 $cv->begin;
812 # use fork_call, as Net::Interface is big, and we need it rarely.
813 require AnyEvent::Util;
814 AnyEvent::Util::fork_call (
815 sub {
816 my @addr;
817
818 require Net::Interface;
819
820 for my $if (Net::Interface->interfaces) {
821 # we statically lower-prioritise ipv6 here, TODO :()
822 for $_ ($if->address (Net::Interface::AF_INET ())) {
823 next if /^\x7f/; # skip localhost etc.
824 push @addr, $_;
825 }
826 for ($if->address (Net::Interface::AF_INET6 ())) {
827 #next if $if->scope ($_) <= 2;
828 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
829 push @addr, $_;
830 }
831
832 }
833 @addr
834 }, sub {
835 for my $ip (@_) {
836 push @res, [
837 $pri += 1e-5,
838 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
839 ];
840 }
841 $cv->end;
842 }
843 );
844 } else {
845 $cv->begin;
846 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
847 for (@_) {
848 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
849 push @res, [
850 $pri += 1e-5,
851 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
852 ];
853 }
854 $cv->end;
855 };
856 }
857 }
858
859 $cv->end;
860
861 $cv
862 }
863
864 sub configure(@) {
865 unshift @_, "profile" if @_ & 1;
866 my (%kv) = @_;
867
868 delete $NODE{$NODE}; # we do not support doing stuff before configure
869 _init_names;
870
871 my $profile = delete $kv{profile};
872
873 $profile = nodename
874 unless defined $profile;
875
876 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
877
878 if (exists $CONFIG->{secure}) {
879 my $pass = !$CONFIG->{secure};
880 $SECURE = sub { $pass };
881 }
882
883 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
884
885 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
886
887 $NODE = $node;
888
889 $NODE =~ s/%n/nodename/ge;
890
891 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
892 # nodes with randomised node names do not need randomised port names
893 $UNIQ = "";
894 }
895
896 $NODE{$NODE} = $NODE{""};
897 $NODE{$NODE}{id} = $NODE;
898
899 my $seeds = $CONFIG->{seeds};
900 my $binds = $CONFIG->{binds};
901
902 $binds ||= ["*"];
903
904 $WARN->(8, "node $NODE starting up.");
905
906 $LISTENER = [];
907 %LISTENER = ();
908
909 for (map _resolve $_, @$binds) {
910 for my $bind ($_->recv) {
911 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
912 or Carp::croak "$bind: unparsable local bind address";
913
914 my $listener = AnyEvent::MP::Transport::mp_server
915 $host,
916 $port,
917 prepare => sub {
918 my (undef, $host, $port) = @_;
919 $bind = AnyEvent::Socket::format_hostport $host, $port;
920 0
921 },
922 ;
923 $LISTENER{$bind} = $listener;
924 push @$LISTENER, $bind;
925 }
926 }
927
928 db_set "'l" => $NODE => $LISTENER;
929
930 $WARN->(8, "node listens on [@$LISTENER].");
931
932 # connect to all seednodes
933 set_seeds map $_->recv, map _resolve $_, @$seeds;
934
935 master_search;
936
937 if ($NODE eq "atha") {;#d#
938 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
939 }
940
941 for (@{ $CONFIG->{services} }) {
942 if (ref) {
943 my ($func, @args) = @$_;
944 (load_func $func)->(@args);
945 } elsif (s/::$//) {
946 eval "require $_";
947 die $@ if $@;
948 } else {
949 (load_func $_)->();
950 }
951 }
952 }
953
954 =back
955
956 =head1 SEE ALSO
957
958 L<AnyEvent::MP>.
959
960 =head1 AUTHOR
961
962 Marc Lehmann <schmorp@schmorp.de>
963 http://home.schmorp.de/
964
965 =cut
966
967 1
968