ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.34
Committed: Mon Aug 31 10:07:04 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.33: +7 -12 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
our $VERSION = '0.9';

# symbols exported by default (this package inherits from Exporter)
our @EXPORT = (
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func snd_to_func snd_on eval_on),

   qw(NODE $NODE node_of snd kil port_is_local),
   qw(configure),
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
51    
52 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53 root 1.1
54 root 1.27 This value is called with an error or warning message, when e.g. a
55     connection could not be created, authorisation failed and so on.
56    
57     It I<must not> block or send messages - queue it and use an idle watcher if
58     you need to do any of these things.
59 root 1.1
60 root 1.16 C<$level> should be C<0> for messages to be logged always, C<1> for
61     unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
# maximum level that the default handler prints; overridable via the environment
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# default warn handler: prints "<timestamp> <level> message" to STDERR
our $WARN = sub {
   my ($level, $msg) = @_;

   # messages above the configured level are dropped silently
   return if $level > $WARNLEVEL;

   # the format string supplies the newline, so drop a trailing one
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
83    
84 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
85    
86     The maximum level at which warning messages will be printed to STDERR by
87     the default warn handler.
88    
89     =cut
90    
# Returns a code reference for the fully qualified function name $func.
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until it is. When no prefix is left to try, a
# sub that dies with "unable to resolve ..." is returned instead.
sub load_func($) {
   my ($func) = @_;

   my $pkg = $func;

   until (defined &$func) {
      # strip the last ::component; nothing left to strip means giving up
      $pkg =~ s/::[^:]+$//
         or return sub { die "unable to resolve '$func'" };

      # failures are ignored on purpose, the loop simply tries the parent package
      eval "require $pkg";
   }

   \&$func
}
105    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when possible. If the device cannot
# be opened, or delivers fewer octets than requested, the missing octets are
# generated with perl's rand, which is seeded once per process from the
# time, the pid and a checksum over the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may return less than requested, so keep appending
      while (length ($nonce) < $len) {
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # read error or EOF - use the fallback below
      }
   }

   if (length ($nonce) < $len) {
      # shit...
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }
      $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
   }

   $nonce
}
122    
# Encodes the octet string $_[0] using only the characters 0-9, A-Z and a-z.
#
# When Math::GMP 2.05 or newer can be loaded, the octets are interpreted as
# one big number (via their hex representation) and printed in base 62.
# Otherwise the data is base64-encoded, the padding is removed and the
# non-alphanumeric characters are escaped using "x" as escape character
# ("x" => "x0", "/" => "x1", "+" => "x2").
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                  (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                  62
              );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of them have to go
      $data =~ s/=+$//;
      # "x" must be escaped first, as it introduces the other escapes
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
141    
# Builds an alphanumeric cookie from the pid, the current time and two
# random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   alnumbits ($raw)
}
145    
our $CONFIG; # this node's configuration

