ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.50
Committed: Wed Sep 9 18:54:28 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.49: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.46 up_nodes mon_nodes node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages - queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
# Highest level the default handler still prints. Taken from the
# PERL_ANYEVENT_MP_WARNLEVEL environment variable when that exists (even if
# empty), otherwise 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks; the default handler calls each of them for every
# message, regardless of its level.
our @WARN;

# Default log handler, invoked as $WARN->($level, $msg).
our $WARN = sub {
   # the hooks are called with our own @_ (level and message)
   for (@WARN) {
      &$_;
   }

   my ($level, $msg) = @_;

   # messages above the configured level are not printed
   return if $level > $WARNLEVEL;

   # drop one trailing newline, the format below adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
# Resolve a fully qualified function name ("Some::Pkg::func") to a code
# reference, loading the module that defines it on demand.
#
# Candidate packages are tried from the longest prefix to the shortest
# ("Some::Pkg", then "Some") until the function becomes defined. Load
# errors are ignored on purpose, the next shorter prefix is simply tried.
#
# Returns a reference to the function. When no candidate provides it, a code
# reference that dies with "unable to resolve '$func'" is returned instead,
# so the failure only surfaces when the result is called.
#
# The module is loaded through a string eval, and the name can originate
# from a message sent by another node (see the node port handler). Only
# syntactically valid package names are therefore passed to require; anything
# else is skipped and never interpolated into evaluated code.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # $func is deliberately used as a symbolic reference

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last "::component"; nothing left to strip means failure
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         eval "require $pkg"
            if $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/;
      } until defined &$func;
   }

   \&$func
}
109    
# Return a string of exactly $_[0] random bytes.
#
# The bytes are read from /dev/urandom when that can be opened. sysread may
# return fewer bytes than requested, so reading continues until the nonce is
# complete or the read fails. Whatever is still missing afterwards (no
# /dev/urandom, read error, end of file) is filled from perl's rand, which is
# a much weaker source.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      while (length ($nonce) < $len) {
         # append at the current end of $nonce
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # error (undef) or end of file (0)
      }
   }

   # weak fallback for anything the kernel source did not deliver
   $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce)
      if length ($nonce) < $len;

   $nonce
}
122    
# Encode the binary string $_[0] using only alphanumeric characters.
#
# When Math::GMP 2.05 or newer can be loaded, the bytes are interpreted as
# one big hexadecimal number and rendered in base 62. Otherwise the data is
# base64 encoded and made alphanumeric by dropping the padding and escaping
# with "x": "x" becomes "x0", "/" becomes "x1" and "+" becomes "x2".
#
# The two encodings produce different strings for the same input.
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of them have to go
      $data =~ s/=+\z//;
      # "x" is the escape character, so it must be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
141    
# Generate a short alphanumeric cookie from the low 16 bits of the process
# ID, the low 16 bits of the current time and two random bytes.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time & 0xffff;

   # two big-endian 16 bit values followed by the raw nonce bytes
   my $raw = pack "nna*", $pid16, $time16, nonce (2);

   alnumbits ($raw)
}
145    
# Module-wide state. Most of these variables are exported (see @EXPORT).
146 root 1.20 our $CONFIG; # this node's configuration, assigned by configure
147 root 1.21
148 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value (96 random bits, alphanumerically encoded)
149 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
150 root 1.21 our $NODE = "anon/$RUNIQ"; # ID of the local node; anonymous until configure assigns another one
151 root 1.1 our $ID = "a"; # NOTE(review): not used in the visible code - presumably a seed for generated port IDs, confirm
152    
153     our %NODE; # node id to transport mapping, or "undef", for local node (the key "" also refers to the local node)
154     our (%PORT, %PORT_DATA); # local ports (%PORT maps a port ID to its receive callback, see _inject)
155    
156 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
157 root 1.1 our %LMON; # monitored _local_ ports
158    
159     our %LISTENER; # bind address => listener object, filled by configure
160 root 1.21 our $LISTENER; # our listeners, as arrayref
161 root 1.1
162     our $SRCNODE; # holds the sending node during _inject
163    
# Accessor returning the current value of $NODE, the ID of the local node.
sub NODE() {
   return $NODE;
}
167    
# Return the node ID part of a port ID, that is everything before the first
# "#". A string without "#" is returned unchanged, an empty string yields
# undef.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
173    
# Define the constant TRACE at compile time: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE holds a true value, 0 otherwise. Being a constant
# sub, tracing code guarded by it can be optimised away when disabled.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
179 root 1.1
our $DELAY_TIMER; # the pending zero-delay timer, undef while none is armed
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Timer callback: run all queued callbacks in order, including any that get
# queued while the queue is being drained, then disarm the timer.
#
# Clearing $DELAY_TIMER is essential: the timer is a one-shot, and delay
# only arms a new one when the variable is false. Without the reset, every
# callback queued after the first run would never be executed.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   undef $DELAY_TIMER;
}

