ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.11
Committed: Fri Aug 14 23:17:17 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.10: +17 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. is needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28     use MIME::Base64 ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
# Module version and the symbols exported into every package using this module.
our $VERSION = '0.6';
our @EXPORT  = qw(
   %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID add_node load_func snd_to_func

   NODE $NODE node_of snd kil
   resolve_node initialise_node
);

# Port used by resolve_node when a node reference does not specify one.
our $DEFAULT_PORT = "4040";

# Timing parameters, in seconds.
our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
50    
51     =item $AnyEvent::MP::Kernel::WARN
52    
53     This value is called with an error or warning message, when e.g. a connection
54     could not be created, authorisation failed and so on.
55    
56     The default simply logs the message to STDERR.
57    
58     =cut
59    
# Default warning handler: removes one trailing newline from the message (if
# present) and emits the message, newline-terminated, via warn.
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;
   warn "$text\n";
};
65    
# load_func $funcname
#
# Resolves a fully qualified function name ("Some::Pkg::func") into a code
# reference. When the function is not defined yet, the package prefixes of
# the name are require'd, longest first, until the function shows up.
#
# Returns the code reference, or, when the function cannot be resolved, a
# code reference that dies with "unable to resolve '<name>'" when called.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # the function is looked up by name

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component; nothing left to strip means failure
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # $pkg is interpolated into a string eval, and function names can
         # arrive in messages from other nodes, so only ever require
         # something that is syntactically a plain package name.
         eval "require $pkg"
            if $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/;
      } until defined &$func;
   }

   \&$func
}
80    
# nonce $length
#
# Returns a string of $length random octets. The octets are read from
# /dev/urandom when that can be opened; any octets that could not be read
# (no such device, read error, short read) are generated with rand, which is
# seeded once from the time, the pid and the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may legitimately return fewer octets than requested
      while (length ($nonce) < $len) {
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last; # read error or eof, fall back for the remainder
      }
   }

   if (length ($nonce) < $len) {
      # no (or not enough) data from the random device, use a weak fallback
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), (1 + length ($nonce)) .. $len;
   }

   $nonce
}
98    
# asciibits $octets
#
# Encodes a binary string into a compact string consisting only of ASCII
# letters and digits. When Math::GMP 2.05 or newer can be loaded, the octets
# are treated as one big number and written in base 62. Otherwise base64 is
# used, with all "=" padding removed and "/" and "+" replaced by "s" and "p".
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                  (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                  62
              );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      $data =~ s/=+$//;     # base64 pads with up to two "=", remove all of them
      $data =~ tr{/+}{sp};  # map the two non-alphanumeric base64 characters
   }

   $data
}
116    
# Generates an identifier string from the process id, the current time and
# two random octets, encoded with asciibits.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
120    
121 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
122    
123     A boolean indicating whether this is a full/public node, which can create
124     and accept direct connections from other nodes.
125    
126     =item $AnyEvent::MP::Kernel::SLAVE
127    
128     A boolean indicating whether this node is a slave node, i.e. does most of its
129     message sending/receiving through some master node.
130    
131     =item $AnyEvent::MP::Kernel::MASTER
132    
133     Defined only in slave mode, in which case it contains the noderef of the
134     master node.
135    
136     =cut
137    
# node mode flags, see the documentation above
our $PUBLIC = 0;
our $SLAVE  = 0;
our $MASTER; # master noderef when $SLAVE

# identity of this node; $NODE starts out as a random id and is replaced by
# the real noderef in initialise_node
our $NODE  = asciibits (nonce (16));
our $RUNIQ = $NODE;       # remote uniq value
our $UNIQ  = gen_uniq (); # per-process/node unique cookie
our $ID    = "a";

our %NODE; # node id to transport mapping, or "undef", for local node
our %PORT; # local ports
our %PORT_DATA;

our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;

our $SRCNODE; # holds the sending node during _inject
156    
# Returns the noderef of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
160    
# node_of $port
#
# Returns the part of the given port id before the first "#", i.e. the
# noderef. The whole string is returned when it contains no "#".
sub node_of($) {
   my ($head) = split /#/, $_[0], 2;

   return $head;
}
166    
# Compile-time debugging switch: when true, sent and received messages are
# logged via warn.
sub TRACE() { 0 }

