ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.68
Committed: Mon Apr 19 04:50:46 2010 UTC (14 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-1_29, rel-1_30
Changes since 1.67: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38     our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 root 1.21 add_node load_func snd_to_func snd_on eval_on
41 root 1.1
42 root 1.34 NODE $NODE node_of snd kil port_is_local
43     configure
44 root 1.46 up_nodes mon_nodes node_is_up
45 root 1.1 );
46    
47 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48 root 1.1
49 root 1.27 This value is called with an error or warning message, when e.g. a
50     connection could not be created, authorisation failed and so on.
51    
52     It I<must not> block or send messages -queue it and use an idle watcher if
53     you need to do any of these things.
54 root 1.1
55 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
56 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57     node connectivity and services, C<8> for debugging messages and C<9> for
58     tracing messages.
59    
60 root 1.1 The default simply logs the message to STDERR.
61    
62 root 1.44 =item @AnyEvent::MP::Kernel::WARN
63    
64     All code references in this array are called for every log message, from
65     the default C<$WARN> handler. This is an easy way to tie into the log
66     messages without disturbing others.
67    
68 root 1.1 =cut
69    
70 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71 root 1.44 our @WARN;
72     our $WARN = sub {
73     &$_ for @WARN;
74 root 1.29
75     return if $WARNLEVEL < $_[0];
76    
77 root 1.16 my ($level, $msg) = @_;
78    
79 root 1.1 $msg =~ s/\n$//;
80 root 1.16
81     printf STDERR "%s <%d> %s\n",
82     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83     $level,
84     $msg;
85 root 1.1 };
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
94 root 1.6 sub load_func($) {
95     my $func = $_[0];
96    
97     unless (defined &$func) {
98     my $pkg = $func;
99     do {
100     $pkg =~ s/::[^:]+$//
101 root 1.63 or return sub { die "unable to resolve function '$func'" };
102 root 1.60
103     local $@;
104 root 1.61 unless (eval "require $pkg; 1") {
105     my $error = $@;
106     $error =~ /^Can't locate .*.pm in \@INC \(/
107     or return sub { die $error };
108     }
109 root 1.6 } until defined &$func;
110     }
111    
112     \&$func
113     }
114    
115 root 1.1 sub nonce($) {
116     my $nonce;
117    
118     if (open my $fh, "</dev/urandom") {
119     sysread $fh, $nonce, $_[0];
120     } else {
121     # shit...
122     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123     }
124    
125     $nonce
126     }
127    
128 root 1.21 sub alnumbits($) {
129 root 1.1 my $data = $_[0];
130    
131     if (eval "use Math::GMP 2.05; 1") {
132     $data = Math::GMP::get_str_gmp (
133     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134     62
135     );
136     } else {
137     $data = MIME::Base64::encode_base64 $data, "";
138     $data =~ s/=//;
139 root 1.31 $data =~ s/x/x0/g;
140     $data =~ s/\//x1/g;
141     $data =~ s/\+/x2/g;
142 root 1.1 }
143    
144     $data
145     }
146    
147     sub gen_uniq {
148 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149 root 1.1 }
150    
151 root 1.20 our $CONFIG; # this node's configuration
152 root 1.21
153 root 1.64 our $RUNIQ; # remote uniq value
154     our $UNIQ; # per-process/node unique cookie
155     our $NODE;
156     our $ID = "a";
157 root 1.1
158     our %NODE; # node id to transport mapping, or "undef", for local node
159     our (%PORT, %PORT_DATA); # local ports
160    
161 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 root 1.1 our %LMON; # monitored _local_ ports
163    
164     our %LISTENER;
165 root 1.21 our $LISTENER; # our listeners, as arrayref
166 root 1.1
167     our $SRCNODE; # holds the sending node during _inject
168    
169 root 1.64 sub _seed {
170     $RUNIQ = alnumbits nonce 96/8;
171     $UNIQ = gen_uniq;
172     $NODE = "anon/$RUNIQ";
173     }
174    
175     _seed;
176    
177 root 1.1 sub NODE() {
178     $NODE
179     }
180    
181     sub node_of($) {
182 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
183 root 1.1
184 root 1.21 $node
185 root 1.1 }
186    
187 root 1.17 BEGIN {
188     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
189     ? sub () { 1 }
190     : sub () { 0 };
191     }
192 root 1.1
193 root 1.42 our $DELAY_TIMER;
194     our @DELAY_QUEUE;
195    
196     sub _delay_run {
197 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
198 root 1.42 }
199    
200     sub delay($) {
201     push @DELAY_QUEUE, shift;
202     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
203     }
204    
205 root 1.1 sub _inject {
206 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
207 root 1.1 &{ $PORT{+shift} or return };
208     }
209    
210 root 1.20 # this function adds a node-ref, so you can send stuff to it
211     # it is basically the central routing component.
212 root 1.1 sub add_node {
213 root 1.21 my ($node) = @_;
214 root 1.1
215 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
216 root 1.13 }
217    
218 root 1.1 sub snd(@) {
219 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
220 root 1.1
221 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
222 root 1.1
223 root 1.49 defined $nodeid #d#UGLY
224     or Carp::croak "'undef' is not a valid node ID/port ID";
225    
226 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
227 root 1.2 ->{send} (["$portid", @_]);
228 root 1.1 }
229    
230 root 1.17 =item $is_local = port_is_local $port
231    
232     Returns true iff the port is a local port.
233    
234     =cut
235    
236     sub port_is_local($) {
237 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
238 root 1.17
239 root 1.21 $NODE{$nodeid} == $NODE{""}
240 root 1.17 }
241    
242 root 1.18 =item snd_to_func $node, $func, @args
243 root 1.11
244 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
245 root 1.11 this function with the given arguments on that node.
246    
247 root 1.20 This function can be used to implement C<spawn>-like interfaces.
248 root 1.11
249     =cut
250    
251 root 1.18 sub snd_to_func($$;@) {
252 root 1.21 my $nodeid = shift;
253 root 1.11
254 root 1.41 # on $NODE, we artificially delay... (for spawn)
255     # this is very ugly - maybe we should simply delay ALL messages,
256     # to avoid deep recursion issues. but that's so... slow...
257 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
258     if $nodeid ne $NODE;
259    
260 root 1.49 defined $nodeid #d#UGLY
261     or Carp::croak "'undef' is not a valid node ID/port ID";
262    
263 root 1.45 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
264 root 1.11 }
265    
266 root 1.18 =item snd_on $node, @msg
267    
268     Executes C<snd> with the given C<@msg> (which must include the destination
269     port) on the given node.
270    
271     =cut
272    
273     sub snd_on($@) {
274     my $node = shift;
275     snd $node, snd => @_;
276     }
277    
278 root 1.29 =item eval_on $node, $string[, @reply]
279 root 1.18
280 root 1.29 Evaluates the given string as Perl expression on the given node. When
281     @reply is specified, then it is used to construct a reply message with
282     C<"$@"> and any results from the eval appended.
283 root 1.18
284     =cut
285    
286 root 1.29 sub eval_on($$;@) {
287 root 1.18 my $node = shift;
288     snd $node, eval => @_;
289     }
290    
291 root 1.1 sub kil(@) {
292 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
293 root 1.1
294     length $portid
295 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296 root 1.1
297 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
298 root 1.1 ->kill ("$portid", @_);
299     }
300    
301 root 1.7 sub _nodename {
302     require POSIX;
303     (POSIX::uname ())[1]
304     }
305    
306 root 1.21 sub _resolve($) {
307     my ($nodeid) = @_;
308 root 1.1
309     my $cv = AE::cv;
310     my @res;
311    
312     $cv->begin (sub {
313     my %seen;
314     my @refs;
315     for (sort { $a->[0] <=> $b->[0] } @res) {
316     push @refs, $_->[1] unless $seen{$_->[1]}++
317     }
318 root 1.21 shift->send (@refs);
319 root 1.1 });
320    
321     my $idx;
322 root 1.21 for my $t (split /,/, $nodeid) {
323 root 1.1 my $pri = ++$idx;
324 root 1.7
325     $t = length $t ? _nodename . ":$t" : _nodename
326     if $t =~ /^\d*$/;
327 root 1.1
328 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
329 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
330    
331 root 1.33 $port = "0" if $port eq "*";
332    
333     if ($host eq "*") {
334     $cv->begin;
335     # use fork_call, as Net::Interface is big, and we need it rarely.
336     require AnyEvent::Util;
337     AnyEvent::Util::fork_call (
338     sub {
339     my @addr;
340    
341     require Net::Interface;
342    
343     for my $if (Net::Interface->interfaces) {
344     # we statically lower-prioritise ipv6 here, TODO :()
345 root 1.47 for $_ ($if->address (Net::Interface::AF_INET ())) {
346 root 1.33 next if /^\x7f/; # skip localhost etc.
347     push @addr, $_;
348     }
349     for ($if->address (Net::Interface::AF_INET6 ())) {
350     #next if $if->scope ($_) <= 2;
351     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
352     push @addr, $_;
353     }
354    
355     }
356     @addr
357     }, sub {
358     for my $ip (@_) {
359     push @res, [
360     $pri += 1e-5,
361     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
362     ];
363     }
364     $cv->end;
365     }
366     );
367     } else {
368     $cv->begin;
369     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
370     for (@_) {
371     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
372     push @res, [
373     $pri += 1e-5,
374     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
375     ];
376     }
377     $cv->end;
378     };
379     }
380 root 1.1 }
381    
382     $cv->end;
383    
384     $cv
385     }
386    
387 root 1.39 sub configure(@) {
388     unshift @_, "profile" if @_ & 1;
389 root 1.34 my (%kv) = @_;
390    
391 root 1.64 delete $NODE{$NODE}; # we do not support doing stuff before configure
392     _seed;
393    
394 root 1.34 my $profile = delete $kv{profile};
395 root 1.1
396 root 1.21 $profile = _nodename
397     unless defined $profile;
398 root 1.6
399 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
400 root 1.24
401     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
402 root 1.51
403     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
404    
405 root 1.24 $NODE = $node
406     unless $node eq "anon/";
407 root 1.6
408 root 1.21 $NODE{$NODE} = $NODE{""};
409     $NODE{$NODE}{id} = $NODE;
410 root 1.20
411 root 1.21 my $seeds = $CONFIG->{seeds};
412     my $binds = $CONFIG->{binds};
413 root 1.3
414 root 1.33 $binds ||= ["*"];
415 root 1.1
416 root 1.21 $WARN->(8, "node $NODE starting up.");
417 root 1.1
418 root 1.23 $LISTENER = [];
419     %LISTENER = ();
420    
421 root 1.21 for (map _resolve $_, @$binds) {
422     for my $bind ($_->recv) {
423     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
424     or Carp::croak "$bind: unparsable local bind address";
425 root 1.20
426 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
427     $host,
428     $port,
429     prepare => sub {
430     my (undef, $host, $port) = @_;
431     $bind = AnyEvent::Socket::format_hostport $host, $port;
432 root 1.53 0
433 root 1.33 },
434     ;
435     $LISTENER{$bind} = $listener;
436 root 1.21 push @$LISTENER, $bind;
437     }
438 root 1.1 }
439    
440 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
441    
442 root 1.21 # the global service is mandatory currently
443     require AnyEvent::MP::Global;
444 root 1.1
445 root 1.21 # connect to all seednodes
446     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
447 root 1.1
448 root 1.20 for (@{ $CONFIG->{services} }) {
449 root 1.43 if (ref) {
450     my ($func, @args) = @$_;
451     (load_func $func)->(@args);
452     } elsif (s/::$//) {
453 root 1.13 eval "require $_";
454     die $@ if $@;
455     } else {
456     (load_func $_)->();
457     }
458     }
459 root 1.1 }
460    
461     #############################################################################
462 root 1.6 # node monitoring and info
463 root 1.3
464 root 1.21 =item node_is_known $nodeid
465 root 1.13
466 root 1.46 Returns true iff the given node is currently known to the system. The only
467     time a node is known but not up currently is when a conenction request is
468     pending.
469 root 1.13
470     =cut
471    
472     sub node_is_known($) {
473     exists $NODE{$_[0]}
474     }
475    
476 root 1.21 =item node_is_up $nodeid
477 root 1.13
478     Returns true if the given node is "up", that is, the kernel thinks it has
479     a working connection to it.
480    
481     If the node is known but not currently connected, returns C<0>. If the
482     node is not known, returns C<undef>.
483    
484     =cut
485    
486     sub node_is_up($) {
487     ($NODE{$_[0]} or return)->{transport}
488     ? 1 : 0
489     }
490    
491 root 1.3 =item known_nodes
492    
493 root 1.26 Returns the node IDs of all nodes currently known to this node, including
494     itself and nodes not currently connected.
495 root 1.3
496     =cut
497    
498 root 1.49 sub known_nodes() {
499 root 1.26 map $_->{id}, values %NODE
500 root 1.3 }
501    
502     =item up_nodes
503    
504 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
505     the node itself).
506 root 1.3
507     =cut
508    
509 root 1.49 sub up_nodes() {
510 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
511 root 1.3 }
512    
513 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
514 root 1.3
515 root 1.27 Registers a callback that is called each time a node goes up (a connection
516     is established) or down (the connection is lost).
517 root 1.3
518     Node up messages can only be followed by node down messages for the same
519     node, and vice versa.
520    
521 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
522     port. This function is mainly of interest to modules that are concerned
523     about the network topology and low-level connection handling.
524    
525     Callbacks I<must not> block and I<should not> send any messages.
526    
527     The function returns an optional guard which can be used to unregister
528 root 1.3 the monitoring callback again.
529    
530 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
531     or go up (and down).
532    
533     newnode $_, 1 for up_nodes;
534     mon_nodes \&newnode;
535    
536 root 1.3 =cut
537    
538     our %MON_NODES;
539    
540     sub mon_nodes($) {
541     my ($cb) = @_;
542    
543     $MON_NODES{$cb+0} = $cb;
544    
545 root 1.62 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
546 root 1.3 }
547    
548     sub _inject_nodeevent($$;@) {
549 root 1.16 my ($node, $up, @reason) = @_;
550 root 1.3
551     for my $cb (values %MON_NODES) {
552 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
553 root 1.16 or $WARN->(1, $@);
554 root 1.3 }
555 root 1.16
556 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
557 root 1.3 }
558    
559     #############################################################################
560 root 1.1 # self node code
561    
562 root 1.67 sub _kill {
563     my $port = shift;
564    
565     delete $PORT{$port}
566     or return; # killing nonexistent ports is O.K.
567     delete $PORT_DATA{$port};
568    
569     my $mon = delete $LMON{$port}
570     or !@_
571     or $WARN->(2, "unmonitored local port $port died with reason: @_");
572    
573     $_->(@_) for values %$mon;
574     }
575    
576     sub _monitor {
577     return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
578     unless exists $PORT{$_[1]};
579    
580     $LMON{$_[1]}{$_[2]+0} = $_[2];
581     }
582    
583     sub _unmonitor {
584 root 1.68 delete $LMON{$_[1]}{$_[2]+0}
585     if exists $LMON{$_[1]};
586 root 1.67 }
587    
588 root 1.1 our %node_req = (
589     # internal services
590    
591     # monitoring
592 root 1.65 mon0 => sub { # stop monitoring a port for another node
593 root 1.1 my $portid = shift;
594 root 1.67 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
595 root 1.1 },
596 root 1.65 mon1 => sub { # start monitoring a port for another node
597 root 1.1 my $portid = shift;
598 root 1.67 Scalar::Util::weaken (my $node = $SRCNODE);
599     _monitor undef, $portid, $node->{rmon}{$portid} = sub {
600 root 1.58 delete $node->{rmon}{$portid};
601 root 1.65 $node->send (["", kil0 => $portid, @_])
602 root 1.59 if $node && $node->{transport};
603 root 1.67 };
604 root 1.1 },
605 root 1.65 # another node has killed a monitored port
606     kil0 => sub {
607 root 1.1 my $cbs = delete $SRCNODE->{lmon}{+shift}
608     or return;
609    
610     $_->(@_) for @$cbs;
611     },
612    
613 root 1.18 # "public" services - not actually public
614 root 1.1
615 root 1.65 # another node wants to kill a local port
616 root 1.66 kil => \&_kill,
617 root 1.65
618 root 1.1 # relay message to another node / generic echo
619 root 1.15 snd => \&snd,
620 root 1.27 snd_multiple => sub {
621 root 1.1 snd @$_ for @_
622     },
623    
624 root 1.4 # informational
625     info => sub {
626     snd @_, $NODE;
627     },
628     known_nodes => sub {
629     snd @_, known_nodes;
630     },
631     up_nodes => sub {
632     snd @_, up_nodes;
633     },
634    
635 root 1.30 # random utilities
636 root 1.1 eval => sub {
637 root 1.50 my @res = do { package main; eval shift };
638 root 1.1 snd @_, "$@", @res if @_;
639     },
640     time => sub {
641     snd @_, AE::time;
642     },
643     devnull => sub {
644     #
645     },
646 root 1.15 "" => sub {
647 root 1.27 # empty messages are keepalives or similar devnull-applications
648 root 1.15 },
649 root 1.1 );
650    
651 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
652 root 1.1 $PORT{""} = sub {
653     my $tag = shift;
654     eval { &{ $node_req{$tag} ||= load_func $tag } };
655 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
656 root 1.1 };
657    
658     =back
659    
660     =head1 SEE ALSO
661    
662     L<AnyEvent::MP>.
663    
664     =head1 AUTHOR
665    
666     Marc Lehmann <schmorp@schmorp.de>
667     http://home.schmorp.de/
668    
669     =cut
670    
671     1
672