ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.17
Committed: Sun Aug 16 02:55:17 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.16: +20 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
our $VERSION = '0.7';

# symbols exported by default (this package inherits from Exporter)
our @EXPORT = (
   # shared kernel state
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   # node objects and remote function calls
   qw(connect_node add_node load_func snd_to_func),
   # basic node/port primitives
   qw(NODE $NODE node_of snd kil),
   qw(port_is_local),
   # node setup
   qw(resolve_node initialise_node),
   # node status queries and monitoring
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

our $DEFAULT_PORT     = "4040";

our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57     This value is called with an error or warning message, when e.g. a connection
58     could not be created, authorisation failed and so on.
59    
60 root 1.16 C<$level> should be C<0> for messages to be logged always, C<1> for
61     unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
# Default log sink: prints "YYYY-MM-DD HH:MM:SS <level> message" (local
# time) followed by a newline to STDERR.
#
#   $level - numeric log level, printed inside the angle brackets
#   $msg   - message text, one trailing newline is stripped
our $WARN = sub {
   my ($severity, $text) = @_;

   # we terminate the line ourselves below
   $text =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $severity, $text;
};
79    
# Resolve a fully qualified function name ("Some::Package::func") to a code
# reference, loading modules on demand.
#
# If the function is not defined yet, trailing "::component" parts are
# stripped from the name one at a time and the remainder is require'd as a
# package, until the function becomes defined. When nothing is left to strip,
# a code reference is returned that dies with "unable to resolve '<name>'"
# once it is called (so resolving itself never dies).
#
# Function names can originate from other nodes (see the $PORT{""} handler),
# so only syntactically valid package names are ever handed to the string
# eval - anything else would be executed as arbitrary perl code.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # $func is deliberately used as a symbolic reference

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # errors are ignored on purpose, we simply try the next shorter name
         eval "require $pkg"
            if $pkg =~ /^[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*$/;
      } until defined &$func;
   }

   \&$func
}
94    
# Return a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened. Short reads
# are retried, and whatever could not be read (read error, no /dev/urandom
# at all) is filled up from perl's rand, which is seeded once from the time,
# the pid and a checksum over some process/network listings.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      # sysread may return less than requested, so append until complete
      while ((my $missing = $len - length $nonce) > 0) {
         sysread ($fh, $nonce, $missing, length $nonce)
            or last; # eof or error, the fallback below fills the rest
      }
   }

   if ($len > length $nonce) {
      # shit...
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }

      $nonce .= join "", map +(chr rand 256), 1 .. $len - length $nonce;
   }

   $nonce
}
112    
# Encode arbitrary octets into a string consisting of ASCII letters and
# digits only.
#
# With Math::GMP (2.05 or newer) available, the hex representation of the
# octets is handed to Math::GMP and converted with a base argument of 62.
# Otherwise the octets are base64-encoded, the "=" padding is removed and
# "/" and "+" are replaced by "s" and "p" respectively.
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of which have to go
      $data =~ s/=+$//;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
130    
# Build a printable cookie out of the process id, the current time and two
# random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
134    
135 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
136    
137     A boolean indicating whether this is a full/public node, which can create
138     and accept direct connections from other nodes.
139    
140     =item $AnyEvent::MP::Kernel::SLAVE
141    
142     A boolean indicating whether this node is a slave node, i.e. does most of its
143     message sending/receiving through some master node.
144    
145     =item $AnyEvent::MP::Kernel::MASTER
146    
147     Defined only in slave mode, in which case it contains the noderef of the
148     master node.
149    
150     =cut
151    
# node role, both flags start out false
our ($PUBLIC, $SLAVE) = (0, 0);
our $MASTER;              # master noderef when $SLAVE

# the initial node reference is a random printable string
our $NODE  = asciibits (nonce (16));
our $RUNIQ = $NODE;       # remote uniq value
our $UNIQ  = gen_uniq (); # per-process/node unique cookie
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON;      # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON;      # monitored _local_ ports

our %LISTENER;

