ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.9
Committed: Fri Aug 14 03:27:59 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.8: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. is needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28     use MIME::Base64 ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
# Module version and the symbols exported into every importing package.
our $VERSION = '0.6';
our @EXPORT  = (
    # shared kernel state and helpers
    qw(%NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func),

    # messaging primitives
    qw(NODE $NODE node_of snd kil _any_),

    # node setup
    qw(resolve_node initialise_node),
);

# Transport spec used by resolve_node when the caller supplies none.
our $DEFAULT_PORT = "4040";

# Timing knobs.
our $CONNECT_INTERVAL = 2;  # new connect every 2s, at least
our $NETWORK_LATENCY  = 3;  # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
50    
51     =item $AnyEvent::MP::Kernel::WARN
52    
53     This value is called with an error or warning message, when e.g. a connection
54     could not be created, authorisation failed and so on.
55    
56     The default simply logs the message to STDERR.
57    
58     =cut
59    
# Default warning sink: prints the message on STDERR via warn, making sure
# it ends in exactly the newline added here (one trailing newline of the
# incoming message is dropped first).
our $WARN = sub {
    my ($text) = @_;

    $text =~ s/\n$//;

    warn "$text\n";
};
65    
# Resolve a fully qualified function name ("Some::Pkg::func") into a code
# reference, loading the providing module on demand.
#
# Starting with the full name, the trailing "::component" is stripped
# repeatedly and the remainder is require'd until the function becomes
# defined. When nothing is left to strip, a stub is returned that dies
# with "unable to resolve '<name>'" when called.
#
# The name can originate from a remote message tag, so only strings that
# look like a package name are ever handed to the string eval; anything
# else is skipped instead of being executed as Perl code.
sub load_func($) {
    my $func = $_[0];

    no strict 'refs'; # $func is used as a symbolic (by-name) sub reference

    unless (defined &$func) {
        my $pkg = $func;
        do {
            $pkg =~ s/::[^:]+$//
                or return sub { die "unable to resolve '$func'" };

            # never string-eval anything but a well-formed package name
            eval "require $pkg"
                if $pkg =~ /^[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*$/;
        } until defined &$func;
    }

    \&$func
}
80    
# Return a string of $_[0] random octets.
#
# /dev/urandom is preferred. When it cannot be opened, or does not deliver
# the requested number of octets, the result is built from Perl's rand
# instead, which is seeded once per process from the time, the pid and a
# checksum over the output of some system commands.
sub nonce($) {
    my $len = $_[0];
    my $nonce;

    if (open my $fh, "<", "/dev/urandom") {
        sysread $fh, $nonce, $len;
    }

    # failed open, failed read and short read all end up here
    unless (defined $nonce && $len == length $nonce) {
        our $nonce_init;
        unless ($nonce_init++) {
            srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
        }

        $nonce = join "", map +(chr rand 256), 1 .. $len;
    }

    $nonce
}
98    
# Encode a string of octets as a purely alphanumeric ASCII string.
#
# With Math::GMP 2.05 or newer available, the octets are read as one big
# hexadecimal number and printed in base 62. Otherwise base64 is used,
# with all "=" padding removed and "/" and "+" replaced by "s" and "p".
sub asciibits($) {
    my $data = $_[0];

    if (eval "use Math::GMP 2.05; 1") {
        $data = Math::GMP::get_str_gmp (
            (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
            62
        );
    } else {
        $data = MIME::Base64::encode_base64 ($data, "");
        $data =~ s/=+$//;     # base64 pads with up to two "=", drop all of them
        $data =~ tr{/+}{sp};  # the remaining non-alphanumerics
    }

    $data
}
116    
# Build a unique cookie: pid, current time and two random octets, packed
# together and turned into ASCII via asciibits.
sub gen_uniq {
    my $raw = pack "wNa*", $$, time, nonce (2);

    asciibits ($raw)
}
120    
121 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
122    
123     A boolean indicating whether this is a full/public node, which can create
124     and accept direct connections from other nodes.
125    
126     =item $AnyEvent::MP::Kernel::SLAVE
127    
128     A boolean indicating whether this node is a slave node, i.e. does most of its
129     message sending/receiving through some master node.
130    
131     =item $AnyEvent::MP::Kernel::MASTER
132    
133     Defined only in slave mode, in which case it contains the noderef of the
134     master node.
135    
136     =cut
137    
# Node role; see the POD entries for $PUBLIC, $SLAVE and $MASTER.
our $PUBLIC = 0;
our $SLAVE  = 0;
our $MASTER;                # master noderef when $SLAVE

# Identity. Until initialise_node assigns the real noderef, $NODE is the
# ASCII form of 16 random octets.
our $NODE  = asciibits (nonce (16));
our $RUNIQ = $NODE;         # remote uniq value
our $UNIQ  = gen_uniq ();   # per-process/node unique cookie
our $ID    = "a";

# Node and port tables.
our %NODE;                  # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA);    # local ports

# Monitoring tables.
our %RMON;                  # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON;                  # monitored _local_ ports

# Listening sockets, keyed by "host:port" (filled in by initialise_node).
our %LISTENER;

our $SRCNODE;               # holds the sending node during _inject
156    
# Accessor for the noderef of the local node (the current $NODE).
sub NODE() { $NODE }
160    
# Extract the noderef from a port id of the form "noderef#portname":
# everything before the first "#" (the whole string when there is none).
sub node_of($) {
    (split /#/, $_[0], 2)[0]
}
166    
# _any_ returns a reference to the _ANY_ sub; the reference itself acts as
# the marker value. Bodies stay bare constants so perl can inline them.
sub _ANY_() {
    1
}

sub _any_() {
    \&_ANY_
}

# Compile-time switch for the RCV/SND debug output in _inject and snd.
sub TRACE() {
    0
}
171    
# Deliver a message to a local port: the first element selects the handler
# in %PORT, which is then invoked with the remaining elements. Messages
# for ports that do not exist are dropped silently.
sub _inject {
    warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

    my $handler = $PORT{+shift}
        or return;

    &$handler; # called with the remaining @_
}
176    
# Return the node object for a noderef, creating and registering it in
# %NODE when it is not known yet.
#
# A noderef is either "slave/<name>" or a comma separated list of resolved
# "host:port" addresses. An already known object is reused when the whole
# noderef or any single address of it is registered. New non-slave
# noderefs are validated first; croaks when an address is not parsable,
# has a non-numeric port or contains an unresolved host.
sub add_node {
    my ($noderef) = @_;

    return $NODE{$noderef}
        if exists $NODE{$noderef};

    my @addrs = split /,/, $noderef;

    # known under one of its addresses: alias the full noderef to it
    for (@addrs) {
        return $NODE{$noderef} = $NODE{$_}
            if exists $NODE{$_};
    }

    # new node, check validity
    my $node;

    if ($noderef =~ /^slave\/.+$/) {
        $node = AnyEvent::MP::Node::Indirect->new ($noderef);

    } else {
        for (@addrs) {
            my ($host, $port) = AnyEvent::Socket::parse_hostport ($_)
                or Carp::croak ("$noderef: not a resolved node reference ('$_' not parsable)");

            $port > 0
                or Carp::croak ("$noderef: not a resolved node reference ('$_' contains non-numeric port)");

            AnyEvent::Socket::parse_address ($host)
                or Carp::croak ("$noderef: not a resolved node reference ('$_' contains unresolved address)");
        }

        # TODO: for indirect sends, use a different class
        $node = AnyEvent::MP::Node::Direct->new ($noderef);
    }

    # register under the full noderef and under every single address
    $NODE{$_} = $node
        for $noderef, @addrs;

    $node
}
215    
# Send a message to a port. The first argument is the port id
# ("noderef#portname"), the remaining arguments are the message. The
# message is handed, prefixed with the port name, to the send callback of
# the node object, which is created on demand.
sub snd(@) {
    my ($noderef, $portid) = split /#/, shift, 2;

    warn "SND $noderef <- $portid @_\n" if TRACE;#d#

    my $node = $NODE{$noderef} || add_node ($noderef);

    $node->{send} (["$portid", @_]);
}
224    
# Kill a port. The first argument is the port id ("noderef#portname"),
# the remaining arguments are passed on to the kill method of the node
# object. Croaks when the port id has no port part.
sub kil(@) {
    my ($noderef, $portid) = split /#/, shift, 2;

    Carp::croak ("$noderef#$portid: killing a node port is not allowed, caught")
        unless length $portid;

    my $node = $NODE{$noderef} || add_node ($noderef);

    $node->kill ("$portid", @_);
}
234    
# The nodename field of POSIX::uname; POSIX is loaded on first use.
sub _nodename {
    require POSIX;

    my (undef, $nodename) = POSIX::uname ();

    $nodename
}
239    
# Resolve a comma separated list of transport descriptors into a list of
# "address:port" strings.
#
# Returns a condvar that receives the comma joined, duplicate free result
# once every lookup has finished. Results keep the order of the
# descriptors; addresses found for the same descriptor keep lookup order.
#
# An empty noderef stands for $DEFAULT_PORT. A descriptor made only of
# digits is taken as a port on the local nodename, an empty descriptor as
# the local nodename itself. Croaks on unparsable descriptors.
sub resolve_node($) {
    my ($noderef) = @_;

    my $done = AE::cv ();
    my @found; # [priority, "address:port"]

    # outermost begin; its callback runs after the last matching end
    $done->begin (sub {
        my ($cv) = @_;
        my (%dup, @uniq);

        for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
            my $addr = $entry->[1];
            push @uniq, $addr unless $dup{$addr}++;
        }

        $cv->send (join ",", @uniq);
    });

    $noderef = $DEFAULT_PORT unless length $noderef;

    my $rank;
    for my $spec (split /,/, $noderef) {
        my $pri = ++$rank;

        if ($spec =~ /^\d*$/) {
            $spec = length $spec ? _nodename () . ":$spec" : _nodename ();
        }

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
            or Carp::croak ("$spec: unparsable transport descriptor");

        $done->begin;
        AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $result (@_) {
                my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

                # small increments keep results of one descriptor together
                $pri += 1e-5;

                push @found, [
                    $pri,
                    AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service),
                ];
            }
            $done->end;
        });
    }

    $done->end;

    $done
}
284    
# Set up the local node.
#
# Arguments: the noderef (or profile name) of this node, followed by seed
# node descriptors. The matching profile may override the noderef and
# contributes further seeds and the services to start.
#
# A noderef of the form "slave/<name>" selects slave mode, which requires
# at least one seed and waits up to $MONITOR_TIMEOUT for a master. Any
# other noderef is resolved and makes this a public node that listens on
# each of its addresses.
sub initialise_node(@) {
    my ($noderef, @others) = @_;

    my $profile = AnyEvent::MP::Config::find_profile (
        defined $noderef ? $noderef : _nodename ()
    );

    $noderef = $profile->{noderef}
        if exists $profile->{noderef};

    push @others, @{ $profile->{seeds} };

    # from here on $noderef holds a condvar that delivers the final noderef
    if (my ($slave_name) = $noderef =~ /^slave\/(.*)$/) {
        $SLAVE = AE::cv ();

        $slave_name = $NODE unless length $slave_name;

        $noderef = AE::cv ();
        $noderef->send ("slave/$slave_name");

        Carp::croak ("seed nodes must be specified for slave nodes")
            unless @others;

    } else {
        $PUBLIC = 1;
        $noderef = resolve_node ($noderef);
    }

    # start all seed lookups first, then wait for each of them
    my @pending = map { resolve_node ($_) } @others;
    @others = map { $_->recv } @pending;

    $NODE = $noderef->recv;

    for my $addr (split /,/, $NODE) {
        $NODE{$addr} = $NODE{""};

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($addr);

        $LISTENER{$addr} = AnyEvent::MP::Transport::mp_server ($host, $port, sub {
            my ($transport) = @_;

            # TODO: urgs
            my $peer = add_node ($transport->{remote_node});
            $peer->{trial}{accept} = $transport;
        });
    }

    for my $seed (@others) {
        my $seednode = add_node ($seed);
        $seednode->connect;
    }

    if ($SLAVE) {
        # give up waiting for a master after $MONITOR_TIMEOUT
        my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });

        $MASTER = $SLAVE->recv;

        Carp::croak ("AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n")
            unless defined $MASTER;

        (my $via = $MASTER) =~ s/,/!/g;

        $NODE .= "\@$via";
        $NODE{$NODE} = $NODE{""};

        # announce the new name to every node object
        for my $node (values %NODE) {
            $node->send (["", iam => $NODE]);
        }

        $SLAVE = 1;
    }

    for my $service (@{ $profile->{services} }) {
        my $start = load_func ($service);
        $start->();
    }
}
353    
354     #############################################################################
355 root 1.6 # node monitoring and info
356 root 1.3
# All node objects in %NODE, each one only once even when it is
# registered under several keys.
sub _uniq_nodes {
    my %seen;

    # keyed by the stringified object, so aliases collapse into one entry
    $seen{$_} = $_
        for values %NODE;

    values %seen
}
364    
365     =item known_nodes
366    
367     Returns the noderefs of all nodes known to this node, whether or not they are currently connected.
368    
369     =cut
370    
# Noderefs of every distinct node object registered in %NODE.
sub known_nodes {
    map { $_->{noderef} } _uniq_nodes ()
}
374    
375     =item up_nodes
376    
377     Return the noderefs of all nodes that are currently connected (excluding
378     the node itself).
379    
380     =cut
381    
# Noderefs of every distinct node object that has a transport set.
sub up_nodes {
    my @connected = grep { $_->{transport} } _uniq_nodes ();

    map { $_->{noderef} } @connected
}
385    
386     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
387    
388     Registers a callback that is called each time a node goes up (connection
389     is established) or down (connection is lost).
390    
391     Node up messages can only be followed by node down messages for the same
392     node, and vice versa.
393    
394     The function returns an optional guard which can be used to de-register
395     the monitoring callback again.
396    
397     =cut
398    
# Registered node monitoring callbacks, keyed by the numeric address of
# the code reference.
our %MON_NODES;

