ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.23
Committed: Thu Aug 27 23:46:33 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.22: +3 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. is needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
our $VERSION = '0.8';

# everything listed here is exported by default (via Exporter)
our @EXPORT = (
   # shared kernel state and low-level helpers
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func snd_to_func snd_on eval_on),

   # basic port/node primitives
   qw(NODE $NODE node_of snd kil),
   qw(port_is_local),

   # node setup
   qw(resolve_node initialise_node),

   # node monitoring and info
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# port used when a transport descriptor does not specify one
our $DEFAULT_PORT = "4040";

our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57     This value is called with an error or warning message, when e.g. a connection
58     could not be created, authorisation failed and so on.
59    
60 root 1.16 C<$level> should be C<0> for messages to be logged always, C<1> for
61     unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
# Default log sink: writes "<timestamp> <level> message" to STDERR.
# $level is the numeric log level, $msg the text to log.
our $WARN = sub {
   my ($level, $msg) = @_;

   # drop one trailing newline, the format below supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79    
# Maps a fully qualified function name to a code reference, loading the
# module that provides it on demand.
#
# As long as the function is undefined, successively shorter package
# prefixes of the name are require'd (load errors are ignored). When no
# prefix is left, a code reference that dies with "unable to resolve
# '<name>'" is returned, so the result can always be called.
sub load_func($) {
   my ($name) = @_;

   my $module = $name;

   until (defined &$name) {
      # strip the last ::component; nothing left to strip means we give up
      $module =~ s/::[^:]+$//
         or return sub { die "unable to resolve '$name'" };

      eval "require $module";
   }

   \&$name
}
94    
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened. Anything
# that could not be read (device missing, read error, short read) is
# filled in from perl's rand, which is seeded once per process from the
# time, the pid and a checksum over some process/network listings.
# NOTE(review): the rand-based fallback is not cryptographically strong.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      # sysread may deliver fewer octets than requested, so loop until
      # we have enough or the read fails/hits eof
      while ($len > length $nonce) {
         sysread $fh, $nonce, $len - (length $nonce), length $nonce
            or last;
      }
   }

   if ($len > length $nonce) {
      # no (or not enough) data from the random device
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $len - length $nonce;
   }

   $nonce
}
112    
# Encodes the octet string $_[0] using only ASCII letters and digits.
#
# With Math::GMP 2.05 or newer available, the data is treated as one big
# number and rendered in base 62. Otherwise it is base64-encoded, all
# trailing "=" padding is removed and "/" and "+" are replaced by "s" and
# "p" (so this fallback encoding is not reversible).
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of which have to go
      $data =~ s/=+$//;
      $data =~ tr{/+}{sp};
   }

   $data
}
130    
# Generates an alphanumeric cookie from the pid, the current time and
# two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   alnumbits ($raw)
}
134    
135 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
136    
137 root 1.21 A boolean indicating whether this is a public node, which can create and
138     accept direct connections from other nodes.
139 root 1.3
140     =cut
141    
our $CONFIG; # this node's configuration

our $RUNIQ = alnumbits (nonce (16)); # remote uniq value
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