our $SRCNODE;   # holds the sending node during _inject
170    
# Accessor returning the current value of $NODE.
sub NODE() {
   return $NODE;
}
174    
# Return the node part of a port id, i.e. everything in front of the first
# "#" (undef for an empty port id).
sub node_of($) {
   my $noderef = (split /#/, $_[0], 2)[0];

   return $noderef;
}
180    
# Install the constant TRACE at compile time: true when the environment
# variable PERL_ANYEVENT_MP_TRACE is set to a true value, false otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
186 root 1.1
# Deliver a message to a local port: the first argument is the port id, the
# remaining arguments are passed on to the callback registered in %PORT.
# Messages for ports without a registered callback are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $handler = $PORT{+shift}
      or return;

   &$handler; # called with the remaining @_
}
191    
# Return the node object for $noderef, creating and registering it in %NODE
# when it is not known yet.
#
# A noderef that is already registered is returned directly. Otherwise, when
# one of its comma-separated components is registered, that node is also
# registered under the full noderef and returned. Failing that, a new node
# is created: an indirect node for "slave/..." references, a direct node for
# everything else. For a direct node, every component must be a parsable
# host:port pair with a positive port and an address that parse_address
# accepts; otherwise this function croaks.
sub add_node {
   my ($noderef) = @_;

   if (exists $NODE{$noderef}) {
      return $NODE{$noderef};
   }

   my @endpoints = split /,/, $noderef;

   # a known component makes the whole reference an alias for that node
   for my $endpoint (@endpoints) {
      next unless exists $NODE{$endpoint};

      return $NODE{$noderef} = $NODE{$endpoint};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for my $endpoint (@endpoints) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
            or Carp::croak "$noderef: not a resolved node reference ('$endpoint' not parsable)";

         Carp::croak "$noderef: not a resolved node reference ('$endpoint' contains non-numeric port)"
            unless $port > 0;

         Carp::croak "$noderef: not a resolved node reference ('$endpoint' contains unresolved address)"
            unless AnyEvent::Socket::parse_address ($host);
      }

      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full reference and under every component
   for my $key ($noderef, @endpoints) {
      $NODE{$key} = $node;
   }

   $node
}
229    
# Look up (or create) the node for the given noderef via add_node and ask it
# to connect.
sub connect_node {
   my $node = add_node (@_);

   $node->connect;
}
233    
# Send a message to a port: the first argument is the port id
# ("noderef#portid"), the remaining arguments form the message. The message
# is handed to the send callback of the node object, which is created via
# add_node when the node is not known yet.
sub snd(@) {
   my $port = shift;
   my ($noderef, $portid) = split /#/, $port, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send}->(["$portid", @_]);
}
242    
243 root 1.17 =item $is_local = port_is_local $port
244    
245     Returns true iff the port is a local port.
246    
247     =cut
248    
# True when the node part of the given port id maps to the same node object
# as the local node (registered under the empty key in %NODE).
sub port_is_local($) {
   my ($noderef) = split /#/, $_[0], 2;

   return $NODE{$noderef} == $NODE{""};
}
254    
255 root 1.11 =item snd_to_func $noderef, $func, @args
256    
257     Expects a noderef and a name of a function. Asynchronously tries to call
258     this function with the given arguments on that node.
259    
260     This function can be used to implement C<spawn>-like interfaces.
261    
262     =cut
263    
# Send a message to the node port (the port with the empty id) of the given
# node, consisting of the function name followed by its arguments. The node
# object is created via add_node when the node is not known yet.
sub snd_to_func {
   my ($noderef, @call) = @_;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["", @call]);
}
270    
# Kill a port: the first argument is the port id ("noderef#portid"), the
# remaining arguments are passed on to the kill method of the node object as
# the reason. Croaks when the port id has no port part, as node ports cannot
# be killed.
sub kil(@) {
   my $port = shift;
   my ($noderef, $portid) = split /#/, $port, 2;

   Carp::croak "$noderef#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
280    
# Return the second element of POSIX::uname (the node name of this host).
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
285    
# Asynchronously resolve a node reference into its canonical form.
#
# $noderef is a comma-separated list of transport descriptors; an empty
# reference stands for $DEFAULT_PORT. A descriptor that is empty or consists
# of digits only refers to this host (see _nodename), optionally with that
# port. Each descriptor is resolved via AnyEvent::Socket::resolve_sockaddr.
#
# Returns a condition variable that receives the comma-joined list of
# resulting "host:port" strings, ordered by descriptor position (and
# resolver order within one descriptor), with duplicates removed. Croaks
# when a descriptor cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $done = AE::cv;
   my @found; # [priority, "host:port"] pairs, filled in by the resolver callbacks

   # runs once the outer and all per-descriptor begin calls were ended
   $done->begin (sub {
      my ($cv) = @_;

      my %seen;
      my @refs = grep !$seen{$_}++,
                 map $_->[1],
                 sort { $a->[0] <=> $b->[0] } @found;

      $cv->send (join ",", @refs);
   });

   $noderef = $DEFAULT_PORT
      unless length $noderef;

   my $position = 0;

   for my $spec (split /,/, $noderef) {
      my $priority = ++$position;

      # nothing or just a port number: this host
      if ($spec =~ /^\d*$/) {
         $spec = length ($spec)
            ? _nodename () . ":$spec"
            : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak "$spec: unparsable transport descriptor";

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $result (@_) {
            my ($service, $address) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            # keep the resolver order within the same descriptor
            $priority += 1e-5;

            push @found, [
               $priority,
               AnyEvent::Socket::format_hostport (
                  AnyEvent::Socket::format_address ($address),
                  $service
               ),
            ];
         }

         $done->end;
      });
   }

   $done->end;

   $done
}
330    
# Set up this node.
#
#   $noderef - node reference or profile name; when undefined, the result of
#              _nodename is used to look up the profile
#   @others  - seed nodes, the seeds of the profile are appended
#
# A noderef entry in the profile overrides $noderef. A "slave/NAME"
# reference puts the node into slave mode (which requires seed nodes),
# anything else is resolved via resolve_node and makes the node public.
# A listener is then created for every component of the node reference,
# and the seed nodes are connected. In slave mode this function waits for
# a master (at most $MONITOR_TIMEOUT seconds, else it croaks), appends
# "@<master>" to the node name and announces the new name to all known
# nodes. Finally, the services listed in the profile are started.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   my $profile_name = defined $noderef ? $noderef : _nodename ();
   my $profile      = AnyEvent::MP::Config::find_profile ($profile_name);

   if (exists $profile->{noderef}) {
      $noderef = $profile->{noderef};
   }

   push @others, @{ $profile->{seeds} };

   if ($noderef =~ /^slave\/(.*)$/) {
      $SLAVE = AE::cv;

      # an empty slave name defaults to the current node id
      my $name = length $1 ? $1 : $NODE;

      # slave names need no resolving, so the result is available at once
      $noderef = AE::cv;
      $noderef->send ("slave/$name");

      Carp::croak "seed nodes must be specified for slave nodes"
         unless @others;

   } else {
      $PUBLIC  = 1;
      $noderef = resolve_node ($noderef);
   }

   # start all seed lookups first, then wait for each of them
   my @pending = map +(resolve_node $_), @others;
   @others = map $_->recv, @pending;

   $NODE = $noderef->recv;

   $NODE{$NODE} = $NODE{""};

   for my $endpoint (split /,/, $NODE) {
      $NODE{$endpoint} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint);

      my $on_accept = sub {
         my ($transport) = @_;

         # TODO: urgs
         my $peer = add_node ($transport->{remote_node});
         $peer->{trial}{accept} = $transport;
      };

      $LISTENER{$endpoint} = AnyEvent::MP::Transport::mp_server ($host, $port, $on_accept);
   }

   for my $seed (@others) {
      my $node = add_node ($seed);
      $node->{autoconnect} = 1;
      $node->connect;
   }

   if ($SLAVE) {
      # $SLAVE still is a condvar here; invoking it without arguments after
      # the timeout makes the recv below return nothing
      my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });

      my $master = $SLAVE->recv
         or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";

      $MASTER = $master->{noderef};
      $master->{autoconnect} = 1;

      my $via = $MASTER;
      $via =~ s/,/!/g;

      $NODE = "$NODE\@$via";
      $NODE{$NODE} = $NODE{""};

      # tell everybody about the new name
      for my $node (values %NODE) {
         $node->send (["", iam => $NODE]);
      }

      $SLAVE = 1;
   }

   # a trailing "::" marks a module to load, anything else is a function to
   # call (note that the "::" is removed from the profile entry itself)
   for (@{ $profile->{services} }) {
      if (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         my $service = load_func ($_);
         $service->();
      }
   }
}
414    
415     #############################################################################
416 root 1.6 # node monitoring and info
417 root 1.3
# Return every node object stored in %NODE exactly once (the same object is
# usually registered under several keys).
sub _uniq_nodes {
   my %seen;

   # objects stringify to their address, which serves as the unique key
   $seen{$_} = $_
      for values %NODE;

   values %seen
}
425    
426 root 1.13 =item node_is_known $noderef
427    
428     Returns true iff the given node is currently known to the system.
429    
430     =cut
431    
# True when an entry for the given noderef exists in %NODE.
sub node_is_known($) {
   my ($noderef) = @_;

   return exists $NODE{$noderef};
}
435    
436     =item node_is_up $noderef
437    
438     Returns true if the given node is "up", that is, the kernel thinks it has
439     a working connection to it.
440    
441     If the node is known but not currently connected, returns C<0>. If the
442     node is not known, returns C<undef>.
443    
444     =cut
445    
# Return 1 when the node object for the given noderef has a transport, 0
# when it has none, and nothing (undef / the empty list) when there is no
# node object for it.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
450    
451 root 1.3 =item known_nodes
452    
453     Returns the noderefs of all nodes known to this node.
454    
455     =cut
456    
# Return the noderef of every distinct node object in %NODE.
sub known_nodes {
   my @noderefs;

   push @noderefs, $_->{noderef}
      for _uniq_nodes ();

   @noderefs
}
460    
461     =item up_nodes
462    
463     Return the noderefs of all nodes that are currently connected (excluding
464     the node itself).
465    
466     =cut
467    
# Return the noderef of every distinct node object in %NODE that currently
# has a transport.
sub up_nodes {
   my @connected = grep $_->{transport}, _uniq_nodes ();

   map $_->{noderef}, @connected
}
471    
472     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
473    
474     Registers a callback that is called each time a node goes up (connection
475     is established) or down (connection is lost).
476    
477     Node up messages can only be followed by node down messages for the same
478     node, and vice versa.
479    
480 root 1.13 The function returns an optional guard which can be used to de-register
481 root 1.3 the monitoring callback again.
482    
483     =cut
484    
# registered node monitoring callbacks, keyed by the address of the callback
our %MON_NODES;

