ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.51
Committed: Wed Sep 9 21:52:40 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.50: +3 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.46 up_nodes mon_nodes node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages -queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
71 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 root 1.44 our @WARN;
73     our $WARN = sub {
74     &$_ for @WARN;
75 root 1.29
76     return if $WARNLEVEL < $_[0];
77    
78 root 1.16 my ($level, $msg) = @_;
79    
80 root 1.1 $msg =~ s/\n$//;
81 root 1.16
82     printf STDERR "%s <%d> %s\n",
83     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84     $level,
85     $msg;
86 root 1.1 };
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
95 root 1.6 sub load_func($) {
96     my $func = $_[0];
97    
98     unless (defined &$func) {
99     my $pkg = $func;
100     do {
101     $pkg =~ s/::[^:]+$//
102     or return sub { die "unable to resolve '$func'" };
103     eval "require $pkg";
104     } until defined &$func;
105     }
106    
107     \&$func
108     }
109    
110 root 1.1 sub nonce($) {
111     my $nonce;
112    
113     if (open my $fh, "</dev/urandom") {
114     sysread $fh, $nonce, $_[0];
115     } else {
116     # shit...
117     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
118     }
119    
120     $nonce
121     }
122    
123 root 1.21 sub alnumbits($) {
124 root 1.1 my $data = $_[0];
125    
126     if (eval "use Math::GMP 2.05; 1") {
127     $data = Math::GMP::get_str_gmp (
128     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
129     62
130     );
131     } else {
132     $data = MIME::Base64::encode_base64 $data, "";
133     $data =~ s/=//;
134 root 1.31 $data =~ s/x/x0/g;
135     $data =~ s/\//x1/g;
136     $data =~ s/\+/x2/g;
137 root 1.1 }
138    
139     $data
140     }
141    
142     sub gen_uniq {
143 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
144 root 1.1 }
145    
146 root 1.20 our $CONFIG; # this node's configuration
147 root 1.21
148 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
149 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
150 root 1.21 our $NODE = "anon/$RUNIQ";
151 root 1.1 our $ID = "a";
152    
153     our %NODE; # node id to transport mapping, or "undef", for local node
154     our (%PORT, %PORT_DATA); # local ports
155    
156 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
157 root 1.1 our %LMON; # monitored _local_ ports
158    
159     our %LISTENER;
160 root 1.21 our $LISTENER; # our listeners, as arrayref
161 root 1.1
162     our $SRCNODE; # holds the sending node during _inject
163    
164     sub NODE() {
165     $NODE
166     }
167    
168     sub node_of($) {
169 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
170 root 1.1
171 root 1.21 $node
172 root 1.1 }
173    
174 root 1.17 BEGIN {
175     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
176     ? sub () { 1 }
177     : sub () { 0 };
178     }
179 root 1.1
180 root 1.42 our $DELAY_TIMER;
181     our @DELAY_QUEUE;
182    
183     sub _delay_run {
184     (shift @DELAY_QUEUE or return)->() while 1;
185     }
186    
187     sub delay($) {
188     push @DELAY_QUEUE, shift;
189     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
190     }
191    
192 root 1.1 sub _inject {
193 root 1.48 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
194 root 1.1 &{ $PORT{+shift} or return };
195     }
196    
197 root 1.20 # this function adds a node-ref, so you can send stuff to it
198     # it is basically the central routing component.
199 root 1.1 sub add_node {
200 root 1.21 my ($node) = @_;
201 root 1.1
202 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
203 root 1.13 }
204    
205 root 1.1 sub snd(@) {
206 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
207 root 1.1
208 root 1.48 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
209 root 1.1
210 root 1.49 defined $nodeid #d#UGLY
211     or Carp::croak "'undef' is not a valid node ID/port ID";
212    
213 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
214 root 1.2 ->{send} (["$portid", @_]);
215 root 1.1 }
216    
217 root 1.17 =item $is_local = port_is_local $port
218    
219     Returns true iff the port is a local port.
220    
221     =cut
222    
223     sub port_is_local($) {
224 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
225 root 1.17
226 root 1.21 $NODE{$nodeid} == $NODE{""}
227 root 1.17 }
228    
229 root 1.18 =item snd_to_func $node, $func, @args
230 root 1.11
231 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
232 root 1.11 this function with the given arguments on that node.
233    
234 root 1.20 This function can be used to implement C<spawn>-like interfaces.
235 root 1.11
236     =cut
237    
238 root 1.18 sub snd_to_func($$;@) {
239 root 1.21 my $nodeid = shift;
240 root 1.11
241 root 1.41 # on $NODE, we artificially delay... (for spawn)
242     # this is very ugly - maybe we should simply delay ALL messages,
243     # to avoid deep recursion issues. but that's so... slow...
244 root 1.45 $AnyEvent::MP::Node::Self::DELAY = 1
245     if $nodeid ne $NODE;
246    
247 root 1.49 defined $nodeid #d#UGLY
248     or Carp::croak "'undef' is not a valid node ID/port ID";
249    
250 root 1.45 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
251 root 1.11 }
252    
253 root 1.18 =item snd_on $node, @msg
254    
255     Executes C<snd> with the given C<@msg> (which must include the destination
256     port) on the given node.
257    
258     =cut
259    
260     sub snd_on($@) {
261     my $node = shift;
262     snd $node, snd => @_;
263     }
264    
265 root 1.29 =item eval_on $node, $string[, @reply]
266 root 1.18
267 root 1.29 Evaluates the given string as Perl expression on the given node. When
268     @reply is specified, then it is used to construct a reply message with
269     C<"$@"> and any results from the eval appended.
270 root 1.18
271     =cut
272    
273 root 1.29 sub eval_on($$;@) {
274 root 1.18 my $node = shift;
275     snd $node, eval => @_;
276     }
277    
278 root 1.1 sub kil(@) {
279 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
280 root 1.1
281     length $portid
282 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
283 root 1.1
284 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
285 root 1.1 ->kill ("$portid", @_);
286     }
287    
288 root 1.7 sub _nodename {
289     require POSIX;
290     (POSIX::uname ())[1]
291     }
292    
293 root 1.21 sub _resolve($) {
294     my ($nodeid) = @_;
295 root 1.1
296     my $cv = AE::cv;
297     my @res;
298    
299     $cv->begin (sub {
300     my %seen;
301     my @refs;
302     for (sort { $a->[0] <=> $b->[0] } @res) {
303     push @refs, $_->[1] unless $seen{$_->[1]}++
304     }
305 root 1.21 shift->send (@refs);
306 root 1.1 });
307    
308     my $idx;
309 root 1.21 for my $t (split /,/, $nodeid) {
310 root 1.1 my $pri = ++$idx;
311 root 1.7
312     $t = length $t ? _nodename . ":$t" : _nodename
313     if $t =~ /^\d*$/;
314 root 1.1
315 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
316 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
317    
318 root 1.33 $port = "0" if $port eq "*";
319    
320     if ($host eq "*") {
321     $cv->begin;
322     # use fork_call, as Net::Interface is big, and we need it rarely.
323     require AnyEvent::Util;
324     AnyEvent::Util::fork_call (
325     sub {
326     my @addr;
327    
328     require Net::Interface;
329    
330     for my $if (Net::Interface->interfaces) {
331     # we statically lower-prioritise ipv6 here, TODO :()
332 root 1.47 for $_ ($if->address (Net::Interface::AF_INET ())) {
333 root 1.33 next if /^\x7f/; # skip localhost etc.
334     push @addr, $_;
335     }
336     for ($if->address (Net::Interface::AF_INET6 ())) {
337     #next if $if->scope ($_) <= 2;
338     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
339     push @addr, $_;
340     }
341    
342     }
343     @addr
344     }, sub {
345     for my $ip (@_) {
346     push @res, [
347     $pri += 1e-5,
348     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
349     ];
350     }
351     $cv->end;
352     }
353     );
354     } else {
355     $cv->begin;
356     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
357     for (@_) {
358     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
359     push @res, [
360     $pri += 1e-5,
361     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
362     ];
363     }
364     $cv->end;
365     };
366     }
367 root 1.1 }
368    
369     $cv->end;
370    
371     $cv
372     }
373    
374 root 1.39 sub configure(@) {
375     unshift @_, "profile" if @_ & 1;
376 root 1.34 my (%kv) = @_;
377    
378     my $profile = delete $kv{profile};
379 root 1.1
380 root 1.21 $profile = _nodename
381     unless defined $profile;
382 root 1.6
383 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
384 root 1.24
385 root 1.47 delete $NODE{$NODE}; # we do not support doing stuff before configure
386    
387 root 1.24 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
388 root 1.51
389     $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
390    
391 root 1.24 $NODE = $node
392     unless $node eq "anon/";
393 root 1.6
394 root 1.21 $NODE{$NODE} = $NODE{""};
395     $NODE{$NODE}{id} = $NODE;
396 root 1.20
397 root 1.21 my $seeds = $CONFIG->{seeds};
398     my $binds = $CONFIG->{binds};
399 root 1.3
400 root 1.33 $binds ||= ["*"];
401 root 1.1
402 root 1.21 $WARN->(8, "node $NODE starting up.");
403 root 1.1
404 root 1.23 $LISTENER = [];
405     %LISTENER = ();
406    
407 root 1.21 for (map _resolve $_, @$binds) {
408     for my $bind ($_->recv) {
409     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
410     or Carp::croak "$bind: unparsable local bind address";
411 root 1.20
412 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
413     $host,
414     $port,
415     prepare => sub {
416     my (undef, $host, $port) = @_;
417     $bind = AnyEvent::Socket::format_hostport $host, $port;
418     },
419     ;
420     $LISTENER{$bind} = $listener;
421 root 1.21 push @$LISTENER, $bind;
422     }
423 root 1.1 }
424    
425 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
426    
427 root 1.21 # the global service is mandatory currently
428     require AnyEvent::MP::Global;
429 root 1.1
430 root 1.21 # connect to all seednodes
431     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
432 root 1.1
433 root 1.20 for (@{ $CONFIG->{services} }) {
434 root 1.43 if (ref) {
435     my ($func, @args) = @$_;
436     (load_func $func)->(@args);
437     } elsif (s/::$//) {
438 root 1.13 eval "require $_";
439     die $@ if $@;
440     } else {
441     (load_func $_)->();
442     }
443     }
444 root 1.1 }
445    
446     #############################################################################
447 root 1.6 # node monitoring and info
448 root 1.3
449 root 1.21 =item node_is_known $nodeid
450 root 1.13
451 root 1.46 Returns true iff the given node is currently known to the system. The only
452     time a node is known but not up currently is when a conenction request is
453     pending.
454 root 1.13
455     =cut
456    
457     sub node_is_known($) {
458     exists $NODE{$_[0]}
459     }
460    
461 root 1.21 =item node_is_up $nodeid
462 root 1.13
463     Returns true if the given node is "up", that is, the kernel thinks it has
464     a working connection to it.
465    
466     If the node is known but not currently connected, returns C<0>. If the
467     node is not known, returns C<undef>.
468    
469     =cut
470    
471     sub node_is_up($) {
472     ($NODE{$_[0]} or return)->{transport}
473     ? 1 : 0
474     }
475    
476 root 1.3 =item known_nodes
477    
478 root 1.26 Returns the node IDs of all nodes currently known to this node, including
479     itself and nodes not currently connected.
480 root 1.3
481     =cut
482    
483 root 1.49 sub known_nodes() {
484 root 1.26 map $_->{id}, values %NODE
485 root 1.3 }
486    
487     =item up_nodes
488    
489 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
490     the node itself).
491 root 1.3
492     =cut
493    
494 root 1.49 sub up_nodes() {
495 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
496 root 1.3 }
497    
498 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
499 root 1.3
500 root 1.27 Registers a callback that is called each time a node goes up (a connection
501     is established) or down (the connection is lost).
502 root 1.3
503     Node up messages can only be followed by node down messages for the same
504     node, and vice versa.
505    
506 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
507     port. This function is mainly of interest to modules that are concerned
508     about the network topology and low-level connection handling.
509    
510     Callbacks I<must not> block and I<should not> send any messages.
511    
512     The function returns an optional guard which can be used to unregister
513 root 1.3 the monitoring callback again.
514    
515 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
516     or go up (and down).
517    
518     newnode $_, 1 for up_nodes;
519     mon_nodes \&newnode;
520    
521 root 1.3 =cut
522    
523     our %MON_NODES;
524    
525     sub mon_nodes($) {
526     my ($cb) = @_;
527    
528     $MON_NODES{$cb+0} = $cb;
529    
530     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
531     }
532    
533     sub _inject_nodeevent($$;@) {
534 root 1.16 my ($node, $up, @reason) = @_;
535 root 1.3
536     for my $cb (values %MON_NODES) {
537 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
538 root 1.16 or $WARN->(1, $@);
539 root 1.3 }
540 root 1.16
541 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
542 root 1.3 }
543    
544     #############################################################################
545 root 1.1 # self node code
546    
547     our %node_req = (
548     # internal services
549    
550     # monitoring
551 root 1.27 mon0 => sub { # stop monitoring a port
552 root 1.1 my $portid = shift;
553     my $node = $SRCNODE;
554     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
555     },
556 root 1.27 mon1 => sub { # start monitoring a port
557 root 1.1 my $portid = shift;
558     my $node = $SRCNODE;
559 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
560 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
561 root 1.27 $node->send (["", kil => $portid, @_])
562 root 1.39 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
563 root 1.1 });
564     },
565     kil => sub {
566     my $cbs = delete $SRCNODE->{lmon}{+shift}
567     or return;
568    
569     $_->(@_) for @$cbs;
570     },
571    
572 root 1.18 # "public" services - not actually public
573 root 1.1
574     # relay message to another node / generic echo
575 root 1.15 snd => \&snd,
576 root 1.27 snd_multiple => sub {
577 root 1.1 snd @$_ for @_
578     },
579    
580 root 1.4 # informational
581     info => sub {
582     snd @_, $NODE;
583     },
584     known_nodes => sub {
585     snd @_, known_nodes;
586     },
587     up_nodes => sub {
588     snd @_, up_nodes;
589     },
590    
591 root 1.30 # random utilities
592 root 1.1 eval => sub {
593 root 1.50 my @res = do { package main; eval shift };
594 root 1.1 snd @_, "$@", @res if @_;
595     },
596     time => sub {
597     snd @_, AE::time;
598     },
599     devnull => sub {
600     #
601     },
602 root 1.15 "" => sub {
603 root 1.27 # empty messages are keepalives or similar devnull-applications
604 root 1.15 },
605 root 1.1 );
606    
607 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
608 root 1.1 $PORT{""} = sub {
609     my $tag = shift;
610     eval { &{ $node_req{$tag} ||= load_func $tag } };
611 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
612 root 1.1 };
613    
614     =back
615    
616     =head1 SEE ALSO
617    
618     L<AnyEvent::MP>.
619    
620     =head1 AUTHOR
621    
622     Marc Lehmann <schmorp@schmorp.de>
623     http://home.schmorp.de/
624    
625     =cut
626    
627     1
628