ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.28
Committed: Sat Aug 29 16:48:18 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_9
Changes since 1.27: +1 -1 lines
Log Message:
0.9

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.28 our $VERSION = '0.9';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.10 NODE $NODE node_of snd kil
44 root 1.17 port_is_local
45 root 1.1 resolve_node initialise_node
46 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 root 1.1 );
48    
49     our $DEFAULT_PORT = "4040";
50    
51     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52     our $NETWORK_LATENCY = 3; # activity timeout
53     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57 root 1.27 This value is called with an error or warning message, when e.g. a
58     connection could not be created, authorisation failed and so on.
59    
60     It I<must not> block or send messages -queue it and use an idle watcher if
61     you need to do any of these things.
62 root 1.1
63 root 1.16 C<$level> sould be C<0> for messages ot be logged always, C<1> for
64     unexpected messages and errors, C<2> for warnings, C<7> for messages about
65     node connectivity and services, C<8> for debugging messages and C<9> for
66     tracing messages.
67    
68 root 1.1 The default simply logs the message to STDERR.
69    
70     =cut
71    
72     our $WARN = sub {
73 root 1.16 my ($level, $msg) = @_;
74    
75 root 1.1 $msg =~ s/\n$//;
76 root 1.16
77     printf STDERR "%s <%d> %s\n",
78     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
79     $level,
80     $msg;
81 root 1.1 };
82    
83 root 1.6 sub load_func($) {
84     my $func = $_[0];
85    
86     unless (defined &$func) {
87     my $pkg = $func;
88     do {
89     $pkg =~ s/::[^:]+$//
90     or return sub { die "unable to resolve '$func'" };
91     eval "require $pkg";
92     } until defined &$func;
93     }
94    
95     \&$func
96     }
97    
98 root 1.1 sub nonce($) {
99     my $nonce;
100    
101     if (open my $fh, "</dev/urandom") {
102     sysread $fh, $nonce, $_[0];
103     } else {
104     # shit...
105     our $nonce_init;
106     unless ($nonce_init++) {
107     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
108     }
109    
110     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
111     }
112    
113     $nonce
114     }
115    
116 root 1.21 sub alnumbits($) {
117 root 1.1 my $data = $_[0];
118    
119     if (eval "use Math::GMP 2.05; 1") {
120     $data = Math::GMP::get_str_gmp (
121     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
122     62
123     );
124     } else {
125     $data = MIME::Base64::encode_base64 $data, "";
126     $data =~ s/=//;
127     $data =~ s/\//s/g;
128     $data =~ s/\+/p/g;
129     }
130    
131     $data
132     }
133    
134     sub gen_uniq {
135 root 1.21 alnumbits pack "wNa*", $$, time, nonce 2
136 root 1.1 }
137    
138 root 1.20 our $CONFIG; # this node's configuration
139 root 1.21
140     our $RUNIQ = alnumbits nonce 16;; # remote uniq value
141 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
142 root 1.21 our $NODE = "anon/$RUNIQ";
143 root 1.1 our $ID = "a";
144    
145     our %NODE; # node id to transport mapping, or "undef", for local node
146     our (%PORT, %PORT_DATA); # local ports
147    
148 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
149 root 1.1 our %LMON; # monitored _local_ ports
150    
151     our %LISTENER;
152 root 1.21 our $LISTENER; # our listeners, as arrayref
153 root 1.1
154     our $SRCNODE; # holds the sending node during _inject
155    
156     sub NODE() {
157     $NODE
158     }
159    
160     sub node_of($) {
161 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
162 root 1.1
163 root 1.21 $node
164 root 1.1 }
165    
166 root 1.17 BEGIN {
167     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
168     ? sub () { 1 }
169     : sub () { 0 };
170     }
171 root 1.1
172     sub _inject {
173 root 1.25 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
174 root 1.1 &{ $PORT{+shift} or return };
175     }
176    
177 root 1.20 # this function adds a node-ref, so you can send stuff to it
178     # it is basically the central routing component.
179 root 1.1 sub add_node {
180 root 1.21 my ($node) = @_;
181 root 1.1
182 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
183 root 1.13 }
184    
185 root 1.1 sub snd(@) {
186 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
187 root 1.1
188 root 1.25 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
189 root 1.1
190 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
191 root 1.2 ->{send} (["$portid", @_]);
192 root 1.1 }
193    
194 root 1.17 =item $is_local = port_is_local $port
195    
196     Returns true iff the port is a local port.
197    
198     =cut
199    
200     sub port_is_local($) {
201 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
202 root 1.17
203 root 1.21 $NODE{$nodeid} == $NODE{""}
204 root 1.17 }
205    
206 root 1.18 =item snd_to_func $node, $func, @args
207 root 1.11
208 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
209 root 1.11 this function with the given arguments on that node.
210    
211 root 1.20 This function can be used to implement C<spawn>-like interfaces.
212 root 1.11
213     =cut
214    
215 root 1.18 sub snd_to_func($$;@) {
216 root 1.21 my $nodeid = shift;
217 root 1.11
218 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
219 root 1.11 ->send (["", @_]);
220     }
221    
222 root 1.18 =item snd_on $node, @msg
223    
224     Executes C<snd> with the given C<@msg> (which must include the destination
225     port) on the given node.
226    
227     =cut
228    
229     sub snd_on($@) {
230     my $node = shift;
231     snd $node, snd => @_;
232     }
233    
234     =item eval_on $node, $string
235    
236     Evaluates the given string as Perl expression on the given node.
237    
238     =cut
239    
240     sub eval_on($@) {
241     my $node = shift;
242     snd $node, eval => @_;
243     }
244    
245 root 1.1 sub kil(@) {
246 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
247 root 1.1
248     length $portid
249 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
250 root 1.1
251 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
252 root 1.1 ->kill ("$portid", @_);
253     }
254    
255 root 1.7 sub _nodename {
256     require POSIX;
257     (POSIX::uname ())[1]
258     }
259    
260 root 1.21 sub _resolve($) {
261     my ($nodeid) = @_;
262 root 1.1
263     my $cv = AE::cv;
264     my @res;
265    
266     $cv->begin (sub {
267     my %seen;
268     my @refs;
269     for (sort { $a->[0] <=> $b->[0] } @res) {
270     push @refs, $_->[1] unless $seen{$_->[1]}++
271     }
272 root 1.21 shift->send (@refs);
273 root 1.1 });
274    
275 root 1.21 $nodeid = $DEFAULT_PORT unless length $nodeid;
276 root 1.1
277     my $idx;
278 root 1.21 for my $t (split /,/, $nodeid) {
279 root 1.1 my $pri = ++$idx;
280 root 1.7
281     $t = length $t ? _nodename . ":$t" : _nodename
282     if $t =~ /^\d*$/;
283 root 1.1
284 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
285     or Carp::croak "$t: unparsable transport descriptor";
286    
287     $cv->begin;
288     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
289     for (@_) {
290     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
291     push @res, [
292     $pri += 1e-5,
293     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
294     ];
295 root 1.1 }
296 root 1.7 $cv->end;
297     };
298 root 1.1 }
299    
300     $cv->end;
301    
302     $cv
303     }
304    
305 root 1.22 sub initialise_node(;$%) {
306 root 1.21 my ($profile) = @_;
307 root 1.1
308 root 1.21 $profile = _nodename
309     unless defined $profile;
310 root 1.6
311 root 1.21 $CONFIG = AnyEvent::MP::Config::find_profile $profile;
312 root 1.24
313     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
314     $NODE = $node
315     unless $node eq "anon/";
316 root 1.6
317 root 1.21 $NODE{$NODE} = $NODE{""};
318     $NODE{$NODE}{id} = $NODE;
319 root 1.20
320 root 1.21 my $seeds = $CONFIG->{seeds};
321     my $binds = $CONFIG->{binds};
322 root 1.3
323 root 1.21 $binds ||= [$NODE];
324 root 1.1
325 root 1.21 $WARN->(8, "node $NODE starting up.");
326 root 1.1
327 root 1.23 $LISTENER = [];
328     %LISTENER = ();
329    
330 root 1.21 for (map _resolve $_, @$binds) {
331     for my $bind ($_->recv) {
332     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
333     or Carp::croak "$bind: unparsable local bind address";
334 root 1.20
335 root 1.21 $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server $host, $port;
336     push @$LISTENER, $bind;
337     }
338 root 1.1 }
339    
340 root 1.21 # the global service is mandatory currently
341     require AnyEvent::MP::Global;
342 root 1.1
343 root 1.21 # connect to all seednodes
344     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
345 root 1.1
346 root 1.20 for (@{ $CONFIG->{services} }) {
347 root 1.13 if (s/::$//) {
348     eval "require $_";
349     die $@ if $@;
350     } else {
351     (load_func $_)->();
352     }
353     }
354 root 1.1 }
355    
356     #############################################################################
357 root 1.6 # node monitoring and info
358 root 1.3
359 root 1.21 =item node_is_known $nodeid
360 root 1.13
361     Returns true iff the given node is currently known to the system.
362    
363     =cut
364    
365     sub node_is_known($) {
366     exists $NODE{$_[0]}
367     }
368    
369 root 1.21 =item node_is_up $nodeid
370 root 1.13
371     Returns true if the given node is "up", that is, the kernel thinks it has
372     a working connection to it.
373    
374     If the node is known but not currently connected, returns C<0>. If the
375     node is not known, returns C<undef>.
376    
377     =cut
378    
379     sub node_is_up($) {
380     ($NODE{$_[0]} or return)->{transport}
381     ? 1 : 0
382     }
383    
384 root 1.3 =item known_nodes
385    
386 root 1.26 Returns the node IDs of all nodes currently known to this node, including
387     itself and nodes not currently connected.
388 root 1.3
389     =cut
390    
391     sub known_nodes {
392 root 1.26 map $_->{id}, values %NODE
393 root 1.3 }
394    
395     =item up_nodes
396    
397 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
398     the node itself).
399 root 1.3
400     =cut
401    
402     sub up_nodes {
403 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
404 root 1.3 }
405    
406 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
407 root 1.3
408 root 1.27 Registers a callback that is called each time a node goes up (a connection
409     is established) or down (the connection is lost).
410 root 1.3
411     Node up messages can only be followed by node down messages for the same
412     node, and vice versa.
413    
414 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
415     port. This function is mainly of interest to modules that are concerned
416     about the network topology and low-level connection handling.
417    
418     Callbacks I<must not> block and I<should not> send any messages.
419    
420     The function returns an optional guard which can be used to unregister
421 root 1.3 the monitoring callback again.
422    
423     =cut
424    
425     our %MON_NODES;
426    
427     sub mon_nodes($) {
428     my ($cb) = @_;
429    
430     $MON_NODES{$cb+0} = $cb;
431    
432     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
433     }
434    
435     sub _inject_nodeevent($$;@) {
436 root 1.16 my ($node, $up, @reason) = @_;
437 root 1.3
438     for my $cb (values %MON_NODES) {
439 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
440 root 1.16 or $WARN->(1, $@);
441 root 1.3 }
442 root 1.16
443 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
444 root 1.3 }
445    
446     #############################################################################
447 root 1.1 # self node code
448    
449     our %node_req = (
450     # internal services
451    
452     # monitoring
453 root 1.27 mon0 => sub { # stop monitoring a port
454 root 1.1 my $portid = shift;
455     my $node = $SRCNODE;
456     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
457     },
458 root 1.27 mon1 => sub { # start monitoring a port
459 root 1.1 my $portid = shift;
460     my $node = $SRCNODE;
461 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
462 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
463 root 1.27 $node->send (["", kil => $portid, @_])
464     if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disocnnect
465 root 1.1 });
466     },
467     kil => sub {
468     my $cbs = delete $SRCNODE->{lmon}{+shift}
469     or return;
470    
471     $_->(@_) for @$cbs;
472     },
473    
474 root 1.18 # "public" services - not actually public
475 root 1.1
476     # relay message to another node / generic echo
477 root 1.15 snd => \&snd,
478 root 1.27 snd_multiple => sub {
479 root 1.1 snd @$_ for @_
480     },
481    
482 root 1.4 # informational
483     info => sub {
484     snd @_, $NODE;
485     },
486     known_nodes => sub {
487     snd @_, known_nodes;
488     },
489     up_nodes => sub {
490     snd @_, up_nodes;
491     },
492    
493 root 1.1 # random garbage
494     eval => sub {
495     my @res = eval shift;
496     snd @_, "$@", @res if @_;
497     },
498     time => sub {
499     snd @_, AE::time;
500     },
501     devnull => sub {
502     #
503     },
504 root 1.15 "" => sub {
505 root 1.27 # empty messages are keepalives or similar devnull-applications
506 root 1.15 },
507 root 1.1 );
508    
509 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
510 root 1.1 $PORT{""} = sub {
511     my $tag = shift;
512     eval { &{ $node_req{$tag} ||= load_func $tag } };
513 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
514 root 1.1 };
515    
516     =back
517    
518     =head1 SEE ALSO
519    
520     L<AnyEvent::MP>.
521    
522     =head1 AUTHOR
523    
524     Marc Lehmann <schmorp@schmorp.de>
525     http://home.schmorp.de/
526    
527     =cut
528    
529     1
530