ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.76
Committed: Thu Mar 1 19:37:59 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.75: +10 -30 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 );
49
50 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
51
52 This value is called with an error or warning message, when e.g. a
53 connection could not be created, authorisation failed and so on.
54
55 It I<must not> block or send messages -queue it and use an idle watcher if
56 you need to do any of these things.
57
58 C<$level> should be C<0> for messages to be logged always, C<1> for
59 unexpected messages and errors, C<2> for warnings, C<7> for messages about
60 node connectivity and services, C<8> for debugging messages and C<9> for
61 tracing messages.
62
63 The default simply logs the message to STDERR.
64
65 =item @AnyEvent::MP::Kernel::WARN
66
67 All code references in this array are called for every log message, from
68 the default C<$WARN> handler. This is an easy way to tie into the log
69 messages without disturbing others.
70
71 =cut
72
73 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
74 our @WARN;
75 our $WARN = sub {
76 &$_ for @WARN;
77
78 return if $WARNLEVEL < $_[0];
79
80 my ($level, $msg) = @_;
81
82 $msg =~ s/\n$//;
83
84 printf STDERR "%s <%d> %s\n",
85 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
86 $level,
87 $msg;
88 };
89
90 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
91
92 The maximum level at which warning messages will be printed to STDERR by
93 the default warn handler.
94
95 =cut
96
97 sub load_func($) {
98 my $func = $_[0];
99
100 unless (defined &$func) {
101 my $pkg = $func;
102 do {
103 $pkg =~ s/::[^:]+$//
104 or return sub { die "unable to resolve function '$func'" };
105
106 local $@;
107 unless (eval "require $pkg; 1") {
108 my $error = $@;
109 $error =~ /^Can't locate .*.pm in \@INC \(/
110 or return sub { die $error };
111 }
112 } until defined &$func;
113 }
114
115 \&$func
116 }
117
118 sub nonce($) {
119 my $nonce;
120
121 if (open my $fh, "</dev/urandom") {
122 sysread $fh, $nonce, $_[0];
123 } else {
124 # shit...
125 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
126 }
127
128 $nonce
129 }
130
131 # binary to alphanumeric
132 sub alnumbits($) {
133 (my $data = MIME::Base64::encode_base64 $_[0], "") =~ y%+/=%-_%d;
134 $data
135 }
136
137 sub gen_uniq {
138 # 48 bits happen to fit perfectly into 8 chars
139 # we add one more char to get 54 bits.
140 substr +(alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 3), 0, 9
141 }
142
143 our $CONFIG; # this node's configuration
144
145 our $RUNIQ; # remote uniq value
146 our $UNIQ; # per-process/node unique cookie
147 our $NODE;
148 our $ID = "a";
149
150 our %NODE; # node id to transport mapping, or "undef", for local node
151 our (%PORT, %PORT_DATA); # local ports
152
153 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 our %LMON; # monitored _local_ ports
155
156 our $GLOBAL; # true if node is a global ("directory") node
157 our %LISTENER;
158 our $LISTENER; # our listeners, as arrayref
159
160 our $SRCNODE; # holds the sending node _object_ during _inject
161
162 sub _init_names {
163 $UNIQ = gen_uniq;
164 $RUNIQ = substr +(alnumbits nonce 64/8), 0, 10; # for remote port names &c, should be longer than $UNIQ
165 $NODE = "anon/$RUNIQ";
166 }
167
168 _init_names;
169
170 sub NODE() {
171 $NODE
172 }
173
174 sub node_of($) {
175 my ($node, undef) = split /#/, $_[0], 2;
176
177 $node
178 }
179
180 BEGIN {
181 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
182 ? sub () { 1 }
183 : sub () { 0 };
184 }
185
186 our $DELAY_TIMER;
187 our @DELAY_QUEUE;
188
189 sub _delay_run {
190 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
191 }
192
193 sub delay($) {
194 push @DELAY_QUEUE, shift;
195 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
196 }
197
198 sub _inject {
199 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
200 &{ $PORT{+shift} or return };
201 }
202
203 # this function adds a node-ref, so you can send stuff to it
204 # it is basically the central routing component.
205 sub add_node {
206 my ($node) = @_;
207
208 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
209 }
210
211 sub snd(@) {
212 my ($nodeid, $portid) = split /#/, shift, 2;
213
214 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
215
216 defined $nodeid #d#UGLY
217 or Carp::croak "'undef' is not a valid node ID/port ID";
218
219 ($NODE{$nodeid} || add_node $nodeid)
220 ->{send} (["$portid", @_]);
221 }
222
223 =item $is_local = port_is_local $port
224
225 Returns true iff the port is a local port.
226
227 =cut
228
229 sub port_is_local($) {
230 my ($nodeid, undef) = split /#/, $_[0], 2;
231
232 $NODE{$nodeid} == $NODE{""}
233 }
234
235 =item snd_to_func $node, $func, @args
236
237 Expects a node ID and a name of a function. Asynchronously tries to call
238 this function with the given arguments on that node.
239
240 This function can be used to implement C<spawn>-like interfaces.
241
242 =cut
243
244 sub snd_to_func($$;@) {
245 my $nodeid = shift;
246
247 # on $NODE, we artificially delay... (for spawn)
248 # this is very ugly - maybe we should simply delay ALL messages,
249 # to avoid deep recursion issues. but that's so... slow...
250 $AnyEvent::MP::Node::Self::DELAY = 1
251 if $nodeid ne $NODE;
252
253 defined $nodeid #d#UGLY
254 or Carp::croak "'undef' is not a valid node ID/port ID";
255
256 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
257 }
258
259 =item snd_on $node, @msg
260
261 Executes C<snd> with the given C<@msg> (which must include the destination
262 port) on the given node.
263
264 =cut
265
266 sub snd_on($@) {
267 my $node = shift;
268 snd $node, snd => @_;
269 }
270
271 =item eval_on $node, $string[, @reply]
272
273 Evaluates the given string as Perl expression on the given node. When
274 @reply is specified, then it is used to construct a reply message with
275 C<"$@"> and any results from the eval appended.
276
277 =cut
278
279 sub eval_on($$;@) {
280 my $node = shift;
281 snd $node, eval => @_;
282 }
283
284 sub kil(@) {
285 my ($nodeid, $portid) = split /#/, shift, 2;
286
287 length $portid
288 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
289
290 ($NODE{$nodeid} || add_node $nodeid)
291 ->kill ("$portid", @_);
292 }
293
294 #############################################################################
295 # node monitoring and info
296
297 =item node_is_known $nodeid
298
299 #TODO#
300 Returns true iff the given node is currently known to this node.
301
302 =cut
303
304 sub node_is_known($) {
305 exists $NODE{$_[0]}
306 }
307
308 =item node_is_up $nodeid
309
310 Returns true if the given node is "up", that is, the kernel thinks it has
311 a working connection to it.
312
313 If the node is known (to this local node) but not currently connected,
314 returns C<0>. If the node is not known, returns C<undef>.
315
316 =cut
317
318 sub node_is_up($) {
319 ($NODE{$_[0]} or return)->{transport}
320 ? 1 : 0
321 }
322
323 =item known_nodes
324
325 #TODO#
326 Returns the node IDs of all nodes currently known to this node, including
327 itself and nodes not currently connected.
328
329 =cut
330
331 sub known_nodes() {
332 map $_->{id}, values %NODE
333 }
334
335 =item up_nodes
336
337 Return the node IDs of all nodes that are currently connected (excluding
338 the node itself).
339
340 =cut
341
342 sub up_nodes() {
343 map $_->{id}, grep $_->{transport}, values %NODE
344 }
345
346 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
347
348 Registers a callback that is called each time a node goes up (a connection
349 is established) or down (the connection is lost).
350
351 Node up messages can only be followed by node down messages for the same
352 node, and vice versa.
353
354 Note that monitoring a node is usually better done by monitoring its node
355 port. This function is mainly of interest to modules that are concerned
356 about the network topology and low-level connection handling.
357
358 Callbacks I<must not> block and I<should not> send any messages.
359
360 The function returns an optional guard which can be used to unregister
361 the monitoring callback again.
362
363 Example: make sure you call function C<newnode> for all nodes that are up
364 or go up (and down).
365
366 newnode $_, 1 for up_nodes;
367 mon_nodes \&newnode;
368
369 =cut
370
371 our %MON_NODES;
372
373 sub mon_nodes($) {
374 my ($cb) = @_;
375
376 $MON_NODES{$cb+0} = $cb;
377
378 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
379 }
380
381 sub _inject_nodeevent($$;@) {
382 my ($node, $up, @reason) = @_;
383
384 for my $cb (values %MON_NODES) {
385 eval { $cb->($node->{id}, $up, @reason); 1 }
386 or $WARN->(1, $@);
387 }
388
389 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
390 }
391
392 #############################################################################
393 # self node code
394
395 sub _kill {
396 my $port = shift;
397
398 delete $PORT{$port}
399 or return; # killing nonexistent ports is O.K.
400 delete $PORT_DATA{$port};
401
402 my $mon = delete $LMON{$port}
403 or !@_
404 or $WARN->(2, "unmonitored local port $port died with reason: @_");
405
406 $_->(@_) for values %$mon;
407 }
408
409 sub _monitor {
410 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
411 unless exists $PORT{$_[1]};
412
413 $LMON{$_[1]}{$_[2]+0} = $_[2];
414 }
415
416 sub _unmonitor {
417 delete $LMON{$_[1]}{$_[2]+0}
418 if exists $LMON{$_[1]};
419 }
420
421 our %node_req = (
422 # internal services
423
424 # monitoring
425 mon0 => sub { # stop monitoring a port for another node
426 my $portid = shift;
427 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
428 },
429 mon1 => sub { # start monitoring a port for another node
430 my $portid = shift;
431 Scalar::Util::weaken (my $node = $SRCNODE);
432 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
433 delete $node->{rmon}{$portid};
434 $node->send (["", kil0 => $portid, @_])
435 if $node && $node->{transport};
436 };
437 },
438 # another node has killed a monitored port
439 kil0 => sub {
440 my $cbs = delete $SRCNODE->{lmon}{+shift}
441 or return;
442
443 $_->(@_) for @$cbs;
444 },
445
446 # "public" services - not actually public
447
448 # another node wants to kill a local port
449 kil => \&_kill,
450
451 # relay message to another node / generic echo
452 snd => \&snd,
453 snd_multiple => sub {
454 snd @$_ for @_
455 },
456
457 # random utilities
458 eval => sub {
459 #d#SECURE
460 my @res = do { package main; eval shift };
461 snd @_, "$@", @res if @_;
462 },
463 time => sub {
464 snd @_, AE::now;
465 },
466 devnull => sub {
467 #
468 },
469 "" => sub {
470 # empty messages are keepalives or similar devnull-applications
471 },
472 );
473
474 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
475 $PORT{""} = sub {
476 my $tag = shift;
477 #d#SECURE (load_func)
478 eval { &{ $node_req{$tag} ||= load_func $tag } };
479 $WARN->(2, "error processing node message: $@") if $@;
480 };
481
482 #############################################################################
483 # seed management, try to keep connections to all seeds at all times
484
485 our %SEED_NODE; # seed ID => node ID|undef
486 our %NODE_SEED; # map node ID to seed ID
487 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
488 our $SEED_WATCHER;
489 our $SEED_RETRY;
490
491 sub seed_connect {
492 my ($seed) = @_;
493
494 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
495 or Carp::croak "$seed: unparsable seed address";
496
497 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
498
499 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
500 $host, $port,
501 on_greeted => sub {
502 # called after receiving remote greeting, learn remote node name
503
504 # we rely on untrusted data here (the remote node name) this is
505 # hopefully ok, as this can at most be used for DOSing, which is easy
506 # when you can do MITM anyway.
507
508 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
509 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
510 require AnyEvent::MP::Global; # every seed becomes a global node currently
511 delete $SEED_NODE{$seed};
512 delete $NODE_SEED{$seed};
513 } else {
514 $SEED_NODE{$seed} = $_[0]{remote_node};
515 $NODE_SEED{$_[0]{remote_node}} = $seed;
516 }
517 },
518 on_destroy => sub {
519 delete $SEED_CONNECT{$seed};
520 },
521 sub {
522 $SEED_CONNECT{$seed} = 1;
523 }
524 ;
525 }
526
527 sub seed_all {
528 # my $next = List::Util::max 1,
529 # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
530 # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
531 # - rand;
532
533 my @seeds = grep {
534 !exists $SEED_CONNECT{$_}
535 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
536 } keys %SEED_NODE;
537
538 if (@seeds) {
539 # start connection attempt for every seed we are not connected to yet
540 seed_connect $_
541 for @seeds;
542
543 $SEED_RETRY = $SEED_RETRY * 2 + rand;
544 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
545 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
546
547 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
548
549 } else {
550 # all seeds connected or connecting, no need to restart timer
551 undef $SEED_WATCHER;
552 }
553 }
554
555 sub seed_again {
556 $SEED_RETRY = 1;
557 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
558 }
559
560 # sets new seed list, starts connecting
561 sub set_seeds(@) {
562 %SEED_NODE = ();
563 %NODE_SEED = ();
564 %SEED_CONNECT = ();
565
566 @SEED_NODE{@_} = ();
567
568 seed_again;#d#
569 seed_all;
570 }
571
572 mon_nodes sub {
573 # if we lost the connection to a seed node, make sure we are seeding
574 seed_again
575 if !$_[1] && exists $NODE_SEED{$_[0]};
576 };
577
578 #############################################################################
579 # talk with/to global nodes
580
581 # protocol messages:
582 #
583 # sent by all slave nodes (slave to master)
584 # g_slave database - make other global node master of the sender
585 #
586 # sent by any node to global nodes
587 # g_set database - set whole database
588 # g_add family key val - add/replace key to database
589 # g_del family key - delete key from database
590 # g_get family key reply... - send reply with data
591 #
592 # send by global nodes
593 # g_global - node became global, similar to global=1 greeting
594 #
595 # database families
596 # "'l" -> node -> listeners
597 # "'g" -> node -> undef
598 # ...
599 #
600
601 # used on all nodes:
602 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
603 our $GLOBAL_MON;
604 our $GLOBAL_TIMER;
605 our %LOCAL_DB; # this node database
606
607 # used on global nodes:
608 our %GLOBAL_DB; # all local databases, merged
609 our %LOCAL_DBS; # local databases of all global nodes
610 our %GLOBAL_DBS; # global db
611 our %GLOBAL_SLAVE; # nodes that are our slaves
612
613 #our $sv_ne_json = JSON::XS->new->canonical;
614 #
615 #sub sv_ne($$) {
616 # $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
617 #}
618
619 # should be only on globals, but...
620 $node_req{g_global} = sub {
621 if ($GLOBAL) {
622 &g_clr ($SRCNODE->{id});
623 $SRCNODE->{transport}{remote_greeting}{global} = 1;
624 delete $GLOBAL_SLAVE{$SRCNODE->{id}};
625 snd $SRCNODE->{id}, g_set => \%LOCAL_DB;
626 }
627 };
628
629 sub other_globals() {
630 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
631 }
632
633 # local database management
634 sub ldb_set($$;$) {
635 warn "ldb_set<@_>\n";#d#
636 if (@_ > 2) {
637 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
638 snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
639 if defined $MASTER;
640 } else {
641 delete $LOCAL_DB{$_[0]}{$_[1]};
642 snd $MASTER, g_del => $_[0] => $_[1]
643 if defined $MASTER;
644 }
645 }
646
647 #############################################################################
648 # master selection
649
650 # master requests
651 our %GLOBAL_REQ; # $id => \@req
652
653 sub global_req_add {
654 my $id = shift;
655
656 return if exists $GLOBAL_REQ{$id};
657
658 $GLOBAL_REQ{$id} = [@_];
659
660 snd $MASTER, @_
661 if $MASTER;
662 }
663
664 sub global_req_del {
665 delete $GLOBAL_REQ{$_[0]};
666 }
667
668 sub g_find {
669 global_req_add "g_find $_[0]", g_find => $_[0];
670 }
671
672 # reply for g_find started in Node.pm
673 $node_req{g_found} = sub {
674 global_req_del "g_find $_[0]";
675
676 @{ $_[1] } or return; # d'oh
677
678 my $node = $NODE{$_[0]} or return;
679
680 local $GLOBAL_DB{"'l"}{$_[0]} = $_[1]; #d# UGLY
681 $node->connect;
682 };
683
684 sub master_set {
685 $MASTER = $_[0];
686
687 snd $MASTER, g_slave => \%LOCAL_DB;
688
689 # (re-)send queued requests
690 snd $MASTER, @$_
691 for values %GLOBAL_REQ;
692 }
693
694 sub master_search {
695 #TODO: should also look for other global nodes, but we don't know them #d#
696 for (keys %NODE_SEED) {
697 if (node_is_up $_) {
698 master_set $_;
699 return;
700 }
701 }
702
703 $GLOBAL_MON = mon_nodes sub {
704 return unless $_[1]; # we are only interested in node-ups
705 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
706
707 master_set $_[0];
708
709 $GLOBAL_MON = mon_nodes sub {
710 if ($_[0] eq $MASTER && !$_[1]) {
711 undef $MASTER;
712 master_search ();
713 }
714 };
715 };
716 }
717
718 # other node wants to make us the master
719 $node_req{g_slave} = sub {
720 my ($db) = @_;
721
722 warn "slave1\n";#d#
723
724 require AnyEvent::MP::Global;
725 &{ $node_req{g_slave} };
726 };
727
728 #$node_req{g_reply} = sub {
729 # my $id = shift;
730 #
731 # my $cb = delete $GLOBAL_REQ{$id}
732 # or return;
733 #
734 # $cb->[0]->(@_);
735 #};
736
737 #############################################################################
738
739 #############################################################################
740
741 # $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
742
743 #############################################################################
744 # configure
745
746 sub _nodename {
747 require POSIX;
748 (POSIX::uname ())[1]
749 }
750
751 sub _resolve($) {
752 my ($nodeid) = @_;
753
754 my $cv = AE::cv;
755 my @res;
756
757 $cv->begin (sub {
758 my %seen;
759 my @refs;
760 for (sort { $a->[0] <=> $b->[0] } @res) {
761 push @refs, $_->[1] unless $seen{$_->[1]}++
762 }
763 shift->send (@refs);
764 });
765
766 my $idx;
767 for my $t (split /,/, $nodeid) {
768 my $pri = ++$idx;
769
770 $t = length $t ? _nodename . ":$t" : _nodename
771 if $t =~ /^\d*$/;
772
773 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
774 or Carp::croak "$t: unparsable transport descriptor";
775
776 $port = "0" if $port eq "*";
777
778 if ($host eq "*") {
779 $cv->begin;
780 # use fork_call, as Net::Interface is big, and we need it rarely.
781 require AnyEvent::Util;
782 AnyEvent::Util::fork_call (
783 sub {
784 my @addr;
785
786 require Net::Interface;
787
788 for my $if (Net::Interface->interfaces) {
789 # we statically lower-prioritise ipv6 here, TODO :()
790 for $_ ($if->address (Net::Interface::AF_INET ())) {
791 next if /^\x7f/; # skip localhost etc.
792 push @addr, $_;
793 }
794 for ($if->address (Net::Interface::AF_INET6 ())) {
795 #next if $if->scope ($_) <= 2;
796 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
797 push @addr, $_;
798 }
799
800 }
801 @addr
802 }, sub {
803 for my $ip (@_) {
804 push @res, [
805 $pri += 1e-5,
806 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
807 ];
808 }
809 $cv->end;
810 }
811 );
812 } else {
813 $cv->begin;
814 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
815 for (@_) {
816 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
817 push @res, [
818 $pri += 1e-5,
819 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
820 ];
821 }
822 $cv->end;
823 };
824 }
825 }
826
827 $cv->end;
828
829 $cv
830 }
831
832 sub configure(@) {
833 unshift @_, "profile" if @_ & 1;
834 my (%kv) = @_;
835
836 delete $NODE{$NODE}; # we do not support doing stuff before configure
837 _init_names;
838
839 my $profile = delete $kv{profile};
840
841 $profile = _nodename
842 unless defined $profile;
843
844 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
845
846 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
847
848 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
849
850 $NODE = $node;
851 $NODE =~ s%/$%/$RUNIQ%;
852
853 $NODE{$NODE} = $NODE{""};
854 $NODE{$NODE}{id} = $NODE;
855
856 my $seeds = $CONFIG->{seeds};
857 my $binds = $CONFIG->{binds};
858
859 $binds ||= ["*"];
860
861 $WARN->(8, "node $NODE starting up.");
862
863 $LISTENER = [];
864 %LISTENER = ();
865
866 for (map _resolve $_, @$binds) {
867 for my $bind ($_->recv) {
868 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
869 or Carp::croak "$bind: unparsable local bind address";
870
871 my $listener = AnyEvent::MP::Transport::mp_server
872 $host,
873 $port,
874 prepare => sub {
875 my (undef, $host, $port) = @_;
876 $bind = AnyEvent::Socket::format_hostport $host, $port;
877 0
878 },
879 ;
880 $LISTENER{$bind} = $listener;
881 push @$LISTENER, $bind;
882 }
883 }
884
885 ldb_set "'l" => $NODE => $LISTENER;
886
887 $WARN->(8, "node listens on [@$LISTENER].");
888
889 # the global service is mandatory currently
890 #require AnyEvent::MP::Global;
891
892 # connect to all seednodes
893 set_seeds map $_->recv, map _resolve $_, @$seeds;
894
895 master_search;
896
897 if ($NODE eq "atha") {;#d#
898 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
899 }
900
901 for (@{ $CONFIG->{services} }) {
902 if (ref) {
903 my ($func, @args) = @$_;
904 (load_func $func)->(@args);
905 } elsif (s/::$//) {
906 eval "require $_";
907 die $@ if $@;
908 } else {
909 (load_func $_)->();
910 }
911 }
912 }
913
914 =back
915
916 =head1 SEE ALSO
917
918 L<AnyEvent::MP>.
919
920 =head1 AUTHOR
921
922 Marc Lehmann <schmorp@schmorp.de>
923 http://home.schmorp.de/
924
925 =cut
926
927 1
928