ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.31
Committed: Sun Aug 30 17:08:16 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.30: +4 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
# Module version.
our $VERSION = '0.9';

# Symbols exported by default (this module inherits from Exporter).
# NOTE(review): resolve_node is exported, but no definition of it is visible
# in this file - confirm it is provided elsewhere.
our @EXPORT = (
   # shared kernel state and low-level helpers
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func snd_to_func snd_on eval_on),

   # basic messaging primitives
   qw(NODE $NODE node_of snd kil),
   qw(port_is_local),
   qw(resolve_node initialise_node),

   # node information and monitoring
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# Port used when a transport descriptor does not name one.
our $DEFAULT_PORT = "4040";

# Timing tunables, in seconds.
our $CONNECT_INTERVAL =  2; # start a new connect at least this often
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57 root 1.27 This value is called with an error or warning message, when e.g. a
58     connection could not be created, authorisation failed and so on.
59    
60     It I<must not> block or send messages - queue it and use an idle watcher if
61     you need to do any of these things.
62 root 1.1
63 root 1.16 C<$level> should be C<0> for messages to be logged always, C<1> for
64     unexpected messages and errors, C<2> for warnings, C<7> for messages about
65     node connectivity and services, C<8> for debugging messages and C<9> for
66     tracing messages.
67    
68 root 1.1 The default simply logs the message to STDERR.
69    
70     =cut
71    
# Highest level the default handler still prints. The environment variable
# wins whenever it exists (even if empty), otherwise the default is 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Default warn handler: prints "<timestamp> <level> message" to STDERR.
our $WARN = sub {
   my ($level, $msg) = @_;

   # messages above the configured level are dropped silently
   return if $WARNLEVEL < $level;

   # strip one trailing newline, the format string adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
# Resolve a fully qualified function name ("Some::Package::func") to a code
# reference, loading the containing module on demand.
#
# If the function is not yet defined, successively shorter package prefixes
# of the name are require'd until the function exists. When nothing is left
# to try, a stub is returned that dies with "unable to resolve '<name>'" when
# called, so the failure surfaces at call time.
#
# The name can originate from a remote message (the node port falls back to
# load_func for unknown tags), so each candidate package is validated before
# it is interpolated into the string eval. Anything that is not a plain
# package name is treated as unresolvable instead of being executed.
sub load_func($) {
   my $func = $_[0];

   # the symbolic lookups below are intentional
   no strict 'refs';

   return \&$func if defined &$func;

   my $unresolvable = sub { die "unable to resolve '$func'" };

   my $pkg = $func;
   do {
      # drop the last "::component", give up when none is left
      $pkg =~ s/::[^:]+$//
         or return $unresolvable;

      # never hand anything but a well-formed package name to eval
      $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
         or return $unresolvable;

      eval "require $pkg";
   } until defined &$func;

   \&$func
}
108    
# Return a string of $_[0] random octets.
#
# /dev/urandom is used when it can be opened AND delivers the full number of
# octets. A failed or short read falls through to the fallback instead of
# returning an undefined or truncated nonce.
#
# The fallback seeds perl's PRNG once per process from the time, the pid and
# a checksum over some command output, then uses rand. It is a best-effort
# path and is not cryptographically strong.
sub nonce($) {
   my $len = $_[0];

   if (open my $fh, "<:raw", "/dev/urandom") {
      my $nonce;
      my $got = sysread $fh, $nonce, $len;

      return $nonce
         if defined $got && $got == $len;
   }

   # no usable random device
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
125    
# Encode an octet string using only the characters [A-Za-z0-9].
#
# With Math::GMP 2.05 or newer the data is treated as one big number and
# written in base 62. Otherwise it is base64-encoded and the characters
# outside the alphanumeric set are escaped: "x" => "x0", "/" => "x1",
# "+" => "x2".
#
# All trailing "=" padding is removed. Base64 emits one or two padding
# characters depending on the input length, and stripping only the first
# left a "=" in the result for inputs whose length is 1 modulo 3.
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 $data, "";
      $data =~ s/=+\z//;  # drop the whole padding, not just one char
      $data =~ s/x/x0/g;  # escape the escape character first
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
144    
# Build a unique cookie from the process id, the current time and two random
# octets, encoded with alnumbits.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   alnumbits ($raw)
}
148    
# this node's configuration
our $CONFIG;

# remote uniq value
our $RUNIQ = alnumbits (nonce (16));
# per-process/node unique cookie
our $UNIQ  = gen_uniq ();
# node id, an anonymous one until initialise_node assigns another
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our (%PORT, %PORT_DATA);

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
166    
# Function form of $NODE: returns the current node id.
sub NODE() {
   return $NODE;
}
170    
# Return the node part of a port id, i.e. everything before the first "#".
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   return $parts[0];
}
176    
# Define TRACE at compile time as a sub with an empty prototype returning
# 1 when PERL_ANYEVENT_MP_TRACE is set to a true value, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
182 root 1.1
# Deliver a message to a local port. The first element of @_ is the port id;
# the remaining elements are passed to the port callback via the shared @_.
# Messages for ports without an entry in %PORT are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # deliberately called without an argument list, so the handler sees @_
   &$handler;
}
187    
# Return the node object for the given node id, creating and registering an
# AnyEvent::MP::Node::Direct in %NODE when the id is not known yet.
# This is basically the central routing component: once a node object
# exists, messages can be sent to it.
sub add_node {
   my ($node) = @_;

   # explicit method call instead of indirect object syntax, which depends on
   # what happens to be declared when the line is parsed
   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
195    
# Send a message to a port. The first argument is the destination port id
# ("nodeid#portid"), the rest is the message. The node object is looked up
# in %NODE (and created via add_node if missing), then its {send} callback
# is invoked with [portid, message...].
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the port id is stringified, so an absent port part becomes ""
   $node->{send}->(["$portid", @_]);
}
204    
205 root 1.17 =item $is_local = port_is_local $port
206    
207     Returns true iff the port is a local port.
208    
209     =cut
210    
# True when the node part of the port id maps to the same node object as the
# local node, which is registered under the empty id.
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
216    
217 root 1.18 =item snd_to_func $node, $func, @args
218 root 1.11
219 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
220 root 1.11 this function with the given arguments on that node.
221    
222 root 1.20 This function can be used to implement C<spawn>-like interfaces.
223 root 1.11
224     =cut
225    
# Ask the given node to call a function: sends ["", $func, @args] through the
# node object's send method, creating the node object first if necessary.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @call]);
}
232    
233 root 1.18 =item snd_on $node, @msg
234    
235     Executes C<snd> with the given C<@msg> (which must include the destination
236     port) on the given node.
237    
238     =cut
239    
# Have the given node execute snd with @msg: sends a "snd" request followed
# by the message to that node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
244    
245 root 1.29 =item eval_on $node, $string[, @reply]
246 root 1.18
247 root 1.29 Evaluates the given string as a Perl expression on the given node. When
248     @reply is specified, then it is used to construct a reply message with
249     C<"$@"> and any results from the eval appended.
250 root 1.18
251     =cut
252    
# Have the given node evaluate a string: sends an "eval" request followed by
# the string and the optional reply message to that node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
257    
# Kill a port. The first argument is the port id ("nodeid#portid"), the
# remaining arguments are passed on as the kill reason. Croaks when the port
# part is empty, because the node port itself must not be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
267    
# Return the node name field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
272    
# Resolve a comma-separated list of transport descriptors into "host:port"
# strings. Returns a condvar immediately; once all lookups have finished it
# receives the addresses, without duplicates, ordered by the position of the
# descriptor they came from and, within one descriptor, by the order in
# which the resolver returned them.
#
# An empty argument means $DEFAULT_PORT. A descriptor made up of digits only
# is taken as a port on the local node name; an empty descriptor is the
# local node name alone. Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my @found; # [priority, "host:port"] pairs collected by the lookups
   my $done = AE::cv;

   # the outer begin/end pair keeps the callback from firing before every
   # lookup has been started
   $done->begin (sub {
      my ($cv) = @_;

      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @found;

      $cv->send (@refs);
   });

   $nodeid = $DEFAULT_PORT unless length $nodeid;

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $priority = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length ($t) ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
         or Carp::croak "$t: unparsable transport descriptor";

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
         for (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr $_->[3];

            # a tiny increment keeps the resolver's order within a descriptor
            $priority += 1e-5;

            push @found, [
               $priority,
               AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $addr, $service
            ];
         }
         $done->end;
      };
   }

   $done->end;

   $done
}
317    
# Configure and start this node.
#
# The optional argument is a profile name and defaults to the local node
# name. The profile is looked up with AnyEvent::MP::Config::find_profile and
# stored in $CONFIG. The node id is the profile's "nodeid", or the profile
# name if there is none, unless that is "anon/", in which case the current
# id is kept. The node then:
#
#  - registers itself in %NODE under its id,
#  - creates a listener for every resolved address in "binds" (default: the
#    node id),
#  - loads AnyEvent::MP::Global and hands it the resolved "seeds",
#  - starts every entry in "services": names ending in "::" are require'd as
#    modules, everything else is resolved with load_func and called.
#
# Dies if a service module cannot be loaded. Croaks on unparsable bind
# addresses.
sub initialise_node(;$%) {
   my ($profile) = @_;

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= [$NODE];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for each of them in turn
   for (map { _resolve ($_) } @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server $host, $port;
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map { $_->recv } map { _resolve ($_) } @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      # work on a copy: the loop variable aliases the configuration entry,
      # and stripping the "::" in place would turn a module entry into a
      # function entry for any later call
      my $name = $service;

      if ($name =~ s/::$//) {
         eval "require $name";
         die $@ if $@;
      } else {
         (load_func ($name))->();
      }
   }
}
368    
369     #############################################################################
370 root 1.6 # node monitoring and info
371 root 1.3
372 root 1.21 =item node_is_known $nodeid
373 root 1.13
374     Returns true iff the given node is currently known to the system.
375    
376     =cut
377    
# True when %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
381    
382 root 1.21 =item node_is_up $nodeid
383 root 1.13
384     Returns true if the given node is "up", that is, the kernel thinks it has
385     a working connection to it.
386    
387     If the node is known but not currently connected, returns C<0>. If the
388     node is not known, returns C<undef>.
389    
390     =cut
391    
# Return 1 when the node has a transport, 0 when it is known but has none,
# and nothing (undef in scalar context) when the node is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
396    
397 root 1.3 =item known_nodes
398    
399 root 1.26 Returns the node IDs of all nodes currently known to this node, including
400     itself and nodes not currently connected.
401 root 1.3
402     =cut
403    
# Return the ids of all nodes in %NODE, each id once.
#
# The local node object is stored under more than one key (the empty id and
# its own id, plus its new id after initialise_node renames it), so iterating
# over the values alone would list its id several times.
sub known_nodes {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
407    
408     =item up_nodes
409    
410 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
411     the node itself).
412 root 1.3
413     =cut
414    
# Return the ids of all nodes in %NODE that currently have a transport.
sub up_nodes {
   my @connected = grep { $_->{transport} } values %NODE;

   return map { $_->{id} } @connected;
}
418    
419 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
420 root 1.3
421 root 1.27 Registers a callback that is called each time a node goes up (a connection
422     is established) or down (the connection is lost).
423 root 1.3
424     Node up messages can only be followed by node down messages for the same
425     node, and vice versa.
426    
427 root 1.27 Note that monitoring a node is usually better done by monitoring its node
428     port. This function is mainly of interest to modules that are concerned
429     about the network topology and low-level connection handling.
430    
431     Callbacks I<must not> block and I<should not> send any messages.
432    
433     The function returns an optional guard which can be used to unregister
434 root 1.3 the monitoring callback again.
435    
436     =cut
437    
# registered node monitoring callbacks, keyed by the callback's address
our %MON_NODES;

