ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.41
Committed: Fri Sep 4 22:30:29 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.40: +11 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages -queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63     =cut
64    
65 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
66    
67 root 1.1 our $WARN = sub {
68 root 1.29 return if $WARNLEVEL < $_[0];
69    
70 root 1.16 my ($level, $msg) = @_;
71    
72 root 1.1 $msg =~ s/\n$//;
73 root 1.16
74     printf STDERR "%s <%d> %s\n",
75     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
76     $level,
77     $msg;
78 root 1.1 };
79    
80 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81    
82     The maximum level at which warning messages will be printed to STDERR by
83     the default warn handler.
84    
85     =cut
86    
87 root 1.6 sub load_func($) {
88     my $func = $_[0];
89    
90     unless (defined &$func) {
91     my $pkg = $func;
92     do {
93     $pkg =~ s/::[^:]+$//
94     or return sub { die "unable to resolve '$func'" };
95     eval "require $pkg";
96     } until defined &$func;
97     }
98    
99     \&$func
100     }
101    
102 root 1.1 sub nonce($) {
103     my $nonce;
104    
105     if (open my $fh, "</dev/urandom") {
106     sysread $fh, $nonce, $_[0];
107     } else {
108     # shit...
109     our $nonce_init;
110 root 1.31 unless ($nonce_init++) {
111 root 1.1 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
112     }
113     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
114     }
115    
116     $nonce
117     }
118    
119 root 1.21 sub alnumbits($) {
120 root 1.1 my $data = $_[0];
121    
122     if (eval "use Math::GMP 2.05; 1") {
123     $data = Math::GMP::get_str_gmp (
124     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
125     62
126     );
127     } else {
128     $data = MIME::Base64::encode_base64 $data, "";
129     $data =~ s/=//;
130 root 1.31 $data =~ s/x/x0/g;
131     $data =~ s/\//x1/g;
132     $data =~ s/\+/x2/g;
133 root 1.1 }
134    
135     $data
136     }
137    
138     sub gen_uniq {
139 root 1.36 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
140 root 1.1 }
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.21
144 root 1.36 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
145 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
146 root 1.21 our $NODE = "anon/$RUNIQ";
147 root 1.1 our $ID = "a";
148    
149     our %NODE; # node id to transport mapping, or "undef", for local node
150     our (%PORT, %PORT_DATA); # local ports
151    
152 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
153 root 1.1 our %LMON; # monitored _local_ ports
154    
155     our %LISTENER;
156 root 1.21 our $LISTENER; # our listeners, as arrayref
157 root 1.1
158     our $SRCNODE; # holds the sending node during _inject
159    
160     sub NODE() {
161     $NODE
162     }
163    
164     sub node_of($) {
165 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
166 root 1.1
167 root 1.21 $node
168 root 1.1 }
169    
170 root 1.17 BEGIN {
171     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
172     ? sub () { 1 }
173     : sub () { 0 };
174     }
175 root 1.1
176     sub _inject {
177 root 1.25 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
178 root 1.1 &{ $PORT{+shift} or return };
179     }
180    
181 root 1.20 # this function adds a node-ref, so you can send stuff to it
182     # it is basically the central routing component.
183 root 1.1 sub add_node {
184 root 1.21 my ($node) = @_;
185 root 1.1
186 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
187 root 1.13 }
188    
189 root 1.1 sub snd(@) {
190 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
191 root 1.1
192 root 1.25 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
193 root 1.1
194 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
195 root 1.2 ->{send} (["$portid", @_]);
196 root 1.1 }
197    
198 root 1.17 =item $is_local = port_is_local $port
199    
200     Returns true iff the port is a local port.
201    
202     =cut
203    
204     sub port_is_local($) {
205 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
206 root 1.17
207 root 1.21 $NODE{$nodeid} == $NODE{""}
208 root 1.17 }
209    
210 root 1.18 =item snd_to_func $node, $func, @args
211 root 1.11
212 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
213 root 1.11 this function with the given arguments on that node.
214    
215 root 1.20 This function can be used to implement C<spawn>-like interfaces.
216 root 1.11
217     =cut
218    
219 root 1.18 sub snd_to_func($$;@) {
220 root 1.21 my $nodeid = shift;
221 root 1.11
222 root 1.41 return ($NODE{$nodeid} || add_node $nodeid)->send (["", @_])
223     if $nodeid ne $NODE;
224    
225     # on $NODE, we artificially delay... (for spawn)
226     # this is very ugly - maybe we should simply delay ALL messages,
227     # to avoid deep recursion issues. but that's so... slow...
228     my @msg = ("", @_);
229     my $to; $to = AE::timer 0, 0, sub {
230     undef $to;
231     $NODE{""}->send (\@msg);
232     };
233 root 1.11 }
234    
235 root 1.18 =item snd_on $node, @msg
236    
237     Executes C<snd> with the given C<@msg> (which must include the destination
238     port) on the given node.
239    
240     =cut
241    
242     sub snd_on($@) {
243     my $node = shift;
244     snd $node, snd => @_;
245     }
246    
247 root 1.29 =item eval_on $node, $string[, @reply]
248 root 1.18
249 root 1.29 Evaluates the given string as Perl expression on the given node. When
250     @reply is specified, then it is used to construct a reply message with
251     C<"$@"> and any results from the eval appended.
252 root 1.18
253     =cut
254    
255 root 1.29 sub eval_on($$;@) {
256 root 1.18 my $node = shift;
257     snd $node, eval => @_;
258     }
259    
260 root 1.1 sub kil(@) {
261 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
262 root 1.1
263     length $portid
264 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
265 root 1.1
266 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
267 root 1.1 ->kill ("$portid", @_);
268     }
269    
270 root 1.7 sub _nodename {
271     require POSIX;
272     (POSIX::uname ())[1]
273     }
274    
275 root 1.21 sub _resolve($) {
276     my ($nodeid) = @_;
277 root 1.1
278     my $cv = AE::cv;
279     my @res;
280    
281     $cv->begin (sub {
282     my %seen;
283     my @refs;
284     for (sort { $a->[0] <=> $b->[0] } @res) {
285     push @refs, $_->[1] unless $seen{$_->[1]}++
286     }
287 root 1.21 shift->send (@refs);
288 root 1.1 });
289    
290     my $idx;
291 root 1.21 for my $t (split /,/, $nodeid) {
292 root 1.1 my $pri = ++$idx;
293 root 1.7
294     $t = length $t ? _nodename . ":$t" : _nodename
295     if $t =~ /^\d*$/;
296 root 1.1
297 root 1.34 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
298 root 1.7 or Carp::croak "$t: unparsable transport descriptor";
299    
300 root 1.33 $port = "0" if $port eq "*";
301    
302     if ($host eq "*") {
303     $cv->begin;
304     # use fork_call, as Net::Interface is big, and we need it rarely.
305     require AnyEvent::Util;
306     AnyEvent::Util::fork_call (
307     sub {
308     my @addr;
309    
310     require Net::Interface;
311    
312     for my $if (Net::Interface->interfaces) {
313     # we statically lower-prioritise ipv6 here, TODO :()
314     for my $_ ($if->address (Net::Interface::AF_INET ())) {
315     next if /^\x7f/; # skip localhost etc.
316     push @addr, $_;
317     }
318     for ($if->address (Net::Interface::AF_INET6 ())) {
319     #next if $if->scope ($_) <= 2;
320     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
321     push @addr, $_;
322     }
323    
324     }
325     @addr
326     }, sub {
327     for my $ip (@_) {
328     push @res, [
329     $pri += 1e-5,
330     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
331     ];
332     }
333     $cv->end;
334     }
335     );
336     } else {
337     $cv->begin;
338     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
339     for (@_) {
340     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
341     push @res, [
342     $pri += 1e-5,
343     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
344     ];
345     }
346     $cv->end;
347     };
348     }
349 root 1.1 }
350    
351     $cv->end;
352    
353     $cv
354     }
355    
356 root 1.39 sub configure(@) {
357     unshift @_, "profile" if @_ & 1;
358 root 1.34 my (%kv) = @_;
359    
360     my $profile = delete $kv{profile};
361 root 1.1
362 root 1.21 $profile = _nodename
363     unless defined $profile;
364 root 1.6
365 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
366 root 1.24
367     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
368     $NODE = $node
369     unless $node eq "anon/";
370 root 1.6
371 root 1.21 $NODE{$NODE} = $NODE{""};
372     $NODE{$NODE}{id} = $NODE;
373 root 1.20
374 root 1.21 my $seeds = $CONFIG->{seeds};
375     my $binds = $CONFIG->{binds};
376 root 1.3
377 root 1.33 $binds ||= ["*"];
378 root 1.1
379 root 1.21 $WARN->(8, "node $NODE starting up.");
380 root 1.1
381 root 1.23 $LISTENER = [];
382     %LISTENER = ();
383    
384 root 1.21 for (map _resolve $_, @$binds) {
385     for my $bind ($_->recv) {
386     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
387     or Carp::croak "$bind: unparsable local bind address";
388 root 1.20
389 root 1.33 my $listener = AnyEvent::MP::Transport::mp_server
390     $host,
391     $port,
392     prepare => sub {
393     my (undef, $host, $port) = @_;
394     $bind = AnyEvent::Socket::format_hostport $host, $port;
395     },
396     ;
397     $LISTENER{$bind} = $listener;
398 root 1.21 push @$LISTENER, $bind;
399     }
400 root 1.1 }
401    
402 root 1.40 $WARN->(8, "node listens on [@$LISTENER].");
403    
404 root 1.21 # the global service is mandatory currently
405     require AnyEvent::MP::Global;
406 root 1.1
407 root 1.21 # connect to all seednodes
408     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
409 root 1.1
410 root 1.20 for (@{ $CONFIG->{services} }) {
411 root 1.13 if (s/::$//) {
412     eval "require $_";
413     die $@ if $@;
414     } else {
415     (load_func $_)->();
416     }
417     }
418 root 1.1 }
419    
420     #############################################################################
421 root 1.6 # node monitoring and info
422 root 1.3
423 root 1.21 =item node_is_known $nodeid
424 root 1.13
425     Returns true iff the given node is currently known to the system.
426    
427     =cut
428    
429     sub node_is_known($) {
430     exists $NODE{$_[0]}
431     }
432    
433 root 1.21 =item node_is_up $nodeid
434 root 1.13
435     Returns true if the given node is "up", that is, the kernel thinks it has
436     a working connection to it.
437    
438     If the node is known but not currently connected, returns C<0>. If the
439     node is not known, returns C<undef>.
440    
441     =cut
442    
443     sub node_is_up($) {
444     ($NODE{$_[0]} or return)->{transport}
445     ? 1 : 0
446     }
447    
448 root 1.3 =item known_nodes
449    
450 root 1.26 Returns the node IDs of all nodes currently known to this node, including
451     itself and nodes not currently connected.
452 root 1.3
453     =cut
454    
455     sub known_nodes {
456 root 1.26 map $_->{id}, values %NODE
457 root 1.3 }
458    
459     =item up_nodes
460    
461 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
462     the node itself).
463 root 1.3
464     =cut
465    
466     sub up_nodes {
467 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
468 root 1.3 }
469    
470 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
471 root 1.3
472 root 1.27 Registers a callback that is called each time a node goes up (a connection
473     is established) or down (the connection is lost).
474 root 1.3
475     Node up messages can only be followed by node down messages for the same
476     node, and vice versa.
477    
478 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
479     port. This function is mainly of interest to modules that are concerned
480     about the network topology and low-level connection handling.
481    
482     Callbacks I<must not> block and I<should not> send any messages.
483    
484     The function returns an optional guard which can be used to unregister
485 root 1.3 the monitoring callback again.
486    
487     =cut
488    
489     our %MON_NODES;
490    
491     sub mon_nodes($) {
492     my ($cb) = @_;
493    
494     $MON_NODES{$cb+0} = $cb;
495    
496     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
497     }
498    
499     sub _inject_nodeevent($$;@) {
500 root 1.16 my ($node, $up, @reason) = @_;
501 root 1.3
502     for my $cb (values %MON_NODES) {
503 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
504 root 1.16 or $WARN->(1, $@);
505 root 1.3 }
506 root 1.16
507 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
508 root 1.3 }
509    
510     #############################################################################
511 root 1.1 # self node code
512    
513     our %node_req = (
514     # internal services
515    
516     # monitoring
517 root 1.27 mon0 => sub { # stop monitoring a port
518 root 1.1 my $portid = shift;
519     my $node = $SRCNODE;
520     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
521     },
522 root 1.27 mon1 => sub { # start monitoring a port
523 root 1.1 my $portid = shift;
524     my $node = $SRCNODE;
525 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
526 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
527 root 1.27 $node->send (["", kil => $portid, @_])
528 root 1.39 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
529 root 1.1 });
530     },
531     kil => sub {
532     my $cbs = delete $SRCNODE->{lmon}{+shift}
533     or return;
534    
535     $_->(@_) for @$cbs;
536     },
537    
538 root 1.18 # "public" services - not actually public
539 root 1.1
540     # relay message to another node / generic echo
541 root 1.15 snd => \&snd,
542 root 1.27 snd_multiple => sub {
543 root 1.1 snd @$_ for @_
544     },
545    
546 root 1.4 # informational
547     info => sub {
548     snd @_, $NODE;
549     },
550     known_nodes => sub {
551     snd @_, known_nodes;
552     },
553     up_nodes => sub {
554     snd @_, up_nodes;
555     },
556    
557 root 1.30 # random utilities
558 root 1.1 eval => sub {
559     my @res = eval shift;
560     snd @_, "$@", @res if @_;
561     },
562     time => sub {
563     snd @_, AE::time;
564     },
565     devnull => sub {
566     #
567     },
568 root 1.15 "" => sub {
569 root 1.27 # empty messages are keepalives or similar devnull-applications
570 root 1.15 },
571 root 1.1 );
572    
573 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
574 root 1.1 $PORT{""} = sub {
575     my $tag = shift;
576     eval { &{ $node_req{$tag} ||= load_func $tag } };
577 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
578 root 1.1 };
579    
580     =back
581    
582     =head1 SEE ALSO
583    
584     L<AnyEvent::MP>.
585    
586     =head1 AUTHOR
587    
588     Marc Lehmann <schmorp@schmorp.de>
589     http://home.schmorp.de/
590    
591     =cut
592    
593     1
594