ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.20
Committed: Thu Aug 27 07:12:48 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.19: +68 -97 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.19 our $VERSION = '0.8';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.18 connect_node add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.10 NODE $NODE node_of snd kil
44 root 1.17 port_is_local
45 root 1.1 resolve_node initialise_node
46 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 root 1.1 );
48    
49     our $DEFAULT_PORT = "4040";
50    
51     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52     our $NETWORK_LATENCY = 3; # activity timeout
53     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57     This value is called with an error or warning message, when e.g. a connection
58     could not be created, authorisation failed and so on.
59    
60 root 1.16 C<$level> sould be C<0> for messages ot be logged always, C<1> for
61     unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
69     our $WARN = sub {
70 root 1.16 my ($level, $msg) = @_;
71    
72 root 1.1 $msg =~ s/\n$//;
73 root 1.16
74     printf STDERR "%s <%d> %s\n",
75     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
76     $level,
77     $msg;
78 root 1.1 };
79    
80 root 1.6 sub load_func($) {
81     my $func = $_[0];
82    
83     unless (defined &$func) {
84     my $pkg = $func;
85     do {
86     $pkg =~ s/::[^:]+$//
87     or return sub { die "unable to resolve '$func'" };
88     eval "require $pkg";
89     } until defined &$func;
90     }
91    
92     \&$func
93     }
94    
95 root 1.1 sub nonce($) {
96     my $nonce;
97    
98     if (open my $fh, "</dev/urandom") {
99     sysread $fh, $nonce, $_[0];
100     } else {
101     # shit...
102     our $nonce_init;
103     unless ($nonce_init++) {
104     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
105     }
106    
107     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
108     }
109    
110     $nonce
111     }
112    
113     sub asciibits($) {
114     my $data = $_[0];
115    
116     if (eval "use Math::GMP 2.05; 1") {
117     $data = Math::GMP::get_str_gmp (
118     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
119     62
120     );
121     } else {
122     $data = MIME::Base64::encode_base64 $data, "";
123     $data =~ s/=//;
124     $data =~ s/\//s/g;
125     $data =~ s/\+/p/g;
126     }
127    
128     $data
129     }
130    
131     sub gen_uniq {
132     asciibits pack "wNa*", $$, time, nonce 2
133     }
134    
135 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
136    
137     A boolean indicating whether this is a full/public node, which can create
138     and accept direct connections form othe rnodes.
139    
140     =item $AnyEvent::MP::Kernel::SLAVE
141    
142     A boolean indicating whether this node is a slave node, i.e. does most of it's
143     message sending/receiving through some master node.
144    
145     =item $AnyEvent::MP::Kernel::MASTER
146    
147 root 1.20 Defined only in slave mode, in which case it contains the noderef of the
148 root 1.3 master node.
149    
150     =cut
151    
152 root 1.20 our $CONFIG; # this node's configuration
153 root 1.1 our $PUBLIC = 0;
154 root 1.20 our $SLAVE = "";
155 root 1.1 our $MASTER; # master noderef when $SLAVE
156    
157     our $NODE = asciibits nonce 16;
158 root 1.20 our $NODEID = $NODE; # same as NODE, except slave nodes have no @master part
159 root 1.1 our $RUNIQ = $NODE; # remote uniq value
160     our $UNIQ = gen_uniq; # per-process/node unique cookie
161     our $ID = "a";
162    
163     our %NODE; # node id to transport mapping, or "undef", for local node
164     our (%PORT, %PORT_DATA); # local ports
165    
166     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
167     our %LMON; # monitored _local_ ports
168    
169     our %LISTENER;
170    
171     our $SRCNODE; # holds the sending node during _inject
172    
173     sub NODE() {
174     $NODE
175     }
176    
177     sub node_of($) {
178     my ($noderef, undef) = split /#/, $_[0], 2;
179    
180     $noderef
181     }
182    
183 root 1.17 BEGIN {
184     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
185     ? sub () { 1 }
186     : sub () { 0 };
187     }
188 root 1.1
189     sub _inject {
190 root 1.18 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d#
191 root 1.1 &{ $PORT{+shift} or return };
192     }
193    
194 root 1.20 # this function adds a node-ref, so you can send stuff to it
195     # it is basically the central routing component.
196 root 1.1 sub add_node {
197     my ($noderef) = @_;
198    
199 root 1.20 $NODE{$noderef} ||= do {
200     # new node, check validity
201     my $node;
202    
203     if ($noderef =~ /^slave\/.+$/) {
204     # slave node without routing part -> direct connection
205     # only really valid from transports
206     $node = new AnyEvent::MP::Node::Direct $noderef;
207 root 1.1
208 root 1.20 } else {
209     # direct node (or slave node without routing part)
210 root 1.1
211 root 1.20 for (split /,/, $noderef) {
212     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
213     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
214 root 1.1
215 root 1.20 $port > 0
216     or Carp::croak "$noderef: not a resolved node reference ('$_' contains invalid port)";
217 root 1.1
218 root 1.20 AnyEvent::Socket::parse_address $host
219     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
220     }
221 root 1.1
222 root 1.20 $node = new AnyEvent::MP::Node::Direct $noderef;
223 root 1.1 }
224    
225 root 1.20 $node
226 root 1.1 }
227     }
228    
229 root 1.13 sub connect_node {
230     &add_node->connect;
231     }
232    
233 root 1.1 sub snd(@) {
234     my ($noderef, $portid) = split /#/, shift, 2;
235    
236     warn "SND $noderef <- $portid @_\n" if TRACE;#d#
237    
238     ($NODE{$noderef} || add_node $noderef)
239 root 1.2 ->{send} (["$portid", @_]);
240 root 1.1 }
241    
242 root 1.17 =item $is_local = port_is_local $port
243    
244     Returns true iff the port is a local port.
245    
246     =cut
247    
248     sub port_is_local($) {
249     my ($noderef, undef) = split /#/, $_[0], 2;
250    
251     $NODE{$noderef} == $NODE{""}
252     }
253    
254 root 1.18 =item snd_to_func $node, $func, @args
255 root 1.11
256     Expects a noderef and a name of a function. Asynchronously tries to call
257     this function with the given arguments on that node.
258    
259 root 1.20 This function can be used to implement C<spawn>-like interfaces.
260 root 1.11
261     =cut
262    
263 root 1.18 sub snd_to_func($$;@) {
264 root 1.11 my $noderef = shift;
265    
266     ($NODE{$noderef} || add_node $noderef)
267     ->send (["", @_]);
268     }
269    
270 root 1.18 =item snd_on $node, @msg
271    
272     Executes C<snd> with the given C<@msg> (which must include the destination
273     port) on the given node.
274    
275     =cut
276    
277     sub snd_on($@) {
278     my $node = shift;
279     snd $node, snd => @_;
280     }
281    
282     =item eval_on $node, $string
283    
284     Evaluates the given string as Perl expression on the given node.
285    
286     =cut
287    
288     sub eval_on($@) {
289     my $node = shift;
290     snd $node, eval => @_;
291     }
292    
293 root 1.1 sub kil(@) {
294     my ($noderef, $portid) = split /#/, shift, 2;
295    
296     length $portid
297     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
298    
299     ($NODE{$noderef} || add_node $noderef)
300     ->kill ("$portid", @_);
301     }
302    
303 root 1.7 sub _nodename {
304     require POSIX;
305     (POSIX::uname ())[1]
306     }
307    
308 root 1.1 sub resolve_node($) {
309     my ($noderef) = @_;
310    
311     my $cv = AE::cv;
312     my @res;
313    
314     $cv->begin (sub {
315     my %seen;
316     my @refs;
317     for (sort { $a->[0] <=> $b->[0] } @res) {
318     push @refs, $_->[1] unless $seen{$_->[1]}++
319     }
320     shift->send (join ",", @refs);
321     });
322    
323     $noderef = $DEFAULT_PORT unless length $noderef;
324    
325     my $idx;
326     for my $t (split /,/, $noderef) {
327     my $pri = ++$idx;
328 root 1.7
329     $t = length $t ? _nodename . ":$t" : _nodename
330     if $t =~ /^\d*$/;
331 root 1.1
332 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
333     or Carp::croak "$t: unparsable transport descriptor";
334    
335     $cv->begin;
336     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
337     for (@_) {
338     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
339     push @res, [
340     $pri += 1e-5,
341     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
342     ];
343 root 1.1 }
344 root 1.7 $cv->end;
345     };
346 root 1.1 }
347    
348     $cv->end;
349    
350     $cv
351     }
352    
353     sub initialise_node(@) {
354     my ($noderef, @others) = @_;
355    
356 root 1.20 $CONFIG = AnyEvent::MP::Config::find_profile
357     +(defined $noderef ? $noderef : _nodename);
358 root 1.6
359 root 1.20 $noderef = $CONFIG->{noderef}
360     if exists $CONFIG->{noderef};
361 root 1.6
362 root 1.20 push @others, @{ $CONFIG->{seeds} };
363    
364     @others = map $_->recv, map +(resolve_node $_), @others;
365 root 1.3
366 root 1.1 if ($noderef =~ /^slave\/(.*)$/) {
367     my $name = $1;
368     $name = $NODE unless length $name;
369    
370     @others
371     or Carp::croak "seed nodes must be specified for slave nodes";
372    
373 root 1.20 $SLAVE = 1;
374     $NODE = "slave/$name";
375    
376 root 1.1 } else {
377     $PUBLIC = 1;
378 root 1.20 $NODE = (resolve_node $noderef)->recv;
379 root 1.1 }
380    
381 root 1.20 $NODE{$NODE} = $NODE{""};
382     $NODE{$NODE}{noderef} = $NODE;
383 root 1.1
384 root 1.20 unless ($SLAVE) {
385     for my $t (split /,/, $NODE) {
386     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
387    
388     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
389     sub {
390     my ($tp) = @_;
391    
392     # TODO: urgs
393     my $node = add_node $tp->{remote_node};
394     $node->{trial}{accept} = $tp;
395     },
396     ;
397     }
398 root 1.1 }
399    
400 root 1.14 for (@others) {
401     my $node = add_node $_;
402     $node->{autoconnect} = 1;
403     $node->connect;
404     }
405 root 1.1
406 root 1.20 for (@{ $CONFIG->{services} }) {
407 root 1.13 if (s/::$//) {
408     eval "require $_";
409     die $@ if $@;
410     } else {
411     (load_func $_)->();
412     }
413     }
414 root 1.20
415     # slave nodes need global
416     require AnyEvent::MP::Global
417     if $SLAVE;
418 root 1.1 }
419    
420     #############################################################################
421 root 1.6 # node monitoring and info
422 root 1.3
423     sub _uniq_nodes {
424     my %node;
425    
426     @node{values %NODE} = values %NODE;
427    
428     values %node;
429     }
430    
431 root 1.20 sub _public_nodes {
432     grep $_->{noderef} !~ /^slave\//, _uniq_nodes
433     }
434    
435 root 1.13 =item node_is_known $noderef
436    
437     Returns true iff the given node is currently known to the system.
438    
439     =cut
440    
441     sub node_is_known($) {
442     exists $NODE{$_[0]}
443     }
444    
445     =item node_is_up $noderef
446    
447     Returns true if the given node is "up", that is, the kernel thinks it has
448     a working connection to it.
449    
450     If the node is known but not currently connected, returns C<0>. If the
451     node is not known, returns C<undef>.
452    
453     =cut
454    
455     sub node_is_up($) {
456     ($NODE{$_[0]} or return)->{transport}
457     ? 1 : 0
458     }
459    
460 root 1.3 =item known_nodes
461    
462 root 1.20 Returns the noderefs of all public nodes connected to this node, including
463 root 1.18 itself.
464 root 1.3
465     =cut
466    
467     sub known_nodes {
468 root 1.20 map $_->{noderef}, _public_nodes
469 root 1.3 }
470    
471     =item up_nodes
472    
473 root 1.20 Return the noderefs of all public nodes that are currently connected
474     (excluding the node itself).
475 root 1.3
476     =cut
477    
478     sub up_nodes {
479 root 1.20 map $_->{noderef}, grep $_->{transport}, _public_nodes
480 root 1.3 }
481    
482     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
483    
484     Registers a callback that is called each time a node goes up (connection
485     is established) or down (connection is lost).
486    
487     Node up messages can only be followed by node down messages for the same
488     node, and vice versa.
489    
490 root 1.13 The function returns an optional guard which can be used to de-register
491 root 1.3 the monitoring callback again.
492    
493     =cut
494    
495     our %MON_NODES;
496    
497     sub mon_nodes($) {
498     my ($cb) = @_;
499    
500     $MON_NODES{$cb+0} = $cb;
501    
502     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
503     }
504    
505     sub _inject_nodeevent($$;@) {
506 root 1.16 my ($node, $up, @reason) = @_;
507 root 1.3
508     for my $cb (values %MON_NODES) {
509 root 1.16 eval { $cb->($node->{noderef}, $up, @reason); 1 }
510     or $WARN->(1, $@);
511 root 1.3 }
512 root 1.16
513     $WARN->(7, "$node->{noderef} is " . ($up ? "up" : "down") . " (@reason)");
514 root 1.3 }
515    
516     #############################################################################
517 root 1.1 # self node code
518    
519     our %node_req = (
520     # internal services
521    
522     # monitoring
523     mon0 => sub { # disable monitoring
524     my $portid = shift;
525     my $node = $SRCNODE;
526     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
527     },
528     mon1 => sub { # enable monitoring
529     my $portid = shift;
530     my $node = $SRCNODE;
531     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
532     $node->send (["", kil => $portid, @_]);
533     });
534     },
535     kil => sub {
536     my $cbs = delete $SRCNODE->{lmon}{+shift}
537     or return;
538    
539     $_->(@_) for @$cbs;
540     },
541    
542 root 1.18 # "public" services - not actually public
543 root 1.1
544     # relay message to another node / generic echo
545 root 1.15 snd => \&snd,
546     snd_multi => sub {
547 root 1.1 snd @$_ for @_
548     },
549    
550 root 1.4 # informational
551     info => sub {
552     snd @_, $NODE;
553     },
554     known_nodes => sub {
555     snd @_, known_nodes;
556     },
557     up_nodes => sub {
558     snd @_, up_nodes;
559     },
560    
561 root 1.1 # random garbage
562     eval => sub {
563     my @res = eval shift;
564     snd @_, "$@", @res if @_;
565     },
566     time => sub {
567     snd @_, AE::time;
568     },
569     devnull => sub {
570     #
571     },
572 root 1.15 "" => sub {
573     # empty messages are sent by monitoring
574     },
575 root 1.1 );
576    
577 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
578 root 1.1 $PORT{""} = sub {
579     my $tag = shift;
580     eval { &{ $node_req{$tag} ||= load_func $tag } };
581 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
582 root 1.1 };
583    
584     =back
585    
586     =head1 SEE ALSO
587    
588     L<AnyEvent::MP>.
589    
590     =head1 AUTHOR
591    
592     Marc Lehmann <schmorp@schmorp.de>
593     http://home.schmorp.de/
594    
595     =cut
596    
597     1
598