ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.6
Committed: Thu Aug 13 03:34:24 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.5: +25 -17 lines
Log Message:
services...

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. is needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27     use Carp ();
28     use MIME::Base64 ();
29    
30     use AE ();
31    
32     use AnyEvent::MP::Node;
33     use AnyEvent::MP::Transport;
34    
35     use base "Exporter";
36    
37 root 1.5 our $VERSION = '0.6';
38 root 1.1 our @EXPORT = qw(
39     %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
40    
41     NODE $NODE node_of snd kil _any_
42     resolve_node initialise_node
43     );
44    
# Port substituted when a node reference does not specify one.
our $DEFAULT_PORT = "4040";

# Timing tunables.
our ($CONNECT_INTERVAL, $NETWORK_LATENCY, $MONITOR_TIMEOUT) = (
   2,  # new connect every 2s, at least
   3,  # activity timeout
   15, # fail monitoring after this time
);
50    
51     =item $AnyEvent::MP::Kernel::WARN
52    
53     This value is called with an error or warning message, when e.g. a connection
54     could not be created, authorisation failed and so on.
55    
56     The default simply logs the message to STDERR.
57    
58     =cut
59    
# Default warning handler: strips one trailing newline from the message
# and re-appends one, so that "warn" never adds its "at FILE line N"
# suffix, then reports the message via "warn".
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;

   warn $text, "\n";
};
65    
# Resolves a fully qualified function name ("Some::Module::func") into a
# code reference, loading modules on demand.
#
# If the function is not defined yet, the trailing "::component" is
# stripped from the name repeatedly and each remaining prefix is tried
# as a module to "require", until the function becomes defined.  When
# no prefix is left, a stub is returned that dies with
# "unable to resolve '<name>'" when called.
#
# The name may originate from a remote node (it is used for unknown
# node-port tags), so it is never interpolated into a string eval: only
# syntactically valid module names are loaded, and they are loaded via
# their file name.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;

      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # skip anything that is not a plain Foo::Bar module name
         if ($pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/) {
            (my $file = "$pkg.pm") =~ s{::}{/}g;
            eval { require $file }; # load failures are deliberately ignored
         }
      } until defined &$func;
   }

   \&$func
}
80    
# Returns a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened.  A short
# or failed read is not silently accepted: whatever is still missing
# (or everything, when the device is unavailable) is filled in from
# perl's rand(), which is seeded once per process from the time, the
# pid and the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      while (length ($nonce) < $len) {
         # append at the current end of the buffer
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # read error or unexpected EOF
      }
   }

   if (length ($nonce) < $len) {
      # no (usable) random device, fall back to rand
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
   }

   $nonce
}
98    
# Encodes an octet string as a compact string consisting only of ASCII
# letters and digits.
#
# With Math::GMP 2.05 or newer, the octets are treated as one big
# number and written in base 62.  Otherwise the data is base64 encoded,
# ALL trailing "=" padding is removed (there can be two padding
# characters, e.g. for 16 octets of input) and the two non-alphanumeric
# base64 characters are replaced: "/" becomes "s" and "+" becomes "p".
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      $data =~ s/=+\z//;
      $data =~ tr{/+}{sp};
   }

   $data
}
116    
# Builds a unique-ish ASCII cookie from the process id (BER compressed
# integer), the current time (32 bit big-endian) and two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
120    
121 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
122    
123     A boolean indicating whether this is a full/public node, which can create
124     and accept direct connections from other nodes.
125    
126     =item $AnyEvent::MP::Kernel::SLAVE
127    
128     A boolean indicating whether this node is a slave node, i.e. does most of its
129     message sending/receiving through some master node.
130    
131     =item $AnyEvent::MP::Kernel::MASTER
132    
133     Defined only in slave mode, in which case it contains the noderef of the
134     master node.
135    
136     =cut
137    
# node mode flags
our ($PUBLIC, $SLAVE) = (0, 0);
our $MASTER; # master noderef when $SLAVE

# identity of this node/process
our $NODE  = asciibits (nonce (16)); # random until initialise_node replaces it
our $RUNIQ = $NODE;                  # remote uniq value
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $ID    = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;

# local ports
our %PORT;
our %PORT_DATA;

# monitoring state
our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;

# holds the sending node during _inject
our $SRCNODE;
156    
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() {
   return $NODE;
}
160    
# Extracts the noderef from a port id of the form "noderef#portid":
# everything before the first "#" (the whole string if there is none).
# Yields undef for the empty string.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
166    
# Debugging switch for the "warn ... if TRACE" statements in this file;
# being a constant sub, a false value disables them.
sub TRACE() {
   0
}

# _any_ returns a reference to the constant sub _ANY_.
# NOTE(review): presumably used as a wildcard marker that is compared
# by reference elsewhere - confirm against the users of _any_.
sub _ANY_() {
   1
}

