ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.69
Committed: Sun Feb 26 10:29:59 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.68: +276 -165 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 add_node load_func snd_to_func snd_on eval_on
41
42 NODE $NODE node_of snd kil port_is_local
43 configure
44 up_nodes mon_nodes node_is_up
45 );
46
47 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48
49 This value is called with an error or warning message, when e.g. a
50 connection could not be created, authorisation failed and so on.
51
52 It I<must not> block or send messages -queue it and use an idle watcher if
53 you need to do any of these things.
54
55 C<$level> should be C<0> for messages to be logged always, C<1> for
56 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57 node connectivity and services, C<8> for debugging messages and C<9> for
58 tracing messages.
59
60 The default simply logs the message to STDERR.
61
62 =item @AnyEvent::MP::Kernel::WARN
63
64 All code references in this array are called for every log message, from
65 the default C<$WARN> handler. This is an easy way to tie into the log
66 messages without disturbing others.
67
68 =cut
69
70 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71 our @WARN;
72 our $WARN = sub {
73 &$_ for @WARN;
74
75 return if $WARNLEVEL < $_[0];
76
77 my ($level, $msg) = @_;
78
79 $msg =~ s/\n$//;
80
81 printf STDERR "%s <%d> %s\n",
82 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83 $level,
84 $msg;
85 };
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
94 sub load_func($) {
95 my $func = $_[0];
96
97 unless (defined &$func) {
98 my $pkg = $func;
99 do {
100 $pkg =~ s/::[^:]+$//
101 or return sub { die "unable to resolve function '$func'" };
102
103 local $@;
104 unless (eval "require $pkg; 1") {
105 my $error = $@;
106 $error =~ /^Can't locate .*.pm in \@INC \(/
107 or return sub { die $error };
108 }
109 } until defined &$func;
110 }
111
112 \&$func
113 }
114
115 sub nonce($) {
116 my $nonce;
117
118 if (open my $fh, "</dev/urandom") {
119 sysread $fh, $nonce, $_[0];
120 } else {
121 # shit...
122 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123 }
124
125 $nonce
126 }
127
128 sub alnumbits($) {
129 my $data = $_[0];
130
131 if (eval "use Math::GMP 2.05; 1") {
132 $data = Math::GMP::get_str_gmp (
133 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134 62
135 );
136 } else {
137 $data = MIME::Base64::encode_base64 $data, "";
138 $data =~ s/=//;
139 $data =~ s/x/x0/g;
140 $data =~ s/\//x1/g;
141 $data =~ s/\+/x2/g;
142 }
143
144 $data
145 }
146
147 sub gen_uniq {
148 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149 }
150
151 our $CONFIG; # this node's configuration
152
153 our $RUNIQ; # remote uniq value
154 our $UNIQ; # per-process/node unique cookie
155 our $NODE;
156 our $ID = "a";
157
158 our %NODE; # node id to transport mapping, or "undef", for local node
159 our (%PORT, %PORT_DATA); # local ports
160
161 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 our %LMON; # monitored _local_ ports
163
164 our %LISTENER;
165 our $LISTENER; # our listeners, as arrayref
166
167 our $SRCNODE; # holds the sending node during _inject
168
169 sub _init_names {
170 $RUNIQ = alnumbits nonce 96/8;
171 $UNIQ = gen_uniq;
172 $NODE = "anon/$RUNIQ";
173 }
174
175 _init_names;
176
177 sub NODE() {
178 $NODE
179 }
180
181 sub node_of($) {
182 my ($node, undef) = split /#/, $_[0], 2;
183
184 $node
185 }
186
187 BEGIN {
188 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
189 ? sub () { 1 }
190 : sub () { 0 };
191 }
192
193 our $DELAY_TIMER;
194 our @DELAY_QUEUE;
195
196 sub _delay_run {
197 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
198 }
199
200 sub delay($) {
201 push @DELAY_QUEUE, shift;
202 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
203 }
204
205 sub _inject {
206 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
207 &{ $PORT{+shift} or return };
208 }
209
210 # this function adds a node-ref, so you can send stuff to it
211 # it is basically the central routing component.
212 sub add_node {
213 my ($node) = @_;
214
215 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
216 }
217
218 sub snd(@) {
219 my ($nodeid, $portid) = split /#/, shift, 2;
220
221 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
222
223 defined $nodeid #d#UGLY
224 or Carp::croak "'undef' is not a valid node ID/port ID";
225
226 ($NODE{$nodeid} || add_node $nodeid)
227 ->{send} (["$portid", @_]);
228 }
229
230 =item $is_local = port_is_local $port
231
232 Returns true iff the port is a local port.
233
234 =cut
235
236 sub port_is_local($) {
237 my ($nodeid, undef) = split /#/, $_[0], 2;
238
239 $NODE{$nodeid} == $NODE{""}
240 }
241
242 =item snd_to_func $node, $func, @args
243
244 Expects a node ID and a name of a function. Asynchronously tries to call
245 this function with the given arguments on that node.
246
247 This function can be used to implement C<spawn>-like interfaces.
248
249 =cut
250
251 sub snd_to_func($$;@) {
252 my $nodeid = shift;
253
254 # on $NODE, we artificially delay... (for spawn)
255 # this is very ugly - maybe we should simply delay ALL messages,
256 # to avoid deep recursion issues. but that's so... slow...
257 $AnyEvent::MP::Node::Self::DELAY = 1
258 if $nodeid ne $NODE;
259
260 defined $nodeid #d#UGLY
261 or Carp::croak "'undef' is not a valid node ID/port ID";
262
263 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
264 }
265
266 =item snd_on $node, @msg
267
268 Executes C<snd> with the given C<@msg> (which must include the destination
269 port) on the given node.
270
271 =cut
272
273 sub snd_on($@) {
274 my $node = shift;
275 snd $node, snd => @_;
276 }
277
278 =item eval_on $node, $string[, @reply]
279
280 Evaluates the given string as Perl expression on the given node. When
281 @reply is specified, then it is used to construct a reply message with
282 C<"$@"> and any results from the eval appended.
283
284 =cut
285
286 sub eval_on($$;@) {
287 my $node = shift;
288 snd $node, eval => @_;
289 }
290
291 sub kil(@) {
292 my ($nodeid, $portid) = split /#/, shift, 2;
293
294 length $portid
295 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296
297 ($NODE{$nodeid} || add_node $nodeid)
298 ->kill ("$portid", @_);
299 }
300
301 #############################################################################
302 # node monitoring and info
303
304 =item node_is_known $nodeid
305
306 Returns true iff the given node is currently known to the system. The only
307 time a node is known but not up currently is when a connection request is
308 pending.
309
310 =cut
311
312 sub node_is_known($) {
313 exists $NODE{$_[0]}
314 }
315
316 =item node_is_up $nodeid
317
318 Returns true if the given node is "up", that is, the kernel thinks it has
319 a working connection to it.
320
321 If the node is known (to this local node) but not currently connected,
322 returns C<0>. If the node is not known, returns C<undef>.
323
324 =cut
325
326 sub node_is_up($) {
327 ($NODE{$_[0]} or return)->{transport}
328 ? 1 : 0
329 }
330
331 =item known_nodes
332
333 Returns the node IDs of all nodes currently known to this node, including
334 itself and nodes not currently connected.
335
336 =cut
337
338 sub known_nodes() {
339 map $_->{id}, values %NODE
340 }
341
342 =item up_nodes
343
344 Return the node IDs of all nodes that are currently connected (excluding
345 the node itself).
346
347 =cut
348
349 sub up_nodes() {
350 map $_->{id}, grep $_->{transport}, values %NODE
351 }
352
353 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
354
355 Registers a callback that is called each time a node goes up (a connection
356 is established) or down (the connection is lost).
357
358 Node up messages can only be followed by node down messages for the same
359 node, and vice versa.
360
361 Note that monitoring a node is usually better done by monitoring it's node
362 port. This function is mainly of interest to modules that are concerned
363 about the network topology and low-level connection handling.
364
365 Callbacks I<must not> block and I<should not> send any messages.
366
367 The function returns an optional guard which can be used to unregister
368 the monitoring callback again.
369
370 Example: make sure you call function C<newnode> for all nodes that are up
371 or go up (and down).
372
373 newnode $_, 1 for up_nodes;
374 mon_nodes \&newnode;
375
376 =cut
377
378 our %MON_NODES;
379
380 sub mon_nodes($) {
381 my ($cb) = @_;
382
383 $MON_NODES{$cb+0} = $cb;
384
385 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
386 }
387
388 sub _inject_nodeevent($$;@) {
389 my ($node, $up, @reason) = @_;
390
391 for my $cb (values %MON_NODES) {
392 eval { $cb->($node->{id}, $up, @reason); 1 }
393 or $WARN->(1, $@);
394 }
395
396 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
397 }
398
399 #############################################################################
400 # self node code
401
402 sub _kill {
403 my $port = shift;
404
405 delete $PORT{$port}
406 or return; # killing nonexistent ports is O.K.
407 delete $PORT_DATA{$port};
408
409 my $mon = delete $LMON{$port}
410 or !@_
411 or $WARN->(2, "unmonitored local port $port died with reason: @_");
412
413 $_->(@_) for values %$mon;
414 }
415
416 sub _monitor {
417 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
418 unless exists $PORT{$_[1]};
419
420 $LMON{$_[1]}{$_[2]+0} = $_[2];
421 }
422
423 sub _unmonitor {
424 delete $LMON{$_[1]}{$_[2]+0}
425 if exists $LMON{$_[1]};
426 }
427
428 our %node_req = (
429 # internal services
430
431 # monitoring
432 mon0 => sub { # stop monitoring a port for another node
433 my $portid = shift;
434 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
435 },
436 mon1 => sub { # start monitoring a port for another node
437 my $portid = shift;
438 Scalar::Util::weaken (my $node = $SRCNODE);
439 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
440 delete $node->{rmon}{$portid};
441 $node->send (["", kil0 => $portid, @_])
442 if $node && $node->{transport};
443 };
444 },
445 # another node has killed a monitored port
446 kil0 => sub {
447 my $cbs = delete $SRCNODE->{lmon}{+shift}
448 or return;
449
450 $_->(@_) for @$cbs;
451 },
452
453 # "public" services - not actually public
454
455 # another node wants to kill a local port
456 kil => \&_kill,
457
458 # relay message to another node / generic echo
459 snd => \&snd,
460 snd_multiple => sub {
461 snd @$_ for @_
462 },
463
464 # informational
465 info => sub {
466 snd @_, $NODE;
467 },
468 known_nodes => sub {
469 snd @_, known_nodes;
470 },
471 up_nodes => sub {
472 snd @_, up_nodes;
473 },
474
475 # random utilities
476 eval => sub {
477 my @res = do { package main; eval shift };
478 snd @_, "$@", @res if @_;
479 },
480 time => sub {
481 snd @_, AE::time;
482 },
483 devnull => sub {
484 #
485 },
486 "" => sub {
487 # empty messages are keepalives or similar devnull-applications
488 },
489 );
490
491 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
492 $PORT{""} = sub {
493 my $tag = shift;
494 eval { &{ $node_req{$tag} ||= load_func $tag } };
495 $WARN->(2, "error processing node message: $@") if $@;
496 };
497
498 #############################################################################
499 # seed management, try to keep connections to all seeds
500
501 our %SEED_NODE; # seed ID => node ID|undef
502 our %NODE_SEED; # map node ID to seed ID
503 our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
504 our $SEED_WATCHER;
505
506 # called before sending the greeting, grabs address we connect to
507 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
508 defined (my $seed = $_[0]{seed})
509 or return;
510
511 $SEED_CONNECT{$seed} = 2;
512 };
513
514 # called after receiving remote greeting, grabs remote node name
515 push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
516 defined (my $seed = $_[0]{seed})
517 or return;
518
519 # we rely on untrusted data here (the remote node name) this is
520 # hopefully ok, as this can at most be used for DOSing, which is easy
521 # when you can do MITM anyway.
522
523 # if we connect to ourselves, nuke this seed
524 if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
525 delete $SEED_NODE{$_[0]{seed}};
526 delete $NODE_SEED{$_[0]{seed}};
527 } else {
528 $SEED_NODE{$seed} = $_[0]{remote_node};
529 $NODE_SEED{$_[0]{remote_node}} = $seed;
530 }
531 };
532
533 ## called when connection is up, same as above, but now verified
534 #push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
535 # defined (my $seed = $_[0]{seed})
536 # or return;
537 # AE::log 5, "connected($seed)\n";#d#
538 #
539 # $SEED_NODE{$seed} = $_[0]{remote_node};
540 # $NODE_SEED{$_[0]{remote_node}} = $seed;
541 #};
542
543 # called when connections get destroyed, update our data structures
544 # and check for self-connects
545 push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
546 # if we lost the connection to a seed node, make sure we start seeding
547 seed_again ()#d#
548 if exists $NODE_SEED{ $_[0]{remote_node} };
549
550 defined (my $seed = $_[0]{seed})
551 or return;
552
553 delete $SEED_CONNECT{$seed};
554 };
555
556 sub seed_connect {
557 my ($seed) = @_;
558
559 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
560 or Carp::croak "$seed: unparsable seed address";
561
562 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
563
564 # ughhh
565 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
566 seed => $seed,
567 sub {
568 $SEED_CONNECT{$seed} = 1;
569 },
570 ;
571 }
572
573 sub seed_all {
574 # my $next = List::Util::max 1,
575 # $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
576 # * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
577 # - rand;
578
579 my @seeds = grep {
580 !exists $SEED_CONNECT{$_}
581 && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
582 } keys %SEED_NODE;
583
584 if (@seeds) {
585 # start conenction attempt for every seed we are not connected to yet
586 seed_connect $_
587 for @seeds;
588 } else {
589 # all seeds connected or connecting
590 undef $SEED_WATCHER;
591 }
592 }
593
594 sub seed_again {
595 $SEED_WATCHER ||= AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, \&seed_all;
596 }
597
598 # sets new seed list, starts connecting
599 sub set_seeds(@) {
600 %SEED_NODE = ();
601 @SEED_NODE{@_} = ();
602
603 seed_again;
604 }
605
606 #############################################################################
607 # configure
608
609 sub _nodename {
610 require POSIX;
611 (POSIX::uname ())[1]
612 }
613
614 sub _resolve($) {
615 my ($nodeid) = @_;
616
617 my $cv = AE::cv;
618 my @res;
619
620 $cv->begin (sub {
621 my %seen;
622 my @refs;
623 for (sort { $a->[0] <=> $b->[0] } @res) {
624 push @refs, $_->[1] unless $seen{$_->[1]}++
625 }
626 shift->send (@refs);
627 });
628
629 my $idx;
630 for my $t (split /,/, $nodeid) {
631 my $pri = ++$idx;
632
633 $t = length $t ? _nodename . ":$t" : _nodename
634 if $t =~ /^\d*$/;
635
636 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
637 or Carp::croak "$t: unparsable transport descriptor";
638
639 $port = "0" if $port eq "*";
640
641 if ($host eq "*") {
642 $cv->begin;
643 # use fork_call, as Net::Interface is big, and we need it rarely.
644 require AnyEvent::Util;
645 AnyEvent::Util::fork_call (
646 sub {
647 my @addr;
648
649 require Net::Interface;
650
651 for my $if (Net::Interface->interfaces) {
652 # we statically lower-prioritise ipv6 here, TODO :()
653 for $_ ($if->address (Net::Interface::AF_INET ())) {
654 next if /^\x7f/; # skip localhost etc.
655 push @addr, $_;
656 }
657 for ($if->address (Net::Interface::AF_INET6 ())) {
658 #next if $if->scope ($_) <= 2;
659 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
660 push @addr, $_;
661 }
662
663 }
664 @addr
665 }, sub {
666 for my $ip (@_) {
667 push @res, [
668 $pri += 1e-5,
669 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
670 ];
671 }
672 $cv->end;
673 }
674 );
675 } else {
676 $cv->begin;
677 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
678 for (@_) {
679 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
680 push @res, [
681 $pri += 1e-5,
682 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
683 ];
684 }
685 $cv->end;
686 };
687 }
688 }
689
690 $cv->end;
691
692 $cv
693 }
694
695 sub configure(@) {
696 unshift @_, "profile" if @_ & 1;
697 my (%kv) = @_;
698
699 delete $NODE{$NODE}; # we do not support doing stuff before configure
700 _init_names;
701
702 my $profile = delete $kv{profile};
703
704 $profile = _nodename
705 unless defined $profile;
706
707 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
708
709 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
710
711 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
712
713 $NODE = $node
714 unless $node eq "anon/";
715
716 $NODE{$NODE} = $NODE{""};
717 $NODE{$NODE}{id} = $NODE;
718
719 my $seeds = $CONFIG->{seeds};
720 my $binds = $CONFIG->{binds};
721
722 $binds ||= ["*"];
723
724 $WARN->(8, "node $NODE starting up.");
725
726 $LISTENER = [];
727 %LISTENER = ();
728
729 for (map _resolve $_, @$binds) {
730 for my $bind ($_->recv) {
731 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
732 or Carp::croak "$bind: unparsable local bind address";
733
734 my $listener = AnyEvent::MP::Transport::mp_server
735 $host,
736 $port,
737 prepare => sub {
738 my (undef, $host, $port) = @_;
739 $bind = AnyEvent::Socket::format_hostport $host, $port;
740 0
741 },
742 ;
743 $LISTENER{$bind} = $listener;
744 push @$LISTENER, $bind;
745 }
746 }
747
748 $WARN->(8, "node listens on [@$LISTENER].");
749
750 # the global service is mandatory currently
751 require AnyEvent::MP::Global;
752
753 # connect to all seednodes
754 set_seeds map $_->recv, map _resolve $_, @$seeds;
755
756 for (@{ $CONFIG->{services} }) {
757 if (ref) {
758 my ($func, @args) = @$_;
759 (load_func $func)->(@args);
760 } elsif (s/::$//) {
761 eval "require $_";
762 die $@ if $@;
763 } else {
764 (load_func $_)->();
765 }
766 }
767 }
768
769 =back
770
771 =head1 SEE ALSO
772
773 L<AnyEvent::MP>.
774
775 =head1 AUTHOR
776
777 Marc Lehmann <schmorp@schmorp.de>
778 http://home.schmorp.de/
779
780 =cut
781
782 1
783