ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.74
Committed: Thu Mar 1 18:31:42 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.73: +14 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 );
49
50 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
51
52 This value is called with an error or warning message, when e.g. a
53 connection could not be created, authorisation failed and so on.
54
55 It I<must not> block or send messages -queue it and use an idle watcher if
56 you need to do any of these things.
57
58 C<$level> should be C<0> for messages to be logged always, C<1> for
59 unexpected messages and errors, C<2> for warnings, C<7> for messages about
60 node connectivity and services, C<8> for debugging messages and C<9> for
61 tracing messages.
62
63 The default simply logs the message to STDERR.
64
65 =item @AnyEvent::MP::Kernel::WARN
66
67 All code references in this array are called for every log message, from
68 the default C<$WARN> handler. This is an easy way to tie into the log
69 messages without disturbing others.
70
71 =cut
72
73 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
74 our @WARN;
75 our $WARN = sub {
76 &$_ for @WARN;
77
78 return if $WARNLEVEL < $_[0];
79
80 my ($level, $msg) = @_;
81
82 $msg =~ s/\n$//;
83
84 printf STDERR "%s <%d> %s\n",
85 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
86 $level,
87 $msg;
88 };
89
90 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
91
92 The maximum level at which warning messages will be printed to STDERR by
93 the default warn handler.
94
95 =cut
96
97 sub load_func($) {
98 my $func = $_[0];
99
100 unless (defined &$func) {
101 my $pkg = $func;
102 do {
103 $pkg =~ s/::[^:]+$//
104 or return sub { die "unable to resolve function '$func'" };
105
106 local $@;
107 unless (eval "require $pkg; 1") {
108 my $error = $@;
109 $error =~ /^Can't locate .*.pm in \@INC \(/
110 or return sub { die $error };
111 }
112 } until defined &$func;
113 }
114
115 \&$func
116 }
117
118 sub nonce($) {
119 my $nonce;
120
121 if (open my $fh, "</dev/urandom") {
122 sysread $fh, $nonce, $_[0];
123 } else {
124 # shit...
125 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
126 }
127
128 $nonce
129 }
130
131 sub alnumbits($) {
132 my $data = $_[0];
133
134 if (eval "use Math::GMP 2.05; 1") {
135 $data = Math::GMP::get_str_gmp (
136 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
137 62
138 );
139 } else {
140 $data = MIME::Base64::encode_base64 $data, "";
141 $data =~ s/=//;
142 $data =~ s/x/x0/g;
143 $data =~ s/\//x1/g;
144 $data =~ s/\+/x2/g;
145 }
146
147 $data
148 }
149
150 sub gen_uniq {
151 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
152 }
153
154 our $CONFIG; # this node's configuration
155
156 our $RUNIQ; # remote uniq value
157 our $UNIQ; # per-process/node unique cookie
158 our $NODE;
159 our $ID = "a";
160
161 our %NODE; # node id to transport mapping, or "undef", for local node
162 our (%PORT, %PORT_DATA); # local ports
163
164 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
165 our %LMON; # monitored _local_ ports
166
167 our $GLOBAL; # true if node is a global ("directory") node
168 our %LISTENER;
169 our $LISTENER; # our listeners, as arrayref
170
171 our $SRCNODE; # holds the sending node during _inject
172
173 sub _init_names {
174 $RUNIQ = alnumbits nonce 96/8;
175 $UNIQ = gen_uniq;
176 $NODE = "anon/$RUNIQ";
177 }
178
179 _init_names;
180
181 sub NODE() {
182 $NODE
183 }
184
185 sub node_of($) {
186 my ($node, undef) = split /#/, $_[0], 2;
187
188 $node
189 }
190
191 BEGIN {
192 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
193 ? sub () { 1 }
194 : sub () { 0 };
195 }
196
197 our $DELAY_TIMER;
198 our @DELAY_QUEUE;
199
200 sub _delay_run {
201 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
202 }
203
204 sub delay($) {
205 push @DELAY_QUEUE, shift;
206 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
207 }
208
209 sub _inject {
210 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
211 &{ $PORT{+shift} or return };
212 }
213
214 # this function adds a node-ref, so you can send stuff to it
215 # it is basically the central routing component.
216 sub add_node {
217 my ($node) = @_;
218
219 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
220 }
221
222 sub snd(@) {
223 my ($nodeid, $portid) = split /#/, shift, 2;
224
225 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
226
227 defined $nodeid #d#UGLY
228 or Carp::croak "'undef' is not a valid node ID/port ID";
229
230 ($NODE{$nodeid} || add_node $nodeid)
231 ->{send} (["$portid", @_]);
232 }
233
234 =item $is_local = port_is_local $port
235
236 Returns true iff the port is a local port.
237
238 =cut
239
240 sub port_is_local($) {
241 my ($nodeid, undef) = split /#/, $_[0], 2;
242
243 $NODE{$nodeid} == $NODE{""}
244 }
245
246 =item snd_to_func $node, $func, @args
247
248 Expects a node ID and a name of a function. Asynchronously tries to call
249 this function with the given arguments on that node.
250
251 This function can be used to implement C<spawn>-like interfaces.
252
253 =cut
254
255 sub snd_to_func($$;@) {
256 my $nodeid = shift;
257
258 # on $NODE, we artificially delay... (for spawn)
259 # this is very ugly - maybe we should simply delay ALL messages,
260 # to avoid deep recursion issues. but that's so... slow...
261 $AnyEvent::MP::Node::Self::DELAY = 1
262 if $nodeid ne $NODE;
263
264 defined $nodeid #d#UGLY
265 or Carp::croak "'undef' is not a valid node ID/port ID";
266
267 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
268 }
269
270 =item snd_on $node, @msg
271
272 Executes C<snd> with the given C<@msg> (which must include the destination
273 port) on the given node.
274
275 =cut
276
277 sub snd_on($@) {
278 my $node = shift;
279 snd $node, snd => @_;
280 }
281
282 =item eval_on $node, $string[, @reply]
283
284 Evaluates the given string as Perl expression on the given node. When
285 @reply is specified, then it is used to construct a reply message with
286 C<"$@"> and any results from the eval appended.
287
288 =cut
289
290 sub eval_on($$;@) {
291 my $node = shift;
292 snd $node, eval => @_;
293 }
294
295 sub kil(@) {
296 my ($nodeid, $portid) = split /#/, shift, 2;
297
298 length $portid
299 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
300
301 ($NODE{$nodeid} || add_node $nodeid)
302 ->kill ("$portid", @_);
303 }
304
305 #############################################################################
306 # node monitoring and info
307
308 =item node_is_known $nodeid
309
310 #TODO#
311 Returns true iff the given node is currently known to this node.
312
313 =cut
314
315 sub node_is_known($) {
316 exists $NODE{$_[0]}
317 }
318
319 =item node_is_up $nodeid
320
321 Returns true if the given node is "up", that is, the kernel thinks it has
322 a working connection to it.
323
324 If the node is known (to this local node) but not currently connected,
325 returns C<0>. If the node is not known, returns C<undef>.
326
327 =cut
328
329 sub node_is_up($) {
330 ($NODE{$_[0]} or return)->{transport}
331 ? 1 : 0
332 }
333
334 =item known_nodes
335
336 #TODO#
337 Returns the node IDs of all nodes currently known to this node, including
338 itself and nodes not currently connected.
339
340 =cut
341
342 sub known_nodes() {
343 map $_->{id}, values %NODE
344 }
345
346 =item up_nodes
347
348 Return the node IDs of all nodes that are currently connected (excluding
349 the node itself).
350
351 =cut
352
353 sub up_nodes() {
354 map $_->{id}, grep $_->{transport}, values %NODE
355 }
356
357 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
358
359 Registers a callback that is called each time a node goes up (a connection
360 is established) or down (the connection is lost).
361
362 Node up messages can only be followed by node down messages for the same
363 node, and vice versa.
364
365 Note that monitoring a node is usually better done by monitoring its node
366 port. This function is mainly of interest to modules that are concerned
367 about the network topology and low-level connection handling.
368
369 Callbacks I<must not> block and I<should not> send any messages.
370
371 The function returns an optional guard which can be used to unregister
372 the monitoring callback again.
373
374 Example: make sure you call function C<newnode> for all nodes that are up
375 or go up (and down).
376
377 newnode $_, 1 for up_nodes;
378 mon_nodes \&newnode;
379
380 =cut
381
382 our %MON_NODES;
383
384 sub mon_nodes($) {
385 my ($cb) = @_;
386
387 $MON_NODES{$cb+0} = $cb;
388
389 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
390 }
391
392 sub _inject_nodeevent($$;@) {
393 my ($node, $up, @reason) = @_;
394
395 for my $cb (values %MON_NODES) {
396 eval { $cb->($node->{id}, $up, @reason); 1 }
397 or $WARN->(1, $@);
398 }
399
400 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
401 }
402
403 #############################################################################
404 # self node code
405
406 sub _kill {
407 my $port = shift;
408
409 delete $PORT{$port}
410 or return; # killing nonexistent ports is O.K.
411 delete $PORT_DATA{$port};
412
413 my $mon = delete $LMON{$port}
414 or !@_
415 or $WARN->(2, "unmonitored local port $port died with reason: @_");
416
417 $_->(@_) for values %$mon;
418 }
419
420 sub _monitor {
421 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
422 unless exists $PORT{$_[1]};
423
424 $LMON{$_[1]}{$_[2]+0} = $_[2];
425 }
426
427 sub _unmonitor {
428 delete $LMON{$_[1]}{$_[2]+0}
429 if exists $LMON{$_[1]};
430 }
431
432 our %node_req = (
433 # internal services
434
435 # monitoring
436 mon0 => sub { # stop monitoring a port for another node
437 my $portid = shift;
438 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
439 },
440 mon1 => sub { # start monitoring a port for another node
441 my $portid = shift;
442 Scalar::Util::weaken (my $node = $SRCNODE);
443 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
444 delete $node->{rmon}{$portid};
445 $node->send (["", kil0 => $portid, @_])
446 if $node && $node->{transport};
447 };
448 },
449 # another node has killed a monitored port
450 kil0 => sub {
451 my $cbs = delete $SRCNODE->{lmon}{+shift}
452 or return;
453
454 $_->(@_) for @$cbs;
455 },
456
457 # "public" services - not actually public
458
459 # another node wants to kill a local port
460 kil => \&_kill,
461
462 # relay message to another node / generic echo
463 snd => \&snd,
464 snd_multiple => sub {
465 snd @$_ for @_
466 },
467
468 # informational
469 info => sub {
470 snd @_, $NODE;
471 },
472 known_nodes => sub {
473 snd @_, known_nodes;
474 },
475 up_nodes => sub {
476 snd @_, up_nodes;
477 },
478
479 # random utilities
480 eval => sub {
481 my @res = do { package main; eval shift };
482 snd @_, "$@", @res if @_;
483 },
484 time => sub {
485 snd @_, AE::time;
486 },
487 devnull => sub {
488 #
489 },
490 "" => sub {
491 # empty messages are keepalives or similar devnull-applications
492 },
493 );
494
495 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
496 $PORT{""} = sub {
497 my $tag = shift;
498 eval { &{ $node_req{$tag} ||= load_func $tag } };
499 $WARN->(2, "error processing node message: $@") if $@;
500 };
501
502 #############################################################################
503 # seed management, try to keep connections to all seeds at all times
504
505 our %SEED_NODE; # seed ID => node ID|undef
506 our %NODE_SEED; # map node ID to seed ID
507 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
508 our $SEED_WATCHER;
509 our $SEED_RETRY;
510
511 sub seed_connect {
512 my ($seed) = @_;
513
514 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
515 or Carp::croak "$seed: unparsable seed address";
516
517 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
518
519 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
520 $host, $port,
521 on_greeted => sub {
522 # called after receiving remote greeting, learn remote node name
523
524 # we rely on untrusted data here (the remote node name) this is
525 # hopefully ok, as this can at most be used for DOSing, which is easy
526 # when you can do MITM anyway.
527
528 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
529 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
530 require AnyEvent::MP::Global; # every seed becomes a global node currently
531 delete $SEED_NODE{$seed};
532 delete $NODE_SEED{$seed};
533 } else {
534 $SEED_NODE{$seed} = $_[0]{remote_node};
535 $NODE_SEED{$_[0]{remote_node}} = $seed;
536 }
537 },
538 on_destroy => sub {
539 delete $SEED_CONNECT{$seed};
540 },
541 sub {
542 $SEED_CONNECT{$seed} = 1;
543 }
544 ;
545 }
546
547 sub seed_all {
548 # my $next = List::Util::max 1,
549 # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
550 # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
551 # - rand;
552
553 my @seeds = grep {
554 !exists $SEED_CONNECT{$_}
555 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
556 } keys %SEED_NODE;
557
558 if (@seeds) {
559 # start connection attempt for every seed we are not connected to yet
560 seed_connect $_
561 for @seeds;
562
563 $SEED_RETRY = $SEED_RETRY * 2 + rand;
564 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
565 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
566
567 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
568
569 } else {
570 # all seeds connected or connecting, no need to restart timer
571 undef $SEED_WATCHER;
572 }
573 }
574
575 sub seed_again {
576 $SEED_RETRY = 1;
577 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
578 }
579
580 # sets new seed list, starts connecting
581 sub set_seeds(@) {
582 %SEED_NODE = ();
583 %NODE_SEED = ();
584 %SEED_CONNECT = ();
585
586 @SEED_NODE{@_} = ();
587
588 seed_again;#d#
589 seed_all;
590 }
591
592 mon_nodes sub {
593 # if we lost the connection to a seed node, make sure we are seeding
594 seed_again
595 if !$_[1] && exists $NODE_SEED{$_[0]};
596 };
597
598 #############################################################################
599 # talk with/to global nodes
600
601 # protocol messages:
602 #
603 # sent by all slave nodes (slave to master)
604 # g_slave database - make other global node master of the sender
605 #
606 # sent by any node to global nodes
607 # g_set database - set whole database
608 # g_add family key val - add/replace key to database
609 # g_del family key - delete key from database
610 # g_get family key reply... - send reply with data
611 #
612 # send by global nodes
613 # g_global - node became global, similar to global=1 greeting
614 #
615 # database families
616 # "'l" -> node -> listeners
617 # "'g" -> node -> undef
618 # ...
619 #
620
621 # used on all nodes:
622 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
623 our $GLOBAL_MON;
624 our $GLOBAL_TIMER;
625 our %LOCAL_DB; # this node database
626
627 # used on global nodes:
628 our %GLOBAL_DB; # all local databases, merged
629 our %LOCAL_DBS; # local databases of all global nodes
630 our %GLOBAL_DBS; # global db
631 our %GLOBAL_SLAVE; # nodes that are our slaves
632
633 #our $sv_ne_json = JSON::XS->new->canonical;
634 #
635 #sub sv_ne($$) {
636 # $sv_ne_json->encode ($_[0]) ne $sv_ne_json->encode ($_[1])
637 #}
638
639 # should be only on globals, but...
640 $node_req{g_global} = sub {
641 if ($GLOBAL) {
642 &g_clr ($SRCNODE->{id});
643 $SRCNODE->{transport}{remote_greeting}{global} = 1;
644 delete $GLOBAL_SLAVE{$SRCNODE->{id}};
645 snd $SRCNODE->{id}, g_set => \%LOCAL_DB;
646 }
647 };
648
649 sub other_globals() {
650 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
651 }
652
653 # local database management
654 sub ldb_set($$;$) {
655 warn "ldb_set<@_>\n";#d#
656 if (@_ > 2) {
657 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
658 snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
659 if defined $MASTER;
660 } else {
661 delete $LOCAL_DB{$_[0]}{$_[1]};
662 snd $MASTER, g_del => $_[0] => $_[1]
663 if defined $MASTER;
664 }
665 }
666
667 #############################################################################
668 # master selection
669
670 # master requests
671 our %GLOBAL_REQ; # $id => \@req
672
673 sub global_req_add {
674 my $id = shift;
675
676 return if exists $GLOBAL_REQ{$id};
677
678 $GLOBAL_REQ{$id} = [@_];
679
680 snd $MASTER, @_
681 if $MASTER;
682 }
683
684 sub global_req_del {
685 delete $GLOBAL_REQ{$_[0]};
686 }
687
688 sub g_find {
689 global_req_add "g_find $_[0]", g_find => $_[0];
690 }
691
692 # reply for g_find started in Node.pm
693 $node_req{g_found} = sub {
694 global_req_del "g_find $_[0]";
695
696 @{ $_[1] } or return; # d'oh
697
698 my $node = $NODE{$_[0]} or return;
699
700 local $GLOBAL_DB{"'l"}{$_[0]} = $_[1]; #d# UGLY
701
702 delete $node->{connect_w};
703 $node->connect;
704 };
705
706 sub master_set {
707 $MASTER = $_[0];
708
709 snd $MASTER, g_slave => \%LOCAL_DB;
710
711 # (re-)send queued requests
712 snd $MASTER, @$_
713 for values %GLOBAL_REQ;
714 }
715
716 sub master_search {
717 #TODO: should also look for other global nodes, but we don't know them #d#
718 for (keys %NODE_SEED) {
719 if (node_is_up $_) {
720 master_set $_;
721 return;
722 }
723 }
724
725 $GLOBAL_MON = mon_nodes sub {
726 return unless $_[1]; # we are only interested in node-ups
727 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
728
729 master_set $_[0];
730
731 $GLOBAL_MON = mon_nodes sub {
732 if ($_[0] eq $MASTER && !$_[1]) {
733 undef $MASTER;
734 master_search ();
735 }
736 };
737 };
738 }
739
740 # other node wants to make us the master
741 $node_req{g_slave} = sub {
742 my ($db) = @_;
743
744 warn "slave1\n";#d#
745
746 require AnyEvent::MP::Global;
747 &{ $node_req{g_slave} };
748 };
749
750 #$node_req{g_reply} = sub {
751 # my $id = shift;
752 #
753 # my $cb = delete $GLOBAL_REQ{$id}
754 # or return;
755 #
756 # $cb->[0]->(@_);
757 #};
758
759 #############################################################################
760
761 #############################################################################
762
763 # $WARN->(1, "$SRCNODE->{id} treats us as global node, but we aren't");
764
765 #############################################################################
766 # configure
767
768 sub _nodename {
769 require POSIX;
770 (POSIX::uname ())[1]
771 }
772
773 sub _resolve($) {
774 my ($nodeid) = @_;
775
776 my $cv = AE::cv;
777 my @res;
778
779 $cv->begin (sub {
780 my %seen;
781 my @refs;
782 for (sort { $a->[0] <=> $b->[0] } @res) {
783 push @refs, $_->[1] unless $seen{$_->[1]}++
784 }
785 shift->send (@refs);
786 });
787
788 my $idx;
789 for my $t (split /,/, $nodeid) {
790 my $pri = ++$idx;
791
792 $t = length $t ? _nodename . ":$t" : _nodename
793 if $t =~ /^\d*$/;
794
795 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
796 or Carp::croak "$t: unparsable transport descriptor";
797
798 $port = "0" if $port eq "*";
799
800 if ($host eq "*") {
801 $cv->begin;
802 # use fork_call, as Net::Interface is big, and we need it rarely.
803 require AnyEvent::Util;
804 AnyEvent::Util::fork_call (
805 sub {
806 my @addr;
807
808 require Net::Interface;
809
810 for my $if (Net::Interface->interfaces) {
811 # we statically lower-prioritise ipv6 here, TODO :()
812 for $_ ($if->address (Net::Interface::AF_INET ())) {
813 next if /^\x7f/; # skip localhost etc.
814 push @addr, $_;
815 }
816 for ($if->address (Net::Interface::AF_INET6 ())) {
817 #next if $if->scope ($_) <= 2;
818 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
819 push @addr, $_;
820 }
821
822 }
823 @addr
824 }, sub {
825 for my $ip (@_) {
826 push @res, [
827 $pri += 1e-5,
828 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
829 ];
830 }
831 $cv->end;
832 }
833 );
834 } else {
835 $cv->begin;
836 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
837 for (@_) {
838 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
839 push @res, [
840 $pri += 1e-5,
841 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
842 ];
843 }
844 $cv->end;
845 };
846 }
847 }
848
849 $cv->end;
850
851 $cv
852 }
853
854 sub configure(@) {
855 unshift @_, "profile" if @_ & 1;
856 my (%kv) = @_;
857
858 delete $NODE{$NODE}; # we do not support doing stuff before configure
859 _init_names;
860
861 my $profile = delete $kv{profile};
862
863 $profile = _nodename
864 unless defined $profile;
865
866 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
867
868 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
869
870 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
871
872 $NODE = $node;
873 $NODE =~ s%/$%/$RUNIQ%;
874
875 $NODE{$NODE} = $NODE{""};
876 $NODE{$NODE}{id} = $NODE;
877
878 my $seeds = $CONFIG->{seeds};
879 my $binds = $CONFIG->{binds};
880
881 $binds ||= ["*"];
882
883 $WARN->(8, "node $NODE starting up.");
884
885 $LISTENER = [];
886 %LISTENER = ();
887
888 for (map _resolve $_, @$binds) {
889 for my $bind ($_->recv) {
890 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
891 or Carp::croak "$bind: unparsable local bind address";
892
893 my $listener = AnyEvent::MP::Transport::mp_server
894 $host,
895 $port,
896 prepare => sub {
897 my (undef, $host, $port) = @_;
898 $bind = AnyEvent::Socket::format_hostport $host, $port;
899 0
900 },
901 ;
902 $LISTENER{$bind} = $listener;
903 push @$LISTENER, $bind;
904 }
905 }
906
907 ldb_set "'l" => $NODE => $LISTENER;
908
909 $WARN->(8, "node listens on [@$LISTENER].");
910
911 # the global service is mandatory currently
912 #require AnyEvent::MP::Global;
913
914 # connect to all seednodes
915 set_seeds map $_->recv, map _resolve $_, @$seeds;
916
917 master_search;
918
919 if ($NODE eq "atha") {;#d#
920 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
921 }
922
923 for (@{ $CONFIG->{services} }) {
924 if (ref) {
925 my ($func, @args) = @$_;
926 (load_func $func)->(@args);
927 } elsif (s/::$//) {
928 eval "require $_";
929 die $@ if $@;
930 } else {
931 (load_func $_)->();
932 }
933 }
934 }
935
936 =back
937
938 =head1 SEE ALSO
939
940 L<AnyEvent::MP>.
941
942 =head1 AUTHOR
943
944 Marc Lehmann <schmorp@schmorp.de>
945 http://home.schmorp.de/
946
947 =cut
948
949 1
950