ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.40
Committed: Fri Sep 4 21:01:22 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.39: +2 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.37 our $VERSION = '1.0';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages - queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63     =cut
64    
# Highest message level the default handler still prints: the value of
# $ENV{PERL_ANYEVENT_MP_WARNLEVEL} when that key exists, 5 otherwise.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Default log handler, called as $WARN->($level, $msg).
# Prints "<local timestamp> <level> message" to STDERR.
our $WARN = sub {
   my ($level, $text) = @_;

   # messages above the configured level are dropped
   return if $level > $WARNLEVEL;

   # the format below supplies the newline, so drop one trailing newline
   $text =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $text;
};
79    
80 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81    
82     The maximum level at which warning messages will be printed to STDERR by
83     the default warn handler.
84    
85     =cut
86    
# Resolves a fully qualified function name to a code reference, loading
# modules on demand.
#
# When the function is not defined yet, successively shorter package
# prefixes of the name are require'd (Foo::Bar::baz tries Foo::Bar, then
# Foo) until the function becomes defined. If that never happens, a stub is
# returned that dies with "unable to resolve '<name>'" when it is called.
#
# The names passed in come from node message tags and from the configured
# service list, and the package part ends up inside a string eval. Anything
# that is not a plain identifier path is therefore treated as unresolvable
# instead of being handed to eval.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $unresolvable = sub { die "unable to resolve '$func'" };

      # never interpolate anything but Word::Word::word into the eval below
      return $unresolvable
         unless $func =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/;

      my $pkg = $func;
      do {
         # strip the last component; nothing left to strip means we give up
         $pkg =~ s/::[^:]+$//
            or return $unresolvable;
         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
101    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that can be opened and
# delivers the full amount. A short or failed read no longer yields a
# short nonce: the function then falls through to the rand()-based
# fallback, which is seeded once per process.
sub nonce($) {
   my $len = $_[0];

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $buf = "";
      while (length ($buf) < $len) {
         # append at the current end of the buffer until we have enough
         my $got = sysread $fh, $buf, $len - length ($buf), length ($buf);
         last unless $got; # undef (error) or 0 (eof)
      }

      return $buf if length ($buf) == $len;
   }

   # no usable random device - seed perl's generator once from whatever
   # varies between processes and hosts
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
118    
# Encodes an octet string using alphanumeric characters only.
#
# With Math::GMP >= 2.05 available the data is read as one big hexadecimal
# number and printed in base 62. Otherwise it is base64-encoded, all "="
# padding is removed and the characters "x", "/" and "+" are escaped as
# "x0", "x1" and "x2" respectively.
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", remove all of them
      $data =~ tr/=//d;
      # "x" is the escape character, so it has to be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
137    
# Builds a short alphanumeric cookie from the low 16 bits of the process
# id, the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid_bits  = $$ & 0xffff;
   my $time_bits = time & 0xffff;
   my $random    = nonce (2);

   alnumbits (pack "nna*", $pid_bits, $time_bits, $random)
}
141    
# this node's configuration
our $CONFIG;

