ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.43
Committed: Sat Sep 5 22:48:27 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.42: +4 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages - queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63     =cut
64    
# Highest level the default handler still prints. Defaults to 5 unless the
# PERL_ANYEVENT_MP_WARNLEVEL environment variable exists (even when empty).
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Default warn handler: writes "<timestamp> <level> message" to STDERR for
# every message whose level does not exceed $WARNLEVEL.
our $WARN = sub {
   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # at most one trailing newline is dropped, the format adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79    
80 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81    
82     The maximum level at which warning messages will be printed to STDERR by
83     the default warn handler.
84    
85     =cut
86    
# Resolves the fully qualified function name $func into a code reference.
#
# When the function is not defined yet, successively shorter package
# prefixes of the name are require'd (for Foo::Bar::baz first Foo::Bar,
# then Foo) until the function shows up. Load errors are ignored. When no
# prefix is left, a code reference that dies with "unable to resolve ..."
# when called is returned, so the failure surfaces at call time.
#
# Function names can originate from other nodes (the node port passes
# unknown message tags to this function), so only prefixes that are
# syntactically plain package names are handed to the string-require -
# anything else would be executed as arbitrary perl code.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;

      do {
         # strip the last "::component", give up when there is none left
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         eval "require $pkg"
            if $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/;
      } until defined &$func;
   }

   \&$func
}
101    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that can be opened and yields
# the full amount. Otherwise (no such device, read error or short read)
# perl's rand is used, seeded once per process from the time, the pid and
# a checksum over the output of some system commands.
sub nonce($) {
   my $len = $_[0];
   my $nonce;

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $got = sysread $fh, $nonce, $len;

      # only trust the device when it delivered everything we asked for
      return $nonce
         if defined $got && $got == $len;
   }

   # fallback: weak, but better than nothing
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
118    
# Encodes the octet string $_[0] into a string consisting only of ASCII
# letters and digits.
#
# When Math::GMP (2.05 or newer) can be loaded, the data is treated as one
# big hexadecimal number and converted to base 62. Otherwise it is base64
# encoded, the padding is removed and the non-alphanumeric characters are
# escaped using "x" as escape character: x => x0, / => x1, + => x2.
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      $data =~ s/=+\z//;   # base64 can end in one OR two padding characters
      $data =~ s/x/x0/g;   # escape the escape character first
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
137    
# Generates a short alphanumeric cookie from the low 16 bits of the
# process id, the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time () & 0xffff;

   alnumbits (pack "nna*", $pid16, $time16, nonce (2))
}
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.21
144 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
145 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
146 root 1.21 our $NODE = "anon/$RUNIQ";
147 root 1.1 our $ID = "a";
148    
149     our %NODE; # node id to transport mapping, or "undef", for local node
150     our (%PORT, %PORT_DATA); # local ports
151    
152 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
153 root 1.1 our %LMON; # monitored _local_ ports
154    
155     our %LISTENER;
156 root 1.21 our $LISTENER; # our listeners, as arrayref
157 root 1.1
158     our $SRCNODE; # holds the sending node during _inject
159    
# Returns the current value of $NODE, the node ID of the local node.
sub NODE() {
   return $NODE;
}
163    
# Returns the node ID part of a port ID, i.e. everything before the
# first "#" (undef for the empty string).
sub node_of($) {
   my @part = split /#/, $_[0], 2;

   $part[0]
}
169    
# TRACE is a compile-time constant, true iff PERL_ANYEVENT_MP_TRACE is set
# to a true value in the environment, so tracing code guarded by it can be
# optimised away.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175 root 1.1
our $DELAY_TIMER; # the pending timer, undef when no run is scheduled
our @DELAY_QUEUE; # callbacks waiting for the next run

# Timer callback: runs all queued callbacks in order, including any that
# get queued while it is running, then forgets the (one-shot) timer so the
# next call to delay schedules a new run. Without that reset, callbacks
# queued after the first run would never be executed.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   undef $DELAY_TIMER;
}