sub _any_() {
   \&_ANY_
}
171    
# Delivers a message to a local port: the first argument is the port
# id, the remaining arguments are handed to the port callback found in
# %PORT.  Messages for unknown ports are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $handler = $PORT{+shift}
      or return;

   # call with the current @_, i.e. the message without the port id
   &$handler;
}
176    
# Returns the node object for the given noderef, creating and
# registering it in %NODE if necessary.
#
# A noderef is either "slave/<name>" or a comma separated list of
# resolved "host:port" transports.  If any single transport is already
# known, the existing node object is reused and also registered under
# the full noderef.  Croaks when a transport is not a fully resolved
# numeric address with a numeric port.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @transports = split /,/, $noderef;

   # an already known transport identifies an existing node
   for my $known (grep { exists $NODE{$_} } @transports) {
      return $NODE{$noderef} = $NODE{$known};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for (@transports) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($_)
            or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";

         Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)"
            unless $port > 0;

         Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)"
            unless AnyEvent::Socket::parse_address ($host);
      }

      # TODO: for indirect sends, use a different class
      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full noderef and under every single transport
   $NODE{$_} = $node
      for $noderef, @transports;

   $node
}
215    
# Sends a message: the first argument is the destination port id
# ("noderef#portid"), the rest is the message.  The destination node is
# looked up in %NODE (or created via add_node) and its "send" callback
# is invoked with [portid, message...].
sub snd(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send}->(["$portid", @_]);
}
224    
# Kills the port given as first argument ("noderef#portid"), passing
# any remaining arguments on as the kill reason.  Croaks when the port
# id part is empty, as that would address the node port itself.
sub kil(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
234    
# Resolves a (possibly unresolved) noderef into a resolved one.
#
# The noderef is a comma separated list of transport descriptors.  A
# descriptor that is empty or consists only of digits stands for "the
# local nodename" (as reported by uname) with that port, anything else
# is parsed as "host:port".  A missing port defaults to the "aemp"
# service, falling back to $DEFAULT_PORT.
#
# Returns a condvar that receives the comma separated list of resolved
# "address:port" strings, in descriptor order and without duplicates.
# Croaks on descriptors that cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $cv = AE::cv;
   my @found; # [priority, "address:port"]

   # this group callback runs once all resolver callbacks have finished
   $cv->begin (sub {
      my (%dup, @uniq);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         my $hostport = $entry->[1];
         push @uniq, $hostport unless $dup{$hostport}++;
      }

      $_[0]->send (join ",", @uniq);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $seq = 0;
   for my $t (split /,/, $noderef) {
      # the integer part orders the descriptors, the fraction added
      # below orders the addresses found for a single descriptor
      my $pri = ++$seq;

      my ($host, $service);

      if ($t =~ /^\d*$/) {
         # port only (or nothing): use the local nodename
         require POSIX;
         $host    = (POSIX::uname ())[1];
         $service = $t || "aemp=$DEFAULT_PORT";
      } else {
         ($host, $service) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
            or Carp::croak "$t: unparsable transport descriptor";
      }

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $service, "tcp", 0, undef, sub {
         for my $res (@_) {
            my ($port, $addr) = AnyEvent::Socket::unpack_sockaddr ($res->[3]);
            push @found, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $port),
            ];
         }
         $cv->end;
      });
   }

   $cv->end;

   $cv
}
302    
# Initialises the local node.
#
# Arguments are the noderef for this node followed by the seed nodes.
# Without a noderef, noderef, seeds and services are taken from
# %AnyEvent::MP::Config::CFG instead.
#
# A noderef of the form "slave/<name>" (the name defaults to the
# current $NODE) puts the node into slave mode, which requires at least
# one seed node.  Anything else makes this a public node that listens
# on every resolved transport of its noderef.
#
# This function blocks while node references are resolved and, in slave
# mode, until a master is known or $MONITOR_TIMEOUT has passed.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   # an explicitly passed noderef disables the configuration profile
   my $profile = defined $noderef ? {} : \%AnyEvent::MP::Config::CFG;

   $noderef = $profile->{noderef}
      unless defined $noderef;

   @others = @{ $profile->{seeds} }
      unless @others;

   if ($noderef =~ /^slave\/(.*)$/) {
      my $name = $1;
      $name = $NODE unless length $name;

      $SLAVE = AE::cv;

      # slave noderefs need no resolving, so use a pre-filled condvar
      $noderef = AE::cv;
      $noderef->send ("slave/$name");

      Carp::croak "seed nodes must be specified for slave nodes"
         unless @others;

   } else {
      $PUBLIC  = 1;
      $noderef = resolve_node ($noderef);
   }

   # start all resolve requests first, then wait for their results
   my @pending = map +(resolve_node ($_)), @others;
   @others = map $_->recv, @pending;

   $NODE = $noderef->recv;

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, sub {
         my ($tp) = @_;

         # TODO: urgs
         my $node = add_node ($tp->{remote_node});
         $node->{trial}{accept} = $tp;
      });
   }

   add_node ($_)->connect for @others;

   if ($SLAVE) {
      # give up waiting for a master after $MONITOR_TIMEOUT
      my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });

      $MASTER = $SLAVE->recv;

      Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n"
         unless defined $MASTER;

      (my $via = $MASTER) =~ tr/,/!/;

      $NODE = "$NODE\@$via";
      $NODE{$NODE} = $NODE{""};

      # announce the new name to every known node
      $_->send (["", iam => $NODE])
         for values %NODE;

      $SLAVE = 1;
   }

   # finally start the configured services
   load_func ($_)->()
      for @{ $profile->{services} };
}
371    
372     #############################################################################
373 root 1.6 # node monitoring and info
374 root 1.3
# Returns every node object stored in %NODE exactly once (the same
# object is usually registered under several keys).
sub _uniq_nodes {
   my %seen;

   # key by the stringified reference, which is unique per object
   $seen{$_} = $_
      for values %NODE;

   values %seen
}
382    
383     =item known_nodes
384    
385     Returns the noderefs of all nodes known to this node, whether they are currently connected or not.
386    
387     =cut
388    
# Returns the noderef of every distinct node registered in %NODE.
sub known_nodes {
   map { $_->{noderef} } _uniq_nodes ()
}
392    
393     =item up_nodes
394    
395     Return the noderefs of all nodes that are currently connected (excluding
396     the node itself).
397    
398     =cut
399    
# Returns the noderef of every distinct node that currently has a
# transport, i.e. a true {transport} member.
sub up_nodes {
   my @connected = grep { $_->{transport} } _uniq_nodes ();

   map { $_->{noderef} } @connected
}
403    
404     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
405    
406     Registers a callback that is called each time a node goes up (connection
407     is established) or down (connection is lost).
408    
409     Node up messages can only be followed by node down messages for the same
410     node, and vice versa.
411    
412     The function returns an optional guard which can be used to de-register
413     the monitoring callback again.
414    
415     =cut
416    
# registered node monitoring callbacks, keyed by their address
our %MON_NODES;

