ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.88
Committed: Thu Mar 8 21:37:51 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.87: +50 -9 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg
49 db_mon db_family db_keys db_values
50 );
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages -queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
75 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
76 our @WARN;
77 our $WARN = sub {
78 &$_ for @WARN;
79
80 return if $WARNLEVEL < $_[0];
81
82 my ($level, $msg) = @_;
83
84 $msg =~ s/\n$//;
85
86 printf STDERR "%s <%d> %s\n",
87 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
88 $level,
89 $msg;
90 };
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
99 sub load_func($) {
100 my $func = $_[0];
101
102 unless (defined &$func) {
103 my $pkg = $func;
104 do {
105 $pkg =~ s/::[^:]+$//
106 or return sub { die "unable to resolve function '$func'" };
107
108 local $@;
109 unless (eval "require $pkg; 1") {
110 my $error = $@;
111 $error =~ /^Can't locate .*.pm in \@INC \(/
112 or return sub { die $error };
113 }
114 } until defined &$func;
115 }
116
117 \&$func
118 }
119
120 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
121
122 sub nonce($) {
123 join "", map chr rand 256, 1 .. $_[0]
124 }
125
126 sub nonce62($) {
127 join "", map $alnum[rand 62], 1 .. $_[0]
128 }
129
130 sub gen_uniq {
131 my $now = AE::now;
132 (join "",
133 map $alnum[$_],
134 $$ / 62 % 62,
135 $$ % 62,
136 (int $now ) % 62,
137 (int $now * 100) % 62,
138 (int $now * 10000) % 62,
139 ) . nonce62 4;
140 }
141
142 our $CONFIG; # this node's configuration
143 our $SECURE = sub { 1 };
144
145 our $RUNIQ; # remote uniq value
146 our $UNIQ; # per-process/node unique cookie
147 our $NODE;
148 our $ID = "a";
149
150 our %NODE; # node id to transport mapping, or "undef", for local node
151 our (%PORT, %PORT_DATA); # local ports
152
153 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 our %LMON; # monitored _local_ ports
155
156 our $GLOBAL; # true if node is a global ("directory") node
157 our %BINDS;
158 our $BINDS; # our listeners, as arrayref
159
160 our $SRCNODE; # holds the sending node _object_ during _inject
161
162 sub _init_names {
163 # ~54 bits, for local port names, lowercase $ID appended
164 $UNIQ = gen_uniq;
165
166 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
167 $RUNIQ = nonce62 10;
168 $RUNIQ =~ s/(.)$/\U$1/;
169
170 $NODE = "anon/$RUNIQ";
171 }
172
173 _init_names;
174
175 sub NODE() {
176 $NODE
177 }
178
179 sub node_of($) {
180 my ($node, undef) = split /#/, $_[0], 2;
181
182 $node
183 }
184
185 BEGIN {
186 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
187 ? sub () { 1 }
188 : sub () { 0 };
189 }
190
191 our $DELAY_TIMER;
192 our @DELAY_QUEUE;
193
194 sub _delay_run {
195 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
196 }
197
198 sub delay($) {
199 push @DELAY_QUEUE, shift;
200 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
201 }
202
203 sub _inject {
204 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
205 &{ $PORT{+shift} or return };
206 }
207
208 # this function adds a node-ref, so you can send stuff to it
209 # it is basically the central routing component.
210 sub add_node {
211 my ($node) = @_;
212
213 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
214 }
215
216 sub snd(@) {
217 my ($nodeid, $portid) = split /#/, shift, 2;
218
219 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
220
221 defined $nodeid #d#UGLY
222 or Carp::croak "'undef' is not a valid node ID/port ID";
223
224 ($NODE{$nodeid} || add_node $nodeid)
225 ->{send} (["$portid", @_]);
226 }
227
228 =item $is_local = port_is_local $port
229
230 Returns true iff the port is a local port.
231
232 =cut
233
234 sub port_is_local($) {
235 my ($nodeid, undef) = split /#/, $_[0], 2;
236
237 $NODE{$nodeid} == $NODE{""}
238 }
239
240 =item snd_to_func $node, $func, @args
241
242 Expects a node ID and a name of a function. Asynchronously tries to call
243 this function with the given arguments on that node.
244
245 This function can be used to implement C<spawn>-like interfaces.
246
247 =cut
248
249 sub snd_to_func($$;@) {
250 my $nodeid = shift;
251
252 # on $NODE, we artificially delay... (for spawn)
253 # this is very ugly - maybe we should simply delay ALL messages,
254 # to avoid deep recursion issues. but that's so... slow...
255 $AnyEvent::MP::Node::Self::DELAY = 1
256 if $nodeid ne $NODE;
257
258 defined $nodeid #d#UGLY
259 or Carp::croak "'undef' is not a valid node ID/port ID";
260
261 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
262 }
263
264 =item snd_on $node, @msg
265
266 Executes C<snd> with the given C<@msg> (which must include the destination
267 port) on the given node.
268
269 =cut
270
271 sub snd_on($@) {
272 my $node = shift;
273 snd $node, snd => @_;
274 }
275
276 =item eval_on $node, $string[, @reply]
277
278 Evaluates the given string as Perl expression on the given node. When
279 @reply is specified, then it is used to construct a reply message with
280 C<"$@"> and any results from the eval appended.
281
282 =cut
283
284 sub eval_on($$;@) {
285 my $node = shift;
286 snd $node, eval => @_;
287 }
288
289 sub kil(@) {
290 my ($nodeid, $portid) = split /#/, shift, 2;
291
292 length $portid
293 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
294
295 ($NODE{$nodeid} || add_node $nodeid)
296 ->kill ("$portid", @_);
297 }
298
299 #############################################################################
300 # node monitoring and info
301
302 =item node_is_known $nodeid
303
304 #TODO#
305 Returns true iff the given node is currently known to this node.
306
307 =cut
308
309 sub node_is_known($) {
310 exists $NODE{$_[0]}
311 }
312
313 =item node_is_up $nodeid
314
315 Returns true if the given node is "up", that is, the kernel thinks it has
316 a working connection to it.
317
318 If the node is known (to this local node) but not currently connected,
319 returns C<0>. If the node is not known, returns C<undef>.
320
321 =cut
322
323 sub node_is_up($) {
324 ($NODE{$_[0]} or return)->{transport}
325 ? 1 : 0
326 }
327
328 =item up_nodes
329
330 Return the node IDs of all nodes that are currently connected (excluding
331 the node itself).
332
333 =cut
334
335 sub up_nodes() {
336 map $_->{id}, grep $_->{transport}, values %NODE
337 }
338
339 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340
341 Registers a callback that is called each time a node goes up (a connection
342 is established) or down (the connection is lost).
343
344 Node up messages can only be followed by node down messages for the same
345 node, and vice versa.
346
347 Note that monitoring a node is usually better done by monitoring its node
348 port. This function is mainly of interest to modules that are concerned
349 about the network topology and low-level connection handling.
350
351 Callbacks I<must not> block and I<should not> send any messages.
352
353 The function returns an optional guard which can be used to unregister
354 the monitoring callback again.
355
356 Example: make sure you call function C<newnode> for all nodes that are up
357 or go up (and down).
358
359 newnode $_, 1 for up_nodes;
360 mon_nodes \&newnode;
361
362 =cut
363
364 our %MON_NODES;
365
366 sub mon_nodes($) {
367 my ($cb) = @_;
368
369 $MON_NODES{$cb+0} = $cb;
370
371 defined wantarray && Guard::guard { delete $MON_NODES{$cb+0} }
372 }
373
374 sub _inject_nodeevent($$;@) {
375 my ($node, $up, @reason) = @_;
376
377 for my $cb (values %MON_NODES) {
378 eval { $cb->($node->{id}, $up, @reason); 1 }
379 or $WARN->(1, $@);
380 }
381
382 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
383 }
384
385 #############################################################################
386 # self node code
387
388 sub _kill {
389 my $port = shift;
390
391 delete $PORT{$port}
392 or return; # killing nonexistent ports is O.K.
393 delete $PORT_DATA{$port};
394
395 my $mon = delete $LMON{$port}
396 or !@_
397 or $WARN->(2, "unmonitored local port $port died with reason: @_");
398
399 $_->(@_) for values %$mon;
400 }
401
402 sub _monitor {
403 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
404 unless exists $PORT{$_[1]};
405
406 $LMON{$_[1]}{$_[2]+0} = $_[2];
407 }
408
409 sub _unmonitor {
410 delete $LMON{$_[1]}{$_[2]+0}
411 if exists $LMON{$_[1]};
412 }
413
414 sub _secure_check {
415 $SECURE->($SRCNODE->{id})
416 or $SRCNODE->{id} eq $NODE
417 or die "remote execution attempt by insecure node\n";
418 }
419
420 our %NODE_REQ = (
421 # internal services
422
423 # monitoring
424 mon0 => sub { # stop monitoring a port for another node
425 my $portid = shift;
426 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
427 },
428 mon1 => sub { # start monitoring a port for another node
429 my $portid = shift;
430 Scalar::Util::weaken (my $node = $SRCNODE);
431 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
432 delete $node->{rmon}{$portid};
433 $node->send (["", kil0 => $portid, @_])
434 if $node && $node->{transport};
435 };
436 },
437 # another node has killed a monitored port
438 kil0 => sub {
439 my $cbs = delete $SRCNODE->{lmon}{+shift}
440 or return;
441
442 $_->(@_) for @$cbs;
443 },
444
445 # "public" services - not actually public
446
447 # another node wants to kill a local port
448 kil => \&_kill,
449
450 # is the remote node considered secure?
451 # secure => sub {
452 # #TODO#
453 # },
454
455 # relay message to another node / generic echo
456 snd => sub {
457 &_secure_check;
458 &snd
459 },
460
461 # random utilities
462 eval => sub {
463 &_secure_check;
464 my @res = do { package main; eval shift };
465 snd @_, "$@", @res if @_;
466 },
467 time => sub {
468 &_secure_check;
469 snd @_, AE::now;
470 },
471 devnull => sub {
472 #
473 },
474 "" => sub {
475 # empty messages are keepalives or similar devnull-applications
476 },
477 );
478
479 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
480 $PORT{""} = sub {
481 my $tag = shift;
482 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
483 $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
484 };
485
486 our $NPROTO = 1;
487
488 # tell everybody who connects our nproto
489 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
490 $_[0]{local_greeting}{nproto} = $NPROTO;
491 };
492
493 #############################################################################
494 # seed management, try to keep connections to all seeds at all times
495
496 our %SEED_NODE; # seed ID => node ID|undef
497 our %NODE_SEED; # map node ID to seed ID
498 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
499 our $SEED_WATCHER;
500 our $SEED_RETRY;
501
502 sub seed_connect {
503 my ($seed) = @_;
504
505 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
506 or Carp::croak "$seed: unparsable seed address";
507
508 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
509
510 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
511 $host, $port,
512 on_greeted => sub {
513 # called after receiving remote greeting, learn remote node name
514
515 # we rely on untrusted data here (the remote node name) this is
516 # hopefully ok, as this can at most be used for DOSing, which is easy
517 # when you can do MITM anyway.
518
519 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
520 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
521 require AnyEvent::MP::Global; # every seed becomes a global node currently
522 delete $SEED_NODE{$seed};
523 delete $NODE_SEED{$seed};
524 } else {
525 $SEED_NODE{$seed} = $_[0]{remote_node};
526 $NODE_SEED{$_[0]{remote_node}} = $seed;
527 }
528 },
529 on_destroy => sub {
530 delete $SEED_CONNECT{$seed};
531 },
532 sub {
533 $SEED_CONNECT{$seed} = 1;
534 }
535 ;
536 }
537
538 sub seed_all {
539 my @seeds = grep {
540 !exists $SEED_CONNECT{$_}
541 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
542 } keys %SEED_NODE;
543
544 if (@seeds) {
545 # start connection attempt for every seed we are not connected to yet
546 seed_connect $_
547 for @seeds;
548
549 $SEED_RETRY = $SEED_RETRY * 2 + rand;
550 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
551 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
552
553 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
554
555 } else {
556 # all seeds connected or connecting, no need to restart timer
557 undef $SEED_WATCHER;
558 }
559 }
560
561 sub seed_again {
562 $SEED_RETRY = 1;
563 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
564 }
565
566 # sets new seed list, starts connecting
567 sub set_seeds(@) {
568 %SEED_NODE = ();
569 %NODE_SEED = ();
570 %SEED_CONNECT = ();
571
572 @SEED_NODE{@_} = ();
573
574 seed_again;#d#
575 seed_all;
576 }
577
578 mon_nodes sub {
579 # if we lost the connection to a seed node, make sure we are seeding
580 seed_again
581 if !$_[1] && exists $NODE_SEED{$_[0]};
582 };
583
584 #############################################################################
585 # talk with/to global nodes
586
587 # protocol messages:
588 #
589 # sent by all slave nodes (slave to master)
590 # g_slave database - make other global node master of the sender
591 #
592 # sent by any node to global nodes
593 # g_set database - set whole database
594 # g_add family key val - add/replace key to database
595 # g_del family key - delete key from database
596 # g_get family key reply... - send reply with data
597 #
598 # send by global nodes
599 # g_global - node became global, similar to global=1 greeting
600 #
601 # database families
602 # "'l" -> node -> listeners
603 # "'g" -> node -> undef
604 # ...
605 #
606
607 # used on all nodes:
608 our $MASTER; # the global node we bind ourselves to
609 our $MASTER_MON;
610 our %LOCAL_DB; # this node database
611
612 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
613
614 our $GPROTO = 1;
615
616 # tell everybody who connects our nproto
617 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
618 $_[0]{local_greeting}{gproto} = $GPROTO;
619 };
620
621 #############################################################################
622 # master selection
623
624 # master requests
625 our %GLOBAL_REQ; # $id => \@req
626
627 sub global_req_add {
628 my ($id, $req) = @_;
629
630 return if exists $GLOBAL_REQ{$id};
631
632 $GLOBAL_REQ{$id} = $req;
633
634 snd $MASTER, @$req
635 if $MASTER;
636 }
637
638 sub global_req_del {
639 delete $GLOBAL_REQ{$_[0]};
640 }
641
642 #################################
643 # master rpc
644
645 our %GLOBAL_RES;
646 our $GLOBAL_RES_ID = "a";
647
648 sub global_call {
649 my $id = ++$GLOBAL_RES_ID;
650 $GLOBAL_RES{$id} = pop;
651 global_req_add $id, [@_, $id];
652 }
653
654 $NODE_REQ{g_reply} = sub {
655 my $id = shift;
656 global_req_del $id;
657 my $cb = delete $GLOBAL_RES{$id}
658 or return;
659 &$cb
660 };
661
662 #################################
663
664 sub g_find {
665 global_req_add "g_find $_[0]", [g_find => $_[0]];
666 }
667
668 # reply for g_find started in Node.pm
669 $NODE_REQ{g_found} = sub {
670 global_req_del "g_find $_[0]";
671
672 my $node = $NODE{$_[0]} or return;
673
674 $node->connect_to ($_[1]);
675 };
676
677 sub master_set {
678 $MASTER = $_[0];
679
680 snd $MASTER, g_slave => \%LOCAL_DB;
681
682 # (re-)send queued requests
683 snd $MASTER, @$_
684 for values %GLOBAL_REQ;
685 }
686
687 sub master_search {
688 #TODO: should also look for other global nodes, but we don't know them #d#
689 for (keys %NODE_SEED) {
690 if (node_is_up $_) {
691 master_set $_;
692 return;
693 }
694 }
695
696 $MASTER_MON = mon_nodes sub {
697 return unless $_[1]; # we are only interested in node-ups
698 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
699
700 master_set $_[0];
701
702 $MASTER_MON = mon_nodes sub {
703 if ($_[0] eq $MASTER && !$_[1]) {
704 undef $MASTER;
705 master_search ();
706 }
707 };
708 };
709 }
710
711 # other node wants to make us the master
712 $NODE_REQ{g_slave} = sub {
713 my ($db) = @_;
714
715 # load global module and redo the request
716 require AnyEvent::MP::Global;
717 &{ $NODE_REQ{g_slave} }
718 };
719
720 #############################################################################
721 # local database operations
722
723 # local database management
724
725 sub db_set($$;$) {
726 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
727 snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
728 if defined $MASTER;
729 }
730
731 sub db_del($$) {
732 delete $LOCAL_DB{$_[0]}{$_[1]};
733 snd $MASTER, g_del => $_[0] => $_[1]
734 if defined $MASTER;
735 }
736
737 sub db_reg($$;$) {
738 my ($family, $key) = @_;
739 &db_set;
740 Guard::guard { db_del $family => $key }
741 }
742
743 # database query
744
745 sub db_family {
746 my ($family, $cb) = @_;
747 global_call g_db_family => $family, $cb;
748 }
749
750 sub db_keys {
751 my ($family, $cb) = @_;
752 global_call g_db_keys => $family, $cb;
753 }
754
755 sub db_values {
756 my ($family, $cb) = @_;
757 global_call g_db_values => $family, $cb;
758 }
759
760 # database monitoring
761
762 our %LOCAL_MON; # f, reply
763 our %MON_DB; # f, k, value
764
765 sub db_mon($@) {
766 my ($family, $cb) = @_;
767
768 if (my $db = $MON_DB{$family}) {
769 # if we already monitor this thingy, generate
770 # create events for all of them
771 $cb->($db, [keys %$db]);
772 } else {
773 # new monitor, request chg1 from upstream
774 global_req_add "mon1 $family" => [g_mon1 => $family];
775 $MON_DB{$family} = {};
776 }
777
778 $LOCAL_MON{$family}{$cb+0} = $cb;
779
780 Guard::guard {
781 my $mon = $LOCAL_MON{$family};
782 delete $mon->{$cb+0};
783
784 unless (%$mon) {
785 global_req_del "mon1 $family";
786
787 # no global_req, because we don't care if we are not connected
788 snd $MASTER, g_mon0 => $family
789 if $MASTER;
790
791 delete $LOCAL_MON{$family};
792 delete $MON_DB{$family};
793 }
794 }
795 }
796
797 # full update
798 $NODE_REQ{g_chg1} = sub {
799 my ($f, $ndb) = @_;
800
801 my $db = $MON_DB{$f};
802 my @k;
803
804 # add or replace keys
805 while (my ($k, $v) = each %$ndb) {
806 $db->{$k} = $v;
807 push @k, $k;
808 }
809
810 # delete keys that are no longer present
811 for (grep !exists $ndb->{$_}, keys %$db) {
812 delete $db->{$_};
813 push @k, $_;
814 }
815
816 $_->($db, \@k)
817 for values %{ $LOCAL_MON{$_[0]} };
818 };
819
820 # incremental update
821 $NODE_REQ{g_chg2} = sub {
822 my $db = $MON_DB{$_[0]};
823
824 @_ >= 3
825 ? $db->{$_[1]} = $_[2]
826 : delete $db->{$_[1]};
827
828 $_->($db, [$_[1]])
829 for values %{ $LOCAL_MON{$_[0]} };
830 };
831
832 #############################################################################
833 # configure
834
835 sub nodename {
836 require POSIX;
837 (POSIX::uname ())[1]
838 }
839
840 sub _resolve($) {
841 my ($nodeid) = @_;
842
843 my $cv = AE::cv;
844 my @res;
845
846 $cv->begin (sub {
847 my %seen;
848 my @refs;
849 for (sort { $a->[0] <=> $b->[0] } @res) {
850 push @refs, $_->[1] unless $seen{$_->[1]}++
851 }
852 shift->send (@refs);
853 });
854
855 my $idx;
856 for my $t (split /,/, $nodeid) {
857 my $pri = ++$idx;
858
859 $t = length $t ? nodename . ":$t" : nodename
860 if $t =~ /^\d*$/;
861
862 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
863 or Carp::croak "$t: unparsable transport descriptor";
864
865 $port = "0" if $port eq "*";
866
867 if ($host eq "*") {
868 $cv->begin;
869 # use fork_call, as Net::Interface is big, and we need it rarely.
870 require AnyEvent::Util;
871 AnyEvent::Util::fork_call (
872 sub {
873 my @addr;
874
875 require Net::Interface;
876
877 for my $if (Net::Interface->interfaces) {
878 # we statically lower-prioritise ipv6 here, TODO :()
879 for $_ ($if->address (Net::Interface::AF_INET ())) {
880 next if /^\x7f/; # skip localhost etc.
881 push @addr, $_;
882 }
883 for ($if->address (Net::Interface::AF_INET6 ())) {
884 #next if $if->scope ($_) <= 2;
885 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
886 push @addr, $_;
887 }
888
889 }
890 @addr
891 }, sub {
892 for my $ip (@_) {
893 push @res, [
894 $pri += 1e-5,
895 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
896 ];
897 }
898 $cv->end;
899 }
900 );
901 } else {
902 $cv->begin;
903 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
904 for (@_) {
905 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
906 push @res, [
907 $pri += 1e-5,
908 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
909 ];
910 }
911 $cv->end;
912 };
913 }
914 }
915
916 $cv->end;
917
918 $cv
919 }
920
921 sub configure(@) {
922 unshift @_, "profile" if @_ & 1;
923 my (%kv) = @_;
924
925 delete $NODE{$NODE}; # we do not support doing stuff before configure
926 _init_names;
927
928 my $profile = delete $kv{profile};
929
930 $profile = nodename
931 unless defined $profile;
932
933 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
934
935 if (exists $CONFIG->{secure}) {
936 my $pass = !$CONFIG->{secure};
937 $SECURE = sub { $pass };
938 }
939
940 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
941
942 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
943
944 $NODE = $node;
945
946 $NODE =~ s/%n/nodename/ge;
947
948 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
949 # nodes with randomised node names do not need randomised port names
950 $UNIQ = "";
951 }
952
953 $NODE{$NODE} = $NODE{""};
954 $NODE{$NODE}{id} = $NODE;
955
956 my $seeds = $CONFIG->{seeds};
957 my $binds = $CONFIG->{binds};
958
959 $binds ||= ["*"];
960
961 $WARN->(8, "node $NODE starting up.");
962
963 $BINDS = [];
964 %BINDS = ();
965
966 for (map _resolve $_, @$binds) {
967 for my $bind ($_->recv) {
968 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
969 or Carp::croak "$bind: unparsable local bind address";
970
971 my $listener = AnyEvent::MP::Transport::mp_server
972 $host,
973 $port,
974 prepare => sub {
975 my (undef, $host, $port) = @_;
976 $bind = AnyEvent::Socket::format_hostport $host, $port;
977 0
978 },
979 ;
980 $BINDS{$bind} = $listener;
981 push @$BINDS, $bind;
982 }
983 }
984
985 db_set "'l" => $NODE => $BINDS;
986
987 $WARN->(8, "node listens on [@$BINDS].");
988
989 # connect to all seednodes
990 set_seeds map $_->recv, map _resolve $_, @$seeds;
991
992 master_search;
993
994 if ($NODE eq "atha") {;#d#
995 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
996 }
997
998 for (@{ $CONFIG->{services} }) {
999 if (ref) {
1000 my ($func, @args) = @$_;
1001 (load_func $func)->(@args);
1002 } elsif (s/::$//) {
1003 eval "require $_";
1004 die $@ if $@;
1005 } else {
1006 (load_func $_)->();
1007 }
1008 }
1009 }
1010
1011 =back
1012
1013 =head1 SEE ALSO
1014
1015 L<AnyEvent::MP>.
1016
1017 =head1 AUTHOR
1018
1019 Marc Lehmann <schmorp@schmorp.de>
1020 http://home.schmorp.de/
1021
1022 =cut
1023
1024 1
1025