# Register $cb to be called on node up/down events.
#
# Unless called in void context, returns a guard that unregisters the
# callback when it is destroyed. The check has to be "defined wantarray":
# wantarray alone is false in scalar context, so "$guard = mon_nodes ..."
# would never receive a guard.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
447    
# Report a node state change: calls every callback in %MON_NODES with the
# node id, the up flag and the reason, then logs the change at level 7.
# A dying callback is reported through $WARN at level 1 and does not stop
# the remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@) unless $ok;
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
458    
459     #############################################################################
460 root 1.1 # self node code
461    
# Requests understood by the node port (port ""), keyed by the first message
# element. Each handler receives the remaining message elements in @_;
# $SRCNODE holds the node the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;
      my $node = $SRCNODE;

      # the callback must not keep the remote node object alive
      Scalar::Util::weaken ($node); #TODO# ugly

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
         # forward the kill only while the node still exists and is connected
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      });
   },
   kil => sub {
      # a monitored port on the sending node died: run and forget its callbacks
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd          => \&snd,
   snd_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # evaluate the string; reply with the error and the results if a reply
      # message was given
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
521    
# Create the local node object and register it under both the empty id and
# its own id.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element selects a handler from %node_req.
# Unknown tags are resolved as function names via load_func and the result is
# cached in %node_req. The handler is called with the rest of the message in
# @_. Exceptions are reported through $WARN at level 2 and never propagate.
$PORT{""} = sub {
   my $tag = shift;

   # let the eval's return value signal failure instead of testing $@
   # afterwards, which can be reset before the check runs
   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called without an argument list, so it sees our @_
      1
   } or $WARN->(2, "error processing node message: $@");
};
528    
529     =back
530    
531     =head1 SEE ALSO
532    
533     L<AnyEvent::MP>.
534    
535     =head1 AUTHOR
536    
537     Marc Lehmann <schmorp@schmorp.de>
538     http://home.schmorp.de/
539    
540     =cut
541    
542     1
543