ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.78
Committed: Fri Mar 2 19:19:21 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.77: +29 -47 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our @EXPORT_OK = qw(
38 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
39 );
40
41 our @EXPORT = qw(
42 add_node load_func snd_to_func snd_on eval_on
43
44 NODE $NODE node_of snd kil port_is_local
45 configure
46 up_nodes mon_nodes node_is_up
47 );
48
49 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
50
51 This value is called with an error or warning message, when e.g. a
52 connection could not be created, authorisation failed and so on.
53
54 It I<must not> block or send messages - queue it and use an idle watcher if
55 you need to do any of these things.
56
57 C<$level> should be C<0> for messages to be logged always, C<1> for
58 unexpected messages and errors, C<2> for warnings, C<7> for messages about
59 node connectivity and services, C<8> for debugging messages and C<9> for
60 tracing messages.
61
62 The default simply logs the message to STDERR.
63
64 =item @AnyEvent::MP::Kernel::WARN
65
66 All code references in this array are called for every log message, from
67 the default C<$WARN> handler. This is an easy way to tie into the log
68 messages without disturbing others.
69
70 =cut
71
# Threshold used by the default handler; the environment variable wins
# whenever it is set at all (even to 0 or the empty string).
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks, invoked by the default $WARN handler below.
our @WARN;

