ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.54
Committed: Thu Oct 1 20:41:36 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.53: +1 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.52 our $VERSION = '1.1';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.34 NODE $NODE node_of snd kil port_is_local
44     configure
45 root 1.46 up_nodes mon_nodes node_is_up
46 root 1.1 );
47    
48 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49 root 1.1
50 root 1.27 This value is called with an error or warning message, when e.g. a
51     connection could not be created, authorisation failed and so on.
52    
53     It I<must not> block or send messages - queue it and use an idle watcher if
54     you need to do any of these things.
55 root 1.1
56 elmex 1.38 C<$level> should be C<0> for messages to be logged always, C<1> for
57 root 1.16 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58     node connectivity and services, C<8> for debugging messages and C<9> for
59     tracing messages.
60    
61 root 1.1 The default simply logs the message to STDERR.
62    
63 root 1.44 =item @AnyEvent::MP::Kernel::WARN
64    
65     All code references in this array are called for every log message, from
66     the default C<$WARN> handler. This is an easy way to tie into the log
67     messages without disturbing others.
68    
69 root 1.1 =cut
70    
# Highest level the default handler still prints. The environment variable
# wins whenever it exists at all (even if empty), otherwise the default is 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks; every one of them sees every message.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # hooks are invoked with our own @_ and run regardless of the level
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $WARNLEVEL < $level;

   # strip one trailing newline, the format below supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
87    
88 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89    
90     The maximum level at which warning messages will be printed to STDERR by
91     the default warn handler.
92    
93     =cut
94    
# Resolves a fully qualified function name to a code reference.
#
# If the function is not defined yet, the trailing "::component" is stripped
# repeatedly and the remaining prefix is loaded with "require" until the
# function shows up. When nothing resolves, a stub is returned that dies with
# "unable to resolve '<name>'" when called, so the caller always gets a
# code reference.
#
# The package prefix ends up inside a string eval, and names can originate
# from received messages, so anything that is not a syntactically plain
# package name is refused instead of being evaluated.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # never hand anything but Identifier(::Identifier)* to string eval
         $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/
            or return sub { die "unable to resolve '$func'" };

         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
109    
# Returns a string of exactly $_[0] random octets.
#
# Octets are read from /dev/urandom when it can be opened. Short reads are
# retried, and whatever is still missing afterwards (device unavailable, read
# error, end of file) is filled from perl's rand as a last resort, so the
# result always has the requested length.
sub nonce($) {
   my $want  = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      while ($want > length $nonce) {
         # append at the current end of the buffer
         my $got = sysread $fh, $nonce, $want - (length $nonce), length $nonce;
         last unless $got; # error or eof, fall through to rand
      }
   }

   # shit... not enough entropy from the device
   $nonce .= join "", map +(chr rand 256), 1 .. $want - (length $nonce);

   $nonce
}
122    
# Encodes an octet string using only the characters A-Z, a-z and 0-9.
#
# With Math::GMP 2.05 or newer the octets are rendered as a base-62 number.
# Otherwise base64 is used, with the padding removed and the three
# non-alphanumeric cases escaped: "x" => "x0", "/" => "x1", "+" => "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      $data =~ s/=+\z//; # base64 pads with up to two "=", drop all of them
      $data =~ s/x/x0/g; # escape the escape character first
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
141    
# Builds a short alphanumeric cookie from the low 16 bits of the process id,
# the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $raw = pack "nna*", $$ & 0xffff, time & 0xffff, nonce (2);

   alnumbits ($raw)
}
145    
our $CONFIG; # this node's configuration

