ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.81
Committed: Sat Mar 3 19:43:41 2012 UTC (12 years, 4 months ago) by root
Branch: MAIN
Changes since 1.80: +55 -21 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
# Symbols exported only on request: the kernel's shared state variables.
our @EXPORT_OK = qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID);

# Symbols exported by default, grouped as in the original list.
our @EXPORT = (
    qw(add_node load_func snd_to_func snd_on eval_on),
    qw(NODE $NODE node_of snd kil port_is_local),
    qw(configure),
    qw(up_nodes mon_nodes node_is_up),
    qw(db_set db_del db_reg),
);
50
51 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
52
53 This value is called with an error or warning message, when e.g. a
54 connection could not be created, authorisation failed and so on.
55
56 It I<must not> block or send messages - queue it and use an idle watcher if
57 you need to do any of these things.
58
59 C<$level> should be C<0> for messages to be logged always, C<1> for
60 unexpected messages and errors, C<2> for warnings, C<7> for messages about
61 node connectivity and services, C<8> for debugging messages and C<9> for
62 tracing messages.
63
64 The default simply logs the message to STDERR.
65
66 =item @AnyEvent::MP::Kernel::WARN
67
68 All code references in this array are called for every log message, from
69 the default C<$WARN> handler. This is an easy way to tie into the log
70 messages without disturbing others.
71
72 =cut
73
# Highest level the default handler still prints: taken from the
# PERL_ANYEVENT_MP_WARNLEVEL environment variable when it exists, else 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
    if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks; every one is called for every message.
our @WARN;

# Default log handler, invoked as $WARN->($level, $msg).
our $WARN = sub {
    # hooks run first and share our @_, independent of $WARNLEVEL
    foreach (@WARN) {
        &$_;
    }

    my ($level, $msg) = @_;

    return if $WARNLEVEL < $level;

    # drop one trailing newline, the format string adds its own
    $msg =~ s/\n$//;

    my $stamp = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime time);

    printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
90
91 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
92
93 The maximum level at which warning messages will be printed to STDERR by
94 the default warn handler.
95
96 =cut
97
# Resolves a fully qualified function name ("Some::Pkg::func") to a code
# reference, loading modules on demand.
#
# If the function is not defined yet, trailing name components are stripped
# one at a time and each remaining prefix is require'd until the function
# shows up. Always returns a code reference: when resolution fails, the
# returned sub dies with the reason once it is called.
#
# The package name ends up inside a string eval and the function name can
# come from a node message, so anything that is not a plain package name is
# treated as unresolvable instead of being evaluated.
sub load_func($) {
    my $func = $_[0];

    unless (defined &$func) {
        my $pkg = $func;

        do {
            # nothing left to strip means no package could provide $func
            $pkg =~ s/::[^:]+$//
                or return sub { die "unable to resolve function '$func'" };

            # never interpolate anything but identifiers joined by "::"
            $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
                or return sub { die "unable to resolve function '$func'" };

            local $@;
            unless (eval "require $pkg; 1") {
                my $error = $@;

                # a missing module just means "try the next shorter prefix";
                # any other load error is reported when the result is called
                $error =~ /^Can't locate .*\.pm in \@INC \(/
                    or return sub { die $error };
            }
        } until defined &$func;
    }

    \&$func
}
118
# The 62 alphanumeric characters used for printable identifiers.
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# Returns a string of $_[0] characters, each chr of a rand 256 draw.
sub nonce($) {
    my ($len) = @_;

    my $octets = "";
    $octets .= chr(rand 256) for 1 .. $len;

    $octets
}

# Returns a string of $_[0] characters drawn via rand from @alnum.
sub nonce62($) {
    my ($len) = @_;

    join "", map { $alnum[rand 62] } 1 .. $len
}
128
# Builds a nine-character identifier: five characters derived from the
# process ID and the current AE::now timestamp, followed by four random
# alphanumeric characters.
sub gen_uniq {
    my $now = AE::now();

    # each entry is an index into @alnum
    my @digits = (
        $$ / 62 % 62,
        $$ % 62,
        int($now) % 62,
        int($now * 100) % 62,
        int($now * 10000) % 62,
    );

    join("", map { $alnum[$_] } @digits) . nonce62(4)
}
140
# configuration and identity
our $CONFIG;            # this node's configuration
our $RUNIQ;             # remote uniq value
our $UNIQ;              # per-process/node unique cookie
our $NODE;
our $ID = "a";

