ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.1
Committed: Wed Aug 12 21:39:59 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Kernel - basis for AnyEvent::MP and Coro::MP
4    
5     =head1 SYNOPSIS
6    
7     # use AnyEvent::MP or Coro::MP instead
8    
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15     =head1 GLOBALS
16    
17     =over 4
18    
19     =cut
20    
21     package AnyEvent::MP::Kernel;
22    
23     use common::sense;
24     use Carp ();
25     use MIME::Base64 ();
26    
27     use AE ();
28    
29     use AnyEvent::MP::Node;
30     use AnyEvent::MP::Transport;
31    
32     use base "Exporter";
33    
34     our $VERSION = '0.4';
35     our @EXPORT = qw(
36     %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
37    
38     NODE $NODE node_of snd kil _any_
39     resolve_node initialise_node
40     );
41    
42     our $DEFAULT_PORT = "4040";
43    
44     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
45     our $NETWORK_LATENCY = 3; # activity timeout
46     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
47    
48     =item $AnyEvent::MP::Kernel::WARN
49    
50     This value is called with an error or warning message, when e.g. a connection
51     could not be created, authorisation failed and so on.
52    
53     The default simply logs the message to STDERR.
54    
55     =cut
56    
57     our $WARN = sub {
58     my $msg = $_[0];
59     $msg =~ s/\n$//;
60     warn "$msg\n";
61     };
62    
63     sub nonce($) {
64     my $nonce;
65    
66     if (open my $fh, "</dev/urandom") {
67     sysread $fh, $nonce, $_[0];
68     } else {
69     # shit...
70     our $nonce_init;
71     unless ($nonce_init++) {
72     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
73     }
74    
75     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
76     }
77    
78     $nonce
79     }
80    
81     sub asciibits($) {
82     my $data = $_[0];
83    
84     if (eval "use Math::GMP 2.05; 1") {
85     $data = Math::GMP::get_str_gmp (
86     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
87     62
88     );
89     } else {
90     $data = MIME::Base64::encode_base64 $data, "";
91     $data =~ s/=//;
92     $data =~ s/\//s/g;
93     $data =~ s/\+/p/g;
94     }
95    
96     $data
97     }
98    
99     sub gen_uniq {
100     asciibits pack "wNa*", $$, time, nonce 2
101     }
102    
103     our $PUBLIC = 0;
104     our $SLAVE = 0;
105     our $MASTER; # master noderef when $SLAVE
106    
107     our $NODE = asciibits nonce 16;
108     our $RUNIQ = $NODE; # remote uniq value
109     our $UNIQ = gen_uniq; # per-process/node unique cookie
110     our $ID = "a";
111    
112     our %NODE; # node id to transport mapping, or "undef", for local node
113     our (%PORT, %PORT_DATA); # local ports
114    
115     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
116     our %LMON; # monitored _local_ ports
117    
118     our %LISTENER;
119    
120     our $SRCNODE; # holds the sending node during _inject
121    
122     sub NODE() {
123     $NODE
124     }
125    
126     sub node_of($) {
127     my ($noderef, undef) = split /#/, $_[0], 2;
128    
129     $noderef
130     }
131    
132     sub _ANY_() { 1 }
133     sub _any_() { \&_ANY_ }
134    
135     sub TRACE() { 0 }
136    
137     sub _inject {
138     warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
139     &{ $PORT{+shift} or return };
140     }
141    
142     sub add_node {
143     my ($noderef) = @_;
144    
145     return $NODE{$noderef}
146     if exists $NODE{$noderef};
147    
148     for (split /,/, $noderef) {
149     return $NODE{$noderef} = $NODE{$_}
150     if exists $NODE{$_};
151     }
152    
153     # new node, check validity
154     my $node;
155    
156     if ($noderef =~ /^slave\/.+$/) {
157     $node = new AnyEvent::MP::Node::Indirect $noderef;
158    
159     } else {
160     for (split /,/, $noderef) {
161     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
162     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
163    
164     $port > 0
165     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
166    
167     AnyEvent::Socket::parse_address $host
168     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
169     }
170    
171     # TODO: for indirect sends, use a different class
172     $node = new AnyEvent::MP::Node::Direct $noderef;
173     }
174    
175     $NODE{$_} = $node
176     for $noderef, split /,/, $noderef;
177    
178     $node
179     }
180    
181     sub snd(@) {
182     my ($noderef, $portid) = split /#/, shift, 2;
183    
184     warn "SND $noderef <- $portid @_\n" if TRACE;#d#
185    
186     ($NODE{$noderef} || add_node $noderef)
187     ->send (["$portid", @_]);
188     }
189    
190     sub kil(@) {
191     my ($noderef, $portid) = split /#/, shift, 2;
192    
193     length $portid
194     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
195    
196     ($NODE{$noderef} || add_node $noderef)
197     ->kill ("$portid", @_);
198     }
199    
200     sub resolve_node($) {
201     my ($noderef) = @_;
202    
203     my $cv = AE::cv;
204     my @res;
205    
206     $cv->begin (sub {
207     my %seen;
208     my @refs;
209     for (sort { $a->[0] <=> $b->[0] } @res) {
210     push @refs, $_->[1] unless $seen{$_->[1]}++
211     }
212     shift->send (join ",", @refs);
213     });
214    
215     $noderef = $DEFAULT_PORT unless length $noderef;
216    
217     my $idx;
218     for my $t (split /,/, $noderef) {
219     my $pri = ++$idx;
220    
221     if ($t =~ /^\d*$/) {
222     require POSIX;
223     my $nodename = (POSIX::uname ())[1];
224    
225     $cv->begin;
226     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
227     for (@_) {
228     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
229     push @res, [
230     $pri += 1e-5,
231     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
232     ];
233     }
234     $cv->end;
235     };
236    
237     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
238     #
239     # for (@ipv4) {
240     # push @res, [
241     # $pri,
242     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
243     # ];
244     # }
245     } else {
246     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
247     or Carp::croak "$t: unparsable transport descriptor";
248    
249     $cv->begin;
250     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
251     for (@_) {
252     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
253     push @res, [
254     $pri += 1e-5,
255     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
256     ];
257     }
258     $cv->end;
259     }
260     }
261     }
262    
263     $cv->end;
264    
265     $cv
266     }
267    
268     sub initialise_node(@) {
269     my ($noderef, @others) = @_;
270    
271     if ($noderef =~ /^slave\/(.*)$/) {
272     $SLAVE = AE::cv;
273     my $name = $1;
274     $name = $NODE unless length $name;
275     $noderef = AE::cv;
276     $noderef->send ("slave/$name");
277    
278     @others
279     or Carp::croak "seed nodes must be specified for slave nodes";
280    
281     } else {
282     $PUBLIC = 1;
283     $noderef = resolve_node $noderef;
284     }
285    
286     @others = map $_->recv, map +(resolve_node $_), @others;
287    
288     $NODE = $noderef->recv;
289    
290     for my $t (split /,/, $NODE) {
291     $NODE{$t} = $NODE{""};
292    
293     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
294    
295     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
296     sub {
297     my ($tp) = @_;
298    
299     # TODO: urgs
300     my $node = add_node $tp->{remote_node};
301     $node->{trial}{accept} = $tp;
302     },
303     ;
304     }
305    
306     (add_node $_)->connect for @others;
307    
308     if ($SLAVE) {
309     my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
310     $MASTER = $SLAVE->recv;
311     defined $MASTER
312     or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
313    
314     (my $via = $MASTER) =~ s/,/!/g;
315    
316     $NODE .= "\@$via";
317     $NODE{$NODE} = $NODE{""};
318    
319     $_->send ([iam => $NODE])
320     for values %NODE;
321    
322     $SLAVE = 1;
323     }
324     }
325    
326     #############################################################################
327     # self node code
328    
329     sub load_func($) {
330     my $func = $_[0];
331    
332     unless (defined &$func) {
333     my $pkg = $func;
334     do {
335     $pkg =~ s/::[^:]+$//
336     or return sub { die "unable to resolve '$func'" };
337     eval "require $pkg";
338     } until defined &$func;
339     }
340    
341     \&$func
342     }
343    
344     our %node_req = (
345     # internal services
346    
347     # monitoring
348     mon0 => sub { # disable monitoring
349     my $portid = shift;
350     my $node = $SRCNODE;
351     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
352     },
353     mon1 => sub { # enable monitoring
354     my $portid = shift;
355     my $node = $SRCNODE;
356     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
357     $node->send (["", kil => $portid, @_]);
358     });
359     },
360     kil => sub {
361     my $cbs = delete $SRCNODE->{lmon}{+shift}
362     or return;
363    
364     $_->(@_) for @$cbs;
365     },
366     # node changed its name (for slave nodes)
367     iam => sub {
368     $SRCNODE->{noderef} = $_[0];
369     $NODE{$_[0]} = $SRCNODE;
370     },
371    
372     # public services
373    
374     # relay message to another node / generic echo
375     relay => sub {
376     &snd;
377     },
378     relay_multiple => sub {
379     snd @$_ for @_
380     },
381    
382     # random garbage
383     eval => sub {
384     my @res = eval shift;
385     snd @_, "$@", @res if @_;
386     },
387     time => sub {
388     snd @_, AE::time;
389     },
390     devnull => sub {
391     #
392     },
393     );
394    
395     $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
396     $PORT{""} = sub {
397     my $tag = shift;
398     eval { &{ $node_req{$tag} ||= load_func $tag } };
399     $WARN->("error processing node message: $@") if $@;
400     };
401    
402     =back
403    
404     =head1 SEE ALSO
405    
406     L<AnyEvent::MP>.
407    
408     =head1 AUTHOR
409    
410     Marc Lehmann <schmorp@schmorp.de>
411     http://home.schmorp.de/
412    
413     =cut
414    
415     1
416