ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.16
Committed: Sat Aug 15 15:08:07 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.15: +20 -9 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.12 our $VERSION = '0.7';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41     connect_node add_node load_func snd_to_func
42 root 1.1
43 root 1.10 NODE $NODE node_of snd kil
44 root 1.1 resolve_node initialise_node
45 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 root 1.1 );
47    
# port used when a node reference does not specify one
our $DEFAULT_PORT = "4040";

# timing parameters, in seconds
our ($CONNECT_INTERVAL, $NETWORK_LATENCY, $MONITOR_TIMEOUT) = (
   2,  # new connect every 2s, at least
   3,  # activity timeout
   15, # fail monitoring after this time
);
53    
54 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
55 root 1.1
56     This value is called with an error or warning message, when e.g. a connection
57     could not be created, authorisation failed and so on.
58    
59 root 1.16 C<$level> should be C<0> for messages to be logged always, C<1> for
60     unexpected messages and errors, C<2> for warnings, C<7> for messages about
61     node connectivity and services, C<8> for debugging messages and C<9> for
62     tracing messages.
63    
64 root 1.1 The default simply logs the message to STDERR.
65    
66     =cut
67    
# Default log callback: strips one trailing newline from the message and
# prints "<local timestamp> <level> message" to STDERR.
our $WARN = sub {
   my ($level, $msg) = @_;

   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
78    
# load_func $name
#
# Resolves a fully qualified function name to a code reference. If the
# function is not defined yet, the package components of the name are
# stripped from the right, one at a time, and each remaining prefix is
# loaded with "require" until the function shows up.
#
# Always returns a code reference: when nothing resolves, the returned sub
# dies with "unable to resolve '<name>'" when called.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;

      # module loading must not clobber an error the caller still needs
      local $@;

      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # $pkg ends up inside a string eval, and the function name can come
         # from a received message, so only load things that are
         # syntactically a package name
         eval "require $pkg"
            if $pkg =~ /\A[^\W\d]\w*(?:::\w+)*\z/;
      } until defined &$func;
   }

   \&$func
}
93    
# nonce $length
#
# Returns $length random octets. They are read from /dev/urandom when that
# works and delivers the full amount; otherwise they come from perl's rand,
# seeded once per process from the time, the pid and the output of some
# system commands.
sub nonce($) {
   my $len = $_[0];

   if (open my $fh, "<:raw", "/dev/urandom") {
      my $nonce;
      my $got = sysread $fh, $nonce, $len;

      # a failed or short read must not produce a short nonce
      return $nonce
         if defined $got && $got == $len;
   }

   # no usable random device, fall back to the much weaker rand
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
111    
# asciibits $octets
#
# Encodes a binary string as a short alphanumeric string: base 62 via
# Math::GMP (version 2.05 or newer) when that module can be loaded,
# otherwise base64 with the padding removed and "/" and "+" replaced by the
# letters "s" and "p".
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", remove all of them
      $data =~ s/=+\z//;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
129    
# Builds a unique cookie from the process id, the current time and two
# random octets, encoded with asciibits.
sub gen_uniq {
   my @fields = ($$, time, nonce (2));

   asciibits (pack "wNa*", @fields)
}
133    
134 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
135    
136     A boolean indicating whether this is a full/public node, which can create
137     and accept direct connections from other nodes.
138    
139     =item $AnyEvent::MP::Kernel::SLAVE
140    
141     A boolean indicating whether this node is a slave node, i.e. does most of its
142     message sending/receiving through some master node.
143    
144     =item $AnyEvent::MP::Kernel::MASTER
145    
146     Defined only in slave mode, in which case it contains the noderef of the
147     master node.
148    
149     =cut
150    
# node role
our ($PUBLIC, $SLAVE) = (0, 0);
our $MASTER; # master noderef when $SLAVE

# identity; $NODE starts out as a random name and is replaced by
# initialise_node
our $NODE  = asciibits (nonce (16));
our $RUNIQ = $NODE;       # remote uniq value
our $UNIQ  = gen_uniq (); # per-process/node unique cookie
our $ID    = "a";

# node and port tables
our %NODE; # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

# monitoring state
our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;

our $SRCNODE; # holds the sending node during _inject
169    
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() {
   return $NODE;
}
173    
# node_of $portid
#
# Returns the part of a port id in front of the first "#", i.e. the
# noderef. Yields undef for an empty string.
sub node_of($) {
   my $noderef = (split /#/, $_[0], 2)[0];

   $noderef
}
179    
# tracing switch; left as a bare constant body so perl can treat it as a
# constant function
sub TRACE() { 0 }

# _inject $portid, @msg
#
# Delivers a message to the callback registered in %PORT for the given
# local port; messages for ports without an entry are dropped.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $handler = $PORT{+shift}
      or return;

   # called without an argument list on purpose: the handler sees our
   # remaining @_
   &$handler;
}
186    
# add_node $noderef
#
# Returns the node object for a noderef, creating and registering it in
# %NODE when it is not known yet. A noderef is either "slave/<name>" or a
# comma-separated list of resolved "address:port" endpoints. If any single
# endpoint is already known, its node object is reused for the whole
# noderef.
#
# Croaks when an endpoint of a non-slave noderef cannot be parsed, has a
# port that is not greater than zero, or has an unresolved (non-numeric)
# address.
sub add_node {
   my ($noderef) = @_;

   return $NODE{$noderef}
      if exists $NODE{$noderef};

   my @endpoints = split /,/, $noderef;

   for (@endpoints) {
      return $NODE{$noderef} = $NODE{$_}
         if exists $NODE{$_};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for (@endpoints) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($_)
            or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";

         $port > 0
            or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";

         AnyEvent::Socket::parse_address ($host)
            or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
      }

      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full noderef and under every single endpoint
   $NODE{$_} = $node
      for $noderef, @endpoints;

   $node
}
224    
# connect_node $noderef
#
# Looks up (or creates) the node via add_node and calls its connect method.
sub connect_node {
   # called without an argument list on purpose: add_node sees our @_
   my $node = &add_node;

   $node->connect;
}
228    
# snd $portid, @msg
#
# Splits the port id at the first "#" into noderef and local port name and
# passes [portname, @msg] to the code reference stored in the "send" slot
# of the node object, adding the node first if it is not known.
sub snd(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send}->(["$portid", @_]);
}
237    
238 root 1.11 =item snd_to_func $noderef, $func, @args
239    
240     Expects a noderef and a name of a function. Asynchronously tries to call
241     this function with the given arguments on that node.
242    
243     This function can be used to implement C<spawn>-like interfaces.
244    
245     =cut
246    
# snd_to_func $noderef, $func, @args
#
# Sends ["", $func, @args] to the given node, i.e. a message for the port
# with the empty name on that node. The node is added first if it is not
# known.
sub snd_to_func {
   my ($noderef, @call) = @_;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["", @call]);
}
253    
# kil $portid, @reason
#
# Asks the node owning the port to kill it, passing on the reason. Croaks
# when the port id has no port part.
sub kil(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   unless (length $portid) {
      Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
   }

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
263    
# Returns the second field reported by POSIX::uname, the node name of this
# host.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
268    
# resolve_node $noderef
#
# Starts resolving a comma-separated list of transport descriptors and
# returns a condvar that receives the resolved noderef: a comma-separated
# list of "address:port" strings without duplicates, ordered by the
# position of the descriptor they came from.
#
# An empty noderef stands for $DEFAULT_PORT. A descriptor that is empty or
# consists of digits only refers to the local node name, with the digits as
# port. Croaks on a descriptor that cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "address:port"] pairs, filled in by the lookups

   # outer begin/end pair; its callback is meant to run once every lookup
   # started below has called end as well
   $cv->begin (sub {
      my ($done) = @_;
      my %seen;

      my @refs =
         grep !$seen{$_}++,
         map $_->[1],
         sort { $a->[0] <=> $b->[0] } @res;

      $done->send (join ",", @refs);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $idx;
   for my $t (split /,/, $noderef) {
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length ($t) ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
         or Carp::croak "$t: unparsable transport descriptor";

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $result (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            # the small increment keeps the addresses of one descriptor in
            # the order they were reported
            $pri += 1e-5;

            push @res, [
               $pri,
               AnyEvent::Socket::format_hostport (
                  AnyEvent::Socket::format_address ($addr),
                  $service
               ),
            ];
         }

         $cv->end;
      });
   }

   $cv->end;

   $cv
}
313    
# initialise_node $noderef, @seednodes
#
# Configures the local node. The profile found for $noderef (or for the
# local node name when $noderef is undefined) can override the noderef and
# contributes additional seed nodes and a list of services.
#
# A noderef of the form "slave/<name>" puts the node into slave mode, which
# requires at least one seed node. Any other noderef is resolved, the node
# becomes public and a listener is created for each resolved endpoint.
# Afterwards a connection to every seed node is started and, in slave mode,
# the function waits up to $MONITOR_TIMEOUT seconds for a master node.
# Finally the services are started: entries ending in "::" are loaded as
# modules, all others are called as functions.
#
# Blocks while names are resolved. Croaks when slave mode has no seed nodes
# or no master could be reached, and dies when a service module fails to
# load.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   my $profile = AnyEvent::MP::Config::find_profile (
      defined $noderef ? $noderef : _nodename ()
   );

   $noderef = $profile->{noderef}
      if exists $profile->{noderef};

   # a profile without seeds simply contributes none
   push @others, @{ $profile->{seeds} || [] };

   if ($noderef =~ /^slave\/(.*)$/) {
      $SLAVE = AE::cv ();
      my $name = $1;
      $name = $NODE unless length $name;
      $noderef = AE::cv ();
      $noderef->send ("slave/$name");

      @others
         or Carp::croak "seed nodes must be specified for slave nodes";

   } else {
      $PUBLIC = 1;
      $noderef = resolve_node ($noderef);
   }

   # start all resolutions before waiting for the first result
   my @resolving = map +(resolve_node ($_)), @others;
   @others = map $_->recv, @resolving;

   $NODE = $noderef->recv;

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port,
         sub {
            my ($tp) = @_;

            # TODO: urgs
            my $node = add_node ($tp->{remote_node});
            $node->{trial}{accept} = $tp;
         },
      );
   }

   for (@others) {
      my $node = add_node ($_);
      $node->{autoconnect} = 1;
      $node->connect;
   }

   if ($SLAVE) {
      # give up waiting for a master after $MONITOR_TIMEOUT seconds
      my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });
      my $master = $SLAVE->recv;
      $master
         or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";

      $MASTER = $master->{noderef};
      $master->{autoconnect} = 1;

      (my $via = $MASTER) =~ s/,/!/g;

      $NODE .= "\@$via";
      $NODE{$NODE} = $NODE{""};

      # announce the new name to every known node
      $_->send (["", iam => $NODE])
         for values %NODE;

      $SLAVE = 1;
   }

   for my $service (@{ $profile->{services} || [] }) {
      # work on a copy, the substitution must not alter the profile itself
      my $name = $service;

      if ($name =~ s/::$//) {
         eval "require $name";
         die $@ if $@;
      } else {
         (load_func ($name))->();
      }
   }
}
395    
396     #############################################################################
397 root 1.6 # node monitoring and info
398 root 1.3
# Returns every node object stored in %NODE exactly once, however many
# names it is registered under.
sub _uniq_nodes {
   # keyed by the stringified object, so aliases collapse into one entry
   my %by_object = map +($_ => $_), values %NODE;

   values %by_object
}
406    
407 root 1.13 =item node_is_known $noderef
408    
409     Returns true iff the given node is currently known to the system.
410    
411     =cut
412    
# true iff %NODE has an entry for the given noderef
sub node_is_known($) {
   my ($noderef) = @_;

   exists $NODE{$noderef}
}
416    
417     =item node_is_up $noderef
418    
419     Returns true if the given node is "up", that is, the kernel thinks it has
420     a working connection to it.
421    
422     If the node is known but not currently connected, returns C<0>. If the
423     node is not known, returns C<undef>.
424    
425     =cut
426    
# 1 when the node has a transport, 0 when it is known without one, and
# nothing at all when %NODE holds no node object for the noderef
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
431    
432 root 1.3 =item known_nodes
433    
434     Returns the noderefs of all nodes currently known to this node,
including the node itself and nodes that are not currently connected.
435    
436     =cut
437    
# noderefs of all distinct node objects in %NODE
sub known_nodes {
   map { $_->{noderef} } _uniq_nodes ()
}
441    
442     =item up_nodes
443    
444     Return the noderefs of all nodes that are currently connected (excluding
445     the node itself).
446    
447     =cut
448    
# noderefs of all distinct node objects in %NODE that have a transport
sub up_nodes {
   map { $_->{noderef} }
      grep { $_->{transport} }
         _uniq_nodes ()
}
452    
453     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
454    
455     Registers a callback that is called each time a node goes up (connection
456     is established) or down (connection is lost).
457    
458     Node up messages can only be followed by node down messages for the same
459     node, and vice versa.
460    
461 root 1.13 The function returns an optional guard which can be used to de-register
462 root 1.3 the monitoring callback again.
463    
464     =cut
465    
# registered node monitoring callbacks, keyed by the numeric value of the
# callback reference
our %MON_NODES;

