ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.70
Committed: Sun Feb 26 11:12:54 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.69: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
# Symbols exported by default to every package that uses this module
# (the module inherits from Exporter): the shared kernel state variables
# first, then the messaging, configuration and node-monitoring functions.
# NOTE(review): node_is_known and known_nodes are documented further down
# in this file but are not part of this list - confirm this is intended.
38     our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 root 1.21 add_node load_func snd_to_func snd_on eval_on
41 root 1.1
42 root 1.34 NODE $NODE node_of snd kil port_is_local
43     configure
44 root 1.46 up_nodes mon_nodes node_is_up
45 root 1.1 );
46    
47 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48 root 1.1
49 root 1.27 This value is called with an error or warning message, when e.g. a
50     connection could not be created, authorisation failed and so on.
51    
52     It I<must not> block or send messages - queue the message and use an idle watcher if
53     you need to do any of these things.
54 root 1.1
55 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
56 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57     node connectivity and services, C<8> for debugging messages and C<9> for
58     tracing messages.
59    
60 root 1.1 The default simply logs the message to STDERR.
61    
62 root 1.44 =item @AnyEvent::MP::Kernel::WARN
63    
64     All code references in this array are called for every log message, from
65     the default C<$WARN> handler. This is an easy way to tie into the log
66     messages without disturbing others.
67    
68 root 1.1 =cut
69    
# Highest level that the default handler still prints; the environment
# variable PERL_ANYEVENT_MP_WARNLEVEL overrides the built-in default of 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log sinks, invoked for every message regardless of level.
our @WARN;