# routing tables
our %NODE;              # node id to transport mapping, or "undef", for local node
our %PORT;              # local ports
our %PORT_DATA;         # local ports

# port monitoring
our %RMON;              # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;              # monitored _local_ ports

# listening state
our $GLOBAL;            # true if node is a global ("directory") node
our %LISTENER;
our $LISTENER;          # our listeners, as arrayref

our $SRCNODE;           # holds the sending node _object_ during _inject
159
# (Re)creates the random name components and the anonymous default node ID.
sub _init_names {
    # ~54 bits, for local port names, lowercase $ID appended
    $UNIQ = gen_uniq();

    # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
    # at the end to avoid clashes
    $RUNIQ = nonce62(10);
    substr($RUNIQ, -1) = uc substr($RUNIQ, -1);

    $NODE = "anon/$RUNIQ";
}

# names must exist as soon as the module is loaded
_init_names();
172
# Returns the current value of $NODE (this node's ID).
sub NODE() {
    return $NODE;
}
176
# Returns the part of a port ID in front of the first "#".
sub node_of($) {
    my @parts = split /#/, $_[0], 2;

    $parts[0]
}
182
# Defines the TRACE constant at compile time from the
# PERL_ANYEVENT_MP_TRACE environment variable.
BEGIN {
    if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
        *TRACE = sub () { 1 };
    }
    else {
        *TRACE = sub () { 0 };
    }
}
188
our $DELAY_TIMER;   # timer that drains the queue, undef while idle
our @DELAY_QUEUE;   # code references waiting to be run

# Runs queued callbacks in order; once the queue is empty the timer is
# released and undef is returned.
sub _delay_run {
    while (1) {
        my $job = shift @DELAY_QUEUE
            or return undef $DELAY_TIMER;

        $job->();
    }
}

