ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.14
Committed: Sat Aug 15 04:12:38 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.13: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28     use MIME::Base64 ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37 root 1.12 our $VERSION = '0.7';
38 root 1.1 our @EXPORT = qw(
39 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40     connect_node add_node load_func snd_to_func
41 root 1.1
42 root 1.10 NODE $NODE node_of snd kil
43 root 1.1 resolve_node initialise_node
44 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
45 root 1.1 );
46    
47     our $DEFAULT_PORT = "4040";
48    
49     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
50     our $NETWORK_LATENCY = 3; # activity timeout
51     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
52    
53     =item $AnyEvent::MP::Kernel::WARN
54    
55     This value is called with an error or warning message, when e.g. a connection
56     could not be created, authorisation failed and so on.
57    
58     The default simply logs the message to STDERR.
59    
60     =cut
61    
62     our $WARN = sub {
63     my $msg = $_[0];
64     $msg =~ s/\n$//;
65     warn "$msg\n";
66     };
67    
68 root 1.6 sub load_func($) {
69     my $func = $_[0];
70    
71     unless (defined &$func) {
72     my $pkg = $func;
73     do {
74     $pkg =~ s/::[^:]+$//
75     or return sub { die "unable to resolve '$func'" };
76     eval "require $pkg";
77     } until defined &$func;
78     }
79    
80     \&$func
81     }
82    
83 root 1.1 sub nonce($) {
84     my $nonce;
85    
86     if (open my $fh, "</dev/urandom") {
87     sysread $fh, $nonce, $_[0];
88     } else {
89     # shit...
90     our $nonce_init;
91     unless ($nonce_init++) {
92     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
93     }
94    
95     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
96     }
97    
98     $nonce
99     }
100    
101     sub asciibits($) {
102     my $data = $_[0];
103    
104     if (eval "use Math::GMP 2.05; 1") {
105     $data = Math::GMP::get_str_gmp (
106     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
107     62
108     );
109     } else {
110     $data = MIME::Base64::encode_base64 $data, "";
111     $data =~ s/=//;
112     $data =~ s/\//s/g;
113     $data =~ s/\+/p/g;
114     }
115    
116     $data
117     }
118    
119     sub gen_uniq {
120     asciibits pack "wNa*", $$, time, nonce 2
121     }
122    
123 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
124    
125     A boolean indicating whether this is a full/public node, which can create
126     and accept direct connections form othe rnodes.
127    
128     =item $AnyEvent::MP::Kernel::SLAVE
129    
130     A boolean indicating whether this node is a slave node, i.e. does most of it's
131     message sending/receiving through some master node.
132    
133     =item $AnyEvent::MP::Kernel::MASTER
134    
135     Defined only in slave mode, in which cas eit contains the noderef of the
136     master node.
137    
138     =cut
139    
140 root 1.1 our $PUBLIC = 0;
141     our $SLAVE = 0;
142     our $MASTER; # master noderef when $SLAVE
143    
144     our $NODE = asciibits nonce 16;
145     our $RUNIQ = $NODE; # remote uniq value
146     our $UNIQ = gen_uniq; # per-process/node unique cookie
147     our $ID = "a";
148    
149     our %NODE; # node id to transport mapping, or "undef", for local node
150     our (%PORT, %PORT_DATA); # local ports
151    
152     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
153     our %LMON; # monitored _local_ ports
154    
155     our %LISTENER;
156    
157     our $SRCNODE; # holds the sending node during _inject
158    
159     sub NODE() {
160     $NODE
161     }
162    
163     sub node_of($) {
164     my ($noderef, undef) = split /#/, $_[0], 2;
165    
166     $noderef
167     }
168    
169     sub TRACE() { 0 }
170    
171     sub _inject {
172     warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
173     &{ $PORT{+shift} or return };
174     }
175    
176     sub add_node {
177     my ($noderef) = @_;
178    
179     return $NODE{$noderef}
180     if exists $NODE{$noderef};
181    
182     for (split /,/, $noderef) {
183     return $NODE{$noderef} = $NODE{$_}
184     if exists $NODE{$_};
185     }
186    
187     # new node, check validity
188     my $node;
189    
190     if ($noderef =~ /^slave\/.+$/) {
191     $node = new AnyEvent::MP::Node::Indirect $noderef;
192    
193     } else {
194     for (split /,/, $noderef) {
195     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
196     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
197    
198     $port > 0
199     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
200    
201     AnyEvent::Socket::parse_address $host
202     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
203     }
204    
205     $node = new AnyEvent::MP::Node::Direct $noderef;
206     }
207    
208     $NODE{$_} = $node
209     for $noderef, split /,/, $noderef;
210    
211     $node
212     }
213    
214 root 1.13 sub connect_node {
215     &add_node->connect;
216     }
217    
218 root 1.1 sub snd(@) {
219     my ($noderef, $portid) = split /#/, shift, 2;
220    
221     warn "SND $noderef <- $portid @_\n" if TRACE;#d#
222    
223     ($NODE{$noderef} || add_node $noderef)
224 root 1.2 ->{send} (["$portid", @_]);
225 root 1.1 }
226    
227 root 1.11 =item snd_to_func $noderef, $func, @args
228    
229     Expects a noderef and a name of a function. Asynchronously tries to call
230     this function with the given arguments on that node.
231    
232     This fucntion can be used to implement C<spawn>-like interfaces.
233    
234     =cut
235    
236     sub snd_to_func {
237     my $noderef = shift;
238    
239     ($NODE{$noderef} || add_node $noderef)
240     ->send (["", @_]);
241     }
242    
243 root 1.1 sub kil(@) {
244     my ($noderef, $portid) = split /#/, shift, 2;
245    
246     length $portid
247     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
248    
249     ($NODE{$noderef} || add_node $noderef)
250     ->kill ("$portid", @_);
251     }
252    
253 root 1.7 sub _nodename {
254     require POSIX;
255     (POSIX::uname ())[1]
256     }
257    
258 root 1.1 sub resolve_node($) {
259     my ($noderef) = @_;
260    
261     my $cv = AE::cv;
262     my @res;
263    
264     $cv->begin (sub {
265     my %seen;
266     my @refs;
267     for (sort { $a->[0] <=> $b->[0] } @res) {
268     push @refs, $_->[1] unless $seen{$_->[1]}++
269     }
270     shift->send (join ",", @refs);
271     });
272    
273     $noderef = $DEFAULT_PORT unless length $noderef;
274    
275     my $idx;
276     for my $t (split /,/, $noderef) {
277     my $pri = ++$idx;
278 root 1.7
279     $t = length $t ? _nodename . ":$t" : _nodename
280     if $t =~ /^\d*$/;
281 root 1.1
282 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
283     or Carp::croak "$t: unparsable transport descriptor";
284    
285     $cv->begin;
286     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
287     for (@_) {
288     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
289     push @res, [
290     $pri += 1e-5,
291     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
292     ];
293 root 1.1 }
294 root 1.7 $cv->end;
295     };
296 root 1.1 }
297    
298     $cv->end;
299    
300     $cv
301     }
302    
303     sub initialise_node(@) {
304     my ($noderef, @others) = @_;
305    
306 root 1.7 my $profile = AnyEvent::MP::Config::find_profile
307     +(defined $noderef ? $noderef : _nodename);
308 root 1.6
309     $noderef = $profile->{noderef}
310 root 1.7 if exists $profile->{noderef};
311 root 1.6
312 root 1.9 push @others, @{ $profile->{seeds} };
313 root 1.3
314 root 1.1 if ($noderef =~ /^slave\/(.*)$/) {
315     $SLAVE = AE::cv;
316     my $name = $1;
317     $name = $NODE unless length $name;
318     $noderef = AE::cv;
319     $noderef->send ("slave/$name");
320    
321     @others
322     or Carp::croak "seed nodes must be specified for slave nodes";
323    
324     } else {
325     $PUBLIC = 1;
326     $noderef = resolve_node $noderef;
327     }
328    
329     @others = map $_->recv, map +(resolve_node $_), @others;
330    
331     $NODE = $noderef->recv;
332    
333     for my $t (split /,/, $NODE) {
334     $NODE{$t} = $NODE{""};
335    
336     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
337    
338     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
339     sub {
340     my ($tp) = @_;
341    
342     # TODO: urgs
343     my $node = add_node $tp->{remote_node};
344     $node->{trial}{accept} = $tp;
345     },
346     ;
347     }
348    
349 root 1.14 for (@others) {
350     my $node = add_node $_;
351     $node->{autoconnect} = 1;
352     $node->connect;
353     }
354 root 1.1
355     if ($SLAVE) {
356     my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
357 root 1.13 my $master = $SLAVE->recv;
358     $master
359 root 1.1 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
360    
361 root 1.13 $MASTER = $master->{noderef};
362     $master->{autoconnect} = 1;
363    
364 root 1.1 (my $via = $MASTER) =~ s/,/!/g;
365    
366     $NODE .= "\@$via";
367     $NODE{$NODE} = $NODE{""};
368    
369 root 1.2 $_->send (["", iam => $NODE])
370 root 1.1 for values %NODE;
371    
372     $SLAVE = 1;
373     }
374 root 1.6
375 root 1.13 for (@{ $profile->{services} }) {
376     if (s/::$//) {
377     eval "require $_";
378     die $@ if $@;
379     } else {
380     (load_func $_)->();
381     }
382     }
383 root 1.1 }
384    
385     #############################################################################
386 root 1.6 # node monitoring and info
387 root 1.3
388     sub _uniq_nodes {
389     my %node;
390    
391     @node{values %NODE} = values %NODE;
392    
393     values %node;
394     }
395    
396 root 1.13 =item node_is_known $noderef
397    
398     Returns true iff the given node is currently known to the system.
399    
400     =cut
401    
402     sub node_is_known($) {
403     exists $NODE{$_[0]}
404     }
405    
406     =item node_is_up $noderef
407    
408     Returns true if the given node is "up", that is, the kernel thinks it has
409     a working connection to it.
410    
411     If the node is known but not currently connected, returns C<0>. If the
412     node is not known, returns C<undef>.
413    
414     =cut
415    
416     sub node_is_up($) {
417     ($NODE{$_[0]} or return)->{transport}
418     ? 1 : 0
419     }
420    
421 root 1.3 =item known_nodes
422    
423     Returns the noderefs of all nodes connected to this node.
424    
425     =cut
426    
427     sub known_nodes {
428     map $_->{noderef}, _uniq_nodes
429     }
430    
431     =item up_nodes
432    
433     Return the noderefs of all nodes that are currently connected (excluding
434     the node itself).
435    
436     =cut
437    
438     sub up_nodes {
439     map $_->{noderef}, grep $_->{transport}, _uniq_nodes
440     }
441    
442     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
443    
444     Registers a callback that is called each time a node goes up (connection
445     is established) or down (connection is lost).
446    
447     Node up messages can only be followed by node down messages for the same
448     node, and vice versa.
449    
450 root 1.13 The function returns an optional guard which can be used to de-register
451 root 1.3 the monitoring callback again.
452    
453     =cut
454    
455     our %MON_NODES;
456    
457     sub mon_nodes($) {
458     my ($cb) = @_;
459    
460     $MON_NODES{$cb+0} = $cb;
461    
462     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
463     }
464    
465     sub _inject_nodeevent($$;@) {
466     my ($node, @args) = @_;
467    
468     unshift @args, $node->{noderef};
469    
470     for my $cb (values %MON_NODES) {
471     eval { $cb->(@args); 1 }
472     or $WARN->($@);
473     }
474     }
475    
476     #############################################################################
477 root 1.1 # self node code
478    
479     our %node_req = (
480     # internal services
481    
482     # monitoring
483     mon0 => sub { # disable monitoring
484     my $portid = shift;
485     my $node = $SRCNODE;
486     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
487     },
488     mon1 => sub { # enable monitoring
489     my $portid = shift;
490     my $node = $SRCNODE;
491     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
492     $node->send (["", kil => $portid, @_]);
493     });
494     },
495     kil => sub {
496     my $cbs = delete $SRCNODE->{lmon}{+shift}
497     or return;
498    
499     $_->(@_) for @$cbs;
500     },
501     # node changed its name (for slave nodes)
502     iam => sub {
503 root 1.8 # get rid of bogus slave/xxx name, hopefully
504     delete $NODE{$SRCNODE->{noderef}};
505    
506     # change noderef
507 root 1.1 $SRCNODE->{noderef} = $_[0];
508 root 1.8
509     # anchor
510 root 1.1 $NODE{$_[0]} = $SRCNODE;
511     },
512    
513     # public services
514    
515     # relay message to another node / generic echo
516     relay => sub {
517     &snd;
518     },
519     relay_multiple => sub {
520     snd @$_ for @_
521     },
522    
523 root 1.4 # informational
524     info => sub {
525     snd @_, $NODE;
526     },
527     known_nodes => sub {
528     snd @_, known_nodes;
529     },
530     up_nodes => sub {
531     snd @_, up_nodes;
532     },
533    
534 root 1.1 # random garbage
535     eval => sub {
536     my @res = eval shift;
537     snd @_, "$@", @res if @_;
538     },
539     time => sub {
540     snd @_, AE::time;
541     },
542     devnull => sub {
543     #
544     },
545     );
546    
547     $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
548     $PORT{""} = sub {
549     my $tag = shift;
550     eval { &{ $node_req{$tag} ||= load_func $tag } };
551     $WARN->("error processing node message: $@") if $@;
552     };
553    
554     =back
555    
556     =head1 SEE ALSO
557    
558     L<AnyEvent::MP>.
559    
560     =head1 AUTHOR
561    
562     Marc Lehmann <schmorp@schmorp.de>
563     http://home.schmorp.de/
564    
565     =cut
566    
567     1
568