ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.103
Committed: Fri Mar 23 03:24:41 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.102: +61 -29 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28
29 use AnyEvent ();
30 use Guard ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our @EXPORT_OK = qw(
38 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
39 );
40
41 our @EXPORT = qw(
42 add_node load_func snd_to_func snd_on eval_on
43
44 NODE $NODE node_of snd kil port_is_local
45 configure
46 up_nodes mon_nodes node_is_up
47 db_set db_del
48 db_mon db_family db_keys db_values
49 );
50
# Resolves a fully qualified function name ("Some::Package::func") to a code
# reference, loading the module that provides it on demand.
#
# If the function is not defined yet, trailing "::component" parts are
# stripped from the name one at a time and each resulting package is
# require'd until the function becomes defined.
#
# Returns \&$func on success.  On failure it does not die itself, but returns
# a code reference that dies when called: either with the error raised while
# loading a module, or with "unable to resolve function" when no package
# prefix is left to try.
#
# NOTE: the package name is interpolated into a string eval, so only trusted
# names must be passed in (the node port handler runs _secure_check first).
sub load_func($) {
   my $func = $_[0];

   # the function name is used as a symbolic reference on purpose
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a module file that simply does not exist means "try the next
            # shorter prefix", every other failure is reported to the caller.
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
71
# the 62 characters used for alphanumeric identifiers: 0-9, A-Z, a-z
my @alnum = ("0" .. "9", "A" .. "Z", "a" .. "z");

# nonce $length - returns a string of $length random octets (chr 0..255),
# generated with the builtin rand.
sub nonce($) {
   my ($length) = @_;

   my $octets = "";
   $octets .= chr rand 256
      for 1 .. $length;

   $octets
}

# nonce62 $length - like nonce, but only uses the characters from @alnum.
sub nonce62($) {
   my ($length) = @_;

   my $chars = "";
   $chars .= $alnum[rand 62]
      for 1 .. $length;

   $chars
}
81
# Generates a nine character identifier: two characters derived from the
# process ID, three derived from the current event loop time (seconds,
# 1/100 s and 1/10000 s, each modulo 62), followed by four random
# alphanumeric characters.
sub gen_uniq {
   my $now = AE::now;

   my @digits = (
      $$ / 62 % 62,
      $$ % 62,
      int ($now) % 62,
      int ($now * 100) % 62,
      int ($now * 10000) % 62,
   );

   (join "", @alnum[@digits]) . nonce62 4
}
93
our $CONFIG;               # this node's configuration
our $SECURE = sub { 1 };   # consulted by _secure_check: a false result rejects remote execution

# $RUNIQ - remote uniq value
# $UNIQ  - per-process/node unique cookie
# $NODE  - the node ID of this node
our ($RUNIQ, $UNIQ, $NODE);
our $ID = "a";

# %NODE               - node id to transport mapping, or "undef", for local node
# %PORT, %PORT_DATA   - local ports
our (%NODE, %PORT, %PORT_DATA);

# %RMON - local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
# %LMON - monitored _local_ ports
our (%RMON, %LMON);

# $GLOBAL - true if node is a global ("directory") node
# $BINDS  - our listeners, as arrayref
our ($GLOBAL, %BINDS, $BINDS);

# ID of the node a message came from while that message is processed; the
# code in this file uses it as a key into %NODE and compares it to $MASTER.
# It is not assigned in this file - presumably the node classes localise it
# around _inject (TODO confirm).
our $SRCNODE;
113
# (Re-)initialises $UNIQ, $RUNIQ and $NODE with fresh random values; the
# node ID becomes "anon/$RUNIQ" until configure picks the real one.
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase
   # at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ = (substr $RUNIQ, 0, -1) . uc substr $RUNIQ, -1;

   $NODE = "anon/$RUNIQ";
}

