ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.56
Committed: Thu Oct 1 21:00:02 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-1_2
Changes since 1.55: +1 -1 lines
Log Message:
1.2

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.56 our $VERSION = '1.2';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.46 up_nodes mon_nodes node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages -queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
71 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 root 1.44 our @WARN;
73     our $WARN = sub {
74     &$_ for @WARN;
75 root 1.29
76     return if $WARNLEVEL < $_[0];
77    
78 root 1.16 my ($level, $msg) = @_;
79    
80 root 1.1 $msg =~ s/\n$//;
81 root 1.16
82     printf STDERR "%s <%d> %s\n",
83     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84     $level,
85     $msg;
86 root 1.1 };
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
95 root 1.6 sub load_func($) {
96     my $func = $_[0];
97    
98     unless (defined &$func) {
99     my $pkg = $func;
100     do {
101     $pkg =~ s/::[^:]+$//
102     or return sub { die "unable to resolve '$func'" };
103     eval "require $pkg";
104     } until defined &$func;
105     }
106    
107     \&$func
108     }
109    
110 root 1.1 sub nonce($) {
111     my $nonce;
112    
113     if (open my $fh, "</dev/urandom") {
114     sysread $fh, $nonce, $_[0];
115     } else {
116     # shit...
117     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
118     }
119    
120     $nonce
121     }
122    
123 root 1.21 sub alnumbits($) {
124 root 1.1 my $data = $_[0];
125    
126     if (eval "use Math::GMP 2.05; 1") {
127     $data = Math::GMP::get_str_gmp (
128     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
129     62
130     );
131     } else {
132     $data = MIME::Base64::encode_base64 $data, "";
133     $data =~ s/=//;
134 root 1.31 $data =~ s/x/x0/g;
135     $data =~ s/\//x1/g;
136     $data =~ s/\+/x2/g;
137 root 1.1 }
138    
139     $data
140     }
141    
142     sub gen_uniq {
143 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
144 root 1.1 }
145    
146 root 1.20 our $CONFIG; # this node's configuration
147 root 1.21
148 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
149 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
150 root 1.21 our $NODE = "anon/$RUNIQ";
151 root 1.1 our $ID = "a";
152    
153     our %NODE; # node id to transport mapping, or "undef", for local node
154     our (%PORT, %PORT_DATA); # local ports
155    
156 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
157 root 1.1 our %LMON; # monitored _local_ ports
158    
159     our %LISTENER;
160 root 1.21 our $LISTENER; # our listeners, as arrayref
161 root 1.1
162     our $SRCNODE; # holds the sending node during _inject
163    
164     sub NODE() {
165     $NODE
166     }
167    
168     sub node_of($) {
169 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
170 root 1.1
171 root 1.21 $node
172 root 1.1 }
173    
174 root 1.17 BEGIN {
175     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
176     ? sub () { 1 }
177     : sub () { 0 };
178     }
179 root 1.1
180 root 1.42 our $DELAY_TIMER;
181     our @DELAY_QUEUE;
182    
183     sub _delay_run {
184 root 1.55 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
185 root 1.42 }
186    
187     sub delay($) {
188     push @DELAY_QUEUE, shift;
189     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
190     }
191    
192 root 1.1 sub _inject {
193 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
194 root 1.1 &{ $PORT{+shift} or return };
195     }
196    
197 root 1.20 # this function adds a node-ref, so you can send stuff to it
198     # it is basically the central routing component.
199 root 1.1 sub add_node {
200 root 1.21 my ($node) = @_;
201 root 1.1
202 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
203 root 1.13 }
204    
205 root 1.1 sub snd(@) {
206 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
207 root 1.1
208 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
209 root 1.1
210 root 1.49 defined $nodeid #d#UGLY
211     or Carp::croak "'undef' is not a valid node ID/port ID";
212    
213 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
214 root 1.2 ->{send} (["$portid", @_]);
215 root 1.1 }
216    
217 root 1.17 =item $is_local = port_is_local $port
218    
219     Returns true iff the port is a local port.
220    
221     =cut
222    
223     sub port_is_local($) {
224 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
225 root 1.17
226 root 1.21 $NODE{$nodeid} == $NODE{""}
227 root 1.17 }
228    
229 root 1.18 =item snd_to_func $node, $func, @args
230 root 1.11
231 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
232 root 1.11 this function with the given arguments on that node.
233    
234 root 1.20 This function can be used to implement C<spawn>-like interfaces.
235 root 1.11
236     =cut
237    
238 root 1.18 sub snd_to_func($$;@) {
239 root 1.21 my $nodeid = shift;
240 root 1.11
241 root 1.41 # on $NODE, we artificially delay... (for spawn)
242     # this is very ugly - maybe we should simply delay ALL messages,
243     # to avoid deep recursion issues. but that's so... slow...
244 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
245     if $nodeid ne $NODE;
246    
247 root 1.49 defined $nodeid #d#UGLY
248     or Carp::croak "'undef' is not a valid node ID/port ID";
249    
250 root 1.45 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
251 root 1.11 }
252    
253 root 1.18 =item snd_on $node, @msg
254    
255     Executes C<snd> with the given C<@msg> (which must include the destination
256     port) on the given node.
257    
258     =cut
259    
260     sub snd_on($@) {
261     my $node = shift;
262     snd $node, snd => @_;
263     }
264    
265 root 1.29 =item eval_on $node, $string[, @reply]
266 root 1.18
267 root 1.29 Evaluates the given string as Perl expression on the given node. When
268     @reply is specified, then it is used to construct a reply message with
269     C<"$@"> and any results from the eval appended.
270 root 1.18
271     =cut
272    
273 root 1.29 sub eval_on($$;@) {
274 root 1.18 my $node = shift;
275     snd $node, eval => @_;
276     }
277    
278 root 1.1 sub kil(@) {
279 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
280 root 1.1
281     length $portid
282 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
283 root 1.1
284 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
285 root 1.1 ->kill ("$portid", @_);
286     }
287    
288 root 1.7 sub _nodename {
289     require POSIX;
290     (POSIX::uname ())[1]
291     }
292    
293 root 1.21 sub _resolve($) {
294     my ($nodeid) = @_;
295 root 1.1
296     my $cv = AE::cv;
297     my @res;
298    
299     $cv->begin (sub {
300     my %seen;
301     my @refs;
302     for (sort { $a->[0] <=> $b->[0] } @res) {
303     push @refs, $_->[1] unless $seen{$_->[1]}++
304     }
305 root 1.21 shift->send (@refs);
306 root 1.1 });
307    
308     my $idx;
309 root 1.21 for my $t (split /,/, $nodeid) {
310 root 1.1 my $pri = ++$idx;
311 root 1.7
312     $t = length $t ? _nodename . ":$t" : _nodename
313     if $t =~ /^\d*$/;
314 root 1.1
315 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
316 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
317    
318 root 1.33 $port = "0" if $port eq "*";
319    
320     if ($host eq "*") {
321     $cv->begin;
322     # use fork_call, as Net::Interface is big, and we need it rarely.
323     require AnyEvent::Util;
324     AnyEvent::Util::fork_call (
325     sub {
326     my @addr;
327    
328     require Net::Interface;
329    
330     for my $if (Net::Interface->interfaces) {
331     # we statically lower-prioritise ipv6 here, TODO :()
332 root 1.47 for $_ ($if->address (Net::Interface::AF_INET ())) {
333 root 1.33 next if /^\x7f/; # skip localhost etc.
334     push @addr, $_;
335     }
336     for ($if->address (Net::Interface::AF_INET6 ())) {
337     #next if $if->scope ($_) <= 2;
338     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
339     push @addr, $_;
340     }
341    
342     }
343     @addr
344     }, sub {
345     for my $ip (@_) {
346     push @res, [
347     $pri += 1e-5,
348     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
349     ];
350     }
351     $cv->end;
352     }
353     );
354     } else {
355     $cv->begin;
356     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
357     for (@_) {
358     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
359     push @res, [
360     $pri += 1e-5,
361     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
362     ];
363     }
364     $cv->end;
365     };
366     }
367 root 1.1 }
368    
369     $cv->end;
370    
371     $cv
372     }
373    
374 root 1.39 sub configure(@) {
375     unshift @_, "profile" if @_ & 1;
376 root 1.34 my (%kv) = @_;
377    
378     my $profile = delete $kv{profile};
379 root 1.1
380 root 1.21 $profile = _nodename
381     unless defined $profile;
382 root 1.6
383 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
384 root 1.24
385 root 1.47 delete $NODE{$NODE}; # we do not support doing stuff before configure
386    
387 root 1.24 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
388 root 1.51
389     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
390    
391 root 1.24 $NODE = $node
392     unless $node eq "anon/";
393 root 1.6
394 root 1.21 $NODE{$NODE} = $NODE{""};
395     $NODE{$NODE}{id} = $NODE;
396 root 1.20
397 root 1.21 my $seeds = $CONFIG->{seeds};
398     my $binds = $CONFIG->{binds};
399 root 1.3
400 root 1.33 $binds ||= ["*"];
401 root 1.1
402 root 1.21 $WARN->(8, "node $NODE starting up.");
403 root 1.1
404 root 1.23 $LISTENER = [];
405     %LISTENER = ();
406    
407 root 1.21 for (map _resolve $_, @$binds) {
408     for my $bind ($_->recv) {
409     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
410     or Carp::croak "$bind: unparsable local bind address";
411 root 1.20
412 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
413     $host,
414     $port,
415     prepare => sub {
416     my (undef, $host, $port) = @_;
417     $bind = AnyEvent::Socket::format_hostport $host, $port;
418 root 1.53 0
419 root 1.33 },
420     ;
421     $LISTENER{$bind} = $listener;
422 root 1.21 push @$LISTENER, $bind;
423     }
424 root 1.1 }
425    
426 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
427    
428 root 1.21 # the global service is mandatory currently
429     require AnyEvent::MP::Global;
430 root 1.1
431 root 1.21 # connect to all seednodes
432     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
433 root 1.1
434 root 1.20 for (@{ $CONFIG->{services} }) {
435 root 1.43 if (ref) {
436     my ($func, @args) = @$_;
437     (load_func $func)->(@args);
438     } elsif (s/::$//) {
439 root 1.13 eval "require $_";
440     die $@ if $@;
441     } else {
442     (load_func $_)->();
443     }
444     }
445 root 1.1 }
446    
447     #############################################################################
448 root 1.6 # node monitoring and info
449 root 1.3
450 root 1.21 =item node_is_known $nodeid
451 root 1.13
452 root 1.46 Returns true iff the given node is currently known to the system. The only
453     time a node is known but not up currently is when a conenction request is
454     pending.
455 root 1.13
456     =cut
457    
458     sub node_is_known($) {
459     exists $NODE{$_[0]}
460     }
461    
462 root 1.21 =item node_is_up $nodeid
463 root 1.13
464     Returns true if the given node is "up", that is, the kernel thinks it has
465     a working connection to it.
466    
467     If the node is known but not currently connected, returns C<0>. If the
468     node is not known, returns C<undef>.
469    
470     =cut
471    
472     sub node_is_up($) {
473     ($NODE{$_[0]} or return)->{transport}
474     ? 1 : 0
475     }
476    
477 root 1.3 =item known_nodes
478    
479 root 1.26 Returns the node IDs of all nodes currently known to this node, including
480     itself and nodes not currently connected.
481 root 1.3
482     =cut
483    
484 root 1.49 sub known_nodes() {
485 root 1.26 map $_->{id}, values %NODE
486 root 1.3 }
487    
488     =item up_nodes
489    
490 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
491     the node itself).
492 root 1.3
493     =cut
494    
495 root 1.49 sub up_nodes() {
496 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
497 root 1.3 }
498    
499 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
500 root 1.3
501 root 1.27 Registers a callback that is called each time a node goes up (a connection
502     is established) or down (the connection is lost).
503 root 1.3
504     Node up messages can only be followed by node down messages for the same
505     node, and vice versa.
506    
507 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
508     port. This function is mainly of interest to modules that are concerned
509     about the network topology and low-level connection handling.
510    
511     Callbacks I<must not> block and I<should not> send any messages.
512    
513     The function returns an optional guard which can be used to unregister
514 root 1.3 the monitoring callback again.
515    
516 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
517     or go up (and down).
518    
519     newnode $_, 1 for up_nodes;
520     mon_nodes \&newnode;
521    
522 root 1.3 =cut
523    
524     our %MON_NODES;
525    
526     sub mon_nodes($) {
527     my ($cb) = @_;
528    
529     $MON_NODES{$cb+0} = $cb;
530    
531     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
532     }
533    
534     sub _inject_nodeevent($$;@) {
535 root 1.16 my ($node, $up, @reason) = @_;
536 root 1.3
537     for my $cb (values %MON_NODES) {
538 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
539 root 1.16 or $WARN->(1, $@);
540 root 1.3 }
541 root 1.16
542 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
543 root 1.3 }
544    
545     #############################################################################
546 root 1.1 # self node code
547    
548     our %node_req = (
549     # internal services
550    
551     # monitoring
552 root 1.27 mon0 => sub { # stop monitoring a port
553 root 1.1 my $portid = shift;
554     my $node = $SRCNODE;
555     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
556     },
557 root 1.27 mon1 => sub { # start monitoring a port
558 root 1.1 my $portid = shift;
559     my $node = $SRCNODE;
560 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
561 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
562 root 1.27 $node->send (["", kil => $portid, @_])
563 root 1.39 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
564 root 1.1 });
565     },
566     kil => sub {
567     my $cbs = delete $SRCNODE->{lmon}{+shift}
568     or return;
569    
570     $_->(@_) for @$cbs;
571     },
572    
573 root 1.18 # "public" services - not actually public
574 root 1.1
575     # relay message to another node / generic echo
576 root 1.15 snd => \&snd,
577 root 1.27 snd_multiple => sub {
578 root 1.1 snd @$_ for @_
579     },
580    
581 root 1.4 # informational
582     info => sub {
583     snd @_, $NODE;
584     },
585     known_nodes => sub {
586     snd @_, known_nodes;
587     },
588     up_nodes => sub {
589     snd @_, up_nodes;
590     },
591    
592 root 1.30 # random utilities
593 root 1.1 eval => sub {
594 root 1.50 my @res = do { package main; eval shift };
595 root 1.1 snd @_, "$@", @res if @_;
596     },
597     time => sub {
598     snd @_, AE::time;
599     },
600     devnull => sub {
601     #
602     },
603 root 1.15 "" => sub {
604 root 1.27 # empty messages are keepalives or similar devnull-applications
605 root 1.15 },
606 root 1.1 );
607    
608 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
609 root 1.1 $PORT{""} = sub {
610     my $tag = shift;
611     eval { &{ $node_req{$tag} ||= load_func $tag } };
612 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
613 root 1.1 };
614    
615     =back
616    
617     =head1 SEE ALSO
618    
619     L<AnyEvent::MP>.
620    
621     =head1 AUTHOR
622    
623     Marc Lehmann <schmorp@schmorp.de>
624     http://home.schmorp.de/
625    
626     =cut
627    
628     1
629