# Queues one code reference and makes sure a zero-delay timer is pending.
sub delay($) {
    my ($job) = @_;

    push @DELAY_QUEUE, $job;
    $DELAY_TIMER ||= AE::timer(0, 0, \&_delay_run);
}
200
# Dispatches a message: the first element names the local port, the rest
# is handed to that port's handler. Messages for unknown ports are dropped.
sub _inject {
    if (TRACE && @_) {
        warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n"; #d#
    }

    my $portid  = shift;
    my $handler = $PORT{$portid}
        or return;

    # called with the remaining @_ shared
    &$handler;
}
205
# Central routing component: makes sure %NODE has an entry for the given
# node ID, creating a remote node object when there is none, and returns it.
sub add_node {
    my ($nodeid) = @_;

    $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new($nodeid)
}
213
# Sends a message to a port ("nodeid#portid"): the node part selects (or
# creates) the node object, the port part is prepended to the message.
# Croaks when the destination is undef.
sub snd(@) {
    my $dest = shift;
    my ($nodeid, $portid) = split /#/, $dest, 2;

    if (TRACE && @_) {
        warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n"; #d#
    }

    Carp::croak("'undef' is not a valid node ID/port ID")
        unless defined $nodeid; #d#UGLY

    my $node = $NODE{$nodeid} || add_node($nodeid);

    $node->{send}->(["$portid", @_]);
}
225
226 =item $is_local = port_is_local $port
227
228 Returns true iff the port is a local port.
229
230 =cut
231
# True when the node part of the port ID maps to the same %NODE entry as
# the "" key, which holds the local node.
sub port_is_local($) {
    my @parts = split /#/, $_[0], 2;

    $NODE{$parts[0]} == $NODE{""}
}
237
238 =item snd_to_func $node, $func, @args
239
240 Expects a node ID and a name of a function. Asynchronously tries to call
241 this function with the given arguments on that node.
242
243 This function can be used to implement C<spawn>-like interfaces.
244
245 =cut
246
# Sends a message to the node port ("" port) of the given node; the
# remaining arguments form the message. Croaks on an undef node ID.
sub snd_to_func($$;@) {
    my $nodeid = shift;

    # on $NODE, we artificially delay... (for spawn)
    # this is very ugly - maybe we should simply delay ALL messages,
    # to avoid deep recursion issues. but that's so... slow...
    # NOTE(review): the flag is set when the target is NOT $NODE, which
    # reads opposite to the comment above - confirm the intent.
    $AnyEvent::MP::Node::Self::DELAY = 1
        unless $nodeid eq $NODE;

    Carp::croak("'undef' is not a valid node ID/port ID")
        unless defined $nodeid; #d#UGLY

    my $node = $NODE{$nodeid} || add_node($nodeid);

    $node->{send}->(["", @_]);
}
261
262 =item snd_on $node, @msg
263
264 Executes C<snd> with the given C<@msg> (which must include the destination
265 port) on the given node.
266
267 =cut
268
# Asks the given node to run snd with @msg by sending it a "snd" request.
sub snd_on($@) {
    my ($node, @msg) = @_;

    snd($node, snd => @msg);
}
273
274 =item eval_on $node, $string[, @reply]
275
276 Evaluates the given string as a Perl expression on the given node. When
277 @reply is specified, then it is used to construct a reply message with
278 C<"$@"> and any results from the eval appended.
279
280 =cut
281
# Sends an "eval" request carrying the string and the optional reply
# message to the given node.
sub eval_on($$;@) {
    my ($node, @request) = @_;

    snd($node, eval => @request);
}
286
# Kills the given port, passing the remaining arguments on as the reason.
# Croaks when the port part is empty, i.e. for a node port.
sub kil(@) {
    my $target = shift;
    my ($nodeid, $portid) = split /#/, $target, 2;

    Carp::croak("$nodeid#$portid: killing a node port is not allowed, caught")
        unless length $portid;

    my $node = $NODE{$nodeid} || add_node($nodeid);

    $node->kill("$portid", @_);
}
296
297 #############################################################################
298 # node monitoring and info
299
300 =item node_is_known $nodeid
301
302 #TODO#
303 Returns true iff the given node is currently known to this node.
304
305 =cut
306
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
    my ($nodeid) = @_;

    exists $NODE{$nodeid}
}
310
311 =item node_is_up $nodeid
312
313 Returns true if the given node is "up", that is, the kernel thinks it has
314 a working connection to it.
315
316 If the node is known (to this local node) but not currently connected,
317 returns C<0>. If the node is not known, returns C<undef>.
318
319 =cut
320
# Returns 1 when the node object has a transport, 0 when it has none, and
# nothing when there is no (true) %NODE entry for the node ID.
sub node_is_up($) {
    my $node = $NODE{$_[0]}
        or return;

    $node->{transport} ? 1 : 0
}
325
326 =item up_nodes
327
328 Return the node IDs of all nodes that are currently connected (excluding
329 the node itself).
330
331 =cut
332
# Returns the IDs of all %NODE entries that currently have a transport.
sub up_nodes() {
    map { $_->{id} }
        grep { $_->{transport} }
            values %NODE
}
336
337 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
338
339 Registers a callback that is called each time a node goes up (a connection
340 is established) or down (the connection is lost).
341
342 Node up messages can only be followed by node down messages for the same
343 node, and vice versa.
344
345 Note that monitoring a node is usually better done by monitoring its node
346 port. This function is mainly of interest to modules that are concerned
347 about the network topology and low-level connection handling.
348
349 Callbacks I<must not> block and I<should not> send any messages.
350
351 The function returns an optional guard which can be used to unregister
352 the monitoring callback again.
353
354 Example: make sure you call function C<newnode> for all nodes that are up
355 or go up (and down).
356
357 newnode $_, 1 for up_nodes;
358 mon_nodes \&newnode;
359
360 =cut
361
# Registered node up/down callbacks, keyed by the callback's address.
our %MON_NODES;

