ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.21
Committed: Thu Aug 27 21:29:37 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.20: +66 -130 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. is needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.19 our $VERSION = '0.8';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.10 NODE $NODE node_of snd kil
44 root 1.17 port_is_local
45 root 1.1 resolve_node initialise_node
46 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 root 1.1 );
48    
49     our $DEFAULT_PORT = "4040";
50    
51     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52     our $NETWORK_LATENCY = 3; # activity timeout
53     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57     This value is called with an error or warning message, when e.g. a connection
58     could not be created, authorisation failed and so on.
59    
60 root 1.16 C<$level> should be C<0> for messages to be logged always, C<1> for
61     unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
# Default log sink: prints "<timestamp> <level> message" to STDERR.
# A single trailing newline on the message is dropped first, because
# one is always appended by the format.
our $WARN = sub {
   my $level = shift;
   my $msg   = shift;

   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79    
# Resolves a fully qualified function name to a code reference, loading
# the module that provides it on demand.
#
# Starting with the full name, the trailing "::component" is stripped
# repeatedly and each remaining prefix is tried as a module to require,
# until the function becomes defined. When no prefix is left, a code
# reference that dies with an "unable to resolve" message is returned
# instead, so the failure only surfaces when the function is called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # the name is interpolated into a string eval and may originate
         # from a received message, so only syntactically valid package
         # names are ever evaluated
         if ($pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/) {
            local $@; # do not clobber the caller's error state
            eval "require $pkg";
         }
      } until defined &$func;
   }

   \&$func
}
94    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that is possible. Otherwise
# (device missing, read error, premature EOF) they are generated with
# rand, which is seeded once per process from the time, the pid and the
# output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      # sysread may deliver fewer octets than requested, so keep reading
      # until the buffer is full or the device gives up
      while (length ($nonce) < $len) {
         sysread ($fh, $nonce, $len - length ($nonce), length ($nonce))
            or last; # error or EOF
      }
   }

   return $nonce
      if length ($nonce) == $len;

   # no usable random device - fall back to perl's generator
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
112    
# Encodes a string of octets using alphanumeric characters only.
#
# With Math::GMP (2.05 or newer) available, the octets are treated as one
# big number and printed in base 62. Otherwise base64 is used, with the
# padding removed and "/" and "+" replaced by "s" and "p".
sub alnumbits($) {
   my $data = $_[0];

   # probing for the optional module must not leak into the caller's $@
   my $have_gmp = do { local $@; eval "use Math::GMP 2.05; 1" };

   if ($have_gmp) {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      $data =~ s/=+\z//;    # base64 pads with one OR two "=" characters
      $data =~ tr{/+}{sp};  # the two non-alphanumeric base64 symbols
   }

   $data
}
130    
# Builds an alphanumeric cookie from the process id, the current time
# and two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   alnumbits ($raw)
}
134    
135 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
136    
137 root 1.21 A boolean indicating whether this is a public node, which can create and
138     accept direct connections from other nodes.
139 root 1.3
140     =cut
141    
our $CONFIG;                           # this node's configuration

our $RUNIQ = alnumbits (nonce (16));   # remote uniq value
our $UNIQ  = gen_uniq;                 # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";            # node ID, replaced by initialise_node
our $ID    = "a";

our %NODE;                             # node id to node object mapping, "" is the local node
our %PORT;                             # local ports
our %PORT_DATA;

our %RMON;                             # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON;                             # monitored _local_ ports

our %LISTENER;
our $LISTENER;                         # our listeners, as arrayref