# Default log handler: $WARN->($level, $msg).
our $WARN = sub {
   # every registered hook sees the message, even if we do not print it.
   # the hooks are called with the "&" form so they share our @_.
   for my $hook (@WARN) {
      &$hook;
   }

   return if $_[0] > $WARNLEVEL;

   my ($level, $msg) = @_;

   # strip at most one trailing newline, we add our own below
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
# Resolves a fully qualified function name ("Some::Package::func") into a
# code reference, loading the defining module on demand.
#
# Starting with the package part of the name, successively shorter package
# prefixes are require'd until the function becomes defined.
#
# This function never dies itself. On failure it returns a code reference
# that dies when called:
#  - with "unable to resolve function '...'" when no package prefix is
#    left to try;
#  - with the original error when a module was found but failed to load.
#
# The name may originate from a remote node (it is used to resolve node
# message tags), so the module is loaded by file name via a block eval
# instead of interpolating the name into a string eval, which would
# execute arbitrary code embedded in the name.
sub load_func($) {
   my $func = $_[0];

   # function names are looked up symbolically by design
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component; nothing left means we give up
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # Some::Package => Some/Package.pm, as a bareword require would do
         (my $file = "$pkg.pm") =~ s{::}{/}g;

         local $@;
         unless (eval { require $file; 1 }) {
            my $error = $@;
            # a missing module just means "try the parent package",
            # everything else is a real error that the caller should see
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
114    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that is possible. If the
# device cannot be opened, or the read fails or returns fewer octets than
# requested, perl's rand is used instead, so the result always has the
# requested length.
sub nonce($) {
   my $len = $_[0];

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $nonce;
      my $got = sysread $fh, $nonce, $len;

      # only accept complete reads, a short nonce weakens uniqueness
      return $nonce
         if defined $got && $got == $len;
   }

   # fallback: not cryptographically strong, but better than nothing
   join "", map +(chr rand 256), 1 .. $len
}
127    
# Encodes the octet string $_[0] into a string consisting only of the
# characters 0-9, a-z and A-Z.
#
# When Math::GMP (version 2.05 or newer) can be loaded, the data is
# interpreted as one big hexadecimal number and converted to base 62.
# Otherwise the data is base64-encoded and the non-alphanumeric characters
# are escaped with "x": "x" => "x0", "/" => "x1", "+" => "x2".
sub alnumbits($) {
   my $data = $_[0];

   # do not leak a failed module load to the caller via $@
   local $@;

   if (eval { require Math::GMP; Math::GMP->VERSION (2.05); 1 }) {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of them have to go
      $data =~ s/=+\z//;
      # escape "x" first, as it is the escape character itself
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
146    
# Builds a short alphanumeric cookie out of the low 16 bits of the process
# ID, the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time & 0xffff;

   alnumbits (pack "nna*", $pid16, $time16, nonce (2))
}
150    
# Shared kernel state. Several of these variables are exported (see
# @EXPORT) and are manipulated directly by the functions in this file.
151 root 1.20 our $CONFIG; # this node's configuration (set by configure)
152 root 1.21
153 root 1.64 our $RUNIQ; # remote uniq value (random, regenerated by _init_names)
154     our $UNIQ; # per-process/node unique cookie (see gen_uniq)
155     our $NODE; # this node's ID, "anon/$RUNIQ" unless configure assigns another one
156     our $ID = "a"; # NOTE(review): presumably the seed for generated local port IDs - not used in this file, confirm
157 root 1.1
158     our %NODE; # node ID => node object; the key "" always refers to the local node
159     our (%PORT, %PORT_DATA); # local ports: port ID => receive callback / per-port data
160    
161 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 root 1.1 our %LMON; # monitored _local_ ports ($LMON{portid}{cb+0} == cb)
163    
164     our %LISTENER; # bind address => listener object (filled by configure)
165 root 1.21 our $LISTENER; # our listeners (the bind addresses), as arrayref
166 root 1.1
167     our $SRCNODE; # holds the sending node during _inject
168    
# (Re-)generates the random identity of this process: the remote uniq
# value (from 96 random bits), the local uniq cookie and the default
# anonymous node ID derived from the remote uniq value.
sub _init_names {
   $RUNIQ = alnumbits (nonce (96 / 8));
   $UNIQ  = gen_uniq ();
   $NODE  = "anon/$RUNIQ";
}

# give the process an identity as soon as the module is loaded
_init_names ();
176 root 1.64
# Returns the ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
180    
# Extracts the node ID from a port ID of the form "nodeid#portid", i.e.
# everything before the first "#". A string without "#" is returned as is.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   return $parts[0];
}
186    
# Defines the compile-time constant TRACE, which is true when the
# environment variable PERL_ANYEVENT_MP_TRACE is set to a true value.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
192 root 1.1
our $DELAY_TIMER; # timer watcher that drains the queue, undef while idle
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Runs the queued callbacks in FIFO order, including callbacks that get
# queued while draining. Once the queue is empty, the timer is released
# and the function returns.
sub _delay_run {
   while (1) {
      my $cb = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $cb->();
   }
}
199    
# Queues the given callback and makes sure a zero-delay timer is active
# that will run the queue via _delay_run.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   # at most one timer is needed, regardless of the queue length
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
204    
# Delivers a message to a local port: the first argument is the port ID,
# the remaining arguments are passed on to the port callback. Messages
# for ports that do not exist are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $port_cb = $PORT{+shift}
      or return;

   # the "&" form hands our (already shifted) @_ to the callback
   &$port_cb;
}
209    
# Central routing component: returns the node object registered for the
# given node ID, so that messages can be sent to it. Unknown node IDs get
# a new AnyEvent::MP::Node::Direct object, which is remembered in %NODE.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
217    
# snd $port, @msg
#
# Sends a message to the port "nodeid#portid". The node object is looked
# up in %NODE (and created via add_node if necessary), then its send
# callback is invoked with the port ID and the message as one array.
# Croaks when no destination is given.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
229    
230 root 1.17 =item $is_local = port_is_local $port
231    
232     Returns true iff the port is a local port.
233    
234     =cut
235    
# A port is local when the node object registered for its node ID is the
# very same object as the local node, which is always registered under
# the empty node ID.
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   my $local = $NODE{""};

   $NODE{$nodeid} == $local
}
241    
242 root 1.18 =item snd_to_func $node, $func, @args
243 root 1.11
244 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
245 root 1.11 this function with the given arguments on that node.
246    
247 root 1.20 This function can be used to implement C<spawn>-like interfaces.
248 root 1.11
249     =cut
250    
# snd_to_func $nodeid, $func, @args
#
# Sends ($func, @args) to the node port (the port with the empty ID) of
# the given node. Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the destination is NOT $NODE,
   # which does not match the comment above - confirm the intent.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_]);
}
265    
266 root 1.18 =item snd_on $node, @msg
267    
268     Executes C<snd> with the given C<@msg> (which must include the destination
269     port) on the given node.
270    
271     =cut
272    
# Asks the given node to execute snd with @msg, by sending a "snd"
# request followed by the message to that node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
277    
278 root 1.29 =item eval_on $node, $string[, @reply]
279 root 1.18
280 root 1.29 Evaluates the given string as Perl expression on the given node. When
281     @reply is specified, then it is used to construct a reply message with
282     C<"$@"> and any results from the eval appended.
283 root 1.18
284     =cut
285    
# Asks the given node to evaluate a string, by sending an "eval" request
# followed by the string and the optional reply message to that node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
290    
# kil $port, @reason
#
# Kills the port "nodeid#portid" by calling the kill method of the
# responsible node object with the port ID and the reason. Croaks when
# the port ID part is empty or missing.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   unless (length $portid) {
      Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