# Registers $cb for node events. When called in non-void context, returns
# a guard that removes the registration again.
sub mon_nodes($) {
    my ($cb) = @_;

    $MON_NODES{$cb+0} = $cb;

    defined wantarray
        and Guard::guard { delete $MON_NODES{$cb+0} }
}
371
# Reports a node going up or down to every registered monitor and logs the
# event at level 7. A dying callback is logged at level 1 and does not
# prevent the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
    my ($node, $up, @reason) = @_;

    foreach my $watcher (values %MON_NODES) {
        my $ok = eval { $watcher->($node->{id}, $up, @reason); 1 };
        $WARN->(1, $@) unless $ok;
    }

    my $state = $up ? "up" : "down";
    $WARN->(7, "$node->{id} is $state (@reason)");
}
382
383 #############################################################################
384 # self node code
385
# Removes a local port and notifies its monitors with the kill reason
# (the remaining arguments).
sub _kill {
    my $port = shift;

    # killing nonexistent ports is O.K.
    return unless delete $PORT{$port};

    delete $PORT_DATA{$port};

    my $mon = delete $LMON{$port};

    # a kill reason nobody is monitoring for gets logged instead
    if (!$mon && @_) {
        $WARN->(2, "unmonitored local port $port died with reason: @_");
    }

    $_->(@_) for values %$mon;
}
399
# Registers callback $_[2] as monitor of local port $_[1]; the first
# argument is not used. For a port that does not exist the callback is
# invoked right away with a no_such_port reason.
sub _monitor {
    my (undef, $portid, $cb) = @_;

    exists $PORT{$portid}
        or return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");

    $LMON{$portid}{$cb+0} = $cb;
}
406
# Removes callback $_[2] from the monitors of local port $_[1], provided
# the port has a monitor table; the first argument is not used.
sub _unmonitor {
    my (undef, $portid, $cb) = @_;

    delete $LMON{$portid}{$cb+0}
        if exists $LMON{$portid};
}
411
# Handlers for messages addressed to the node port, keyed by the message
# tag. Each handler receives the rest of the message in @_; $SRCNODE is
# the sending node object while a handler runs.
our %NODE_REQ = (
    # internal services

    # monitoring
    mon0 => sub { # stop monitoring a port for another node
        my $portid = shift;
        _unmonitor(undef, $portid, delete $SRCNODE->{rmon}{$portid});
    },
    mon1 => sub { # start monitoring a port for another node
        my $portid = shift;

        # the callback is stored inside the node object, so it must not
        # hold a strong reference to that object
        Scalar::Util::weaken(my $node = $SRCNODE);

        _monitor(undef, $portid, $node->{rmon}{$portid} = sub {
            # the node object may be gone by the time the port dies; check
            # before dereferencing, otherwise a fresh hash would be
            # autovivified into $node
            $node or return;

            delete $node->{rmon}{$portid};
            $node->send(["", kil0 => $portid, @_])
                if $node->{transport};
        });
    },
    # another node has killed a monitored port
    kil0 => sub {
        my $cbs = delete $SRCNODE->{lmon}{+shift}
            or return;

        $_->(@_) for @$cbs;
    },

    # "public" services - not actually public

    # another node wants to kill a local port
    kil => \&_kill,

    # relay message to another node / generic echo
    snd => \&snd,
    snd_multiple => sub {
        snd @$_ for @_
    },

    # random utilities
    eval => sub {
        #d#SECURE
        # no extra lexicals here on purpose: the evaluated string can see
        # everything that is in scope
        my @res = do { package main; eval shift };
        snd @_, "$@", @res if @_;
    },
    time => sub {
        snd @_, AE::now;
    },
    devnull => sub {
        #
    },
    "" => sub {
        # empty messages are keepalives or similar devnull-applications
    },
);
464
# The local node object is reachable under its ID and under "".
$NODE{$NODE} = AnyEvent::MP::Node::Self->new($NODE);
$NODE{""}    = $NODE{$NODE};

# The node port: the first message element selects the handler, which is
# looked up in %NODE_REQ or resolved (and cached there) via load_func.
# Errors raised by the handler are logged at level 2.
$PORT{""} = sub {
    my $tag = shift;

    #d#SECURE (load_func)
    eval {
        my $handler = $NODE_REQ{$tag} ||= load_func($tag);
        &$handler;    # runs with the remaining @_
    };

    $WARN->(2, "error processing node message: $@") if $@;
};
472
473 #############################################################################
474 # seed management, try to keep connections to all seeds at all times
475
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;

