ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.65
Committed: Wed Dec 30 10:59:18 2009 UTC (14 years, 5 months ago) by root
Branch: MAIN
Changes since 1.64: +10 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38     our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 root 1.21 add_node load_func snd_to_func snd_on eval_on
41 root 1.1
42 root 1.34 NODE $NODE node_of snd kil port_is_local
43     configure
44 root 1.46 up_nodes mon_nodes node_is_up
45 root 1.1 );
46    
47 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48 root 1.1
49 root 1.27 This value is called with an error or warning message, when e.g. a
50     connection could not be created, authorisation failed and so on.
51    
52     It I<must not> block or send messages - queue it and use an idle watcher if
53     you need to do any of these things.
54 root 1.1
55 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
56 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57     node connectivity and services, C<8> for debugging messages and C<9> for
58     tracing messages.
59    
60 root 1.1 The default simply logs the message to STDERR.
61    
62 root 1.44 =item @AnyEvent::MP::Kernel::WARN
63    
64     All code references in this array are called for every log message, from
65     the default C<$WARN> handler. This is an easy way to tie into the log
66     messages without disturbing others.
67    
68 root 1.1 =cut
69    
# Messages with a level above $WARNLEVEL are not printed by the default
# handler; the environment variable overrides the built-in default of 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;

# Additional log hooks: every code reference in here is called for every
# message, before the $WARNLEVEL check is applied.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # Call each hook with its own argument array. The former "&$_" form
   # shared this sub's @_ with the hook, so a hook that shifted its
   # arguments corrupted the message for all later hooks and for the
   # default logging below.
   for my $hook (@WARN) {
      $hook->(@_);
   }

   return if $WARNLEVEL < $_[0];

   my ($level, $msg) = @_;

   # the format below supplies the line terminator itself
   $msg =~ s/\n$//;

   printf STDERR "%s <%d> %s\n",
          (POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time)),
          $level,
          $msg;
};
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
# Resolves a fully qualified function name to a code reference, loading
# modules on demand.
#
# If the function is not defined yet, trailing name components are stripped
# one at a time and each remaining prefix is tried as a module name
# ("A::B::f" tries A::B, then A) until the function shows up.
#
# Does not die itself: on failure it returns a code reference that dies with
# the reason when it is called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, and function names
         # can arrive in node port messages, so refuse anything that is not
         # a plain package name instead of evaluating it as code.
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module only means "try the next shorter prefix",
            # any other failure (e.g. a compile error) is reported
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
114    
# Returns a string of $_[0] random octets, taken from /dev/urandom when that
# can be opened and read, and from perl's rand otherwise.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;
      # sysread may return fewer octets than requested, so keep reading
      # until the nonce is complete or the read fails/hits end of file
      while ($len > length ($nonce)) {
         sysread ($fh, $nonce, $len - length ($nonce), length ($nonce))
            or last;
      }
   }

   # no (fully working) random device: fill up with the much weaker
   # built-in generator rather than returning a short nonce
   $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);

   $nonce
}
127    
# Converts an octet string into a purely alphanumeric string.
#
# With Math::GMP 2.05 or newer the octets are rendered as a base-62 number.
# Otherwise base64 is used, with the padding removed and the remaining
# non-alphanumeric characters escaped: "x" becomes "x0", "/" becomes "x1"
# and "+" becomes "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                  (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                  62
              );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 ends in up to two "=" - strip all of them, the former s/=//
      # left the second one in the result
      $data =~ s/=+\z//;
      # "x" is the escape character, so it has to be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
146    
# Builds an alphanumeric cookie from the low 16 bits of the process ID, the
# low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid_bits  = $$ & 0xffff;
   my $time_bits = time () & 0xffff;
   my $random    = nonce (2);

   alnumbits (pack "nna*", $pid_bits, $time_bits, $random)
}
150    
our $CONFIG; # this node's configuration

our $RUNIQ; # remote uniq value
our $UNIQ;  # per-process/node unique cookie
our $NODE;  # ID of the local node
our $ID = "a";

our %NODE;               # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject

# (Re-)generates the random identifiers and resets $NODE to an anonymous
# node ID derived from $RUNIQ.
sub _seed {
   $RUNIQ = alnumbits (nonce (96 / 8));
   $UNIQ  = gen_uniq ();
   $NODE  = "anon/$RUNIQ";
}

# make sure the identifiers exist as soon as the module is loaded
_seed ();
176    
# Returns the ID of the local node, i.e. the current value of $NODE.
sub NODE() {
   return $NODE;
}
180    
# Returns the node ID part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   my ($port) = @_;
   my ($nodeid) = split /#/, $port, 2;

   return $nodeid;
}
186    
# Define TRACE as a constant at compile time, so the tracing statements
# guarded by it can be optimised away when tracing is disabled.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
192 root 1.1
our $DELAY_TIMER; # timer watcher, only set while delayed callbacks are pending
our @DELAY_QUEUE; # callbacks registered via delay, in call order

