ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.69
Committed: Sun Feb 26 10:29:59 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.68: +276 -165 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38     our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 root 1.21 add_node load_func snd_to_func snd_on eval_on
41 root 1.1
42 root 1.34 NODE $NODE node_of snd kil port_is_local
43     configure
44 root 1.46 up_nodes mon_nodes node_is_up
45 root 1.1 );
46    
47 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48 root 1.1
49 root 1.27 This value is called with an error or warning message, when e.g. a
50     connection could not be created, authorisation failed and so on.
51    
52     It I<must not> block or send messages -queue it and use an idle watcher if
53     you need to do any of these things.
54 root 1.1
55 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
56 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57     node connectivity and services, C<8> for debugging messages and C<9> for
58     tracing messages.
59    
60 root 1.1 The default simply logs the message to STDERR.
61    
62 root 1.44 =item @AnyEvent::MP::Kernel::WARN
63    
64     All code references in this array are called for every log message, from
65     the default C<$WARN> handler. This is an easy way to tie into the log
66     messages without disturbing others.
67    
68 root 1.1 =cut
69    
70 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71 root 1.44 our @WARN;
72     our $WARN = sub {
73     &$_ for @WARN;
74 root 1.29
75     return if $WARNLEVEL < $_[0];
76    
77 root 1.16 my ($level, $msg) = @_;
78    
79 root 1.1 $msg =~ s/\n$//;
80 root 1.16
81     printf STDERR "%s <%d> %s\n",
82     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83     $level,
84     $msg;
85 root 1.1 };
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
94 root 1.6 sub load_func($) {
95     my $func = $_[0];
96    
97     unless (defined &$func) {
98     my $pkg = $func;
99     do {
100     $pkg =~ s/::[^:]+$//
101 root 1.63 or return sub { die "unable to resolve function '$func'" };
102 root 1.60
103     local $@;
104 root 1.61 unless (eval "require $pkg; 1") {
105     my $error = $@;
106     $error =~ /^Can't locate .*.pm in \@INC \(/
107     or return sub { die $error };
108     }
109 root 1.6 } until defined &$func;
110     }
111    
112     \&$func
113     }
114    
115 root 1.1 sub nonce($) {
116     my $nonce;
117    
118     if (open my $fh, "</dev/urandom") {
119     sysread $fh, $nonce, $_[0];
120     } else {
121     # shit...
122     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123     }
124    
125     $nonce
126     }
127    
128 root 1.21 sub alnumbits($) {
129 root 1.1 my $data = $_[0];
130    
131     if (eval "use Math::GMP 2.05; 1") {
132     $data = Math::GMP::get_str_gmp (
133     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134     62
135     );
136     } else {
137     $data = MIME::Base64::encode_base64 $data, "";
138     $data =~ s/=//;
139 root 1.31 $data =~ s/x/x0/g;
140     $data =~ s/\//x1/g;
141     $data =~ s/\+/x2/g;
142 root 1.1 }
143    
144     $data
145     }
146    
147     sub gen_uniq {
148 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149 root 1.1 }
150    
151 root 1.20 our $CONFIG; # this node's configuration
152 root 1.21
153 root 1.64 our $RUNIQ; # remote uniq value
154     our $UNIQ; # per-process/node unique cookie
155     our $NODE;
156     our $ID = "a";
157 root 1.1
158     our %NODE; # node id to transport mapping, or "undef", for local node
159     our (%PORT, %PORT_DATA); # local ports
160    
161 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 root 1.1 our %LMON; # monitored _local_ ports
163    
164     our %LISTENER;
165 root 1.21 our $LISTENER; # our listeners, as arrayref
166 root 1.1
167     our $SRCNODE; # holds the sending node during _inject
168    
169 root 1.69 sub _init_names {
170 root 1.64 $RUNIQ = alnumbits nonce 96/8;
171     $UNIQ = gen_uniq;
172     $NODE = "anon/$RUNIQ";
173     }
174    
175 root 1.69 _init_names;
176 root 1.64
177 root 1.1 sub NODE() {
178     $NODE
179     }
180    
181     sub node_of($) {
182 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
183 root 1.1
184 root 1.21 $node
185 root 1.1 }
186    
187 root 1.17 BEGIN {
188     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
189     ? sub () { 1 }
190     : sub () { 0 };
191     }
192 root 1.1
193 root 1.42 our $DELAY_TIMER;
194     our @DELAY_QUEUE;
195    
196     sub _delay_run {
197 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
198 root 1.42 }
199    
200     sub delay($) {
201     push @DELAY_QUEUE, shift;
202     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
203     }
204    
205 root 1.1 sub _inject {
206 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
207 root 1.1 &{ $PORT{+shift} or return };
208     }
209    
210 root 1.20 # this function adds a node-ref, so you can send stuff to it
211     # it is basically the central routing component.
212 root 1.1 sub add_node {
213 root 1.21 my ($node) = @_;
214 root 1.1
215 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
216 root 1.13 }
217    
218 root 1.1 sub snd(@) {
219 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
220 root 1.1
221 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
222 root 1.1
223 root 1.49 defined $nodeid #d#UGLY
224     or Carp::croak "'undef' is not a valid node ID/port ID";
225    
226 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
227 root 1.2 ->{send} (["$portid", @_]);
228 root 1.1 }
229    
230 root 1.17 =item $is_local = port_is_local $port
231    
232     Returns true iff the port is a local port.
233    
234     =cut
235    
236     sub port_is_local($) {
237 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
238 root 1.17
239 root 1.21 $NODE{$nodeid} == $NODE{""}
240 root 1.17 }
241    
242 root 1.18 =item snd_to_func $node, $func, @args
243 root 1.11
244 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
245 root 1.11 this function with the given arguments on that node.
246    
247 root 1.20 This function can be used to implement C<spawn>-like interfaces.
248 root 1.11
249     =cut
250    
251 root 1.18 sub snd_to_func($$;@) {
252 root 1.21 my $nodeid = shift;
253 root 1.11
254 root 1.41 # on $NODE, we artificially delay... (for spawn)
255     # this is very ugly - maybe we should simply delay ALL messages,
256     # to avoid deep recursion issues. but that's so... slow...
257 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
258     if $nodeid ne $NODE;
259    
260 root 1.49 defined $nodeid #d#UGLY
261     or Carp::croak "'undef' is not a valid node ID/port ID";
262    
263 root 1.45 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
264 root 1.11 }
265    
266 root 1.18 =item snd_on $node, @msg
267    
268     Executes C<snd> with the given C<@msg> (which must include the destination
269     port) on the given node.
270    
271     =cut
272    
273     sub snd_on($@) {
274     my $node = shift;
275     snd $node, snd => @_;
276     }
277    
278 root 1.29 =item eval_on $node, $string[, @reply]
279 root 1.18
280 root 1.29 Evaluates the given string as Perl expression on the given node. When
281     @reply is specified, then it is used to construct a reply message with
282     C<"$@"> and any results from the eval appended.
283 root 1.18
284     =cut
285    
286 root 1.29 sub eval_on($$;@) {
287 root 1.18 my $node = shift;
288     snd $node, eval => @_;
289     }
290    
291 root 1.1 sub kil(@) {
292 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
293 root 1.1
294     length $portid
295 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296 root 1.1
297 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
298 root 1.1 ->kill ("$portid", @_);
299     }
300    
301     #############################################################################
302 root 1.6 # node monitoring and info
303 root 1.3
304 root 1.21 =item node_is_known $nodeid
305 root 1.13
306 root 1.46 Returns true iff the given node is currently known to the system. The only
307 root 1.69 time a node is known but not up currently is when a connection request is
308 root 1.46 pending.
309 root 1.13
310     =cut
311    
312     sub node_is_known($) {
313     exists $NODE{$_[0]}
314     }
315    
316 root 1.21 =item node_is_up $nodeid
317 root 1.13
318     Returns true if the given node is "up", that is, the kernel thinks it has
319     a working connection to it.
320    
321 root 1.69 If the node is known (to this local node) but not currently connected,
322     returns C<0>. If the node is not known, returns C<undef>.
323 root 1.13
324     =cut
325    
326     sub node_is_up($) {
327     ($NODE{$_[0]} or return)->{transport}
328     ? 1 : 0
329     }
330    
331 root 1.3 =item known_nodes
332    
333 root 1.26 Returns the node IDs of all nodes currently known to this node, including
334     itself and nodes not currently connected.
335 root 1.3
336     =cut
337    
338 root 1.49 sub known_nodes() {
339 root 1.26 map $_->{id}, values %NODE
340 root 1.3 }
341    
342     =item up_nodes
343    
344 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
345     the node itself).
346 root 1.3
347     =cut
348    
349 root 1.49 sub up_nodes() {
350 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
351 root 1.3 }
352    
353 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
354 root 1.3
355 root 1.27 Registers a callback that is called each time a node goes up (a connection
356     is established) or down (the connection is lost).
357 root 1.3
358     Node up messages can only be followed by node down messages for the same
359     node, and vice versa.
360    
361 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
362     port. This function is mainly of interest to modules that are concerned
363     about the network topology and low-level connection handling.
364    
365     Callbacks I<must not> block and I<should not> send any messages.
366    
367     The function returns an optional guard which can be used to unregister
368 root 1.3 the monitoring callback again.
369    
370 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
371     or go up (and down).
372    
373     newnode $_, 1 for up_nodes;
374     mon_nodes \&newnode;
375    
376 root 1.3 =cut
377    
378     our %MON_NODES;
379    
380     sub mon_nodes($) {
381     my ($cb) = @_;
382    
383     $MON_NODES{$cb+0} = $cb;
384    
385 root 1.62 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
386 root 1.3 }
387    
388     sub _inject_nodeevent($$;@) {
389 root 1.16 my ($node, $up, @reason) = @_;
390 root 1.3
391     for my $cb (values %MON_NODES) {
392 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
393 root 1.16 or $WARN->(1, $@);
394 root 1.3 }
395 root 1.16
396 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
397 root 1.3 }
398    
399     #############################################################################
400 root 1.1 # self node code
401    
402 root 1.67 sub _kill {
403     my $port = shift;
404    
405     delete $PORT{$port}
406     or return; # killing nonexistent ports is O.K.
407     delete $PORT_DATA{$port};
408    
409     my $mon = delete $LMON{$port}
410     or !@_
411     or $WARN->(2, "unmonitored local port $port died with reason: @_");
412    
413     $_->(@_) for values %$mon;
414     }
415    
416     sub _monitor {
417     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
418     unless exists $PORT{$_[1]};
419    
420     $LMON{$_[1]}{$_[2]+0} = $_[2];
421     }
422    
423     sub _unmonitor {
424 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
425     if exists $LMON{$_[1]};
426 root 1.67 }
427    
428 root 1.1 our %node_req = (
429     # internal services
430    
431     # monitoring
432 root 1.65 mon0 => sub { # stop monitoring a port for another node
433 root 1.1 my $portid = shift;
434 root 1.67 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
435 root 1.1 },
436 root 1.65 mon1 => sub { # start monitoring a port for another node
437 root 1.1 my $portid = shift;
438 root 1.67 Scalar::Util::weaken (my $node = $SRCNODE);
439     _monitor undef, $portid, $node->{rmon}{$portid} = sub {
440 root 1.58 delete $node->{rmon}{$portid};
441 root 1.65 $node->send (["", kil0 => $portid, @_])
442 root 1.59 if $node && $node->{transport};
443 root 1.67 };
444 root 1.1 },
445 root 1.65 # another node has killed a monitored port
446     kil0 => sub {
447 root 1.1 my $cbs = delete $SRCNODE->{lmon}{+shift}
448     or return;
449    
450     $_->(@_) for @$cbs;
451     },
452    
453 root 1.18 # "public" services - not actually public
454 root 1.1
455 root 1.65 # another node wants to kill a local port
456 root 1.66 kil => \&_kill,
457 root 1.65
458 root 1.1 # relay message to another node / generic echo
459 root 1.15 snd => \&snd,
460 root 1.27 snd_multiple => sub {
461 root 1.1 snd @$_ for @_
462     },
463    
464 root 1.4 # informational
465     info => sub {
466     snd @_, $NODE;
467     },
468     known_nodes => sub {
469     snd @_, known_nodes;
470     },
471     up_nodes => sub {
472     snd @_, up_nodes;
473     },
474    
475 root 1.30 # random utilities
476 root 1.1 eval => sub {
477 root 1.50 my @res = do { package main; eval shift };
478 root 1.1 snd @_, "$@", @res if @_;
479     },
480     time => sub {
481     snd @_, AE::time;
482     },
483     devnull => sub {
484     #
485     },
486 root 1.15 "" => sub {
487 root 1.27 # empty messages are keepalives or similar devnull-applications
488 root 1.15 },
489 root 1.1 );
490    
491 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
492 root 1.1 $PORT{""} = sub {
493     my $tag = shift;
494     eval { &{ $node_req{$tag} ||= load_func $tag } };
495 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
496 root 1.1 };
497    
498 root 1.69 #############################################################################
499     # seed management, try to keep connections to all seeds
500    
501     our %SEED_NODE; # seed ID => node ID|undef
502     our %NODE_SEED; # map node ID to seed ID
503     our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
504     our $SEED_WATCHER;
505    
506     # called before sending the greeting, grabs address we connect to
507     push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
508     defined (my $seed = $_[0]{seed})
509     or return;
510    
511     $SEED_CONNECT{$seed} = 2;
512     };
513    
514     # called after receiving remote greeting, grabs remote node name
515     push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
516     defined (my $seed = $_[0]{seed})
517     or return;
518    
519     # we rely on untrusted data here (the remote node name) this is
520     # hopefully ok, as this can at most be used for DOSing, which is easy
521     # when you can do MITM anyway.
522    
523     # if we connect to ourselves, nuke this seed
524     if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
525     delete $SEED_NODE{$_[0]{seed}};
526     delete $NODE_SEED{$_[0]{seed}};
527     } else {
528     $SEED_NODE{$seed} = $_[0]{remote_node};
529     $NODE_SEED{$_[0]{remote_node}} = $seed;
530     }
531     };
532    
533     ## called when connection is up, same as above, but now verified
534     #push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
535     # defined (my $seed = $_[0]{seed})
536     # or return;
537     # AE::log 5, "connected($seed)\n";#d#
538     #
539     # $SEED_NODE{$seed} = $_[0]{remote_node};
540     # $NODE_SEED{$_[0]{remote_node}} = $seed;
541     #};
542    
543     # called when connections get destroyed, update our data structures
544     # and check for self-connects
545     push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
546     # if we lost the connection to a seed node, make sure we start seeding
547     seed_again ()#d#
548     if exists $NODE_SEED{ $_[0]{remote_node} };
549    
550     defined (my $seed = $_[0]{seed})
551     or return;
552    
553     delete $SEED_CONNECT{$seed};
554     };
555    
556     sub seed_connect {
557     my ($seed) = @_;
558    
559     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
560     or Carp::croak "$seed: unparsable seed address";
561    
562     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
563    
564     # ughhh
565     $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
566     seed => $seed,
567     sub {
568     $SEED_CONNECT{$seed} = 1;
569     },
570     ;
571     }
572    
573     sub seed_all {
574     # my $next = List::Util::max 1,
575     # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
576     # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
577     # - rand;
578    
579     my @seeds = grep {
580     !exists $SEED_CONNECT{$_}
581     && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
582     } keys %SEED_NODE;
583    
584     if (@seeds) {
585     # start conenction attempt for every seed we are not connected to yet
586     seed_connect $_
587     for @seeds;
588     } else {
589     # all seeds connected or connecting
590     undef $SEED_WATCHER;
591     }
592     }
593    
594     sub seed_again {
595     $SEED_WATCHER ||= AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, \&seed_all;
596     }
597    
598     # sets new seed list, starts connecting
599     sub set_seeds(@) {
600     %SEED_NODE = ();
601     @SEED_NODE{@_} = ();
602    
603     seed_again;
604     }
605    
606     #############################################################################
607     # configure
608    
609     sub _nodename {
610     require POSIX;
611     (POSIX::uname ())[1]
612     }
613    
614     sub _resolve($) {
615     my ($nodeid) = @_;
616    
617     my $cv = AE::cv;
618     my @res;
619    
620     $cv->begin (sub {
621     my %seen;
622     my @refs;
623     for (sort { $a->[0] <=> $b->[0] } @res) {
624     push @refs, $_->[1] unless $seen{$_->[1]}++
625     }
626     shift->send (@refs);
627     });
628    
629     my $idx;
630     for my $t (split /,/, $nodeid) {
631     my $pri = ++$idx;
632    
633     $t = length $t ? _nodename . ":$t" : _nodename
634     if $t =~ /^\d*$/;
635    
636     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
637     or Carp::croak "$t: unparsable transport descriptor";
638    
639     $port = "0" if $port eq "*";
640    
641     if ($host eq "*") {
642     $cv->begin;
643     # use fork_call, as Net::Interface is big, and we need it rarely.
644     require AnyEvent::Util;
645     AnyEvent::Util::fork_call (
646     sub {
647     my @addr;
648    
649     require Net::Interface;
650    
651     for my $if (Net::Interface->interfaces) {
652     # we statically lower-prioritise ipv6 here, TODO :()
653     for $_ ($if->address (Net::Interface::AF_INET ())) {
654     next if /^\x7f/; # skip localhost etc.
655     push @addr, $_;
656     }
657     for ($if->address (Net::Interface::AF_INET6 ())) {
658     #next if $if->scope ($_) <= 2;
659     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
660     push @addr, $_;
661     }
662    
663     }
664     @addr
665     }, sub {
666     for my $ip (@_) {
667     push @res, [
668     $pri += 1e-5,
669     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
670     ];
671     }
672     $cv->end;
673     }
674     );
675     } else {
676     $cv->begin;
677     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
678     for (@_) {
679     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
680     push @res, [
681     $pri += 1e-5,
682     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
683     ];
684     }
685     $cv->end;
686     };
687     }
688     }
689    
690     $cv->end;
691    
692     $cv
693     }
694    
695     sub configure(@) {
696     unshift @_, "profile" if @_ & 1;
697     my (%kv) = @_;
698    
699     delete $NODE{$NODE}; # we do not support doing stuff before configure
700     _init_names;
701    
702     my $profile = delete $kv{profile};
703    
704     $profile = _nodename
705     unless defined $profile;
706    
707     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
708    
709     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
710    
711     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
712    
713     $NODE = $node
714     unless $node eq "anon/";
715    
716     $NODE{$NODE} = $NODE{""};
717     $NODE{$NODE}{id} = $NODE;
718    
719     my $seeds = $CONFIG->{seeds};
720     my $binds = $CONFIG->{binds};
721    
722     $binds ||= ["*"];
723    
724     $WARN->(8, "node $NODE starting up.");
725    
726     $LISTENER = [];
727     %LISTENER = ();
728    
729     for (map _resolve $_, @$binds) {
730     for my $bind ($_->recv) {
731     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
732     or Carp::croak "$bind: unparsable local bind address";
733    
734     my $listener = AnyEvent::MP::Transport::mp_server
735     $host,
736     $port,
737     prepare => sub {
738     my (undef, $host, $port) = @_;
739     $bind = AnyEvent::Socket::format_hostport $host, $port;
740     0
741     },
742     ;
743     $LISTENER{$bind} = $listener;
744     push @$LISTENER, $bind;
745     }
746     }
747    
748     $WARN->(8, "node listens on [@$LISTENER].");
749    
750     # the global service is mandatory currently
751     require AnyEvent::MP::Global;
752    
753     # connect to all seednodes
754     set_seeds map $_->recv, map _resolve $_, @$seeds;
755    
756     for (@{ $CONFIG->{services} }) {
757     if (ref) {
758     my ($func, @args) = @$_;
759     (load_func $func)->(@args);
760     } elsif (s/::$//) {
761     eval "require $_";
762     die $@ if $@;
763     } else {
764     (load_func $_)->();
765     }
766     }
767     }
768    
769 root 1.1 =back
770    
771     =head1 SEE ALSO
772    
773     L<AnyEvent::MP>.
774    
775     =head1 AUTHOR
776    
777     Marc Lehmann <schmorp@schmorp.de>
778     http://home.schmorp.de/
779    
780     =cut
781    
782     1
783