ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.42
Committed: Sat Sep 5 21:16:59 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.41: +13 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages -queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63     =cut
64    
65 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
66    
67 root 1.1 our $WARN = sub {
68 root 1.29 return if $WARNLEVEL < $_[0];
69    
70 root 1.16 my ($level, $msg) = @_;
71    
72 root 1.1 $msg =~ s/\n$//;
73 root 1.16
74     printf STDERR "%s <%d> %s\n",
75     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
76     $level,
77     $msg;
78 root 1.1 };
79    
80 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81    
82     The maximum level at which warning messages will be printed to STDERR by
83     the default warn handler.
84    
85     =cut
86    
87 root 1.6 sub load_func($) {
88     my $func = $_[0];
89    
90     unless (defined &$func) {
91     my $pkg = $func;
92     do {
93     $pkg =~ s/::[^:]+$//
94     or return sub { die "unable to resolve '$func'" };
95     eval "require $pkg";
96     } until defined &$func;
97     }
98    
99     \&$func
100     }
101    
102 root 1.1 sub nonce($) {
103     my $nonce;
104    
105     if (open my $fh, "</dev/urandom") {
106     sysread $fh, $nonce, $_[0];
107     } else {
108     # shit...
109     our $nonce_init;
110 root 1.31 unless ($nonce_init++) {
111 root 1.1 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
112     }
113     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
114     }
115    
116     $nonce
117     }
118    
119 root 1.21 sub alnumbits($) {
120 root 1.1 my $data = $_[0];
121    
122     if (eval "use Math::GMP 2.05; 1") {
123     $data = Math::GMP::get_str_gmp (
124     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
125     62
126     );
127     } else {
128     $data = MIME::Base64::encode_base64 $data, "";
129     $data =~ s/=//;
130 root 1.31 $data =~ s/x/x0/g;
131     $data =~ s/\//x1/g;
132     $data =~ s/\+/x2/g;
133 root 1.1 }
134    
135     $data
136     }
137    
138     sub gen_uniq {
139 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
140 root 1.1 }
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.21
144 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
145 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
146 root 1.21 our $NODE = "anon/$RUNIQ";
147 root 1.1 our $ID = "a";
148    
149     our %NODE; # node id to transport mapping, or "undef", for local node
150     our (%PORT, %PORT_DATA); # local ports
151    
152 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
153 root 1.1 our %LMON; # monitored _local_ ports
154    
155     our %LISTENER;
156 root 1.21 our $LISTENER; # our listeners, as arrayref
157 root 1.1
158     our $SRCNODE; # holds the sending node during _inject
159    
160     sub NODE() {
161     $NODE
162     }
163    
164     sub node_of($) {
165 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
166 root 1.1
167 root 1.21 $node
168 root 1.1 }
169    
170 root 1.17 BEGIN {
171     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
172     ? sub () { 1 }
173     : sub () { 0 };
174     }
175 root 1.1
176 root 1.42 our $DELAY_TIMER;
177     our @DELAY_QUEUE;
178    
179     sub _delay_run {
180     (shift @DELAY_QUEUE or return)->() while 1;
181     }
182    
183     sub delay($) {
184     push @DELAY_QUEUE, shift;
185     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
186     }
187    
188 root 1.1 sub _inject {
189 root 1.25 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
190 root 1.1 &{ $PORT{+shift} or return };
191     }
192    
193 root 1.20 # this function adds a node-ref, so you can send stuff to it
194     # it is basically the central routing component.
195 root 1.1 sub add_node {
196 root 1.21 my ($node) = @_;
197 root 1.1
198 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
199 root 1.13 }
200    
201 root 1.1 sub snd(@) {
202 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
203 root 1.1
204 root 1.25 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
205 root 1.1
206 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
207 root 1.2 ->{send} (["$portid", @_]);
208 root 1.1 }
209    
210 root 1.17 =item $is_local = port_is_local $port
211    
212     Returns true iff the port is a local port.
213    
214     =cut
215    
216     sub port_is_local($) {
217 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
218 root 1.17
219 root 1.21 $NODE{$nodeid} == $NODE{""}
220 root 1.17 }
221    
222 root 1.18 =item snd_to_func $node, $func, @args
223 root 1.11
224 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
225 root 1.11 this function with the given arguments on that node.
226    
227 root 1.20 This function can be used to implement C<spawn>-like interfaces.
228 root 1.11
229     =cut
230    
231 root 1.18 sub snd_to_func($$;@) {
232 root 1.21 my $nodeid = shift;
233 root 1.11
234 root 1.41 return ($NODE{$nodeid} || add_node $nodeid)->send (["", @_])
235     if $nodeid ne $NODE;
236    
237     # on $NODE, we artificially delay... (for spawn)
238     # this is very ugly - maybe we should simply delay ALL messages,
239     # to avoid deep recursion issues. but that's so... slow...
240     my @msg = ("", @_);
241 root 1.42 delay sub { $NODE{""}->send (\@msg) };
242 root 1.11 }
243    
244 root 1.18 =item snd_on $node, @msg
245    
246     Executes C<snd> with the given C<@msg> (which must include the destination
247     port) on the given node.
248    
249     =cut
250    
251     sub snd_on($@) {
252     my $node = shift;
253     snd $node, snd => @_;
254     }
255    
256 root 1.29 =item eval_on $node, $string[, @reply]
257 root 1.18
258 root 1.29 Evaluates the given string as Perl expression on the given node. When
259     @reply is specified, then it is used to construct a reply message with
260     C<"$@"> and any results from the eval appended.
261 root 1.18
262     =cut
263    
264 root 1.29 sub eval_on($$;@) {
265 root 1.18 my $node = shift;
266     snd $node, eval => @_;
267     }
268    
269 root 1.1 sub kil(@) {
270 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
271 root 1.1
272     length $portid
273 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
274 root 1.1
275 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
276 root 1.1 ->kill ("$portid", @_);
277     }
278    
279 root 1.7 sub _nodename {
280     require POSIX;
281     (POSIX::uname ())[1]
282     }
283    
284 root 1.21 sub _resolve($) {
285     my ($nodeid) = @_;
286 root 1.1
287     my $cv = AE::cv;
288     my @res;
289    
290     $cv->begin (sub {
291     my %seen;
292     my @refs;
293     for (sort { $a->[0] <=> $b->[0] } @res) {
294     push @refs, $_->[1] unless $seen{$_->[1]}++
295     }
296 root 1.21 shift->send (@refs);
297 root 1.1 });
298    
299     my $idx;
300 root 1.21 for my $t (split /,/, $nodeid) {
301 root 1.1 my $pri = ++$idx;
302 root 1.7
303     $t = length $t ? _nodename . ":$t" : _nodename
304     if $t =~ /^\d*$/;
305 root 1.1
306 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
307 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
308    
309 root 1.33 $port = "0" if $port eq "*";
310    
311     if ($host eq "*") {
312     $cv->begin;
313     # use fork_call, as Net::Interface is big, and we need it rarely.
314     require AnyEvent::Util;
315     AnyEvent::Util::fork_call (
316     sub {
317     my @addr;
318    
319     require Net::Interface;
320    
321     for my $if (Net::Interface->interfaces) {
322     # we statically lower-prioritise ipv6 here, TODO :()
323     for my $_ ($if->address (Net::Interface::AF_INET ())) {
324     next if /^\x7f/; # skip localhost etc.
325     push @addr, $_;
326     }
327     for ($if->address (Net::Interface::AF_INET6 ())) {
328     #next if $if->scope ($_) <= 2;
329     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
330     push @addr, $_;
331     }
332    
333     }
334     @addr
335     }, sub {
336     for my $ip (@_) {
337     push @res, [
338     $pri += 1e-5,
339     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
340     ];
341     }
342     $cv->end;
343     }
344     );
345     } else {
346     $cv->begin;
347     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
348     for (@_) {
349     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
350     push @res, [
351     $pri += 1e-5,
352     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
353     ];
354     }
355     $cv->end;
356     };
357     }
358 root 1.1 }
359    
360     $cv->end;
361    
362     $cv
363     }
364    
365 root 1.39 sub configure(@) {
366     unshift @_, "profile" if @_ & 1;
367 root 1.34 my (%kv) = @_;
368    
369     my $profile = delete $kv{profile};
370 root 1.1
371 root 1.21 $profile = _nodename
372     unless defined $profile;
373 root 1.6
374 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
375 root 1.24
376     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
377     $NODE = $node
378     unless $node eq "anon/";
379 root 1.6
380 root 1.21 $NODE{$NODE} = $NODE{""};
381     $NODE{$NODE}{id} = $NODE;
382 root 1.20
383 root 1.21 my $seeds = $CONFIG->{seeds};
384     my $binds = $CONFIG->{binds};
385 root 1.3
386 root 1.33 $binds ||= ["*"];
387 root 1.1
388 root 1.21 $WARN->(8, "node $NODE starting up.");
389 root 1.1
390 root 1.23 $LISTENER = [];
391     %LISTENER = ();
392    
393 root 1.21 for (map _resolve $_, @$binds) {
394     for my $bind ($_->recv) {
395     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
396     or Carp::croak "$bind: unparsable local bind address";
397 root 1.20
398 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
399     $host,
400     $port,
401     prepare => sub {
402     my (undef, $host, $port) = @_;
403     $bind = AnyEvent::Socket::format_hostport $host, $port;
404     },
405     ;
406     $LISTENER{$bind} = $listener;
407 root 1.21 push @$LISTENER, $bind;
408     }
409 root 1.1 }
410    
411 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
412    
413 root 1.21 # the global service is mandatory currently
414     require AnyEvent::MP::Global;
415 root 1.1
416 root 1.21 # connect to all seednodes
417     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
418 root 1.1
419 root 1.20 for (@{ $CONFIG->{services} }) {
420 root 1.13 if (s/::$//) {
421     eval "require $_";
422     die $@ if $@;
423     } else {
424     (load_func $_)->();
425     }
426     }
427 root 1.1 }
428    
429     #############################################################################
430 root 1.6 # node monitoring and info
431 root 1.3
432 root 1.21 =item node_is_known $nodeid
433 root 1.13
434     Returns true iff the given node is currently known to the system.
435    
436     =cut
437    
438     sub node_is_known($) {
439     exists $NODE{$_[0]}
440     }
441    
442 root 1.21 =item node_is_up $nodeid
443 root 1.13
444     Returns true if the given node is "up", that is, the kernel thinks it has
445     a working connection to it.
446    
447     If the node is known but not currently connected, returns C<0>. If the
448     node is not known, returns C<undef>.
449    
450     =cut
451    
452     sub node_is_up($) {
453     ($NODE{$_[0]} or return)->{transport}
454     ? 1 : 0
455     }
456    
457 root 1.3 =item known_nodes
458    
459 root 1.26 Returns the node IDs of all nodes currently known to this node, including
460     itself and nodes not currently connected.
461 root 1.3
462     =cut
463    
464     sub known_nodes {
465 root 1.26 map $_->{id}, values %NODE
466 root 1.3 }
467    
468     =item up_nodes
469    
470 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
471     the node itself).
472 root 1.3
473     =cut
474    
475     sub up_nodes {
476 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
477 root 1.3 }
478    
479 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
480 root 1.3
481 root 1.27 Registers a callback that is called each time a node goes up (a connection
482     is established) or down (the connection is lost).
483 root 1.3
484     Node up messages can only be followed by node down messages for the same
485     node, and vice versa.
486    
487 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
488     port. This function is mainly of interest to modules that are concerned
489     about the network topology and low-level connection handling.
490    
491     Callbacks I<must not> block and I<should not> send any messages.
492    
493     The function returns an optional guard which can be used to unregister
494 root 1.3 the monitoring callback again.
495    
496     =cut
497    
498     our %MON_NODES;
499    
500     sub mon_nodes($) {
501     my ($cb) = @_;
502    
503     $MON_NODES{$cb+0} = $cb;
504    
505     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
506     }
507    
508     sub _inject_nodeevent($$;@) {
509 root 1.16 my ($node, $up, @reason) = @_;
510 root 1.3
511     for my $cb (values %MON_NODES) {
512 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
513 root 1.16 or $WARN->(1, $@);
514 root 1.3 }
515 root 1.16
516 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
517 root 1.3 }
518    
519     #############################################################################
520 root 1.1 # self node code
521    
522     our %node_req = (
523     # internal services
524    
525     # monitoring
526 root 1.27 mon0 => sub { # stop monitoring a port
527 root 1.1 my $portid = shift;
528     my $node = $SRCNODE;
529     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
530     },
531 root 1.27 mon1 => sub { # start monitoring a port
532 root 1.1 my $portid = shift;
533     my $node = $SRCNODE;
534 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
535 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
536 root 1.27 $node->send (["", kil => $portid, @_])
537 root 1.39 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
538 root 1.1 });
539     },
540     kil => sub {
541     my $cbs = delete $SRCNODE->{lmon}{+shift}
542     or return;
543    
544     $_->(@_) for @$cbs;
545     },
546    
547 root 1.18 # "public" services - not actually public
548 root 1.1
549     # relay message to another node / generic echo
550 root 1.15 snd => \&snd,
551 root 1.27 snd_multiple => sub {
552 root 1.1 snd @$_ for @_
553     },
554    
555 root 1.4 # informational
556     info => sub {
557     snd @_, $NODE;
558     },
559     known_nodes => sub {
560     snd @_, known_nodes;
561     },
562     up_nodes => sub {
563     snd @_, up_nodes;
564     },
565    
566 root 1.30 # random utilities
567 root 1.1 eval => sub {
568     my @res = eval shift;
569     snd @_, "$@", @res if @_;
570     },
571     time => sub {
572     snd @_, AE::time;
573     },
574     devnull => sub {
575     #
576     },
577 root 1.15 "" => sub {
578 root 1.27 # empty messages are keepalives or similar devnull-applications
579 root 1.15 },
580 root 1.1 );
581    
582 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
583 root 1.1 $PORT{""} = sub {
584     my $tag = shift;
585     eval { &{ $node_req{$tag} ||= load_func $tag } };
586 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
587 root 1.1 };
588    
589     =back
590    
591     =head1 SEE ALSO
592    
593     L<AnyEvent::MP>.
594    
595     =head1 AUTHOR
596    
597     Marc Lehmann <schmorp@schmorp.de>
598     http://home.schmorp.de/
599    
600     =cut
601    
602     1
603