our $RUNIQ = alnumbits (nonce (16)); # remote uniq value
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
163    
# Returns the current value of the package variable $NODE.
sub NODE() {
   return $NODE;
}
167    
# Returns the part of the given port ID in front of the first "#".
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
173    
BEGIN {
   # TRACE is a constant sub whose value is fixed at compile time from
   # the PERL_ANYEVENT_MP_TRACE environment variable
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
179 root 1.1
# Delivers a message to a local port: the first argument selects the
# callback in %PORT, the remaining arguments are passed on to it.
# Messages for ports without a callback are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $cb = $PORT{+shift}
      or return;

   # call with the current (already shifted) @_
   &$cb;
}
184    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
#
# Returns the existing %NODE entry for the given node id, or creates,
# stores and returns a new AnyEvent::MP::Node::Direct object for it.
sub add_node {
   my ($node) = @_;

   # explicit method call - the indirect object form ("new Class $arg")
   # is parsed by heuristics and unavailable under newer feature bundles
   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
192    
# Sends a message to a port. The first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message.
# Nodes without an entry in %NODE are created via add_node.
sub snd(@) {
   my $dst = shift;
   my ($nodeid, $portid) = split /#/, $dst, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port id is stringified and becomes the first message element
   $node->{send} (["$portid", @_]);
}
201    
202 root 1.17 =item $is_local = port_is_local $port
203    
204     Returns true iff the port is a local port.
205    
206     =cut
207    
# Compares the %NODE entry for the node part of the port ID (everything
# in front of the first "#") with the entry for the local node, $NODE{""}.
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $NODE{$parts[0]} == $NODE{""}
}
213    
214 root 1.18 =item snd_to_func $node, $func, @args
215 root 1.11
216 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
217 root 1.11 this function with the given arguments on that node.
218    
219 root 1.20 This function can be used to implement C<spawn>-like interfaces.
220 root 1.11
221     =cut
222    
# Sends the function name and its arguments to the node port (port id "")
# of the given node, creating the node entry via add_node when necessary.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @call]);
}
229    
230 root 1.18 =item snd_on $node, @msg
231    
232     Executes C<snd> with the given C<@msg> (which must include the destination
233     port) on the given node.
234    
235     =cut
236    
# Wraps @msg into a "snd" request and sends it to the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
241    
242 root 1.29 =item eval_on $node, $string[, @reply]
243 root 1.18
244 root 1.29 Evaluates the given string as Perl expression on the given node. When
245     @reply is specified, then it is used to construct a reply message with
246     C<"$@"> and any results from the eval appended.
247 root 1.18
248     =cut
249    
# Wraps the string (and the optional reply message) into an "eval"
# request and sends it to the given node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
254    
# Kills the port given as first argument ("nodeid#portid"), passing the
# remaining arguments on to the node's kill method. Croaks when the port
# id part is empty, i.e. when a node port was specified.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   unless (length $portid) {
      Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
264    
# Returns the second field of POSIX::uname (the nodename).
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
269    
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings.
#
# Each descriptor is either empty or all digits (a port on the local
# nodename), or something AnyEvent::Socket::parse_hostport accepts. A host
# of "*" is expanded to the addresses of the local interfaces (via
# Net::Interface, in a forked child), anything else is resolved with
# AnyEvent::Socket::resolve_sockaddr. A port of "*" is replaced by "0".
#
# Returns a condvar that is sent the resulting strings, ordered by the
# position of their descriptor in the list and with duplicates removed.
# Croaks on unparsable descriptors.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the lookups below

   # final callback: sort by priority, drop duplicates and deliver the result
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # the integer part orders descriptors, the fractional part (added in
      # steps of 1e-5 below) orders the addresses of one descriptor
      my $pri = ++$idx;

      # empty or numeric descriptors refer to the local nodename
      $t = length $t ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # (named loop variables - lexical $_ no longer exists in perl 5.24+)
                  for my $ip ($if->address (Net::Interface::AF_INET ())) {
                     next if $ip =~ /^\x7f/; # skip localhost etc.
                     push @addr, $ip;
                  }
                  for my $ip ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($ip) <= 2;
                     next unless $ip =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $ip;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   # matches the initial begin above
   $cv->end;

   $cv
}
350    
# Configures this node from key => value pairs.
#
# The "profile" key (default: the local nodename) selects the profile
# looked up via AnyEvent::MP::Config::find_profile, the remaining pairs
# are passed on to that lookup. Afterwards the node id is set, listeners
# are created for all bind addresses, AnyEvent::MP::Global is loaded and
# given the resolved seed addresses, and the configured services are
# started.
sub configure(%) {
   my %kv = @_;

   my $profile = delete $kv{profile};

   if (!defined $profile) {
      $profile = _nodename ();
   }

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   # the profile name doubles as node id, unless one is configured explicitly
   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   unless ($node eq "anon/") {
      $NODE = $node;
   }

   # make the self node reachable under its (possibly new) id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one after the other
   my @bind_cvs = map _resolve ($_), @$binds;

   for my $cv (@bind_cvs) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # replace the requested address by the one passed to prepare
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );

         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   my @seed_cvs = map _resolve ($_), @$seeds;
   AnyEvent::MP::Global::set_seeds (map $_->recv, @seed_cvs);

   # entries ending in "::" are modules to load, anything else is a
   # function to call (the "::" suffix is stripped from the config entry)
   for my $service (@{ $CONFIG->{services} }) {
      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
411    
412     #############################################################################
413 root 1.6 # node monitoring and info
414 root 1.3
415 root 1.21 =item node_is_known $nodeid
416 root 1.13
417     Returns true iff the given node is currently known to the system.
418    
419     =cut
420    
# True iff %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
424    
425 root 1.21 =item node_is_up $nodeid
426 root 1.13
427     Returns true if the given node is "up", that is, the kernel thinks it has
428     a working connection to it.
429    
430     If the node is known but not currently connected, returns C<0>. If the
431     node is not known, returns C<undef>.
432    
433     =cut
434    
# Returns 1 or 0 depending on whether the node has a transport, and
# nothing (undef in scalar context) when the node is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
439    
440 root 1.3 =item known_nodes
441    
442 root 1.26 Returns the node IDs of all nodes currently known to this node, including
443     itself and nodes not currently connected.
444 root 1.3
445     =cut
446    
# Returns the {id} member of every value stored in %NODE.
sub known_nodes {
   map { $_->{id} } values %NODE
}
450    
451     =item up_nodes
452    
453 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
454     the node itself).
455 root 1.3
456     =cut
457    
# Returns the {id} member of every value in %NODE that has a transport.
sub up_nodes {
   map { $_->{id} }
      grep { $_->{transport} }
         values %NODE
}
461    
462 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
463 root 1.3
464 root 1.27 Registers a callback that is called each time a node goes up (a connection
465     is established) or down (the connection is lost).
466 root 1.3
467     Node up messages can only be followed by node down messages for the same
468     node, and vice versa.
469    
470 root 1.27 Note that monitoring a node is usually better done by monitoring its node
471     port. This function is mainly of interest to modules that are concerned
472     about the network topology and low-level connection handling.
473    
474     Callbacks I<must not> block and I<should not> send any messages.
475    
476     The function returns an optional guard which can be used to unregister
477 root 1.3 the monitoring callback again.
478    
479     =cut
480    
# registered node monitoring callbacks, keyed by the numeric value
# (address) of the code reference
our %MON_NODES;