# delay $cb - queues the code reference $cb and makes sure a zero-delay
# timer that will run the queue is pending.
sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
187    
# Delivers a message (port id followed by the message contents) to the
# local port it is addressed to. Messages for ports without an entry in
# %PORT are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # call with the remaining @_, i.e. the message without the port id
   &$handler;
}
192    
# Returns the node object registered for the given node ID in %NODE,
# creating and registering an AnyEvent::MP::Node::Direct object first when
# there is none. This gives callers something to send messages to and
# makes this the central routing component.
sub add_node {
   my $id = shift;

   return $NODE{$id} ||= AnyEvent::MP::Node::Direct->new ($id);
}
200    
# snd $port, @msg - sends @msg to the given port ("nodeid#portid"). The
# node object is looked up in %NODE (and created on demand), then its
# {send} callback is invoked with [portid, @msg].
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # stringify the port id, so a missing port part becomes ""
   $node->{send}->(["$portid", @_]);
}
209    
210 root 1.17 =item $is_local = port_is_local $port
211    
212     Returns true iff the port is a local port.
213    
214     =cut
215    
# True iff the node part of the port ID maps to the same node object as
# the local node (which is registered under the empty node ID).
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
221    
222 root 1.18 =item snd_to_func $node, $func, @args
223 root 1.11
224 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
225 root 1.11 this function with the given arguments on that node.
226    
227 root 1.20 This function can be used to implement C<spawn>-like interfaces.
228 root 1.11
229     =cut
230    
# snd_to_func $nodeid, $func, @args - sends ["", $func, @args] to the node
# port of the given node.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   if ($nodeid ne $NODE) {
      my $node = $NODE{$nodeid} || add_node ($nodeid);

      return $node->send (["", @_]);
   }

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   my @msg = ("", @_);

   delay (sub { $NODE{""}->send (\@msg) });
}
243    
244 root 1.18 =item snd_on $node, @msg
245    
246     Executes C<snd> with the given C<@msg> (which must include the destination
247     port) on the given node.
248    
249     =cut
250    
# snd_on $node, @msg - sends a "snd" request with @msg to the node port
# of $node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
255    
256 root 1.29 =item eval_on $node, $string[, @reply]
257 root 1.18
258 root 1.29 Evaluates the given string as Perl expression on the given node. When
259     @reply is specified, then it is used to construct a reply message with
260     C<"$@"> and any results from the eval appended.
261 root 1.18
262     =cut
263    
# eval_on $node, $string[, @reply] - sends an "eval" request with the
# string and the optional reply message prefix to the node port of $node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
268    
# kil $port, @reason - asks the node owning the port ("nodeid#portid") to
# kill it, passing the port id and @reason to the node object's kill
# method. Croaks when the port id part is empty, as that would address the
# node port itself.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
278    
# Returns the second element of POSIX::uname, the nodename of this host.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
283    
# Resolves a comma-separated list of transport descriptors ("host:port"
# style) into "host:port" endpoint strings, asynchronously.
#
# Returns a condvar that will be sent the list of endpoints, without
# duplicates, ordered by the position of the descriptor they came from.
#
# Descriptor forms handled here: an empty string or a bare port number
# refers to this host's nodename, a port of "*" becomes port 0, and a
# host of "*" is expanded into the addresses of the local interfaces.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the callbacks

   # the group callback runs after the last ->end: sort, dedup, deliver
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor get priorities slightly above its index,
      # which keeps them together and in order when sorting
      my $pri = ++$idx;

      $t = (length $t) ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak ("$t: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # (plain "for", as "for my $_" no longer compiles on
                  # perl 5.24 and newer)
                  for ($if->address (Net::Interface::AF_INET ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }
                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (
                        AnyEvent::Socket::format_address ($ip),
                        $port
                     )
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (
                     AnyEvent::Socket::format_address ($host),
                     $service
                  )
               ];
            }
            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