# Default log handler: ($level, $msg).
our $WARN = sub {
   # hooks run first and see every message, whatever its level; the
   # paren-less &-call hands our own @_ through to each hook
   for (@WARN) {
      &$_;
   }

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # drop one trailing newline, the format string supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time;

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
88
89 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
90
91 The maximum level at which warning messages will be printed to STDERR by
92 the default warn handler.
93
94 =cut
95
# Resolves a fully qualified function name to a code reference, loading
# modules on demand.
#
# Starting with the full name, trailing "::component"s are stripped one by
# one and the remainder is require'd as a module, until the function is
# defined. Modules that simply do not exist are skipped; any other load
# error is reported.
#
# Always returns a code reference: either the function itself, or a stub
# that dies with the reason the function could not be resolved.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below and function names
         # can originate from other nodes, so only accept something that
         # looks like a plain package name - anything else would be executed
         # as arbitrary code.
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module just means we have to try a shorter prefix,
            # everything else (syntax errors etc.) is a real failure
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
116
# the 62 characters used for textual identifiers
my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random octets
sub nonce($) {
   my $octets = "";

   $octets .= chr rand 256
      for 1 .. $_[0];

   $octets
}

# returns a string of $_[0] random characters drawn from @alnum
sub nonce62($) {
   my $chars = "";

   $chars .= $alnum[rand 62]
      for 1 .. $_[0];

   $chars
}
126
# Builds a nine character identifier: two characters derived from the
# process ID, three from the current event loop time at different
# resolutions, followed by four random characters.
sub gen_uniq {
   my $now = AE::now;

   # indices into @alnum, all reduced modulo 62
   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      (int $now        ) % 62,
      (int $now *   100) % 62,
      (int $now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 4
}
138
our $CONFIG; # this node's configuration, set by configure

our $RUNIQ; # remote uniq value, (re)generated by _init_names
our $UNIQ;  # per-process/node unique cookie, (re)generated by _init_names
our $NODE;  # the node ID of the local node
our $ID = "a";

our %NODE; # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our $GLOBAL; # true if node is a global ("directory") node
our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node _object_ during _inject
157
# (Re-)generates the random parts of our identity: $UNIQ, $RUNIQ and the
# default anonymous node ID in $NODE.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
   # at the end to avoid clashes
   my $raw = nonce62 10;
   $RUNIQ = (substr $raw, 0, -1) . uc substr $raw, -1;

   $NODE = "anon/$RUNIQ";
}

# make sure we have usable names even before configure is called
_init_names;
170
# returns the node ID of the local node
sub NODE() {
   return $NODE;
}
174
# returns the node ID part of a port ID, i.e. everything before the first "#"
sub node_of($) {
   # only the first field is of interest; the limit keeps the rest in one piece
   my $nodeid = (split /#/, $_[0], 2)[0];

   return $nodeid;
}
180
BEGIN {
   # TRACE is a compile-time constant, so trace statements cost nothing
   # when PERL_ANYEVENT_MP_TRACE is not set
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
186
our $DELAY_TIMER; # set while a run of the delay queue is scheduled
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Runs queued callbacks in order - including any that get queued while we
# are running - and drops the timer once the queue is empty.
sub _delay_run {
   while (1) {
      my $job = shift @DELAY_QUEUE;

      return undef $DELAY_TIMER
         unless $job;

      $job->();
   }
}
193
# Queues a callback to be run from a zero-delay timer, arming that timer
# unless it is already armed.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;
   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
198
# Delivers a message (port ID followed by the message elements) to the
# matching local port handler; messages for unknown ports are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # paren-less &-call: the handler sees the remaining @_ directly
   &$handler;
}
203
# Makes sure a node object exists for the given node ID, so messages can be
# sent to it, and returns that object. Basically the central routing
# component.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Remote->new ($nodeid)
}
211
# Sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # unknown nodes get a node object created on the fly
   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->{send} (["$portid", @_]);
}
223
224 =item $is_local = port_is_local $port
225
226 Returns true iff the port is a local port.
227
228 =cut
229
# true iff the node part of the port ID maps to the local node object
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   # the local node object is always registered under the empty node ID
   $NODE{$nodeid} == $NODE{""}
}
235
236 =item snd_to_func $node, $func, @args
237
238 Expects a node ID and a name of a function. Asynchronously tries to call
239 this function with the given arguments on that node.
240
241 This function can be used to implement C<spawn>-like interfaces.
242
243 =cut
244
# Sends a message to the node port (the port with the empty ID) of the given
# node; the message consists of the function name and its arguments.
#
# Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so an invalid call neither compares undef nor leaves
   # the DELAY flag below set as a side effect
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment talks about delaying on $NODE, the condition
   # tests for "not $NODE" - confirm against AnyEvent::MP::Node::Self.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
}
259
260 =item snd_on $node, @msg
261
262 Executes C<snd> with the given C<@msg> (which must include the destination
263 port) on the given node.
264
265 =cut
266
# asks the node port of $node to execute "snd" with the given message
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, snd => @msg;
}
271
272 =item eval_on $node, $string[, @reply]
273
274 Evaluates the given string as a Perl expression on the given node. When
275 @reply is specified, then it is used to construct a reply message with
276 C<"$@"> and any results from the eval appended.
277
278 =cut
279
# asks the node port of $node to evaluate a string, optionally followed by
# a reply message
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd $node, eval => @request;
}
284
# Kills the given port ("nodeid#portid"); any further arguments are passed
# on as the kill reason. Croaks when the port part is empty.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @_);
}
294
295 #############################################################################
296 # node monitoring and info
297
298 =item node_is_known $nodeid
299
300 #TODO#
301 Returns true iff the given node is currently known to this node.
302
303 =cut
304
# true iff there is an entry for the node ID in %NODE
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
308
309 =item node_is_up $nodeid
310
311 Returns true if the given node is "up", that is, the kernel thinks it has
312 a working connection to it.
313
314 If the node is known (to this local node) but not currently connected,
315 returns C<0>. If the node is not known, returns C<undef>.
316
317 =cut
318
# 1 if the node has a transport, 0 if it is known but has none, and
# undef/empty list if the node is not known at all
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
323
324 =item known_nodes
325
326 #TODO#
327 Returns the node IDs of all nodes currently known to this node, including
328 itself and nodes not currently connected.
329
330 =cut
331
# Returns the node IDs of all nodes in %NODE, each ID once.
#
# The local node object is registered twice (under the empty node ID and
# under its real node ID), so the IDs have to be de-duplicated.
sub known_nodes() {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
335
336 =item up_nodes
337
338 Return the node IDs of all nodes that are currently connected (excluding
339 the node itself).
340
341 =cut
342
# returns the IDs of all nodes that currently have a transport
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
346
347 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
348
349 Registers a callback that is called each time a node goes up (a connection
350 is established) or down (the connection is lost).
351
352 Node up messages can only be followed by node down messages for the same
353 node, and vice versa.
354
355 Note that monitoring a node is usually better done by monitoring its node
356 port. This function is mainly of interest to modules that are concerned
357 about the network topology and low-level connection handling.
358
359 Callbacks I<must not> block and I<should not> send any messages.
360
361 The function returns an optional guard which can be used to unregister
362 the monitoring callback again.
363
364 Example: make sure you call function C<newnode> for all nodes that are up
365 or go up (and down).
366
367 newnode $_, 1 for up_nodes;
368 mon_nodes \&newnode;
369
370 =cut
371
our %MON_NODES; # registered node monitors, keyed by callback address

# Registers a node up/down callback. Unless called in void context, a guard
# is returned that unregisters the callback again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   # the numeric value of the code reference serves as unique key
   $MON_NODES{$cb+0} = $cb;

   # in void context nobody could hold on to a guard, so do not create one
   return unless defined wantarray;

   AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
}
381
# Tells all registered node monitors that $node went up or down, then logs
# the event at level 7. A dying monitor is logged at level 1 and does not
# keep the remaining monitors from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
392
393 #############################################################################
394 # self node code
395
# Removes a local port and its data, then invokes all monitors registered
# for it with the kill reason (the remaining arguments). A port that dies
# with a reason while nobody monitors it is logged at level 2.
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
409
# Registers callback $cb as monitor for local port $portid; the first
# argument is ignored. When the port does not exist, the callback is
# invoked immediately with a "no_such_port" reason instead.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid")
      unless exists $PORT{$portid};

   # monitors are keyed by the address of the callback
   $LMON{$portid}{$cb+0} = $cb;
}
416
# Removes monitor callback $cb from local port $portid; the first argument
# is ignored. Ports without any monitors are left alone.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   delete $LMON{$portid}{$cb+0}
      if exists $LMON{$portid};
}
421
# Dispatch table for messages sent to the node port (the port with the
# empty ID): the handler installed in $PORT{""} uses the first message
# element as key into this hash and calls the handler with the remaining
# elements. Tags not found here are resolved via load_func by that handler.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      # the callback registered by mon1 is kept in the sending node object
      _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      # the callback only holds a weak reference to the node object
      Scalar::Util::weaken (my $node = $SRCNODE);
      _monitor undef, $portid, $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         # report the kill (with its reason) back, if the node is still connected
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      # the remaining arguments are the kill reason
      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   # each argument is an array reference holding one complete snd message
   snd_multiple => sub {
      snd @$_ for @_
   },

   # random utilities
   eval => sub {
      #d#SECURE
      # evaluates arbitrary code received from another node, in package main
      my @res = do { package main; eval shift };
      # any remaining arguments form a reply message, to which the error
      # string and the results are appended
      snd @_, "$@", @res if @_;
   },
   time => sub {
      # replies with the current event loop time
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
474
# the local node object is reachable both via the empty node ID and via its
# real node ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element selects the request handler,
# which is called with the remaining elements. Errors are logged, never
# propagated.
$PORT{""} = sub {
   my $tag = shift;
   #d#SECURE (load_func)
   eval {
      # unknown tags are resolved as function names and cached
      my $handler = $node_req{$tag} ||= load_func $tag;
      &$handler;
   };
   $WARN->(2, "error processing node message: $@") if $@;
};
482
483 #############################################################################
484 # seed management, try to keep connections to all seeds at all times
485
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER; # timer that triggers the next seed_all run
our $SEED_RETRY;   # current retry interval, see seed_all/seed_again

# Starts a connection attempt to the given seed ("host:port"), unless one is
# already recorded in %SEED_CONNECT. Croaks if the seed address cannot be
# parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
            # NOTE(review): %NODE_SEED is keyed by node ID everywhere else,
            # while this deletes by seed ID - confirm this is intended
            delete $NODE_SEED{$seed};
         } else {
            # remember the mapping between seed and node in both directions
            $SEED_NODE{$seed} = $_[0]{remote_node};
            $NODE_SEED{$_[0]{remote_node}} = $seed;
         }
      },
      on_destroy => sub {
         # forget the connection, so seed_all considers this seed again
         delete $SEED_CONNECT{$seed};
      },
      sub {
         # mark the seed as connected
         $SEED_CONNECT{$seed} = 1;
      }
   ;
}
527
# Starts connection attempts to every seed that is neither being connected
# to nor mapped to a node that is up. As long as there is such a seed, the
# function re-schedules itself with a growing, randomised interval that is
# capped at the configured monitor_timeout.
sub seed_all {
   # my $next = List::Util::max 1,
   #    $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
   #    * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
   #    - rand;

   my @seeds;

   for (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$_};
      next if defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_};

      push @seeds, $_;
   }

   # all seeds connected or connecting, no need to restart timer
   return undef $SEED_WATCHER
      unless @seeds;

   # start connection attempt for every seed we are not connected to yet
   seed_connect $_
      for @seeds;

   my $max_retry = $CONFIG->{monitor_timeout};

   $SEED_RETRY = $SEED_RETRY * 2 + rand;
   $SEED_RETRY = $max_retry
      if $SEED_RETRY > $max_retry;

   $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
}
555
# Resets the retry interval and makes sure seed_all gets run (again) within
# a second, unless a run is already scheduled.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
560
# sets new seed list, starts connecting
sub set_seeds(@) {
   # forget everything we knew about the old seeds
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every new seed starts out without a known node ID
   %SEED_NODE = ();
   $SEED_NODE{$_} = undef
      for @_;

   seed_again;#d#
   seed_all;
}
572
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   # if we lost the connection to a seed node, make sure we are seeding
   seed_again
      if !$is_up && exists $NODE_SEED{$nodeid};
};
578
579 #############################################################################
580 # talk with/to global nodes
581
582 # protocol messages:
583 #
584 # sent by all slave nodes (slave to master)
585 # g_slave database - make other global node master of the sender
586 #
587 # sent by any node to global nodes
588 # g_set database - set whole database
589 # g_add family key val - add/replace key to database
590 # g_del family key - delete key from database
591 # g_get family key reply... - send reply with data
592 #
593 # sent by global nodes
594 # g_global - node became global, similar to global=1 greeting
595 #
596 # database families
597 # "'l" -> node -> listeners
598 # "'g" -> node -> undef
599 # ...
600 #
601
602 # used on all nodes:
our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
our $MASTER_MON; # guard of the node monitor installed by master_search
our %LOCAL_DB; # this node database

