ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.87
Committed: Sun Mar 4 19:45:03 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.86: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29
30 use AnyEvent ();
31 use Guard ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT_OK = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 );
41
42 our @EXPORT = qw(
43 add_node load_func snd_to_func snd_on eval_on
44
45 NODE $NODE node_of snd kil port_is_local
46 configure
47 up_nodes mon_nodes node_is_up
48 db_set db_del db_reg
49 db_mon db_family db_keys db_values
50 );
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages -queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =item @AnyEvent::MP::Kernel::WARN
68
69 All code references in this array are called for every log message, from
70 the default C<$WARN> handler. This is an easy way to tie into the log
71 messages without disturbing others.
72
73 =cut
74
75 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
76 our @WARN;
77 our $WARN = sub {
78 &$_ for @WARN;
79
80 return if $WARNLEVEL < $_[0];
81
82 my ($level, $msg) = @_;
83
84 $msg =~ s/\n$//;
85
86 printf STDERR "%s <%d> %s\n",
87 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
88 $level,
89 $msg;
90 };
91
92 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
93
94 The maximum level at which warning messages will be printed to STDERR by
95 the default warn handler.
96
97 =cut
98
99 sub load_func($) {
100 my $func = $_[0];
101
102 unless (defined &$func) {
103 my $pkg = $func;
104 do {
105 $pkg =~ s/::[^:]+$//
106 or return sub { die "unable to resolve function '$func'" };
107
108 local $@;
109 unless (eval "require $pkg; 1") {
110 my $error = $@;
111 $error =~ /^Can't locate .*.pm in \@INC \(/
112 or return sub { die $error };
113 }
114 } until defined &$func;
115 }
116
117 \&$func
118 }
119
120 my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');
121
122 sub nonce($) {
123 join "", map chr rand 256, 1 .. $_[0]
124 }
125
126 sub nonce62($) {
127 join "", map $alnum[rand 62], 1 .. $_[0]
128 }
129
130 sub gen_uniq {
131 my $now = AE::now;
132 (join "",
133 map $alnum[$_],
134 $$ / 62 % 62,
135 $$ % 62,
136 (int $now ) % 62,
137 (int $now * 100) % 62,
138 (int $now * 10000) % 62,
139 ) . nonce62 4;
140 }
141
142 our $CONFIG; # this node's configuration
143 our $SECURE = sub { 1 };
144
145 our $RUNIQ; # remote uniq value
146 our $UNIQ; # per-process/node unique cookie
147 our $NODE;
148 our $ID = "a";
149
150 our %NODE; # node id to transport mapping, or "undef", for local node
151 our (%PORT, %PORT_DATA); # local ports
152
153 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
154 our %LMON; # monitored _local_ ports
155
156 our $GLOBAL; # true if node is a global ("directory") node
157 our %BINDS;
158 our $BINDS; # our listeners, as arrayref
159
160 our $SRCNODE; # holds the sending node _object_ during _inject
161
162 sub _init_names {
163 # ~54 bits, for local port names, lowercase $ID appended
164 $UNIQ = gen_uniq;
165
166 # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
167 $RUNIQ = nonce62 10;
168 $RUNIQ =~ s/(.)$/\U$1/;
169
170 $NODE = "anon/$RUNIQ";
171 }
172
173 _init_names;
174
175 sub NODE() {
176 $NODE
177 }
178
179 sub node_of($) {
180 my ($node, undef) = split /#/, $_[0], 2;
181
182 $node
183 }
184
185 BEGIN {
186 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
187 ? sub () { 1 }
188 : sub () { 0 };
189 }
190
191 our $DELAY_TIMER;
192 our @DELAY_QUEUE;
193
194 sub _delay_run {
195 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
196 }
197
198 sub delay($) {
199 push @DELAY_QUEUE, shift;
200 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
201 }
202
203 sub _inject {
204 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
205 &{ $PORT{+shift} or return };
206 }
207
208 # this function adds a node-ref, so you can send stuff to it
209 # it is basically the central routing component.
210 sub add_node {
211 my ($node) = @_;
212
213 $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
214 }
215
216 sub snd(@) {
217 my ($nodeid, $portid) = split /#/, shift, 2;
218
219 warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#
220
221 defined $nodeid #d#UGLY
222 or Carp::croak "'undef' is not a valid node ID/port ID";
223
224 ($NODE{$nodeid} || add_node $nodeid)
225 ->{send} (["$portid", @_]);
226 }
227
228 =item $is_local = port_is_local $port
229
230 Returns true iff the port is a local port.
231
232 =cut
233
234 sub port_is_local($) {
235 my ($nodeid, undef) = split /#/, $_[0], 2;
236
237 $NODE{$nodeid} == $NODE{""}
238 }
239
240 =item snd_to_func $node, $func, @args
241
242 Expects a node ID and a name of a function. Asynchronously tries to call
243 this function with the given arguments on that node.
244
245 This function can be used to implement C<spawn>-like interfaces.
246
247 =cut
248
249 sub snd_to_func($$;@) {
250 my $nodeid = shift;
251
252 # on $NODE, we artificially delay... (for spawn)
253 # this is very ugly - maybe we should simply delay ALL messages,
254 # to avoid deep recursion issues. but that's so... slow...
255 $AnyEvent::MP::Node::Self::DELAY = 1
256 if $nodeid ne $NODE;
257
258 defined $nodeid #d#UGLY
259 or Carp::croak "'undef' is not a valid node ID/port ID";
260
261 ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
262 }
263
264 =item snd_on $node, @msg
265
266 Executes C<snd> with the given C<@msg> (which must include the destination
267 port) on the given node.
268
269 =cut
270
271 sub snd_on($@) {
272 my $node = shift;
273 snd $node, snd => @_;
274 }
275
276 =item eval_on $node, $string[, @reply]
277
278 Evaluates the given string as Perl expression on the given node. When
279 @reply is specified, then it is used to construct a reply message with
280 C<"$@"> and any results from the eval appended.
281
282 =cut
283
284 sub eval_on($$;@) {
285 my $node = shift;
286 snd $node, eval => @_;
287 }
288
289 sub kil(@) {
290 my ($nodeid, $portid) = split /#/, shift, 2;
291
292 length $portid
293 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
294
295 ($NODE{$nodeid} || add_node $nodeid)
296 ->kill ("$portid", @_);
297 }
298
299 #############################################################################
300 # node monitoring and info
301
302 =item node_is_known $nodeid
303
304 #TODO#
305 Returns true iff the given node is currently known to this node.
306
307 =cut
308
309 sub node_is_known($) {
310 exists $NODE{$_[0]}
311 }
312
313 =item node_is_up $nodeid
314
315 Returns true if the given node is "up", that is, the kernel thinks it has
316 a working connection to it.
317
318 If the node is known (to this local node) but not currently connected,
319 returns C<0>. If the node is not known, returns C<undef>.
320
321 =cut
322
323 sub node_is_up($) {
324 ($NODE{$_[0]} or return)->{transport}
325 ? 1 : 0
326 }
327
328 =item up_nodes
329
330 Return the node IDs of all nodes that are currently connected (excluding
331 the node itself).
332
333 =cut
334
335 sub up_nodes() {
336 map $_->{id}, grep $_->{transport}, values %NODE
337 }
338
339 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
340
341 Registers a callback that is called each time a node goes up (a connection
342 is established) or down (the connection is lost).
343
344 Node up messages can only be followed by node down messages for the same
345 node, and vice versa.
346
347 Note that monitoring a node is usually better done by monitoring its node
348 port. This function is mainly of interest to modules that are concerned
349 about the network topology and low-level connection handling.
350
351 Callbacks I<must not> block and I<should not> send any messages.
352
353 The function returns an optional guard which can be used to unregister
354 the monitoring callback again.
355
356 Example: make sure you call function C<newnode> for all nodes that are up
357 or go up (and down).
358
359 newnode $_, 1 for up_nodes;
360 mon_nodes \&newnode;
361
362 =cut
363
364 our %MON_NODES;
365
366 sub mon_nodes($) {
367 my ($cb) = @_;
368
369 $MON_NODES{$cb+0} = $cb;
370
371 defined wantarray && Guard::guard { delete $MON_NODES{$cb+0} }
372 }
373
374 sub _inject_nodeevent($$;@) {
375 my ($node, $up, @reason) = @_;
376
377 for my $cb (values %MON_NODES) {
378 eval { $cb->($node->{id}, $up, @reason); 1 }
379 or $WARN->(1, $@);
380 }
381
382 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
383 }
384
385 #############################################################################
386 # self node code
387
388 sub _kill {
389 my $port = shift;
390
391 delete $PORT{$port}
392 or return; # killing nonexistent ports is O.K.
393 delete $PORT_DATA{$port};
394
395 my $mon = delete $LMON{$port}
396 or !@_
397 or $WARN->(2, "unmonitored local port $port died with reason: @_");
398
399 $_->(@_) for values %$mon;
400 }
401
402 sub _monitor {
403 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
404 unless exists $PORT{$_[1]};
405
406 $LMON{$_[1]}{$_[2]+0} = $_[2];
407 }
408
409 sub _unmonitor {
410 delete $LMON{$_[1]}{$_[2]+0}
411 if exists $LMON{$_[1]};
412 }
413
414 sub _secure_check {
415 $SECURE->($SRCNODE->{id})
416 or die "remote execution attempt by insecure node\n";
417 }
418
419 our %NODE_REQ = (
420 # internal services
421
422 # monitoring
423 mon0 => sub { # stop monitoring a port for another node
424 my $portid = shift;
425 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
426 },
427 mon1 => sub { # start monitoring a port for another node
428 my $portid = shift;
429 Scalar::Util::weaken (my $node = $SRCNODE);
430 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
431 delete $node->{rmon}{$portid};
432 $node->send (["", kil0 => $portid, @_])
433 if $node && $node->{transport};
434 };
435 },
436 # another node has killed a monitored port
437 kil0 => sub {
438 my $cbs = delete $SRCNODE->{lmon}{+shift}
439 or return;
440
441 $_->(@_) for @$cbs;
442 },
443
444 # "public" services - not actually public
445
446 # another node wants to kill a local port
447 kil => \&_kill,
448
449 # relay message to another node / generic echo
450 snd => \&snd,
451 snd_multiple => sub {
452 snd @$_ for @_
453 },
454
455 # random utilities
456 eval => sub {
457 &_secure_check;
458 my @res = do { package main; eval shift };
459 snd @_, "$@", @res if @_;
460 },
461 time => sub {
462 snd @_, AE::now;
463 },
464 devnull => sub {
465 #
466 },
467 "" => sub {
468 # empty messages are keepalives or similar devnull-applications
469 },
470 );
471
472 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
473 $PORT{""} = sub {
474 my $tag = shift;
475 eval { &{ $NODE_REQ{$tag} ||= do { &_secure_check; load_func $tag } } };
476 $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
477 };
478
479 our $NPROTO = 1;
480
481 # tell everybody who connects our nproto
482 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
483 $_[0]{local_greeting}{nproto} = $NPROTO;
484 };
485
486 #############################################################################
487 # seed management, try to keep connections to all seeds at all times
488
489 our %SEED_NODE; # seed ID => node ID|undef
490 our %NODE_SEED; # map node ID to seed ID
491 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
492 our $SEED_WATCHER;
493 our $SEED_RETRY;
494
495 sub seed_connect {
496 my ($seed) = @_;
497
498 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
499 or Carp::croak "$seed: unparsable seed address";
500
501 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
502
503 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
504 $host, $port,
505 on_greeted => sub {
506 # called after receiving remote greeting, learn remote node name
507
508 # we rely on untrusted data here (the remote node name) this is
509 # hopefully ok, as this can at most be used for DOSing, which is easy
510 # when you can do MITM anyway.
511
512 # if we connect to ourselves, nuke this seed, but make sure we act like a seed
513 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
514 require AnyEvent::MP::Global; # every seed becomes a global node currently
515 delete $SEED_NODE{$seed};
516 delete $NODE_SEED{$seed};
517 } else {
518 $SEED_NODE{$seed} = $_[0]{remote_node};
519 $NODE_SEED{$_[0]{remote_node}} = $seed;
520 }
521 },
522 on_destroy => sub {
523 delete $SEED_CONNECT{$seed};
524 },
525 sub {
526 $SEED_CONNECT{$seed} = 1;
527 }
528 ;
529 }
530
531 sub seed_all {
532 my @seeds = grep {
533 !exists $SEED_CONNECT{$_}
534 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
535 } keys %SEED_NODE;
536
537 if (@seeds) {
538 # start connection attempt for every seed we are not connected to yet
539 seed_connect $_
540 for @seeds;
541
542 $SEED_RETRY = $SEED_RETRY * 2 + rand;
543 $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
544 if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
545
546 $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;
547
548 } else {
549 # all seeds connected or connecting, no need to restart timer
550 undef $SEED_WATCHER;
551 }
552 }
553
554 sub seed_again {
555 $SEED_RETRY = 1;
556 $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
557 }
558
559 # sets new seed list, starts connecting
560 sub set_seeds(@) {
561 %SEED_NODE = ();
562 %NODE_SEED = ();
563 %SEED_CONNECT = ();
564
565 @SEED_NODE{@_} = ();
566
567 seed_again;#d#
568 seed_all;
569 }
570
571 mon_nodes sub {
572 # if we lost the connection to a seed node, make sure we are seeding
573 seed_again
574 if !$_[1] && exists $NODE_SEED{$_[0]};
575 };
576
577 #############################################################################
578 # talk with/to global nodes
579
580 # protocol messages:
581 #
582 # sent by all slave nodes (slave to master)
583 # g_slave database - make other global node master of the sender
584 #
585 # sent by any node to global nodes
586 # g_set database - set whole database
587 # g_add family key val - add/replace key to database
588 # g_del family key - delete key from database
589 # g_get family key reply... - send reply with data
590 #
591 # send by global nodes
592 # g_global - node became global, similar to global=1 greeting
593 #
594 # database families
595 # "'l" -> node -> listeners
596 # "'g" -> node -> undef
597 # ...
598 #
599
600 # used on all nodes:
601 our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
602 our $MASTER_MON;
603 our %LOCAL_DB; # this node database
604
605 our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes
606
607 our $GPROTO = 1;
608
609 # tell everybody who connects our nproto
610 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
611 $_[0]{local_greeting}{gproto} = $GPROTO;
612 };
613
614 #############################################################################
615 # master selection
616
617 # master requests
618 our %GLOBAL_REQ; # $id => \@req
619
620 sub global_req_add {
621 my ($id, $req) = @_;
622
623 return if exists $GLOBAL_REQ{$id};
624
625 $GLOBAL_REQ{$id} = $req;
626
627 snd $MASTER, @$req
628 if $MASTER;
629 }
630
631 sub global_req_del {
632 delete $GLOBAL_REQ{$_[0]};
633 }
634
635 sub g_find {
636 global_req_add "g_find $_[0]", [g_find => $_[0]];
637 }
638
639 # reply for g_find started in Node.pm
640 $NODE_REQ{g_found} = sub {
641 global_req_del "g_find $_[0]";
642
643 my $node = $NODE{$_[0]} or return;
644
645 $node->connect_to ($_[1]);
646 };
647
648 sub master_set {
649 $MASTER = $_[0];
650
651 snd $MASTER, g_slave => \%LOCAL_DB;
652
653 # (re-)send queued requests
654 snd $MASTER, @$_
655 for values %GLOBAL_REQ;
656 }
657
658 sub master_search {
659 #TODO: should also look for other global nodes, but we don't know them #d#
660 for (keys %NODE_SEED) {
661 if (node_is_up $_) {
662 master_set $_;
663 return;
664 }
665 }
666
667 $MASTER_MON = mon_nodes sub {
668 return unless $_[1]; # we are only interested in node-ups
669 return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes
670
671 master_set $_[0];
672
673 $MASTER_MON = mon_nodes sub {
674 if ($_[0] eq $MASTER && !$_[1]) {
675 undef $MASTER;
676 master_search ();
677 }
678 };
679 };
680 }
681
682 # other node wants to make us the master
683 $NODE_REQ{g_slave} = sub {
684 my ($db) = @_;
685
686 # load global module and redo the request
687 require AnyEvent::MP::Global;
688 &{ $NODE_REQ{g_slave} }
689 };
690
691 #############################################################################
692 # local database operations
693
694 # local database management
695 sub db_set($$;$) {
696 $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
697 snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
698 if defined $MASTER;
699 }
700
701 sub db_del($$) {
702 delete $LOCAL_DB{$_[0]}{$_[1]};
703 snd $MASTER, g_del => $_[0] => $_[1]
704 if defined $MASTER;
705 }
706
707 sub db_reg($$;$) {
708 my ($family, $key) = @_;
709 &db_set;
710 Guard::guard { db_del $family => $key }
711 }
712
713 sub db_keys($$$) {
714 #d#
715 }
716
717 #d# db_values
718 #d# db_family
719 #d# db_key
720
721 our %LOCAL_MON; # f, reply
722 our %MON_DB; # f, k, value
723
724 sub db_mon($@) {
725 my ($family, $cb) = @_;
726
727 if (my $db = $MON_DB{$family}) {
728 # if we already monitor this thingy, generate
729 # create events for all of them
730 $cb->($db, [keys %$db]);
731 } else {
732 # new monitor, request chg1 from upstream
733 global_req_add "mon1 $family" => [g_mon1 => $family];
734 $MON_DB{$family} = {};
735 }
736
737 $LOCAL_MON{$family}{$cb+0} = $cb;
738
739 Guard::guard {
740 my $mon = $LOCAL_MON{$family};
741 delete $mon->{$cb+0};
742
743 unless (%$mon) {
744 global_req_del "mon1 $family";
745
746 # no global_req, because we don't care if we are not connected
747 snd $MASTER, g_mon0 => $family
748 if $MASTER;
749
750 delete $LOCAL_MON{$family};
751 delete $MON_DB{$family};
752 }
753 }
754 }
755
756 # full update
757 $NODE_REQ{g_chg1} = sub {
758 my ($f, $ndb) = @_;
759
760 my $db = $MON_DB{$f};
761 my @k;
762
763 # add or replace keys
764 while (my ($k, $v) = each %$ndb) {
765 $db->{$k} = $v;
766 push @k, $k;
767 }
768
769 # delete keys that are no longer present
770 for (grep !exists $ndb->{$_}, keys %$db) {
771 delete $db->{$_};
772 push @k, $_;
773 }
774
775 $_->($db, \@k)
776 for values %{ $LOCAL_MON{$_[0]} };
777 };
778
779 # incremental update
780 $NODE_REQ{g_chg2} = sub {
781 my $db = $MON_DB{$_[0]};
782
783 @_ >= 3
784 ? $db->{$_[1]} = $_[2]
785 : delete $db->{$_[1]};
786
787 $_->($db, [$_[1]])
788 for values %{ $LOCAL_MON{$_[0]} };
789 };
790
791 #############################################################################
792 # configure
793
794 sub nodename {
795 require POSIX;
796 (POSIX::uname ())[1]
797 }
798
799 sub _resolve($) {
800 my ($nodeid) = @_;
801
802 my $cv = AE::cv;
803 my @res;
804
805 $cv->begin (sub {
806 my %seen;
807 my @refs;
808 for (sort { $a->[0] <=> $b->[0] } @res) {
809 push @refs, $_->[1] unless $seen{$_->[1]}++
810 }
811 shift->send (@refs);
812 });
813
814 my $idx;
815 for my $t (split /,/, $nodeid) {
816 my $pri = ++$idx;
817
818 $t = length $t ? nodename . ":$t" : nodename
819 if $t =~ /^\d*$/;
820
821 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
822 or Carp::croak "$t: unparsable transport descriptor";
823
824 $port = "0" if $port eq "*";
825
826 if ($host eq "*") {
827 $cv->begin;
828 # use fork_call, as Net::Interface is big, and we need it rarely.
829 require AnyEvent::Util;
830 AnyEvent::Util::fork_call (
831 sub {
832 my @addr;
833
834 require Net::Interface;
835
836 for my $if (Net::Interface->interfaces) {
837 # we statically lower-prioritise ipv6 here, TODO :()
838 for $_ ($if->address (Net::Interface::AF_INET ())) {
839 next if /^\x7f/; # skip localhost etc.
840 push @addr, $_;
841 }
842 for ($if->address (Net::Interface::AF_INET6 ())) {
843 #next if $if->scope ($_) <= 2;
844 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
845 push @addr, $_;
846 }
847
848 }
849 @addr
850 }, sub {
851 for my $ip (@_) {
852 push @res, [
853 $pri += 1e-5,
854 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
855 ];
856 }
857 $cv->end;
858 }
859 );
860 } else {
861 $cv->begin;
862 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
863 for (@_) {
864 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
865 push @res, [
866 $pri += 1e-5,
867 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
868 ];
869 }
870 $cv->end;
871 };
872 }
873 }
874
875 $cv->end;
876
877 $cv
878 }
879
880 sub configure(@) {
881 unshift @_, "profile" if @_ & 1;
882 my (%kv) = @_;
883
884 delete $NODE{$NODE}; # we do not support doing stuff before configure
885 _init_names;
886
887 my $profile = delete $kv{profile};
888
889 $profile = nodename
890 unless defined $profile;
891
892 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
893
894 if (exists $CONFIG->{secure}) {
895 my $pass = !$CONFIG->{secure};
896 $SECURE = sub { $pass };
897 }
898
899 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";
900
901 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
902
903 $NODE = $node;
904
905 $NODE =~ s/%n/nodename/ge;
906
907 if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
908 # nodes with randomised node names do not need randomised port names
909 $UNIQ = "";
910 }
911
912 $NODE{$NODE} = $NODE{""};
913 $NODE{$NODE}{id} = $NODE;
914
915 my $seeds = $CONFIG->{seeds};
916 my $binds = $CONFIG->{binds};
917
918 $binds ||= ["*"];
919
920 $WARN->(8, "node $NODE starting up.");
921
922 $BINDS = [];
923 %BINDS = ();
924
925 for (map _resolve $_, @$binds) {
926 for my $bind ($_->recv) {
927 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
928 or Carp::croak "$bind: unparsable local bind address";
929
930 my $listener = AnyEvent::MP::Transport::mp_server
931 $host,
932 $port,
933 prepare => sub {
934 my (undef, $host, $port) = @_;
935 $bind = AnyEvent::Socket::format_hostport $host, $port;
936 0
937 },
938 ;
939 $BINDS{$bind} = $listener;
940 push @$BINDS, $bind;
941 }
942 }
943
944 db_set "'l" => $NODE => $BINDS;
945
946 $WARN->(8, "node listens on [@$BINDS].");
947
948 # connect to all seednodes
949 set_seeds map $_->recv, map _resolve $_, @$seeds;
950
951 master_search;
952
953 if ($NODE eq "atha") {;#d#
954 my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
955 }
956
957 for (@{ $CONFIG->{services} }) {
958 if (ref) {
959 my ($func, @args) = @$_;
960 (load_func $func)->(@args);
961 } elsif (s/::$//) {
962 eval "require $_";
963 die $@ if $@;
964 } else {
965 (load_func $_)->();
966 }
967 }
968 }
969
970 =back
971
972 =head1 SEE ALSO
973
974 L<AnyEvent::MP>.
975
976 =head1 AUTHOR
977
978 Marc Lehmann <schmorp@schmorp.de>
979 http://home.schmorp.de/
980
981 =cut
982
983 1
984