ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.100
Committed: Thu Mar 22 01:24:26 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.99: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del
49 db_mon db_family db_keys db_values
50 );
51
52 sub load_func($) {
53 my $func = $_[0];
54
55 unless (defined &$func) {
56 my $pkg = $func;
57 do {
58 $pkg =~ s/::[^:]+$//
59 or return sub { die "unable to resolve function '$func'" };
60
61 local $@;
62 unless (eval "require $pkg; 1") {
63 my $error = $@;
64 $error =~ /^Can't locate .*.pm in \@INC \(/
65 or return sub { die $error };
66 }
67 } until defined &$func;
68 }
69
70 \&$func
71 }
72
73 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
74
75 sub nonce($) {
76 join "", map chr rand 256, 1 .. $_[0]
77 }
78
79 sub nonce62($) {
80 join "", map $alnum[rand 62], 1 .. $_[0]
81 }
82
83 sub gen_uniq {
84 my $now = AE::now;
85 (join "",
86 map $alnum[$_],
87 $$ / 62 % 62,
88 $$ % 62,
89 (int $now ) % 62,
90 (int $now * 100) % 62,
91 (int $now * 10000) % 62,
92 ) . nonce62 4;
93 }
94
95 our $CONFIG; # this node's configuration
96 our $SECURE = sub { 1 };
97
98 our $RUNIQ; # remote uniq value
99 our $UNIQ; # per-process/node unique cookie
100 our $NODE;
101 our $ID = "a";
102
103 our %NODE; # node id to transport mapping, or "undef", for local node
104 our (%PORT, %PORT_DATA); # local ports
105
106 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
107 our %LMON; # monitored _local_ ports
108
109 our $GLOBAL; # true if node is a global ("directory") node
110 our %BINDS;
111 our $BINDS; # our listeners, as arrayref
112
113 our $SRCNODE; # holds the sending node _object_ during _inject
114
115 sub _init_names {
116 # ~54 bits, for local port names, lowercase $ID appended
117 $UNIQ = gen_uniq;
118
119 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
120 $RUNIQ = nonce62 10;
121 $RUNIQ =~ s/(.)$/\U$1/;
122
123 $NODE = "anon/$RUNIQ";
124 }
125
126 _init_names;
127
128 sub NODE() {
129 $NODE
130 }
131
132 sub node_of($) {
133 my ($node, undef) = split /#/, $_[0], 2;
134
135 $node
136 }
137
138 BEGIN {
139 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
140 ? sub () { 1 }
141 : sub () { 0 };
142 }
143
144 our $DELAY_TIMER;
145 our @DELAY_QUEUE;
146
147 our $delay_run = sub {
148 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
149 };
150
151 sub delay($) {
152 push @DELAY_QUEUE, shift;
153 $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
154 }
155
156 =item $AnyEvent::MP::Kernel::SRCNODE
157
158 During execution of a message callback, this variable contains the node ID
159 of the origin node.
160
161 The main use of this variable is for debugging output - there are probably
162 very few other cases where you need to know the source node ID.
163
164 =cut
165
166 sub _inject {
167 warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
168 &{ $PORT{+shift} or return };
169 }
170
171 # this function adds a node-ref, so you can send stuff to it
172 # it is basically the central routing component.
173 sub add_node {
174 my ($node) = @_;
175
176 length $node
177 or Carp::croak "'undef' or the empty string are not valid node/port IDs";
178
179 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
180 }
181
182 sub snd(@) {
183 my ($nodeid, $portid) = split /#/, shift, 2;
184
185 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
186
187 ($NODE{$nodeid} || add_node $nodeid)
188 ->{send} (["$portid", @_]);
189 }
190
191 =item $is_local = port_is_local $port
192
193 Returns true iff the port is a local port.
194
195 =cut
196
197 sub port_is_local($) {
198 my ($nodeid, undef) = split /#/, $_[0], 2;
199
200 $NODE{$nodeid} == $NODE{""}
201 }
202
203 =item snd_to_func $node, $func, @args
204
205 Expects a node ID and a name of a function. Asynchronously tries to call
206 this function with the given arguments on that node.
207
208 This function can be used to implement C<spawn>-like interfaces.
209
210 =cut
211
212 sub snd_to_func($$;@) {
213 my $nodeid = shift;
214
215 # on $NODE, we artificially delay... (for spawn)
216 # this is very ugly - maybe we should simply delay ALL messages,
217 # to avoid deep recursion issues. but that's so... slow...
218 $AnyEvent::MP::Node::Self::DELAY = 1
219 if $nodeid ne $NODE;
220
221 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
222 }
223
224 =item snd_on $node, @msg
225
226 Executes C<snd> with the given C<@msg> (which must include the destination
227 port) on the given node.
228
229 =cut
230
231 sub snd_on($@) {
232 my $node = shift;
233 snd $node, snd => @_;
234 }
235
236 =item eval_on $node, $string[, @reply]
237
238 Evaluates the given string as Perl expression on the given node. When
239 @reply is specified, then it is used to construct a reply message with
240 C<"$@"> and any results from the eval appended.
241
242 =cut
243
244 sub eval_on($$;@) {
245 my $node = shift;
246 snd $node, eval => @_;
247 }
248
249 sub kil(@) {
250 my ($nodeid, $portid) = split /#/, shift, 2;
251
252 length $portid
253 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
254
255 ($NODE{$nodeid} || add_node $nodeid)
256 ->kill ("$portid", @_);
257 }
258
259 #############################################################################
260 # node monitoring and info
261
262 =item node_is_up $nodeid
263
264 Returns true if the given node is "up", that is, the kernel thinks it has
265 a working connection to it.
266
267 If the node is up, returns C<1>. If the node is currently connecting or
268 otherwise known but not connected, returns C<0>. If nothing is known about
269 the node, returns C<undef>.
270
271 =cut
272
273 sub node_is_up($) {
274 ($NODE{$_[0]} or return)->{transport}
275 ? 1 : 0
276 }
277
278 =item up_nodes
279
280 Return the node IDs of all nodes that are currently connected (excluding
281 the node itself).
282
283 =cut
284
285 sub up_nodes() {
286 map $_->{id}, grep $_->{transport}, values %NODE
287 }
288
289 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
290
291 Registers a callback that is called each time a node goes up (a connection
292 is established) or down (the connection is lost).
293
294 Node up messages can only be followed by node down messages for the same
295 node, and vice versa.
296
297 Note that monitoring a node is usually better done by monitoring its node
298 port. This function is mainly of interest to modules that are concerned
299 about the network topology and low-level connection handling.
300
301 Callbacks I<must not> block and I<should not> send any messages.
302
303 The function returns an optional guard which can be used to unregister
304 the monitoring callback again.
305
306 Example: make sure you call function C<newnode> for all nodes that are up
307 or go up (and down).
308
309 newnode $_, 1 for up_nodes;
310 mon_nodes \&newnode;
311
312 =cut
313
314 our %MON_NODES;
315
316 sub mon_nodes($) {
317 my ($cb) = @_;
318
319 $MON_NODES{$cb+0} = $cb;
320
321 defined wantarray
322 and Guard::guard { delete $MON_NODES{$cb+0} }
323 }
324
325 sub _inject_nodeevent($$;@) {
326 my ($node, $up, @reason) = @_;
327
328 for my $cb (values %MON_NODES) {
329 eval { $cb->($node->{id}, $up, @reason); 1 }
330 or AE::log die => $@;
331 }
332
333 AE::log 7 => "$node->{id} is " . ($up ? "up" : "down") . " (@reason).";
334 }
335
336 #############################################################################
337 # self node code
338
339 sub _kill {
340 my $port = shift;
341
342 delete $PORT{$port}
343 or return; # killing nonexistent ports is O.K.
344 delete $PORT_DATA{$port};
345
346 my $mon = delete $LMON{$port}
347 or !@_
348 or AE::log die => "unmonitored local port $port died with reason: @_";
349
350 $_->(@_) for values %$mon;
351 }
352
353 sub _monitor {
354 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
355 unless exists $PORT{$_[1]};
356
357 $LMON{$_[1]}{$_[2]+0} = $_[2];
358 }
359
360 sub _unmonitor {
361 delete $LMON{$_[1]}{$_[2]+0}
362 if exists $LMON{$_[1]};
363 }
364
365 sub _secure_check {
366 &$SECURE
367 or die "remote execution attempt by insecure node\n";
368 }
369
370 our %NODE_REQ = (
371 # internal services
372
373 # monitoring
374 mon0 => sub { # stop monitoring a port for another node
375 my $portid = shift;
376 _unmonitor undef, $portid, delete $NODE{$SRCNODE}{rmon}{$portid};
377 },
378 mon1 => sub { # start monitoring a port for another node
379 my $portid = shift;
380 Scalar::Util::weaken (my $node = $NODE{$SRCNODE});
381 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
382 delete $node->{rmon}{$portid};
383 $node->send (["", kil0 => $portid, @_])
384 if $node && $node->{transport};
385 };
386 },
387 # another node has killed a monitored port
388 kil0 => sub {
389 my $cbs = delete $NODE{$SRCNODE}{lmon}{+shift}
390 or return;
391
392 $_->(@_) for @$cbs;
393 },
394
395 # "public" services - not actually public
396
397 # another node wants to kill a local port
398 kil => \&_kill,
399
400 # relay message to another node / generic echo
401 snd => sub {
402 &_secure_check;
403 &snd
404 },
405
406 # random utilities
407 eval => sub {
408 &_secure_check;
409 my @res = do { package main; eval shift };
410 snd @_, "$@", @res if @_;
411 },
412 time => sub {
413 &_secure_check;
414 snd @_, AE::now;
415 },
416 devnull => sub {
417 #
418 },
419 "" => sub {
420 # empty messages are keepalives or similar devnull-applications
421 },
422 );
423
424 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
425 $PORT{""} = sub {
426 my $tag = shift;
427 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
428 AE::log die => "error processing node message from $SRCNODE: $@" if $@;
429 };
430
431 our $NPROTO = 1;
432
433 # tell everybody who connects our nproto
434 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
435 $_[0]{local_greeting}{nproto} = $NPROTO;
436 };
437
438 #############################################################################
439 # seed management, try to keep connections to all seeds at all times
440
441 our %SEED_NODE; # seed ID => node ID|undef
442 our %NODE_SEED; # map node ID to seed ID
443 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
444 our $SEED_WATCHER;
445 our $SEED_RETRY;
446
447 sub seed_connect {
448 my ($seed) = @_;
449
450 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
451 or Carp::croak "$seed: unparsable seed address";
452
453 AE::log 9 => "trying connect to seed node $seed.";
454
455 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
456 $host, $port,
457 on_greeted => sub {
458 # called after receiving remote greeting, learn remote node name
459
460 # we rely on untrusted data here (the remote node name) this is
461 # hopefully ok, as this can at most be used for DOSing, which is easy
462 # when you can do MITM anyway.
463
464 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
465 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
466 require AnyEvent::MP::Global; # every seed becomes a global node currently
467 delete $SEED_NODE{$seed};
468 } else {
469 $SEED_NODE{$seed} = $_[0]{remote_node};
470 $NODE_SEED{$_[0]{remote_node}} = $seed;
471 # also start global service, if not running
472 # we need to check here in addition to the mon_nodes below
473 # because we might only learn late that a node is a seed
474 # and then we might already be connected
475 snd $_[0]{remote_node}, "g_slave"
476 unless $_[0]{remote_greeting}{global};
477 }
478 },
479 sub {
480 delete $SEED_CONNECT{$seed};
481 }
482 ;
483 }
484
485 sub seed_all {
486 my @seeds = grep
487 !exists $SEED_CONNECT{$_}
488 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_}),
489 keys %SEED_NODE;
490
491 if (@seeds) {
492 # start connection attempt for every seed we are not connected to yet
493 seed_connect $_
494 for @seeds;
495
496 $SEED_RETRY = $SEED_RETRY * 2 + rand;
497 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
498 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
499
500 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
501
502 } else {
503 # all seeds connected or connecting, no need to restart timer
504 undef $SEED_WATCHER;
505 }
506 }
507
508 sub seed_again {
509 $SEED_RETRY = 1;
510 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
511 }
512
513 # sets new seed list, starts connecting
514 sub set_seeds(@) {
515 %SEED_NODE = ();
516 %NODE_SEED = ();
517 %SEED_CONNECT = ();
518
519 @SEED_NODE{@_} = ();
520
521 seed_all;
522 }
523
524 mon_nodes sub {
525 return unless exists $NODE_SEED{$_[0]};
526
527 if ($_[1]) {
528 # each time a connection to a seed node goes up, make
529 # sure it runs the global service.
530 snd $_[0], "g_slave"
531 unless $NODE{$_[0]}{transport}{remote_greeting}{global};
532 } else {
533 # if we lost the connection to a seed node, make sure we are seeding
534 seed_again;
535 }
536 };
537
538 #############################################################################
539 # talk with/to global nodes
540
541 # protocol messages:
542 #
543 # sent by all slave nodes (slave to master)
544 # g_slave database - make other global node master of the sender
545 #
546 # sent by any node to global nodes
547 # g_set database - set whole database
548 # g_upd family set del - update single family
549 # g_del family key - delete key from database
550 # g_get family key reply... - send reply with data
551 #
552 # send by global nodes
553 # g_global - node became global, similar to global=1 greeting
554 #
555 # database families
556 # "'l" -> node -> listeners
557 # "'g" -> node -> undef
558 # ...
559 #
560
561 # used on all nodes:
562 our $MASTER; # the global node we bind ourselves to
563 our $MASTER_MON;
564 our %LOCAL_DB; # this node database
565
566 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
567
568 our $GPROTO = 1;
569
570 # tell everybody who connects our nproto
571 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
572 $_[0]{local_greeting}{gproto} = $GPROTO;
573 };
574
575 #############################################################################
576 # master selection
577
578 # master requests
579 our %GLOBAL_REQ; # $id => \@req
580
581 sub global_req_add {
582 my ($id, $req) = @_;
583
584 return if exists $GLOBAL_REQ{$id};
585
586 $GLOBAL_REQ{$id} = $req;
587
588 snd $MASTER, @$req
589 if $MASTER;
590 }
591
592 sub global_req_del {
593 delete $GLOBAL_REQ{$_[0]};
594 }
595
596 #################################
597 # master rpc
598
599 our %GLOBAL_RES;
600 our $GLOBAL_RES_ID = "a";
601
602 sub global_call {
603 my $id = ++$GLOBAL_RES_ID;
604 $GLOBAL_RES{$id} = pop;
605 global_req_add $id, [@_, $id];
606 }
607
608 $NODE_REQ{g_reply} = sub {
609 my $id = shift;
610 global_req_del $id;
611 my $cb = delete $GLOBAL_RES{$id}
612 or return;
613 &$cb
614 };
615
616 #################################
617
618 sub g_find {
619 global_req_add "g_find $_[0]", [g_find => $_[0]];
620 }
621
622 # reply for g_find started in Node.pm
623 $NODE_REQ{g_found} = sub {
624 global_req_del "g_find $_[0]";
625
626 my $node = $NODE{$_[0]} or return;
627
628 $node->connect_to ($_[1]);
629 };
630
631 sub master_set {
632 $MASTER = $_[0];
633
634 snd $MASTER, g_slave => \%LOCAL_DB;
635
636 # (re-)send queued requests
637 snd $MASTER, @$_
638 for values %GLOBAL_REQ;
639 }
640
641 sub master_search {
642 #TODO: should also look for other global nodes, but we don't know them #d#
643 for (keys %NODE_SEED) {
644 if (node_is_up $_) {
645 master_set $_;
646 return;
647 }
648 }
649
650 $MASTER_MON = mon_nodes sub {
651 return unless $_[1]; # we are only interested in node-ups
652 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
653
654 master_set $_[0];
655
656 $MASTER_MON = mon_nodes sub {
657 if ($_[0] eq $MASTER && !$_[1]) {
658 undef $MASTER;
659 master_search ();
660 }
661 };
662 };
663 }
664
665 # other node wants to make us the master
666 $NODE_REQ{g_slave} = sub {
667 my ($db) = @_;
668
669 # load global module and redo the request
670 require AnyEvent::MP::Global;
671 &{ $NODE_REQ{g_slave} }
672 };
673
674 #############################################################################
675 # local database operations
676
677 # local database management
678
679 sub db_set($$;$) {
680 my ($family, $subkey) = @_;
681
682 # if (ref $_[1]) {
683 # # bulk
684 # my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
685 # $LOCAL_DB{$_[0]} = $_[1];
686 # snd $MASTER, g_upd => $_[0] => $_[1], \@del
687 # if defined $MASTER;
688 # } else {
689 # single-key
690 $LOCAL_DB{$family}{$subkey} = $_[2];
691 snd $MASTER, g_upd => $family => { $subkey => $_[2] }
692 if defined $MASTER;
693 # }
694
695 defined wantarray
696 and Guard::guard { db_del $family => $subkey }
697 }
698
699 sub db_del($@) {
700 my $family = shift;
701
702 delete @{ $LOCAL_DB{$family} }{@_};
703 snd $MASTER, g_upd => $family => undef, \@_
704 if defined $MASTER;
705 }
706
707 # database query
708
709 sub db_family {
710 my ($family, $cb) = @_;
711 global_call g_db_family => $family, $cb;
712 }
713
714 sub db_keys {
715 my ($family, $cb) = @_;
716 global_call g_db_keys => $family, $cb;
717 }
718
719 sub db_values {
720 my ($family, $cb) = @_;
721 global_call g_db_values => $family, $cb;
722 }
723
724 # database monitoring
725
726 our %LOCAL_MON; # f, reply
727 our %MON_DB; # f, k, value
728
729 sub db_mon($@) {
730 my ($family, $cb) = @_;
731
732 if (my $db = $MON_DB{$family}) {
733 # we already monitor, so create a "dummy" change event
734 # this is postponed, which might be too late (we could process
735 # change events), so disable the callback at first
736 $LOCAL_MON{$family}{$cb+0} = sub { };
737 AE::postpone {
738 return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already
739
740 # set actual callback
741 $LOCAL_MON{$family}{$cb+0} = $cb;
742 $cb->($db, [keys %$db]);
743 };
744 } else {
745 # new monitor, request chg1 from upstream
746 $LOCAL_MON{$family}{$cb+0} = $cb;
747 global_req_add "mon1 $family" => [g_mon1 => $family];
748 $MON_DB{$family} = {};
749 }
750
751 defined wantarray
752 and Guard::guard {
753 my $mon = $LOCAL_MON{$family};
754 delete $mon->{$cb+0};
755
756 unless (%$mon) {
757 global_req_del "mon1 $family";
758
759 # no global_req, because we don't care if we are not connected
760 snd $MASTER, g_mon0 => $family
761 if $MASTER;
762
763 delete $LOCAL_MON{$family};
764 delete $MON_DB{$family};
765 }
766 }
767 }
768
769 # full update
770 $NODE_REQ{g_chg1} = sub {
771 return unless $SRCNODE eq $MASTER;
772 my ($f, $ndb) = @_;
773
774 my $db = $MON_DB{$f};
775 my (@a, @c, @d);
776
777 # add or replace keys
778 while (my ($k, $v) = each %$ndb) {
779 exists $db->{$k}
780 ? push @c, $k
781 : push @a, $k;
782 $db->{$k} = $v;
783 }
784
785 # delete keys that are no longer present
786 for (grep !exists $ndb->{$_}, keys %$db) {
787 delete $db->{$_};
788 push @d, $_;
789 }
790
791 $_->($db, \@a, \@c, \@d)
792 for values %{ $LOCAL_MON{$_[0]} };
793 };
794
795 # incremental update
796 $NODE_REQ{g_chg2} = sub {
797 return unless $SRCNODE eq $MASTER;
798 my ($family, $set, $del) = @_;
799
800 my $db = $MON_DB{$family};
801
802 my (@a, @c);
803
804 while (my ($k, $v) = each %$set) {
805 exists $db->{$k}
806 ? push @c, $k
807 : push @a, $k;
808 $db->{$k} = $v;
809 }
810
811 delete @$db{@$del};
812
813 $_->($db, \@a, \@c, $del)
814 for values %{ $LOCAL_MON{$family} };
815 };
816
817 #############################################################################
818 # configure
819
820 sub nodename {
821 require POSIX;
822 (POSIX::uname ())[1]
823 }
824
825 sub _resolve($) {
826 my ($nodeid) = @_;
827
828 my $cv = AE::cv;
829 my @res;
830
831 $cv->begin (sub {
832 my %seen;
833 my @refs;
834 for (sort { $a->[0] <=> $b->[0] } @res) {
835 push @refs, $_->[1] unless $seen{$_->[1]}++
836 }
837 shift->send (@refs);
838 });
839
840 my $idx;
841 for my $t (split /,/, $nodeid) {
842 my $pri = ++$idx;
843
844 $t = length $t ? nodename . ":$t" : nodename
845 if $t =~ /^\d*$/;
846
847 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
848 or Carp::croak "$t: unparsable transport descriptor";
849
850 $port = "0" if $port eq "*";
851
852 if ($host eq "*") {
853 $cv->begin;
854 # use fork_call, as Net::Interface is big, and we need it rarely.
855 require AnyEvent::Util;
856 AnyEvent::Util::fork_call (
857 sub {
858 my @addr;
859
860 require Net::Interface;
861
862 for my $if (Net::Interface->interfaces) {
863 # we statically lower-prioritise ipv6 here, TODO :()
864 for $_ ($if->address (Net::Interface::AF_INET ())) {
865 next if /^\x7f/; # skip localhost etc.
866 push @addr, $_;
867 }
868 for ($if->address (Net::Interface::AF_INET6 ())) {
869 #next if $if->scope ($_) <= 2;
870 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
871 push @addr, $_;
872 }
873
874 }
875 @addr
876 }, sub {
877 for my $ip (@_) {
878 push @res, [
879 $pri += 1e-5,
880 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
881 ];
882 }
883 $cv->end;
884 }
885 );
886 } else {
887 $cv->begin;
888 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
889 for (@_) {
890 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
891 push @res, [
892 $pri += 1e-5,
893 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
894 ];
895 }
896 $cv->end;
897 };
898 }
899 }
900
901 $cv->end;
902
903 $cv
904 }
905
906 sub configure(@) {
907 unshift @_, "profile" if @_ & 1;
908 my (%kv) = @_;
909
910 delete $NODE{$NODE}; # we do not support doing stuff before configure
911 _init_names;
912
913 my $profile = delete $kv{profile};
914
915 $profile = nodename
916 unless defined $profile;
917
918 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
919
920 if (exists $CONFIG->{secure}) {
921 $SECURE = eval +($CONFIG->{secure} ? "sub { 0 }" : "sub { 1 }");
922 }
923
924 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
925
926 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
927
928 $NODE = $node;
929
930 $NODE =~ s/%n/nodename/ge;
931
932 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
933 # nodes with randomised node names do not need randomised port names
934 $UNIQ = "";
935 }
936
937 $NODE{$NODE} = $NODE{""};
938 $NODE{$NODE}{id} = $NODE;
939
940 my $seeds = $CONFIG->{seeds};
941 my $binds = $CONFIG->{binds};
942
943 $binds ||= ["*"];
944
945 AE::log 8 => "node $NODE starting up.";
946
947 $BINDS = [];
948 %BINDS = ();
949
950 for (map _resolve $_, @$binds) {
951 for my $bind ($_->recv) {
952 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
953 or Carp::croak "$bind: unparsable local bind address";
954
955 my $listener = AnyEvent::MP::Transport::mp_server
956 $host,
957 $port,
958 prepare => sub {
959 my (undef, $host, $port) = @_;
960 $bind = AnyEvent::Socket::format_hostport $host, $port;
961 0
962 },
963 ;
964 $BINDS{$bind} = $listener;
965 push @$BINDS, $bind;
966 }
967 }
968
969 db_set "'l" => $NODE => $BINDS;
970
971 AE::log 8 => "node listens on [@$BINDS].";
972
973 # connect to all seednodes
974 set_seeds map $_->recv, map _resolve $_, @$seeds;
975
976 master_search;
977
978 for (@{ $CONFIG->{services} }) {
979 if (ref) {
980 my ($func, @args) = @$_;
981 (load_func $func)->(@args);
982 } elsif (s/::$//) {
983 eval "require $_";
984 die $@ if $@;
985 } else {
986 (load_func $_)->();
987 }
988 }
989 }
990
991 =back
992
993 =head1 SEE ALSO
994
995 L<AnyEvent::MP>.
996
997 =head1 AUTHOR
998
999 Marc Lehmann <schmorp@schmorp.de>
1000 http://home.schmorp.de/
1001
1002 =cut
1003
1004 1
1005