our $SRCNODE;                          # holds the sending node during _inject
159    
# Accessor for the ID of the local node.
sub NODE() {
   return $NODE;
}
163    
# Extracts the node ID from a port ID, that is, everything in front of
# the first "#".
sub node_of($) {
   my @part = split /#/, $_[0], 2;

   return $part[0];
}
169    
# TRACE is a compile-time constant, so tracing statements guarded by it
# cost nothing unless PERL_ANYEVENT_MP_TRACE is set in the environment.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175 root 1.1
# Delivers a message to a local port: the first argument is the port ID,
# the remaining arguments are passed on to the port callback. Messages
# for ports that do not exist are dropped silently.
sub _inject {
   warn "RCV $SRCNODE->{id} -> @_\n" if TRACE && @_;#d#

   my $port_cb = $PORT{+shift}
      or return;

   &$port_cb; # called with the remaining @_
}
180    
# Returns the node object registered for the given node ID, creating and
# registering a new AnyEvent::MP::Node::Direct object when there is none
# yet. It is basically the central routing component: once a node object
# exists, messages can be sent to it.
sub add_node {
   my ($node) = @_;

   # direct method call instead of the ambiguous "new Class ARGS" form
   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
188    
# Sends a message to a port. The first argument is the port ID in
# "nodeid#portid" form, everything else is the message itself. The node
# object is created on demand when the node is not known yet.
sub snd(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   warn "SND $nodeid <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
197    
198 root 1.17 =item $is_local = port_is_local $port
199    
200     Returns true iff the port is a local port.
201    
202     =cut
203    
# A port is local when the node object registered for its node ID is the
# very same object as the one registered for the local node ("").
sub port_is_local($) {
   my @part = split /#/, $_[0], 2;
   my $owner = $NODE{$part[0]};

   return $owner == $NODE{""};
}
209    
210 root 1.18 =item snd_to_func $node, $func, @args
211 root 1.11
212 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
213 root 1.11 this function with the given arguments on that node.
214    
215 root 1.20 This function can be used to implement C<spawn>-like interfaces.
216 root 1.11
217     =cut
218    
# Sends (function name, arguments...) to the node port (the port with
# the empty ID) of the given node, creating the node object on demand.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_]);
}
225    
226 root 1.18 =item snd_on $node, @msg
227    
228     Executes C<snd> with the given C<@msg> (which must include the destination
229     port) on the given node.
230    
231     =cut
232    
# Wraps the message into a "snd" request and sends that to the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, snd => @msg;
}
237    
238     =item eval_on $node, $string
239    
240     Evaluates the given string as Perl expression on the given node.
241    
242     =cut
243    
# Sends an "eval" request, consisting of the remaining arguments, to the
# given node.
sub eval_on($@) {
   my ($node, @args) = @_;

   snd $node, eval => @args;
}
248    
# Kills a port. The first argument is the port ID in "nodeid#portid"
# form, the remaining arguments are handed to the node's kill method.
# Croaks when the port part is empty, as that would address the node
# port itself.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   unless (length $portid) {
      Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught");
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
258    
# Returns the nodename field reported by uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   return $nodename;
}
263    
# Resolves a comma separated list of transport descriptors into
# formatted "address:port" strings.
#
# A descriptor consisting only of digits is taken as a port on the local
# node name, an empty descriptor as the local node name itself. An empty
# argument is replaced by $DEFAULT_PORT.
#
# Returns a condvar right away. Once all lookups are finished it is sent
# the resolved endpoints, ordered by the position of the descriptor that
# produced them and with duplicates removed.
sub _resolve($) {
   my ($spec) = @_;

   my $done = AE::cv;
   my @found; # [sort key, endpoint] pairs, filled in by the lookups

   # this outer begin is matched by the end at the bottom, so the final
   # callback cannot run before all lookups have been started
   $done->begin (sub {
      my ($cv) = @_;

      my %dup;
      my @endpoints =
         grep !$dup{$_}++,
         map $_->[1],
         sort { $a->[0] <=> $b->[0] } @found;

      $cv->send (@endpoints);
   });

   $spec = $DEFAULT_PORT
      unless length $spec;

   my $position = 0;
   for my $desc (split /,/, $spec) {
      my $rank = ++$position;

      if ($desc =~ /^\d*$/) {
         $desc = length $desc
            ? _nodename () . ":$desc"
            : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($desc, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$desc: unparsable transport descriptor");

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $target (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

            # the small increment keeps the results of one descriptor in
            # lookup order, behind those of all earlier descriptors
            $rank += 1e-5;

            push @found, [
               $rank,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service),
            ];
         }

         $done->end;
      });
   }

   $done->end;

   $done
}
308    
# Configures this process as a node.
#
# Looks up the configuration profile (the local nodename is used when
# $profile is undefined), settles the node ID, creates listeners for all
# bind addresses, hands the resolved seed addresses to the global service
# and finally loads and starts the configured services.
#
# Croaks when a bind address cannot be parsed and dies when a service
# module fails to load.
sub initialise_node($;%) {
   my ($profile) = @_;

   $profile = _nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);
   $NODE = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid}
         : $profile eq "anon/"      ? $NODE
         :                            $profile;

   # the local node object is kept, it is merely registered under the
   # final node ID as well
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= [$NODE];

   $WARN->(8, "node $NODE starting up.");

   # all lookups are started first, only then is each one waited for
   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server ($host, $port);
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes (a profile without seeds yields none)
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @{ $seeds || [] });

   for my $service (@{ $CONFIG->{services} || [] }) {
      # work on a copy: stripping the "::" marker from the loop alias
      # would permanently rewrite the stored configuration
      my $name = $service;

      if ($name =~ s/::$//) {
         # a trailing "::" designates a module to load
         eval "require $name";
         die $@ if $@;
      } else {
         # anything else is a function to call
         (load_func $name)->();
      }
   }
}
355    
356     #############################################################################
357 root 1.6 # node monitoring and info
358 root 1.3
# Returns every node object in %NODE once, even when it is registered
# under more than one key.
sub _uniq_nodes {
   my %by_ref;

   $by_ref{$_} = $_
      for values %NODE;

   values %by_ref;
}
366    
# Currently the same as _uniq_nodes, which is called with our own @_.
sub _public_nodes {
   return &_uniq_nodes;
}
370    
371 root 1.21 =item node_is_known $nodeid
372 root 1.13
373     Returns true iff the given node is currently known to the system.
374    
375     =cut
376    
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
380    
381 root 1.21 =item node_is_up $nodeid
382 root 1.13
383     Returns true if the given node is "up", that is, the kernel thinks it has
384     a working connection to it.
385    
386     If the node is known but not currently connected, returns C<0>. If the
387     node is not known, returns C<undef>.
388    
389     =cut
390    
# Returns 1 when the node object has a transport, 0 when it has none and
# nothing at all when the node ID is not registered.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
395    
396 root 1.3 =item known_nodes
397    
398 root 1.21 Returns the node IDs of all public nodes connected to this node, including
399 root 1.18 itself.
400 root 1.3
401     =cut
402    
# Maps all node objects returned by _public_nodes to their IDs.
sub known_nodes {
   my @nodes = _public_nodes ();

   map $_->{id}, @nodes
}
406    
407     =item up_nodes
408    
409 root 1.21 Returns the node IDs of all public nodes that are currently connected
410 root 1.20 (excluding the node itself).
411 root 1.3
412     =cut
413    
# Like known_nodes, but restricted to node objects that have a transport.
sub up_nodes {
   my @connected = grep $_->{transport}, _public_nodes ();

   map $_->{id}, @connected
}
417    
418 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
419 root 1.3
420     Registers a callback that is called each time a node goes up (connection
421     is established) or down (connection is lost).
422    
423     Node up messages can only be followed by node down messages for the same
424     node, and vice versa.
425    
426 root 1.13 The function returns an optional guard which can be used to de-register
427 root 1.3 the monitoring callback again.
428    
429     =cut
430    
our %MON_NODES; # registered node monitoring callbacks, keyed by their address

