ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.39
Committed: Thu Sep 3 20:16:36 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.38: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
# Module version.
our $VERSION = '1.0';

# Symbols exported by default, grouped by purpose.
our @EXPORT = (
   # shared state
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   # low-level helpers
   qw(add_node load_func snd_to_func snd_on eval_on),
   # basic messaging
   qw(NODE $NODE node_of snd kil port_is_local),
   # setup
   qw(configure),
   # node information and monitoring
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# Timing parameters.
our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
61 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
# Highest level the default handler prints. The environment variable wins
# whenever it exists (even when empty or 0); otherwise the default is 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Default warn handler: called as $WARN->($level, $msg), writes one
# timestamped line to STDERR.
our $WARN = sub {
   my ($level, $msg) = @_;

   # anything above the configured level is dropped silently
   return if $level > $WARNLEVEL;

   # at most one trailing newline is removed, the format adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
83    
84 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
85    
86     The maximum level at which warning messages will be printed to STDERR by
87     the default warn handler.
88    
89     =cut
90    
# Resolves a fully qualified function name to a code reference.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd (errors from require are ignored) until the
# function appears. When no prefix is left, a stub is returned that dies
# with "unable to resolve" when called.
sub load_func($) {
   my $func = $_[0];

   return \&$func
      if defined &$func;

   my $pkg = $func;

   # strip one trailing "::component" per round and try to load the rest
   while ($pkg =~ s/::[^:]+$//) {
      eval "require $pkg";

      return \&$func
         if defined &$func;
   }

   sub { die "unable to resolve '$func'" }
}
105    
# Returns a string of $_[0] random octets.
#
# /dev/urandom is preferred. If it cannot be opened, or does not deliver
# the full number of octets, perl's rand is used instead, seeded once per
# process from the time, the pid and a checksum over process/interface
# listings.
sub nonce($) {
   my $len = $_[0];
   my $nonce;

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $got = sysread $fh, $nonce, $len;

      # only trust the result when the read was complete - a failed or
      # short read would otherwise yield an undefined or too short nonce
      return $nonce
         if defined $got && $got == $len;
   }

   # fallback: no usable random device
   our $nonce_init;
   unless ($nonce_init++) {
      srand (time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all");
   }

   join "", map +(chr rand 256), 1 .. $len
}
122    
# Encodes an octet string using only the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer the data is treated as one big number and
# written in base 62. Otherwise it is base64 encoded, the padding is
# removed and the non-alphanumeric base64 characters are escaped using "x"
# as escape character (x -> x0, / -> x1, + -> x2).
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=" - remove all of them, not just one
      $data =~ s/=+\z//;
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
141    
# Builds a short unique-ish token from the low 16 bits of the pid, the low
# 16 bits of the current time and two random octets, encoded via alnumbits.
sub gen_uniq {
   my $packed = pack "nna*", $$ & 0xffff, time & 0xffff, nonce (2);

   alnumbits ($packed)
}
145    
our $CONFIG; # this node's configuration

our $RUNIQ = alnumbits (nonce (96 / 8)); # remote uniq value
our $UNIQ  = gen_uniq ();                # per-process/node unique cookie
our $NODE  = "anon/" . $RUNIQ;           # node id, anonymous until configured
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
163    
# Returns the id of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
167    
# Returns the node id part of a port id, i.e. everything before the first
# "#". A port id without "#" is returned unchanged.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
173    
BEGIN {
   # TRACE is installed at compile time as a constant sub (empty
   # prototype), true when PERL_ANYEVENT_MP_TRACE is set to a true value.
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
179 root 1.1
# Delivers a message to a local port: the first element of @_ is the port
# id, the remaining elements are passed to the port callback. Messages for
# ports without a callback are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called without argument list, so the handler sees the rest of @_
   &$handler;
}
184    
# Central routing component: returns the node object registered for the
# given node id, creating and registering an AnyEvent::MP::Node::Direct
# object first when there is none yet.
sub add_node {
   my ($node) = @_;

   return $NODE{$node}
      if $NODE{$node};

   $NODE{$node} = AnyEvent::MP::Node::Direct->new ($node)
}
192    
# Sends a message: the first argument is the destination port id
# ("nodeid#portid"), the remaining arguments form the message. The node
# object is looked up (or created) and its {send} callback is invoked with
# [portid, message...].
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
201    
202 root 1.17 =item $is_local = port_is_local $port
203    
204     Returns true iff the port is a local port.
205    
206     =cut
207    
# True when the node object registered for the node part of the port id is
# the same object as the local node (the one stored under the empty key).
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $NODE{$parts[0]} == $NODE{""}
}
213    
214 root 1.18 =item snd_to_func $node, $func, @args
215 root 1.11
216 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
217 root 1.11 this function with the given arguments on that node.
218    
219 root 1.20 This function can be used to implement C<spawn>-like interfaces.
220 root 1.11
221     =cut
222    
# Sends ($func, @args) to the node port (empty port id) of the given node,
# creating the node object first if necessary.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_]);
}
229    
230 root 1.18 =item snd_on $node, @msg
231    
232     Executes C<snd> with the given C<@msg> (which must include the destination
233     port) on the given node.
234    
235     =cut
236    
# Asks the given node to execute snd with @msg, by sending it a "snd"
# request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
241    
242 root 1.29 =item eval_on $node, $string[, @reply]
243 root 1.18
244 root 1.29 Evaluates the given string as Perl expression on the given node. When
245     @reply is specified, then it is used to construct a reply message with
246     C<"$@"> and any results from the eval appended.
247 root 1.18
248     =cut
249    
# Asks the given node to evaluate a string of perl code, by sending it an
# "eval" request followed by the string and the optional reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
254    
# Kills the port given as first argument ("nodeid#portid"); any further
# arguments are passed on as the kill reason. Croaks when the port id part
# is empty, as node ports cannot be killed.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
264    
# Returns the nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
269    
# Resolves a comma separated list of transport descriptors into
# "host:port" endpoint strings.
#
# Returns a condvar that receives the list of endpoints, ordered by the
# position of the descriptor that produced them and with duplicates
# removed. A descriptor consisting only of digits (or being empty) is
# taken as a port on the local nodename. A host of "*" expands to the
# addresses of the local interfaces, a port of "*" is replaced by "0".
# Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, endpoint] pairs collected by the callbacks below

   # runs once all lookups have finished: sort, deduplicate, deliver
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor share the integer part of the priority,
      # so they stay together and in descriptor order after sorting
      my $pri = ++$idx;

      $t = length ($t) ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # (named loop variable: lexical $_ no longer exists in perl)
                  for my $ip4 ($if->address (Net::Interface::AF_INET ())) {
                     next if $ip4 =~ /^\x7f/; # skip localhost etc.
                     push @addr, $ip4;
                  }
                  for my $ip6 ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($ip6) <= 2;
                     next unless $ip6 =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $ip6;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   # matches the initial begin, the final callback fires once every
   # outstanding lookup has ended as well
   $cv->end;

   $cv
}
350    
# Configures and starts the local node.
#
# Takes key/value pairs; with an odd number of arguments the first one is
# taken as the profile name. The profile defaults to the local nodename.
# The resulting configuration is stored in $CONFIG, the node id is set
# (unless it is "anon/"), listeners are created for all bind addresses
# (default "*"), the seed nodes are handed to AnyEvent::MP::Global and the
# configured services are started: entries ending in "::" are modules to
# load, all others are functions to resolve and call.
#
# Croaks on unparsable bind addresses and dies when a service module
# cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   # the self node stays reachable under its (possibly new) id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # all resolves are started first, then their results are awaited in turn
   for (map _resolve ($_), @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address the server was actually given
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      # work on a copy: stripping the "::" marker in place would rewrite the
      # stored configuration, so a later configure call would mistake
      # module entries for function names
      my $name = $service;

      if ($name =~ s/::$//) {
         eval "require $name";
         die $@ if $@;
      } else {
         my $code = load_func ($name);
         $code->();
      }
   }
}
412    
413     #############################################################################
414 root 1.6 # node monitoring and info
415 root 1.3
416 root 1.21 =item node_is_known $nodeid
417 root 1.13
418     Returns true iff the given node is currently known to the system.
419    
420     =cut
421    
# True when there is an entry for the given node id in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
425    
426 root 1.21 =item node_is_up $nodeid
427 root 1.13
428     Returns true if the given node is "up", that is, the kernel thinks it has
429     a working connection to it.
430    
431     If the node is known but not currently connected, returns C<0>. If the
432     node is not known, returns C<undef>.
433    
434     =cut
435    
# Returns 1 when the node has a transport, 0 when it is known but has
# none, and nothing (undef in scalar context) when it is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
440    
441 root 1.3 =item known_nodes
442    
443 root 1.26 Returns the node IDs of all nodes currently known to this node, including
444     itself and nodes not currently connected.
445 root 1.3
446     =cut
447    
# Returns the {id} of every node object stored in %NODE.
sub known_nodes {
   my @ids;

   push @ids, $_->{id}
      for values %NODE;

   @ids
}
451    
452     =item up_nodes
453    
454 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
455     the node itself).
456 root 1.3
457     =cut
458    
# Returns the {id} of every node object in %NODE that has a transport.
sub up_nodes {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
462    
463 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
464 root 1.3
465 root 1.27 Registers a callback that is called each time a node goes up (a connection
466     is established) or down (the connection is lost).
467 root 1.3
468     Node up messages can only be followed by node down messages for the same
469     node, and vice versa.
470    
471 root 1.27 Note that monitoring a node is usually better done by monitoring its node
472     port. This function is mainly of interest to modules that are concerned
473     about the network topology and low-level connection handling.
474    
475     Callbacks I<must not> block and I<should not> send any messages.
476    
477     The function returns an optional guard which can be used to unregister
478 root 1.3 the monitoring callback again.
479    
480     =cut
481    
# registered node monitoring callbacks, keyed by the callback's address
our %MON_NODES;

# Registers $cb to be called on node up/down events.
#
# Unless called in void context, returns a guard object that unregisters
# the callback again when it is destroyed. In void context nothing is
# returned and the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   # "defined wantarray" covers scalar context as well - plain wantarray
   # is false there, so "$guard = mon_nodes ..." never received a guard
   return
      unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$key} })
}
491    
# Reports a node up/down event: every callback in %MON_NODES is called
# with the node id, the up flag and the reason. A dying callback is
# reported via $WARN (level 1) and does not stop the others. Finally the
# event itself is logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $handler (values %MON_NODES) {
      next
         if eval { $handler->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
502    
503     #############################################################################
504 root 1.1 # self node code
505    
# Requests understood by the node port, keyed by request name. Each
# handler is called with the remaining message elements in @_; $SRCNODE
# is the node the request came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      # the callback below must not keep the remote node object alive
      my $node = $SRCNODE;
      Scalar::Util::weaken ($node); #TODO# ugly

      my $notify = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = $notify);
   },
   kil => sub {
      # a monitored remote port died: run and forget its callbacks
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): evaluates code received in a message as-is
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # intentionally does nothing
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
565    
# The local node is registered under its own id and under the empty id.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element names the request. Handlers are
# taken from %node_req, unknown names are resolved via load_func and cached
# there. Errors raised while handling are reported via $WARN (level 2).
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # called without argument list, so the handler sees the rest of @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
572    
573     =back
574    
575     =head1 SEE ALSO
576    
577     L<AnyEvent::MP>.
578    
579     =head1 AUTHOR
580    
581     Marc Lehmann <schmorp@schmorp.de>
582     http://home.schmorp.de/
583    
584     =cut
585    
586     1
587