# Starts a connection attempt to the given seed address unless
# %SEED_CONNECT already has a true entry for it. Croaks when the address
# cannot be parsed.
sub seed_connect {
    my ($seed) = @_;

    my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
        or Carp::croak("$seed: unparsable seed address");

    $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

    $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
        $host, $port,
        on_greeted => sub {
            # called after receiving remote greeting, learn remote node name

            # we rely on untrusted data here (the remote node name) this is
            # hopefully ok, as this can at most be used for DOSing, which is easy
            # when you can do MITM anyway.
            my $remote = $_[0]{remote_node};

            # if we connect to ourselves, nuke this seed, but make sure we act like a seed
            if ($remote eq $AnyEvent::MP::Kernel::NODE) {
                require AnyEvent::MP::Global; # every seed becomes a global node currently

                # %NODE_SEED is keyed by node ID, so the reverse entry to
                # drop is the one of the node recorded for this seed
                my $old_node = delete $SEED_NODE{$seed};
                delete $NODE_SEED{$old_node}
                    if defined $old_node;
            }
            else {
                $SEED_NODE{$seed}   = $remote;
                $NODE_SEED{$remote} = $seed;
            }
        },
        on_destroy => sub {
            delete $SEED_CONNECT{$seed};
        },
        sub {
            $SEED_CONNECT{$seed} = 1;
        },
    );
}
517
# Starts connection attempts for every seed that has neither a
# %SEED_CONNECT entry nor a node that is up, and schedules itself again
# with a growing, capped delay. Without such seeds the timer is dropped.
sub seed_all {
    my @pending = grep {
        my $nodeid = $SEED_NODE{$_};

        !exists $SEED_CONNECT{$_}
            && !(defined $nodeid && node_is_up($nodeid))
    } keys %SEED_NODE;

    if (!@pending) {
        # all seeds connected or connecting, no need to restart timer
        undef $SEED_WATCHER;
    }
    else {
        # start connection attempt for every seed we are not connected to yet
        seed_connect($_) for @pending;

        # roughly double the delay, capped at the configured monitor_timeout
        $SEED_RETRY = $SEED_RETRY * 2 + rand();

        my $limit = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
        $SEED_RETRY = $limit
            if $SEED_RETRY > $limit;

        $SEED_WATCHER = AE::timer($SEED_RETRY, 0, \&seed_all);
    }
}
540
# Resets the retry delay to 1 and makes sure a seed_all timer is pending.
sub seed_again {
    $SEED_RETRY = 1;

    $SEED_WATCHER ||= AE::timer(1, 0, \&seed_all);
}
545
# Replaces the seed list with the given addresses, forgetting all
# previous seed state, and starts connecting right away.
sub set_seeds(@) {
    %NODE_SEED    = ();
    %SEED_CONNECT = ();

    # every seed starts out without a known node ID
    %SEED_NODE = map { $_ => undef } @_;

    seed_again(); #d#
    seed_all();
}
557
# if we lost the connection to a seed node, make sure we are seeding
mon_nodes(sub {
    my ($nodeid, $is_up) = @_;

    seed_again()
        if !$is_up && exists $NODE_SEED{$nodeid};
});
563
564 #############################################################################
565 # talk with/to global nodes
566
567 # protocol messages:
568 #
569 # sent by all slave nodes (slave to master)
570 # g_slave database - make other global node master of the sender
571 #
572 # sent by any node to global nodes
573 # g_set database - set whole database
574 # g_add family key val - add/replace key to database
575 # g_del family key - delete key from database
576 # g_get family key reply... - send reply with data
577 #
578 # sent by global nodes
579 # g_global - node became global, similar to global=1 greeting
580 #
581 # database families
582 # "'l" -> node -> listeners
583 # "'g" -> node -> undef
584 # ...
585 #
586
587 # used on all nodes:
our $MASTER;     # the global node we bind ourselves to, unless we are global ourselves
our $MASTER_MON; # guard of the node monitor installed by master_search

