ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.4
Committed: Thu Aug 13 00:49:23 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.3: +11 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28     use MIME::Base64 ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37     our $VERSION = '0.4';
38     our @EXPORT = qw(
39     %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
40    
41     NODE $NODE node_of snd kil _any_
42     resolve_node initialise_node
43     );
44    
45     our $DEFAULT_PORT = "4040";
46    
47     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
48     our $NETWORK_LATENCY = 3; # activity timeout
49     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
50    
51     =item $AnyEvent::MP::Kernel::WARN
52    
53     This value is called with an error or warning message, when e.g. a connection
54     could not be created, authorisation failed and so on.
55    
56     The default simply logs the message to STDERR.
57    
58     =cut
59    
60     our $WARN = sub {
61     my $msg = $_[0];
62     $msg =~ s/\n$//;
63     warn "$msg\n";
64     };
65    
66     sub nonce($) {
67     my $nonce;
68    
69     if (open my $fh, "</dev/urandom") {
70     sysread $fh, $nonce, $_[0];
71     } else {
72     # shit...
73     our $nonce_init;
74     unless ($nonce_init++) {
75     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
76     }
77    
78     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
79     }
80    
81     $nonce
82     }
83    
84     sub asciibits($) {
85     my $data = $_[0];
86    
87     if (eval "use Math::GMP 2.05; 1") {
88     $data = Math::GMP::get_str_gmp (
89     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
90     62
91     );
92     } else {
93     $data = MIME::Base64::encode_base64 $data, "";
94     $data =~ s/=//;
95     $data =~ s/\//s/g;
96     $data =~ s/\+/p/g;
97     }
98    
99     $data
100     }
101    
102     sub gen_uniq {
103     asciibits pack "wNa*", $$, time, nonce 2
104     }
105    
106 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
107    
108     A boolean indicating whether this is a full/public node, which can create
109     and accept direct connections form othe rnodes.
110    
111     =item $AnyEvent::MP::Kernel::SLAVE
112    
113     A boolean indicating whether this node is a slave node, i.e. does most of it's
114     message sending/receiving through some master node.
115    
116     =item $AnyEvent::MP::Kernel::MASTER
117    
118     Defined only in slave mode, in which cas eit contains the noderef of the
119     master node.
120    
121     =cut
122    
123 root 1.1 our $PUBLIC = 0;
124     our $SLAVE = 0;
125     our $MASTER; # master noderef when $SLAVE
126    
127     our $NODE = asciibits nonce 16;
128     our $RUNIQ = $NODE; # remote uniq value
129     our $UNIQ = gen_uniq; # per-process/node unique cookie
130     our $ID = "a";
131    
132     our %NODE; # node id to transport mapping, or "undef", for local node
133     our (%PORT, %PORT_DATA); # local ports
134    
135     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
136     our %LMON; # monitored _local_ ports
137    
138     our %LISTENER;
139    
140     our $SRCNODE; # holds the sending node during _inject
141    
142     sub NODE() {
143     $NODE
144     }
145    
146     sub node_of($) {
147     my ($noderef, undef) = split /#/, $_[0], 2;
148    
149     $noderef
150     }
151    
152     sub _ANY_() { 1 }
153     sub _any_() { \&_ANY_ }
154    
155     sub TRACE() { 0 }
156    
157     sub _inject {
158     warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
159     &{ $PORT{+shift} or return };
160     }
161    
162     sub add_node {
163     my ($noderef) = @_;
164    
165     return $NODE{$noderef}
166     if exists $NODE{$noderef};
167    
168     for (split /,/, $noderef) {
169     return $NODE{$noderef} = $NODE{$_}
170     if exists $NODE{$_};
171     }
172    
173     # new node, check validity
174     my $node;
175    
176     if ($noderef =~ /^slave\/.+$/) {
177     $node = new AnyEvent::MP::Node::Indirect $noderef;
178    
179     } else {
180     for (split /,/, $noderef) {
181     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
182     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
183    
184     $port > 0
185     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
186    
187     AnyEvent::Socket::parse_address $host
188     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
189     }
190    
191     # TODO: for indirect sends, use a different class
192     $node = new AnyEvent::MP::Node::Direct $noderef;
193     }
194    
195     $NODE{$_} = $node
196     for $noderef, split /,/, $noderef;
197    
198     $node
199     }
200    
201     sub snd(@) {
202     my ($noderef, $portid) = split /#/, shift, 2;
203    
204     warn "SND $noderef <- $portid @_\n" if TRACE;#d#
205    
206     ($NODE{$noderef} || add_node $noderef)
207 root 1.2 ->{send} (["$portid", @_]);
208 root 1.1 }
209    
210     sub kil(@) {
211     my ($noderef, $portid) = split /#/, shift, 2;
212    
213     length $portid
214     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
215    
216     ($NODE{$noderef} || add_node $noderef)
217     ->kill ("$portid", @_);
218     }
219    
220     sub resolve_node($) {
221     my ($noderef) = @_;
222    
223     my $cv = AE::cv;
224     my @res;
225    
226     $cv->begin (sub {
227     my %seen;
228     my @refs;
229     for (sort { $a->[0] <=> $b->[0] } @res) {
230     push @refs, $_->[1] unless $seen{$_->[1]}++
231     }
232     shift->send (join ",", @refs);
233     });
234    
235     $noderef = $DEFAULT_PORT unless length $noderef;
236    
237     my $idx;
238     for my $t (split /,/, $noderef) {
239     my $pri = ++$idx;
240    
241     if ($t =~ /^\d*$/) {
242     require POSIX;
243     my $nodename = (POSIX::uname ())[1];
244    
245     $cv->begin;
246     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
247     for (@_) {
248     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
249     push @res, [
250     $pri += 1e-5,
251     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
252     ];
253     }
254     $cv->end;
255     };
256    
257     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
258     #
259     # for (@ipv4) {
260     # push @res, [
261     # $pri,
262     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
263     # ];
264     # }
265     } else {
266     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
267     or Carp::croak "$t: unparsable transport descriptor";
268    
269     $cv->begin;
270     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
271     for (@_) {
272     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
273     push @res, [
274     $pri += 1e-5,
275     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
276     ];
277     }
278     $cv->end;
279     }
280     }
281     }
282    
283     $cv->end;
284    
285     $cv
286     }
287    
288     sub initialise_node(@) {
289     my ($noderef, @others) = @_;
290    
291 root 1.3 @others = @{ $AnyEvent::MP::Config::CFG{seeds} }
292     unless @others;
293    
294 root 1.1 if ($noderef =~ /^slave\/(.*)$/) {
295     $SLAVE = AE::cv;
296     my $name = $1;
297     $name = $NODE unless length $name;
298     $noderef = AE::cv;
299     $noderef->send ("slave/$name");
300    
301     @others
302     or Carp::croak "seed nodes must be specified for slave nodes";
303    
304     } else {
305     $PUBLIC = 1;
306     $noderef = resolve_node $noderef;
307     }
308    
309     @others = map $_->recv, map +(resolve_node $_), @others;
310    
311     $NODE = $noderef->recv;
312    
313     for my $t (split /,/, $NODE) {
314     $NODE{$t} = $NODE{""};
315    
316     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
317    
318     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
319     sub {
320     my ($tp) = @_;
321    
322     # TODO: urgs
323     my $node = add_node $tp->{remote_node};
324     $node->{trial}{accept} = $tp;
325     },
326     ;
327     }
328    
329     (add_node $_)->connect for @others;
330    
331     if ($SLAVE) {
332     my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
333     $MASTER = $SLAVE->recv;
334     defined $MASTER
335     or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
336    
337     (my $via = $MASTER) =~ s/,/!/g;
338    
339     $NODE .= "\@$via";
340     $NODE{$NODE} = $NODE{""};
341    
342 root 1.2 $_->send (["", iam => $NODE])
343 root 1.1 for values %NODE;
344    
345     $SLAVE = 1;
346     }
347     }
348    
349     #############################################################################
350 root 1.3 # node moniotirng and info
351    
352     sub _uniq_nodes {
353     my %node;
354    
355     @node{values %NODE} = values %NODE;
356    
357     values %node;
358     }
359    
360     =item known_nodes
361    
362     Returns the noderefs of all nodes connected to this node.
363    
364     =cut
365    
366     sub known_nodes {
367     map $_->{noderef}, _uniq_nodes
368     }
369    
370     =item up_nodes
371    
372     Return the noderefs of all nodes that are currently connected (excluding
373     the node itself).
374    
375     =cut
376    
377     sub up_nodes {
378     map $_->{noderef}, grep $_->{transport}, _uniq_nodes
379     }
380    
381     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
382    
383     Registers a callback that is called each time a node goes up (connection
384     is established) or down (connection is lost).
385    
386     Node up messages can only be followed by node down messages for the same
387     node, and vice versa.
388    
389     The fucntino returns an optional guard which can be used to de-register
390     the monitoring callback again.
391    
392     =cut
393    
394     our %MON_NODES;
395    
396     sub mon_nodes($) {
397     my ($cb) = @_;
398    
399     $MON_NODES{$cb+0} = $cb;
400    
401     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
402     }
403    
404     sub _inject_nodeevent($$;@) {
405     my ($node, @args) = @_;
406    
407     unshift @args, $node->{noderef};
408    
409     for my $cb (values %MON_NODES) {
410     eval { $cb->(@args); 1 }
411     or $WARN->($@);
412     }
413     }
414    
415     #############################################################################
416 root 1.1 # self node code
417    
418     sub load_func($) {
419     my $func = $_[0];
420    
421     unless (defined &$func) {
422     my $pkg = $func;
423     do {
424     $pkg =~ s/::[^:]+$//
425     or return sub { die "unable to resolve '$func'" };
426     eval "require $pkg";
427     } until defined &$func;
428     }
429    
430     \&$func
431     }
432    
433     our %node_req = (
434     # internal services
435    
436     # monitoring
437     mon0 => sub { # disable monitoring
438     my $portid = shift;
439     my $node = $SRCNODE;
440     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
441     },
442     mon1 => sub { # enable monitoring
443     my $portid = shift;
444     my $node = $SRCNODE;
445     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
446     $node->send (["", kil => $portid, @_]);
447     });
448     },
449     kil => sub {
450     my $cbs = delete $SRCNODE->{lmon}{+shift}
451     or return;
452    
453     $_->(@_) for @$cbs;
454     },
455     # node changed its name (for slave nodes)
456     iam => sub {
457     $SRCNODE->{noderef} = $_[0];
458     $NODE{$_[0]} = $SRCNODE;
459     },
460    
461     # public services
462    
463     # relay message to another node / generic echo
464     relay => sub {
465     &snd;
466     },
467     relay_multiple => sub {
468     snd @$_ for @_
469     },
470    
471 root 1.4 # informational
472     info => sub {
473     snd @_, $NODE;
474     },
475     known_nodes => sub {
476     snd @_, known_nodes;
477     },
478     up_nodes => sub {
479     snd @_, up_nodes;
480     },
481    
482 root 1.1 # random garbage
483     eval => sub {
484     my @res = eval shift;
485     snd @_, "$@", @res if @_;
486     },
487     time => sub {
488     snd @_, AE::time;
489     },
490     devnull => sub {
491     #
492     },
493     );
494    
495     $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
496     $PORT{""} = sub {
497     my $tag = shift;
498     eval { &{ $node_req{$tag} ||= load_func $tag } };
499     $WARN->("error processing node message: $@") if $@;
500     };
501    
502     =back
503    
504     =head1 SEE ALSO
505    
506     L<AnyEvent::MP>.
507    
508     =head1 AUTHOR
509    
510     Marc Lehmann <schmorp@schmorp.de>
511     http://home.schmorp.de/
512    
513     =cut
514    
515     1
516