ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.79
Committed: Sat Mar 3 11:38:43 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.78: +30 -52 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg
49 );
50
51 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
52
53 This value is called with an error or warning message, when e.g. a
54 connection could not be created, authorisation failed and so on.
55
56 It I<must not> block or send messages - queue it and use an idle watcher if
57 you need to do any of these things.
58
59 C<$level> should be C<0> for messages to be logged always, C<1> for
60 unexpected messages and errors, C<2> for warnings, C<7> for messages about
61 node connectivity and services, C<8> for debugging messages and C<9> for
62 tracing messages.
63
64 The default simply logs the message to STDERR.
65
66 =item @AnyEvent::MP::Kernel::WARN
67
68 All code references in this array are called for every log message, from
69 the default C<$WARN> handler. This is an easy way to tie into the log
70 messages without disturbing others.
71
72 =cut
73
# Verbosity threshold of the default handler: taken from the environment
# whenever PERL_ANYEVENT_MP_WARNLEVEL exists there, and 5 otherwise.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Extra log hooks, each one is called for every message.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # the hooks run regardless of the level and are invoked with the
   # handler's own argument list
   for my $hook (@WARN) {
      &$hook;
   }

   my ($level, $msg) = @_;

   # only messages at or below the threshold reach STDERR
   return if $level > $WARNLEVEL;

   # at most one trailing newline is stripped, the format adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
90
91 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
92
93 The maximum level at which warning messages will be printed to STDERR by
94 the default warn handler.
95
96 =cut
97
# Resolves a fully qualified function name ("Some::Package::func") into a
# code reference, loading modules on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until the function exists. This never dies
# itself: on failure it returns a code reference that dies when called,
# either with "unable to resolve function '...'" or with the load error.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component, give up when nothing is left to strip
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, and the name can
         # originate from a message sent by another node - so refuse
         # anything that is not a plain package name
         $pkg =~ /^[^\W\d]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module just means we have to try the parent package,
            # any other error (e.g. a compile error) is reported to the caller
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
118
# the 62 characters used for printable random strings
my @alnum = ("0" .. "9", "A" .. "Z", "a" .. "z");

# Returns a string of $_[0] random characters with codes 0..255.
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}

# Returns a string of $_[0] random characters taken from @alnum.
sub nonce62($) {
   my $chars = "";

   $chars .= $alnum[rand 62]
      for 1 .. $_[0];

   $chars
}
128
# Builds a nine character identifier: two characters derived from the
# process ID, three from the current event loop time and four random ones.
sub gen_uniq {
   my $now = AE::now;

   # indices into @alnum
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      int ($now)         % 62,
      int ($now * 100)   % 62,
      int ($now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 (4)
}
140
# configuration of this node
our $CONFIG;

our $RUNIQ;     # remote uniq value
our $UNIQ;      # per-process/node unique cookie
our $NODE;
our $ID = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;

# local ports
our %PORT;
our %PORT_DATA;

our %RMON;      # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;      # monitored _local_ ports

our $GLOBAL;    # true if node is a global ("directory") node
our %LISTENER;
our $LISTENER;  # our listeners, as arrayref

# holds the sending node _object_ during _inject
our $SRCNODE;
159
# (Re-)generates the random identifiers of this node and resets the node ID
# to an anonymous one derived from them.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq ();

   # ~59 bits, for remote port names: one character longer than $UNIQ, and
   # the last character is uppercased to avoid clashes
   my $remote = nonce62 (10);
   $RUNIQ = substr ($remote, 0, -1) . uc (substr ($remote, -1));

   $NODE = "anon/$RUNIQ";
}

# names are needed as soon as the module is loaded
_init_names ();
172
# Returns the current node ID (the value of $NODE).
sub NODE() {
   return $NODE;
}
176
# Returns the node ID part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   (split /#/, $_[0], 2)[0]
}
182
# TRACE is a compile-time constant: 1 when PERL_ANYEVENT_MP_TRACE is set to a
# true value in the environment, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
188
our $DELAY_TIMER;  # timer watcher, only set while callbacks are queued
our @DELAY_QUEUE;  # callbacks waiting to be run by _delay_run