300    
301     #############################################################################
302 root 1.6 # node monitoring and info
303 root 1.3
304 root 1.21 =item node_is_known $nodeid
305 root 1.13
306 root 1.46 Returns true iff the given node is currently known to the system. The only
307 root 1.69 time a node is known but not up currently is when a connection request is
308 root 1.46 pending.
309 root 1.13
310     =cut
311    
# True iff %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
315    
316 root 1.21 =item node_is_up $nodeid
317 root 1.13
318     Returns true if the given node is "up", that is, the kernel thinks it has
319     a working connection to it.
320    
321 root 1.69 If the node is known (to this local node) but not currently connected,
322     returns C<0>. If the node is not known, returns C<undef>.
323 root 1.13
324     =cut
325    
# Returns 1 when the node has a transport, 0 when the node is known but
# has no transport, and nothing when the node is not known at all.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
330    
331 root 1.3 =item known_nodes
332    
333 root 1.26 Returns the node IDs of all nodes currently known to this node, including
334     itself and nodes not currently connected.
335 root 1.3
336     =cut
337    
# Returns the IDs of all node objects stored in %NODE, each ID once.
#
# The local node object is registered twice in %NODE (under the empty
# string and under its real node ID), so the IDs have to be de-duplicated,
# otherwise the local node ID would be reported twice.
sub known_nodes() {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
341    
342     =item up_nodes
343    
344 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
345     the node itself).
346 root 1.3
347     =cut
348    
# Returns the IDs of all node objects in %NODE that have a transport.
sub up_nodes() {
   map { $_->{id} }
      grep { $_->{transport} }
         values %NODE
}
352    
353 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
354 root 1.3
355 root 1.27 Registers a callback that is called each time a node goes up (a connection
356     is established) or down (the connection is lost).
357 root 1.3
358     Node up messages can only be followed by node down messages for the same
359     node, and vice versa.
360    
361 root 1.27 Note that monitoring a node is usually better done by monitoring its node
362     port. This function is mainly of interest to modules that are concerned
363     about the network topology and low-level connection handling.
364    
365     Callbacks I<must not> block and I<should not> send any messages.
366    
367     The function returns an optional guard which can be used to unregister
368 root 1.3 the monitoring callback again.
369    
370 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
371     or go up (and down).
372    
373     newnode $_, 1 for up_nodes;
374     mon_nodes \&newnode;
375    
376 root 1.3 =cut
377    
# registered node monitors, keyed by the numeric value of the code reference
our %MON_NODES;