# Register $cb to be called on node up/down events.
#
# When called in non-void context, returns a guard object that removes
# the callback again once it is destroyed. In void context no guard is
# created and the callback stays registered.
sub mon_nodes($) {
    my ($cb) = @_;

    $MON_NODES{$cb+0} = $cb;

    # wantarray is false (but defined) in scalar context, so testing its
    # truth would deny "my $guard = mon_nodes ..." its guard
    return unless defined wantarray;

    AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
408    
# Report a node event to every callback registered with mon_nodes. Each
# callback receives the noderef of $node followed by the remaining
# arguments. A dying callback is reported through $WARN and does not keep
# the other callbacks from running.
sub _inject_nodeevent($$;@) {
    my ($node, @event) = @_;

    my @args = ($node->{noderef}, @event);

    for my $cb (values %MON_NODES) {
        next if eval { $cb->(@args); 1 };

        $WARN->($@);
    }
}
419    
420     #############################################################################
421 root 1.1 # self node code
422    
# Dispatch table for messages addressed to the node port (""): message tag
# to handler. A handler is called with the remaining message elements in
# @_, while $SRCNODE holds the node the message came from.
our %node_req = (
    # ---- internal services ----

    # monitoring
    mon0 => sub { # disable monitoring
        my ($portid) = @_;
        my $peer = $SRCNODE;

        my $cb = delete $peer->{rmon}{$portid};
        $NODE{""}->unmonitor ($portid, $cb);
    },
    mon1 => sub { # enable monitoring
        my ($portid) = @_;
        my $peer = $SRCNODE;

        # tell the monitoring node when the port goes away
        my $notify = sub {
            $peer->send (["", kil => $portid, @_]);
        };

        $peer->{rmon}{$portid} = $notify;
        $NODE{""}->monitor ($portid, $notify);
    },
    kil => sub {
        my $cbs = delete $SRCNODE->{lmon}{+shift}
            or return;

        for my $cb (@$cbs) {
            $cb->(@_);
        }
    },
    # node changed its name (for slave nodes)
    iam => sub {
        my $old = $SRCNODE->{noderef};

        # get rid of bogus slave/xxx name, hopefully
        delete $NODE{$old};

        # change noderef
        $SRCNODE->{noderef} = $_[0];

        # anchor
        $NODE{$_[0]} = $SRCNODE;
    },

    # ---- public services ----

    # relay message to another node / generic echo
    relay => sub {
        snd (@_);
    },
    relay_multiple => sub {
        for my $msg (@_) {
            snd (@$msg);
        }
    },

    # informational
    info => sub {
        snd (@_, $NODE);
    },
    known_nodes => sub {
        snd (@_, known_nodes ());
    },
    up_nodes => sub {
        snd (@_, up_nodes ());
    },

    # random garbage
    # NOTE(review): executes a string received in a message as Perl code;
    # confirm that only trusted nodes can ever reach this handler.
    eval => sub {
        my @res = eval shift;

        if (@_) {
            snd (@_, "$@", @res);
        }
    },
    time => sub {
        snd (@_, AE::time ());
    },
    devnull => sub {
        #
    },
);
490    
# Register the local node object, both under "" and under its noderef.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# Handler of the node port (""): the first message element is the request
# tag, looked up in %node_req. Unknown tags are resolved as function names
# through load_func and cached in %node_req. Errors raised by a handler
# are reported through $WARN instead of being propagated.
$PORT{""} = sub {
    my $tag = shift;

    eval {
        my $handler = $node_req{$tag} ||= load_func ($tag);
        &$handler; # called with the remaining @_
        1
    } or $WARN->("error processing node message: $@");
};
497    
498     =back
499    
500     =head1 SEE ALSO
501    
502     L<AnyEvent::MP>.
503    
504     =head1 AUTHOR
505    
506     Marc Lehmann <schmorp@schmorp.de>
507     http://home.schmorp.de/
508    
509     =cut
510    
511     1
512