our $RUNIQ = alnumbits (nonce (96 / 8)); # remote uniq value
our $UNIQ  = gen_uniq ();                # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
163    
# Returns the current value of $NODE.
sub NODE() {
   return $NODE;
}
167    
# Returns everything before the first "#" of the given port ID.
sub node_of($) {
   my ($nodeid) = split /#/, $_[0], 2;

   $nodeid
}
173    
# TRACE is a compile-time constant: 1 when PERL_ANYEVENT_MP_TRACE is set to a
# true value in the environment, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
179 root 1.1
our $DELAY_TIMER; # pending zero-delay timer, undef when none is armed
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Runs all queued callbacks in order, including ones queued while running,
# and then clears $DELAY_TIMER.
#
# Clearing the timer has to happen on the "queue is empty" path: delay only
# arms a new timer when $DELAY_TIMER is false, so leaving the old value in
# place would stop all later delayed callbacks from ever running.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   undef $DELAY_TIMER;
}
187    
# Queues a callback and arms a zero-delay timer for _delay_run unless
# $DELAY_TIMER already holds a true value.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
192    
# Delivers a message to a local port: the first argument is the port id, the
# rest is passed on to the handler registered in %PORT. Messages for ports
# without a handler are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called with the remaining @_ of this sub
   &$handler;
}
197    
# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
# an already existing (true) entry in %NODE is kept and returned as-is.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
205    
# Sends a message: the first argument is "nodeid#portid", the remaining
# arguments are the message body. The node object is looked up in %NODE and
# created via add_node when missing. Croaks when no node id can be extracted.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port id is always passed on as a string
   $node->{send} (["$portid", @_]);
}
217    
218 root 1.17 =item $is_local = port_is_local $port
219    
220     Returns true iff the port is a local port.
221    
222     =cut
223    
# True when the node part of the port ID maps to the same %NODE entry as the
# empty node id "".
sub port_is_local($) {
   my ($nodeid) = split /#/, $_[0], 2;

   $NODE{$nodeid} == $NODE{""}
}
229    
230 root 1.18 =item snd_to_func $node, $func, @args
231 root 1.11
232 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
233 root 1.11 this function with the given arguments on that node.
234    
235 root 1.20 This function can be used to implement C<spawn>-like interfaces.
236 root 1.11
237     =cut
238    
# Sends the remaining arguments (function name followed by its arguments) to
# the node port ("") of the given node. Croaks on an undefined node id.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate before the id is used for anything, so an invalid call has no
   # side effects
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when $nodeid differs from $NODE, which
   # reads like the opposite of the comment above - confirm against Node::Self.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node ($nodeid))->send (["", @_]);
}
253    
254 root 1.18 =item snd_on $node, @msg
255    
256     Executes C<snd> with the given C<@msg> (which must include the destination
257     port) on the given node.
258    
259     =cut
260    
# Sends a "snd" request followed by @msg to the given destination.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
265    
266 root 1.29 =item eval_on $node, $string[, @reply]
267 root 1.18
268 root 1.29 Evaluates the given string as Perl expression on the given node. When
269     @reply is specified, then it is used to construct a reply message with
270     C<"$@"> and any results from the eval appended.
271 root 1.18
272     =cut
273    
# Sends an "eval" request (the string, optionally followed by a reply
# message prefix) to the given destination.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
278    
# Kills the port given as "nodeid#portid", passing the remaining arguments on
# to the node object's kill method. Croaks when the port part is empty.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
288    
# Returns the second field reported by POSIX::uname (the node name).
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
293    
# Resolves a comma separated list of transport descriptors.
#
# Returns a condvar that is sent the resulting "host:port" strings, without
# duplicates, sorted by the position of the descriptor they came from.
# Croaks on descriptors that parse_hostport rejects.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @found; # [priority, formatted address] pairs

   # runs once every begin has been matched by an end
   $cv->begin (sub {
      my (%seen, @uniq);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         my $addr = $entry->[1];
         push @uniq, $addr unless $seen{$addr}++;
      }

      shift->send (@uniq);
   });

   my $position;
   for my $spec (split /,/, $nodeid) {
      my $pri = ++$position;

      # empty or all-digit descriptors refer to the local nodename,
      # with the digits used as port
      if ($spec =~ /^\d*$/) {
         $spec = (length $spec) ? _nodename () . ":$spec" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, 0)
         or Carp::croak "$spec: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               require Net::Interface;

               my @addr;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for my $v4 ($if->address (Net::Interface::AF_INET ())) {
                     next if $v4 =~ /^\x7f/; # skip localhost etc.
                     push @addr, $v4;
                  }
                  for my $v6 ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($v6) <= 2;
                     next unless $v6 =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $v6;
                  }
               }

               @addr
            },
            sub {
               for my $ip (@_) {
                  # the tiny increment keeps results of one descriptor in order
                  push @found, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);
               push @found, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
374    
# Configures this node: looks up the profile, fixes the node id, starts the
# listeners, loads the global service, hands the resolved seeds to it and
# starts all configured services.
#
# Takes key => value pairs; with an odd number of arguments the first one is
# taken as the profile name. The profile defaults to the local nodename.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};
   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   delete $NODE{$NODE}; # we do not support doing stuff before configure

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # "anon/" keeps the id that was generated at load time
   $NODE = $node
      unless $node eq "anon/";

   # re-register the self node under its (possibly new) id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # all resolve requests are started first, then waited for in order
   for my $resolved (map _resolve ($_), @$binds) {
      for my $bind ($resolved->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address passed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ($service =~ s/::$//) {
         # "Module::" just loads the module
         eval "require $service";
         die $@ if $@;
      } else {
         # plain function name, called without arguments
         (load_func ($service))->();
      }
   }
}
447    
448     #############################################################################
449 root 1.6 # node monitoring and info
450 root 1.3
451 root 1.21 =item node_is_known $nodeid
452 root 1.13
453 root 1.46 Returns true iff the given node is currently known to the system. The only
454     time a node is known but not up currently is when a connection request is
455     pending.
456 root 1.13
457     =cut
458    
# True when %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
462    
463 root 1.21 =item node_is_up $nodeid
464 root 1.13
465     Returns true if the given node is "up", that is, the kernel thinks it has
466     a working connection to it.
467    
468     If the node is known but not currently connected, returns C<0>. If the
469     node is not known, returns C<undef>.
470    
471     =cut
472    
# Returns 1 when the node has a transport, 0 when it is known without one,
# and nothing when there is no (true) entry for it in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
477    
478 root 1.3 =item known_nodes
479    
480 root 1.26 Returns the node IDs of all nodes currently known to this node, including
481     itself and nodes not currently connected.
482 root 1.3
483     =cut
484    
# Returns the {id} member of every value in %NODE.
sub known_nodes() {
   map { $_->{id} } values %NODE
}
488    
489     =item up_nodes
490    
491 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
492     the node itself).
493 root 1.3
494     =cut
495    
# Returns the {id} member of every value in %NODE that has a transport.
sub up_nodes() {
   map { $_->{id} } grep { $_->{transport} } values %NODE
}
499    
500 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
501 root 1.3
502 root 1.27 Registers a callback that is called each time a node goes up (a connection
503     is established) or down (the connection is lost).
504 root 1.3
505     Node up messages can only be followed by node down messages for the same
506     node, and vice versa.
507    
508 root 1.27 Note that monitoring a node is usually better done by monitoring its node
509     port. This function is mainly of interest to modules that are concerned
510     about the network topology and low-level connection handling.
511    
512     Callbacks I<must not> block and I<should not> send any messages.
513    
514     The function returns an optional guard which can be used to unregister
515 root 1.3 the monitoring callback again.
516    
517 root 1.46 Example: make sure you call function C<newnode> for all nodes that are up
518     or go up (and down).
519    
520     newnode $_, 1 for up_nodes;
521     mon_nodes \&newnode;
522    
523 root 1.3 =cut
524    
our %MON_NODES; # registered node monitors, keyed by the callback's address

