ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.57
Committed: Fri Oct 2 13:29:49 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.56: +0 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38     our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 root 1.21 add_node load_func snd_to_func snd_on eval_on
41 root 1.1
42 root 1.34 NODE $NODE node_of snd kil port_is_local
43     configure
44 root 1.46 up_nodes mon_nodes node_is_up
45 root 1.1 );
46    
47 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48 root 1.1
49 root 1.27 This value is called with an error or warning message, when e.g. a
50     connection could not be created, authorisation failed and so on.
51    
52     It I<must not> block or send messages - queue it and use an idle watcher if
53     you need to do any of these things.
54 root 1.1
55 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
56 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57     node connectivity and services, C<8> for debugging messages and C<9> for
58     tracing messages.
59    
60 root 1.1 The default simply logs the message to STDERR.
61    
62 root 1.44 =item @AnyEvent::MP::Kernel::WARN
63    
64     All code references in this array are called for every log message, from
65     the default C<$WARN> handler. This is an easy way to tie into the log
66     messages without disturbing others.
67    
68 root 1.1 =cut
69    
# Highest level the default handler still prints; the environment variable
# PERL_ANYEVENT_MP_WARNLEVEL overrides the built-in default of 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Extra log hooks, each invoked with ($level, $msg) for every message.
our @WARN;

# Default log handler: runs all hooks, then prints a timestamped line to
# STDERR unless the message level exceeds $WARNLEVEL.
our $WARN = sub {
   # hooks run first and see every message, filtered or not
   for my $hook (@WARN) {
      &$hook;
   }

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # a single trailing newline is dropped, the format adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
# load_func $name
#
# Resolves a fully qualified function name ("Some::Package::func") to a code
# reference. If the function is not defined yet, the package components are
# tried from longest to shortest with "require" until the function shows up.
#
# Returns the code reference, or - when nothing can be resolved - a stub
# that dies with "unable to resolve '<name>'" once it is called.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # the function is looked up by its name

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last "::component", give up when none is left
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # the name ends up inside a string eval, so anything that is not a
         # plain package name must never get that far
         $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve '$func'" };

         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
108    
# nonce $octets
#
# Returns a string of exactly $octets random octets. /dev/urandom is used
# when it can be opened and delivers the full amount; otherwise (open
# failure, read error or short read) the result is built from rand(),
# which is a much weaker source.
sub nonce($) {
   my $len = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $got = sysread $fh, $nonce, $len;

      # only trust the read when it delivered everything asked for
      return $nonce
         if defined $got && $got == $len;
   }

   # last resort
   join "", map +(chr rand 256), 1 .. $len
}
121    
# alnumbits $octets
#
# Encodes a binary string using only the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer the octets are read as one hexadecimal
# number and printed in base 62. Without it, base64 is used and its
# non-alphanumeric characters are escaped with "x":
#    x -> x0,  / -> x1,  + -> x2
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 $data, "";
      # base64 pads with one OR two "=", all of them have to go
      $data =~ s/=+\z//;
      # "x" is the escape character, so it must be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
140    
# Builds a short alphanumeric cookie from the low 16 bits of the process ID,
# the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time () & 0xffff;
   my $raw    = pack "nna*", $pid16, $time16, nonce (2);

   alnumbits ($raw)
}
144    
# --- configuration -----------------------------------------------------------

our $CONFIG; # this node's configuration

# --- identity ----------------------------------------------------------------

our $RUNIQ = alnumbits (nonce (96 / 8)); # remote uniq value
our $UNIQ  = gen_uniq;                   # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# --- routing tables ----------------------------------------------------------

our %NODE; # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

# --- monitoring --------------------------------------------------------------

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

# --- listeners ---------------------------------------------------------------

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

# --- message injection -------------------------------------------------------