our %NODE; # node id to transport mapping, or "undef", for local node
our %PORT; # local ports
our %PORT_DATA;

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
159    
# Returns the node ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
163    
# Returns the node part of a port ID, that is, everything before the
# first "#" (or the whole string when there is none).
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
169    
# TRACE is a compile-time constant: true iff PERL_ANYEVENT_MP_TRACE is
# set to a true value in the environment.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175 root 1.1
# Delivers a message to a local port: the first argument is the port ID,
# the remaining arguments are passed to the callback registered in %PORT.
# Messages for unknown ports are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> @_\n"
      if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called with the current @_, i.e. the message without the port ID
   &$handler
}
180    
# the central routing component: returns the node object for the given
# node ID, creating (and remembering) a direct node on first use, so
# that messages can be sent to it.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
188    
# Sends a message to a port: the first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- $portid @_\n" if TRACE;#d#

   # unknown nodes are created on demand
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
197    
198 root 1.17 =item $is_local = port_is_local $port
199    
200     Returns true iff the port is a local port.
201    
202     =cut
203    
# Compares the node object registered for the port's node ID with the
# object of the local node (stored under the empty node ID).
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;
   my $owner = $NODE{$parts[0]};

   $owner == $NODE{""}
}
209    
210 root 1.18 =item snd_to_func $node, $func, @args
211 root 1.11
212 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
213 root 1.11 this function with the given arguments on that node.
214    
215 root 1.20 This function can be used to implement C<spawn>-like interfaces.
216 root 1.11
217     =cut
218    
# Sends (function name, arguments...) to the node port (the port with
# the empty ID) of the given node, creating the node object on demand.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @call]);
}
225    
226 root 1.18 =item snd_on $node, @msg
227    
228     Executes C<snd> with the given C<@msg> (which must include the destination
229     port) on the given node.
230    
231     =cut
232    
# Asks the given node to run "snd" with @msg, by sending it a
# "snd" request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
237    
238     =item eval_on $node, $string
239    
240     Evaluates the given string as a Perl expression on the given node.
241    
242     =cut
243    
# Asks the given node to evaluate a string, by sending it an "eval"
# request; any further arguments are passed along with the request.
sub eval_on($@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
248    
# Kills a port: the first argument is the port ID ("nodeid#portid"), the
# remaining arguments are passed on as the kill reason.
# Croaks when the port part is missing or empty.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
258    
# Returns the nodename field of uname(2), i.e. the local host name.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
263    
# Resolves a comma-separated list of transport descriptors into
# "host:port" strings.
#
# Returns a condvar that is sent the resulting list once every
# descriptor has been resolved. Results keep the order of the
# descriptors, and duplicates are removed. An empty descriptor list
# stands for $DEFAULT_PORT; a descriptor that is empty or consists only
# of digits is taken to be a port on the local host name.
# Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, "host:port"] pairs, filled in by the resolver callbacks

   # runs once all resolve requests (and the initial begin) have ended
   $cv->begin (sub {
      my %seen;
      my @refs =
         grep !$seen{$_}++,
         map $_->[1],
         sort { $a->[0] <=> $b->[0] } @res;

      $_[0]->send (@refs);
   });

   $nodeid = $DEFAULT_PORT
      unless length $nodeid;

   my $idx;
   for my $t (split /,/, $nodeid) {
      # the integer part orders descriptors, the fraction (see below)
      # orders the addresses found for one descriptor
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length $t
            ? _nodename () . ":$t"
            : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$t: unparsable transport descriptor");

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $target (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

            $pri += 1e-5;
            push @res, [
               $pri,
               AnyEvent::Socket::format_hostport (
                  AnyEvent::Socket::format_address ($addr),
                  $service,
               ),
            ];
         }

         $cv->end;
      });
   }

   $cv->end;

   $cv
}
308    
# Configures and starts the local node.
#
# $profile names the configuration profile to use and defaults to the
# local host name. The node ID is taken from the profile's "nodeid"
# entry if there is one; otherwise the profile name itself is used,
# except for the profile "anon/", which keeps the current $NODE.
#
# After that, listeners are created for all bind addresses (default: the
# node ID), the global service is loaded and given the resolved seed
# nodes, and the configured services are started: entries ending in "::"
# are modules to load, all others are functions to call.
#
# Dies when a bind address cannot be parsed or a service fails to load.
sub initialise_node(;$%) {
   my ($profile) = @_;

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);
   $NODE = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid}
         : $profile eq "anon/"      ? $NODE
         :                            $profile;

   # the local node object becomes reachable under its final ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= [$NODE];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolve requests first, then wait for them in order
   for my $resolver (map _resolve ($_), @$binds) {
      for my $bind ($resolver->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server ($host, $port);
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $entry (@{ $CONFIG->{services} }) {
      # work on a copy: stripping the "::" marker from the configuration
      # itself would turn module entries into function names the next
      # time the same profile is initialised
      my $service = $entry;

      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
358    
359     #############################################################################
360 root 1.6 # node monitoring and info
361 root 1.3
# Returns every node object stored in %NODE exactly once, even when it
# is registered under more than one ID.
sub _uniq_nodes {
   my %by_ref;

   # keyed by the stringified reference, so aliases collapse
   $by_ref{$_} = $_
      for values %NODE;

   values %by_ref
}
369    
# Currently simply all known node objects, see _uniq_nodes.
sub _public_nodes {
   _uniq_nodes (@_)
}
373    
374 root 1.21 =item node_is_known $nodeid
375 root 1.13
376     Returns true iff the given node is currently known to the system.
377    
378     =cut
379    
# True iff there is an entry for the given node ID in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
383    
384 root 1.21 =item node_is_up $nodeid
385 root 1.13
386     Returns true if the given node is "up", that is, the kernel thinks it has
387     a working connection to it.
388    
389     If the node is known but not currently connected, returns C<0>. If the
390     node is not known, returns C<undef>.
391    
392     =cut
393    
# Returns 1 when the node object has a transport, 0 when it has none,
# and nothing (undef in scalar context) when the node is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
398    
399 root 1.3 =item known_nodes
400    
401 root 1.21 Returns the node IDs of all public nodes connected to this node, including
402 root 1.18 itself.
403 root 1.3
404     =cut
405    
# Collects the IDs of all node objects returned by _public_nodes.
sub known_nodes {
   my @ids;

   push @ids, $_->{id}
      for _public_nodes ();

   @ids
}
409    
410     =item up_nodes
411    
412 root 1.21 Return the node IDs of all public nodes that are currently connected
413 root 1.20 (excluding the node itself).
414 root 1.3
415     =cut
416    
# Collects the IDs of all node objects that currently have a transport.
sub up_nodes {
   my @ids;

   for my $node (_public_nodes ()) {
      next unless $node->{transport};

      push @ids, $node->{id};
   }

   @ids
}
420    
421 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
422 root 1.3
423     Registers a callback that is called each time a node goes up (connection
424     is established) or down (connection is lost).
425    
426     Node up messages can only be followed by node down messages for the same
427     node, and vice versa.
428    
429 root 1.13 The function returns an optional guard which can be used to de-register
430 root 1.3 the monitoring callback again.
431    
432     =cut
433    
our %MON_NODES; # registered node monitoring callbacks, keyed by their address

# Registers $cb to be called with ($nodeid, $is_up, @reason) on node
# up/down events.
#
# Unless called in void context, a guard object is returned that
# removes the callback again once it is destroyed. In void context the
# callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb+0;

   $MON_NODES{$key} = $cb;

   # "wantarray" alone is false in scalar context, which would deny the
   # guard to callers doing "my $guard = mon_nodes ..."
   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$key} })
}
443    
# Reports a node up/down event: calls every callback registered in
# %MON_NODES with (node id, $up, @reason) and then logs the event at
# level 7. Exceptions thrown by callbacks are logged at level 1 and do
# not keep the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $observer (values %MON_NODES) {
      next if eval { $observer->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
454    
455     #############################################################################
456 root 1.1 # self node code
457    
# Requests understood by the node port (the port with the empty ID),
# keyed by request name. Each handler receives the request arguments;
# the node that sent the request is available in $SRCNODE.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      my $watcher = delete $SRCNODE->{rmon}{$portid};

      $NODE{""}->unmonitor ($portid, $watcher);
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;

      # the watcher below has to remember who asked
      my $requester = $SRCNODE;

      my $watcher = $requester->{rmon}{$portid} = sub {
         # tell the monitoring node that the port is gone
         $requester->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $watcher);
   },
   kil => sub {
      my $watchers = delete $SRCNODE->{lmon}{+shift}
         or return;

      for my $watcher (@$watchers) {
         $watcher->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multi => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # evaluates the string in list context; the reply (error text
      # followed by the results) is only sent when a reply port is given
      my @result = eval shift;

      snd (@_, "$@", @result)
         if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # discards everything
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
515    
# the local node is reachable both under its ID and under the empty ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: dispatches requests to %node_req. Unknown request names
# are resolved as function names via load_func and cached in %node_req.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # called with the current @_, i.e. the request arguments
      &$handler;
   };

   $WARN->(2, "error processing node message: $@")
      if $@;
};
522    
523     =back
524    
525     =head1 SEE ALSO
526    
527     L<AnyEvent::MP>.
528    
529     =head1 AUTHOR
530    
531     Marc Lehmann <schmorp@schmorp.de>
532     http://home.schmorp.de/
533    
534     =cut
535    
536     1
537