ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.44
Committed: Sun Sep 6 00:13:21 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.43: +9 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages -queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
71 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 root 1.44 our @WARN;
73     our $WARN = sub {
74     &$_ for @WARN;
75 root 1.29
76     return if $WARNLEVEL < $_[0];
77    
78 root 1.16 my ($level, $msg) = @_;
79    
80 root 1.1 $msg =~ s/\n$//;
81 root 1.16
82     printf STDERR "%s <%d> %s\n",
83     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84     $level,
85     $msg;
86 root 1.1 };
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
95 root 1.6 sub load_func($) {
96     my $func = $_[0];
97    
98     unless (defined &$func) {
99     my $pkg = $func;
100     do {
101     $pkg =~ s/::[^:]+$//
102     or return sub { die "unable to resolve '$func'" };
103     eval "require $pkg";
104     } until defined &$func;
105     }
106    
107     \&$func
108     }
109    
110 root 1.1 sub nonce($) {
111     my $nonce;
112    
113     if (open my $fh, "</dev/urandom") {
114     sysread $fh, $nonce, $_[0];
115     } else {
116     # shit...
117     our $nonce_init;
118 root 1.31 unless ($nonce_init++) {
119 root 1.1 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
120     }
121     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
122     }
123    
124     $nonce
125     }
126    
127 root 1.21 sub alnumbits($) {
128 root 1.1 my $data = $_[0];
129    
130     if (eval "use Math::GMP 2.05; 1") {
131     $data = Math::GMP::get_str_gmp (
132     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
133     62
134     );
135     } else {
136     $data = MIME::Base64::encode_base64 $data, "";
137     $data =~ s/=//;
138 root 1.31 $data =~ s/x/x0/g;
139     $data =~ s/\//x1/g;
140     $data =~ s/\+/x2/g;
141 root 1.1 }
142    
143     $data
144     }
145    
146     sub gen_uniq {
147 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
148 root 1.1 }
149    
150 root 1.20 our $CONFIG; # this node's configuration
151 root 1.21
152 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
153 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
154 root 1.21 our $NODE = "anon/$RUNIQ";
155 root 1.1 our $ID = "a";
156    
157     our %NODE; # node id to transport mapping, or "undef", for local node
158     our (%PORT, %PORT_DATA); # local ports
159    
160 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
161 root 1.1 our %LMON; # monitored _local_ ports
162    
163     our %LISTENER;
164 root 1.21 our $LISTENER; # our listeners, as arrayref
165 root 1.1
166     our $SRCNODE; # holds the sending node during _inject
167    
168     sub NODE() {
169     $NODE
170     }
171    
172     sub node_of($) {
173 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
174 root 1.1
175 root 1.21 $node
176 root 1.1 }
177    
178 root 1.17 BEGIN {
179     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
180     ? sub () { 1 }
181     : sub () { 0 };
182     }
183 root 1.1
184 root 1.42 our $DELAY_TIMER;
185     our @DELAY_QUEUE;
186    
187     sub _delay_run {
188     (shift @DELAY_QUEUE or return)->() while 1;
189     }
190    
191     sub delay($) {
192     push @DELAY_QUEUE, shift;
193     $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
194     }
195    
196 root 1.1 sub _inject {
197 root 1.25 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
198 root 1.1 &{ $PORT{+shift} or return };
199     }
200    
201 root 1.20 # this function adds a node-ref, so you can send stuff to it
202     # it is basically the central routing component.
203 root 1.1 sub add_node {
204 root 1.21 my ($node) = @_;
205 root 1.1
206 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
207 root 1.13 }
208    
209 root 1.1 sub snd(@) {
210 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
211 root 1.1
212 root 1.25 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
213 root 1.1
214 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
215 root 1.2 ->{send} (["$portid", @_]);
216 root 1.1 }
217    
218 root 1.17 =item $is_local = port_is_local $port
219    
220     Returns true iff the port is a local port.
221    
222     =cut
223    
224     sub port_is_local($) {
225 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
226 root 1.17
227 root 1.21 $NODE{$nodeid} == $NODE{""}
228 root 1.17 }
229    
230 root 1.18 =item snd_to_func $node, $func, @args
231 root 1.11
232 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
233 root 1.11 this function with the given arguments on that node.
234    
235 root 1.20 This function can be used to implement C<spawn>-like interfaces.
236 root 1.11
237     =cut
238    
239 root 1.18 sub snd_to_func($$;@) {
240 root 1.21 my $nodeid = shift;
241 root 1.11
242 root 1.41 return ($NODE{$nodeid} || add_node $nodeid)->send (["", @_])
243     if $nodeid ne $NODE;
244    
245     # on $NODE, we artificially delay... (for spawn)
246     # this is very ugly - maybe we should simply delay ALL messages,
247     # to avoid deep recursion issues. but that's so... slow...
248     my @msg = ("", @_);
249 root 1.42 delay sub { $NODE{""}->send (\@msg) };
250 root 1.11 }
251    
252 root 1.18 =item snd_on $node, @msg
253    
254     Executes C<snd> with the given C<@msg> (which must include the destination
255     port) on the given node.
256    
257     =cut
258    
259     sub snd_on($@) {
260     my $node = shift;
261     snd $node, snd => @_;
262     }
263    
264 root 1.29 =item eval_on $node, $string[, @reply]
265 root 1.18
266 root 1.29 Evaluates the given string as Perl expression on the given node. When
267     @reply is specified, then it is used to construct a reply message with
268     C<"$@"> and any results from the eval appended.
269 root 1.18
270     =cut
271    
272 root 1.29 sub eval_on($$;@) {
273 root 1.18 my $node = shift;
274     snd $node, eval => @_;
275     }
276    
277 root 1.1 sub kil(@) {
278 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
279 root 1.1
280     length $portid
281 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
282 root 1.1
283 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
284 root 1.1 ->kill ("$portid", @_);
285     }
286    
287 root 1.7 sub _nodename {
288     require POSIX;
289     (POSIX::uname ())[1]
290     }
291    
292 root 1.21 sub _resolve($) {
293     my ($nodeid) = @_;
294 root 1.1
295     my $cv = AE::cv;
296     my @res;
297    
298     $cv->begin (sub {
299     my %seen;
300     my @refs;
301     for (sort { $a->[0] <=> $b->[0] } @res) {
302     push @refs, $_->[1] unless $seen{$_->[1]}++
303     }
304 root 1.21 shift->send (@refs);
305 root 1.1 });
306    
307     my $idx;
308 root 1.21 for my $t (split /,/, $nodeid) {
309 root 1.1 my $pri = ++$idx;
310 root 1.7
311     $t = length $t ? _nodename . ":$t" : _nodename
312     if $t =~ /^\d*$/;
313 root 1.1
314 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
315 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
316    
317 root 1.33 $port = "0" if $port eq "*";
318    
319     if ($host eq "*") {
320     $cv->begin;
321     # use fork_call, as Net::Interface is big, and we need it rarely.
322     require AnyEvent::Util;
323     AnyEvent::Util::fork_call (
324     sub {
325     my @addr;
326    
327     require Net::Interface;
328    
329     for my $if (Net::Interface->interfaces) {
330     # we statically lower-prioritise ipv6 here, TODO :()
331     for my $_ ($if->address (Net::Interface::AF_INET ())) {
332     next if /^\x7f/; # skip localhost etc.
333     push @addr, $_;
334     }
335     for ($if->address (Net::Interface::AF_INET6 ())) {
336     #next if $if->scope ($_) <= 2;
337     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
338     push @addr, $_;
339     }
340    
341     }
342     @addr
343     }, sub {
344     for my $ip (@_) {
345     push @res, [
346     $pri += 1e-5,
347     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
348     ];
349     }
350     $cv->end;
351     }
352     );
353     } else {
354     $cv->begin;
355     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
356     for (@_) {
357     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
358     push @res, [
359     $pri += 1e-5,
360     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
361     ];
362     }
363     $cv->end;
364     };
365     }
366 root 1.1 }
367    
368     $cv->end;
369    
370     $cv
371     }
372    
373 root 1.39 sub configure(@) {
374     unshift @_, "profile" if @_ & 1;
375 root 1.34 my (%kv) = @_;
376    
377     my $profile = delete $kv{profile};
378 root 1.1
379 root 1.21 $profile = _nodename
380     unless defined $profile;
381 root 1.6
382 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
383 root 1.24
384     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
385     $NODE = $node
386     unless $node eq "anon/";
387 root 1.6
388 root 1.21 $NODE{$NODE} = $NODE{""};
389     $NODE{$NODE}{id} = $NODE;
390 root 1.20
391 root 1.21 my $seeds = $CONFIG->{seeds};
392     my $binds = $CONFIG->{binds};
393 root 1.3
394 root 1.33 $binds ||= ["*"];
395 root 1.1
396 root 1.21 $WARN->(8, "node $NODE starting up.");
397 root 1.1
398 root 1.23 $LISTENER = [];
399     %LISTENER = ();
400    
401 root 1.21 for (map _resolve $_, @$binds) {
402     for my $bind ($_->recv) {
403     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
404     or Carp::croak "$bind: unparsable local bind address";
405 root 1.20
406 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
407     $host,
408     $port,
409     prepare => sub {
410     my (undef, $host, $port) = @_;
411     $bind = AnyEvent::Socket::format_hostport $host, $port;
412     },
413     ;
414     $LISTENER{$bind} = $listener;
415 root 1.21 push @$LISTENER, $bind;
416     }
417 root 1.1 }
418    
419 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
420    
421 root 1.21 # the global service is mandatory currently
422     require AnyEvent::MP::Global;
423 root 1.1
424 root 1.21 # connect to all seednodes
425     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
426 root 1.1
427 root 1.20 for (@{ $CONFIG->{services} }) {
428 root 1.43 if (ref) {
429     my ($func, @args) = @$_;
430     (load_func $func)->(@args);
431     } elsif (s/::$//) {
432 root 1.13 eval "require $_";
433     die $@ if $@;
434     } else {
435     (load_func $_)->();
436     }
437     }
438 root 1.1 }
439    
440     #############################################################################
441 root 1.6 # node monitoring and info
442 root 1.3
443 root 1.21 =item node_is_known $nodeid
444 root 1.13
445     Returns true iff the given node is currently known to the system.
446    
447     =cut
448    
449     sub node_is_known($) {
450     exists $NODE{$_[0]}
451     }
452    
453 root 1.21 =item node_is_up $nodeid
454 root 1.13
455     Returns true if the given node is "up", that is, the kernel thinks it has
456     a working connection to it.
457    
458     If the node is known but not currently connected, returns C<0>. If the
459     node is not known, returns C<undef>.
460    
461     =cut
462    
463     sub node_is_up($) {
464     ($NODE{$_[0]} or return)->{transport}
465     ? 1 : 0
466     }
467    
468 root 1.3 =item known_nodes
469    
470 root 1.26 Returns the node IDs of all nodes currently known to this node, including
471     itself and nodes not currently connected.
472 root 1.3
473     =cut
474    
475     sub known_nodes {
476 root 1.26 map $_->{id}, values %NODE
477 root 1.3 }
478    
479     =item up_nodes
480    
481 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
482     the node itself).
483 root 1.3
484     =cut
485    
486     sub up_nodes {
487 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
488 root 1.3 }
489    
490 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
491 root 1.3
492 root 1.27 Registers a callback that is called each time a node goes up (a connection
493     is established) or down (the connection is lost).
494 root 1.3
495     Node up messages can only be followed by node down messages for the same
496     node, and vice versa.
497    
498 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
499     port. This function is mainly of interest to modules that are concerned
500     about the network topology and low-level connection handling.
501    
502     Callbacks I<must not> block and I<should not> send any messages.
503    
504     The function returns an optional guard which can be used to unregister
505 root 1.3 the monitoring callback again.
506    
507     =cut
508    
509     our %MON_NODES;
510    
511     sub mon_nodes($) {
512     my ($cb) = @_;
513    
514     $MON_NODES{$cb+0} = $cb;
515    
516     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
517     }
518    
519     sub _inject_nodeevent($$;@) {
520 root 1.16 my ($node, $up, @reason) = @_;
521 root 1.3
522     for my $cb (values %MON_NODES) {
523 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
524 root 1.16 or $WARN->(1, $@);
525 root 1.3 }
526 root 1.16
527 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
528 root 1.3 }
529    
530     #############################################################################
531 root 1.1 # self node code
532    
533     our %node_req = (
534     # internal services
535    
536     # monitoring
537 root 1.27 mon0 => sub { # stop monitoring a port
538 root 1.1 my $portid = shift;
539     my $node = $SRCNODE;
540     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
541     },
542 root 1.27 mon1 => sub { # start monitoring a port
543 root 1.1 my $portid = shift;
544     my $node = $SRCNODE;
545 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
546 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
547 root 1.27 $node->send (["", kil => $portid, @_])
548 root 1.39 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
549 root 1.1 });
550     },
551     kil => sub {
552     my $cbs = delete $SRCNODE->{lmon}{+shift}
553     or return;
554    
555     $_->(@_) for @$cbs;
556     },
557    
558 root 1.18 # "public" services - not actually public
559 root 1.1
560     # relay message to another node / generic echo
561 root 1.15 snd => \&snd,
562 root 1.27 snd_multiple => sub {
563 root 1.1 snd @$_ for @_
564     },
565    
566 root 1.4 # informational
567     info => sub {
568     snd @_, $NODE;
569     },
570     known_nodes => sub {
571     snd @_, known_nodes;
572     },
573     up_nodes => sub {
574     snd @_, up_nodes;
575     },
576    
577 root 1.30 # random utilities
578 root 1.1 eval => sub {
579     my @res = eval shift;
580     snd @_, "$@", @res if @_;
581     },
582     time => sub {
583     snd @_, AE::time;
584     },
585     devnull => sub {
586     #
587     },
588 root 1.15 "" => sub {
589 root 1.27 # empty messages are keepalives or similar devnull-applications
590 root 1.15 },
591 root 1.1 );
592    
593 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
594 root 1.1 $PORT{""} = sub {
595     my $tag = shift;
596     eval { &{ $node_req{$tag} ||= load_func $tag } };
597 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
598 root 1.1 };
599    
600     =back
601    
602     =head1 SEE ALSO
603    
604     L<AnyEvent::MP>.
605    
606     =head1 AUTHOR
607    
608     Marc Lehmann <schmorp@schmorp.de>
609     http://home.schmorp.de/
610    
611     =cut
612    
613     1
614