our %LOCAL_DB;   # this node database
our %GLOBAL_DB;  # all local databases, merged - empty on non-global nodes
593
594 #############################################################################
595 # master selection
596
# master requests
our %GLOBAL_REQ; # $id => \@req

# Remembers request $req under $id unless that ID is already queued, and
# sends it to the master straight away when there is one.
sub global_req_add {
    my ($id, $req) = @_;

    return if exists $GLOBAL_REQ{$id};

    $GLOBAL_REQ{$id} = $req;

    if ($MASTER) {
        snd($MASTER, @$req);
    }
}
610
# Forgets the queued request with the given ID.
sub global_req_del {
    my ($id) = @_;

    delete $GLOBAL_REQ{$id};
}
614
# Queues a g_find request for the given node ID.
sub g_find {
    my ($nodeid) = @_;

    global_req_add("g_find $nodeid", [g_find => $nodeid]);
}
618
# reply for g_find started in Node.pm: retires the queued request and
# passes the second message element to connect_to of the node object,
# provided %NODE has one for that ID.
$NODE_REQ{g_found} = sub {
    my ($nodeid, $arg) = @_;

    global_req_del("g_find $nodeid");

    my $node = $NODE{$nodeid}
        or return;

    $node->connect_to($arg);
};
627
# Makes the given node our master: announces our local database to it and
# sends every queued global request.
sub master_set {
    ($MASTER) = @_;

    snd($MASTER, g_slave => \%LOCAL_DB);

    # (re-)send queued requests
    foreach my $req (values %GLOBAL_REQ) {
        snd($MASTER, @$req);
    }
}
637
# Picks a master: the first seed node that is up wins. If none is up, a
# node monitor waits for a seed node to come up, makes it the master and
# then watches for that master going down to start the search again.
sub master_search {
    #TODO: should also look for other global nodes, but we don't know them #d#
    foreach my $nodeid (keys %NODE_SEED) {
        next unless node_is_up($nodeid);

        master_set($nodeid);
        return;
    }

    $MASTER_MON = mon_nodes(sub {
        my ($nodeid, $is_up) = @_;

        return unless $is_up;               # we are only interested in node-ups
        return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

        master_set($nodeid);

        # replaces the monitor above with one waiting for the master to go down
        $MASTER_MON = mon_nodes(sub {
            my ($nodeid, $is_up) = @_;

            if ($nodeid eq $MASTER && !$is_up) {
                undef $MASTER;
                master_search();
            }
        });
    });
}
661
# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
    # load global module and redo the request
    # NOTE(review): presumably loading AnyEvent::MP::Global replaces the
    # g_slave entry; if it did not, this would call itself again - confirm.
    require AnyEvent::MP::Global;

    my $handler = $NODE_REQ{g_slave};
    &$handler    # same @_ as we were called with
};
670
671 #############################################################################
672 # local database operations
673
# local database management