# give the node usable names right away, at module load time
_init_names;
126
# NODE - returns the current value of $NODE, the node ID of this node.
sub NODE() {
   return $NODE;
}
130
# node_of $port - returns the node ID part of a port ID, that is, everything
# in front of the first "#" (or the whole string when there is no "#").
sub node_of($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $nodeid
}
136
BEGIN {
   # TRACE is a constant sub whose value is fixed at compile time from the
   # PERL_ANYEVENT_MP_TRACE environment variable.
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
142
our $DELAY_TIMER; # timer watcher, defined while a queue run is scheduled
our @DELAY_QUEUE; # callbacks waiting to be run

# runs all queued callbacks in order (including ones queued while running),
# then releases the timer and returns.
our $delay_run = sub {
   while (1) {
      my $job = shift @DELAY_QUEUE
         or return undef $DELAY_TIMER;

      $job->();
   }
};

# delay $cb - queues the callback and makes sure a zero-delay timer that
# runs the queue is active.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;
   $DELAY_TIMER ||= AE::timer 0, 0, $delay_run;
}
154
155 =item $AnyEvent::MP::Kernel::SRCNODE
156
157 During execution of a message callback, this variable contains the node ID
158 of the origin node.
159
160 The main use of this variable is for debugging output - there are probably
161 very few other cases where you need to know the source node ID.
162
163 =cut
164
# Delivers a message to a local port: the first argument is the port ID,
# the remaining arguments are passed to the callback registered in %PORT.
# Messages for ports without a callback are silently dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE -> " . eval { JSON::XS->new->encode (\@_) } . "\n";
   }

   my $cb = $PORT{+shift}
      or return;

   # called without parentheses on purpose: the callback shares our @_
   &$cb
}
169
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# add_node $nodeid - returns the node object stored in %NODE for the given
# node ID, creating (and storing) an AnyEvent::MP::Node::Remote object when
# there is none yet.  Croaks when the node ID is undef or the empty string.
sub add_node {
   my ($node) = @_;

   length $node
      or Carp::croak "'undef' or the empty string are not valid node/port IDs";

   # explicit method call instead of the fragile indirect object syntax
   $NODE{$node} ||= AnyEvent::MP::Node::Remote->new ($node)
}
180
# snd $port, @msg - sends a message to a port.  The port ID is split into
# node ID and port ID at the first "#", the node object is taken from %NODE
# (or created via add_node) and [portid, @msg] is handed to its {send}
# callback.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
189
190 =item $is_local = port_is_local $port
191
192 Returns true iff the port is a local port.
193
194 =cut
195
# compares the node object registered for the port's node ID with the
# object of the local node, which is stored under the empty node ID.
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
201
202 =item snd_to_func $node, $func, @args
203
204 Expects a node ID and a name of a function. Asynchronously tries to call
205 this function with the given arguments on that node.
206
207 This function can be used to implement C<spawn>-like interfaces.
208
209 =cut
210
# sends ["", $func, @args] to the node, i.e. a message for the node port
# (the port with the empty ID).
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the destination is NOT $NODE, while
   # the comment above talks about sends "on $NODE" - confirm which is meant.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["", @call]);
}
222
223 =item snd_on $node, @msg
224
225 Executes C<snd> with the given C<@msg> (which must include the destination
226 port) on the given node.
227
228 =cut
229
# implemented by sending a "snd" request, followed by the message, to the
# node port of the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
234
235 =item eval_on $node, $string[, @reply]
236
237 Evaluates the given string as Perl expression on the given node. When
238 @reply is specified, then it is used to construct a reply message with
239 C<"$@"> and any results from the eval appended.
240
241 =cut
242
# implemented by sending an "eval" request with the string (and the
# optional reply message) to the node port of the given node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
247
# kil $port, @reason - kills the given port by calling the kill method of
# its node object (created via add_node when unknown).  Croaks when the
# port ID has no port part, as node ports cannot be killed.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   unless (length $portid) {
      Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
257
258 #############################################################################
259 # node monitoring and info
260
261 =item node_is_up $nodeid
262
263 Returns true if the given node is "up", that is, the kernel thinks it has
264 a working connection to it.
265
266 If the node is up, returns C<1>. If the node is currently connecting or
267 otherwise known but not connected, returns C<0>. If nothing is known about
268 the node, returns C<undef>.
269
270 =cut
271
# "up" means that the node object in %NODE has a true {transport} member.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
276
277 =item up_nodes
278
279 Return the node IDs of all nodes that are currently connected (excluding
280 the node itself).
281
282 =cut
283
# collects the {id} of every node object in %NODE that has a transport.
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
287
288 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
289
290 Registers a callback that is called each time a node goes up (a connection
291 is established) or down (the connection is lost).
292
293 Node up messages can only be followed by node down messages for the same
294 node, and vice versa.
295
296 Note that monitoring a node is usually better done by monitoring its node
297 port. This function is mainly of interest to modules that are concerned
298 about the network topology and low-level connection handling.
299
300 Callbacks I<must not> block and I<should not> send any messages.
301
302 The function returns an optional guard which can be used to unregister
303 the monitoring callback again.
304
305 Example: make sure you call function C<newnode> for all nodes that are up
306 or go up (and down).
307
308 newnode $_, 1 for up_nodes;
309 mon_nodes \&newnode;
310
311 =cut
312
# registered node monitoring callbacks, keyed by the address of the code ref
our %MON_NODES;

sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # only create a guard when the caller actually wants the return value;
   # the guard block refers to $cb itself and removes the registration.
   defined wantarray
      and Guard::guard { delete $MON_NODES{$cb+0} }
}
323
# _inject_nodeevent $node, $is_up, @reason - calls every callback in
# %MON_NODES with the node ID, the up flag and the reason, logging (instead
# of propagating) exceptions thrown by a callback, then logs the new state.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         AE::log die => $@;
      }
   }

   my $state = $up ? "up." : "down (@reason).";

   AE::log 7 => "$node->{id} is " . $state;
}
334
335 #############################################################################
336 # self node code
337
# _kill $portid, @reason - removes a local port together with its data and
# calls all of its monitor callbacks with the reason.  A port that is not
# monitored but dies with a reason gets that reason logged.
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   if (!$mon && @_) {
      AE::log die => "unmonitored local port $port died with reason: @_";
   }

   for my $cb (values %$mon) {
      $cb->(@_);
   }
}
351
# _monitor undef, $portid, $cb - registers $cb as monitor of a local port
# (the first argument is not used).  When the port does not exist, the
# callback is invoked immediately with a no_such_port reason instead.
sub _monitor {
   my (undef, $portid, $cb) = @_;

   exists $PORT{$portid}
      or return $cb->(no_such_port => "cannot monitor nonexistent port", "$NODE#$portid");

   $LMON{$portid}{$cb+0} = $cb;
}
358
# _unmonitor undef, $portid, $cb - removes a monitor callback registered
# with _monitor again; does nothing when the port has no monitors.
sub _unmonitor {
   my (undef, $portid, $cb) = @_;

   exists $LMON{$portid}
      and delete $LMON{$portid}{$cb+0}
}
363
# dies unless the $SECURE callback (called with our own @_) returns true.
sub _secure_check {
   die "remote execution attempt by insecure node\n"
      unless &$SECURE;
}
368
# Request handlers of the node port: maps the request tag (first element of
# a message sent to the port with the empty ID) to the code handling it.
our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my ($portid) = @_;

      my $cb = delete $NODE{$SRCNODE}{rmon}{$portid};

      _unmonitor (undef, $portid, $cb);
   },
   mon1 => sub { # start monitoring a port for another node
      my ($portid) = @_;

      # weak, so the monitor callback does not keep the node object alive
      my $node = $NODE{$SRCNODE};
      Scalar::Util::weaken ($node);

      # forwards the kill reason to the monitoring node, if still connected
      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;

      _monitor (undef, $portid, $on_kill);
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;

      my $cbs = delete $NODE{$SRCNODE}{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => sub {
      &_secure_check;
      &snd
   },

   # random utilities
   eval => sub {
      &_secure_check;
      # the string is evaluated in package main; the remaining arguments,
      # if any, form the reply message, to which "$@" and the results are
      # appended.
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      &_secure_check;
      snd (@_, AE::now);
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
422
# the local node is reachable both under its node ID and under the empty ID
# (explicit method call instead of the fragile indirect object syntax)
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: the first message element selects the handler in %NODE_REQ.
# Unknown tags are treated as function names - after passing the security
# check they are resolved via load_func and cached in %NODE_REQ.  Errors are
# logged, not propagated.
$PORT{""} = sub {
   my $tag = shift;
   # &{...} without parentheses: the handler receives the remaining @_
   eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
   AE::log die => "error processing node message from $SRCNODE: $@" if $@;
};
429
# node protocol version announced in our greeting
our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{nproto} = $NPROTO;
};
436
437 #############################################################################
438 # seed management, try to keep connections to all seeds at all times
439
our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;

# seed_connect $seed - starts a connection attempt to the seed address
# ("host:port") unless %SEED_CONNECT already has a true entry for it.
# Croaks when the address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   AE::log 9 => "trying connect to seed node $seed.";

   # called after receiving remote greeting, learn remote node name
   my $on_greeted = sub {
      my ($transport) = @_;

      # we rely on untrusted data here (the remote node name) this is
      # hopefully ok, as this can at most be used for DOSing, which is easy
      # when you can do MITM anyway.
      my $remote = $transport->{remote_node};

      if ($remote eq $AnyEvent::MP::Kernel::NODE) {
         # we connected to ourselves: nuke this seed, but make sure we act
         # like a seed
         require AnyEvent::MP::Global; # every seed becomes a global node currently
         delete $SEED_NODE{$seed};
      } else {
         $SEED_NODE{$seed}    = $remote;
         $NODE_SEED{$remote} = $seed;

         # also start global service, if not running
         # we need to check here in addition to the mon_nodes below
         # because we might only learn late that a node is a seed
         # and then we might already be connected
         snd ($remote, "g_slave")
            unless $transport->{remote_greeting}{global};
      }
   };

   # forget the connection attempt again
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
   };

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => $on_greeted,
      $on_done,
   ;
}
483
# Starts connection attempts to all seeds that are neither being connected
# to nor resolved to a node that is up.  While such seeds exist, the
# function re-schedules itself with a growing, randomised interval that is
# capped by the monitor_timeout configuration value.
sub seed_all {
   my @seeds;

   for my $seed (keys %SEED_NODE) {
      next if exists $SEED_CONNECT{$seed};
      next if defined $SEED_NODE{$seed} && node_is_up ($SEED_NODE{$seed});

      push @seeds, $seed;
   }

   if (!@seeds) {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   } else {
      # start connection attempt for every seed we are not connected to yet
      for my $seed (@seeds) {
         seed_connect ($seed);
      }

      $SEED_RETRY = $SEED_RETRY * 2 + rand;

      my $limit = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
      $SEED_RETRY = $limit
         if $SEED_RETRY > $limit;

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
   }
}
506
# resets the retry interval and makes sure seed_all runs in one second,
# unless a seed timer is already pending.
sub seed_again {
   $SEED_RETRY = 1;

   $SEED_WATCHER ||= AE::timer (1, 0, \&seed_all);
}
511
# sets new seed list, starts connecting
# (all previous seed state, including pending connects, is dropped first)
sub set_seeds(@) {
   %SEED_NODE    = ();
   %NODE_SEED    = ();
   %SEED_CONNECT = ();

   # every seed starts out without a known node ID
   $SEED_NODE{$_} = undef
      for @_;

   seed_all ();
}
522
# watch seed nodes going up and down (all other nodes are ignored)
mon_nodes sub {
   my ($nodeid, $is_up) = @_;

   return unless exists $NODE_SEED{$nodeid};

   if ($is_up) {
      # each time a connection to a seed node goes up, make
      # sure it runs the global service.
      snd ($nodeid, "g_slave")
         unless $NODE{$nodeid}{transport}{remote_greeting}{global};
   } else {
      # if we lost the connection to a seed node, make sure we are seeding
      seed_again ();
   }
};
536
537 #############################################################################
538 # talk with/to global nodes
539
540 # protocol messages:
541 #
542 # sent by all slave nodes (slave to master)
543 # g_slave database - make other global node master of the sender
544 #
545 # sent by any node to global nodes
546 # g_set database - set whole database
547 # g_upd family set del - update single family
548 # g_del family key - delete key from database
549 # g_get family key reply... - send reply with data
550 #
551 # sent by global nodes
552 # g_global - node became global, similar to global=1 greeting
553 #
554 # database families
555 # "'l" -> node -> listeners
556 # "'g" -> node -> undef
557 # ...
558 #
559
# used on all nodes:
our $MASTER;     # the global node we bind ourselves to
our $MASTER_MON; # guard of the node monitor installed by master_search
our %LOCAL_DB;   # this node database