# _inject $portid, @msg
#
# Delivers a message to a local port: calls the callback registered for
# $portid in %PORT with the remaining arguments. Does nothing when no such
# port is registered.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $cb = $PORT{+shift}
      or return;

   &$cb; # called with the current @_, i.e. the message without the port id
}
173    
# add_node $noderef
#
# Returns the node object for the given noderef, creating and registering it
# in %NODE when it is not known yet.
#
# A noderef that shares one of its comma-separated components with an already
# known node is registered as an alias for that node. New "slave/..." noderefs
# become AnyEvent::MP::Node::Indirect objects, everything else must consist of
# resolved "address:port" components and becomes an
# AnyEvent::MP::Node::Direct object. Croaks on unresolved noderefs.
sub add_node {
   my ($noderef) = @_;

   # already known under exactly this name?
   return $NODE{$noderef}
      if exists $NODE{$noderef};

   # known under one of its components?
   for my $addr (split /,/, $noderef) {
      next unless exists $NODE{$addr};

      return $NODE{$noderef} = $NODE{$addr};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for my $addr (split /,/, $noderef) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($addr)
            or Carp::croak "$noderef: not a resolved node reference ('$addr' not parsable)";

         Carp::croak "$noderef: not a resolved node reference ('$addr' contains non-numeric port)"
            unless $port > 0;

         Carp::croak "$noderef: not a resolved node reference ('$addr' contains unresolved address)"
            unless AnyEvent::Socket::parse_address ($host);
      }

      # TODO: for indirect sends, use a different class
      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register the node under its full name and under each component
   $NODE{$_} = $node
      for $noderef, split /,/, $noderef;

   $node
}
212    
# snd $port, @msg
#
# Sends a message to the given port ("noderef#portid"): looks up (or
# creates, via add_node) the node object and hands [$portid, @msg] to the
# code reference stored in its {send} member.
sub snd(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send} (["$portid", @_]);
}
221    
222 root 1.11 =item snd_to_func $noderef, $func, @args
223    
224     Expects a noderef and a name of a function. Asynchronously tries to call
225     this function with the given arguments on that node.
226    
227     This function can be used to implement C<spawn>-like interfaces.
228    
229     =cut
230    
# snd_to_func $noderef, $func, @args
#
# Sends [ "", $func, @args ] to the given node, i.e. a message addressed to
# the port with the empty id on that node. The node object is created via
# add_node when it is not known yet.
sub snd_to_func {
   my $noderef = shift;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["", @_]);
}
237    
# kil $port, @reason
#
# Kills the given port ("noderef#portid") by calling the kill method of its
# node object with the port id and the reason. Croaks when the port id is
# empty, as that would address the node port.
sub kil(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
247    
# Returns the nodename field (second element) of POSIX::uname. POSIX is
# loaded on demand.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
252    
# $cv = resolve_node $noderef
#
# Resolves a comma-separated list of transport descriptors asynchronously and
# returns a condvar that receives the resolved noderef: a comma-separated
# list of "address:port" strings, ordered like the input descriptors and
# with duplicates removed.
#
# An empty noderef stands for $DEFAULT_PORT. A descriptor that is empty or
# consists only of digits is taken relative to the local nodename. Croaks on
# unparsable descriptors.
sub resolve_node($) {
   my ($noderef) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "address:port"] pairs, filled by the resolver callbacks

   # the outer begin/end pair runs this once all resolve requests are done
   $cv->begin (sub {
      my ($done) = @_;

      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @res;

      $done->send (join ",", @refs);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $idx;
   for my $t (split /,/, $noderef) {
      my $pri = ++$idx;

      # nothing or a bare port number: use the local nodename as host
      if ($t =~ /^\d*$/) {
         $t = length ($t) ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
         or Carp::croak "$t: unparsable transport descriptor";

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $target (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

            # the small increment keeps the addresses of one descriptor in
            # resolver order, and after those of earlier descriptors
            push @res, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
            ];
         }

         $cv->end;
      });
   }

   $cv->end;

   $cv
}
297    
# initialise_node $noderef, @seednodes
#
# Sets up the local node. The profile returned by
# AnyEvent::MP::Config::find_profile for the noderef (or, when that is
# undefined, for the local nodename) can override the noderef, adds its seeds
# to the seed nodes, and lists service functions that are called at the end.
#
# A noderef of the form "slave/<name>" puts the node into slave mode, which
# requires at least one seed node (croaks otherwise) and blocks until a
# master is known or $MONITOR_TIMEOUT has passed (croaks when no master was
# found). Any other noderef is resolved with resolve_node and makes this a
# public node. A listener is created for each component of the resulting
# noderef, and a connection is initiated to each seed node.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   my $profile = AnyEvent::MP::Config::find_profile (
      defined $noderef ? $noderef : _nodename ()
   );

   $noderef = $profile->{noderef}
      if exists $profile->{noderef};

   push @others, @{ $profile->{seeds} };

   if ($noderef =~ /^slave\/(.*)$/) {
      my $name = length $1 ? $1 : $NODE;

      $SLAVE = AE::cv ();

      # slave noderefs need no resolving, so use an already-signalled condvar
      $noderef = AE::cv ();
      $noderef->send ("slave/$name");

      Carp::croak "seed nodes must be specified for slave nodes"
         unless @others;

   } else {
      $PUBLIC  = 1;
      $noderef = resolve_node ($noderef);
   }

   # start resolving all seed nodes first, then wait for the results
   @others = map { $_->recv } map { resolve_node ($_) } @others;

   $NODE = $noderef->recv;

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, sub {
         my ($tp) = @_;

         # TODO: urgs
         my $node = add_node ($tp->{remote_node});
         $node->{trial}{accept} = $tp;
      });
   }

   add_node ($_)->connect
      for @others;

   if ($SLAVE) {
      # $timeout must stay in scope while waiting, it signals $SLAVE without
      # a value when no master has been found in time
      my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });

      $MASTER = $SLAVE->recv;
      Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n"
         unless defined $MASTER;

      (my $via = $MASTER) =~ s/,/!/g;

      $NODE .= "\@$via";
      $NODE{$NODE} = $NODE{""};

      # tell every known node about the new name of this node
      $_->send (["", iam => $NODE])
         for values %NODE;

      $SLAVE = 1;
   }

   # start the services listed in the profile
   load_func ($_)->()
      for @{ $profile->{services} };
}
366    
367     #############################################################################
368 root 1.6 # node monitoring and info
369 root 1.3
# Returns the node objects stored in %NODE, each object only once even when
# it is registered under several names.
sub _uniq_nodes {
   my %uniq;

   # keyed by the stringified object, so aliases collapse into one entry
   $uniq{$_} = $_
      for values %NODE;

   values %uniq;
}
377    
378     =item known_nodes
379    
380     Returns the noderefs of all nodes connected to this node.
381    
382     =cut
383    
# Returns the noderef of every distinct node object registered in %NODE.
sub known_nodes {
   map { $_->{noderef} } _uniq_nodes ()
}
387    
388     =item up_nodes
389    
390     Return the noderefs of all nodes that are currently connected (excluding
391     the node itself).
392    
393     =cut
394    
# Returns the noderef of every distinct node object in %NODE that has a true
# {transport} member.
sub up_nodes {
   map { $_->{noderef} }
      grep { $_->{transport} }
         _uniq_nodes ()
}
398    
399     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
400    
401     Registers a callback that is called each time a node goes up (connection
402     is established) or down (connection is lost).
403    
404     Node up messages can only be followed by node down messages for the same
405     node, and vice versa.
406    
407     The function returns an optional guard which can be used to de-register
408     the monitoring callback again.
409    
410     =cut
411    
# registered node monitoring callbacks, keyed by the address of the callback
our %MON_NODES;