# Queue the code reference $_[0] for execution from the event loop, as soon
# as possible, instead of calling it directly.
sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
191    
# Deliver a message to a local port. The first argument is the port ID, the
# remaining arguments are passed to the receive callback registered in %PORT.
# Messages for ports without a callback are dropped silently.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $receiver = $PORT{+shift}
      or return;

   # called with the current @_, i.e. the message without the port ID
   &$receiver;
}
196    
# Central routing component: returns the node object registered in %NODE for
# the given node ID, so messages can be sent to it. Unknown IDs get a new
# AnyEvent::MP::Node::Direct object, which is registered and returned.
sub add_node {
   my $nodeid = shift;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
204    
# Send a message to a port. The first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message. The node
# object is looked up in %NODE, or created by add_node, and the message is
# handed to the code reference stored in its {send} slot.
# Croaks when the node ID part is undefined.
sub snd(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port ID is stringified, so a missing port part becomes ""
   $node->{send} (["$portid", @_]);
}
216    
217 root 1.17 =item $is_local = port_is_local $port
218    
219     Returns true iff the port is a local port.
220    
221     =cut
222    
# True iff the node part of the given port ID maps to the same node object
# as the local node, which is always registered under the key "".
sub port_is_local($) {
   my @parts  = split /#/, $_[0], 2;
   my $nodeid = $parts[0];

   $NODE{$nodeid} == $NODE{""}
}
228    
229 root 1.18 =item snd_to_func $node, $func, @args
230 root 1.11
231 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
232 root 1.11 this function with the given arguments on that node.
233    
234 root 1.20 This function can be used to implement C<spawn>-like interfaces.
235 root 1.11
236     =cut
237    
# Ask the node with the given ID to call a function: the remaining arguments
# (function name and its arguments) are sent as a message to the node port
# (port ID "") of that node. Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT $NODE, which seems
   # to contradict the comment above - confirm against Node::Self.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_]);
}
252    
253 root 1.18 =item snd_on $node, @msg
254    
255     Executes C<snd> with the given C<@msg> (which must include the destination
256     port) on the given node.
257    
258     =cut
259    
# Make the given node execute snd with @msg: sends the message
# ("snd", @msg) to $node via snd.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
264    
265 root 1.29 =item eval_on $node, $string[, @reply]
266 root 1.18
267 root 1.29 Evaluates the given string as Perl expression on the given node. When
268     @reply is specified, then it is used to construct a reply message with
269     C<"$@"> and any results from the eval appended.
270 root 1.18
271     =cut
272    
# Make the given node evaluate a string of Perl code: sends the message
# ("eval", $string, @reply) to $node via snd.
sub eval_on($$;@) {
   my ($node, @args) = @_;

   snd ($node, eval => @args);
}
277    
# Kill a port. The first argument is the port ID ("nodeid#portid"), any
# further arguments are passed on to the kill method of the node object.
# Croaks when the port part is empty, as node ports cannot be killed.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
287    
# Return the nodename field (the second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
292    
# Resolve a comma-separated list of transport descriptors into socket
# addresses. Returns a condition variable that is sent the resulting
# "host:port" strings: duplicates removed, ordered by the position of the
# descriptor they came from. Croaks on descriptors that cannot be parsed.
293 root 1.21 sub _resolve($) {
294     my ($nodeid) = @_;
295 root 1.1
296     my $cv = AE::cv;
297     my @res;
298    
# group callback: with the condvar begin/end protocol it runs once every
# begin has been matched by an end, i.e. when all lookups have finished
299     $cv->begin (sub {
300     my %seen;
301     my @refs;
# sort by priority, keep only the first occurrence of each address
302     for (sort { $a->[0] <=> $b->[0] } @res) {
303     push @refs, $_->[1] unless $seen{$_->[1]}++
304     }
305 root 1.21 shift->send (@refs);
306 root 1.1 });
307    
308     my $idx;
309 root 1.21 for my $t (split /,/, $nodeid) {
# the integer part of the priority is the 1-based position of the descriptor
310 root 1.1 my $pri = ++$idx;
311 root 1.7
# an empty or all-digit descriptor means "the local nodename", with the
# digits (if any) as port
312     $t = length $t ? _nodename . ":$t" : _nodename
313     if $t =~ /^\d*$/;
314 root 1.1
315 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
316 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
317    
# a port of "*" is replaced by port 0
318 root 1.33 $port = "0" if $port eq "*";
319    
# a host of "*" stands for the addresses of the local network interfaces
320     if ($host eq "*") {
321     $cv->begin;
322     # use fork_call, as Net::Interface is big, and we need it rarely.
323     require AnyEvent::Util;
324     AnyEvent::Util::fork_call (
# first callback: collects the interface addresses (executed in the forked
# child, going by the comment above)
325     sub {
326     my @addr;
327    
328     require Net::Interface;
329    
330     for my $if (Net::Interface->interfaces) {
331     # we statically lower-prioritise ipv6 here, TODO :()
332 root 1.47 for $_ ($if->address (Net::Interface::AF_INET ())) {
# NOTE(review): matches on the first byte, so addresses are presumably in
# packed binary form here (0x7f == 127.x.x.x) - confirm
333 root 1.33 next if /^\x7f/; # skip localhost etc.
334     push @addr, $_;
335     }
336     for ($if->address (Net::Interface::AF_INET6 ())) {
337     #next if $if->scope ($_) <= 2;
338     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
339     push @addr, $_;
340     }
341    
342     }
343     @addr
# second callback: receives the collected addresses
344     }, sub {
345     for my $ip (@_) {
346     push @res, [
# the small increment keeps results of one descriptor together, in order
347     $pri += 1e-5,
348     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
349     ];
350     }
351     $cv->end;
352     }
353     );
354     } else {
# any other host is resolved asynchronously into tcp socket addresses
355     $cv->begin;
356     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
357     for (@_) {
358     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
359     push @res, [
360     $pri += 1e-5,
361     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
362     ];
363     }
364     $cv->end;
365     };
366     }
367 root 1.1 }
368    
# matches the initial begin above
369     $cv->end;
370    
371     $cv
372     }
373    
# Configure the local node: look up the configuration profile, settle the
# node ID, create the listeners for all bind addresses, hand the seed
# addresses to the global service and start the configured services.
# Takes key => value pairs; with an odd number of arguments the first one is
# the profile name. Blocks (recv) while addresses are being resolved.
374 root 1.39 sub configure(@) {
# odd argument count: the first argument is the profile name
375     unshift @_, "profile" if @_ & 1;
376 root 1.34 my (%kv) = @_;
377    
378     my $profile = delete $kv{profile};
379 root 1.1
# the profile name defaults to the nodename
380 root 1.21 $profile = _nodename
381     unless defined $profile;
382 root 1.6
383 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
384 root 1.24
385 root 1.47 delete $NODE{$NODE}; # we do not support doing stuff before configure
386    
# the node ID comes from the configuration, or else is the profile name; a
# value of "anon/" keeps the current $NODE
387 root 1.24 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
388     $NODE = $node
389     unless $node eq "anon/";
390 root 1.6
# register the local node object (key "") under its node ID as well
391 root 1.21 $NODE{$NODE} = $NODE{""};
392     $NODE{$NODE}{id} = $NODE;
393 root 1.20
394 root 1.21 my $seeds = $CONFIG->{seeds};
395     my $binds = $CONFIG->{binds};
396 root 1.3
# without configured binds, "*" is used
397 root 1.33 $binds ||= ["*"];
398 root 1.1
399 root 1.21 $WARN->(8, "node $NODE starting up.");
400 root 1.1
401 root 1.23 $LISTENER = [];
402     %LISTENER = ();
403    
# all resolutions are started first (map), then waited for one by one (recv)
404 root 1.21 for (map _resolve $_, @$binds) {
405     for my $bind ($_->recv) {
406     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
407     or Carp::croak "$bind: unparsable local bind address";
408 root 1.20
409 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
410     $host,
411     $port,
# NOTE(review): presumably called with the host and port actually bound,
# and before mp_server returns, so $bind below is the real address - confirm
412     prepare => sub {
413     my (undef, $host, $port) = @_;
414     $bind = AnyEvent::Socket::format_hostport $host, $port;
415     },
416     ;
417     $LISTENER{$bind} = $listener;
418 root 1.21 push @$LISTENER, $bind;
419     }
420 root 1.1 }
421    
422 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
423    
424 root 1.21 # the global service is mandatory currently
425     require AnyEvent::MP::Global;
426 root 1.1
427 root 1.21 # connect to all seednodes
428     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
429 root 1.1
# services: an array reference is [function name, arguments], a string
# ending in "::" names a module to load (load failures are fatal), any
# other string names a function that is called without arguments
430 root 1.20 for (@{ $CONFIG->{services} }) {
431 root 1.43 if (ref) {
432     my ($func, @args) = @$_;
433     (load_func $func)->(@args);
434     } elsif (s/::$//) {
435 root 1.13 eval "require $_";
436     die $@ if $@;
437     } else {
438     (load_func $_)->();
439     }
440     }
441 root 1.1 }
442    
443     #############################################################################
444 root 1.6 # node monitoring and info
445 root 1.3
446 root 1.21 =item node_is_known $nodeid
447 root 1.13
448 root 1.46 Returns true iff the given node is currently known to the system. The only
449     time a node is known but not currently up is when a connection request is
450     pending.
451 root 1.13
452     =cut
453    
# True iff there is an entry for the given node ID in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
457    
458 root 1.21 =item node_is_up $nodeid
459 root 1.13
460     Returns true if the given node is "up", that is, the kernel thinks it has
461     a working connection to it.
462    
463     If the node is known but not currently connected, returns C<0>. If the
464     node is not known, returns C<undef>.
465    
466     =cut
467    
# Returns 1 when the node object for the given ID has a transport, 0 when it
# has none, and nothing (undef in scalar context) when the node is unknown.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
472    
473 root 1.3 =item known_nodes
474    
475 root 1.26 Returns the node IDs of all nodes currently known to this node, including
476     itself and nodes not currently connected.
477 root 1.3
478     =cut
479    
# Returns the {id} of every node object stored in %NODE (their number in
# scalar context).
sub known_nodes() {
   my @ids;

   push @ids, $_->{id}
      for values %NODE;

   @ids
}
483    
484     =item up_nodes
485    
486 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
487     the node itself).
488 root 1.3
489     =cut
490    
# Returns the {id} of every node object in %NODE that currently has a
# transport (their number in scalar context).
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
494    
495 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
496 root 1.3
497 root 1.27 Registers a callback that is called each time a node goes up (a connection
498     is established) or down (the connection is lost).
499 root 1.3
500     Node up messages can only be followed by node down messages for the same
501     node, and vice versa.
502    
503 root 1.27 Note that monitoring a node is usually better done by monitoring its node
504     port. This function is mainly of interest to modules that are concerned
505     about the network topology and low-level connection handling.
506    
507     Callbacks I<must not> block and I<should not> send any messages.
508    
509     The function returns an optional guard which can be used to unregister
510 root 1.3 the monitoring callback again.
511    
512 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
513     or go up (and down).
514    
515     newnode $_, 1 for up_nodes;
516     mon_nodes \&newnode;
517    
518 root 1.3 =cut
519    
our %MON_NODES; # registered node monitors, keyed by the callback's address

