ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.70
Committed: Sun Feb 26 11:12:54 2012 UTC (12 years, 4 months ago) by root
Branch: MAIN
Changes since 1.69: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 add_node load_func snd_to_func snd_on eval_on
41
42 NODE $NODE node_of snd kil port_is_local
43 configure
44 up_nodes mon_nodes node_is_up
45 );
46
47 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48
49 This value is called with an error or warning message, when e.g. a
50 connection could not be created, authorisation failed and so on.
51
52 It I<must not> block or send messages -queue it and use an idle watcher if
53 you need to do any of these things.
54
55 C<$level> should be C<0> for messages to be logged always, C<1> for
56 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57 node connectivity and services, C<8> for debugging messages and C<9> for
58 tracing messages.
59
60 The default simply logs the message to STDERR.
61
62 =item @AnyEvent::MP::Kernel::WARN
63
64 All code references in this array are called for every log message, from
65 the default C<$WARN> handler. This is an easy way to tie into the log
66 messages without disturbing others.
67
68 =cut
69
70 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71 our @WARN;
72 our $WARN = sub {
73 &$_ for @WARN;
74
75 return if $WARNLEVEL < $_[0];
76
77 my ($level, $msg) = @_;
78
79 $msg =~ s/\n$//;
80
81 printf STDERR "%s <%d> %s\n",
82 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83 $level,
84 $msg;
85 };
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
94 sub load_func($) {
95 my $func = $_[0];
96
97 unless (defined &$func) {
98 my $pkg = $func;
99 do {
100 $pkg =~ s/::[^:]+$//
101 or return sub { die "unable to resolve function '$func'" };
102
103 local $@;
104 unless (eval "require $pkg; 1") {
105 my $error = $@;
106 $error =~ /^Can't locate .*.pm in \@INC \(/
107 or return sub { die $error };
108 }
109 } until defined &$func;
110 }
111
112 \&$func
113 }
114
115 sub nonce($) {
116 my $nonce;
117
118 if (open my $fh, "</dev/urandom") {
119 sysread $fh, $nonce, $_[0];
120 } else {
121 # shit...
122 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123 }
124
125 $nonce
126 }
127
128 sub alnumbits($) {
129 my $data = $_[0];
130
131 if (eval "use Math::GMP 2.05; 1") {
132 $data = Math::GMP::get_str_gmp (
133 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134 62
135 );
136 } else {
137 $data = MIME::Base64::encode_base64 $data, "";
138 $data =~ s/=//;
139 $data =~ s/x/x0/g;
140 $data =~ s/\//x1/g;
141 $data =~ s/\+/x2/g;
142 }
143
144 $data
145 }
146
147 sub gen_uniq {
148 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149 }
150
151 our $CONFIG; # this node's configuration
152
153 our $RUNIQ; # remote uniq value
154 our $UNIQ; # per-process/node unique cookie
155 our $NODE;
156 our $ID = "a";
157
158 our %NODE; # node id to transport mapping, or "undef", for local node
159 our (%PORT, %PORT_DATA); # local ports
160
161 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 our %LMON; # monitored _local_ ports
163
164 our %LISTENER;
165 our $LISTENER; # our listeners, as arrayref
166
167 our $SRCNODE; # holds the sending node during _inject
168
169 sub _init_names {
170 $RUNIQ = alnumbits nonce 96/8;
171 $UNIQ = gen_uniq;
172 $NODE = "anon/$RUNIQ";
173 }
174
175 _init_names;
176
177 sub NODE() {
178 $NODE
179 }
180
181 sub node_of($) {
182 my ($node, undef) = split /#/, $_[0], 2;
183
184 $node
185 }
186
187 BEGIN {
188 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
189 ? sub () { 1 }
190 : sub () { 0 };
191 }
192
193 our $DELAY_TIMER;
194 our @DELAY_QUEUE;
195
196 sub _delay_run {
197 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
198 }
199
200 sub delay($) {
201 push @DELAY_QUEUE, shift;
202 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
203 }
204
205 sub _inject {
206 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
207 &{ $PORT{+shift} or return };
208 }
209
210 # this function adds a node-ref, so you can send stuff to it
211 # it is basically the central routing component.
212 sub add_node {
213 my ($node) = @_;
214
215 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
216 }
217
218 sub snd(@) {
219 my ($nodeid, $portid) = split /#/, shift, 2;
220
221 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
222
223 defined $nodeid #d#UGLY
224 or Carp::croak "'undef' is not a valid node ID/port ID";
225
226 ($NODE{$nodeid} || add_node $nodeid)
227 ->{send} (["$portid", @_]);
228 }
229
230 =item $is_local = port_is_local $port
231
232 Returns true iff the port is a local port.
233
234 =cut
235
236 sub port_is_local($) {
237 my ($nodeid, undef) = split /#/, $_[0], 2;
238
239 $NODE{$nodeid} == $NODE{""}
240 }
241
242 =item snd_to_func $node, $func, @args
243
244 Expects a node ID and a name of a function. Asynchronously tries to call
245 this function with the given arguments on that node.
246
247 This function can be used to implement C<spawn>-like interfaces.
248
249 =cut
250
251 sub snd_to_func($$;@) {
252 my $nodeid = shift;
253
254 # on $NODE, we artificially delay... (for spawn)
255 # this is very ugly - maybe we should simply delay ALL messages,
256 # to avoid deep recursion issues. but that's so... slow...
257 $AnyEvent::MP::Node::Self::DELAY = 1
258 if $nodeid ne $NODE;
259
260 defined $nodeid #d#UGLY
261 or Carp::croak "'undef' is not a valid node ID/port ID";
262
263 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
264 }
265
266 =item snd_on $node, @msg
267
268 Executes C<snd> with the given C<@msg> (which must include the destination
269 port) on the given node.
270
271 =cut
272
273 sub snd_on($@) {
274 my $node = shift;
275 snd $node, snd => @_;
276 }
277
278 =item eval_on $node, $string[, @reply]
279
280 Evaluates the given string as Perl expression on the given node. When
281 @reply is specified, then it is used to construct a reply message with
282 C<"$@"> and any results from the eval appended.
283
284 =cut
285
286 sub eval_on($$;@) {
287 my $node = shift;
288 snd $node, eval => @_;
289 }
290
291 sub kil(@) {
292 my ($nodeid, $portid) = split /#/, shift, 2;
293
294 length $portid
295 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296
297 ($NODE{$nodeid} || add_node $nodeid)
298 ->kill ("$portid", @_);
299 }
300
301 #############################################################################
302 # node monitoring and info
303
304 =item node_is_known $nodeid
305
306 Returns true iff the given node is currently known to the system. The only
307 time a node is known but not up currently is when a connection request is
308 pending.
309
310 =cut
311
312 sub node_is_known($) {
313 exists $NODE{$_[0]}
314 }
315
316 =item node_is_up $nodeid
317
318 Returns true if the given node is "up", that is, the kernel thinks it has
319 a working connection to it.
320
321 If the node is known (to this local node) but not currently connected,
322 returns C<0>. If the node is not known, returns C<undef>.
323
324 =cut
325
326 sub node_is_up($) {
327 ($NODE{$_[0]} or return)->{transport}
328 ? 1 : 0
329 }
330
331 =item known_nodes
332
333 Returns the node IDs of all nodes currently known to this node, including
334 itself and nodes not currently connected.
335
336 =cut
337
338 sub known_nodes() {
339 map $_->{id}, values %NODE
340 }
341
342 =item up_nodes
343
344 Return the node IDs of all nodes that are currently connected (excluding
345 the node itself).
346
347 =cut
348
349 sub up_nodes() {
350 map $_->{id}, grep $_->{transport}, values %NODE
351 }
352
353 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
354
355 Registers a callback that is called each time a node goes up (a connection
356 is established) or down (the connection is lost).
357
358 Node up messages can only be followed by node down messages for the same
359 node, and vice versa.
360
361 Note that monitoring a node is usually better done by monitoring it's node
362 port. This function is mainly of interest to modules that are concerned
363 about the network topology and low-level connection handling.
364
365 Callbacks I<must not> block and I<should not> send any messages.
366
367 The function returns an optional guard which can be used to unregister
368 the monitoring callback again.
369
370 Example: make sure you call function C<newnode> for all nodes that are up
371 or go up (and down).
372
373 newnode $_, 1 for up_nodes;
374 mon_nodes \&newnode;
375
376 =cut
377
378 our %MON_NODES;
379
380 sub mon_nodes($) {
381 my ($cb) = @_;
382
383 $MON_NODES{$cb+0} = $cb;
384
385 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
386 }
387
388 sub _inject_nodeevent($$;@) {
389 my ($node, $up, @reason) = @_;
390
391 for my $cb (values %MON_NODES) {
392 eval { $cb->($node->{id}, $up, @reason); 1 }
393 or $WARN->(1, $@);
394 }
395
396 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
397 }
398
399 #############################################################################
400 # self node code
401
402 sub _kill {
403 my $port = shift;
404
405 delete $PORT{$port}
406 or return; # killing nonexistent ports is O.K.
407 delete $PORT_DATA{$port};
408
409 my $mon = delete $LMON{$port}
410 or !@_
411 or $WARN->(2, "unmonitored local port $port died with reason: @_");
412
413 $_->(@_) for values %$mon;
414 }
415
416 sub _monitor {
417 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
418 unless exists $PORT{$_[1]};
419
420 $LMON{$_[1]}{$_[2]+0} = $_[2];
421 }
422
423 sub _unmonitor {
424 delete $LMON{$_[1]}{$_[2]+0}
425 if exists $LMON{$_[1]};
426 }
427
428 our %node_req = (
429 # internal services
430
431 # monitoring
432 mon0 => sub { # stop monitoring a port for another node
433 my $portid = shift;
434 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
435 },
436 mon1 => sub { # start monitoring a port for another node
437 my $portid = shift;
438 Scalar::Util::weaken (my $node = $SRCNODE);
439 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
440 delete $node->{rmon}{$portid};
441 $node->send (["", kil0 => $portid, @_])
442 if $node && $node->{transport};
443 };
444 },
445 # another node has killed a monitored port
446 kil0 => sub {
447 my $cbs = delete $SRCNODE->{lmon}{+shift}
448 or return;
449
450 $_->(@_) for @$cbs;
451 },
452
453 # "public" services - not actually public
454
455 # another node wants to kill a local port
456 kil => \&_kill,
457
458 # relay message to another node / generic echo
459 snd => \&snd,
460 snd_multiple => sub {
461 snd @$_ for @_
462 },
463
464 # informational
465 info => sub {
466 snd @_, $NODE;
467 },
468 known_nodes => sub {
469 snd @_, known_nodes;
470 },
471 up_nodes => sub {
472 snd @_, up_nodes;
473 },
474
475 # random utilities
476 eval => sub {
477 my @res = do { package main; eval shift };
478 snd @_, "$@", @res if @_;
479 },
480 time => sub {
481 snd @_, AE::time;
482 },
483 devnull => sub {
484 #
485 },
486 "" => sub {
487 # empty messages are keepalives or similar devnull-applications
488 },
489 );
490
491 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
492 $PORT{""} = sub {
493 my $tag = shift;
494 eval { &{ $node_req{$tag} ||= load_func $tag } };
495 $WARN->(2, "error processing node message: $@") if $@;
496 };
497
498 #############################################################################
499 # seed management, try to keep connections to all seeds
500
501 our %SEED_NODE; # seed ID => node ID|undef
502 our %NODE_SEED; # map node ID to seed ID
503 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
504 our $SEED_WATCHER;
505
506 # called before sending the greeting, grabs address we connect to
507 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
508 defined (my $seed = $_[0]{seed})
509 or return;
510
511 $SEED_CONNECT{$seed} = 2;
512 };
513
514 # called after receiving remote greeting, grabs remote node name
515 push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
516 defined (my $seed = $_[0]{seed})
517 or return;
518
519 # we rely on untrusted data here (the remote node name) this is
520 # hopefully ok, as this can at most be used for DOSing, which is easy
521 # when you can do MITM anyway.
522
523 # if we connect to ourselves, nuke this seed
524 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
525 require AnyEvent::MP::Global; # every seed becomes a global node currently
526 delete $SEED_NODE{$_[0]{seed}};
527 delete $NODE_SEED{$_[0]{seed}};
528 } else {
529 $SEED_NODE{$seed} = $_[0]{remote_node};
530 $NODE_SEED{$_[0]{remote_node}} = $seed;
531 }
532 };
533
534 ## called when connection is up, same as above, but now verified
535 #push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
536 # defined (my $seed = $_[0]{seed})
537 # or return;
538 # AE::log 5, "connected($seed)\n";#d#
539 #
540 # $SEED_NODE{$seed} = $_[0]{remote_node};
541 # $NODE_SEED{$_[0]{remote_node}} = $seed;
542 #};
543
544 # called when connections get destroyed, update our data structures
545 # and check for self-connects
546 push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
547 # if we lost the connection to a seed node, make sure we start seeding
548 seed_again ()#d#
549 if exists $NODE_SEED{ $_[0]{remote_node} };
550
551 defined (my $seed = $_[0]{seed})
552 or return;
553
554 delete $SEED_CONNECT{$seed};
555 };
556
557 sub seed_connect {
558 my ($seed) = @_;
559
560 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
561 or Carp::croak "$seed: unparsable seed address";
562
563 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
564
565 # ughhh
566 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
567 seed => $seed,
568 sub {
569 $SEED_CONNECT{$seed} = 1;
570 },
571 ;
572 }
573
574 sub seed_all {
575 # my $next = List::Util::max 1,
576 # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
577 # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
578 # - rand;
579
580 my @seeds = grep {
581 !exists $SEED_CONNECT{$_}
582 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
583 } keys %SEED_NODE;
584
585 if (@seeds) {
586 # start connection attempt for every seed we are not connected to yet
587 seed_connect $_
588 for @seeds;
589 } else {
590 # all seeds connected or connecting
591 undef $SEED_WATCHER;
592 }
593 }
594
595 sub seed_again {
596 $SEED_WATCHER ||= AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, \&seed_all;
597 }
598
599 # sets new seed list, starts connecting
600 sub set_seeds(@) {
601 %SEED_NODE = ();
602 @SEED_NODE{@_} = ();
603
604 seed_again;
605 }
606
607 #############################################################################
608 # configure
609
610 sub _nodename {
611 require POSIX;
612 (POSIX::uname ())[1]
613 }
614
615 sub _resolve($) {
616 my ($nodeid) = @_;
617
618 my $cv = AE::cv;
619 my @res;
620
621 $cv->begin (sub {
622 my %seen;
623 my @refs;
624 for (sort { $a->[0] <=> $b->[0] } @res) {
625 push @refs, $_->[1] unless $seen{$_->[1]}++
626 }
627 shift->send (@refs);
628 });
629
630 my $idx;
631 for my $t (split /,/, $nodeid) {
632 my $pri = ++$idx;
633
634 $t = length $t ? _nodename . ":$t" : _nodename
635 if $t =~ /^\d*$/;
636
637 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
638 or Carp::croak "$t: unparsable transport descriptor";
639
640 $port = "0" if $port eq "*";
641
642 if ($host eq "*") {
643 $cv->begin;
644 # use fork_call, as Net::Interface is big, and we need it rarely.
645 require AnyEvent::Util;
646 AnyEvent::Util::fork_call (
647 sub {
648 my @addr;
649
650 require Net::Interface;
651
652 for my $if (Net::Interface->interfaces) {
653 # we statically lower-prioritise ipv6 here, TODO :()
654 for $_ ($if->address (Net::Interface::AF_INET ())) {
655 next if /^\x7f/; # skip localhost etc.
656 push @addr, $_;
657 }
658 for ($if->address (Net::Interface::AF_INET6 ())) {
659 #next if $if->scope ($_) <= 2;
660 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
661 push @addr, $_;
662 }
663
664 }
665 @addr
666 }, sub {
667 for my $ip (@_) {
668 push @res, [
669 $pri += 1e-5,
670 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
671 ];
672 }
673 $cv->end;
674 }
675 );
676 } else {
677 $cv->begin;
678 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
679 for (@_) {
680 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
681 push @res, [
682 $pri += 1e-5,
683 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
684 ];
685 }
686 $cv->end;
687 };
688 }
689 }
690
691 $cv->end;
692
693 $cv
694 }
695
696 sub configure(@) {
697 unshift @_, "profile" if @_ & 1;
698 my (%kv) = @_;
699
700 delete $NODE{$NODE}; # we do not support doing stuff before configure
701 _init_names;
702
703 my $profile = delete $kv{profile};
704
705 $profile = _nodename
706 unless defined $profile;
707
708 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
709
710 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
711
712 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
713
714 $NODE = $node
715 unless $node eq "anon/";
716
717 $NODE{$NODE} = $NODE{""};
718 $NODE{$NODE}{id} = $NODE;
719
720 my $seeds = $CONFIG->{seeds};
721 my $binds = $CONFIG->{binds};
722
723 $binds ||= ["*"];
724
725 $WARN->(8, "node $NODE starting up.");
726
727 $LISTENER = [];
728 %LISTENER = ();
729
730 for (map _resolve $_, @$binds) {
731 for my $bind ($_->recv) {
732 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
733 or Carp::croak "$bind: unparsable local bind address";
734
735 my $listener = AnyEvent::MP::Transport::mp_server
736 $host,
737 $port,
738 prepare => sub {
739 my (undef, $host, $port) = @_;
740 $bind = AnyEvent::Socket::format_hostport $host, $port;
741 0
742 },
743 ;
744 $LISTENER{$bind} = $listener;
745 push @$LISTENER, $bind;
746 }
747 }
748
749 $WARN->(8, "node listens on [@$LISTENER].");
750
751 # the global service is mandatory currently
752 require AnyEvent::MP::Global;
753
754 # connect to all seednodes
755 set_seeds map $_->recv, map _resolve $_, @$seeds;
756
757 for (@{ $CONFIG->{services} }) {
758 if (ref) {
759 my ($func, @args) = @$_;
760 (load_func $func)->(@args);
761 } elsif (s/::$//) {
762 eval "require $_";
763 die $@ if $@;
764 } else {
765 (load_func $_)->();
766 }
767 }
768 }
769
770 =back
771
772 =head1 SEE ALSO
773
774 L<AnyEvent::MP>.
775
776 =head1 AUTHOR
777
778 Marc Lehmann <schmorp@schmorp.de>
779 http://home.schmorp.de/
780
781 =cut
782
783 1
784