# Timer callback: runs the queued callbacks in order - including any that
# get queued while it runs - and then drops the timer.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   undef $DELAY_TIMER
}
199    
# Queues the given callback and makes sure a zero-delay timer is pending
# that will run the queue via _delay_run.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   # a single timer serves the whole queue
   $DELAY_TIMER or $DELAY_TIMER = AE::timer (0, 0, \&_delay_run);
}
204    
# Delivers a message to a local port: the first argument is the port ID,
# the callback registered for it in %PORT is called with the remaining
# arguments. Messages for unknown ports are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $receiver = $PORT{+shift}
      or return;

   # call with the current (already shifted) @_
   &$receiver;
}
209    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# Returns the node object registered for the given node ID in %NODE,
# creating an AnyEvent::MP::Node::Direct object when there is none yet.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
217    
# Sends a message to the port ID ("nodeid#portid") given as first argument;
# the remaining arguments form the message. The node object is created via
# add_node when the node is not known yet.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port ID is stringified, so a missing port becomes the empty port
   $node->{send} (["$portid", @_]);
}
229    
230 root 1.17 =item $is_local = port_is_local $port
231    
232     Returns true iff the port is a local port.
233    
234     =cut
235    
# True iff the node part of the given port ID maps to the same node object
# as the local node, which is registered under the empty key in %NODE.
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $NODE{$nodeid} == $NODE{""};
}
241    
242 root 1.18 =item snd_to_func $node, $func, @args
243 root 1.11
244 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
245 root 1.11 this function with the given arguments on that node.
246    
247 root 1.20 This function can be used to implement C<spawn>-like interfaces.
248 root 1.11
249     =cut
250    
# Sends a message with an empty port ID to the given node: the first
# argument is the node ID, the rest (function name and arguments) becomes
# the message. Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so a rejected call neither compares against undef nor
   # touches the global delay flag below
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is raised when the target is *not* $NODE, while
   # the comment above talks about delaying on $NODE - confirm against
   # AnyEvent::MP::Node::Self before changing either.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node ($nodeid))->send (["", @_]);
}
265    
266 root 1.18 =item snd_on $node, @msg
267    
268     Executes C<snd> with the given C<@msg> (which must include the destination
269     port) on the given node.
270    
271     =cut
272    
# Asks the given node to execute snd with @msg, by sending a "snd" request
# to that node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
277    
278 root 1.29 =item eval_on $node, $string[, @reply]
279 root 1.18
280 root 1.29 Evaluates the given string as Perl expression on the given node. When
281     @reply is specified, then it is used to construct a reply message with
282     C<"$@"> and any results from the eval appended.
283 root 1.18
284     =cut
285    
# Asks the given node to evaluate a string, by sending an "eval" request
# (string plus optional reply message) to that node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
290    
# Kills the port given as first argument ("nodeid#portid"), passing the
# remaining arguments on to the kill method of the owning node object.
# Croaks when the port ID part is empty, i.e. for node ports.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
300    
# Returns the node name field (the second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
305    
# Resolves a comma-separated list of transport descriptors into "host:port"
# style endpoint strings.
#
# Returns a condvar immediately. Once all lookups have finished it is sent
# the endpoints, without duplicates, ordered by the position of the
# descriptor they were derived from.
sub _resolve($) {
   my ($nodeid) = @_;

   my $done = AE::cv;
   my @found; # [priority, endpoint] pairs, filled in by the callbacks below

   # the callback runs once every begin has been matched by an end
   $done->begin (sub {
      my ($cv) = @_;
      my (%seen, @endpoints);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         my $endpoint = $entry->[1];
         push @endpoints, $endpoint unless $seen{$endpoint}++;
      }

      $cv->send (@endpoints);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor share its integer priority and are told
      # apart by small fractional increments
      my $pri = ++$idx;

      # an empty descriptor or a bare port number refers to this host
      if ($t =~ /^\d*$/) {
         $t = length ($t) ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $done->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for my $ipv4 ($if->address (Net::Interface::AF_INET ())) {
                     next if $ipv4 =~ /^\x7f/; # skip localhost etc.
                     push @addr, $ipv4;
                  }
                  for my $ipv6 ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($ipv6) <= 2;
                     next unless $ipv6 =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $ipv6;
                  }
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  push @found, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $done->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
               push @found, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }
            $done->end;
         });
      }
   }

   # matches the initial begin above
   $done->end;

   $done
}
386    
# Configures the local node: takes key/value pairs (an odd number of
# arguments means the first one is the profile name), looks up the profile,
# sets the node ID, creates the listeners for all bind addresses, hands the
# seed addresses to the global service and finally starts the configured
# services.
#
# Croaks on an illegal node ID or an unparsable bind address and dies when
# a service module cannot be loaded. Blocks while addresses are resolved.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _seed ();

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # "anon/" keeps the anonymous node ID generated by _seed
   $NODE = $node
      unless $node eq "anon/";

   # re-register the local node object under its (possibly new) ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   for my $resolver (map _resolve ($_), @$binds) {
      for my $bind ($resolver->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ((my $module = $service) =~ s/::$//) {
         # "Module::" means: just load the module. The trailing "::" is
         # stripped from a copy - stripping it from the aliased loop
         # variable rewrote the entry inside $CONFIG->{services}.
         eval "require $module";
         die $@ if $@;
      } else {
         # plain function name, called without arguments
         (load_func ($service))->();
      }
   }
}
460    
461     #############################################################################
462 root 1.6 # node monitoring and info
463 root 1.3
464 root 1.21 =item node_is_known $nodeid
465 root 1.13
466 root 1.46 Returns true iff the given node is currently known to the system. The only
467     time a node is known but not up currently is when a connection request is
468     pending.
469 root 1.13
470     =cut
471    
# True iff there is an entry for the given node ID in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
475    
476 root 1.21 =item node_is_up $nodeid
477 root 1.13
478     Returns true if the given node is "up", that is, the kernel thinks it has
479     a working connection to it.
480    
481     If the node is known but not currently connected, returns C<0>. If the
482     node is not known, returns C<undef>.
483    
484     =cut
485    
# Returns 1 when the node object for the given ID has a transport, 0 when
# it has none, and nothing (undef) when the node ID is not in %NODE at all.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
490    
491 root 1.3 =item known_nodes
492    
493 root 1.26 Returns the node IDs of all nodes currently known to this node, including
494     itself and nodes not currently connected.
495 root 1.3
496     =cut
497    
# Returns the IDs of all node objects in %NODE, each ID once.
sub known_nodes() {
   # the local node object is registered twice, under "" and under its own
   # ID, so the IDs have to be de-duplicated
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
501    
502     =item up_nodes
503    
504 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
505     the node itself).
506 root 1.3
507     =cut
508    
# Returns the IDs of all node objects in %NODE that have a transport.
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id} if $node->{transport};
   }

   @ids
}
512    
513 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
514 root 1.3
515 root 1.27 Registers a callback that is called each time a node goes up (a connection
516     is established) or down (the connection is lost).
517 root 1.3
518     Node up messages can only be followed by node down messages for the same
519     node, and vice versa.
520    
521 root 1.27 Note that monitoring a node is usually better done by monitoring its node
522     port. This function is mainly of interest to modules that are concerned
523     about the network topology and low-level connection handling.
524    
525     Callbacks I<must not> block and I<should not> send any messages.
526    
527     The function returns an optional guard which can be used to unregister
528 root 1.3 the monitoring callback again.
529    
530 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
531     or go up (and down).
532    
533     newnode $_, 1 for up_nodes;
534     mon_nodes \&newnode;
535    
536 root 1.3 =cut
537    
# registered node monitoring callbacks, keyed by their numeric address
our %MON_NODES;

