ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.47
Committed: Tue Sep 8 01:38:16 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.46: +3 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.46 up_nodes mon_nodes node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages - queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
# Highest level the default handler still prints. The environment variable
# PERL_ANYEVENT_MP_WARNLEVEL replaces the built-in default of 5 whenever it
# exists (even when it is empty or 0).
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
    if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Extra log hooks; the default handler below calls each of them.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
    # Hooks run first and see every message regardless of its level.
    # "&$hook;" without parentheses passes this call's own @_ along.
    for my $hook (@WARN) {
        &$hook;
    }

    my ($level, $msg) = @_;

    return if $level > $WARNLEVEL;

    # drop one trailing newline, the format string supplies its own
    $msg =~ s/\n$//;

    my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

    printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
# Resolves a fully qualified function name to a code reference.
#
# When the function is not defined yet, successively shorter package
# prefixes of the name are require'd (failures of the require are ignored)
# until the function shows up. When no "::component" is left to strip, a
# code reference that dies with "unable to resolve '<name>'" is returned
# instead, so the error only surfaces when the result is actually called.
sub load_func($) {
    my ($name) = @_;

    my $module = $name;

    until (defined &$name) {
        # strip the last "::component"; nothing left to strip means give up
        $module =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$name'" };

        eval "require $module";
    }

    \&$name
}
109    
# Returns a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened. Short or
# failed reads are not trusted: whatever is still missing afterwards is
# filled in from perl's rand(), which is far weaker and only meant as a
# last resort.
sub nonce($) {
    my ($len) = @_;

    my $nonce = "";

    # three-argument open, so the path can never be misread as a mode
    if (open my $fh, "<", "/dev/urandom") {
        while (length ($nonce) < $len) {
            # append at the current end; stop on error (undef) or EOF (0)
            sysread ($fh, $nonce, $len - length ($nonce), length ($nonce))
                or last;
        }
    }

    # fallback for a missing device or an incomplete read
    $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce)
        if length ($nonce) < $len;

    $nonce
}
122    
# Encodes an octet string using only the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer available, the octets are treated as one big
# hexadecimal number and printed in base 62. Otherwise the data is base64
# encoded and the three non-alphanumeric base64 characters are removed:
# padding is dropped, "/" becomes "x1" and "+" becomes "x2" ("x" itself is
# escaped as "x0" first so the escapes stay unambiguous).
sub alnumbits($) {
    my $data = $_[0];

    if (eval "use Math::GMP 2.05; 1") {
        $data = Math::GMP::get_str_gmp (
            (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
            62
        );
    } else {
        $data = MIME::Base64::encode_base64 ($data, "");
        # base64 pads with one OR two "=", remove all of them
        $data =~ s/=+$//;
        $data =~ s/x/x0/g;
        $data =~ s/\//x1/g;
        $data =~ s/\+/x2/g;
    }

    $data
}
141    
# Builds a short alphanumeric cookie from the low 16 bits of the process
# id, the low 16 bits of the current time and two random octets.
sub gen_uniq {
    my $pid_bits  = $$ & 0xffff;
    my $time_bits = time () & 0xffff;

    alnumbits (pack "nna*", $pid_bits, $time_bits, nonce (2))
}
145    
our $CONFIG; # this node's configuration

# identity of this node
our $RUNIQ = alnumbits (nonce (96 / 8)); # remote uniq value, from 12 random octets
our $UNIQ  = gen_uniq;                   # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";              # node id until configure assigns another one
our $ID    = "a";

# routing tables
our %NODE;               # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

# port monitoring
our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

# listening sockets
our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
163    
# Returns the current node id, i.e. the value of $NODE at call time.
sub NODE() {
    return $NODE;
}
167    
# Returns the node id part of a port id, that is everything in front of
# the first "#" (or the whole string when it contains no "#").
sub node_of($) {
    my $node = (split /#/, $_[0], 2)[0];

    $node
}
173    
# TRACE is fixed at compile time from PERL_ANYEVENT_MP_TRACE: a constant 1
# when the variable holds a true value, a constant 0 otherwise.
BEGIN {
    if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
        *TRACE = sub () { 1 };
    } else {
        *TRACE = sub () { 0 };
    }
}
179 root 1.1
our $DELAY_TIMER; # watcher of the pending run, undef while none is scheduled
our @DELAY_QUEUE; # callbacks waiting for the next run, oldest first

# Timer callback: runs the queued callbacks in order, including any that
# get queued while it is running, and stops at the first false entry.
sub _delay_run {
    while (my $cb = shift @DELAY_QUEUE) {
        $cb->();
    }

    # The timer is one-shot (interval 0). Forget it once the queue has been
    # processed, otherwise "||=" in delay would never schedule another run
    # and later callbacks would sit in the queue forever.
    undef $DELAY_TIMER;
}

# delay $cb
#
# Queues the code reference to be called from the event loop and makes sure
# a zero-delay timer is pending to process the queue.
sub delay($) {
    push @DELAY_QUEUE, shift;
    $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
191    
# Delivers a message to a local port: the first argument selects the
# handler in %PORT, the remaining arguments are passed on to it. Messages
# for ports without a handler are dropped silently.
sub _inject {
    warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

    my $handler = $PORT{+shift}
        or return;

    # no parentheses: the handler receives what is left in @_
    &$handler;
}
196    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# Returns the existing entry in %NODE for the given node id, or creates,
# stores and returns a new AnyEvent::MP::Node::Direct object for it.
sub add_node {
    my ($node) = @_;

    # explicit class method call instead of indirect object syntax
    $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
204    
# snd $port, @msg
#
# Splits the port id at its first "#" into node id and port name, looks the
# node up (registering it via add_node when it is unknown) and hands it the
# message, prefixed by the port name, through the node's "send" code ref.
sub snd(@) {
    my ($nodeid, $portid) = split /#/, shift, 2;

    warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    $node->{send}->(["$portid", @_]);
}
213    
214 root 1.17 =item $is_local = port_is_local $port
215    
216     Returns true iff the port is a local port.
217    
218     =cut
219    
# True when the node part of the given port id maps to the same entry in
# %NODE as the empty node id "" does.
sub port_is_local($) {
    my $nodeid = (split /#/, $_[0], 2)[0];

    $NODE{$nodeid} == $NODE{""}
}
225    
226 root 1.18 =item snd_to_func $node, $func, @args
227 root 1.11
228 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
229 root 1.11 this function with the given arguments on that node.
230    
231 root 1.20 This function can be used to implement C<spawn>-like interfaces.
232 root 1.11
233     =cut
234    
# snd_to_func $nodeid, $func, @args
#
# Sends [ "", $func, @args ] to the given node, i.e. a message for that
# node's "" port.
sub snd_to_func($$;@) {
    my ($nodeid, @call) = @_;

    # on $NODE, we artificially delay... (for spawn)
    # this is very ugly - maybe we should simply delay ALL messages,
    # to avoid deep recursion issues. but that's so... slow...
    # NOTE(review): the flag is set when the target is NOT $NODE, which
    # reads like the opposite of the comment above - confirm against
    # AnyEvent::MP::Node::Self before touching the condition.
    $AnyEvent::MP::Node::Self::DELAY = 1
        unless $nodeid eq $NODE;

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    $node->send (["", @call]);
}
246    
247 root 1.18 =item snd_on $node, @msg
248    
249     Executes C<snd> with the given C<@msg> (which must include the destination
250     port) on the given node.
251    
252     =cut
253    
# snd_on $node, @msg
#
# Sends $node the message (snd => @msg).
sub snd_on($@) {
    my ($node, @msg) = @_;

    snd ($node, snd => @msg);
}
258    
259 root 1.29 =item eval_on $node, $string[, @reply]
260 root 1.18
261 root 1.29 Evaluates the given string as Perl expression on the given node. When
262     @reply is specified, then it is used to construct a reply message with
263     C<"$@"> and any results from the eval appended.
264 root 1.18
265     =cut
266    
# eval_on $node, $string[, @reply]
#
# Sends $node the message (eval => $string, @reply).
sub eval_on($$;@) {
    my ($node, @request) = @_;

    snd ($node, eval => @request);
}
271    
# kil $port, @reason
#
# Calls ->kill with the port name and the reason on the node owning the
# port, registering the node first when it is unknown. Croaks when the port
# id has no port part after the "#", as that would address the node itself.
sub kil(@) {
    my ($nodeid, $portid) = split /#/, shift, 2;

    Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
        unless length $portid;

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    $node->kill ("$portid", @_);
}
281    
# Returns the second field of POSIX::uname, the node (host) name.
sub _nodename {
    require POSIX;

    my (undef, $nodename) = POSIX::uname ();

    $nodename
}
286    
# _resolve $spec
#
# Turns a comma separated list of transport descriptors into "host:port"
# style endpoint strings. Returns a condvar immediately; once every
# descriptor has been resolved it is sent the endpoints, ordered by the
# position of their descriptor (then by arrival) with duplicates removed.
#
# A descriptor that is empty or consists of digits only stands for the
# local node name, with the digits as port. The host "*" expands to the
# addresses of the local interfaces, the port "*" is treated as port 0.
# Croaks on descriptors that cannot be parsed.
sub _resolve($) {
    my ($nodeid) = @_;

    my $cv = AE::cv;
    my @res; # [priority, endpoint] pairs, filled in by the callbacks below

    # Outermost begin/end pair: its callback runs after the last resolver
    # has finished and delivers the final result.
    $cv->begin (sub {
        my %seen;
        my @refs =
            grep !$seen{$_}++,
            map $_->[1],
            sort { $a->[0] <=> $b->[0] } @res;

        shift->send (@refs);
    });

    my $idx;
    for my $t (split /,/, $nodeid) {
        # integer part: position of the descriptor, fraction: result order
        my $pri = ++$idx;

        if ($t =~ /^\d*$/) {
            $t = length $t ? _nodename () . ":$t" : _nodename ();
        }

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
            or Carp::croak "$t: unparsable transport descriptor";

        $port = "0" if $port eq "*";

        $cv->begin;

        if ($host eq "*") {
            # use fork_call, as Net::Interface is big, and we need it rarely.
            require AnyEvent::Util;
            AnyEvent::Util::fork_call (
                sub {
                    require Net::Interface;

                    my @addr;

                    for my $if (Net::Interface->interfaces) {
                        # we statically lower-prioritise ipv6 here, TODO :()
                        # skip localhost etc.
                        push @addr, grep !/^\x7f/,
                            $if->address (Net::Interface::AF_INET ());
                        #next if $if->scope ($_) <= 2;
                        # global unicast, site-local unicast
                        push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                            $if->address (Net::Interface::AF_INET6 ());
                    }

                    @addr
                },
                sub {
                    for my $ip (@_) {
                        push @res, [
                            $pri += 1e-5,
                            AnyEvent::Socket::format_hostport (
                                AnyEvent::Socket::format_address ($ip), $port
                            )
                        ];
                    }
                    $cv->end;
                }
            );
        } else {
            AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
                for my $target (@_) {
                    my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
                    push @res, [
                        $pri += 1e-5,
                        AnyEvent::Socket::format_hostport (
                            AnyEvent::Socket::format_address ($addr), $service
                        )
                    ];
                }
                $cv->end;
            });
        }
    }

    # matches the begin at the top
    $cv->end;

    $cv
}
367    
# configure [$profile,] key => value...
#
# Looks up the configuration profile (default: the local node name), sets
# the node id, starts a listener for every resolved bind address, hands the
# resolved seeds to AnyEvent::MP::Global and finally starts the configured
# services. Blocks while the bind and seed descriptors are being resolved.
sub configure(@) {
    # an odd number of arguments means the first one is the profile name
    unshift @_, "profile" if @_ % 2;
    my %kv = @_;

    my $profile = delete $kv{profile};
    $profile = _nodename ()
        unless defined $profile;

    $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

    delete $NODE{$NODE}; # we do not support doing stuff before configure

    # the node id comes from the profile and falls back to the profile name;
    # "anon/" keeps the id generated at load time
    my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
    $NODE = $node
        unless $node eq "anon/";

    # re-register the self node under its (possibly new) id
    my $self = $NODE{""};
    $NODE{$NODE} = $self;
    $self->{id}  = $NODE;

    my $seeds = $CONFIG->{seeds};
    my $binds = $CONFIG->{binds} || ["*"];

    $WARN->(8, "node $NODE starting up.");

    $LISTENER = [];
    %LISTENER = ();

    # all resolvers are started first, then waited for one after the other
    for my $resolver (map { _resolve ($_) } @$binds) {
        for my $bind ($resolver->recv) {
            my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
                or Carp::croak "$bind: unparsable local bind address";

            my $listener = AnyEvent::MP::Transport::mp_server (
                $host,
                $port,
                prepare => sub {
                    # replaces $bind by the address the callback is given;
                    # the assignment must stay the last expression, as its
                    # value is what this callback returns
                    my (undef, $bound_host, $bound_port) = @_;
                    $bind = AnyEvent::Socket::format_hostport ($bound_host, $bound_port);
                },
            );

            # $bind is read only after mp_server has returned
            $LISTENER{$bind} = $listener;
            push @$LISTENER, $bind;
        }
    }

    $WARN->(8, "node listens on [@$LISTENER].");

    # the global service is mandatory currently
    require AnyEvent::MP::Global;

    # connect to all seednodes
    my @seed_cvs = map { _resolve ($_) } @$seeds;
    AnyEvent::MP::Global::set_seeds (map { $_->recv } @seed_cvs);

    # services: [func, args...] calls func with args, "Module::" loads
    # Module, anything else names a function called without arguments
    for (@{ $CONFIG->{services} }) {
        if (ref) {
            my ($func, @args) = @$_;
            my $code = load_func ($func);
            $code->(@args);
        } elsif (s/::$//) {
            eval "require $_";
            die $@ if $@;
        } else {
            my $code = load_func ($_);
            $code->();
        }
    }
}
436    
437     #############################################################################
438 root 1.6 # node monitoring and info
439 root 1.3
440 root 1.21 =item node_is_known $nodeid
441 root 1.13
442 root 1.46 Returns true iff the given node is currently known to the system. The only
443     time a node is known but not up currently is when a connection request is
444     pending.
445 root 1.13
446     =cut
447    
# True when %NODE has an entry for the given node id.
sub node_is_known($) {
    my ($nodeid) = @_;

    exists $NODE{$nodeid}
}
451    
452 root 1.21 =item node_is_up $nodeid
453 root 1.13
454     Returns true if the given node is "up", that is, the kernel thinks it has
455     a working connection to it.
456    
457     If the node is known but not currently connected, returns C<0>. If the
458     node is not known, returns C<undef>.
459    
460     =cut
461    
# Returns 1 when the node's entry in %NODE has a true "transport" member,
# 0 when it has none, and nothing (undef in scalar context) when there is
# no true entry for the node id at all.
sub node_is_up($) {
    my $node = $NODE{$_[0]}
        or return;

    $node->{transport} ? 1 : 0
}
466    
467 root 1.3 =item known_nodes
468    
469 root 1.26 Returns the node IDs of all nodes currently known to this node, including
470     itself and nodes not currently connected.
471 root 1.3
472     =cut
473    
# Returns the "id" member of every node object stored in %NODE.
sub known_nodes {
    my @ids;

    push @ids, $_->{id}
        for values %NODE;

    @ids
}
477    
478     =item up_nodes
479    
480 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
481     the node itself).
482 root 1.3
483     =cut
484    
# Returns the "id" member of every node object in %NODE that has a true
# "transport" member.
sub up_nodes {
    my @connected = grep { $_->{transport} } values %NODE;

    map { $_->{id} } @connected
}
488    
489 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
490 root 1.3
491 root 1.27 Registers a callback that is called each time a node goes up (a connection
492     is established) or down (the connection is lost).
493 root 1.3
494     Node up messages can only be followed by node down messages for the same
495     node, and vice versa.
496    
497 root 1.27 Note that monitoring a node is usually better done by monitoring its node
498     port. This function is mainly of interest to modules that are concerned
499     about the network topology and low-level connection handling.
500    
501     Callbacks I<must not> block and I<should not> send any messages.
502    
503     The function returns an optional guard which can be used to unregister
504 root 1.3 the monitoring callback again.
505    
506 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
507     or go up (and down).
508    
509     newnode $_, 1 for up_nodes;
510     mon_nodes \&newnode;
511    
512 root 1.3 =cut
513    
# registered node monitors, keyed by the numeric address of the callback
our %MON_NODES;

# $guard = mon_nodes $callback
#
# Registers $callback to be called with ($nodeid, $is_up, @reason) on node
# up and down events. In scalar or list context a guard object is returned
# that unregisters the callback when it is destroyed; in void context no
# guard is created and the callback stays registered.
sub mon_nodes($) {
    my ($cb) = @_;

    $MON_NODES{$cb+0} = $cb;

    # wantarray is false-but-defined in scalar context, so it has to be
    # tested with defined - a plain truth test would deny "$guard = ..."
    # callers their guard. The parenthesised call does not depend on
    # guard's prototype being known when this file is compiled.
    (defined wantarray)
        ? AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
        : ()
}
523    
# _inject_nodeevent $node, $is_up, @reason
#
# Tells every registered node monitor about an up/down event of the given
# node object and logs the event at level 7. A monitor that dies is logged
# at level 1 and does not keep the remaining monitors from being called.
sub _inject_nodeevent($$;@) {
    my ($node, $up, @reason) = @_;

    for my $cb (values %MON_NODES) {
        my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };

        $WARN->(1, $@)
            unless $ok;
    }

    my $state = $up ? "up" : "down";

    $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
534    
535     #############################################################################
536 root 1.1 # self node code
537    
# Handlers for messages sent to the node port "": the first message element
# selects the handler, the remaining elements become its arguments.
# $SRCNODE is the node the message came from.
our %node_req = (
    # internal services

    # monitoring
    mon0 => sub { # stop monitoring a port
        my ($portid) = @_;

        my $mon = delete $SRCNODE->{rmon}{$portid};
        $NODE{""}->unmonitor ($portid, $mon);
    },
    mon1 => sub { # start monitoring a port
        my ($portid) = @_;

        my $node = $SRCNODE;
        Scalar::Util::weaken ($node); #TODO# ugly

        # relays the kill reason to the monitoring node, as long as that
        # node still exists and has a transport
        my $relay = sub {
            $node->send (["", kil => $portid, @_])
                if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
        };

        # remembered per source node, so that mon0 can unregister it again
        $node->{rmon}{$portid} = $relay;
        $NODE{""}->monitor ($portid, $relay);
    },
    kil => sub {
        my ($portid, @reason) = @_;

        my $cbs = delete $SRCNODE->{lmon}{$portid}
            or return;

        $_->(@reason) for @$cbs;
    },

    # "public" services - not actually public

    # relay message to another node / generic echo
    snd => \&snd,
    snd_multiple => sub {
        for my $msg (@_) {
            snd (@$msg);
        }
    },

    # informational
    info => sub {
        snd (@_, $NODE);
    },
    known_nodes => sub {
        snd (@_, known_nodes ());
    },
    up_nodes => sub {
        snd (@_, up_nodes ());
    },

    # random utilities
    eval => sub {
        # kept exactly in this shape: the evaluated string can see this
        # handler's @_ and lexicals
        my @res = eval shift;
        snd (@_, "$@", @res) if @_;
    },
    time => sub {
        snd (@_, AE::time ());
    },
    devnull => sub {
        #
    },
    "" => sub {
        # empty messages are keepalives or similar devnull-applications
    },
);
597    
# The local node is reachable both under the empty node id and under its
# current id. Explicit class method call instead of indirect object syntax.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# Handler of the node port "": dispatches on the first message element.
# Tags without an entry in %node_req are resolved as function names through
# load_func and the result is cached in %node_req.
$PORT{""} = sub {
    my $tag = shift;

    # Failure is detected through eval's own result (as _inject_nodeevent
    # does) rather than through the truth of $@. "&{ ... }" without
    # parentheses passes the remaining @_ on to the handler.
    eval { &{ $node_req{$tag} ||= load_func ($tag) }; 1 }
        or $WARN->(2, "error processing node message: $@");
};
604    
605     =back
606    
607     =head1 SEE ALSO
608    
609     L<AnyEvent::MP>.
610    
611     =head1 AUTHOR
612    
613     Marc Lehmann <schmorp@schmorp.de>
614     http://home.schmorp.de/
615    
616     =cut
617    
618     1
619