# Registers $cb to be called for node up/down events.
#
# Unless called in void context, a guard object is returned; destroying
# it removes the callback again. In void context nothing is returned and
# the callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would deny a guard to the documented "$guard = mon_nodes ..."
   return unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
440    
# Reports a node going up or down: every registered monitoring callback
# is called with the node ID, the new state and the reason. A callback
# that dies is logged at level 1 and does not affect the others. The
# event itself is logged at level 7 afterwards.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
451    
452     #############################################################################
453 root 1.1 # self node code
454    
# Requests understood by the node port (the port with the empty ID),
# keyed by request name. Handlers receive the request arguments in @_,
# while $SRCNODE holds the node that sent the request.
our %node_req = (
   # internal services

   # monitoring

   # disable monitoring of a local port for the sending node
   mon0 => sub {
      my ($portid) = @_;
      my $node = $SRCNODE;

      my $cb = delete $node->{rmon}{$portid};

      $NODE{""}->unmonitor ($portid, $cb);
   },

   # enable monitoring: when the local port dies, the sending node is
   # told so with a "kil" request
   mon1 => sub {
      my ($portid) = @_;
      my $node = $SRCNODE;

      my $notify = sub {
         $node->send (["", kil => $portid, @_]);
      };

      $node->{rmon}{$portid} = $notify;

      $NODE{""}->monitor ($portid, $notify);
   },

   # a port monitored on the sending node is gone: run (and forget) the
   # callbacks registered for it
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,

   snd_multi => sub {
      for my $msg (@_) {
         snd @$msg;
      }
   },

   # informational
   info => sub {
      snd @_, $NODE;
   },

   known_nodes => sub {
      snd @_, known_nodes;
   },

   up_nodes => sub {
      snd @_, up_nodes;
   },

   # random garbage

   # NOTE(review): this evaluates code taken from a received message, so
   # whoever may send to this node port can run arbitrary code here
   eval => sub {
      my @res = eval shift;

      if (@_) {
         snd @_, "$@", @res;
      }
   },

   time => sub {
      snd @_, AE::time;
   },

   devnull => sub {
      # ignores everything
   },

   "" => sub {
      # empty messages are sent by monitoring
   },
);
512    
# the local node is registered both under its ID and under the empty ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element names the request. Requests
# missing from %node_req are resolved with load_func, and the result is
# cached in %node_req. The handler is called with the rest of the message.
$PORT{""} = sub {
   my $tag = shift;

   # judge success by the block's return value rather than by $@, just
   # like _inject_nodeevent does - $@ alone is not a reliable indicator
   eval { &{ $node_req{$tag} ||= load_func $tag }; 1 }
      or $WARN->(2, "error processing node message: $@");
};
519    
520     =back
521    
522     =head1 SEE ALSO
523    
524     L<AnyEvent::MP>.
525    
526     =head1 AUTHOR
527    
528     Marc Lehmann <schmorp@schmorp.de>
529     http://home.schmorp.de/
530    
531     =cut
532    
533     1
534