ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.63
Committed: Sat Nov 7 02:36:31 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.62: +1 -1 lines
Log Message:
bleh

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38     our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 root 1.21 add_node load_func snd_to_func snd_on eval_on
41 root 1.1
42 root 1.34 NODE $NODE node_of snd kil port_is_local
43     configure
44 root 1.46 up_nodes mon_nodes node_is_up
45 root 1.1 );
46    
47 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48 root 1.1
49 root 1.27 This value is called with an error or warning message, when e.g. a
50     connection could not be created, authorisation failed and so on.
51    
52     It I<must not> block or send messages - queue it and use an idle watcher if
53     you need to do any of these things.
54 root 1.1
55 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
56 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57     node connectivity and services, C<8> for debugging messages and C<9> for
58     tracing messages.
59    
60 root 1.1 The default simply logs the message to STDERR.
61    
62 root 1.44 =item @AnyEvent::MP::Kernel::WARN
63    
64     All code references in this array are called for every log message, from
65     the default C<$WARN> handler. This is an easy way to tie into the log
66     messages without disturbing others.
67    
68 root 1.1 =cut
69    
# Highest level the default handler still prints. Taken from the
# PERL_ANYEVENT_MP_WARNLEVEL environment variable when that exists,
# otherwise 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks, each called with the handler's own argument list.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
# Runs every hook in @WARN first, then prints a timestamped line to STDERR
# unless $level exceeds $WARNLEVEL.
our $WARN = sub {
   # hooks see every message, whatever $WARNLEVEL says
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # drop one trailing newline, the format string supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
# Resolves a fully qualified function name to a code reference, loading
# enclosing packages on demand.
#
# $_[0] - function name such as "Some::Package::func"
#
# Returns a code reference and never dies itself. When the name cannot be
# resolved, or loading a package fails for a reason other than the module
# file being absent, the returned code reference dies with that error
# once it is called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component, what remains is the next
         # package candidate to load
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, and this file
         # passes message tags received from other nodes to load_func, so
         # only a plain package name may ever reach the eval
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # a missing module file is not fatal - a shorter package name
            # may still provide the function. everything else is reported.
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
114    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that can be opened and
# delivers the full amount. Otherwise they are generated with perl's
# rand, which is not suitable for cryptographic purposes.
sub nonce($) {
   my $len = $_[0];

   if (open my $fh, "<:raw", "/dev/urandom") {
      my $got = sysread $fh, my $nonce, $len;

      # a failed or short read must not result in a short nonce
      return $nonce
         if defined $got && $got == $len;
   }

   # last resort
   join "", map +(chr rand 256), 1 .. $len
}
127    
# Encodes the octet string $_[0] as a string of ASCII letters and digits.
#
# When Math::GMP 2.05 or newer can be loaded, the octets are treated as one
# big hexadecimal number and converted to base 62. Otherwise the data is
# base64-encoded, the padding is removed and the remaining non-alphanumeric
# characters are escaped: "x" becomes "x0", "/" becomes "x1" and "+"
# becomes "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with one OR two "=", remove all of them
      $data =~ s/=+\z//;
      # "x" is the escape character, so it has to be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
146    
# Builds a fresh identifier from the low 16 bits of the process ID, the low
# 16 bits of the current time and two random octets, encoded by alnumbits.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time & 0xffff;

   alnumbits (pack "nna*", $pid16, $time16, nonce (2))
}
150    
our $CONFIG;                         # this node's configuration, assigned in configure

our $RUNIQ = alnumbits (nonce (12)); # remote uniq value, made from 96 random bits
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";          # node ID in use until configure picks another one
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON;      # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;      # monitored _local_ ports

our %LISTENER;  # filled by configure, keyed by bind address
our $LISTENER;  # our listeners, as arrayref

