ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.19
Committed: Wed Aug 19 05:57:14 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_8
Changes since 1.18: +1 -1 lines
Log Message:
0.8

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16     connected nodes etc. are needed.
17    
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.19 our $VERSION = '0.8';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.18 connect_node add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.10 NODE $NODE node_of snd kil
44 root 1.17 port_is_local
45 root 1.1 resolve_node initialise_node
46 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 root 1.1 );
48    
49     our $DEFAULT_PORT = "4040";
50    
51     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52     our $NETWORK_LATENCY = 3; # activity timeout
53     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57     This value is called with an error or warning message, when e.g. a connection
58     could not be created, authorisation failed and so on.
59    
60 root 1.16 C<$level> sould be C<0> for messages ot be logged always, C<1> for
61     unexpected messages and errors, C<2> for warnings, C<7> for messages about
62     node connectivity and services, C<8> for debugging messages and C<9> for
63     tracing messages.
64    
65 root 1.1 The default simply logs the message to STDERR.
66    
67     =cut
68    
69     our $WARN = sub {
70 root 1.16 my ($level, $msg) = @_;
71    
72 root 1.1 $msg =~ s/\n$//;
73 root 1.16
74     printf STDERR "%s <%d> %s\n",
75     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
76     $level,
77     $msg;
78 root 1.1 };
79    
80 root 1.6 sub load_func($) {
81     my $func = $_[0];
82    
83     unless (defined &$func) {
84     my $pkg = $func;
85     do {
86     $pkg =~ s/::[^:]+$//
87     or return sub { die "unable to resolve '$func'" };
88     eval "require $pkg";
89     } until defined &$func;
90     }
91    
92     \&$func
93     }
94    
95 root 1.1 sub nonce($) {
96     my $nonce;
97    
98     if (open my $fh, "</dev/urandom") {
99     sysread $fh, $nonce, $_[0];
100     } else {
101     # shit...
102     our $nonce_init;
103     unless ($nonce_init++) {
104     srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
105     }
106    
107     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
108     }
109    
110     $nonce
111     }
112    
113     sub asciibits($) {
114     my $data = $_[0];
115    
116     if (eval "use Math::GMP 2.05; 1") {
117     $data = Math::GMP::get_str_gmp (
118     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
119     62
120     );
121     } else {
122     $data = MIME::Base64::encode_base64 $data, "";
123     $data =~ s/=//;
124     $data =~ s/\//s/g;
125     $data =~ s/\+/p/g;
126     }
127    
128     $data
129     }
130    
131     sub gen_uniq {
132     asciibits pack "wNa*", $$, time, nonce 2
133     }
134    
135 root 1.3 =item $AnyEvent::MP::Kernel::PUBLIC
136    
137     A boolean indicating whether this is a full/public node, which can create
138     and accept direct connections form othe rnodes.
139    
140     =item $AnyEvent::MP::Kernel::SLAVE
141    
142     A boolean indicating whether this node is a slave node, i.e. does most of it's
143     message sending/receiving through some master node.
144    
145     =item $AnyEvent::MP::Kernel::MASTER
146    
147     Defined only in slave mode, in which cas eit contains the noderef of the
148     master node.
149    
150     =cut
151    
152 root 1.1 our $PUBLIC = 0;
153     our $SLAVE = 0;
154     our $MASTER; # master noderef when $SLAVE
155    
156     our $NODE = asciibits nonce 16;
157     our $RUNIQ = $NODE; # remote uniq value
158     our $UNIQ = gen_uniq; # per-process/node unique cookie
159     our $ID = "a";
160    
161     our %NODE; # node id to transport mapping, or "undef", for local node
162     our (%PORT, %PORT_DATA); # local ports
163    
164     our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
165     our %LMON; # monitored _local_ ports
166    
167     our %LISTENER;
168    
169     our $SRCNODE; # holds the sending node during _inject
170    
171     sub NODE() {
172     $NODE
173     }
174    
175     sub node_of($) {
176     my ($noderef, undef) = split /#/, $_[0], 2;
177    
178     $noderef
179     }
180    
181 root 1.17 BEGIN {
182     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
183     ? sub () { 1 }
184     : sub () { 0 };
185     }
186 root 1.1
187     sub _inject {
188 root 1.18 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d#
189 root 1.1 &{ $PORT{+shift} or return };
190     }
191    
192     sub add_node {
193     my ($noderef) = @_;
194    
195     return $NODE{$noderef}
196     if exists $NODE{$noderef};
197    
198     for (split /,/, $noderef) {
199     return $NODE{$noderef} = $NODE{$_}
200     if exists $NODE{$_};
201     }
202    
203     # new node, check validity
204     my $node;
205    
206     if ($noderef =~ /^slave\/.+$/) {
207     $node = new AnyEvent::MP::Node::Indirect $noderef;
208    
209     } else {
210     for (split /,/, $noderef) {
211     my ($host, $port) = AnyEvent::Socket::parse_hostport $_
212     or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
213    
214     $port > 0
215     or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
216    
217     AnyEvent::Socket::parse_address $host
218     or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
219     }
220    
221     $node = new AnyEvent::MP::Node::Direct $noderef;
222     }
223    
224     $NODE{$_} = $node
225     for $noderef, split /,/, $noderef;
226    
227     $node
228     }
229    
230 root 1.13 sub connect_node {
231     &add_node->connect;
232     }
233    
234 root 1.1 sub snd(@) {
235     my ($noderef, $portid) = split /#/, shift, 2;
236    
237     warn "SND $noderef <- $portid @_\n" if TRACE;#d#
238    
239     ($NODE{$noderef} || add_node $noderef)
240 root 1.2 ->{send} (["$portid", @_]);
241 root 1.1 }
242    
243 root 1.17 =item $is_local = port_is_local $port
244    
245     Returns true iff the port is a local port.
246    
247     =cut
248    
249     sub port_is_local($) {
250     my ($noderef, undef) = split /#/, $_[0], 2;
251    
252     $NODE{$noderef} == $NODE{""}
253     }
254    
255 root 1.18 =item snd_to_func $node, $func, @args
256 root 1.11
257     Expects a noderef and a name of a function. Asynchronously tries to call
258     this function with the given arguments on that node.
259    
260     This fucntion can be used to implement C<spawn>-like interfaces.
261    
262     =cut
263    
264 root 1.18 sub snd_to_func($$;@) {
265 root 1.11 my $noderef = shift;
266    
267     ($NODE{$noderef} || add_node $noderef)
268     ->send (["", @_]);
269     }
270    
271 root 1.18 =item snd_on $node, @msg
272    
273     Executes C<snd> with the given C<@msg> (which must include the destination
274     port) on the given node.
275    
276     =cut
277    
278     sub snd_on($@) {
279     my $node = shift;
280     snd $node, snd => @_;
281     }
282    
283     =item eval_on $node, $string
284    
285     Evaluates the given string as Perl expression on the given node.
286    
287     =cut
288    
289     sub eval_on($@) {
290     my $node = shift;
291     snd $node, eval => @_;
292     }
293    
294 root 1.1 sub kil(@) {
295     my ($noderef, $portid) = split /#/, shift, 2;
296    
297     length $portid
298     or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
299    
300     ($NODE{$noderef} || add_node $noderef)
301     ->kill ("$portid", @_);
302     }
303    
304 root 1.7 sub _nodename {
305     require POSIX;
306     (POSIX::uname ())[1]
307     }
308    
309 root 1.1 sub resolve_node($) {
310     my ($noderef) = @_;
311    
312     my $cv = AE::cv;
313     my @res;
314    
315     $cv->begin (sub {
316     my %seen;
317     my @refs;
318     for (sort { $a->[0] <=> $b->[0] } @res) {
319     push @refs, $_->[1] unless $seen{$_->[1]}++
320     }
321     shift->send (join ",", @refs);
322     });
323    
324     $noderef = $DEFAULT_PORT unless length $noderef;
325    
326     my $idx;
327     for my $t (split /,/, $noderef) {
328     my $pri = ++$idx;
329 root 1.7
330     $t = length $t ? _nodename . ":$t" : _nodename
331     if $t =~ /^\d*$/;
332 root 1.1
333 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
334     or Carp::croak "$t: unparsable transport descriptor";
335    
336     $cv->begin;
337     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
338     for (@_) {
339     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
340     push @res, [
341     $pri += 1e-5,
342     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
343     ];
344 root 1.1 }
345 root 1.7 $cv->end;
346     };
347 root 1.1 }
348    
349     $cv->end;
350    
351     $cv
352     }
353    
354 root 1.18 sub _node_rename {
355     $NODE = shift;
356    
357     my $self = $NODE{""};
358     $NODE{$NODE} = delete $NODE{$self->{noderef}};
359     $self->{noderef} = $NODE;
360     }
361    
362 root 1.1 sub initialise_node(@) {
363     my ($noderef, @others) = @_;
364    
365 root 1.7 my $profile = AnyEvent::MP::Config::find_profile
366     +(defined $noderef ? $noderef : _nodename);
367 root 1.6
368     $noderef = $profile->{noderef}
369 root 1.7 if exists $profile->{noderef};
370 root 1.6
371 root 1.9 push @others, @{ $profile->{seeds} };
372 root 1.3
373 root 1.1 if ($noderef =~ /^slave\/(.*)$/) {
374     $SLAVE = AE::cv;
375     my $name = $1;
376     $name = $NODE unless length $name;
377     $noderef = AE::cv;
378     $noderef->send ("slave/$name");
379    
380     @others
381     or Carp::croak "seed nodes must be specified for slave nodes";
382    
383     } else {
384     $PUBLIC = 1;
385     $noderef = resolve_node $noderef;
386     }
387    
388     @others = map $_->recv, map +(resolve_node $_), @others;
389    
390 root 1.18 _node_rename $noderef->recv;
391 root 1.17
392 root 1.1 for my $t (split /,/, $NODE) {
393     my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
394    
395     $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
396     sub {
397     my ($tp) = @_;
398    
399     # TODO: urgs
400     my $node = add_node $tp->{remote_node};
401     $node->{trial}{accept} = $tp;
402     },
403     ;
404     }
405    
406 root 1.14 for (@others) {
407     my $node = add_node $_;
408     $node->{autoconnect} = 1;
409     $node->connect;
410     }
411 root 1.1
412     if ($SLAVE) {
413     my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
414 root 1.13 my $master = $SLAVE->recv;
415     $master
416 root 1.1 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
417    
418 root 1.13 $MASTER = $master->{noderef};
419     $master->{autoconnect} = 1;
420    
421 root 1.1 (my $via = $MASTER) =~ s/,/!/g;
422    
423     $NODE .= "\@$via";
424 root 1.18 _node_rename $NODE;
425 root 1.1
426 root 1.2 $_->send (["", iam => $NODE])
427 root 1.1 for values %NODE;
428    
429     $SLAVE = 1;
430     }
431 root 1.6
432 root 1.13 for (@{ $profile->{services} }) {
433     if (s/::$//) {
434     eval "require $_";
435     die $@ if $@;
436     } else {
437     (load_func $_)->();
438     }
439     }
440 root 1.1 }
441    
442     #############################################################################
443 root 1.6 # node monitoring and info
444 root 1.3
445     sub _uniq_nodes {
446     my %node;
447    
448     @node{values %NODE} = values %NODE;
449    
450     values %node;
451     }
452    
453 root 1.13 =item node_is_known $noderef
454    
455     Returns true iff the given node is currently known to the system.
456    
457     =cut
458    
459     sub node_is_known($) {
460     exists $NODE{$_[0]}
461     }
462    
463     =item node_is_up $noderef
464    
465     Returns true if the given node is "up", that is, the kernel thinks it has
466     a working connection to it.
467    
468     If the node is known but not currently connected, returns C<0>. If the
469     node is not known, returns C<undef>.
470    
471     =cut
472    
473     sub node_is_up($) {
474     ($NODE{$_[0]} or return)->{transport}
475     ? 1 : 0
476     }
477    
478 root 1.3 =item known_nodes
479    
480 root 1.18 Returns the noderefs of all nodes connected to this node, including
481     itself.
482 root 1.3
483     =cut
484    
485     sub known_nodes {
486     map $_->{noderef}, _uniq_nodes
487     }
488    
489     =item up_nodes
490    
491     Return the noderefs of all nodes that are currently connected (excluding
492     the node itself).
493    
494     =cut
495    
496     sub up_nodes {
497     map $_->{noderef}, grep $_->{transport}, _uniq_nodes
498     }
499    
500     =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
501    
502     Registers a callback that is called each time a node goes up (connection
503     is established) or down (connection is lost).
504    
505     Node up messages can only be followed by node down messages for the same
506     node, and vice versa.
507    
508 root 1.13 The function returns an optional guard which can be used to de-register
509 root 1.3 the monitoring callback again.
510    
511     =cut
512    
513     our %MON_NODES;
514    
515     sub mon_nodes($) {
516     my ($cb) = @_;
517    
518     $MON_NODES{$cb+0} = $cb;
519    
520     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
521     }
522    
523     sub _inject_nodeevent($$;@) {
524 root 1.16 my ($node, $up, @reason) = @_;
525 root 1.3
526     for my $cb (values %MON_NODES) {
527 root 1.16 eval { $cb->($node->{noderef}, $up, @reason); 1 }
528     or $WARN->(1, $@);
529 root 1.3 }
530 root 1.16
531     $WARN->(7, "$node->{noderef} is " . ($up ? "up" : "down") . " (@reason)");
532 root 1.3 }
533    
534     #############################################################################
535 root 1.1 # self node code
536    
537     our %node_req = (
538     # internal services
539    
540     # monitoring
541     mon0 => sub { # disable monitoring
542     my $portid = shift;
543     my $node = $SRCNODE;
544     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
545     },
546     mon1 => sub { # enable monitoring
547     my $portid = shift;
548     my $node = $SRCNODE;
549     $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
550     $node->send (["", kil => $portid, @_]);
551     });
552     },
553     kil => sub {
554     my $cbs = delete $SRCNODE->{lmon}{+shift}
555     or return;
556    
557     $_->(@_) for @$cbs;
558     },
559     # node changed its name (for slave nodes)
560     iam => sub {
561 root 1.8 # get rid of bogus slave/xxx name, hopefully
562     delete $NODE{$SRCNODE->{noderef}};
563    
564     # change noderef
565 root 1.1 $SRCNODE->{noderef} = $_[0];
566 root 1.8
567     # anchor
568 root 1.1 $NODE{$_[0]} = $SRCNODE;
569     },
570    
571 root 1.18 # "public" services - not actually public
572 root 1.1
573     # relay message to another node / generic echo
574 root 1.15 snd => \&snd,
575     snd_multi => sub {
576 root 1.1 snd @$_ for @_
577     },
578    
579 root 1.4 # informational
580     info => sub {
581     snd @_, $NODE;
582     },
583     known_nodes => sub {
584     snd @_, known_nodes;
585     },
586     up_nodes => sub {
587     snd @_, up_nodes;
588     },
589    
590 root 1.1 # random garbage
591     eval => sub {
592     my @res = eval shift;
593     snd @_, "$@", @res if @_;
594     },
595     time => sub {
596     snd @_, AE::time;
597     },
598     devnull => sub {
599     #
600     },
601 root 1.15 "" => sub {
602     # empty messages are sent by monitoring
603     },
604 root 1.1 );
605    
606 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
607 root 1.1 $PORT{""} = sub {
608     my $tag = shift;
609     eval { &{ $node_req{$tag} ||= load_func $tag } };
610 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
611 root 1.1 };
612    
613     =back
614    
615     =head1 SEE ALSO
616    
617     L<AnyEvent::MP>.
618    
619     =head1 AUTHOR
620    
621     Marc Lehmann <schmorp@schmorp.de>
622     http://home.schmorp.de/
623    
624     =cut
625    
626     1
627