ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.72
Committed: Wed Feb 29 18:44:59 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.71: +58 -95 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 );
49
50 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
51
52 This value is called with an error or warning message, when e.g. a
53 connection could not be created, authorisation failed and so on.
54
55 It I<must not> block or send messages -queue it and use an idle watcher if
56 you need to do any of these things.
57
58 C<$level> should be C<0> for messages to be logged always, C<1> for
59 unexpected messages and errors, C<2> for warnings, C<7> for messages about
60 node connectivity and services, C<8> for debugging messages and C<9> for
61 tracing messages.
62
63 The default simply logs the message to STDERR.
64
65 =item @AnyEvent::MP::Kernel::WARN
66
67 All code references in this array are called for every log message, from
68 the default C<$WARN> handler. This is an easy way to tie into the log
69 messages without disturbing others.
70
71 =cut
72
73 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
74 our @WARN;
75 our $WARN = sub {
76 &$_ for @WARN;
77
78 return if $WARNLEVEL < $_[0];
79
80 my ($level, $msg) = @_;
81
82 $msg =~ s/\n$//;
83
84 printf STDERR "%s <%d> %s\n",
85 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
86 $level,
87 $msg;
88 };
89
90 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
91
92 The maximum level at which warning messages will be printed to STDERR by
93 the default warn handler.
94
95 =cut
96
97 sub load_func($) {
98 my $func = $_[0];
99
100 unless (defined &$func) {
101 my $pkg = $func;
102 do {
103 $pkg =~ s/::[^:]+$//
104 or return sub { die "unable to resolve function '$func'" };
105
106 local $@;
107 unless (eval "require $pkg; 1") {
108 my $error = $@;
109 $error =~ /^Can't locate .*.pm in \@INC \(/
110 or return sub { die $error };
111 }
112 } until defined &$func;
113 }
114
115 \&$func
116 }
117
118 sub nonce($) {
119 my $nonce;
120
121 if (open my $fh, "</dev/urandom") {
122 sysread $fh, $nonce, $_[0];
123 } else {
124 # shit...
125 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
126 }
127
128 $nonce
129 }
130
131 sub alnumbits($) {
132 my $data = $_[0];
133
134 if (eval "use Math::GMP 2.05; 1") {
135 $data = Math::GMP::get_str_gmp (
136 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
137 62
138 );
139 } else {
140 $data = MIME::Base64::encode_base64 $data, "";
141 $data =~ s/=//;
142 $data =~ s/x/x0/g;
143 $data =~ s/\//x1/g;
144 $data =~ s/\+/x2/g;
145 }
146
147 $data
148 }
149
150 sub gen_uniq {
151 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
152 }
153
154 our $CONFIG; # this node's configuration
155
156 our $RUNIQ; # remote uniq value
157 our $UNIQ; # per-process/node unique cookie
158 our $NODE;
159 our $ID = "a";
160
161 our %NODE; # node id to transport mapping, or "undef", for local node
162 our (%PORT, %PORT_DATA); # local ports
163
164 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
165 our %LMON; # monitored _local_ ports
166
167 our $GLOBAL; # true if node is a global ("directory") node
168 our %LISTENER;
169 our $LISTENER; # our listeners, as arrayref
170
171 our $SRCNODE; # holds the sending node during _inject
172
173 sub _init_names {
174 $RUNIQ = alnumbits nonce 96/8;
175 $UNIQ = gen_uniq;
176 $NODE = "anon/$RUNIQ";
177 }
178
179 _init_names;
180
181 sub NODE() {
182 $NODE
183 }
184
185 sub node_of($) {
186 my ($node, undef) = split /#/, $_[0], 2;
187
188 $node
189 }
190
191 BEGIN {
192 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
193 ? sub () { 1 }
194 : sub () { 0 };
195 }
196
197 our $DELAY_TIMER;
198 our @DELAY_QUEUE;
199
200 sub _delay_run {
201 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
202 }
203
204 sub delay($) {
205 push @DELAY_QUEUE, shift;
206 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
207 }
208
209 sub _inject {
210 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
211 &{ $PORT{+shift} or return };
212 }
213
214 # this function adds a node-ref, so you can send stuff to it
215 # it is basically the central routing component.
216 sub add_node {
217 my ($node) = @_;
218
219 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
220 }
221
222 sub snd(@) {
223 my ($nodeid, $portid) = split /#/, shift, 2;
224
225 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
226
227 defined $nodeid #d#UGLY
228 or Carp::croak "'undef' is not a valid node ID/port ID";
229
230 ($NODE{$nodeid} || add_node $nodeid)
231 ->{send} (["$portid", @_]);
232 }
233
234 =item $is_local = port_is_local $port
235
236 Returns true iff the port is a local port.
237
238 =cut
239
240 sub port_is_local($) {
241 my ($nodeid, undef) = split /#/, $_[0], 2;
242
243 $NODE{$nodeid} == $NODE{""}
244 }
245
246 =item snd_to_func $node, $func, @args
247
248 Expects a node ID and a name of a function. Asynchronously tries to call
249 this function with the given arguments on that node.
250
251 This function can be used to implement C<spawn>-like interfaces.
252
253 =cut
254
255 sub snd_to_func($$;@) {
256 my $nodeid = shift;
257
258 # on $NODE, we artificially delay... (for spawn)
259 # this is very ugly - maybe we should simply delay ALL messages,
260 # to avoid deep recursion issues. but that's so... slow...
261 $AnyEvent::MP::Node::Self::DELAY = 1
262 if $nodeid ne $NODE;
263
264 defined $nodeid #d#UGLY
265 or Carp::croak "'undef' is not a valid node ID/port ID";
266
267 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
268 }
269
270 =item snd_on $node, @msg
271
272 Executes C<snd> with the given C<@msg> (which must include the destination
273 port) on the given node.
274
275 =cut
276
277 sub snd_on($@) {
278 my $node = shift;
279 snd $node, snd => @_;
280 }
281
282 =item eval_on $node, $string[, @reply]
283
284 Evaluates the given string as Perl expression on the given node. When
285 @reply is specified, then it is used to construct a reply message with
286 C<"$@"> and any results from the eval appended.
287
288 =cut
289
290 sub eval_on($$;@) {
291 my $node = shift;
292 snd $node, eval => @_;
293 }
294
295 sub kil(@) {
296 my ($nodeid, $portid) = split /#/, shift, 2;
297
298 length $portid
299 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
300
301 ($NODE{$nodeid} || add_node $nodeid)
302 ->kill ("$portid", @_);
303 }
304
305 #############################################################################
306 # node monitoring and info
307
308 =item node_is_known $nodeid
309
310 #TODO#
311 Returns true iff the given node is currently known to this node.
312
313 =cut
314
315 sub node_is_known($) {
316 exists $NODE{$_[0]}
317 }
318
319 =item node_is_up $nodeid
320
321 Returns true if the given node is "up", that is, the kernel thinks it has
322 a working connection to it.
323
324 If the node is known (to this local node) but not currently connected,
325 returns C<0>. If the node is not known, returns C<undef>.
326
327 =cut
328
329 sub node_is_up($) {
330 ($NODE{$_[0]} or return)->{transport}
331 ? 1 : 0
332 }
333
334 =item known_nodes
335
336 #TODO#
337 Returns the node IDs of all nodes currently known to this node, including
338 itself and nodes not currently connected.
339
340 =cut
341
342 sub known_nodes() {
343 map $_->{id}, values %NODE
344 }
345
346 =item up_nodes
347
348 Return the node IDs of all nodes that are currently connected (excluding
349 the node itself).
350
351 =cut
352
353 sub up_nodes() {
354 map $_->{id}, grep $_->{transport}, values %NODE
355 }
356
357 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
358
359 Registers a callback that is called each time a node goes up (a connection
360 is established) or down (the connection is lost).
361
362 Node up messages can only be followed by node down messages for the same
363 node, and vice versa.
364
365 Note that monitoring a node is usually better done by monitoring its node
366 port. This function is mainly of interest to modules that are concerned
367 about the network topology and low-level connection handling.
368
369 Callbacks I<must not> block and I<should not> send any messages.
370
371 The function returns an optional guard which can be used to unregister
372 the monitoring callback again.
373
374 Example: make sure you call function C<newnode> for all nodes that are up
375 or go up (and down).
376
377 newnode $_, 1 for up_nodes;
378 mon_nodes \&newnode;
379
380 =cut
381
382 our %MON_NODES;
383
384 sub mon_nodes($) {
385 my ($cb) = @_;
386
387 $MON_NODES{$cb+0} = $cb;
388
389 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
390 }
391
392 sub _inject_nodeevent($$;@) {
393 my ($node, $up, @reason) = @_;
394
395 for my $cb (values %MON_NODES) {
396 eval { $cb->($node->{id}, $up, @reason); 1 }
397 or $WARN->(1, $@);
398 }
399
400 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
401 }
402
403 #############################################################################
404 # self node code
405
406 sub _kill {
407 my $port = shift;
408
409 delete $PORT{$port}
410 or return; # killing nonexistent ports is O.K.
411 delete $PORT_DATA{$port};
412
413 my $mon = delete $LMON{$port}
414 or !@_
415 or $WARN->(2, "unmonitored local port $port died with reason: @_");
416
417 $_->(@_) for values %$mon;
418 }
419
420 sub _monitor {
421 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
422 unless exists $PORT{$_[1]};
423
424 $LMON{$_[1]}{$_[2]+0} = $_[2];
425 }
426
427 sub _unmonitor {
428 delete $LMON{$_[1]}{$_[2]+0}
429 if exists $LMON{$_[1]};
430 }
431
432 our %node_req = (
433 # internal services
434
435 # monitoring
436 mon0 => sub { # stop monitoring a port for another node
437 my $portid = shift;
438 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
439 },
440 mon1 => sub { # start monitoring a port for another node
441 my $portid = shift;
442 Scalar::Util::weaken (my $node = $SRCNODE);
443 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
444 delete $node->{rmon}{$portid};
445 $node->send (["", kil0 => $portid, @_])
446 if $node && $node->{transport};
447 };
448 },
449 # another node has killed a monitored port
450 kil0 => sub {
451 my $cbs = delete $SRCNODE->{lmon}{+shift}
452 or return;
453
454 $_->(@_) for @$cbs;
455 },
456
457 # "public" services - not actually public
458
459 # another node wants to kill a local port
460 kil => \&_kill,
461
462 # relay message to another node / generic echo
463 snd => \&snd,
464 snd_multiple => sub {
465 snd @$_ for @_
466 },
467
468 # informational
469 info => sub {
470 snd @_, $NODE;
471 },
472 known_nodes => sub {
473 snd @_, known_nodes;
474 },
475 up_nodes => sub {
476 snd @_, up_nodes;
477 },
478
479 # random utilities
480 eval => sub {
481 my @res = do { package main; eval shift };
482 snd @_, "$@", @res if @_;
483 },
484 time => sub {
485 snd @_, AE::time;
486 },
487 devnull => sub {
488 #
489 },
490 "" => sub {
491 # empty messages are keepalives or similar devnull-applications
492 },
493 );
494
495 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
496 $PORT{""} = sub {
497 my $tag = shift;
498 eval { &{ $node_req{$tag} ||= load_func $tag } };
499 $WARN->(2, "error processing node message: $@") if $@;
500 };
501
502 #############################################################################
503 # seed management, try to keep connections to all seeds at all times
504
505 our %SEED_NODE; # seed ID => node ID|undef
506 our %NODE_SEED; # map node ID to seed ID
507 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
508 our $SEED_WATCHER;
509 our $SEED_RETRY;
510
511 sub seed_connect {
512 my ($seed) = @_;
513
514 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
515 or Carp::croak "$seed: unparsable seed address";
516
517 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
518
519 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
520 $host, $port,
521 on_greeted => sub {
522 # called after receiving remote greeting, learn remote node name
523
524 # we rely on untrusted data here (the remote node name) this is
525 # hopefully ok, as this can at most be used for DOSing, which is easy
526 # when you can do MITM anyway.
527
528 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
529 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
530 require AnyEvent::MP::Global; # every seed becomes a global node currently
531 delete $SEED_NODE{$seed};
532 delete $NODE_SEED{$seed};
533 } else {
534 $SEED_NODE{$seed} = $_[0]{remote_node};
535 $NODE_SEED{$_[0]{remote_node}} = $seed;
536 }
537 },
538 on_destroy => sub {
539 delete $SEED_CONNECT{$seed};
540 },
541 sub {
542 $SEED_CONNECT{$seed} = 1;
543 }
544 ;
545 }
546
547 sub seed_all {
548 # my $next = List::Util::max 1,
549 # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
550 # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
551 # - rand;
552
553 my @seeds = grep {
554 !exists $SEED_CONNECT{$_}
555 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
556 } keys %SEED_NODE;
557
558 if (@seeds) {
559 # start connection attempt for every seed we are not connected to yet
560 seed_connect $_
561 for @seeds;
562
563 $SEED_RETRY = $SEED_RETRY * 2 + rand;
564 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
565 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
566
567 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
568
569 } else {
570 # all seeds connected or connecting, no need to restart timer
571 undef $SEED_WATCHER;
572 }
573 }
574
575 sub seed_again {
576 $SEED_RETRY = 1;
577 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
578 }
579
580 # sets new seed list, starts connecting
581 sub set_seeds(@) {
582 %SEED_NODE = ();
583 %NODE_SEED = ();
584 %SEED_CONNECT = ();
585
586 @SEED_NODE{@_} = ();
587
588 seed_again;#d#
589 seed_all;
590 }
591
592 mon_nodes sub {
593 # if we lost the connection to a seed node, make sure we are seeding
594 seed_again
595 if !$_[1] && exists $NODE_SEED{$_[0]};
596 };
597
598 #############################################################################
599 # talk with/to global nodes
600
601 # protocol messages:
602 #
603 # sent by all nodes
604 # g_reg listeners - register node at all global nodes
605 # g_slave - make other global node master of the sender
606 # g_db_get - request global node database
607 # g_find node id - g_reply with listeners
608 # g_reply - used for rpc (global_req function)
609 #
610 # sent by global nodes (master to slave)
611 # g_db_set db - master sets global database in slave
612 # g_add node listen - add node to global database
613 # g_del - remove node from global database
614 #
615
616 our $NODE_ADDR; # node => [listeners] - contains global nodes or, for global nodes, all nodes
617 our $GLOBAL_ADDR;
618 our $GLOBAL_MON;
619 our $GLOBAL_TIMER;
620 our $MASTER; # the global node we use for lookups ourselves
621
622 # master requests
623 our %GLOBAL_REQ; # [$cb, \@req]
624
625 sub global_req {
626 my $cb = pop;
627
628 $GLOBAL_REQ{$cb+0} = [$cb, [@_]];
629
630 snd $MASTER, @_, $cb+0
631 if $MASTER;
632
633 defined wantarray && AnyEvent::Util::guard { delete $GLOBAL_REQ{$cb+0} }
634 }
635
636 sub master_set {
637 $MASTER = $_[0];
638
639 snd $MASTER, "g_slave";
640
641 # send queued requests
642 while (my ($k, $v) = each %GLOBAL_REQ) {
643 snd $MASTER, @{$v->[1]}, $k;
644 }
645 }
646
647 $node_req{g_reply} = sub {
648 my $id = shift;
649
650 my $cb = delete $GLOBAL_REQ{$id}
651 or return;
652
653 $cb->[0]->(@_);
654 };
655
656 {#d#
657 # global code
658
659 our %GLOBAL_SLAVE;
660
661 # other node wants to make us the master
662 $node_req{g_slave} = sub {
663 require AnyEvent::MP::Global;
664
665 undef $GLOBAL_SLAVE{$SRCNODE->{id}};
666 snd $SRCNODE->{id}, g_set => $GLOBAL_ADDR;
667 };
668
669 $node_req{g_find} = sub {
670 my ($node, $id) = @_;
671
672 snd $SRCNODE->{id}, g_reply => $id, $NODE_ADDR->{$node};
673 };
674
675 # delete slaves on node-down
676 mon_nodes sub {
677 return if $_[1];
678
679 delete $GLOBAL_SLAVE{$_[0]};
680 };
681
682 }#d#
683
684 $node_req{g_set} = sub {
685 $GLOBAL_ADDR = shift;
686 $GLOBAL_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
687 (add_node $_)->connect
688 for keys %$GLOBAL_ADDR;
689 };
690 };
691
692 sub master_search {
693 for (keys %NODE_SEED, keys %$GLOBAL_ADDR) {
694 if (node_is_up $_) {
695 master_set $_;
696 return;
697 }
698 }
699
700 $GLOBAL_MON = mon_nodes sub {
701 return unless $_[1]; # we are only interested in node-ups
702 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
703
704 master_set $_[0];
705
706 $GLOBAL_MON = mon_nodes sub {
707 if ($_[0] eq $MASTER && !$_[1]) {
708 undef $MASTER;
709 master_search ();
710 }
711 };
712 };
713 }
714
715 # register remote node listeners
716 $node_req{g_reg} = sub {
717 if ($GLOBAL) {
718 $NODE_ADDR->{$SRCNODE->{id}} = $_[0];
719 } else {
720 # should not happen normally, except when there are bad races
721 $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
722 }
723 };
724
725 # also used
726 $node_req{g_add} = sub {
727 my ($node, $listener) = @_;
728
729 # no-op if we already know the info
730 return
731 if (join "\x00", @$listener) eq (join "\x00", @{ $GLOBAL_ADDR->{$node} });
732
733 $GLOBAL_ADDR->{$node} = $listener;
734
735 # let us be a good citizen and register (also connects)
736 snd $node, g_reg => $LISTENER;
737
738 # if we are global, we update all our slaves
739 our %GLOBAL_SLAVE; # ugh, will be in AnyEvent::MP::Global
740 snd $_, g_add => $node, $listener
741 for keys %GLOBAL_SLAVE;
742 };
743
744 $node_req{g_del} = sub {
745 my ($node) = @_;
746
747 delete $GLOBAL_ADDR->{$node};
748 };
749
750 # tell every global node our listeners
751 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
752 snd $_[0]{remote_node}, g_reg => $LISTENER
753 if $_[0]{remote_greeting}{global};
754 };
755
756 master_search;
757
758 #############################################################################
759 # configure
760
761 sub _nodename {
762 require POSIX;
763 (POSIX::uname ())[1]
764 }
765
766 sub _resolve($) {
767 my ($nodeid) = @_;
768
769 my $cv = AE::cv;
770 my @res;
771
772 $cv->begin (sub {
773 my %seen;
774 my @refs;
775 for (sort { $a->[0] <=> $b->[0] } @res) {
776 push @refs, $_->[1] unless $seen{$_->[1]}++
777 }
778 shift->send (@refs);
779 });
780
781 my $idx;
782 for my $t (split /,/, $nodeid) {
783 my $pri = ++$idx;
784
785 $t = length $t ? _nodename . ":$t" : _nodename
786 if $t =~ /^\d*$/;
787
788 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
789 or Carp::croak "$t: unparsable transport descriptor";
790
791 $port = "0" if $port eq "*";
792
793 if ($host eq "*") {
794 $cv->begin;
795 # use fork_call, as Net::Interface is big, and we need it rarely.
796 require AnyEvent::Util;
797 AnyEvent::Util::fork_call (
798 sub {
799 my @addr;
800
801 require Net::Interface;
802
803 for my $if (Net::Interface->interfaces) {
804 # we statically lower-prioritise ipv6 here, TODO :()
805 for $_ ($if->address (Net::Interface::AF_INET ())) {
806 next if /^\x7f/; # skip localhost etc.
807 push @addr, $_;
808 }
809 for ($if->address (Net::Interface::AF_INET6 ())) {
810 #next if $if->scope ($_) <= 2;
811 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
812 push @addr, $_;
813 }
814
815 }
816 @addr
817 }, sub {
818 for my $ip (@_) {
819 push @res, [
820 $pri += 1e-5,
821 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
822 ];
823 }
824 $cv->end;
825 }
826 );
827 } else {
828 $cv->begin;
829 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
830 for (@_) {
831 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
832 push @res, [
833 $pri += 1e-5,
834 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
835 ];
836 }
837 $cv->end;
838 };
839 }
840 }
841
842 $cv->end;
843
844 $cv
845 }
846
847 sub configure(@) {
848 unshift @_, "profile" if @_ & 1;
849 my (%kv) = @_;
850
851 delete $NODE{$NODE}; # we do not support doing stuff before configure
852 _init_names;
853
854 my $profile = delete $kv{profile};
855
856 $profile = _nodename
857 unless defined $profile;
858
859 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
860
861 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
862
863 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
864
865 $NODE = $node;
866 $NODE =~ s%/$%/$RUNIQ%;
867
868 $NODE{$NODE} = $NODE{""};
869 $NODE{$NODE}{id} = $NODE;
870
871 my $seeds = $CONFIG->{seeds};
872 my $binds = $CONFIG->{binds};
873
874 $binds ||= ["*"];
875
876 $WARN->(8, "node $NODE starting up.");
877
878 $LISTENER = [];
879 %LISTENER = ();
880
881 for (map _resolve $_, @$binds) {
882 for my $bind ($_->recv) {
883 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
884 or Carp::croak "$bind: unparsable local bind address";
885
886 my $listener = AnyEvent::MP::Transport::mp_server
887 $host,
888 $port,
889 prepare => sub {
890 my (undef, $host, $port) = @_;
891 $bind = AnyEvent::Socket::format_hostport $host, $port;
892 0
893 },
894 ;
895 $LISTENER{$bind} = $listener;
896 push @$LISTENER, $bind;
897 }
898 }
899
900 $WARN->(8, "node listens on [@$LISTENER].");
901
902 # the global service is mandatory currently
903 #require AnyEvent::MP::Global;
904
905 # connect to all seednodes
906 set_seeds map $_->recv, map _resolve $_, @$seeds;
907
908 if ($NODE eq "atha") {;#d#
909 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
910 }
911
912 for (@{ $CONFIG->{services} }) {
913 if (ref) {
914 my ($func, @args) = @$_;
915 (load_func $func)->(@args);
916 } elsif (s/::$//) {
917 eval "require $_";
918 die $@ if $@;
919 } else {
920 (load_func $_)->();
921 }
922 }
923 }
924
925 =back
926
927 =head1 SEE ALSO
928
929 L<AnyEvent::MP>.
930
931 =head1 AUTHOR
932
933 Marc Lehmann <schmorp@schmorp.de>
934 http://home.schmorp.de/
935
936 =cut
937
938 1
939