ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.46
Committed: Mon Sep 7 20:00:38 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.45: +10 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.46 up_nodes mon_nodes node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages -queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
71 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 root 1.44 our @WARN;
73     our $WARN = sub {
74     &$_ for @WARN;
75 root 1.29
76     return if $WARNLEVEL < $_[0];
77    
78 root 1.16 my ($level, $msg) = @_;
79    
80 root 1.1 $msg =~ s/\n$//;
81 root 1.16
82     printf STDERR "%s <%d> %s\n",
83     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84     $level,
85     $msg;
86 root 1.1 };
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
95 root 1.6 sub load_func($) {
96     my $func = $_[0];
97    
98     unless (defined &$func) {
99     my $pkg = $func;
100     do {
101     $pkg =~ s/::[^:]+$//
102     or return sub { die "unable to resolve '$func'" };
103     eval "require $pkg";
104     } until defined &$func;
105     }
106    
107     \&$func
108     }
109    
110 root 1.1 sub nonce($) {
111     my $nonce;
112    
113     if (open my $fh, "</dev/urandom") {
114     sysread $fh, $nonce, $_[0];
115     } else {
116     # shit...
117     our $nonce_init;
118 root 1.31 unless ($nonce_init++) {
119 root 1.1 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
120     }
121     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
122     }
123    
124     $nonce
125     }
126    
127 root 1.21 sub alnumbits($) {
128 root 1.1 my $data = $_[0];
129    
130     if (eval "use Math::GMP 2.05; 1") {
131     $data = Math::GMP::get_str_gmp (
132     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
133     62
134     );
135     } else {
136     $data = MIME::Base64::encode_base64 $data, "";
137     $data =~ s/=//;
138 root 1.31 $data =~ s/x/x0/g;
139     $data =~ s/\//x1/g;
140     $data =~ s/\+/x2/g;
141 root 1.1 }
142    
143     $data
144     }
145    
146     sub gen_uniq {
147 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
148 root 1.1 }
149    
150 root 1.20 our $CONFIG; # this node's configuration
151 root 1.21
152 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
153 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
154 root 1.21 our $NODE = "anon/$RUNIQ";
155 root 1.1 our $ID = "a";
156    
157     our %NODE; # node id to transport mapping, or "undef", for local node
158     our (%PORT, %PORT_DATA); # local ports
159    
160 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
161 root 1.1 our %LMON; # monitored _local_ ports
162    
163     our %LISTENER;
164 root 1.21 our $LISTENER; # our listeners, as arrayref
165 root 1.1
166     our $SRCNODE; # holds the sending node during _inject
167    
168     sub NODE() {
169     $NODE
170     }
171    
172     sub node_of($) {
173 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
174 root 1.1
175 root 1.21 $node
176 root 1.1 }
177    
178 root 1.17 BEGIN {
179     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
180     ? sub () { 1 }
181     : sub () { 0 };
182     }
183 root 1.1
184 root 1.42 our $DELAY_TIMER;
185     our @DELAY_QUEUE;
186    
187     sub _delay_run {
188     (shift @DELAY_QUEUE or return)->() while 1;
189     }
190    
191     sub delay($) {
192     push @DELAY_QUEUE, shift;
193     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
194     }
195    
196 root 1.1 sub _inject {
197 root 1.25 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
198 root 1.1 &{ $PORT{+shift} or return };
199     }
200    
201 root 1.20 # this function adds a node-ref, so you can send stuff to it
202     # it is basically the central routing component.
203 root 1.1 sub add_node {
204 root 1.21 my ($node) = @_;
205 root 1.1
206 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
207 root 1.13 }
208    
209 root 1.1 sub snd(@) {
210 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
211 root 1.1
212 root 1.25 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
213 root 1.1
214 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
215 root 1.2 ->{send} (["$portid", @_]);
216 root 1.1 }
217    
218 root 1.17 =item $is_local = port_is_local $port
219    
220     Returns true iff the port is a local port.
221    
222     =cut
223    
224     sub port_is_local($) {
225 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
226 root 1.17
227 root 1.21 $NODE{$nodeid} == $NODE{""}
228 root 1.17 }
229    
230 root 1.18 =item snd_to_func $node, $func, @args
231 root 1.11
232 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
233 root 1.11 this function with the given arguments on that node.
234    
235 root 1.20 This function can be used to implement C<spawn>-like interfaces.
236 root 1.11
237     =cut
238    
239 root 1.18 sub snd_to_func($$;@) {
240 root 1.21 my $nodeid = shift;
241 root 1.11
242 root 1.41 # on $NODE, we artificially delay... (for spawn)
243     # this is very ugly - maybe we should simply delay ALL messages,
244     # to avoid deep recursion issues. but that's so... slow...
245 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
246     if $nodeid ne $NODE;
247    
248     ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
249 root 1.11 }
250    
251 root 1.18 =item snd_on $node, @msg
252    
253     Executes C<snd> with the given C<@msg> (which must include the destination
254     port) on the given node.
255    
256     =cut
257    
258     sub snd_on($@) {
259     my $node = shift;
260     snd $node, snd => @_;
261     }
262    
263 root 1.29 =item eval_on $node, $string[, @reply]
264 root 1.18
265 root 1.29 Evaluates the given string as Perl expression on the given node. When
266     @reply is specified, then it is used to construct a reply message with
267     C<"$@"> and any results from the eval appended.
268 root 1.18
269     =cut
270    
271 root 1.29 sub eval_on($$;@) {
272 root 1.18 my $node = shift;
273     snd $node, eval => @_;
274     }
275    
276 root 1.1 sub kil(@) {
277 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
278 root 1.1
279     length $portid
280 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
281 root 1.1
282 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
283 root 1.1 ->kill ("$portid", @_);
284     }
285    
286 root 1.7 sub _nodename {
287     require POSIX;
288     (POSIX::uname ())[1]
289     }
290    
291 root 1.21 sub _resolve($) {
292     my ($nodeid) = @_;
293 root 1.1
294     my $cv = AE::cv;
295     my @res;
296    
297     $cv->begin (sub {
298     my %seen;
299     my @refs;
300     for (sort { $a->[0] <=> $b->[0] } @res) {
301     push @refs, $_->[1] unless $seen{$_->[1]}++
302     }
303 root 1.21 shift->send (@refs);
304 root 1.1 });
305    
306     my $idx;
307 root 1.21 for my $t (split /,/, $nodeid) {
308 root 1.1 my $pri = ++$idx;
309 root 1.7
310     $t = length $t ? _nodename . ":$t" : _nodename
311     if $t =~ /^\d*$/;
312 root 1.1
313 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
314 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
315    
316 root 1.33 $port = "0" if $port eq "*";
317    
318     if ($host eq "*") {
319     $cv->begin;
320     # use fork_call, as Net::Interface is big, and we need it rarely.
321     require AnyEvent::Util;
322     AnyEvent::Util::fork_call (
323     sub {
324     my @addr;
325    
326     require Net::Interface;
327    
328     for my $if (Net::Interface->interfaces) {
329     # we statically lower-prioritise ipv6 here, TODO :()
330     for my $_ ($if->address (Net::Interface::AF_INET ())) {
331     next if /^\x7f/; # skip localhost etc.
332     push @addr, $_;
333     }
334     for ($if->address (Net::Interface::AF_INET6 ())) {
335     #next if $if->scope ($_) <= 2;
336     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
337     push @addr, $_;
338     }
339    
340     }
341     @addr
342     }, sub {
343     for my $ip (@_) {
344     push @res, [
345     $pri += 1e-5,
346     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
347     ];
348     }
349     $cv->end;
350     }
351     );
352     } else {
353     $cv->begin;
354     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
355     for (@_) {
356     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
357     push @res, [
358     $pri += 1e-5,
359     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
360     ];
361     }
362     $cv->end;
363     };
364     }
365 root 1.1 }
366    
367     $cv->end;
368    
369     $cv
370     }
371    
372 root 1.39 sub configure(@) {
373     unshift @_, "profile" if @_ & 1;
374 root 1.34 my (%kv) = @_;
375    
376     my $profile = delete $kv{profile};
377 root 1.1
378 root 1.21 $profile = _nodename
379     unless defined $profile;
380 root 1.6
381 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
382 root 1.24
383     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
384     $NODE = $node
385     unless $node eq "anon/";
386 root 1.6
387 root 1.21 $NODE{$NODE} = $NODE{""};
388     $NODE{$NODE}{id} = $NODE;
389 root 1.20
390 root 1.21 my $seeds = $CONFIG->{seeds};
391     my $binds = $CONFIG->{binds};
392 root 1.3
393 root 1.33 $binds ||= ["*"];
394 root 1.1
395 root 1.21 $WARN->(8, "node $NODE starting up.");
396 root 1.1
397 root 1.23 $LISTENER = [];
398     %LISTENER = ();
399    
400 root 1.21 for (map _resolve $_, @$binds) {
401     for my $bind ($_->recv) {
402     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
403     or Carp::croak "$bind: unparsable local bind address";
404 root 1.20
405 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
406     $host,
407     $port,
408     prepare => sub {
409     my (undef, $host, $port) = @_;
410     $bind = AnyEvent::Socket::format_hostport $host, $port;
411     },
412     ;
413     $LISTENER{$bind} = $listener;
414 root 1.21 push @$LISTENER, $bind;
415     }
416 root 1.1 }
417    
418 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
419    
420 root 1.21 # the global service is mandatory currently
421     require AnyEvent::MP::Global;
422 root 1.1
423 root 1.21 # connect to all seednodes
424     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
425 root 1.1
426 root 1.20 for (@{ $CONFIG->{services} }) {
427 root 1.43 if (ref) {
428     my ($func, @args) = @$_;
429     (load_func $func)->(@args);
430     } elsif (s/::$//) {
431 root 1.13 eval "require $_";
432     die $@ if $@;
433     } else {
434     (load_func $_)->();
435     }
436     }
437 root 1.1 }
438    
439     #############################################################################
440 root 1.6 # node monitoring and info
441 root 1.3
442 root 1.21 =item node_is_known $nodeid
443 root 1.13
444 root 1.46 Returns true iff the given node is currently known to the system. The only
445     time a node is known but not up currently is when a conenction request is
446     pending.
447 root 1.13
448     =cut
449    
450     sub node_is_known($) {
451     exists $NODE{$_[0]}
452     }
453    
454 root 1.21 =item node_is_up $nodeid
455 root 1.13
456     Returns true if the given node is "up", that is, the kernel thinks it has
457     a working connection to it.
458    
459     If the node is known but not currently connected, returns C<0>. If the
460     node is not known, returns C<undef>.
461    
462     =cut
463    
464     sub node_is_up($) {
465     ($NODE{$_[0]} or return)->{transport}
466     ? 1 : 0
467     }
468    
469 root 1.3 =item known_nodes
470    
471 root 1.26 Returns the node IDs of all nodes currently known to this node, including
472     itself and nodes not currently connected.
473 root 1.3
474     =cut
475    
476     sub known_nodes {
477 root 1.26 map $_->{id}, values %NODE
478 root 1.3 }
479    
480     =item up_nodes
481    
482 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
483     the node itself).
484 root 1.3
485     =cut
486    
487     sub up_nodes {
488 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
489 root 1.3 }
490    
491 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
492 root 1.3
493 root 1.27 Registers a callback that is called each time a node goes up (a connection
494     is established) or down (the connection is lost).
495 root 1.3
496     Node up messages can only be followed by node down messages for the same
497     node, and vice versa.
498    
499 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
500     port. This function is mainly of interest to modules that are concerned
501     about the network topology and low-level connection handling.
502    
503     Callbacks I<must not> block and I<should not> send any messages.
504    
505     The function returns an optional guard which can be used to unregister
506 root 1.3 the monitoring callback again.
507    
508 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
509     or go up (and down).
510    
511     newnode $_, 1 for up_nodes;
512     mon_nodes \&newnode;
513    
514 root 1.3 =cut
515    
516     our %MON_NODES;
517    
518     sub mon_nodes($) {
519     my ($cb) = @_;
520    
521     $MON_NODES{$cb+0} = $cb;
522    
523     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
524     }
525    
526     sub _inject_nodeevent($$;@) {
527 root 1.16 my ($node, $up, @reason) = @_;
528 root 1.3
529     for my $cb (values %MON_NODES) {
530 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
531 root 1.16 or $WARN->(1, $@);
532 root 1.3 }
533 root 1.16
534 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
535 root 1.3 }
536    
537     #############################################################################
538 root 1.1 # self node code
539    
540     our %node_req = (
541     # internal services
542    
543     # monitoring
544 root 1.27 mon0 => sub { # stop monitoring a port
545 root 1.1 my $portid = shift;
546     my $node = $SRCNODE;
547     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
548     },
549 root 1.27 mon1 => sub { # start monitoring a port
550 root 1.1 my $portid = shift;
551     my $node = $SRCNODE;
552 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
553 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
554 root 1.27 $node->send (["", kil => $portid, @_])
555 root 1.39 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
556 root 1.1 });
557     },
558     kil => sub {
559     my $cbs = delete $SRCNODE->{lmon}{+shift}
560     or return;
561    
562     $_->(@_) for @$cbs;
563     },
564    
565 root 1.18 # "public" services - not actually public
566 root 1.1
567     # relay message to another node / generic echo
568 root 1.15 snd => \&snd,
569 root 1.27 snd_multiple => sub {
570 root 1.1 snd @$_ for @_
571     },
572    
573 root 1.4 # informational
574     info => sub {
575     snd @_, $NODE;
576     },
577     known_nodes => sub {
578     snd @_, known_nodes;
579     },
580     up_nodes => sub {
581     snd @_, up_nodes;
582     },
583    
584 root 1.30 # random utilities
585 root 1.1 eval => sub {
586     my @res = eval shift;
587     snd @_, "$@", @res if @_;
588     },
589     time => sub {
590     snd @_, AE::time;
591     },
592     devnull => sub {
593     #
594     },
595 root 1.15 "" => sub {
596 root 1.27 # empty messages are keepalives or similar devnull-applications
597 root 1.15 },
598 root 1.1 );
599    
600 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
601 root 1.1 $PORT{""} = sub {
602     my $tag = shift;
603     eval { &{ $node_req{$tag} ||= load_func $tag } };
604 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
605 root 1.1 };
606    
607     =back
608    
609     =head1 SEE ALSO
610    
611     L<AnyEvent::MP>.
612    
613     =head1 AUTHOR
614    
615     Marc Lehmann <schmorp@schmorp.de>
616     http://home.schmorp.de/
617    
618     =cut
619    
620     1
621