our $SRCNODE;   # holds the sending node during _inject
168    
# Returns the current value of $NODE, the node ID of this node.
sub NODE() {
   return $NODE;
}
172    
# Returns the node ID part of a port ID, that is, everything in front of
# the first "#". A string without "#" is returned as a whole, an empty
# string yields undef.
sub node_of($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   return $nodeid;
}
178    
# Defines the constant TRACE at compile time: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   my $enabled = $ENV{PERL_ANYEVENT_MP_TRACE} ? 1 : 0;

   *TRACE = $enabled
      ? sub () { 1 }
      : sub () { 0 };
}
184 root 1.1
our $DELAY_TIMER; # timer that will call _delay_run, undef when none is pending
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Runs the queued callbacks in order, including any that are queued while
# it is running, then clears $DELAY_TIMER.
sub _delay_run {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   return undef $DELAY_TIMER;
}
191    
# Queues the given callback and makes sure a zero-delay timer that calls
# _delay_run exists.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   # keep the existing timer when one is already pending
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
196    
# Delivers a message to a local port. The first argument is the port ID,
# the remaining arguments are passed on to the port's handler in %PORT.
# Messages for ports without a handler are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called without parens on purpose: the handler gets the current @_
   &$handler;
}
201    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# Returns the entry for the given node ID from %NODE, creating an
# AnyEvent::MP::Node::Direct object for it first when there is none yet.
sub add_node {
   my ($node) = @_;

   # direct method call - the indirect object form ("new Class $arg")
   # depends on how perl happens to guess at parse time
   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
209    
# Sends a message. The first argument is the destination port ID
# ("nodeid#portid"), the remaining arguments form the message.
# Croaks when no node ID can be extracted from the port ID.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   # unknown nodes are registered on the fly
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
221    
222 root 1.17 =item $is_local = port_is_local $port
223    
224     Returns true iff the port is a local port.
225    
226     =cut
227    
# True when the %NODE entry for the node part of the given port ID is the
# very same object as the local node, $NODE{""}.
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   return $NODE{$nodeid} == $NODE{""};
}
233    
234 root 1.18 =item snd_to_func $node, $func, @args
235 root 1.11
236 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
237 root 1.11 this function with the given arguments on that node.
238    
239 root 1.20 This function can be used to implement C<spawn>-like interfaces.
240 root 1.11
241     =cut
242    
# Sends (function name, arguments) to the node port of the given node.
#
# $nodeid - destination node ID, croaks when undefined
# @_      - function name followed by its arguments
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so an invalid call neither compares undef nor leaves
   # the global DELAY flag set behind
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment talks about the local node, yet the flag is
   # set when $nodeid differs from $NODE - confirm against Node::Self
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node ($nodeid))->send (["", @_]);
}
257    
258 root 1.18 =item snd_on $node, @msg
259    
260     Executes C<snd> with the given C<@msg> (which must include the destination
261     port) on the given node.
262    
263     =cut
264    
# Asks the given node to call snd with @msg, by sending it a "snd" request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
269    
270 root 1.29 =item eval_on $node, $string[, @reply]
271 root 1.18
272 root 1.29 Evaluates the given string as Perl expression on the given node. When
273     @reply is specified, then it is used to construct a reply message with
274     C<"$@"> and any results from the eval appended.
275 root 1.18
276     =cut
277    
# Sends an "eval" request, consisting of the string and the optional reply
# message, to the given node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
282    
# Kills the port given as first argument, the remaining arguments are
# handed to the node object's kill method as well.
# Croaks when the port ID has no port part, i.e. names a node port.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   # unknown nodes are registered on the fly
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
292    
# Returns the nodename field (index 1) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
297    
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings.
#
# Returns a condvar. Its value is the list of distinct addresses, ordered
# by the position of the descriptor they came from and then by the order
# in which they were found.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, address] pairs, filled by the callbacks below

   # group callback: sort by priority, drop duplicates, deliver
   $cv->begin (sub {
      my ($done) = @_;
      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];
         next if $seen{$addr}++;
         push @refs, $addr;
      }

      $done->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # the integer part orders descriptors, the small increments added
      # below order the results of a single descriptor
      my $pri = ++$idx;

      # an empty or all-digits descriptor refers to the local nodename
      if ($t =~ /^\d*$/) {
         $t = length $t ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  push @addr,
                     grep !/^\x7f/, # skip localhost etc.
                        $if->address (Net::Interface::AF_INET ());

                  #next if $if->scope ($_) <= 2;
                  push @addr,
                     grep /^[\x20-\x3f\xfc\xfd]/, # global unicast, site-local unicast
                        $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $target->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   # matches the initial begin, so the group callback can fire
   $cv->end;

   $cv
}
378    
# Configures this node: looks up the profile, picks the node ID, creates
# the listeners, hands the seeds to the global service and starts the
# configured services.
#
# Takes key => value pairs. With an odd number of arguments the first one
# is taken to be the profile name.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   # the profile defaults to the nodename
   defined $profile
      or $profile = _nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   delete $NODE{$NODE}; # we do not support doing stuff before configure

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # "anon/" keeps the node ID that is already in $NODE
   $NODE = $node
      unless $node eq "anon/";

   # register the local node object under its final ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolves first, then wait for them in order
   my @bind_cvs = map { _resolve ($_) } @$binds;

   for my $bind_cv (@bind_cvs) {
      for my $bind ($bind_cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               my (undef, $bound_host, $bound_port) = @_;
               # record the address reported to the callback instead of
               # the requested one
               $bind = AnyEvent::Socket::format_hostport $bound_host, $bound_port;
               0
            },
         ;

         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   my @seed_cvs = map { _resolve ($_) } @$seeds;
   AnyEvent::MP::Global::set_seeds (map { $_->recv } @seed_cvs);

   # services are either [function, args...], "Package::" or "function"
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }
}
451    
452     #############################################################################
453 root 1.6 # node monitoring and info
454 root 1.3
455 root 1.21 =item node_is_known $nodeid
456 root 1.13
457 root 1.46 Returns true iff the given node is currently known to the system. The only
458     time a node is known but not up currently is when a connection request is
459     pending.
460 root 1.13
461     =cut
462    
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
466    
467 root 1.21 =item node_is_up $nodeid
468 root 1.13
469     Returns true if the given node is "up", that is, the kernel thinks it has
470     a working connection to it.
471    
472     If the node is known but not currently connected, returns C<0>. If the
473     node is not known, returns C<undef>.
474    
475     =cut
476    
# Returns 1 when the node has a transport, 0 when it is known without one,
# and nothing (undef in scalar context) when it is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
481    
482 root 1.3 =item known_nodes
483    
484 root 1.26 Returns the node IDs of all nodes currently known to this node, including
485     itself and nodes not currently connected.
486 root 1.3
487     =cut
488    
# Returns the id member of every node object stored in %NODE.
sub known_nodes() {
   return map { $_->{id} } values %NODE;
}
492    
493     =item up_nodes
494    
495 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
496     the node itself).
497 root 1.3
498     =cut
499    
# Returns the id member of every node object in %NODE that has a transport.
sub up_nodes() {
   return map { $_->{id} }
          grep { $_->{transport} }
          values %NODE;
}
503    
504 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
505 root 1.3
506 root 1.27 Registers a callback that is called each time a node goes up (a connection
507     is established) or down (the connection is lost).
508 root 1.3
509     Node up messages can only be followed by node down messages for the same
510     node, and vice versa.
511    
512 root 1.27 Note that monitoring a node is usually better done by monitoring its node
513     port. This function is mainly of interest to modules that are concerned
514     about the network topology and low-level connection handling.
515    
516     Callbacks I<must not> block and I<should not> send any messages.
517    
518     The function returns an optional guard which can be used to unregister
519 root 1.3 the monitoring callback again.
520    
521 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
522     or go up (and down).
523    
524     newnode $_, 1 for up_nodes;
525     mon_nodes \&newnode;
526    
527 root 1.3 =cut
528    
# registered node monitoring callbacks, keyed by their numeric address
our %MON_NODES;