# Stores $value under $family/$key in the local database and forwards the
# change to the master, if one is defined.
sub db_set($$$) {
    my ($family, $key, $value) = @_;

    $LOCAL_DB{$family}{$key} = $value;

    snd($MASTER, g_add => $family => $key => $value)
        if defined $MASTER;
}
680
# Removes $family/$key from the local database and forwards the deletion
# to the master, if one is defined.
sub db_del($$) {
    my ($family, $key) = @_;

    delete $LOCAL_DB{$family}{$key};

    snd($MASTER, g_del => $family => $key)
        if defined $MASTER;
}
686
# Sets a database entry like db_set and returns a guard that deletes the
# entry again.
sub db_reg($$;$) {
    my ($family, $key) = @_;

    # the &-form hands our @_ through unchanged and skips db_set's
    # three-argument prototype, so the value stays optional
    &db_set;

    Guard::guard { db_del($family, $key) }
}
692
# Placeholder: the body is empty (marked #d# by the author), so a call
# returns nothing.
sub db_keys($$$) {
    #d# not implemented
}
696
697 #d# db_values
698 #d# db_family
699 #d# db_key
700
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# Applies one change to the monitored copy of family $f: called with a key
# and a value the key is set, with a key only it is deleted. Afterwards
# the first element of every reply registered for the family is called
# with the same (key[, value]) arguments.
sub g_chg {
    my $family = shift;

    # @_ is now (key) or (key, value)
    if ($#_) {
        $MON_DB{$family}{$_[0]} = $_[1];
    }
    else {
        delete $MON_DB{$family}{$_[0]};
    }

    for (values %{ $LOCAL_MON{$family} }) {
        &{ $_->[0] };
    }
}
713
# Starts monitoring a database family. The first element of @reply is
# called as a code reference with ($key, $value). Returns a guard that
# ends the monitoring; when the last monitor of a family goes away, the
# family's local state is dropped as well.
sub db_mon($@) {
    my ($family, @reply) = @_;

    my $reply = \@reply;
    my $id    = $reply + 0;    # address of the reply array as registry key

    if (%{ $LOCAL_MON{$family} }) {
        # if we already monitor this thingy, generate
        # create events for all of them
        while (my ($k, $v) = each %{ $MON_DB{$family} }) {
            $reply[0]->($k, $v);
        }
    }
    else {
        # new monitor, request chg1 from upstream
        global_req_add("mon1 $family" => [g_mon1 => $family]);
        $MON_DB{$family} = {};
    }

    $LOCAL_MON{$family}{$id} = $reply;

    Guard::guard {
        my $monitors = $LOCAL_MON{$family};
        delete $monitors->{$id};

        # other monitors of this family remain, keep the state
        return if %$monitors;

        global_req_del("mon1 $family");

        # no global_req, because we don't care if we are not connected
        snd($MASTER, g_mon0 => $family)
            if $MASTER;

        delete $LOCAL_MON{$family};
        delete $MON_DB{$family};
    }
}
750
# g_chg1 carries a complete family database: keys that were known but are
# missing from it are reported as deleted, then every key in it is
# reported as set.
$NODE_REQ{g_chg1} = sub {
    my ($family, $db) = @_;

    my $old = delete $MON_DB{$family};

    g_chg($family, $_)
        for grep { !exists $db->{$_} } keys %$old;

    while (my ($key, $value) = each %$db) {
        g_chg($family, $key, $value);
    }
};