# Registers $cb as node monitor. In non-void context a guard object is
# returned that removes the registration again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$key} })
}
387    
# Notifies all node monitors that $node went up or down. Exceptions
# thrown by a monitor are logged at level 1 and do not affect the other
# monitors. Afterwards the state change itself is logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
398    
399     #############################################################################
400 root 1.1 # self node code
401    
# _kill $portid, @reason
#
# Removes a local port together with its port data and calls all its
# monitors with the reason. When nobody monitors the port and a reason
# was given, the reason is logged at level 2 instead.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
415    
# _monitor $ignored, $portid, $cb
#
# Registers $cb as monitor of the local port $portid. When the port does
# not exist, $cb is called immediately with a no_such_port reason and
# nothing is registered.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   unless (exists $PORT{$portid}) {
      return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");
   }

   $LMON{$portid}{$cb + 0} = $cb;
}
422    
# _unmonitor $ignored, $portid, $cb
#
# Removes the monitor $cb from the local port $portid. Ports without any
# monitors are left alone, so no empty entry gets created in %LMON.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   delete $LMON{$portid}{$cb + 0}
      if exists $LMON{$portid};
}
427    
# Requests understood by the node port (the port with the empty ID),
# indexed by request tag. The handlers receive the remaining message
# elements in @_; $SRCNODE holds the node the request came from.
our %node_req = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my $portid = shift;

      _unmonitor (undef, $portid, delete $SRCNODE->{rmon}{$portid});
   },

   # start monitoring a port for another node
   mon1 => sub {
      my $portid = shift;

      # only keep a weak reference, so the monitor does not keep the node alive
      Scalar::Util::weaken (my $node = $SRCNODE);

      _monitor (undef, $portid, $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};

         # tell the remote node about the kill, but only while it is connected
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      });
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities

   # evaluate a string in package main and optionally reply with "$@" and the results
   eval => sub {
      my @res = do { package main; eval shift };

      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # swallows everything
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
490    
# the local node is reachable under its node ID and under the empty ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element selects the request handler.
# Tags without an entry in %node_req are resolved as function names with
# load_func and cached in %node_req. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # the "&" form passes the remaining message elements in @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
497    
498 root 1.69 #############################################################################
499     # seed management, try to keep connections to all seeds
500    
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;

# called before sending the greeting, grabs address we connect to:
# transports that were created for a seed are marked as "connecting".
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my $seed = $_[0]{seed};

   return unless defined $seed;

   $SEED_CONNECT{$seed} = 2;
};
513    
# called after receiving remote greeting, grabs remote node name and
# maintains the seed <=> node mappings for transports created for a seed.
push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
   my ($transport) = @_;

   defined (my $seed = $transport->{seed})
      or return;

   my $remote_node = $transport->{remote_node};

   # we rely on untrusted data here (the remote node name) this is
   # hopefully ok, as this can at most be used for DOSing, which is easy
   # when you can do MITM anyway.

   # if we connect to ourselves, nuke this seed
   if ($remote_node eq $AnyEvent::MP::Kernel::NODE) {
      require AnyEvent::MP::Global; # every seed becomes a global node currently

      # %NODE_SEED is indexed by node ID, not by seed ID, so the reverse
      # mapping has to be removed via the node this seed used to map to -
      # and only when that node still maps back to this very seed.
      my $old_node = delete $SEED_NODE{$seed};

      delete $NODE_SEED{$old_node}
         if defined $old_node
            && defined $NODE_SEED{$old_node}
            && $NODE_SEED{$old_node} eq $seed;
   } else {
      $SEED_NODE{$seed}        = $remote_node;
      $NODE_SEED{$remote_node} = $seed;
   }
};
533    
534     ## called when connection is up, same as above, but now verified
535     #push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
536     # defined (my $seed = $_[0]{seed})
537     # or return;
538     # AE::log 5, "connected($seed)\n";#d#
539     #
540     # $SEED_NODE{$seed} = $_[0]{remote_node};
541     # $NODE_SEED{$_[0]{remote_node}} = $seed;
542     #};
543    
# called when connections get destroyed, update our data structures
# and check for self-connects
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my $remote_node = $_[0]{remote_node};

   # if we lost the connection to a seed node, make sure we start seeding
   if (exists $NODE_SEED{$remote_node}) {
      seed_again ();#d#
   }

   my $seed = $_[0]{seed};

   return unless defined $seed;

   # the connection attempt for this seed is over, allow a new one
   delete $SEED_CONNECT{$seed};
};
556    
# Starts a connection attempt to the given seed ("host:port"), unless
# %SEED_CONNECT already has a true entry for it. The entry is set to 1
# by the callback passed to mp_connect. Croaks when the seed address
# cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   my $mark_connected = sub {
      $SEED_CONNECT{$seed} = 1;
   };

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      seed => $seed,
      $mark_connected,
   );
}
573    
# Starts connection attempts to all seeds that need one. Once every seed
# is connected or connecting, the seed watcher is released.
sub seed_all {
   # my $next = List::Util::max 1,
   #    $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
   #    * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
   #    - rand;

   my @seeds;

   # first collect all candidates, as seed_connect modifies %SEED_CONNECT
   for my $seed (keys %SEED_NODE) {
      # an attempt is pending or the connection already exists
      next if exists $SEED_CONNECT{$seed};

      # the node behind this seed is known and up
      my $nodeid = $SEED_NODE{$seed};
      next if defined $nodeid && node_is_up ($nodeid);

      push @seeds, $seed;
   }

   unless (@seeds) {
      # all seeds connected or connecting
      undef $SEED_WATCHER;
      return;
   }

   # start connection attempt for every seed we are not connected to yet
   seed_connect ($_)
      for @seeds;
}
594    
# Makes sure the seed watcher exists: a timer that calls seed_all
# immediately and then uses the configured monitor_timeout as interval.
sub seed_again {
   my $interval = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

   $SEED_WATCHER ||= AE::timer (0, $interval, \&seed_all);
}
598    
# sets new seed list, starts connecting: all previous seeds are forgotten
# and every given seed starts out without a known node ID.
sub set_seeds(@) {
   %SEED_NODE = map { $_ => undef } @_;

   seed_again ();
}
606    
607     #############################################################################
608     # configure
609    
# Returns the node name of this host, the second element of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
614    
# _resolve $descriptors
#
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings. Returns a condition variable that is sent the
# resulting list, without duplicates, ordered by the position of the
# originating descriptor and, within one descriptor, by the order in
# which its addresses were added. Croaks when a descriptor cannot be
# parsed.
615     sub _resolve($) {
616     my ($nodeid) = @_;
617    
618     my $cv = AE::cv;
# [priority, "host:port"] pairs, filled in by the asynchronous callbacks below
619     my @res;
620    
# group callback: sorts the collected results by priority, drops
# duplicates and sends the remaining addresses to the condition variable
621     $cv->begin (sub {
622     my %seen;
623     my @refs;
624     for (sort { $a->[0] <=> $b->[0] } @res) {
625     push @refs, $_->[1] unless $seen{$_->[1]}++
626     }
627     shift->send (@refs);
628     });
629    
630     my $idx;
631     for my $t (split /,/, $nodeid) {
# the position of the descriptor is the integer part of the priority
632     my $pri = ++$idx;
633    
# an empty descriptor or a bare port number refers to the local node name
634     $t = length $t ? _nodename . ":$t" : _nodename
635     if $t =~ /^\d*$/;
636    
637     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
638     or Carp::croak "$t: unparsable transport descriptor";
639    
# a wildcard port is passed on as port 0
640     $port = "0" if $port eq "*";
641    
642     if ($host eq "*") {
# wildcard host: use the addresses of all network interfaces
643     $cv->begin;
644     # use fork_call, as Net::Interface is big, and we need it rarely.
645     require AnyEvent::Util;
646     AnyEvent::Util::fork_call (
# first callback: collects the interface addresses (only this part loads Net::Interface)
647     sub {
648     my @addr;
649    
650     require Net::Interface;
651    
652     for my $if (Net::Interface->interfaces) {
653     # we statically lower-prioritise ipv6 here, TODO :()
654     for $_ ($if->address (Net::Interface::AF_INET ())) {
655     next if /^\x7f/; # skip localhost etc.
656     push @addr, $_;
657     }
658     for ($if->address (Net::Interface::AF_INET6 ())) {
659     #next if $if->scope ($_) <= 2;
660     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
661     push @addr, $_;
662     }
663    
664     }
665     @addr
# second callback: receives the collected addresses and records them
666     }, sub {
667     for my $ip (@_) {
668     push @res, [
# the small increment keeps the addresses of one descriptor in order
669     $pri += 1e-5,
670     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
671     ];
672     }
673     $cv->end;
674     }
675     );
676     } else {
# ordinary host: resolve it to tcp socket addresses
677     $cv->begin;
678     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
679     for (@_) {
# the packed socket address is the fourth element of each result
680     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
681     push @res, [
682     $pri += 1e-5,
683     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
684     ];
685     }
686     $cv->end;
687     };
688     }
689     }
690    
# matches the initial begin above
691     $cv->end;
692    
693     $cv
694     }
695    
# configure [$profile,] key => value...
#
# Configures the local node: finds the configuration profile, assigns the
# node ID, creates listeners for all bind addresses, loads the global
# service, starts connecting to the seed nodes and finally starts the
# configured services. Croaks on an illegal node ID or an unparsable bind
# address, and dies when a service module fails to load.
696     sub configure(@) {
# an odd number of arguments means the first one is the profile name
697     unshift @_, "profile" if @_ & 1;
698     my (%kv) = @_;
699    
# forget the old node ID and generate fresh random names
700     delete $NODE{$NODE}; # we do not support doing stuff before configure
701     _init_names;
702    
703     my $profile = delete $kv{profile};
704    
# the default profile is named after the host
705     $profile = _nodename
706     unless defined $profile;
707    
708     $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
709    
# the node ID defaults to the profile name
710     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
711    
712     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
713    
# "anon/" keeps the anonymous node ID that _init_names has just generated
714     $NODE = $node
715     unless $node eq "anon/";
716    
# register the local node object under its final ID and tell it about that ID
717     $NODE{$NODE} = $NODE{""};
718     $NODE{$NODE}{id} = $NODE;
719    
720     my $seeds = $CONFIG->{seeds};
721     my $binds = $CONFIG->{binds};
722    
# without configured binds, use the wildcard descriptor
723     $binds ||= ["*"];
724    
725     $WARN->(8, "node $NODE starting up.");
726    
727     $LISTENER = [];
728     %LISTENER = ();
729    
# start resolving all bind descriptors first, then wait for each result in turn
730     for (map _resolve $_, @$binds) {
731     for my $bind ($_->recv) {
732     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
733     or Carp::croak "$bind: unparsable local bind address";
734    
735     my $listener = AnyEvent::MP::Transport::mp_server
736     $host,
737     $port,
# the prepare callback replaces $bind with the host and port passed to it
# NOTE(review): presumably these are the actually bound values and the
# returned 0 selects a default - confirm against mp_server
738     prepare => sub {
739     my (undef, $host, $port) = @_;
740     $bind = AnyEvent::Socket::format_hostport $host, $port;
741     0
742     },
743     ;
744     $LISTENER{$bind} = $listener;
745     push @$LISTENER, $bind;
746     }
747     }
748    
749     $WARN->(8, "node listens on [@$LISTENER].");
750    
751     # the global service is mandatory currently
752     require AnyEvent::MP::Global;
753    
754     # connect to all seednodes
755     set_seeds map $_->recv, map _resolve $_, @$seeds;
756    
# services are either [function, args...], "Module::" (only loaded) or a
# function name (loaded and called without arguments)
# NOTE(review): s/::$// works on the aliased element, so the trailing "::"
# is removed from $CONFIG->{services} itself - confirm this is acceptable
757     for (@{ $CONFIG->{services} }) {
758     if (ref) {
759     my ($func, @args) = @$_;
760     (load_func $func)->(@args);
761     } elsif (s/::$//) {
762     eval "require $_";
763     die $@ if $@;
764     } else {
765     (load_func $_)->();
766     }
767     }
768     }
769    
770 root 1.1 =back
771    
772     =head1 SEE ALSO
773    
774     L<AnyEvent::MP>.
775    
776     =head1 AUTHOR
777    
778     Marc Lehmann <schmorp@schmorp.de>
779     http://home.schmorp.de/
780    
781     =cut
782    
783     1
784