ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.25
Committed: Fri Aug 28 20:57:42 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.24: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.19 our $VERSION = '0.8';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.10 NODE $NODE node_of snd kil
44 root 1.17 port_is_local
45 root 1.1 resolve_node initialise_node
46 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 root 1.1 );
48    
49     our $DEFAULT_PORT = "4040";
50    
51     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52     our $NETWORK_LATENCY = 3; # activity timeout
53     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57     This value is called with an error or warning message, when e.g. a connection
58     could not be created, authorisation failed and so on.
59    
60 root 1.16 C<$level> sould be C<0> for messages ot be logged always, C<1> for
61     unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
69     our $WARN = sub {
70 root 1.16 my ($level, $msg) = @_;
71    
72 root 1.1 $msg =~ s/\n$//;
73 root 1.16
74     printf STDERR "%s <%d> %s\n",
75     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
76     $level,
77     $msg;
78 root 1.1 };
79    
80 root 1.6 sub load_func($) {
81     my $func = $_[0];
82    
83     unless (defined &$func) {
84     my $pkg = $func;
85     do {
86     $pkg =~ s/::[^:]+$//
87     or return sub { die "unable to resolve '$func'" };
88     eval "require $pkg";
89     } until defined &$func;
90     }
91    
92     \&$func
93     }
94    
95 root 1.1 sub nonce($) {
96     my $nonce;
97    
98     if (open my $fh, "</dev/urandom") {
99     sysread $fh, $nonce, $_[0];
100     } else {
101     # shit...
102     our $nonce_init;
103     unless ($nonce_init++) {
104     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
105     }
106    
107     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
108     }
109    
110     $nonce
111     }
112    
113 root 1.21 sub alnumbits($) {
114 root 1.1 my $data = $_[0];
115    
116     if (eval "use Math::GMP 2.05; 1") {
117     $data = Math::GMP::get_str_gmp (
118     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
119     62
120     );
121     } else {
122     $data = MIME::Base64::encode_base64 $data, "";
123     $data =~ s/=//;
124     $data =~ s/\//s/g;
125     $data =~ s/\+/p/g;
126     }
127    
128     $data
129     }
130    
131     sub gen_uniq {
132 root 1.21 alnumbits pack "wNa*", $$, time, nonce 2
133 root 1.1 }
134    
135 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
136    
137 root 1.21 A boolean indicating whether this is a public node, which can create and
138     accept direct connections from other nodes.
139 root 1.3
140     =cut
141    
142 root 1.20 our $CONFIG; # this node's configuration
143 root 1.21
144     our $RUNIQ = alnumbits nonce 16;; # remote uniq value
145 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
146 root 1.21 our $NODE = "anon/$RUNIQ";
147 root 1.1 our $ID = "a";
148    
149     our %NODE; # node id to transport mapping, or "undef", for local node
150     our (%PORT, %PORT_DATA); # local ports
151    
152 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
153 root 1.1 our %LMON; # monitored _local_ ports
154    
155     our %LISTENER;
156 root 1.21 our $LISTENER; # our listeners, as arrayref
157 root 1.1
158     our $SRCNODE; # holds the sending node during _inject
159    
160     sub NODE() {
161     $NODE
162     }
163    
164     sub node_of($) {
165 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
166 root 1.1
167 root 1.21 $node
168 root 1.1 }
169    
170 root 1.17 BEGIN {
171     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
172     ? sub () { 1 }
173     : sub () { 0 };
174     }
175 root 1.1
176     sub _inject {
177 root 1.25 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
178 root 1.1 &{ $PORT{+shift} or return };
179     }
180    
181 root 1.20 # this function adds a node-ref, so you can send stuff to it
182     # it is basically the central routing component.
183 root 1.1 sub add_node {
184 root 1.21 my ($node) = @_;
185 root 1.1
186 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
187 root 1.13 }
188    
189 root 1.1 sub snd(@) {
190 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
191 root 1.1
192 root 1.25 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
193 root 1.1
194 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
195 root 1.2 ->{send} (["$portid", @_]);
196 root 1.1 }
197    
198 root 1.17 =item $is_local = port_is_local $port
199    
200     Returns true iff the port is a local port.
201    
202     =cut
203    
204     sub port_is_local($) {
205 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
206 root 1.17
207 root 1.21 $NODE{$nodeid} == $NODE{""}
208 root 1.17 }
209    
210 root 1.18 =item snd_to_func $node, $func, @args
211 root 1.11
212 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
213 root 1.11 this function with the given arguments on that node.
214    
215 root 1.20 This function can be used to implement C<spawn>-like interfaces.
216 root 1.11
217     =cut
218    
219 root 1.18 sub snd_to_func($$;@) {
220 root 1.21 my $nodeid = shift;
221 root 1.11
222 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
223 root 1.11 ->send (["", @_]);
224     }
225    
226 root 1.18 =item snd_on $node, @msg
227    
228     Executes C<snd> with the given C<@msg> (which must include the destination
229     port) on the given node.
230    
231     =cut
232    
233     sub snd_on($@) {
234     my $node = shift;
235     snd $node, snd => @_;
236     }
237    
238     =item eval_on $node, $string
239    
240     Evaluates the given string as Perl expression on the given node.
241    
242     =cut
243    
244     sub eval_on($@) {
245     my $node = shift;
246     snd $node, eval => @_;
247     }
248    
249 root 1.1 sub kil(@) {
250 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
251 root 1.1
252     length $portid
253 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
254 root 1.1
255 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
256 root 1.1 ->kill ("$portid", @_);
257     }
258    
259 root 1.7 sub _nodename {
260     require POSIX;
261     (POSIX::uname ())[1]
262     }
263    
264 root 1.21 sub _resolve($) {
265     my ($nodeid) = @_;
266 root 1.1
267     my $cv = AE::cv;
268     my @res;
269    
270     $cv->begin (sub {
271     my %seen;
272     my @refs;
273     for (sort { $a->[0] <=> $b->[0] } @res) {
274     push @refs, $_->[1] unless $seen{$_->[1]}++
275     }
276 root 1.21 shift->send (@refs);
277 root 1.1 });
278    
279 root 1.21 $nodeid = $DEFAULT_PORT unless length $nodeid;
280 root 1.1
281     my $idx;
282 root 1.21 for my $t (split /,/, $nodeid) {
283 root 1.1 my $pri = ++$idx;
284 root 1.7
285     $t = length $t ? _nodename . ":$t" : _nodename
286     if $t =~ /^\d*$/;
287 root 1.1
288 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
289     or Carp::croak "$t: unparsable transport descriptor";
290    
291     $cv->begin;
292     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
293     for (@_) {
294     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
295     push @res, [
296     $pri += 1e-5,
297     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
298     ];
299 root 1.1 }
300 root 1.7 $cv->end;
301     };
302 root 1.1 }
303    
304     $cv->end;
305    
306     $cv
307     }
308    
309 root 1.22 sub initialise_node(;$%) {
310 root 1.21 my ($profile) = @_;
311 root 1.1
312 root 1.21 $profile = _nodename
313     unless defined $profile;
314 root 1.6
315 root 1.21 $CONFIG = AnyEvent::MP::Config::find_profile $profile;
316 root 1.24
317     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
318     $NODE = $node
319     unless $node eq "anon/";
320 root 1.6
321 root 1.21 $NODE{$NODE} = $NODE{""};
322     $NODE{$NODE}{id} = $NODE;
323 root 1.20
324 root 1.21 my $seeds = $CONFIG->{seeds};
325     my $binds = $CONFIG->{binds};
326 root 1.3
327 root 1.21 $binds ||= [$NODE];
328 root 1.1
329 root 1.21 $WARN->(8, "node $NODE starting up.");
330 root 1.1
331 root 1.23 $LISTENER = [];
332     %LISTENER = ();
333    
334 root 1.21 for (map _resolve $_, @$binds) {
335     for my $bind ($_->recv) {
336     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
337     or Carp::croak "$bind: unparsable local bind address";
338 root 1.20
339 root 1.21 $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server $host, $port;
340     push @$LISTENER, $bind;
341     }
342 root 1.1 }
343    
344 root 1.21 # the global service is mandatory currently
345     require AnyEvent::MP::Global;
346 root 1.1
347 root 1.21 # connect to all seednodes
348     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
349 root 1.1
350 root 1.20 for (@{ $CONFIG->{services} }) {
351 root 1.13 if (s/::$//) {
352     eval "require $_";
353     die $@ if $@;
354     } else {
355     (load_func $_)->();
356     }
357     }
358 root 1.1 }
359    
360     #############################################################################
361 root 1.6 # node monitoring and info
362 root 1.3
363     sub _uniq_nodes {
364     my %node;
365    
366     @node{values %NODE} = values %NODE;
367    
368     values %node;
369     }
370    
371 root 1.20 sub _public_nodes {
372 root 1.21 &_uniq_nodes
373 root 1.20 }
374    
375 root 1.21 =item node_is_known $nodeid
376 root 1.13
377     Returns true iff the given node is currently known to the system.
378    
379     =cut
380    
381     sub node_is_known($) {
382     exists $NODE{$_[0]}
383     }
384    
385 root 1.21 =item node_is_up $nodeid
386 root 1.13
387     Returns true if the given node is "up", that is, the kernel thinks it has
388     a working connection to it.
389    
390     If the node is known but not currently connected, returns C<0>. If the
391     node is not known, returns C<undef>.
392    
393     =cut
394    
395     sub node_is_up($) {
396     ($NODE{$_[0]} or return)->{transport}
397     ? 1 : 0
398     }
399    
400 root 1.3 =item known_nodes
401    
402 root 1.21 Returns the node IDs of all public nodes connected to this node, including
403 root 1.18 itself.
404 root 1.3
405     =cut
406    
407     sub known_nodes {
408 root 1.21 map $_->{id}, _public_nodes
409 root 1.3 }
410    
411     =item up_nodes
412    
413 root 1.21 Return the node IDs of all public nodes that are currently connected
414 root 1.20 (excluding the node itself).
415 root 1.3
416     =cut
417    
418     sub up_nodes {
419 root 1.21 map $_->{id}, grep $_->{transport}, _public_nodes
420 root 1.3 }
421    
422 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
423 root 1.3
424     Registers a callback that is called each time a node goes up (connection
425     is established) or down (connection is lost).
426    
427     Node up messages can only be followed by node down messages for the same
428     node, and vice versa.
429    
430 root 1.13 The function returns an optional guard which can be used to de-register
431 root 1.3 the monitoring callback again.
432    
433     =cut
434    
435     our %MON_NODES;
436    
437     sub mon_nodes($) {
438     my ($cb) = @_;
439    
440     $MON_NODES{$cb+0} = $cb;
441    
442     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
443     }
444    
445     sub _inject_nodeevent($$;@) {
446 root 1.16 my ($node, $up, @reason) = @_;
447 root 1.3
448     for my $cb (values %MON_NODES) {
449 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
450 root 1.16 or $WARN->(1, $@);
451 root 1.3 }
452 root 1.16
453 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
454 root 1.3 }
455    
456     #############################################################################
457 root 1.1 # self node code
458    
459     our %node_req = (
460     # internal services
461    
462     # monitoring
463     mon0 => sub { # disable monitoring
464     my $portid = shift;
465     my $node = $SRCNODE;
466     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
467     },
468     mon1 => sub { # enable monitoring
469     my $portid = shift;
470     my $node = $SRCNODE;
471     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
472     $node->send (["", kil => $portid, @_]);
473     });
474     },
475     kil => sub {
476     my $cbs = delete $SRCNODE->{lmon}{+shift}
477     or return;
478    
479     $_->(@_) for @$cbs;
480     },
481    
482 root 1.18 # "public" services - not actually public
483 root 1.1
484     # relay message to another node / generic echo
485 root 1.15 snd => \&snd,
486     snd_multi => sub {
487 root 1.1 snd @$_ for @_
488     },
489    
490 root 1.4 # informational
491     info => sub {
492     snd @_, $NODE;
493     },
494     known_nodes => sub {
495     snd @_, known_nodes;
496     },
497     up_nodes => sub {
498     snd @_, up_nodes;
499     },
500    
501 root 1.1 # random garbage
502     eval => sub {
503     my @res = eval shift;
504     snd @_, "$@", @res if @_;
505     },
506     time => sub {
507     snd @_, AE::time;
508     },
509     devnull => sub {
510     #
511     },
512 root 1.15 "" => sub {
513     # empty messages are sent by monitoring
514     },
515 root 1.1 );
516    
517 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
518 root 1.1 $PORT{""} = sub {
519     my $tag = shift;
520     eval { &{ $node_req{$tag} ||= load_func $tag } };
521 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
522 root 1.1 };
523    
524     =back
525    
526     =head1 SEE ALSO
527    
528     L<AnyEvent::MP>.
529    
530     =head1 AUTHOR
531    
532     Marc Lehmann <schmorp@schmorp.de>
533     http://home.schmorp.de/
534    
535     =cut
536    
537     1
538