# g_chg2 carries a single change and maps directly onto g_chg.
$NODE_REQ{g_chg2} = \&g_chg;
767
768 #############################################################################
769 # configure
770
# Returns the second field of POSIX::uname.
sub nodename {
    require POSIX;

    my (undef, $name) = POSIX::uname();

    $name
}
775
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings. Returns a condvar that is sent the resulting list,
# ordered by descriptor position and with duplicates removed. Croaks on a
# descriptor that cannot be parsed.
sub _resolve($) {
    my ($nodeid) = @_;

    my $cv = AE::cv();
    my @res;    # [priority, address] pairs collected by the callbacks

    # runs once all lookups have called ->end
    $cv->begin(sub {
        my $done = shift;

        my %seen;
        my @sorted = map { $_->[1] } sort { $a->[0] <=> $b->[0] } @res;

        $done->send(grep { !$seen{$_}++ } @sorted);
    });

    my $idx;
    for my $t (split /,/, $nodeid) {
        my $pri = ++$idx;

        # an empty descriptor or a bare port number refers to our own host
        if ($t =~ /^\d*$/) {
            $t = length($t) ? nodename() . ":$t" : nodename();
        }

        my ($host, $port) = AnyEvent::Socket::parse_hostport($t, 0)
            or Carp::croak("$t: unparsable transport descriptor");

        $port = "0" if $port eq "*";

        if ($host ne "*") {
            $cv->begin;
            AnyEvent::Socket::resolve_sockaddr($host, $port, "tcp", 0, undef, sub {
                for my $entry (@_) {
                    my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr($entry->[3]);
                    push @res, [
                        $pri += 1e-5,
                        AnyEvent::Socket::format_hostport(
                            AnyEvent::Socket::format_address($addr), $service
                        ),
                    ];
                }
                $cv->end;
            });
        }
        else {
            $cv->begin;
            # use fork_call, as Net::Interface is big, and we need it rarely.
            require AnyEvent::Util;
            AnyEvent::Util::fork_call(
                sub {
                    my @addr;

                    require Net::Interface;

                    for my $if (Net::Interface->interfaces) {
                        # we statically lower-prioritise ipv6 here, TODO :()
                        # skip localhost etc.
                        push @addr, grep { !/^\x7f/ }
                            $if->address(Net::Interface::AF_INET());

                        #next if $if->scope ($_) <= 2;
                        # global unicast, site-local unicast
                        push @addr, grep { /^[\x20-\x3f\xfc\xfd]/ }
                            $if->address(Net::Interface::AF_INET6());
                    }

                    @addr
                },
                sub {
                    for my $ip (@_) {
                        push @res, [
                            $pri += 1e-5,
                            AnyEvent::Socket::format_hostport(
                                AnyEvent::Socket::format_address($ip), $port
                            ),
                        ];
                    }
                    $cv->end;
                },
            );
        }
    }

    $cv->end;

    $cv
}
856
# Configures this node: works out the profile and node ID, creates the
# listeners, sets the seed list, starts the master search and loads the
# configured services. Takes key/value pairs; with an odd number of
# arguments the first one is the profile name.
sub configure(@) {
    unshift @_, "profile" if @_ & 1;
    my %kv = @_;

    delete $NODE{$NODE}; # we do not support doing stuff before configure
    _init_names();

    my $profile = delete $kv{profile};
    $profile = nodename()
        unless defined $profile;

    $CONFIG = AnyEvent::MP::Config::find_profile($profile, %kv);

    my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

    $node
        or Carp::croak("$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n");

    $NODE = $node;

    # %n expands to the host's node name
    $NODE =~ s/%n/nodename()/ge;

    # a trailing slash and every %u receive the random $RUNIQ
    if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
        # nodes with randomised node names do not need randomised port names
        $UNIQ = "";
    }

    $NODE{$NODE} = $NODE{""};
    $NODE{$NODE}{id} = $NODE;

    my $seeds = $CONFIG->{seeds};
    my $binds = $CONFIG->{binds} || ["*"];

    $WARN->(8, "node $NODE starting up.");

    $LISTENER = [];
    %LISTENER = ();

    for my $resolver (map { _resolve($_) } @$binds) {
        for my $bind ($resolver->recv) {
            my ($host, $port) = AnyEvent::Socket::parse_hostport($bind)
                or Carp::croak("$bind: unparsable local bind address");

            my $listener = AnyEvent::MP::Transport::mp_server(
                $host,
                $port,
                prepare => sub {
                    # record the address handed to this callback as the bind address
                    my (undef, $host, $port) = @_;
                    $bind = AnyEvent::Socket::format_hostport($host, $port);
                    0
                },
            );

            $LISTENER{$bind} = $listener;
            push @$LISTENER, $bind;
        }
    }

    db_set("'l", $NODE, $LISTENER);

    $WARN->(8, "node listens on [@$LISTENER].");

    # connect to all seednodes
    set_seeds(map { $_->recv } map { _resolve($_) } @$seeds);

    master_search();

    # NOTE(review): hard-coded debugging hook (#d#) for a node named
    # "atha" - looks like leftover development code, confirm before release.
    if ($NODE eq "atha") {;#d#
        my $w; $w = AE::timer(4, 0, sub { undef $w; require AnyEvent::MP::Global });#d#
    }

    for my $service (@{ $CONFIG->{services} }) {
        if (ref $service) {
            # [function name, arguments...]
            my ($func, @args) = @$service;
            load_func($func)->(@args);
        }
        elsif ($service =~ s/::$//) {
            # a trailing "::" names a module to load (the entry is
            # modified in place)
            eval "require $service";
            die $@ if $@;
        }
        else {
            # anything else is a function called without arguments
            load_func($service)->();
        }
    }
}
941
942 =back
943
944 =head1 SEE ALSO
945
946 L<AnyEvent::MP>.
947
948 =head1 AUTHOR
949
950 Marc Lehmann <schmorp@schmorp.de>
951 http://home.schmorp.de/
952
953 =cut
954
955 1
956