# Runs the queued callbacks in order (including any that get queued while
# running) and drops the timer once the queue is empty.
sub _delay_run {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
}

# Queues the given callback and makes sure a zero-delay timer is armed that
# will run the queue.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
200
# Delivers a message to a local port: the first argument is the port name,
# the rest is passed to the port callback. Messages for ports that do not
# exist are dropped silently.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $handler = $PORT{shift @_}
      or return;

   # called with the remaining @_
   &$handler;
}
205
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
# an already registered node object is kept and returned as-is.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
213
# Sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message. The node
# object is created on demand when the node is not known yet.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # {send} holds a code reference, this is not a method call
   $node->{send} (["$portid", @_]);
}
225
226 =item $is_local = port_is_local $port
227
228 Returns true iff the port is a local port.
229
230 =cut
231
# A port is local when its node part maps to the same node object as the
# empty node ID does.
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
237
238 =item snd_to_func $node, $func, @args
239
240 Expects a node ID and a name of a function. Asynchronously tries to call
241 this function with the given arguments on that node.
242
243 This function can be used to implement C<spawn>-like interfaces.
244
245 =cut
246
# Sends a function call request to a node: the first argument is the node
# ID, the second the function name, the rest are its arguments. The request
# is sent as a message to the node's empty port name.
#
# Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so an invalid destination neither reaches the string
   # comparison below nor changes the delay flag before we croak
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment above talks about delaying on $NODE, while
   # the condition is true for every node except $NODE - confirm which one
   # is intended.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   # {send} holds a code reference, this is not a method call
   ($NODE{$nodeid} || add_node ($nodeid))->{send} (["", @_]);
}
261
262 =item snd_on $node, @msg
263
264 Executes C<snd> with the given C<@msg> (which must include the destination
265 port) on the given node.
266
267 =cut
268
# Sends a "snd" request carrying @msg to the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
273
274 =item eval_on $node, $string[, @reply]
275
276 Evaluates the given string as Perl expression on the given node. When
277 @reply is specified, then it is used to construct a reply message with
278 C<"$@"> and any results from the eval appended.
279
280 =cut
281
# Sends an "eval" request with the string and the optional reply message
# to the given node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
286
# Kills the port given as first argument ("nodeid#portid"), the remaining
# arguments are passed on as kill reason. Croaks when the port part is
# missing or empty.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
296
297 #############################################################################
298 # node monitoring and info
299
300 =item node_is_known $nodeid
301
302 #TODO#
303 Returns true iff the given node is currently known to this node.
304
305 =cut
306
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
310
311 =item node_is_up $nodeid
312
313 Returns true if the given node is "up", that is, the kernel thinks it has
314 a working connection to it.
315
316 If the node is known (to this local node) but not currently connected,
317 returns C<0>. If the node is not known, returns C<undef>.
318
319 =cut
320
# Returns 1 when the node has a transport, 0 when it is known but has none,
# and nothing (undef in scalar context) when the node is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
325
326 =item known_nodes
327
328 #TODO#
329 Returns the node IDs of all nodes currently known to this node, including
330 itself and nodes not currently connected.
331
332 =cut
333
# Returns the IDs of all node objects in %NODE.
#
# The local node object is registered under more than one key (its node ID
# and the empty string), so the IDs are de-duplicated to report every node
# only once.
sub known_nodes() {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
337
338 =item up_nodes
339
340 Return the node IDs of all nodes that are currently connected (excluding
341 the node itself).
342
343 =cut
344
# Returns the IDs of all node objects in %NODE that have a transport.
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
348
349 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
350
351 Registers a callback that is called each time a node goes up (a connection
352 is established) or down (the connection is lost).
353
354 Node up messages can only be followed by node down messages for the same
355 node, and vice versa.
356
357 Note that monitoring a node is usually better done by monitoring its node
358 port. This function is mainly of interest to modules that are concerned
359 about the network topology and low-level connection handling.
360
361 Callbacks I<must not> block and I<should not> send any messages.
362
363 The function returns an optional guard which can be used to unregister
364 the monitoring callback again.
365
366 Example: make sure you call function C<newnode> for all nodes that are up
367 or go up (and down).
368
369 newnode $_, 1 for up_nodes;
370 mon_nodes \&newnode;
371
372 =cut
373
# registered node monitor callbacks, keyed by the numeric value of the code reference
our %MON_NODES;

# Registers $cb as node monitor. Unless called in void context, a guard is
# returned that removes the callback again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # nobody is interested in a guard in void context
   return unless defined wantarray;

   Guard::guard { delete $MON_NODES{$cb+0} }
}
383
# Tells all registered node monitors that $node went up or down and logs
# the event at level 7. A dying monitor callback is logged at level 1 and
# does not keep the other callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
394
395 #############################################################################
396 # self node code
397
# Removes a local port together with its data and calls its monitors with
# the kill reason (the remaining arguments). A port that dies with a reason
# while nobody monitors it is logged at level 2.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   if (!$mon && @_) {
      $WARN->(2, "unmonitored local port $port died with reason: @_");
   }

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
411
# Registers callback $_[2] as monitor of the local port $_[1] ($_[0] is
# ignored). When the port does not exist, the callback is called right away
# with a "no_such_port" reason instead.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   if (exists $PORT{$portid}) {
      return ($LMON{$portid}{$cb+0} = $cb);
   }

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
}
418
# Removes callback $_[2] from the monitors of local port $_[1] ($_[0] is
# ignored), without creating an entry for ports that have no monitors.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   return unless exists $LMON{$portid};

   delete $LMON{$portid}{$cb+0}
}
423
# Handlers for node requests, keyed by request tag. These are invoked by the
# callback of the empty port name with the remaining message as arguments.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};
      _unmonitor (undef, $portid, $cb);
   },
   mon1 => sub { # start monitoring a port for another node
      my ($portid) = @_;

      # the callback must not keep the node object alive
      my $node = $SRCNODE;
      Scalar::Util::weaken ($node);

      my $cb = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $cb;
      _monitor (undef, $portid, $cb);
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # random utilities
   eval => sub {
      #d#SECURE
      # the string is evaluated in package main, the reply (if any) gets
      # "$@" and the results appended
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd (@_, AE::now);
   },
   devnull => sub {
      # deliberately ignores everything
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
476
# the local node object is registered under the empty ID and under its node ID
$NODE{""} = AnyEvent::MP::Node::Self->new ($NODE);
$NODE{$NODE} = $NODE{""};

# The empty port name receives node requests: the first message element
# selects the handler from %NODE_REQ, unknown tags are resolved (and then
# cached) via load_func. Errors are logged at level 2, not propagated.
$PORT{""} = sub {
   my $tag = shift;

   #d#SECURE (load_func)
   eval {
      my $handler = $NODE_REQ{$tag} ||= load_func ($tag);
      # called with the remaining message elements in @_
      &$handler;
   };

   if ($@) {
      $WARN->(2, "error processing node message: $@");
   }
};
484
485 #############################################################################
486 # seed management, try to keep connections to all seeds at all times
487
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;

# Starts a connection attempt to the given seed address ("host:port"),
# unless %SEED_CONNECT already has a true entry for it. Croaks when the
# address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         my $remote = $_[0]{remote_node};

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($remote eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
            # %NODE_SEED is keyed by node ID, so the entry to drop is the
            # one of the node we reached, not one named after the seed address
            delete $NODE_SEED{$remote};
         } else {
            $SEED_NODE{$seed} = $remote;
            $NODE_SEED{$remote} = $seed;
         }
      },
      on_destroy => sub {
         # the next seed_all run may try this seed again
         delete $SEED_CONNECT{$seed};
      },
      sub {
         # replaces the connector by the "connected" marker
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
529
# Starts a connection attempt for every seed that has neither an entry in
# %SEED_CONNECT nor a node that is up, and re-arms the retry timer with a
# growing, randomised delay. Stops the timer when there is nothing to do.
sub seed_all {
   my @pending;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};

      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @pending, $seed;
   }

   if (@pending) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect ($_)
         for @pending;

      # roughly double the delay each time, capped at the monitor timeout
      my $limit = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $limit
         if $SEED_RETRY > $limit;

      $SEED_WATCHER = AE::timer ($SEED_RETRY, 0, \&seed_all);

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}
552
# Resets the retry delay to one second and arms the seed timer, unless one
# is already active.
sub seed_again {
   $SEED_RETRY = 1;

   return $SEED_WATCHER
      if $SEED_WATCHER;

   $SEED_WATCHER = AE::timer (1, 0, \&seed_all);
}
557
# sets new seed list, starts connecting
# all previous seed state is forgotten, the new seeds start without a known node ID
sub set_seeds(@) {
   %NODE_SEED    = ();
   %SEED_CONNECT = ();
   %SEED_NODE    = map +($_ => undef), @_;

   seed_again ();#d#
   seed_all ();
}
569
# if we lost the connection to a seed node, make sure we are seeding
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   return if $is_up;
   return unless exists $NODE_SEED{$nodeid};

   seed_again ();
};
575
576 #############################################################################
577 # talk with/to global nodes
578
579 # protocol messages:
580 #
581 # sent by all slave nodes (slave to master)
582 # g_slave database - make other global node master of the sender
583 #
584 # sent by any node to global nodes
585 # g_set database - set whole database
586 # g_add family key val - add/replace key to database
587 # g_del family key - delete key from database
588 # g_get family key reply... - send reply with data
589 #
590 # sent by global nodes
591 # g_global - node became global, similar to global=1 greeting
592 #
593 # database families
594 # "'l" -> node -> listeners
595 # "'g" -> node -> undef
596 # ...
597 #
598
# used on all nodes:

# the global node we bind ourselves to, unless we are global ourselves
our $MASTER;
our $MASTER_MON;

# this node database
our %LOCAL_DB;

# all local databases, merged - empty on non-global nodes
our %GLOBAL_DB;
605
606 #############################################################################
607 # master selection
608
# master requests
our %GLOBAL_REQ; # $id => \@req

# Remembers the request under $id, unless that ID is already in use, and
# sends it to the master right away when there is one.
sub global_req_add {
   my ($id, @req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = \@req;

   snd ($MASTER, @req)
      if $MASTER;
}
622
# Forgets the request stored under the given ID.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
626
# Queues a "g_find" request for the given node ID, stored under the
# request ID "g_find <nodeid>".
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", g_find => $nodeid);
}
630
# reply for g_find started in Node.pm
# drops the pending request and passes the second argument to the node's
# connect_to method, if the node is still known
$NODE_REQ{g_found} = sub {
   my ($nodeid, $connect_arg) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($connect_arg);
};
639
# Makes the given node our master: sends it our local database in a
# "g_slave" request, followed by all requests stored in %GLOBAL_REQ.
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
649
# Picks a master: the first seed node that is up, or, when there is none,
# the first seed node that comes up later. Once a master was found that way,
# the search restarts when this master goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      # from now on, only watch for the master going down
      $MASTER_MON = mon_nodes sub {
         my ($nodeid, $is_up) = @_;

         if ($nodeid eq $MASTER && !$is_up) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
673
# other node wants to make us the master
# this stub loads AnyEvent::MP::Global and then calls whatever handler is
# registered for g_slave afterwards, with the same arguments
# NOTE(review): presumably AnyEvent::MP::Global replaces this entry when it
# is loaded - verify, as the stub would otherwise call itself
$NODE_REQ{g_slave} = sub {
   warn "slave1\n";#d#

   require AnyEvent::MP::Global;

   my $handler = $NODE_REQ{g_slave};
   &$handler;
};
683
684 #############################################################################
685 # local database operations
686
# local database management

# Stores $value under $family/$key in the local database and tells the
# master about it, if there is one.
sub db_set($$$) {
   my ($family, $key, $value) = @_;

   $LOCAL_DB{$family}{$key} = $value;

   snd ($MASTER, g_add => $family, $key, $value)
      if defined $MASTER;
}
693
# Removes $family/$key from the local database and tells the master about
# it, if there is one.
sub db_del($$) {
   my ($family, $key) = @_;

   delete $LOCAL_DB{$family}{$key};

   snd ($MASTER, g_del => $family, $key)
      if defined $MASTER;
}
699
# Like db_set (the value is optional), but returns a guard that deletes the
# key again when it is destroyed.
sub db_reg($$;$) {
   my ($family, $key) = @_;

   db_set ($family, $key, $_[2]);

   Guard::guard { db_del ($family, $key) }
}
705
706 #############################################################################
707 # configure
708
# Returns the second element of the list returned by POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
713
# Resolves a comma-separated list of transport descriptors and returns a
# condvar that receives the resulting addresses, without duplicates and
# ordered by the position of the descriptor that produced them.
#
# A descriptor consisting only of digits (or of nothing) stands for the
# local node name, with the digits as port. A host of "*" is expanded to
# the addresses of the local interfaces, a port of "*" becomes "0".
# Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, address] pairs, added by the callbacks below

   # this callback runs when the last ->end was called
   $cv->begin (sub {
      my ($done) = @_;

      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @res;

      $done->send (@refs);
   });

   my $idx;

   for my $spec (split /,/, $nodeid) {
      # results of one descriptor share the integer part of the priority
      my $pri = ++$idx;

      if ($spec =~ /^\d*$/) {
         $spec = length ($spec)
            ? _nodename () . ":$spec"
            : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, 0)
         or Carp::croak "$spec: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host ne "*") {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service),
               ];
            }

            $cv->end;
         });
      } else {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;

         AnyEvent::Util::fork_call (
            sub {
               require Net::Interface;

               my @addr;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # skip localhost etc.
                  push @addr, grep !/^\x7f/,
                                   $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  # global unicast, site-local unicast
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                                   $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port),
                  ];
               }

               $cv->end;
            },
         );
      }
   }

   $cv->end;

   $cv
}
794
# Configures the local node: takes key/value pairs, or a profile name
# followed by key/value pairs (an odd number of arguments).
#
# Regenerates the node names, looks up the profile, sets the node ID,
# creates the listeners for all resolved bind addresses, registers them in
# the local database, starts seeding and the master search, and finally
# starts the configured services.
#
# Croaks on an illegal node ID and on unparsable bind addresses, and dies
# when a service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   if ($NODE =~ s%/$%/$RUNIQ%) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # re-register the local node object under its new ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];
   $seeds ||= []; # no seeds configured simply means nothing to connect to

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   for (map _resolve ($_), @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually passed to prepare
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   db_set "'l" => $NODE => $LISTENER;

   $WARN->(8, "node listens on [@$LISTENER].");

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve ($_), @$seeds;

   master_search ();

   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   for my $entry (@{ $CONFIG->{services} }) {
      # work on a copy: the loop variable aliases the configuration, and
      # stripping the "::" below must not change the stored service list
      my $service = $entry;

      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func $func)->(@args);
      } elsif ($service =~ s/::$//) {
         # a trailing "::" marks a module that only needs to be loaded
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func $service)->();
      }
   }
}
877
878 =back
879
880 =head1 SEE ALSO
881
882 L<AnyEvent::MP>.
883
884 =head1 AUTHOR
885
886 Marc Lehmann <schmorp@schmorp.de>
887 http://home.schmorp.de/
888
889 =cut
890
891 1
892