our $SRCNODE; # holds the sending node during _inject
162    
# Returns the node ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
166    
# node_of $port
#
# Returns the node ID part of a port ID, i.e. everything before the
# first "#".
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $nodeid;
}
172    
# TRACE is a compile-time constant: true when PERL_ANYEVENT_MP_TRACE is set
# to a true value in the environment, false otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
178 root 1.1
our $DELAY_TIMER; # timer that triggers _delay_run, undef while nothing is queued
our @DELAY_QUEUE; # callbacks waiting to be run

# Runs queued callbacks in order, including any that get queued while it is
# running. Once the queue is empty the timer is dropped and undef returned.
sub _delay_run {
   while (1) {
      my $cb = shift @DELAY_QUEUE;

      unless ($cb) {
         return undef $DELAY_TIMER;
      }

      $cb->();
   }
}
185    
# delay $cb
#
# Queues $cb for execution from a zero-delay timer. The timer is created
# only when none is currently pending.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER = AE::timer 0, 0, \&_delay_run
      unless $DELAY_TIMER;

   $DELAY_TIMER
}
190    
# _inject $portid, @msg
#
# Hands a message to the handler registered for the local port $portid.
# Messages for ports without a handler are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called with the remaining @_, i.e. the message without the port id
   &$handler;
}
195    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# Returns the node object stored for $node; a new
# AnyEvent::MP::Node::Direct object is created if none is stored yet.
sub add_node {
   my ($node) = @_;

   # explicit method call instead of the ambiguous "new Class $arg" form
   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
203    
# snd $port, @msg
#
# Sends @msg to the given port ("nodeid#portid"). The node object is
# looked up in %NODE and created via add_node when unknown. Croaks when
# no node ID can be extracted from $port.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
215    
216 root 1.17 =item $is_local = port_is_local $port
217    
218     Returns true iff the port is a local port.
219    
220     =cut
221    
# A port is local when its node ID maps to the same node object as the
# local node, which is stored under the empty node ID.
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;
   my $self = $NODE{""};

   $NODE{$nodeid} == $self
}
227    
228 root 1.18 =item snd_to_func $node, $func, @args
229 root 1.11
230 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
231 root 1.11 this function with the given arguments on that node.
232    
233 root 1.20 This function can be used to implement C<spawn>-like interfaces.
234 root 1.11
235     =cut
236    
# snd_to_func $nodeid, $func, @args
#
# Sends a message to the node port (empty port ID) of $nodeid, which makes
# that node call $func with @args. Croaks when $nodeid is undef.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so a bad call neither compares undef nor touches the
   # global delay flag below
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
}
251    
252 root 1.18 =item snd_on $node, @msg
253    
254     Executes C<snd> with the given C<@msg> (which must include the destination
255     port) on the given node.
256    
257     =cut
258    
# Asks $node to run its "snd" service with @msg, i.e. to send @msg itself.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
263    
264 root 1.29 =item eval_on $node, $string[, @reply]
265 root 1.18
266 root 1.29 Evaluates the given string as Perl expression on the given node. When
267     @reply is specified, then it is used to construct a reply message with
268     C<"$@"> and any results from the eval appended.
269 root 1.18
270     =cut
271    
# Asks $node to run its "eval" service with the given string and the
# optional reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
276    
# kil $port, @reason
#
# Asks the node owning $port to kill that port, passing @reason along.
# Croaks when $port has no port ID part, as node ports cannot be killed.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
286    
# Returns the second field of POSIX::uname, the node name of this host.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
291    
# _resolve $spec
#
# Turns a comma-separated list of transport descriptors into a condvar that
# is sent the resulting list of "host:port" strings, in descriptor order
# and without duplicates.
#
# Each descriptor is either empty or a bare port number (meaning this
# host), a host/port pair understood by AnyEvent::Socket::parse_hostport,
# or uses "*" as host (all interface addresses found by Net::Interface) or
# as port (replaced by "0").
sub _resolve($) {
   my ($nodeid) = @_;

   my $done = AE::cv;
   my @found; # [priority, "host:port"] pairs, filled in by the callbacks

   # the outermost begin/end pair fires once every lookup has finished
   $done->begin (sub {
      my ($cv) = @_;
      my (%dup, @addrs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         next if $dup{$entry->[1]}++;
         push @addrs, $entry->[1];
      }

      $cv->send (@addrs);
   });

   my $counter = 0;

   for my $spec (split /,/, $nodeid) {
      # results of one descriptor get priorities slightly above its index,
      # so they sort after it and before the next descriptor
      my $prio = ++$counter;

      if ($spec =~ /^\d*$/) {
         $spec = length $spec ? _nodename . ":$spec" : _nodename;
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $spec, 0
         or Carp::croak "$spec: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $done->begin;

      if ($host ne "*") {
         # ordinary host: let AnyEvent resolve it
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $res (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr $res->[3];
               push @found, [
                  $prio += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $addr, $service
               ];
            }
            $done->end;
         };

         next;
      }

      # wildcard host: collect the interface addresses in a child process
      # use fork_call, as Net::Interface is big, and we need it rarely.
      require AnyEvent::Util;
      AnyEvent::Util::fork_call (
         sub {
            require Net::Interface;

            my @addr;

            for my $if (Net::Interface->interfaces) {
               # we statically lower-prioritise ipv6 here, TODO :()
               push @addr,
                  grep !/^\x7f/, # skip localhost etc.
                     $if->address (Net::Interface::AF_INET ());

               #next if $if->scope ($_) <= 2;
               push @addr,
                  grep /^[\x20-\x3f\xfc\xfd]/, # global unicast, site-local unicast
                     $if->address (Net::Interface::AF_INET6 ());
            }

            @addr
         },
         sub {
            for my $ip (@_) {
               push @found, [
                  $prio += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
               ];
            }
            $done->end;
         }
      );
   }

   $done->end;

   $done
}
372    
# configure [$profile,] key => value...
#
# Configures and starts this node: loads the profile, fixes the node ID,
# creates the listeners, hands the seed addresses to the global service
# and starts the configured services.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my %opt = @_;

   my $profile = delete $opt{profile};

   defined $profile
      or $profile = _nodename;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %opt;

   delete $NODE{$NODE}; # we do not support doing stuff before configure

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n"
      unless $node;

   # "anon/" keeps the anonymous node ID generated at load time
   $NODE = $node
      if $node ne "anon/";

   # the self node object is reachable both as "" and under its node ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolutions first, then wait for them one after the other
   my @pending = map _resolve $_, @$binds;

   for my $cv (@pending) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # replace $bind by the address handed to this callback
               my (undef, $bound_host, $bound_port) = @_;
               $bind = AnyEvent::Socket::format_hostport $bound_host, $bound_port;
               0
            },
         );

         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   my @seed_cvs = map _resolve $_, @$seeds;
   AnyEvent::MP::Global::set_seeds (map $_->recv, @seed_cvs);

   # a service is [function, args...], "Module::" (just loaded) or the
   # name of a function to call without arguments
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }
}
445    
446     #############################################################################
447 root 1.6 # node monitoring and info
448 root 1.3
449 root 1.21 =item node_is_known $nodeid
450 root 1.13
451 root 1.46 Returns true iff the given node is currently known to the system. The only
452     time a node is known but not up currently is when a connection request is
453     pending.
454 root 1.13
455     =cut
456    
# True when an entry for the given node ID exists in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
460    
461 root 1.21 =item node_is_up $nodeid
462 root 1.13
463     Returns true if the given node is "up", that is, the kernel thinks it has
464     a working connection to it.
465    
466     If the node is known but not currently connected, returns C<0>. If the
467     node is not known, returns C<undef>.
468    
469     =cut
470    
# Returns 1 when the node has a transport, 0 when it is known without one,
# and nothing (undef in scalar context) when it is not known at all.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
475    
476 root 1.3 =item known_nodes
477    
478 root 1.26 Returns the node IDs of all nodes currently known to this node, including
479     itself and nodes not currently connected.
480 root 1.3
481     =cut
482    
# Returns the node IDs of all node objects in %NODE, each ID once.
#
# The local node object is stored twice (under "" and under its own node
# ID), so the IDs are de-duplicated before they are returned.
sub known_nodes() {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
486    
487     =item up_nodes
488    
489 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
490     the node itself).
491 root 1.3
492     =cut
493    
# Returns the node IDs of all node objects in %NODE that have a transport.
sub up_nodes() {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
497    
498 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
499 root 1.3
500 root 1.27 Registers a callback that is called each time a node goes up (a connection
501     is established) or down (the connection is lost).
502 root 1.3
503     Node up messages can only be followed by node down messages for the same
504     node, and vice versa.
505    
506 root 1.27 Note that monitoring a node is usually better done by monitoring its node
507     port. This function is mainly of interest to modules that are concerned
508     about the network topology and low-level connection handling.
509    
510     Callbacks I<must not> block and I<should not> send any messages.
511    
512     The function returns an optional guard which can be used to unregister
513 root 1.3 the monitoring callback again.
514    
515 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
516     or go up (and down).
517    
518     newnode $_, 1 for up_nodes;
519     mon_nodes \&newnode;
520    
521 root 1.3 =cut
522    
# registered node monitors, keyed by the numeric value of the callback
# reference
our %MON_NODES;

# mon_nodes $cb
#
# Registers $cb as node up/down callback. In void context nothing is
# returned and the callback stays registered. In scalar or list context a
# guard is returned whose callback unregisters $cb again.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false-but-defined in scalar context, so test definedness
   # to give "$guard = mon_nodes ..." a guard as well
   return
      unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
532    
# _inject_nodeevent $node, $is_up, @reason
#
# Tells every registered node monitor about an up/down event and then logs
# the event at level 7. A monitor that dies is logged at level 1 and does
# not stop the remaining monitors.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@)
         unless $ok;
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
543    
544     #############################################################################
545 root 1.1 # self node code
546    
# Services of the node port: maps the first element of a message sent to
# the node port to the handler that is called with the rest of the message.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;
      my $remote = $SRCNODE;

      my $cb = delete $remote->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;
      my $remote = $SRCNODE;
      Scalar::Util::weaken $remote; #TODO# ugly

      # relays the kill of the local port to the monitoring node, as long
      # as that node still exists and has a transport
      my $relay = sub {
         $remote->send (["", kil => $portid, @_])
            if $remote && $remote->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $remote->{rmon}{$portid} = $relay;
      $NODE{""}->monitor ($portid, $relay);
   },
   kil => sub {
      # the sending node reports the death of a port: run and forget all
      # callbacks registered for it
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd @$msg;
      }
   },

   # informational
   info => sub {
      snd @_, $NODE;
   },
   known_nodes => sub {
      snd @_, known_nodes;
   },
   up_nodes => sub {
      snd @_, up_nodes;
   },

   # random utilities
   eval => sub {
      # evaluates the string in package main; with a reply message given,
      # "$@" and the results are appended to it and sent back
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::time;
   },
   devnull => sub {
      # swallows everything
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
606    
# The local node object is reachable under the empty node ID as well as
# under its real node ID. It is created with an explicit method call
# instead of the ambiguous "new Class $arg" form.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port (empty port ID): the first message element selects the
# service, which is taken from %node_req or resolved with load_func and
# then cached there. Errors raised by a service are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;
   # the service is called with the remaining @_
   eval { &{ $node_req{$tag} ||= load_func $tag } };
   $WARN->(2, "error processing node message: $@") if $@;
};
613    
614     =back
615    
616     =head1 SEE ALSO
617    
618     L<AnyEvent::MP>.
619    
620     =head1 AUTHOR
621    
622     Marc Lehmann <schmorp@schmorp.de>
623     http://home.schmorp.de/
624    
625     =cut
626    
627     1
628