# Registers a node up/down callback. Unless called in void context, a guard
# is returned that removes the callback again.
sub mon_nodes($) {
   my $callback = shift;

   $MON_NODES{$callback+0} = $callback;

   return defined wantarray
      && AnyEvent::Util::guard { delete $MON_NODES{$callback+0} };
}
538    
# Calls every callback in %MON_NODES with the node's ID, the up flag and
# the reason, then logs the state change at level 7. A dying callback is
# logged at level 1 and does not stop the remaining ones.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };
      $WARN->(1, $@) unless $ok;
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
549    
550     #############################################################################
551 root 1.1 # self node code
552    
# Requests understood by the node port, keyed by the message tag. The
# handlers are called with the rest of the message, $SRCNODE is the
# sending node.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;
      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;
      my $node = $SRCNODE;
      # the callback is stored inside the node, so it must not hold the
      # node strongly
      Scalar::Util::weaken ($node);
      my $notify = $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport};
      };
      $NODE{""}->monitor ($portid, $notify);
   },
   kil => sub {
      # run, and forget, the local monitors registered for this port
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): this evaluates a string taken from a received
      # message - every node able to send messages can run arbitrary code
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      # discards the message
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
613    
# The local node is reachable both as "" and under its current node ID.
# (direct method call instead of the indirect object form)
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element is the request tag, which is
# looked up in %node_req. Unknown tags are resolved as function names by
# load_func and remembered in %node_req.
$PORT{""} = sub {
   my $tag = shift;

   # test the result of eval rather than $@ afterwards, as $@ can be
   # reset between the failure and the check
   eval { &{ $node_req{$tag} ||= load_func ($tag) }; 1 }
      or $WARN->(2, "error processing node message: $@");
};
620    
621     =back
622    
623     =head1 SEE ALSO
624    
625     L<AnyEvent::MP>.
626    
627     =head1 AUTHOR
628    
629     Marc Lehmann <schmorp@schmorp.de>
630     http://home.schmorp.de/
631    
632     =cut
633    
634     1
635