# Register $cb to be called as $cb->($noderef, $is_up, @reason) on node
# up/down events (see _inject_nodeevent).
#
# Unless called in void context, a guard object is returned that removes
# the callback again when it is destroyed. In void context the callback
# stays registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray alone is only true in list context, which would deny the
   # documented "$guard = mon_nodes ..." usage its guard
   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
494    
# Report a node up/down event: every callback in %MON_NODES is called with
# the noderef, the up flag and the reason. Exceptions thrown by callbacks are
# logged at level 1 and otherwise ignored. Afterwards the state change
# itself is logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $monitor (values %MON_NODES) {
      unless (eval { $monitor->($node->{noderef}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{noderef} is $state (@reason)");
}
505    
506     #############################################################################
507 root 1.1 # self node code
508    
# Requests understood by the node port (the port with the empty id), keyed
# by request tag. Handlers are called with the remaining message elements,
# $SRCNODE holds the node the message came from. Tags not listed here are
# resolved via load_func by the node port handler.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;
      my $remote = $SRCNODE;

      my $cb = delete $remote->{rmon}{$portid};

      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $remote = $SRCNODE;

      # report the fate of the local port back to the requesting node
      $remote->{rmon}{$portid} = sub {
         $remote->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $remote->{rmon}{$portid});
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my $old = $SRCNODE->{noderef};

      # get rid of bogus slave/xxx name, hopefully
      delete $NODE{$old};

      # change noderef
      $SRCNODE->{noderef} = $_[0];

      # anchor
      $NODE{$_[0]} = $SRCNODE;
   },

   # public services

   # relay message to another node / generic echo
   snd       => \&snd,
   snd_multi => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # NOTE(review): this evaluates a string taken from the message as perl
      # code - any node that can send node requests can run arbitrary code
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
577    
# the object for the local node, registered under "" and under its noderef
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# the node port: the first message element selects the handler from
# %node_req (unknown tags are resolved via load_func and remembered), the
# remaining elements are its arguments. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler; # called with the remaining @_
   };

   $WARN->(2, "error processing node message: $@")
      if $@;
};
584    
585     =back
586    
587     =head1 SEE ALSO
588    
589     L<AnyEvent::MP>.
590    
591     =head1 AUTHOR
592    
593     Marc Lehmann <schmorp@schmorp.de>
594     http://home.schmorp.de/
595    
596     =cut
597    
598     1
599