# Register $cb to be called as $cb->($nodeid, $is_up, @reason) on node
# up/down events (see _inject_nodeevent).
#
# In non-void context a guard object is returned that unregisters the
# callback when it is destroyed; in void context the callback stays
# registered for good.
#
# The context check has to be "defined wantarray": wantarray alone is false
# in scalar context, which is exactly the documented "$guard = mon_nodes ..."
# usage, so no guard would ever be handed out there.
sub mon_nodes($) {
   my ($cb) = @_;

   # the numeric value of a code reference is its address, a unique key
   $MON_NODES{$cb+0} = $cb;

   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
529    
# Report that $node went up ($up true) or down ($up false) to all callbacks
# registered with mon_nodes, then log the event at level 7. A callback that
# dies does not stop the others; its error is logged at level 1.
sub _inject_nodeevent($$;@) {
   my $node   = shift;
   my $up     = shift;
   my @reason = @_;

   for my $monitor (values %MON_NODES) {
      $WARN->(1, $@)
         unless eval { $monitor->($node->{id}, $up, @reason); 1 };
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
540    
541     #############################################################################
542 root 1.1 # self node code
543    
# Dispatch table for messages sent to the node port (port ID ""): maps the
# message tag to its handler. The handlers receive the rest of the message
# in @_; $SRCNODE holds the node the message came from.
544     our %node_req = (
545     # internal services
546    
547     # monitoring
548 root 1.27 mon0 => sub { # stop monitoring a port
549 root 1.1 my $portid = shift;
550     my $node = $SRCNODE;
# removes the callback that mon1 stored for this port on the source node
551     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
552     },
553 root 1.27 mon1 => sub { # start monitoring a port
554 root 1.1 my $portid = shift;
555     my $node = $SRCNODE;
# weak reference, so the callback below does not keep the node object alive
556 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
# the callback forwards its arguments as a "kil" message to the node port
# of the monitoring node, as long as that node still has a transport
557 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
558 root 1.27 $node->send (["", kil => $portid, @_])
559 root 1.39 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
560 root 1.1 });
561     },
# removes the callbacks stored under the source node's {lmon} for the given
# port and calls each of them with the remaining message arguments
562     kil => sub {
563     my $cbs = delete $SRCNODE->{lmon}{+shift}
564     or return;
565    
566     $_->(@_) for @$cbs;
567     },
568    
569 root 1.18 # "public" services - not actually public
570 root 1.1
571     # relay message to another node / generic echo
572 root 1.15 snd => \&snd,
# every argument is an array reference holding one complete snd call
573 root 1.27 snd_multiple => sub {
574 root 1.1 snd @$_ for @_
575     },
576    
# the informational handlers reply by appending their result to the message
# given in @_ and sending that with snd
577 root 1.4 # informational
578     info => sub {
579     snd @_, $NODE;
580     },
581     known_nodes => sub {
582     snd @_, known_nodes;
583     },
584     up_nodes => sub {
585     snd @_, up_nodes;
586     },
587    
588 root 1.30 # random utilities
# SECURITY: evaluates a string received in a message as Perl code, in
# package main. The error ("$@") and the results are sent back when a
# reply message was given.
589 root 1.1 eval => sub {
590 root 1.50 my @res = do { package main; eval shift };
591 root 1.1 snd @_, "$@", @res if @_;
592     },
593     time => sub {
594     snd @_, AE::time;
595     },
596     devnull => sub {
597     #
598     },
599 root 1.15 "" => sub {
600 root 1.27 # empty messages are keepalives or similar devnull-applications
601 root 1.15 },
602 root 1.1 );
603    
# create the node object for the local node and register it both under the
# key "" and under its current node ID
604 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
# receive callback of the node port (port ID ""): the first message element
# is a tag, the handler is called with the remaining elements. Tags missing
# from %node_req are resolved with load_func and cached there, so a message
# can name any function load_func is able to resolve. Errors raised by a
# handler are caught and logged at level 2.
605 root 1.1 $PORT{""} = sub {
606     my $tag = shift;
607     eval { &{ $node_req{$tag} ||= load_func $tag } };
608 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
609 root 1.1 };
610    
611     =back
612    
613     =head1 SEE ALSO
614    
615     L<AnyEvent::MP>.
616    
617     =head1 AUTHOR
618    
619     Marc Lehmann <schmorp@schmorp.de>
620     http://home.schmorp.de/
621    
622     =cut
623    
624     1
625