our %GLOBAL_DB;  # all local databases, merged - empty on non-global nodes

# global protocol version announced in our greeting
our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{gproto} = $GPROTO;
};
573
574 #############################################################################
575 # master selection
576
# master requests
our %GLOBAL_REQ; # $id => \@req

# global_req_add $id, \@req - remembers a request for the master under the
# given ID (an ID that is already registered is left alone) and sends it
# right away when a master is known.  master_set re-sends registered requests.
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   $MASTER
      and snd ($MASTER, @$req);
}
590
# global_req_del $id - forgets the request registered under the given ID.
sub global_req_del {
   my ($id) = @_;

   delete $GLOBAL_REQ{$id};
}
594
595 #################################
596 # master rpc
597
our %GLOBAL_RES;          # request id => reply callback
our $GLOBAL_RES_ID = "a"; # last request id handed out (string increment)

# global_call @request, $cb - registers @request, with a fresh request ID
# appended, as master request; $cb is stored in %GLOBAL_RES under that ID,
# where the g_reply handler looks it up.
sub global_call {
   my $cb = pop;
   my $id = ++$GLOBAL_RES_ID;

   $GLOBAL_RES{$id} = $cb;

   global_req_add ($id, [@_, $id]);
}
606
# reply to a global_call: the first argument is the request ID, the rest
# is passed to the callback registered for it (if there still is one).
$NODE_REQ{g_reply} = sub {
   my $id = shift;

   global_req_del ($id);

   my $cb = delete $GLOBAL_RES{$id}
      or return;

   # called without parentheses on purpose: the callback shares our @_
   &$cb
};
614
615 #################################
616
# g_find $nodeid - registers a "g_find" master request for the node ID.
sub g_find {
   my ($nodeid) = @_;

   global_req_add ("g_find $nodeid", [g_find => $nodeid]);
}
620
# reply for g_find started in Node.pm
# arguments are the node ID and a value that is handed to connect_to of
# the node object, if that node is (still) known.
$NODE_REQ{g_found} = sub {
   my ($nodeid, $where) = @_;

   global_req_del ("g_find $nodeid");

   my $node = $NODE{$nodeid}
      or return;

   $node->connect_to ($where);
};
629
# master_set $nodeid - makes the node our master: sends it our local
# database in a g_slave request, followed by all registered requests.
sub master_set {
   ($MASTER) = @_;

   snd ($MASTER, g_slave => \%LOCAL_DB);

   # (re-)send queued requests
   for my $req (values %GLOBAL_REQ) {
      snd ($MASTER, @$req);
   }
}
639
# Picks a master: the first seed node that is up is used immediately.
# Otherwise a node monitor waits for a seed node to come up, makes it the
# master and replaces itself by a monitor that restarts the search when
# the master goes down.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for my $nodeid (keys %NODE_SEED) {
      next unless node_is_up ($nodeid);

      master_set ($nodeid);
      return;
   }

   $MASTER_MON = mon_nodes sub {
      my ($nodeid, $is_up) = @_;

      return unless $is_up;               # we are only interested in node-ups
      return unless $NODE_SEED{$nodeid};  # we are only interested in seed nodes

      master_set ($nodeid);

      $MASTER_MON = mon_nodes sub {
         my ($nodeid, $is_up) = @_;

         if ($nodeid eq $MASTER && !$is_up) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}
663
# other node wants to make us the master
# this is only a stub: it loads the global module and redoes the request.
# NOTE(review): this relies on AnyEvent::MP::Global installing its own
# $NODE_REQ{g_slave} handler - if it did not, the call below would recurse.
$NODE_REQ{g_slave} = sub {
   require AnyEvent::MP::Global;

   # re-dispatch with the same @_
   &{ $NODE_REQ{g_slave} }
};
672
673 #############################################################################
674 # local database operations
675
676 # local database management
677
# db_set $family, $subkey[, $value] - stores the value in the local
# database and, when a master is known, sends it a g_upd request for the
# key.  In non-void context a guard is returned that deletes the key again.
sub db_set($$;$) {
   my ($family, $subkey) = @_;

#   if (ref $_[1]) {
#      # bulk
#      my @del = grep exists $LOCAL_DB{$_[0]}{$_}, keys ${ $_[1] };
#      $LOCAL_DB{$_[0]} = $_[1];
#      snd $MASTER, g_upd => $_[0] => $_[1], \@del
#         if defined $MASTER;
#   } else {
      # single-key
      $LOCAL_DB{$family}{$subkey} = $_[2];
      snd $MASTER, g_upd => $family => { $subkey => $_[2] }
         if defined $MASTER;
#   }

   # db_del is only defined further down, so a call without parentheses
   # would be parsed as an indirect-object method call on $family here -
   # call it explicitly.
   defined wantarray
      and Guard::guard { &db_del ($family, $subkey) }
}
697
# db_del $family, @subkeys - removes the keys from the local database and,
# when a master is known, sends it a g_upd request listing them as deleted.
sub db_del($@) {
   my $family = shift;

   delete $LOCAL_DB{$family}{$_}
      for @_;

   defined $MASTER
      and snd ($MASTER, g_upd => $family => undef, \@_);
}
705
706 # database query
707
# db_family $family, $cb - issues a g_db_family master call for the
# family; $cb is the reply callback.
sub db_family {
   my ($family, $cb) = @_;

   global_call (g_db_family => $family, $cb);
}
712
# db_keys $family, $cb - issues a g_db_keys master call for the family;
# $cb is the reply callback.
sub db_keys {
   my ($family, $cb) = @_;

   global_call (g_db_keys => $family, $cb);
}
717
# db_values $family, $cb - issues a g_db_values master call for the
# family; $cb is the reply callback.
sub db_values {
   my ($family, $cb) = @_;

   global_call (g_db_values => $family, $cb);
}
722
723 # database monitoring
724
our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# db_mon $family, $cb - monitors a database family.  The callback is stored
# in %LOCAL_MON under its address.  For the first monitor of a family a
# g_mon1 master request is registered; further monitors receive a postponed
# initial call with the current contents, all keys being reported as added.
# In non-void context a guard is returned that removes the monitor again.
sub db_mon($@) {
   my ($family, $cb) = @_;

   my $db = $MON_DB{$family};

   if (!$db) {
      # new monitor, request chg1 from upstream
      $LOCAL_MON{$family}{$cb+0} = $cb;
      global_req_add ("mon1 $family" => [g_mon1 => $family]);
      $MON_DB{$family} = {};
   } else {
      # we already monitor, so create a "dummy" change event
      # this is postponed, which might be too late (we could process
      # change events), so disable the callback at first
      $LOCAL_MON{$family}{$cb+0} = sub { };
      AE::postpone {
         return unless exists $LOCAL_MON{$family}{$cb+0}; # guard might have gone away already

         # set actual callback
         $LOCAL_MON{$family}{$cb+0} = $cb;
         $cb->($db, [keys %$db]);
      };
   }

   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$cb+0};

         # last monitor of this family gone: stop monitoring upstream
         if (!%$mon) {
            global_req_del ("mon1 $family");

            # no global_req, because we don't care if we are not connected
            snd ($MASTER, g_mon0 => $family)
               if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
         }
      }
}
767
# full update
# arguments are the family and its complete new contents; the monitored
# copy is brought in sync and the family's monitors are called with the
# database and the lists of added, changed and deleted keys.
# Messages not coming from the current master are ignored.
$NODE_REQ{g_chg1} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $new) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   # add or replace keys
   for my $key (keys %$new) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $new->{$key};
   }

   # delete keys that are no longer present
   my @deleted = grep !exists $new->{$_}, keys %$db;
   delete @$db{@deleted};

   $_->($db, \@added, \@changed, \@deleted)
      for values %{ $LOCAL_MON{$family} };
};
793
# incremental update
# arguments are the family, a hash of keys to set and an array of keys to
# delete; the monitors are called with the database, the added keys, the
# changed keys and the array of deleted keys as received.
# Messages not coming from the current master are ignored.
$NODE_REQ{g_chg2} = sub {
   return unless $SRCNODE eq $MASTER;

   my ($family, $set, $del) = @_;

   my $db = $MON_DB{$family};
   my (@added, @changed);

   for my $key (keys %$set) {
      if (exists $db->{$key}) {
         push @changed, $key;
      } else {
         push @added, $key;
      }

      $db->{$key} = $set->{$key};
   }

   delete @$db{@$del};

   $_->($db, \@added, \@changed, $del)
      for values %{ $LOCAL_MON{$family} };
};
815
816 #############################################################################
817 # configure
818
# returns the second element of POSIX::uname (the nodename field); POSIX is
# only loaded when the function is first called.
sub nodename {
   require POSIX;

   my $name = (POSIX::uname ())[1];

   $name
}
823
# _resolve $descriptors - resolves a comma-separated list of transport
# descriptors into "host:port" strings.
#
# A descriptor consisting only of digits (or nothing) is taken as a port on
# the local nodename.  The host "*" stands for the addresses of all local
# interfaces (found via Net::Interface), everything else is resolved with
# AnyEvent::Socket::resolve_sockaddr.  The port "*" is replaced by "0".
#
# Returns a condvar that receives the resulting strings without duplicates,
# ordered by the position of their descriptor and, within one descriptor,
# by the order in which the addresses were found.
# Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # runs once all resolvers have finished: sort, remove duplicates, send
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      $t = length $t ? nodename . ":$t" : nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;

         # returns the packed addresses of all interfaces
         my $get_addr = sub {
            my @addr;

            require Net::Interface;

            # Net::Interface hangs on some systems, so hope for the best
            local $SIG{ALRM} = 'DEFAULT';
            alarm 2;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               for $_ ($if->address (Net::Interface::AF_INET ())) {
                  next if /^\x7f/; # skip localhost etc.
                  push @addr, $_;
               }
               for ($if->address (Net::Interface::AF_INET6 ())) {
                  #next if $if->scope ($_) <= 2;
                  next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                  push @addr, $_;
               }
            }

            alarm 0;

            @addr
         };

         my @addr;

         if (AnyEvent::WIN32) {
            @addr = $get_addr->();
         } else {
            # use a child process, as Net::Interface is big, and we need it only once.

            pipe my $r, my $w
               or die "pipe: $!";

            my $pid = fork;

            if (defined $pid && $pid == 0) {
               # child: report the addresses through the pipe.  exceptions
               # are caught here, so that the child can never return into
               # (or unwind through) the code of our caller.
               close $r;
               my @found = eval { $get_addr->() };
               warn $@ if $@;
               syswrite $w, pack "(C/a*)*", @found;
               require POSIX;
               POSIX::_exit (0);
            } else {
               # parent (or failed fork, in which case we read nothing)
               close $w;

               my $addr;

               1 while sysread $r, $addr, 1024, length $addr;

               @addr = unpack "(C/a*)*", $addr;

               # end of file means the child is gone or going - reap it, so
               # it does not linger as a zombie, without clobbering $?
               if ($pid) {
                  local $?;
                  waitpid $pid, 0;
               }
            }
         }

         for my $ip (@addr) {
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
            ];
         }
         $cv->end;
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
933
# configure [$profile,] key => value... - configures this node: selects the
# configuration profile, fixes the node ID, starts the listeners, connects
# to the seed nodes, starts the master search and finally runs the
# configured services and the "eval" configuration string.
#
# It only works once: at the end it replaces itself by an empty function.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names ();

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   if (exists $CONFIG->{secure}) {
      # a true "secure" setting makes _secure_check reject remote execution
      $SECURE = $CONFIG->{secure} ? sub { 0 } : sub { 1 };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   $NODE = $node;

   # %n stands for the nodename, %u and a trailing slash for a random part
   $NODE =~ s/%n/nodename/ge;

   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   # register the self node under its final ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   AE::log 8 => "node $NODE starting up.";

   $BINDS = [];
   %BINDS = ();

   # all bind descriptors are resolved in parallel, then listened on in order
   for my $resolved (map _resolve $_, @$binds) {
      for my $bind ($resolved->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   # publish our listeners
   db_set "'l" => $NODE => $BINDS;

   AE::log 8 => "node listens on [@$BINDS].";

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search ();

   # save gobs of memory
   undef &_resolve;
   *configure = sub (@){ };

   # services are either [function, args...], "Module::" or "function"
   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func $func)->(@args);
      } elsif ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func $service)->();
      }
   }

   eval "#line 1 \"(eval configure parameter)\"\n$CONFIG->{eval}";
   die "$@" if $@;
}
1025
1026 =back
1027
1028 =head1 LOGGING
1029
1030 AnyEvent::MP::Kernel logs high-level information about the current node,
1031 when nodes go up and down, and most runtime errors. It also logs some
1032 debugging and trace messages about network maintenance, such as seed
1033 connections and global node management.
1034
1035 =head1 SEE ALSO
1036
1037 L<AnyEvent::MP>.
1038
1039 =head1 AUTHOR
1040
1041 Marc Lehmann <schmorp@schmorp.de>
1042 http://home.schmorp.de/
1043
1044 =cut
1045
1046 1
1047