ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.66
Committed: Wed Dec 30 13:37:53 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
Changes since 1.65: +16 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38     our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 root 1.21 add_node load_func snd_to_func snd_on eval_on
41 root 1.1
42 root 1.34 NODE $NODE node_of snd kil port_is_local
43     configure
44 root 1.46 up_nodes mon_nodes node_is_up
45 root 1.1 );
46    
47 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48 root 1.1
49 root 1.27 This value is called with an error or warning message, when e.g. a
50     connection could not be created, authorisation failed and so on.
51    
52     It I<must not> block or send messages -queue it and use an idle watcher if
53     you need to do any of these things.
54 root 1.1
55 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
56 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57     node connectivity and services, C<8> for debugging messages and C<9> for
58     tracing messages.
59    
60 root 1.1 The default simply logs the message to STDERR.
61    
62 root 1.44 =item @AnyEvent::MP::Kernel::WARN
63    
64     All code references in this array are called for every log message, from
65     the default C<$WARN> handler. This is an easy way to tie into the log
66     messages without disturbing others.
67    
68 root 1.1 =cut
69    
70 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71 root 1.44 our @WARN;
72     our $WARN = sub {
73     &$_ for @WARN;
74 root 1.29
75     return if $WARNLEVEL < $_[0];
76    
77 root 1.16 my ($level, $msg) = @_;
78    
79 root 1.1 $msg =~ s/\n$//;
80 root 1.16
81     printf STDERR "%s <%d> %s\n",
82     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83     $level,
84     $msg;
85 root 1.1 };
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
94 root 1.6 sub load_func($) {
95     my $func = $_[0];
96    
97     unless (defined &$func) {
98     my $pkg = $func;
99     do {
100     $pkg =~ s/::[^:]+$//
101 root 1.63 or return sub { die "unable to resolve function '$func'" };
102 root 1.60
103     local $@;
104 root 1.61 unless (eval "require $pkg; 1") {
105     my $error = $@;
106     $error =~ /^Can't locate .*.pm in \@INC \(/
107     or return sub { die $error };
108     }
109 root 1.6 } until defined &$func;
110     }
111    
112     \&$func
113     }
114    
115 root 1.1 sub nonce($) {
116     my $nonce;
117    
118     if (open my $fh, "</dev/urandom") {
119     sysread $fh, $nonce, $_[0];
120     } else {
121     # shit...
122     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123     }
124    
125     $nonce
126     }
127    
128 root 1.21 sub alnumbits($) {
129 root 1.1 my $data = $_[0];
130    
131     if (eval "use Math::GMP 2.05; 1") {
132     $data = Math::GMP::get_str_gmp (
133     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134     62
135     );
136     } else {
137     $data = MIME::Base64::encode_base64 $data, "";
138     $data =~ s/=//;
139 root 1.31 $data =~ s/x/x0/g;
140     $data =~ s/\//x1/g;
141     $data =~ s/\+/x2/g;
142 root 1.1 }
143    
144     $data
145     }
146    
147     sub gen_uniq {
148 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149 root 1.1 }
150    
151 root 1.20 our $CONFIG; # this node's configuration
152 root 1.21
153 root 1.64 our $RUNIQ; # remote uniq value
154     our $UNIQ; # per-process/node unique cookie
155     our $NODE;
156     our $ID = "a";
157 root 1.1
158     our %NODE; # node id to transport mapping, or "undef", for local node
159     our (%PORT, %PORT_DATA); # local ports
160    
161 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 root 1.1 our %LMON; # monitored _local_ ports
163    
164     our %LISTENER;
165 root 1.21 our $LISTENER; # our listeners, as arrayref
166 root 1.1
167     our $SRCNODE; # holds the sending node during _inject
168    
169 root 1.64 sub _seed {
170     $RUNIQ = alnumbits nonce 96/8;
171     $UNIQ = gen_uniq;
172     $NODE = "anon/$RUNIQ";
173     }
174    
175     _seed;
176    
177 root 1.1 sub NODE() {
178     $NODE
179     }
180    
181     sub node_of($) {
182 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
183 root 1.1
184 root 1.21 $node
185 root 1.1 }
186    
187 root 1.17 BEGIN {
188     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
189     ? sub () { 1 }
190     : sub () { 0 };
191     }
192 root 1.1
193 root 1.42 our $DELAY_TIMER;
194     our @DELAY_QUEUE;
195    
196     sub _delay_run {
197 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
198 root 1.42 }
199    
200     sub delay($) {
201     push @DELAY_QUEUE, shift;
202     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
203     }
204    
205 root 1.1 sub _inject {
206 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
207 root 1.1 &{ $PORT{+shift} or return };
208     }
209    
210 root 1.20 # this function adds a node-ref, so you can send stuff to it
211     # it is basically the central routing component.
212 root 1.1 sub add_node {
213 root 1.21 my ($node) = @_;
214 root 1.1
215 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
216 root 1.13 }
217    
218 root 1.1 sub snd(@) {
219 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
220 root 1.1
221 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
222 root 1.1
223 root 1.49 defined $nodeid #d#UGLY
224     or Carp::croak "'undef' is not a valid node ID/port ID";
225    
226 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
227 root 1.2 ->{send} (["$portid", @_]);
228 root 1.1 }
229    
230 root 1.17 =item $is_local = port_is_local $port
231    
232     Returns true iff the port is a local port.
233    
234     =cut
235    
236     sub port_is_local($) {
237 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
238 root 1.17
239 root 1.21 $NODE{$nodeid} == $NODE{""}
240 root 1.17 }
241    
242 root 1.18 =item snd_to_func $node, $func, @args
243 root 1.11
244 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
245 root 1.11 this function with the given arguments on that node.
246    
247 root 1.20 This function can be used to implement C<spawn>-like interfaces.
248 root 1.11
249     =cut
250    
251 root 1.18 sub snd_to_func($$;@) {
252 root 1.21 my $nodeid = shift;
253 root 1.11
254 root 1.41 # on $NODE, we artificially delay... (for spawn)
255     # this is very ugly - maybe we should simply delay ALL messages,
256     # to avoid deep recursion issues. but that's so... slow...
257 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
258     if $nodeid ne $NODE;
259    
260 root 1.49 defined $nodeid #d#UGLY
261     or Carp::croak "'undef' is not a valid node ID/port ID";
262    
263 root 1.45 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
264 root 1.11 }
265    
266 root 1.18 =item snd_on $node, @msg
267    
268     Executes C<snd> with the given C<@msg> (which must include the destination
269     port) on the given node.
270    
271     =cut
272    
273     sub snd_on($@) {
274     my $node = shift;
275     snd $node, snd => @_;
276     }
277    
278 root 1.29 =item eval_on $node, $string[, @reply]
279 root 1.18
280 root 1.29 Evaluates the given string as Perl expression on the given node. When
281     @reply is specified, then it is used to construct a reply message with
282     C<"$@"> and any results from the eval appended.
283 root 1.18
284     =cut
285    
286 root 1.29 sub eval_on($$;@) {
287 root 1.18 my $node = shift;
288     snd $node, eval => @_;
289     }
290    
291 root 1.1 sub kil(@) {
292 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
293 root 1.1
294     length $portid
295 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296 root 1.1
297 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
298 root 1.1 ->kill ("$portid", @_);
299     }
300    
301 root 1.66 # actual code that kills a port
302     sub _kill {
303     my $port = shift;
304    
305     delete $PORT{$port}
306     or return; # killing nonexistent ports is O.K.
307     delete $PORT_DATA{$port};
308    
309     my $mon = delete $LMON{$port}
310     or !@_
311     or $WARN->(2, "unmonitored local port $port died with reason: @_");
312    
313     $_->(@_) for values %$mon;
314     }
315    
316 root 1.7 sub _nodename {
317     require POSIX;
318     (POSIX::uname ())[1]
319     }
320    
321 root 1.21 sub _resolve($) {
322     my ($nodeid) = @_;
323 root 1.1
324     my $cv = AE::cv;
325     my @res;
326    
327     $cv->begin (sub {
328     my %seen;
329     my @refs;
330     for (sort { $a->[0] <=> $b->[0] } @res) {
331     push @refs, $_->[1] unless $seen{$_->[1]}++
332     }
333 root 1.21 shift->send (@refs);
334 root 1.1 });
335    
336     my $idx;
337 root 1.21 for my $t (split /,/, $nodeid) {
338 root 1.1 my $pri = ++$idx;
339 root 1.7
340     $t = length $t ? _nodename . ":$t" : _nodename
341     if $t =~ /^\d*$/;
342 root 1.1
343 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
344 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
345    
346 root 1.33 $port = "0" if $port eq "*";
347    
348     if ($host eq "*") {
349     $cv->begin;
350     # use fork_call, as Net::Interface is big, and we need it rarely.
351     require AnyEvent::Util;
352     AnyEvent::Util::fork_call (
353     sub {
354     my @addr;
355    
356     require Net::Interface;
357    
358     for my $if (Net::Interface->interfaces) {
359     # we statically lower-prioritise ipv6 here, TODO :()
360 root 1.47 for $_ ($if->address (Net::Interface::AF_INET ())) {
361 root 1.33 next if /^\x7f/; # skip localhost etc.
362     push @addr, $_;
363     }
364     for ($if->address (Net::Interface::AF_INET6 ())) {
365     #next if $if->scope ($_) <= 2;
366     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
367     push @addr, $_;
368     }
369    
370     }
371     @addr
372     }, sub {
373     for my $ip (@_) {
374     push @res, [
375     $pri += 1e-5,
376     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
377     ];
378     }
379     $cv->end;
380     }
381     );
382     } else {
383     $cv->begin;
384     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
385     for (@_) {
386     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
387     push @res, [
388     $pri += 1e-5,
389     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
390     ];
391     }
392     $cv->end;
393     };
394     }
395 root 1.1 }
396    
397     $cv->end;
398    
399     $cv
400     }
401    
402 root 1.39 sub configure(@) {
403     unshift @_, "profile" if @_ & 1;
404 root 1.34 my (%kv) = @_;
405    
406 root 1.64 delete $NODE{$NODE}; # we do not support doing stuff before configure
407     _seed;
408    
409 root 1.34 my $profile = delete $kv{profile};
410 root 1.1
411 root 1.21 $profile = _nodename
412     unless defined $profile;
413 root 1.6
414 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
415 root 1.24
416     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
417 root 1.51
418     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
419    
420 root 1.24 $NODE = $node
421     unless $node eq "anon/";
422 root 1.6
423 root 1.21 $NODE{$NODE} = $NODE{""};
424     $NODE{$NODE}{id} = $NODE;
425 root 1.20
426 root 1.21 my $seeds = $CONFIG->{seeds};
427     my $binds = $CONFIG->{binds};
428 root 1.3
429 root 1.33 $binds ||= ["*"];
430 root 1.1
431 root 1.21 $WARN->(8, "node $NODE starting up.");
432 root 1.1
433 root 1.23 $LISTENER = [];
434     %LISTENER = ();
435    
436 root 1.21 for (map _resolve $_, @$binds) {
437     for my $bind ($_->recv) {
438     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
439     or Carp::croak "$bind: unparsable local bind address";
440 root 1.20
441 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
442     $host,
443     $port,
444     prepare => sub {
445     my (undef, $host, $port) = @_;
446     $bind = AnyEvent::Socket::format_hostport $host, $port;
447 root 1.53 0
448 root 1.33 },
449     ;
450     $LISTENER{$bind} = $listener;
451 root 1.21 push @$LISTENER, $bind;
452     }
453 root 1.1 }
454    
455 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
456    
457 root 1.21 # the global service is mandatory currently
458     require AnyEvent::MP::Global;
459 root 1.1
460 root 1.21 # connect to all seednodes
461     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
462 root 1.1
463 root 1.20 for (@{ $CONFIG->{services} }) {
464 root 1.43 if (ref) {
465     my ($func, @args) = @$_;
466     (load_func $func)->(@args);
467     } elsif (s/::$//) {
468 root 1.13 eval "require $_";
469     die $@ if $@;
470     } else {
471     (load_func $_)->();
472     }
473     }
474 root 1.1 }
475    
476     #############################################################################
477 root 1.6 # node monitoring and info
478 root 1.3
479 root 1.21 =item node_is_known $nodeid
480 root 1.13
481 root 1.46 Returns true iff the given node is currently known to the system. The only
482     time a node is known but not up currently is when a conenction request is
483     pending.
484 root 1.13
485     =cut
486    
487     sub node_is_known($) {
488     exists $NODE{$_[0]}
489     }
490    
491 root 1.21 =item node_is_up $nodeid
492 root 1.13
493     Returns true if the given node is "up", that is, the kernel thinks it has
494     a working connection to it.
495    
496     If the node is known but not currently connected, returns C<0>. If the
497     node is not known, returns C<undef>.
498    
499     =cut
500    
501     sub node_is_up($) {
502     ($NODE{$_[0]} or return)->{transport}
503     ? 1 : 0
504     }
505    
506 root 1.3 =item known_nodes
507    
508 root 1.26 Returns the node IDs of all nodes currently known to this node, including
509     itself and nodes not currently connected.
510 root 1.3
511     =cut
512    
513 root 1.49 sub known_nodes() {
514 root 1.26 map $_->{id}, values %NODE
515 root 1.3 }
516    
517     =item up_nodes
518    
519 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
520     the node itself).
521 root 1.3
522     =cut
523    
524 root 1.49 sub up_nodes() {
525 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
526 root 1.3 }
527    
528 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
529 root 1.3
530 root 1.27 Registers a callback that is called each time a node goes up (a connection
531     is established) or down (the connection is lost).
532 root 1.3
533     Node up messages can only be followed by node down messages for the same
534     node, and vice versa.
535    
536 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
537     port. This function is mainly of interest to modules that are concerned
538     about the network topology and low-level connection handling.
539    
540     Callbacks I<must not> block and I<should not> send any messages.
541    
542     The function returns an optional guard which can be used to unregister
543 root 1.3 the monitoring callback again.
544    
545 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
546     or go up (and down).
547    
548     newnode $_, 1 for up_nodes;
549     mon_nodes \&newnode;
550    
551 root 1.3 =cut
552    
553     our %MON_NODES;
554    
555     sub mon_nodes($) {
556     my ($cb) = @_;
557    
558     $MON_NODES{$cb+0} = $cb;
559    
560 root 1.62 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
561 root 1.3 }
562    
563     sub _inject_nodeevent($$;@) {
564 root 1.16 my ($node, $up, @reason) = @_;
565 root 1.3
566     for my $cb (values %MON_NODES) {
567 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
568 root 1.16 or $WARN->(1, $@);
569 root 1.3 }
570 root 1.16
571 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
572 root 1.3 }
573    
574     #############################################################################
575 root 1.1 # self node code
576    
577     our %node_req = (
578     # internal services
579    
580     # monitoring
581 root 1.65 mon0 => sub { # stop monitoring a port for another node
582 root 1.1 my $portid = shift;
583     my $node = $SRCNODE;
584     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
585     },
586 root 1.65 mon1 => sub { # start monitoring a port for another node
587 root 1.1 my $portid = shift;
588     my $node = $SRCNODE;
589 root 1.59 Scalar::Util::weaken $node;
590 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
591 root 1.58 delete $node->{rmon}{$portid};
592 root 1.65 $node->send (["", kil0 => $portid, @_])
593 root 1.59 if $node && $node->{transport};
594 root 1.1 });
595     },
596 root 1.65 # another node has killed a monitored port
597     kil0 => sub {
598 root 1.1 my $cbs = delete $SRCNODE->{lmon}{+shift}
599     or return;
600    
601     $_->(@_) for @$cbs;
602     },
603    
604 root 1.18 # "public" services - not actually public
605 root 1.1
606 root 1.65 # another node wants to kill a local port
607 root 1.66 kil => \&_kill,
608 root 1.65
609 root 1.1 # relay message to another node / generic echo
610 root 1.15 snd => \&snd,
611 root 1.27 snd_multiple => sub {
612 root 1.1 snd @$_ for @_
613     },
614    
615 root 1.4 # informational
616     info => sub {
617     snd @_, $NODE;
618     },
619     known_nodes => sub {
620     snd @_, known_nodes;
621     },
622     up_nodes => sub {
623     snd @_, up_nodes;
624     },
625    
626 root 1.30 # random utilities
627 root 1.1 eval => sub {
628 root 1.50 my @res = do { package main; eval shift };
629 root 1.1 snd @_, "$@", @res if @_;
630     },
631     time => sub {
632     snd @_, AE::time;
633     },
634     devnull => sub {
635     #
636     },
637 root 1.15 "" => sub {
638 root 1.27 # empty messages are keepalives or similar devnull-applications
639 root 1.15 },
640 root 1.1 );
641    
642 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
643 root 1.1 $PORT{""} = sub {
644     my $tag = shift;
645     eval { &{ $node_req{$tag} ||= load_func $tag } };
646 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
647 root 1.1 };
648    
649     =back
650    
651     =head1 SEE ALSO
652    
653     L<AnyEvent::MP>.
654    
655     =head1 AUTHOR
656    
657     Marc Lehmann <schmorp@schmorp.de>
658     http://home.schmorp.de/
659    
660     =cut
661    
662     1
663