ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.13
Committed: Sat Aug 15 02:36:03 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.12: +47 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. is needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28     use MIME::Base64 ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37 root 1.12 our $VERSION = '0.7';
38 root 1.1 our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     connect_node add_node load_func snd_to_func
41 root 1.1
42 root 1.10 NODE $NODE node_of snd kil
43 root 1.1 resolve_node initialise_node
44 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
45 root 1.1 );
46    
47     our $DEFAULT_PORT = "4040";
48    
49     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
50     our $NETWORK_LATENCY = 3; # activity timeout
51     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
52    
53     =item $AnyEvent::MP::Kernel::WARN
54    
55     This code reference is called with an error or warning message, when e.g. a connection
56     could not be created, authorisation failed and so on.
57    
58     The default simply logs the message to STDERR.
59    
60     =cut
61    
62     our $WARN = sub {
63     my $msg = $_[0];
64     $msg =~ s/\n$//;
65     warn "$msg\n";
66     };
67    
# Resolve a fully qualified function name to a code reference.
#
# When the function is not defined yet, the enclosing packages are loaded
# from the innermost outwards ("A::B::c" tries A::B, then A) until the
# function becomes defined. When no package component is left to strip, a
# code reference that dies with "unable to resolve '<name>'" is returned
# instead, so the failure only surfaces once the result is called.
sub load_func($) {
   my ($func) = @_;

   no strict 'refs'; # $func is used as a symbolic reference below

   my $module = $func;

   until (defined &$func) {
      # drop the last "::component" - nothing left to drop means we give up
      return sub { die "unable to resolve '$func'" }
         unless $module =~ s/::[^:]+$//;

      # a failing require is fine, the next outer package is tried then
      eval "require $module";
   }

   \&$func
}
82    
# Return a string consisting of $_[0] random octets.
#
# /dev/urandom is preferred. When it cannot be opened, or does not deliver
# the requested number of octets, perl's rand is used instead, seeded once
# per process from the time, the pid and a checksum over the output of
# some external commands.
sub nonce($) {
   my $len = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may deliver fewer octets than requested, so keep appending
      while (length ($nonce) < $len) {
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last; # error or eof - use the fallback below
      }

      return $nonce
         if length ($nonce) == $len;
   }

   # no usable /dev/urandom, make do with rand
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
100    
# Encode the octet string $_[0] using ASCII letters and digits only.
#
# With Math::GMP 2.05 or newer available, the data is treated as one big
# hexadecimal number and rendered in base 62. Otherwise it is base64
# encoded, the padding is removed and the two non-alphanumeric base64
# characters are mapped to letters ("/" to "s", "+" to "p").
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of which have to go
      $data =~ s/=+$//;
      $data =~ tr{/+}{sp};
   }

   $data
}
118    
# Generate a fresh cookie: the pid (BER compressed integer), the current
# time (32 bit, big endian) and two nonce octets, packed together and then
# run through asciibits.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
122    
123 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
124    
125     A boolean indicating whether this is a full/public node, which can create
126     and accept direct connections from other nodes.
127    
128     =item $AnyEvent::MP::Kernel::SLAVE
129    
130     A boolean indicating whether this node is a slave node, i.e. does most of its
131     message sending/receiving through some master node.
132    
133     =item $AnyEvent::MP::Kernel::MASTER
134    
135     Defined only in slave mode, in which case it contains the noderef of the
136     master node.
137    
138     =cut
139    
140 root 1.1 our $PUBLIC = 0;
141     our $SLAVE = 0;
142     our $MASTER; # master noderef when $SLAVE
143    
144     our $NODE = asciibits nonce 16;
145     our $RUNIQ = $NODE; # remote uniq value
146     our $UNIQ = gen_uniq; # per-process/node unique cookie
147     our $ID = "a";
148    
149     our %NODE; # node id to transport mapping, or "undef", for local node
150     our (%PORT, %PORT_DATA); # local ports
151    
152     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
153     our %LMON; # monitored _local_ ports
154    
155     our %LISTENER;
156    
157     our $SRCNODE; # holds the sending node during _inject
158    
159     sub NODE() {
160     $NODE
161     }
162    
# Return the node part of a port id, i.e. everything before the first "#"
# (the whole argument when it contains no "#").
sub node_of($) {
   my @parts = split /#/, $_[0], 2;
   my $node = $parts[0];

   $node
}
168    
169     sub TRACE() { 0 }
170    
171     sub _inject {
172     warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
173     &{ $PORT{+shift} or return };
174     }
175    
176     sub add_node {
177     my ($noderef) = @_;
178    
179     return $NODE{$noderef}
180     if exists $NODE{$noderef};
181    
182     for (split /,/, $noderef) {
183     return $NODE{$noderef} = $NODE{$_}
184     if exists $NODE{$_};
185     }
186    
187     # new node, check validity
188     my $node;
189    
190     if ($noderef =~ /^slave\/.+$/) {
191     $node = new AnyEvent::MP::Node::Indirect $noderef;
192    
193     } else {
194     for (split /,/, $noderef) {
195     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
196     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
197    
198     $port > 0
199     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
200    
201     AnyEvent::Socket::parse_address $host
202     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
203     }
204    
205     $node = new AnyEvent::MP::Node::Direct $noderef;
206     }
207    
208     $NODE{$_} = $node
209     for $noderef, split /,/, $noderef;
210    
211     $node
212     }
213    
214 root 1.13 sub connect_node {
215     &add_node->connect;
216     }
217    
218 root 1.1 sub snd(@) {
219     my ($noderef, $portid) = split /#/, shift, 2;
220    
221     warn "SND $noderef <- $portid @_\n" if TRACE;#d#
222    
223     ($NODE{$noderef} || add_node $noderef)
224 root 1.2 ->{send} (["$portid", @_]);
225 root 1.1 }
226    
227 root 1.11 =item snd_to_func $noderef, $func, @args
228    
229     Expects a noderef and a name of a function. Asynchronously tries to call
230     this function with the given arguments on that node.
231    
232     This function can be used to implement C<spawn>-like interfaces.
233    
234     =cut
235    
236     sub snd_to_func {
237     my $noderef = shift;
238    
239     ($NODE{$noderef} || add_node $noderef)
240     ->send (["", @_]);
241     }
242    
243 root 1.1 sub kil(@) {
244     my ($noderef, $portid) = split /#/, shift, 2;
245    
246     length $portid
247     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
248    
249     ($NODE{$noderef} || add_node $noderef)
250     ->kill ("$portid", @_);
251     }
252    
253 root 1.7 sub _nodename {
254     require POSIX;
255     (POSIX::uname ())[1]
256     }
257    
258 root 1.1 sub resolve_node($) {
259     my ($noderef) = @_;
260    
261     my $cv = AE::cv;
262     my @res;
263    
264     $cv->begin (sub {
265     my %seen;
266     my @refs;
267     for (sort { $a->[0] <=> $b->[0] } @res) {
268     push @refs, $_->[1] unless $seen{$_->[1]}++
269     }
270     shift->send (join ",", @refs);
271     });
272    
273     $noderef = $DEFAULT_PORT unless length $noderef;
274    
275     my $idx;
276     for my $t (split /,/, $noderef) {
277     my $pri = ++$idx;
278 root 1.7
279     $t = length $t ? _nodename . ":$t" : _nodename
280     if $t =~ /^\d*$/;
281 root 1.1
282 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
283     or Carp::croak "$t: unparsable transport descriptor";
284    
285     $cv->begin;
286     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
287     for (@_) {
288     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
289     push @res, [
290     $pri += 1e-5,
291     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
292     ];
293 root 1.1 }
294 root 1.7 $cv->end;
295     };
296 root 1.1 }
297    
298     $cv->end;
299    
300     $cv
301     }
302    
303     sub initialise_node(@) {
304     my ($noderef, @others) = @_;
305    
306 root 1.7 my $profile = AnyEvent::MP::Config::find_profile
307     +(defined $noderef ? $noderef : _nodename);
308 root 1.6
309     $noderef = $profile->{noderef}
310 root 1.7 if exists $profile->{noderef};
311 root 1.6
312 root 1.9 push @others, @{ $profile->{seeds} };
313 root 1.3
314 root 1.1 if ($noderef =~ /^slave\/(.*)$/) {
315     $SLAVE = AE::cv;
316     my $name = $1;
317     $name = $NODE unless length $name;
318     $noderef = AE::cv;
319     $noderef->send ("slave/$name");
320    
321     @others
322     or Carp::croak "seed nodes must be specified for slave nodes";
323    
324     } else {
325     $PUBLIC = 1;
326     $noderef = resolve_node $noderef;
327     }
328    
329     @others = map $_->recv, map +(resolve_node $_), @others;
330    
331     $NODE = $noderef->recv;
332    
333     for my $t (split /,/, $NODE) {
334     $NODE{$t} = $NODE{""};
335    
336     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
337    
338     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
339     sub {
340     my ($tp) = @_;
341    
342     # TODO: urgs
343     my $node = add_node $tp->{remote_node};
344     $node->{trial}{accept} = $tp;
345     },
346     ;
347     }
348    
349 root 1.13 connect_node $_ for @others;
350 root 1.1
351     if ($SLAVE) {
352     my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
353 root 1.13 my $master = $SLAVE->recv;
354     $master
355 root 1.1 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
356    
357 root 1.13 $MASTER = $master->{noderef};
358     $master->{autoconnect} = 1;
359    
360 root 1.1 (my $via = $MASTER) =~ s/,/!/g;
361    
362     $NODE .= "\@$via";
363     $NODE{$NODE} = $NODE{""};
364    
365 root 1.2 $_->send (["", iam => $NODE])
366 root 1.1 for values %NODE;
367    
368     $SLAVE = 1;
369     }
370 root 1.6
371 root 1.13 for (@{ $profile->{services} }) {
372     if (s/::$//) {
373     eval "require $_";
374     die $@ if $@;
375     } else {
376     (load_func $_)->();
377     }
378     }
379 root 1.1 }
380    
381     #############################################################################
382 root 1.6 # node monitoring and info
383 root 1.3
# Return every distinct node object stored in %NODE once. Several keys of
# %NODE can refer to the same object, so the objects are de-duplicated by
# their stringified reference.
sub _uniq_nodes {
   my %seen;

   $seen{$_} = $_
      for values %NODE;

   values %seen
}
391    
392 root 1.13 =item node_is_known $noderef
393    
394     Returns true iff the given node is currently known to the system.
395    
396     =cut
397    
# True when %NODE has an entry for the given noderef.
sub node_is_known($) {
   my ($noderef) = @_;

   exists $NODE{$noderef}
}
401    
402     =item node_is_up $noderef
403    
404     Returns true if the given node is "up", that is, the kernel thinks it has
405     a working connection to it.
406    
407     If the node is known but not currently connected, returns C<0>. If the
408     node is not known, returns C<undef>.
409    
410     =cut
411    
# Return 1 when the node has a transport, 0 when it is known but has none,
# and nothing (undef in scalar context) when it is not in %NODE at all.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
416    
417 root 1.3 =item known_nodes
418    
419     Returns the noderefs of all nodes known to this node, whether they are currently connected or not.
420    
421     =cut
422    
# Collect the noderef of every distinct node object in %NODE.
sub known_nodes {
   my @noderefs;

   push @noderefs, $_->{noderef}
      for _uniq_nodes ();

   @noderefs
}
426    
427     =item up_nodes
428    
429     Return the noderefs of all nodes that are currently connected (excluding
430     the node itself).
431    
432     =cut
433    
# Collect the noderef of every distinct node object that has a transport.
sub up_nodes {
   my @up;

   for my $node (_uniq_nodes ()) {
      push @up, $node->{noderef}
         if $node->{transport};
   }

   @up
}
437    
438     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
439    
440     Registers a callback that is called each time a node goes up (connection
441     is established) or down (connection is lost).
442    
443     Node up messages can only be followed by node down messages for the same
444     node, and vice versa.
445    
446 root 1.13 The function returns an optional guard which can be used to de-register
447 root 1.3 the monitoring callback again.
448    
449     =cut
450    
our %MON_NODES; # registered node monitoring callbacks, keyed by their address