# Registers $cb to be called for node up/down events (see
# _inject_nodeevent).
#
# When the caller wants a return value (scalar or list context), a
# guard object is returned that de-registers the callback when it is
# destroyed.  In void context no guard is created and the callback
# stays registered.
#
# "defined wantarray" is required here: plain "wantarray" is false in
# scalar context, so "$guard = mon_nodes ..." would never get a guard.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   (defined wantarray)
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
426    
# Reports a node event to all callbacks registered in %MON_NODES.
# Each callback is called with the noderef of $node followed by the
# remaining arguments.  A dying callback does not stop delivery, its
# error is passed to $WARN instead.
sub _inject_nodeevent($$;@) {
   my ($node, @args) = @_;

   unshift @args, $node->{noderef};

   for my $cb (values %MON_NODES) {
      next if eval { $cb->(@args); 1 };

      $WARN->($@);
   }
}
437    
438     #############################################################################
439 root 1.1 # self node code
440    
# Services offered by the node port (the port with the empty id): maps
# the message tag to its handler.  Handlers receive the message without
# the tag, and $SRCNODE holds the sending node while they run.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      # keep the sender: $SRCNODE changes, the callback runs much later
      my $node = $SRCNODE;

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
         $node->send (["", kil => $portid, @_]);
      });
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my ($noderef) = @_;

      $SRCNODE->{noderef} = $noderef;
      $NODE{$noderef}     = $SRCNODE;
   },

   # public services

   # relay message to another node / generic echo
   relay => sub {
      &snd;
   },
   relay_multiple => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   # NOTE(review): "eval" string-evals code received from another node;
   # this is only acceptable as long as every peer is fully trusted.
   eval => sub {
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      # deliberately ignores the message
   },
);
502    
# the local node is reachable both as "" and under its own noderef
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The node port: dispatches on the first message element.  Tags missing
# from %node_req are resolved as function names via load_func and
# cached in %node_req.  Errors are reported via $WARN.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # call with the current @_, i.e. the message without the tag
      &$handler;
   };

   $WARN->("error processing node message: $@") if $@;
};
509    
510     =back
511    
512     =head1 SEE ALSO
513    
514     L<AnyEvent::MP>.
515    
516     =head1 AUTHOR
517    
518     Marc Lehmann <schmorp@schmorp.de>
519     http://home.schmorp.de/
520    
521     =cut
522    
523     1
524