our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
608
609 #our $sv_ne_json = JSON::XS->new->canonical;
610 #
611 #sub sv_ne($$) {
612 # $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
613 #}
614
# local database management

# ldb_set $family, $key, $value - adds or replaces an entry
# ldb_set $family, $key         - deletes an entry
#
# Updates %LOCAL_DB and, when a master node is known, forwards the change
# to it (as g_add or g_del).
sub ldb_set($$;$) {
   # trace through the regular log handler (honouring $WARNLEVEL and
   # @WARN) instead of writing to STDERR unconditionally
   $WARN->(9, "ldb_set<@_>");

   if (@_ > 2) {
      $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
      snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
         if defined $MASTER;
   } else {
      delete $LOCAL_DB{$_[0]}{$_[1]};
      snd $MASTER, g_del => $_[0] => $_[1]
         if defined $MASTER;
   }
}
628
629 #############################################################################
630 # master selection
631
632 # master requests
our %GLOBAL_REQ; # $id => \@req

# Remembers request @req under $id and sends it to the master node, if
# there is one. An $id that is already registered is ignored.
sub global_req_add {
   my ($id, @req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = [@req];

   snd $MASTER, @req
      if $MASTER;
}
645
# forgets the request registered under the given id, returning it
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
649
# registers a g_find request for the given node ID
sub g_find {
   my ($nodeid) = @_;

   global_req_add "g_find $nodeid", g_find => $nodeid;
}
653
# reply for g_find started in Node.pm
$node_req{g_found} = sub {
   my ($nodeid, $listeners) = @_;

   global_req_del "g_find $nodeid";

   @$listeners or return; # d'oh

   my $node = $NODE{$nodeid} or return;

   # make the listeners visible in the database for the duration of the
   # connect call only
   local $GLOBAL_DB{"'l"}{$nodeid} = $listeners; #d# UGLY
   $node->connect;
};
665
# Makes the given node our master: sends it our local database and all
# requests that are still registered.
sub master_set {
   ($MASTER) = @_;

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd $MASTER, @$req;
   }
}
675
# Picks a master node: the first seed node that is up right now, or
# otherwise the first seed node that comes up later.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for (keys %NODE_SEED) {
      if (node_is_up $_) {
         master_set $_;
         return;
      }
   }

   # no seed node is up yet, so wait for one
   $MASTER_MON = mon_nodes sub {
      return unless $_[1]; # we are only interested in node-ups
      return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes

      master_set $_[0];

      # assigning the new guard replaces the guard of the monitor we are
      # currently running in; from now on we only watch for the master
      # going down, in which case the search starts over
      $MASTER_MON = mon_nodes sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
699
# other node wants to make us the master
#
# This is only a stub: it loads AnyEvent::MP::Global on demand and then
# hands the request (still in @_) over to the g_slave handler that module
# presumably installs in %node_req.
$node_req{g_slave} = sub {
   # we are called through %node_req, so this is the stub itself
   my $stub = $node_req{g_slave};

   $WARN->(9, "g_slave request received, loading AnyEvent::MP::Global");

   require AnyEvent::MP::Global;

   # without a replacement handler we would call ourselves forever
   $node_req{g_slave} != $stub
      or die "AnyEvent::MP::Global did not install a g_slave handler\n";

   &{ $node_req{g_slave} };
};
709
710 #$node_req{g_reply} = sub {
711 # my $id = shift;
712 #
713 # my $cb = delete $GLOBAL_REQ{$id}
714 # or return;
715 #
716 # $cb->[0]->(@_);
717 #};
718
719 #############################################################################
720
721 #############################################################################
722
723 # $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
724
725 #############################################################################
726 # configure
727
# returns the nodename field of uname(2), i.e. the local host name
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
732
# Resolves a comma-separated list of transport descriptors ("host:port",
# "port", "*:port", ...) into "address:port" strings.
#
# Returns a condition variable; once all lookups have finished it is sent
# the resulting strings, ordered by position of their descriptor in the
# list and with duplicates removed. Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [$priority, "address:port"] pairs, filled by the callbacks below

   # the group callback runs once every begin has been matched by an end
   $cv->begin (sub {
      my %seen;
      my @refs;
      # order by priority, keep the first occurrence of each address
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of earlier descriptors sort before results of later ones
      my $pri = ++$idx;

      # a bare port number (or the empty string) refers to the local host name
      $t = length $t ? _nodename . ":$t" : _nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         # wildcard host: enumerate the addresses of all local interfaces
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for $_ ($if->address (Net::Interface::AF_INET ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }
                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }

               }
               @addr
            }, sub {
               # receives the addresses collected by the first callback
               for my $ip (@_) {
                  push @res, [
                     # the small increment keeps the results of one descriptor in order
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         # ordinary host: resolve it
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   # matches the initial begin, so the group callback can fire
   $cv->end;

   $cv
}
813
# configure [$profile,] key => value...
#
# Configures the local node: looks up the profile, determines the node ID,
# creates the listeners, starts connecting to the seed nodes, starts the
# search for a master node and finally loads the configured services.
#
# Blocks while bind and seed addresses are being resolved. Croaks on an
# illegal node ID or unparsable addresses, dies when a service module
# cannot be loaded.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names;

   my $profile = delete $kv{profile};

   # the profile defaults to the local host name
   $profile = _nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   # without a configured node ID, use "profile/" - the trailing slash
   # requests a random suffix, see below
   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   if ($NODE =~ s%/$%/$RUNIQ%) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # register the local node object under its final node ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   # by default, listen on all interfaces
   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolves first, then wait for each of them in turn
   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # remember the address passed to the callback instead of
               # the one we asked for
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   # publish our listeners in the local database
   ldb_set "'l" => $NODE => $LISTENER;

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   #require AnyEvent::MP::Global;

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search;

   # NOTE(review): special case for a node named "atha", marked as debug
   # code (#d#) - presumably a development leftover, confirm and remove
   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   # services are either [function, args...], "Module::" (just loaded) or
   # the name of a function that is called without arguments
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }
}
899
900 =back
901
902 =head1 SEE ALSO
903
904 L<AnyEvent::MP>.
905
906 =head1 AUTHOR
907
908 Marc Lehmann <schmorp@schmorp.de>
909 http://home.schmorp.de/
910
911 =cut
912
913 1
914