# Registers a node up/down callback.
#
# Unless called in void context, returns a guard that unregisters the
# callback again. The check has to be "defined wantarray": wantarray alone
# is false in scalar context, which would make "$guard = mon_nodes ..."
# receive no guard at all.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   defined wantarray
      && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
}
534    
# Tells every registered node monitor that $node went up or down and logs
# the event at level 7. A dying monitor is logged at level 1 and does not
# keep the others from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $watcher (values %MON_NODES) {
      my $ok = eval { $watcher->($node->{id}, $up, @reason); 1 };
      $WARN->(1, $@) unless $ok;
   }

   $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
}
545    
546     #############################################################################
547 root 1.1 # self node code
548    
# Request handlers of the node port, keyed by the message tag. Each handler
# receives the rest of the message as arguments; $SRCNODE holds the sending
# node while a handler runs.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my $portid = shift;
      my $peer   = $SRCNODE;
      $NODE{""}->unmonitor ($portid, delete $peer->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my $portid = shift;
      my $peer   = $SRCNODE;
      # only the lexical copy captured by the closure below is weakened
      Scalar::Util::weaken ($peer); #TODO# ugly
      $NODE{""}->monitor ($portid, $peer->{rmon}{$portid} = sub {
         $peer->send (["", kil => $portid, @_])
            if $peer && $peer->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      });
   },
   kil => sub {
      # run (and forget) the callbacks registered for this port
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): string-evals text taken from a received message, in
      # package main - every node that can send here can run arbitrary code.
      my @res = do { package main; eval shift };
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
608    
# The self node is reachable both under "" and under the current node id.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# Handler of the node port: the first message element selects the request
# handler. Unknown tags are resolved with load_func and cached in %node_req.
# Errors raised while resolving or running are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval { &{ $node_req{$tag} ||= load_func ($tag) } };

   $WARN->(2, "error processing node message: $@") if $@;
};
615    
616     =back
617    
618     =head1 SEE ALSO
619    
620     L<AnyEvent::MP>.
621    
622     =head1 AUTHOR
623    
624     Marc Lehmann <schmorp@schmorp.de>
625     http://home.schmorp.de/
626    
627     =cut
628    
629     1
630