364    
# configure [$profile,] key => value...
#
# Configures the local node: looks up the profile (default: this host's
# nodename) merged with the given key/value pairs, sets the node ID unless
# it is "anon/", starts listening on all bind addresses (default "*"),
# loads AnyEvent::MP::Global, hands it the resolved seed addresses and
# finally starts the configured services.
#
# A service entry is either an array reference [function name, args...],
# a string ending in "::" naming a module to require, or the name of a
# function to call without arguments.
#
# Blocks while the bind and seed addresses are resolved. Croaks on an
# unparsable bind address and dies when a service module fails to load.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   # register the self node under its (possibly new) id as well
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolve requests first, then wait for them one by one
   for (map _resolve ($_), @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address the prepare callback was given
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ((my $module = $service) =~ s/::$//) {
         # strip the "::" from a copy - the loop variable aliases the
         # configuration data, which must not be modified
         eval "require $module";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
431    
432     #############################################################################
433 root 1.6 # node monitoring and info
434 root 1.3
435 root 1.21 =item node_is_known $nodeid
436 root 1.13
437     Returns true iff the given node is currently known to the system.
438    
439     =cut
440    
# True iff %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
444    
445 root 1.21 =item node_is_up $nodeid
446 root 1.13
447     Returns true if the given node is "up", that is, the kernel thinks it has
448     a working connection to it.
449    
450     If the node is known but not currently connected, returns C<0>. If the
451     node is not known, returns C<undef>.
452    
453     =cut
454    
# Returns 1 when the node has a transport, 0 when it is known but has
# none, and nothing (undef in scalar context) when it is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
459    
460 root 1.3 =item known_nodes
461    
462 root 1.26 Returns the node IDs of all nodes currently known to this node, including
463     itself and nodes not currently connected.
464 root 1.3
465     =cut
466    
# Returns the node IDs of all node objects in %NODE. The local node is
# registered twice (under the empty ID and under its own ID), so IDs are
# deduplicated to report every node only once.
sub known_nodes {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
470    
471     =item up_nodes
472    
473 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
474     the node itself).
475 root 1.3
476     =cut
477    
# Returns the node IDs of all node objects in %NODE that have a transport.
sub up_nodes {
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}
481    
482 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
483 root 1.3
484 root 1.27 Registers a callback that is called each time a node goes up (a connection
485     is established) or down (the connection is lost).
486 root 1.3
487     Node up messages can only be followed by node down messages for the same
488     node, and vice versa.
489    
490 root 1.27 Note that monitoring a node is usually better done by monitoring its node
491     port. This function is mainly of interest to modules that are concerned
492     about the network topology and low-level connection handling.
493    
494     Callbacks I<must not> block and I<should not> send any messages.
495    
496     The function returns an optional guard which can be used to unregister
497 root 1.3 the monitoring callback again.
498    
499     =cut
500    
our %MON_NODES; # registered node monitoring callbacks, keyed by address

# $guard = mon_nodes $cb - registers $cb as node up/down callback.
#
# Unless called in void context, a guard object is returned that
# unregisters the callback again when it is destroyed. The context check
# has to be "defined wantarray": plain wantarray is false in scalar
# context, so "$guard = mon_nodes ..." would never receive a guard.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
510    
# _inject_nodeevent $node, $is_up, @reason - calls every callback in
# %MON_NODES with the node's ID, the up flag and the reason, then logs the
# state change at level 7. An exception thrown by a callback is logged at
# level 1 and does not keep the remaining callbacks from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
521    
522     #############################################################################
523 root 1.1 # self node code
524    
# Request handlers of the node port, indexed by message tag. Each handler
# is called with the remainder of the message; $SRCNODE is the node object
# of the sender while a handler runs.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;
      my $cb = delete $SRCNODE->{rmon}{$portid};

      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;
      my $node = $SRCNODE;
      Scalar::Util::weaken ($node); #TODO# ugly

      # forwards the kill to the monitoring node, as long as it is
      # still around and connected
      my $on_kill = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $node->{rmon}{$portid} = $on_kill;
      $NODE{""}->monitor ($portid, $on_kill);
   },
   kil => sub { # a port we monitor on the sending node was killed
      my $portid = shift;
      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd          => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # evaluates the string as perl code, replies only when a reply
      # message prefix was given
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
584    
# the local node is reachable both under the empty node ID and its own ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element selects the handler in
# %node_req. Unknown tags are resolved with load_func and cached there.
# Errors while processing a message are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # call with the remaining @_ of the port callback
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
591    
592     =back
593    
594     =head1 SEE ALSO
595    
596     L<AnyEvent::MP>.
597    
598     =head1 AUTHOR
599    
600     Marc Lehmann <schmorp@schmorp.de>
601     http://home.schmorp.de/
602    
603     =cut
604    
605     1
606