# $guard = mon_nodes $callback->($noderef, $is_up, @reason)
#
# Registers $callback to be called for node up/down events. When called in
# non-void context, returns a guard object that removes the callback again
# when it is destroyed. In void context no guard is created and the callback
# stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false (but defined) in scalar context, so test for
   # definedness, otherwise "$guard = mon_nodes ..." never gets a guard
   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
421    
# _inject_nodeevent $node, $is_up, @reason
#
# Calls every callback registered in %MON_NODES with the noderef of $node
# followed by the remaining arguments. An exception thrown by a callback is
# passed to $WARN and does not keep the other callbacks from being called.
sub _inject_nodeevent($$;@) {
   my ($node, @args) = @_;

   unshift @args, $node->{noderef};

   foreach my $watcher (values %MON_NODES) {
      next if eval { $watcher->(@args); 1 };

      $WARN->($@);
   }
}
432    
433     #############################################################################
434 root 1.1 # self node code
435    
# Request handlers of the node port (the port with the empty id), keyed by
# message tag. Each handler is called with the message arguments after the
# tag; $SRCNODE holds the node the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;
      my $rnode    = $SRCNODE;

      $NODE{""}->unmonitor ($portid, delete $rnode->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $rnode    = $SRCNODE;

      # the callback is remembered in {rmon} so mon0 can unregister it later
      $NODE{""}->monitor ($portid, $rnode->{rmon}{$portid} = sub {
         $rnode->send (["", kil => $portid, @_]);
      });
   },
   kil => sub {
      my ($portid, @reason) = @_;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@reason) for @$cbs;
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my ($new_noderef) = @_;

      # get rid of bogus slave/xxx name, hopefully
      delete $NODE{$SRCNODE->{noderef}};

      # change noderef
      $SRCNODE->{noderef} = $new_noderef;

      # anchor
      $NODE{$new_noderef} = $SRCNODE;
   },

   # public services

   # relay message to another node / generic echo
   relay => sub {
      snd @_;
   },
   relay_multiple => sub {
      snd @$_ for @_
   },

   # informational
   info => sub {
      snd @_, $NODE;
   },
   known_nodes => sub {
      snd @_, known_nodes ();
   },
   up_nodes => sub {
      snd @_, up_nodes ();
   },

   # random garbage
   eval => sub {
      # NOTE(review): this evaluates a string received in a node message as
      # perl code, so any node that can send messages can run code here
      my @res = eval shift;
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::time ();
   },
   devnull => sub {
      # ignores the message
   },
);
503    
# The local node object is registered under the empty noderef as well as
# under the current value of $NODE.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The node port (empty port id): the first message element is the request
# tag. Tags without an entry in %node_req are resolved as function names via
# load_func, and the result is cached in %node_req. Errors raised while
# handling a request are reported through $WARN.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called with the remaining message elements in @_
   };

   $WARN->("error processing node message: $@") if $@;
};
510    
511     =back
512    
513     =head1 SEE ALSO
514    
515     L<AnyEvent::MP>.
516    
517     =head1 AUTHOR
518    
519     Marc Lehmann <schmorp@schmorp.de>
520     http://home.schmorp.de/
521    
522     =cut
523    
524     1
525