ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.10
Committed: Fri Aug 14 14:01:05 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.9: +2 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28     use MIME::Base64 ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37 root 1.5 our $VERSION = '0.6';
38 root 1.1 our @EXPORT = qw(
39 root 1.10 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID add_node load_func
40 root 1.1
41 root 1.10 NODE $NODE node_of snd kil
42 root 1.1 resolve_node initialise_node
43     );
44    
45     our $DEFAULT_PORT = "4040";
46    
47     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
48     our $NETWORK_LATENCY = 3; # activity timeout
49     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
50    
51     =item $AnyEvent::MP::Kernel::WARN
52    
53     This value is called with an error or warning message, when e.g. a connection
54     could not be created, authorisation failed and so on.
55    
56     The default simply logs the message to STDERR.
57    
58     =cut
59    
60     our $WARN = sub {
61     my $msg = $_[0];
62     $msg =~ s/\n$//;
63     warn "$msg\n";
64     };
65    
66 root 1.6 sub load_func($) {
67     my $func = $_[0];
68    
69     unless (defined &$func) {
70     my $pkg = $func;
71     do {
72     $pkg =~ s/::[^:]+$//
73     or return sub { die "unable to resolve '$func'" };
74     eval "require $pkg";
75     } until defined &$func;
76     }
77    
78     \&$func
79     }
80    
81 root 1.1 sub nonce($) {
82     my $nonce;
83    
84     if (open my $fh, "</dev/urandom") {
85     sysread $fh, $nonce, $_[0];
86     } else {
87     # shit...
88     our $nonce_init;
89     unless ($nonce_init++) {
90     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
91     }
92    
93     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
94     }
95    
96     $nonce
97     }
98    
99     sub asciibits($) {
100     my $data = $_[0];
101    
102     if (eval "use Math::GMP 2.05; 1") {
103     $data = Math::GMP::get_str_gmp (
104     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
105     62
106     );
107     } else {
108     $data = MIME::Base64::encode_base64 $data, "";
109     $data =~ s/=//;
110     $data =~ s/\//s/g;
111     $data =~ s/\+/p/g;
112     }
113    
114     $data
115     }
116    
117     sub gen_uniq {
118     asciibits pack "wNa*", $$, time, nonce 2
119     }
120    
121 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
122    
123     A boolean indicating whether this is a full/public node, which can create
124     and accept direct connections form othe rnodes.
125    
126     =item $AnyEvent::MP::Kernel::SLAVE
127    
128     A boolean indicating whether this node is a slave node, i.e. does most of it's
129     message sending/receiving through some master node.
130    
131     =item $AnyEvent::MP::Kernel::MASTER
132    
133     Defined only in slave mode, in which cas eit contains the noderef of the
134     master node.
135    
136     =cut
137    
138 root 1.1 our $PUBLIC = 0;
139     our $SLAVE = 0;
140     our $MASTER; # master noderef when $SLAVE
141    
142     our $NODE = asciibits nonce 16;
143     our $RUNIQ = $NODE; # remote uniq value
144     our $UNIQ = gen_uniq; # per-process/node unique cookie
145     our $ID = "a";
146    
147     our %NODE; # node id to transport mapping, or "undef", for local node
148     our (%PORT, %PORT_DATA); # local ports
149    
150     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
151     our %LMON; # monitored _local_ ports
152    
153     our %LISTENER;
154    
155     our $SRCNODE; # holds the sending node during _inject
156    
157     sub NODE() {
158     $NODE
159     }
160    
161     sub node_of($) {
162     my ($noderef, undef) = split /#/, $_[0], 2;
163    
164     $noderef
165     }
166    
167     sub TRACE() { 0 }
168    
169     sub _inject {
170     warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
171     &{ $PORT{+shift} or return };
172     }
173    
174     sub add_node {
175     my ($noderef) = @_;
176    
177     return $NODE{$noderef}
178     if exists $NODE{$noderef};
179    
180     for (split /,/, $noderef) {
181     return $NODE{$noderef} = $NODE{$_}
182     if exists $NODE{$_};
183     }
184    
185     # new node, check validity
186     my $node;
187    
188     if ($noderef =~ /^slave\/.+$/) {
189     $node = new AnyEvent::MP::Node::Indirect $noderef;
190    
191     } else {
192     for (split /,/, $noderef) {
193     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
194     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
195    
196     $port > 0
197     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
198    
199     AnyEvent::Socket::parse_address $host
200     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
201     }
202    
203     # TODO: for indirect sends, use a different class
204     $node = new AnyEvent::MP::Node::Direct $noderef;
205     }
206    
207     $NODE{$_} = $node
208     for $noderef, split /,/, $noderef;
209    
210     $node
211     }
212    
213     sub snd(@) {
214     my ($noderef, $portid) = split /#/, shift, 2;
215    
216     warn "SND $noderef <- $portid @_\n" if TRACE;#d#
217    
218     ($NODE{$noderef} || add_node $noderef)
219 root 1.2 ->{send} (["$portid", @_]);
220 root 1.1 }
221    
222     sub kil(@) {
223     my ($noderef, $portid) = split /#/, shift, 2;
224    
225     length $portid
226     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
227    
228     ($NODE{$noderef} || add_node $noderef)
229     ->kill ("$portid", @_);
230     }
231    
232 root 1.7 sub _nodename {
233     require POSIX;
234     (POSIX::uname ())[1]
235     }
236    
237 root 1.1 sub resolve_node($) {
238     my ($noderef) = @_;
239    
240     my $cv = AE::cv;
241     my @res;
242    
243     $cv->begin (sub {
244     my %seen;
245     my @refs;
246     for (sort { $a->[0] <=> $b->[0] } @res) {
247     push @refs, $_->[1] unless $seen{$_->[1]}++
248     }
249     shift->send (join ",", @refs);
250     });
251    
252     $noderef = $DEFAULT_PORT unless length $noderef;
253    
254     my $idx;
255     for my $t (split /,/, $noderef) {
256     my $pri = ++$idx;
257 root 1.7
258     $t = length $t ? _nodename . ":$t" : _nodename
259     if $t =~ /^\d*$/;
260 root 1.1
261 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
262     or Carp::croak "$t: unparsable transport descriptor";
263    
264     $cv->begin;
265     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
266     for (@_) {
267     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
268     push @res, [
269     $pri += 1e-5,
270     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
271     ];
272 root 1.1 }
273 root 1.7 $cv->end;
274     };
275 root 1.1 }
276    
277     $cv->end;
278    
279     $cv
280     }
281    
282     sub initialise_node(@) {
283     my ($noderef, @others) = @_;
284    
285 root 1.7 my $profile = AnyEvent::MP::Config::find_profile
286     +(defined $noderef ? $noderef : _nodename);
287 root 1.6
288     $noderef = $profile->{noderef}
289 root 1.7 if exists $profile->{noderef};
290 root 1.6
291 root 1.9 push @others, @{ $profile->{seeds} };
292 root 1.3
293 root 1.1 if ($noderef =~ /^slave\/(.*)$/) {
294     $SLAVE = AE::cv;
295     my $name = $1;
296     $name = $NODE unless length $name;
297     $noderef = AE::cv;
298     $noderef->send ("slave/$name");
299    
300     @others
301     or Carp::croak "seed nodes must be specified for slave nodes";
302    
303     } else {
304     $PUBLIC = 1;
305     $noderef = resolve_node $noderef;
306     }
307    
308     @others = map $_->recv, map +(resolve_node $_), @others;
309    
310     $NODE = $noderef->recv;
311    
312     for my $t (split /,/, $NODE) {
313     $NODE{$t} = $NODE{""};
314    
315     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
316    
317     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
318     sub {
319     my ($tp) = @_;
320    
321     # TODO: urgs
322     my $node = add_node $tp->{remote_node};
323     $node->{trial}{accept} = $tp;
324     },
325     ;
326     }
327    
328     (add_node $_)->connect for @others;
329    
330     if ($SLAVE) {
331     my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
332     $MASTER = $SLAVE->recv;
333     defined $MASTER
334     or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
335    
336     (my $via = $MASTER) =~ s/,/!/g;
337    
338     $NODE .= "\@$via";
339     $NODE{$NODE} = $NODE{""};
340    
341 root 1.2 $_->send (["", iam => $NODE])
342 root 1.1 for values %NODE;
343    
344     $SLAVE = 1;
345     }
346 root 1.6
347     (load_func $_)->()
348     for @{ $profile->{services} };
349 root 1.1 }
350    
351     #############################################################################
352 root 1.6 # node monitoring and info
353 root 1.3
354     sub _uniq_nodes {
355     my %node;
356    
357     @node{values %NODE} = values %NODE;
358    
359     values %node;
360     }
361    
362     =item known_nodes
363    
364     Returns the noderefs of all nodes connected to this node.
365    
366     =cut
367    
368     sub known_nodes {
369     map $_->{noderef}, _uniq_nodes
370     }
371    
372     =item up_nodes
373    
374     Return the noderefs of all nodes that are currently connected (excluding
375     the node itself).
376    
377     =cut
378    
379     sub up_nodes {
380     map $_->{noderef}, grep $_->{transport}, _uniq_nodes
381     }
382    
383     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
384    
385     Registers a callback that is called each time a node goes up (connection
386     is established) or down (connection is lost).
387    
388     Node up messages can only be followed by node down messages for the same
389     node, and vice versa.
390    
391     The fucntino returns an optional guard which can be used to de-register
392     the monitoring callback again.
393    
394     =cut
395    
396     our %MON_NODES;
397    
398     sub mon_nodes($) {
399     my ($cb) = @_;
400    
401     $MON_NODES{$cb+0} = $cb;
402    
403     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
404     }
405    
406     sub _inject_nodeevent($$;@) {
407     my ($node, @args) = @_;
408    
409     unshift @args, $node->{noderef};
410    
411     for my $cb (values %MON_NODES) {
412     eval { $cb->(@args); 1 }
413     or $WARN->($@);
414     }
415     }
416    
417     #############################################################################
418 root 1.1 # self node code
419    
420     our %node_req = (
421     # internal services
422    
423     # monitoring
424     mon0 => sub { # disable monitoring
425     my $portid = shift;
426     my $node = $SRCNODE;
427     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
428     },
429     mon1 => sub { # enable monitoring
430     my $portid = shift;
431     my $node = $SRCNODE;
432     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
433     $node->send (["", kil => $portid, @_]);
434     });
435     },
436     kil => sub {
437     my $cbs = delete $SRCNODE->{lmon}{+shift}
438     or return;
439    
440     $_->(@_) for @$cbs;
441     },
442     # node changed its name (for slave nodes)
443     iam => sub {
444 root 1.8 # get rid of bogus slave/xxx name, hopefully
445     delete $NODE{$SRCNODE->{noderef}};
446    
447     # change noderef
448 root 1.1 $SRCNODE->{noderef} = $_[0];
449 root 1.8
450     # anchor
451 root 1.1 $NODE{$_[0]} = $SRCNODE;
452     },
453    
454     # public services
455    
456     # relay message to another node / generic echo
457     relay => sub {
458     &snd;
459     },
460     relay_multiple => sub {
461     snd @$_ for @_
462     },
463    
464 root 1.4 # informational
465     info => sub {
466     snd @_, $NODE;
467     },
468     known_nodes => sub {
469     snd @_, known_nodes;
470     },
471     up_nodes => sub {
472     snd @_, up_nodes;
473     },
474    
475 root 1.1 # random garbage
476     eval => sub {
477     my @res = eval shift;
478     snd @_, "$@", @res if @_;
479     },
480     time => sub {
481     snd @_, AE::time;
482     },
483     devnull => sub {
484     #
485     },
486     );
487    
488     $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
489     $PORT{""} = sub {
490     my $tag = shift;
491     eval { &{ $node_req{$tag} ||= load_func $tag } };
492     $WARN->("error processing node message: $@") if $@;
493     };
494    
495     =back
496    
497     =head1 SEE ALSO
498    
499     L<AnyEvent::MP>.
500    
501     =head1 AUTHOR
502    
503     Marc Lehmann <schmorp@schmorp.de>
504     http://home.schmorp.de/
505    
506     =cut
507    
508     1
509