ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.45
Committed: Mon Sep 7 12:04:32 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.44: +4 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages - queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
# Highest level the default handler still prints; the environment variable,
# when present, overrides the built-in default of 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks, invoked by the default handler for every message.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # every hook is called with this sub's own @_, before any level filtering
   &$_ for @WARN;

   my ($level, $msg) = @_;

   $level <= $WARNLEVEL
      or return;

   # the format below supplies the newline itself
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
# Resolve a fully qualified function name ("Some::Package::func") to a code
# reference, loading the defining module on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until it becomes defined. When no prefix is left,
# a code reference that dies with "unable to resolve '...'" is returned
# instead of dying right away.
sub load_func($) {
   my $func = $_[0];

   # symbolic code references are the whole point of this function
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # $func may originate from a remote message (unknown node request
         # tags are passed to load_func), so only well-formed package names
         # are allowed to reach the string eval - anything else could smuggle
         # arbitrary code into it.
         eval "require $pkg"
            if $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/;
      } until defined &$func;
   }

   \&$func
}
109    
# Return a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened. Any octets
# that could not be obtained that way (no such device, read error, short
# read) are produced by perl's rand, which is seeded once per process.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may return fewer octets than requested, so keep appending
      while ($len > length ($nonce)) {
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # error or eof - the fallback fills in the rest
      }
   }

   if ($len > length ($nonce)) {
      # no usable kernel random source, fall back to a weak generator
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }
      $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
   }

   $nonce
}
126    
# Encode an octet string using only the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer available, the data is interpreted as one big
# hexadecimal number and written in base 62. Otherwise it is base64-encoded
# and the three non-alphanumeric parts of that alphabet are rewritten:
# padding is dropped, and "x" serves as escape character ("x" => "x0",
# "/" => "x1", "+" => "x2").
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                 62
              );
   } else {
      $data = MIME::Base64::encode_base64 $data, "";
      # base64 pads with one OR two "=", all of them have to go
      $data =~ s/=+\z//;
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
145    
# Build a short alphanumeric cookie from the low 16 bits of the process id,
# the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time & 0xffff;

   alnumbits (pack "nna*", $pid16, $time16, nonce (2))
}
149    
150 root 1.20 our $CONFIG; # this node's configuration
151 root 1.21
152 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
153 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
154 root 1.21 our $NODE = "anon/$RUNIQ";
155 root 1.1 our $ID = "a";
156    
157     our %NODE; # node id to transport mapping, or "undef", for local node
158     our (%PORT, %PORT_DATA); # local ports
159    
160 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
161 root 1.1 our %LMON; # monitored _local_ ports
162    
163     our %LISTENER;
164 root 1.21 our $LISTENER; # our listeners, as arrayref
165 root 1.1
166     our $SRCNODE; # holds the sending node during _inject
167    
# Accessor for the current value of $NODE (this node's id).
sub NODE() {
   return $NODE;
}
171    
# Return the node id part of a port id, i.e. everything before the first "#".
sub node_of($) {
   # limit of 2: only the first "#" separates node id and port name
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
177    
# TRACE is a compile-time constant (so disabled trace statements can be
# optimised away), switched on by a true PERL_ANYEVENT_MP_TRACE.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
183 root 1.1
our $DELAY_TIMER; # pending timer watcher, or undef when none is armed
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Run all queued callbacks in order, including any that get queued while
# this function is running, then disarm the timer.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   # The timer is one-shot. Unless it is forgotten here, "delay" would see a
   # true $DELAY_TIMER forever, never arm a new timer, and every callback
   # queued after the first run would never be called.
   undef $DELAY_TIMER;

   return;
}
190    
# Queue a callback for _delay_run and make sure a zero-delay timer is armed.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   # one timer serves all queued callbacks
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
195    
# Deliver a message to a local port: the first argument is the port name,
# the remaining arguments are passed to the callback registered in %PORT.
# Messages for ports without a callback are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # old-style call: the handler sees the rest of our @_
   &$handler
}
200    
# Central routing component: returns the node object registered in %NODE for
# the given node id, creating and registering a direct node object first if
# there is none yet, so that messages can be sent to it.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
208    
# Send a message to a port. The first argument is the destination port id
# ("nodeid#portname"), the remaining arguments form the message.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   # unknown nodes are registered on first use
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port name is stringified and travels as first message element
   $node->{send} (["$portid", @_]);
}
217    
218 root 1.17 =item $is_local = port_is_local $port
219    
220     Returns true iff the port is a local port.
221    
222     =cut
223    
# A port is local when its node part maps to the very same node object as
# the "" entry of %NODE.
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $NODE{$parts[0]} == $NODE{""}
}
229    
230 root 1.18 =item snd_to_func $node, $func, @args
231 root 1.11
232 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
233 root 1.11 this function with the given arguments on that node.
234    
235 root 1.20 This function can be used to implement C<spawn>-like interfaces.
236 root 1.11
237     =cut
238    
# Ask the given node to call a named function: sends (@_ after the node id)
# to that node's "" port.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT this node, while
   # the comment above talks about "on $NODE" - confirm which one is meant.
   $AnyEvent::MP::Node::Self::DELAY = 1
      unless $nodeid eq $NODE;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @call]);
}
250    
251 root 1.18 =item snd_on $node, @msg
252    
253     Executes C<snd> with the given C<@msg> (which must include the destination
254     port) on the given node.
255    
256     =cut
257    
# Relay @msg through $node: sends a "snd" request followed by the message.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
262    
263 root 1.29 =item eval_on $node, $string[, @reply]
264 root 1.18
265 root 1.29 Evaluates the given string as a Perl expression on the given node. When
266     @reply is specified, then it is used to construct a reply message with
267     C<"$@"> and any results from the eval appended.
268 root 1.18
269     =cut
270    
# Send an "eval" request to $node, consisting of the string to evaluate and
# the optional reply message prefix.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
275    
# Kill a port: the first argument is the port id ("nodeid#portname"), the
# remaining arguments are handed to the node object's kill method as reason.
# Croaks when the port name is missing or empty.
sub kil(@) {
   my ($target, @reason) = @_;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @reason);
}
285    
# The nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
290    
# Resolve a comma-separated list of transport descriptors to "host:port"
# strings.
#
# Returns a condvar; it is sent the list of formatted addresses, ordered by
# the position of the originating descriptor, with duplicates removed.
#
# Each descriptor is treated as follows:
#   - empty or all-digits: this host's nodename, with the digits as port
#   - a port of "*": port 0
#   - a host of "*": the addresses of the local interfaces, as reported by
#     Net::Interface in a forked child
#   - anything else: resolved with AnyEvent::Socket::resolve_sockaddr
#
# Croaks when a descriptor cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs

   # the group callback runs once every begin has been matched by an end
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor get priorities $idx + a small increment,
      # so they sort in descriptor order
      my $pri = ++$idx;

      $t = length $t ? _nodename . ":$t" : _nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # (a named loop variable is used because lexical $_,
                  # "for my $_", is a compile error since perl 5.24)
                  for my $addr ($if->address (Net::Interface::AF_INET ())) {
                     next if $addr =~ /^\x7f/; # skip localhost etc.
                     push @addr, $addr;
                  }
                  for my $addr ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($addr) <= 2;
                     next unless $addr =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $addr;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   # matches the initial begin above
   $cv->end;

   $cv
}
371    
# Configure and start this node: look up the profile, fix the node id, bind
# all listeners, load the global service, register the seed nodes and start
# the configured services.
#
# Arguments are key => value pairs; with an odd number of arguments, the
# first one is taken to be the profile name. The profile defaults to the
# nodename of this host.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   defined $profile
      or $profile = _nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   # the node id defaults to the profile name; "anon/" keeps the id that was
   # generated at load time
   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   # make the self node reachable under its (possibly new) id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolves first, then wait for them one by one
   my @bind_cvs = map { _resolve ($_) } @$binds;

   for my $cv (@bind_cvs) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );

         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   my @seed_cvs = map { _resolve ($_) } @$seeds;
   AnyEvent::MP::Global::set_seeds (map { $_->recv } @seed_cvs);

   # a service is either [function, args...], "Module::" (just loaded), or
   # the name of a function to call without arguments
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }
}
438    
439     #############################################################################
440 root 1.6 # node monitoring and info
441 root 1.3
442 root 1.21 =item node_is_known $nodeid
443 root 1.13
444     Returns true iff the given node is currently known to the system.
445    
446     =cut
447    
# True iff %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
451    
452 root 1.21 =item node_is_up $nodeid
453 root 1.13
454     Returns true if the given node is "up", that is, the kernel thinks it has
455     a working connection to it.
456    
457     If the node is known but not currently connected, returns C<0>. If the
458     node is not known, returns C<undef>.
459    
460     =cut
461    
# 1 when the node object has a transport, 0 when it has none, and nothing
# (undef in scalar context) when the node id is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
466    
467 root 1.3 =item known_nodes
468    
469 root 1.26 Returns the node IDs of all nodes currently known to this node, including
470     itself and nodes not currently connected.
471 root 1.3
472     =cut
473    
# The {id} of every node object stored in %NODE.
sub known_nodes {
   my @ids;

   push @ids, $_->{id}
      for values %NODE;

   @ids
}
477    
478     =item up_nodes
479    
480 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
481     the node itself).
482 root 1.3
483     =cut
484    
# The {id} of every node object in %NODE that has a true {transport}.
sub up_nodes {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
488    
489 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
490 root 1.3
491 root 1.27 Registers a callback that is called each time a node goes up (a connection
492     is established) or down (the connection is lost).
493 root 1.3
494     Node up messages can only be followed by node down messages for the same
495     node, and vice versa.
496    
497 root 1.27 Note that monitoring a node is usually better done by monitoring its node
498     port. This function is mainly of interest to modules that are concerned
499     about the network topology and low-level connection handling.
500    
501     Callbacks I<must not> block and I<should not> send any messages.
502    
503     The function returns an optional guard which can be used to unregister
504 root 1.3 the monitoring callback again.
505    
506     =cut
507    
our %MON_NODES; # registered node monitor callbacks, keyed by their address

# Register $cb to be called on node up/down events. Unless called in void
# context, returns a guard that unregisters the callback again.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would deny "my $guard = mon_nodes ..." its guard - test for
   # definedness to single out void context instead.
   defined wantarray
      or return;

   # an explicit sub keeps this independent of guard's (&) prototype being
   # known at compile time
   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
517    
# Tell every registered node monitor that $node went up or down, then log
# the event at level 7. A dying monitor is logged at level 1 and does not
# keep the remaining monitors from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@)
         unless $ok;
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
528    
529     #############################################################################
530 root 1.1 # self node code
531    
# Request handlers of the node port (the port named ""), keyed by the tag
# that forms the first element of a node message. $SRCNODE is the node the
# message currently being processed came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      # the callback must not keep the remote node object alive
      my $node = $SRCNODE;
      Scalar::Util::weaken $node; #TODO# ugly

      my $on_kill = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = $on_kill);
   },
   kil => sub {
      # run (and forget) the callbacks monitoring the given remote port
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE: evaluates a string supplied by the sending node as perl code
      my @res = eval shift;

      # reply only when a reply message prefix was supplied
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
591    
# the local node is registered both under "" and under its node id
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: dispatches on the first message element, resolving (and
# caching) tags without a handler via load_func
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # old-style call: the handler sees the rest of our @_
      &$handler
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
598    
599     =back
600    
601     =head1 SEE ALSO
602    
603     L<AnyEvent::MP>.
604    
605     =head1 AUTHOR
606    
607     Marc Lehmann <schmorp@schmorp.de>
608     http://home.schmorp.de/
609    
610     =cut
611    
612     1
613