# $guard = mon_nodes $callback
#
# Registers a callback for node up/down events. Unless called in void
# context, returns a guard object that removes the registration again when
# it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false but defined in scalar context, so test definedness:
   # "$guard = mon_nodes ..." has to receive a guard as well
   return unless defined wantarray;

   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
475    
# _inject_nodeevent $node, $is_up, @reason
#
# Calls every callback registered in %MON_NODES with the noderef of the
# node, the new state and the reason. An exception in a callback is logged
# at level 1 and does not stop the others. The state change itself is
# logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $monitor (values %MON_NODES) {
      unless (eval { $monitor->($node->{noderef}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{noderef} is $state (@reason)");
}
486    
487     #############################################################################
488 root 1.1 # self node code
489    
# Requests understood by the node port (the port with the empty name),
# keyed by message tag. Each handler receives the rest of the message in @_.
our %node_req = (
   # internal services

   # monitoring

   # disable monitoring of a local port for the sending node
   mon0 => sub {
      my ($portid) = @_;
      my $sender = $SRCNODE;

      my $cb = delete $sender->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },

   # enable monitoring: when the local port dies, the sending node is
   # told so with a "kil" message
   mon1 => sub {
      my ($portid) = @_;
      my $sender = $SRCNODE; # remember the sender for the closure below

      my $notify = sub {
         $sender->send (["", kil => $portid, @_]);
      };

      $sender->{rmon}{$portid} = $notify;
      $NODE{""}->monitor ($portid, $notify);
   },

   # a port monitored on the sending node is gone: run and forget the
   # local callbacks registered for it
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # node changed its name (for slave nodes)
   iam => sub {
      my $old = $SRCNODE->{noderef};

      # get rid of bogus slave/xxx name, hopefully
      delete $NODE{$old};

      # change noderef
      $SRCNODE->{noderef} = $_[0];

      # anchor
      $NODE{$_[0]} = $SRCNODE;
   },

   # public services

   # relay message to another node / generic echo
   snd       => \&snd,
   snd_multi => sub {
      snd (@$_) for @_;
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage

   # NOTE(review): evaluates a string taken from a received message, so
   # any node able to send to this port can run arbitrary code here
   eval => sub {
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # deliberately does nothing
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
558    
# the local node is reachable both under the empty name and under its
# current noderef
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# the node port: dispatches on the first message element, which is looked
# up in %node_req or, failing that, resolved as a function name via
# load_func and remembered in %node_req. The handler is called with the
# remaining message elements. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;
   eval { &{ $node_req{$tag} ||= load_func ($tag) } };
   $WARN->(2, "error processing node message: $@") if $@;
};
565    
566     =back
567    
568     =head1 SEE ALSO
569    
570     L<AnyEvent::MP>.
571    
572     =head1 AUTHOR
573    
574     Marc Lehmann <schmorp@schmorp.de>
575     http://home.schmorp.de/
576    
577     =cut
578    
579     1
580