# remote uniq value, 96 random bits
our $RUNIQ = alnumbits (nonce (96 / 8));
# per-process/node unique cookie
our $UNIQ  = gen_uniq ();
our $NODE  = "anon/" . $RUNIQ;
our $ID    = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
159    
# Accessor for the id of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
163    
# Returns the node id part of a port id, i.e. everything before the
# first "#".
sub node_of($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   return $nodeid;
}
169    
# TRACE is a compile-time constant: 1 when $ENV{PERL_ANYEVENT_MP_TRACE}
# is true at load time, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175 root 1.1
# Delivers a message to a local port: the first argument selects the
# callback in %PORT, which is then invoked with the remaining arguments.
# Messages for ports without a callback are dropped.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n";#d#
   }

   my $port_cb = $PORT{+shift}
      or return;

   # called without parens on purpose: the callback sees our current @_
   &$port_cb;
}
180    
# Central routing helper: returns the node object registered for a node
# id, creating and registering an AnyEvent::MP::Node::Direct object first
# when there is none yet, so messages can be sent to that node.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
188    
# Sends a message to a port. The first argument is the port id
# ("nodeid#portid"), the remaining arguments form the message.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n";#d#
   }

   # look up the node object, creating it on first use
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # invokes the code reference stored in the node's {send} slot directly
   $node->{send}->(["$portid", @_]);
}
197    
198 root 1.17 =item $is_local = port_is_local $port
199    
200     Returns true iff the port is a local port.
201    
202     =cut
203    
# True iff the node part of the given port id maps to the same node
# object as the local node entry $NODE{""}.
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
209    
210 root 1.18 =item snd_to_func $node, $func, @args
211 root 1.11
212 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
213 root 1.11 this function with the given arguments on that node.
214    
215 root 1.20 This function can be used to implement C<spawn>-like interfaces.
216 root 1.11
217     =cut
218    
# Sends the remaining arguments (function name plus its arguments) to
# the node port of the given node, creating the node object on first use.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_]);
}
225    
226 root 1.18 =item snd_on $node, @msg
227    
228     Executes C<snd> with the given C<@msg> (which must include the destination
229     port) on the given node.
230    
231     =cut
232    
# Asks the given node to run snd with @msg: the message is sent to that
# node with the tag "snd" in front of it.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
237    
238 root 1.29 =item eval_on $node, $string[, @reply]
239 root 1.18
240 root 1.29 Evaluates the given string as a Perl expression on the given node. When
241     @reply is specified, then it is used to construct a reply message with
242     C<"$@"> and any results from the eval appended.
243 root 1.18
244     =cut
245    
# Asks the given node to evaluate a string: string and optional reply
# message are sent to that node with the tag "eval" in front of them.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
250    
# Kills the given port, passing any further arguments on as the reason.
# Croaks when the port id has no port part, i.e. names a node port.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
260    
# Returns the nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
265    
# Resolves a comma separated list of transport descriptors into
# "host:port" endpoint strings.
#
# Returns a condvar immediately; ->recv on it yields the endpoints,
# duplicates removed, ordered by the position of the descriptor they came
# from (and, within one descriptor, by the order they were found in).
#
# Descriptor forms handled here:
#   ""  or digits only  - the local nodename, with the digits as port
#   "*:port"            - every usable interface address of this host
#   anything else       - resolved via AnyEvent::Socket::resolve_sockaddr
# A port of "*" is treated as port 0. Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the callbacks

   # runs once every lookup started below has finished
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      # bare port number or empty descriptor: use our own nodename as host
      $t = length $t ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # (a named loop variable replaces "for my $_", which
                  # no longer compiles on perl 5.24 and later)
                  for my $addr ($if->address (Net::Interface::AF_INET ())) {
                     next if $addr =~ /^\x7f/; # skip localhost etc.
                     push @addr, $addr;
                  }
                  for my $addr ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($addr) <= 2;
                     next unless $addr =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $addr;
                  }
               }

               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5, # keeps the discovery order within this descriptor
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   # matches the begin above; the condvar fires when all lookups are done
   $cv->end;

   $cv
}
346    
# Configures and starts the local node.
#
# Takes key => value pairs; with an odd number of arguments the first one
# is the profile name. The profile defaults to the local nodename. The
# function looks up the profile, assigns the node id, creates a listener
# for every resolved bind address (default "*"), loads
# AnyEvent::MP::Global, hands it the resolved seed addresses and finally
# starts the configured services.
sub configure(@) {
   my @args = @_;
   unshift @args, "profile" if @args & 1;
   my %kv = @args;

   my $profile = delete $kv{profile};
   defined $profile
      or $profile = _nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   # the profile may name the node explicitly, "anon/" keeps the random id
   my $nodeid = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $nodeid
      if $nodeid ne "anon/";

   # register the self node under its (possibly new) id
   $NODE{$NODE}     = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one after the other
   my @bind_cvs = map _resolve ($_), @$binds;

   for my $bind_cv (@bind_cvs) {
      for my $resolved ($bind_cv->recv) {
         my $bind = $resolved;

         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address reported to the prepare callback
               my (undef, $bound_host, $bound_port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($bound_host, $bound_port);
            },
         );

         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   my @seed_cvs = map _resolve ($_), @$seeds;
   AnyEvent::MP::Global::set_seeds (map $_->recv, @seed_cvs);

   # "Some::Module::" loads a module, anything else is a function to call;
   # the loop variable aliases the configured list, so the trailing "::"
   # is stripped in the configuration itself
   for my $service (@{ $CONFIG->{services} }) {
      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
410    
411     #############################################################################
412 root 1.6 # node monitoring and info
413 root 1.3
414 root 1.21 =item node_is_known $nodeid
415 root 1.13
416     Returns true iff the given node is currently known to the system.
417    
418     =cut
419    
# True iff %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
423    
424 root 1.21 =item node_is_up $nodeid
425 root 1.13
426     Returns true if the given node is "up", that is, the kernel thinks it has
427     a working connection to it.
428    
429     If the node is known but not currently connected, returns C<0>. If the
430     node is not known, returns C<undef>.
431    
432     =cut
433    
# Returns 1 when the node has a transport, 0 when it is known but has
# none, and nothing (undef) when the node id is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
438    
439 root 1.3 =item known_nodes
440    
441 root 1.26 Returns the node IDs of all nodes currently known to this node, including
442     itself and nodes not currently connected.
443 root 1.3
444     =cut
445    
# Returns the ids of all node objects in %NODE, each id once.
#
# The same node object can be registered under several keys - the local
# node is stored under "" as well as under its own id - so the ids are
# de-duplicated instead of being reported once per key.
sub known_nodes {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
449    
450     =item up_nodes
451    
452 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
453     the node itself).
454 root 1.3
455     =cut
456    
# Returns the ids of all node objects in %NODE that have a transport.
sub up_nodes {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
460    
461 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
462 root 1.3
463 root 1.27 Registers a callback that is called each time a node goes up (a connection
464     is established) or down (the connection is lost).
465 root 1.3
466     Node up messages can only be followed by node down messages for the same
467     node, and vice versa.
468    
469 root 1.27 Note that monitoring a node is usually better done by monitoring its node
470     port. This function is mainly of interest to modules that are concerned
471     about the network topology and low-level connection handling.
472    
473     Callbacks I<must not> block and I<should not> send any messages.
474    
475     The function returns an optional guard which can be used to unregister
476 root 1.3 the monitoring callback again.
477    
478     =cut
479    
# registered node monitors, keyed by the numeric address of the callback
our %MON_NODES;

# Registers $cb to be called on node up/down events.
#
# In void context nothing is returned and the callback stays registered.
# In scalar or list context a guard object is returned; running the guard
# unregisters the callback. The check uses "defined wantarray" so that
# the documented "$guard = mon_nodes ..." form (scalar context, where
# wantarray is false but defined) actually receives the guard.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   return unless defined wantarray;

   # not loaded at the top of this file, so make sure it is there
   require AnyEvent::Util;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
489    
# Tells every registered node monitor that $node went up or down and
# then logs the event at level 7. An exception thrown by a monitor is
# reported through $WARN at level 1 and does not stop the others.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $monitor (values %MON_NODES) {
      unless (eval { $monitor->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
500    
501     #############################################################################
502 root 1.1 # self node code
503    
# Request handlers of the node port, keyed by message tag. Each handler
# receives the remainder of the message in @_; $SRCNODE is the node the
# message came from.
our %node_req = (
   # internal services

   # monitoring

   # stop monitoring a port
   mon0 => sub {
      my $portid = shift;
      my $peer   = $SRCNODE;

      $NODE{""}->unmonitor ($portid, delete $peer->{rmon}{$portid});
   },

   # start monitoring a port
   mon1 => sub {
      my $portid = shift;
      my $peer   = $SRCNODE;

      Scalar::Util::weaken ($peer); #TODO# ugly

      # remember the callback under the peer so that mon0 can find it
      $peer->{rmon}{$portid} = sub {
         $peer->send (["", kil => $portid, @_])
            if $peer && $peer->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $NODE{""}->monitor ($portid, $peer->{rmon}{$portid});
   },

   # a port we monitor on the sending node is gone: run its callbacks
   kil => sub {
      my $portid = shift;

      my $callbacks = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$callbacks) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities

   # NOTE(review): this evaluates a string received in a node message;
   # any peer that may send to the node port can run arbitrary code here.
   eval => sub {
      my @res = eval shift;
      if (@_) {
         snd (@_, "$@", @res);
      }
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
563    
# The local node object is reachable both as "" and under its node id.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element is the request tag. Tags that
# have no entry in %node_req are resolved with load_func and cached there.
# Errors raised by a handler are logged at level 2 instead of propagating.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      # called without parens on purpose: the handler sees our current @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
570    
571     =back
572    
573     =head1 SEE ALSO
574    
575     L<AnyEvent::MP>.
576    
577     =head1 AUTHOR
578    
579     Marc Lehmann <schmorp@schmorp.de>
580     http://home.schmorp.de/
581    
582     =cut
583    
584     1
585