# Registers $cb as node up/down callback. Unless called in void context,
# returns a guard that unregisters the callback when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false-but-defined in scalar context, so it must be
   # tested with defined - otherwise "my $guard = mon_nodes ..." would
   # never receive a guard
   return unless defined wantarray;

   require AnyEvent::Util;
   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
490    
# Calls every registered node monitoring callback with the node id, the
# up flag and the reason, then logs the state change at level 7.
# Exceptions thrown by callbacks are logged at level 1 and do not stop
# the remaining callbacks from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
}
501    
502     #############################################################################
503 root 1.1 # self node code
504    
# Request handlers of the node port, keyed by the message tag. Each
# handler is called with the remaining elements of the message.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;
      my $node = $SRCNODE;

      $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;
      my $node = $SRCNODE;

      # the callback below is stored inside the node, so it must not
      # hold a strong reference to it
      Scalar::Util::weaken ($node); #TODO# ugly

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      });
   },
   kil => sub {
      my $portid = shift;

      # callbacks are removed before they are called
      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): this string-evals the first message element as
      # received - confirm that only trusted nodes can reach the node port
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
564    
# the local node is reachable both under the empty id and under its own id
# (explicit method call instead of the indirect object form "new Class $arg")
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: the first message element selects the handler in
# %node_req. Unknown tags are resolved as function names via load_func and
# cached in %node_req. Exceptions are logged at level 2, not propagated.
$PORT{""} = sub {
   my $tag = shift;

   # &{ ... } without parentheses passes on the remaining @_
   eval { &{ $node_req{$tag} ||= load_func ($tag) } };
   $WARN->(2, "error processing node message: $@") if $@;
};
571    
572     =back
573    
574     =head1 SEE ALSO
575    
576     L<AnyEvent::MP>.
577    
578     =head1 AUTHOR
579    
580     Marc Lehmann <schmorp@schmorp.de>
581     http://home.schmorp.de/
582    
583     =cut
584    
585     1
586