# Registers a node up/down callback. Unless called in void context, returns
# a guard object that unregisters the callback again.
sub mon_nodes($) {
   my ($cb) = @_;

   # numifying the code reference gives a key unique to this callback
   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   # in void context nobody could hold on to a guard
   return unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$key} })
}
547    
# Reports a node going up or down: calls every callback in %MON_NODES with
# the node ID, the up flag and the reason, then logs the event at level 7.
# A callback that dies only causes a level 1 log message.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
558    
559     #############################################################################
560 root 1.1 # self node code
561    
# Handlers for messages sent to the node port (the port with the empty ID),
# keyed by the first element of the message. A handler receives the rest of
# the message in @_; $SRCNODE holds the sending node while it runs.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      my $node   = $SRCNODE;
      $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      my $node   = $SRCNODE;
      # the callback below must not keep the node object alive
      Scalar::Util::weaken ($node);
      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
         # the weak reference is undef once the node object is gone - check
         # before dereferencing, as the delete below would otherwise
         # autovivify a fresh hash in $node
         $node or return;
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node->{transport};
      });
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => sub {
      $NODE{""}->kill (@_);
   },

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # SECURITY: this evaluates a string taken from a message as perl code
      # (in package main). That is the purpose of this service, but it
      # means every node that can send messages here can run arbitrary code.
      my @res = do { package main; eval shift };
      # reply with the error string and the results, if a reply was requested
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
628    
# Create the local node object and register it both under the empty key and
# under the current node ID.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element selects the handler. Tags not in
# %node_req are resolved as function names via load_func and the result is
# remembered in %node_req. Handler errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      # call with the current (already shifted) @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
635    
636     =back
637    
638     =head1 SEE ALSO
639    
640     L<AnyEvent::MP>.
641    
642     =head1 AUTHOR
643    
644     Marc Lehmann <schmorp@schmorp.de>
645     http://home.schmorp.de/
646    
647     =cut
648    
649     1
650