# Register $cb to be called on node up/down events.
#
# In non-void context a guard object is returned that removes the callback
# again once it is destroyed. In void context no guard is created and the
# callback stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # plain "wantarray" is false in scalar context, which would deny
   # "$guard = mon_nodes ..." callers their guard - only void context
   # (undef) must skip the guard creation
   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
460    
461     sub _inject_nodeevent($$;@) {
462     my ($node, @args) = @_;
463    
464     unshift @args, $node->{noderef};
465    
466     for my $cb (values %MON_NODES) {
467     eval { $cb->(@args); 1 }
468     or $WARN->($@);
469     }
470     }
471    
472     #############################################################################
473 root 1.1 # self node code
474    
475     our %node_req = (
476     # internal services
477    
478     # monitoring
479     mon0 => sub { # disable monitoring
480     my $portid = shift;
481     my $node = $SRCNODE;
482     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
483     },
484     mon1 => sub { # enable monitoring
485     my $portid = shift;
486     my $node = $SRCNODE;
487     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
488     $node->send (["", kil => $portid, @_]);
489     });
490     },
491     kil => sub {
492     my $cbs = delete $SRCNODE->{lmon}{+shift}
493     or return;
494    
495     $_->(@_) for @$cbs;
496     },
497     # node changed its name (for slave nodes)
498     iam => sub {
499 root 1.8 # get rid of bogus slave/xxx name, hopefully
500     delete $NODE{$SRCNODE->{noderef}};
501    
502     # change noderef
503 root 1.1 $SRCNODE->{noderef} = $_[0];
504 root 1.8
505     # anchor
506 root 1.1 $NODE{$_[0]} = $SRCNODE;
507     },
508    
509     # public services
510    
511     # relay message to another node / generic echo
512     relay => sub {
513     &snd;
514     },
515     relay_multiple => sub {
516     snd @$_ for @_
517     },
518    
519 root 1.4 # informational
520     info => sub {
521     snd @_, $NODE;
522     },
523     known_nodes => sub {
524     snd @_, known_nodes;
525     },
526     up_nodes => sub {
527     snd @_, up_nodes;
528     },
529    
530 root 1.1 # random garbage
531     eval => sub {
532     my @res = eval shift;
533     snd @_, "$@", @res if @_;
534     },
535     time => sub {
536     snd @_, AE::time;
537     },
538     devnull => sub {
539     #
540     },
541     );
542    
543     $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
544     $PORT{""} = sub {
545     my $tag = shift;
546     eval { &{ $node_req{$tag} ||= load_func $tag } };
547     $WARN->("error processing node message: $@") if $@;
548     };
549    
550     =back
551    
552     =head1 SEE ALSO
553    
554     L<AnyEvent::MP>.
555    
556     =head1 AUTHOR
557    
558     Marc Lehmann <schmorp@schmorp.de>
559     http://home.schmorp.de/
560    
561     =cut
562    
563     1
564