ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.33
Committed: Sun Aug 30 19:49:47 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.32: +61 -13 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.3 AnyEvent::MP::Kernel - the actual message passing kernel
4 root 1.1
5     =head1 SYNOPSIS
6    
7 root 1.3 use AnyEvent::MP::Kernel;
8 root 1.1
9     =head1 DESCRIPTION
10    
11     This module provides most of the basic functionality of AnyEvent::MP,
12     exposed through higher level interfaces such as L<AnyEvent::MP> and
13     L<Coro::MP>.
14    
15 root 1.3 This module is mainly of interest when knowledge about connectivity,
16 root 1.27 connected nodes etc. is sought.
17 root 1.3
18     =head1 GLOBALS AND FUNCTIONS
19 root 1.1
20     =over 4
21    
22     =cut
23    
24     package AnyEvent::MP::Kernel;
25    
26     use common::sense;
27 root 1.16 use POSIX ();
28 root 1.1 use Carp ();
29     use MIME::Base64 ();
30    
31     use AE ();
32    
33     use AnyEvent::MP::Node;
34     use AnyEvent::MP::Transport;
35    
36     use base "Exporter";
37    
38 root 1.28 our $VERSION = '0.9';
39 root 1.1 our @EXPORT = qw(
40 root 1.13 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 root 1.21 add_node load_func snd_to_func snd_on eval_on
42 root 1.1
43 root 1.10 NODE $NODE node_of snd kil
44 root 1.17 port_is_local
45 root 1.1 resolve_node initialise_node
46 root 1.13 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 root 1.1 );
48    
49     our $DEFAULT_PORT = "4040";
50    
51     our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52     our $NETWORK_LATENCY = 3; # activity timeout
53     our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54    
55 root 1.16 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56 root 1.1
57 root 1.27 This value is called with an error or warning message, when e.g. a
58     connection could not be created, authorisation failed and so on.
59    
60     It I<must not> block or send messages -queue it and use an idle watcher if
61     you need to do any of these things.
62 root 1.1
63 root 1.16 C<$level> sould be C<0> for messages ot be logged always, C<1> for
64     unexpected messages and errors, C<2> for warnings, C<7> for messages about
65     node connectivity and services, C<8> for debugging messages and C<9> for
66     tracing messages.
67    
68 root 1.1 The default simply logs the message to STDERR.
69    
70     =cut
71    
72 root 1.29 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
73    
74 root 1.1 our $WARN = sub {
75 root 1.29 return if $WARNLEVEL < $_[0];
76    
77 root 1.16 my ($level, $msg) = @_;
78    
79 root 1.1 $msg =~ s/\n$//;
80 root 1.16
81     printf STDERR "%s <%d> %s\n",
82     (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83     $level,
84     $msg;
85 root 1.1 };
86    
87 root 1.29 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88    
89     The maximum level at which warning messages will be printed to STDERR by
90     the default warn handler.
91    
92     =cut
93    
94 root 1.6 sub load_func($) {
95     my $func = $_[0];
96    
97     unless (defined &$func) {
98     my $pkg = $func;
99     do {
100     $pkg =~ s/::[^:]+$//
101     or return sub { die "unable to resolve '$func'" };
102     eval "require $pkg";
103     } until defined &$func;
104     }
105    
106     \&$func
107     }
108    
109 root 1.1 sub nonce($) {
110     my $nonce;
111    
112     if (open my $fh, "</dev/urandom") {
113     sysread $fh, $nonce, $_[0];
114     } else {
115     # shit...
116     our $nonce_init;
117 root 1.31 unless ($nonce_init++) {
118 root 1.1 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
119     }
120     $nonce = join "", map +(chr rand 256), 1 .. $_[0]
121     }
122    
123     $nonce
124     }
125    
126 root 1.21 sub alnumbits($) {
127 root 1.1 my $data = $_[0];
128    
129     if (eval "use Math::GMP 2.05; 1") {
130     $data = Math::GMP::get_str_gmp (
131     (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
132     62
133     );
134     } else {
135     $data = MIME::Base64::encode_base64 $data, "";
136     $data =~ s/=//;
137 root 1.31 $data =~ s/x/x0/g;
138     $data =~ s/\//x1/g;
139     $data =~ s/\+/x2/g;
140 root 1.1 }
141    
142     $data
143     }
144    
145     sub gen_uniq {
146 root 1.21 alnumbits pack "wNa*", $$, time, nonce 2
147 root 1.1 }
148    
149 root 1.20 our $CONFIG; # this node's configuration
150 root 1.21
151     our $RUNIQ = alnumbits nonce 16;; # remote uniq value
152 root 1.1 our $UNIQ = gen_uniq; # per-process/node unique cookie
153 root 1.21 our $NODE = "anon/$RUNIQ";
154 root 1.1 our $ID = "a";
155    
156     our %NODE; # node id to transport mapping, or "undef", for local node
157     our (%PORT, %PORT_DATA); # local ports
158    
159 root 1.21 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
160 root 1.1 our %LMON; # monitored _local_ ports
161    
162     our %LISTENER;
163 root 1.21 our $LISTENER; # our listeners, as arrayref
164 root 1.1
165     our $SRCNODE; # holds the sending node during _inject
166    
167     sub NODE() {
168     $NODE
169     }
170    
171     sub node_of($) {
172 root 1.21 my ($node, undef) = split /#/, $_[0], 2;
173 root 1.1
174 root 1.21 $node
175 root 1.1 }
176    
177 root 1.17 BEGIN {
178     *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
179     ? sub () { 1 }
180     : sub () { 0 };
181     }
182 root 1.1
183     sub _inject {
184 root 1.25 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
185 root 1.1 &{ $PORT{+shift} or return };
186     }
187    
188 root 1.20 # this function adds a node-ref, so you can send stuff to it
189     # it is basically the central routing component.
190 root 1.1 sub add_node {
191 root 1.21 my ($node) = @_;
192 root 1.1
193 root 1.21 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
194 root 1.13 }
195    
196 root 1.1 sub snd(@) {
197 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
198 root 1.1
199 root 1.25 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
200 root 1.1
201 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
202 root 1.2 ->{send} (["$portid", @_]);
203 root 1.1 }
204    
205 root 1.17 =item $is_local = port_is_local $port
206    
207     Returns true iff the port is a local port.
208    
209     =cut
210    
211     sub port_is_local($) {
212 root 1.21 my ($nodeid, undef) = split /#/, $_[0], 2;
213 root 1.17
214 root 1.21 $NODE{$nodeid} == $NODE{""}
215 root 1.17 }
216    
217 root 1.18 =item snd_to_func $node, $func, @args
218 root 1.11
219 root 1.21 Expects a node ID and a name of a function. Asynchronously tries to call
220 root 1.11 this function with the given arguments on that node.
221    
222 root 1.20 This function can be used to implement C<spawn>-like interfaces.
223 root 1.11
224     =cut
225    
226 root 1.18 sub snd_to_func($$;@) {
227 root 1.21 my $nodeid = shift;
228 root 1.11
229 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
230 root 1.11 ->send (["", @_]);
231     }
232    
233 root 1.18 =item snd_on $node, @msg
234    
235     Executes C<snd> with the given C<@msg> (which must include the destination
236     port) on the given node.
237    
238     =cut
239    
240     sub snd_on($@) {
241     my $node = shift;
242     snd $node, snd => @_;
243     }
244    
245 root 1.29 =item eval_on $node, $string[, @reply]
246 root 1.18
247 root 1.29 Evaluates the given string as Perl expression on the given node. When
248     @reply is specified, then it is used to construct a reply message with
249     C<"$@"> and any results from the eval appended.
250 root 1.18
251     =cut
252    
253 root 1.29 sub eval_on($$;@) {
254 root 1.18 my $node = shift;
255     snd $node, eval => @_;
256     }
257    
258 root 1.1 sub kil(@) {
259 root 1.21 my ($nodeid, $portid) = split /#/, shift, 2;
260 root 1.1
261     length $portid
262 root 1.21 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
263 root 1.1
264 root 1.21 ($NODE{$nodeid} || add_node $nodeid)
265 root 1.1 ->kill ("$portid", @_);
266     }
267    
268 root 1.7 sub _nodename {
269     require POSIX;
270     (POSIX::uname ())[1]
271     }
272    
273 root 1.21 sub _resolve($) {
274     my ($nodeid) = @_;
275 root 1.1
276     my $cv = AE::cv;
277     my @res;
278    
279     $cv->begin (sub {
280     my %seen;
281     my @refs;
282     for (sort { $a->[0] <=> $b->[0] } @res) {
283     push @refs, $_->[1] unless $seen{$_->[1]}++
284     }
285 root 1.21 shift->send (@refs);
286 root 1.1 });
287    
288 root 1.21 $nodeid = $DEFAULT_PORT unless length $nodeid;
289 root 1.1
290     my $idx;
291 root 1.21 for my $t (split /,/, $nodeid) {
292 root 1.1 my $pri = ++$idx;
293 root 1.7
294     $t = length $t ? _nodename . ":$t" : _nodename
295     if $t =~ /^\d*$/;
296 root 1.1
297 root 1.7 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
298     or Carp::croak "$t: unparsable transport descriptor";
299    
300 root 1.33 $port = "0" if $port eq "*";
301    
302     if ($host eq "*") {
303     $cv->begin;
304     # use fork_call, as Net::Interface is big, and we need it rarely.
305     require AnyEvent::Util;
306     AnyEvent::Util::fork_call (
307     sub {
308     my @addr;
309    
310     require Net::Interface;
311    
312     for my $if (Net::Interface->interfaces) {
313     # we statically lower-prioritise ipv6 here, TODO :()
314     for my $_ ($if->address (Net::Interface::AF_INET ())) {
315     next if /^\x7f/; # skip localhost etc.
316     push @addr, $_;
317     }
318     for ($if->address (Net::Interface::AF_INET6 ())) {
319     #next if $if->scope ($_) <= 2;
320     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
321     push @addr, $_;
322     }
323    
324     }
325     @addr
326     }, sub {
327     for my $ip (@_) {
328     push @res, [
329     $pri += 1e-5,
330     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
331     ];
332     }
333     $cv->end;
334     }
335     );
336     } else {
337     $cv->begin;
338     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
339     for (@_) {
340     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
341     push @res, [
342     $pri += 1e-5,
343     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
344     ];
345     }
346     $cv->end;
347     };
348     }
349 root 1.1 }
350    
351     $cv->end;
352    
353     $cv
354     }
355    
356 root 1.22 sub initialise_node(;$%) {
357 root 1.32 my ($profile, %kv) = @_;
358 root 1.1
359 root 1.21 $profile = _nodename
360     unless defined $profile;
361 root 1.6
362 root 1.32 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
363 root 1.24
364     my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
365     $NODE = $node
366     unless $node eq "anon/";
367 root 1.6
368 root 1.21 $NODE{$NODE} = $NODE{""};
369     $NODE{$NODE}{id} = $NODE;
370 root 1.20
371 root 1.21 my $seeds = $CONFIG->{seeds};
372     my $binds = $CONFIG->{binds};
373 root 1.3
374 root 1.33 $binds ||= ["*"];
375 root 1.1
376 root 1.21 $WARN->(8, "node $NODE starting up.");
377 root 1.1
378 root 1.23 $LISTENER = [];
379     %LISTENER = ();
380    
381 root 1.21 for (map _resolve $_, @$binds) {
382     for my $bind ($_->recv) {
383     my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
384     or Carp::croak "$bind: unparsable local bind address";
385 root 1.20
386 root 1.33 $port =~ s/^.*=//; # remove bogus aemp=
387    
388     my $listener = AnyEvent::MP::Transport::mp_server
389     $host,
390     $port,
391     prepare => sub {
392     my (undef, $host, $port) = @_;
393     $bind = AnyEvent::Socket::format_hostport $host, $port;
394     },
395     ;
396     $LISTENER{$bind} = $listener;
397 root 1.21 push @$LISTENER, $bind;
398     }
399 root 1.1 }
400    
401 root 1.21 # the global service is mandatory currently
402     require AnyEvent::MP::Global;
403 root 1.1
404 root 1.21 # connect to all seednodes
405     AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
406 root 1.1
407 root 1.20 for (@{ $CONFIG->{services} }) {
408 root 1.13 if (s/::$//) {
409     eval "require $_";
410     die $@ if $@;
411     } else {
412     (load_func $_)->();
413     }
414     }
415 root 1.1 }
416    
417     #############################################################################
418 root 1.6 # node monitoring and info
419 root 1.3
420 root 1.21 =item node_is_known $nodeid
421 root 1.13
422     Returns true iff the given node is currently known to the system.
423    
424     =cut
425    
426     sub node_is_known($) {
427     exists $NODE{$_[0]}
428     }
429    
430 root 1.21 =item node_is_up $nodeid
431 root 1.13
432     Returns true if the given node is "up", that is, the kernel thinks it has
433     a working connection to it.
434    
435     If the node is known but not currently connected, returns C<0>. If the
436     node is not known, returns C<undef>.
437    
438     =cut
439    
440     sub node_is_up($) {
441     ($NODE{$_[0]} or return)->{transport}
442     ? 1 : 0
443     }
444    
445 root 1.3 =item known_nodes
446    
447 root 1.26 Returns the node IDs of all nodes currently known to this node, including
448     itself and nodes not currently connected.
449 root 1.3
450     =cut
451    
452     sub known_nodes {
453 root 1.26 map $_->{id}, values %NODE
454 root 1.3 }
455    
456     =item up_nodes
457    
458 root 1.26 Return the node IDs of all nodes that are currently connected (excluding
459     the node itself).
460 root 1.3
461     =cut
462    
463     sub up_nodes {
464 root 1.26 map $_->{id}, grep $_->{transport}, values %NODE
465 root 1.3 }
466    
467 root 1.21 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
468 root 1.3
469 root 1.27 Registers a callback that is called each time a node goes up (a connection
470     is established) or down (the connection is lost).
471 root 1.3
472     Node up messages can only be followed by node down messages for the same
473     node, and vice versa.
474    
475 root 1.27 Note that monitoring a node is usually better done by monitoring it's node
476     port. This function is mainly of interest to modules that are concerned
477     about the network topology and low-level connection handling.
478    
479     Callbacks I<must not> block and I<should not> send any messages.
480    
481     The function returns an optional guard which can be used to unregister
482 root 1.3 the monitoring callback again.
483    
484     =cut
485    
486     our %MON_NODES;
487    
488     sub mon_nodes($) {
489     my ($cb) = @_;
490    
491     $MON_NODES{$cb+0} = $cb;
492    
493     wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
494     }
495    
496     sub _inject_nodeevent($$;@) {
497 root 1.16 my ($node, $up, @reason) = @_;
498 root 1.3
499     for my $cb (values %MON_NODES) {
500 root 1.21 eval { $cb->($node->{id}, $up, @reason); 1 }
501 root 1.16 or $WARN->(1, $@);
502 root 1.3 }
503 root 1.16
504 root 1.21 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
505 root 1.3 }
506    
507     #############################################################################
508 root 1.1 # self node code
509    
510     our %node_req = (
511     # internal services
512    
513     # monitoring
514 root 1.27 mon0 => sub { # stop monitoring a port
515 root 1.1 my $portid = shift;
516     my $node = $SRCNODE;
517     $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
518     },
519 root 1.27 mon1 => sub { # start monitoring a port
520 root 1.1 my $portid = shift;
521     my $node = $SRCNODE;
522 root 1.27 Scalar::Util::weaken $node; #TODO# ugly
523 root 1.1 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
524 root 1.27 $node->send (["", kil => $portid, @_])
525     if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disocnnect
526 root 1.1 });
527     },
528     kil => sub {
529     my $cbs = delete $SRCNODE->{lmon}{+shift}
530     or return;
531    
532     $_->(@_) for @$cbs;
533     },
534    
535 root 1.18 # "public" services - not actually public
536 root 1.1
537     # relay message to another node / generic echo
538 root 1.15 snd => \&snd,
539 root 1.27 snd_multiple => sub {
540 root 1.1 snd @$_ for @_
541     },
542    
543 root 1.4 # informational
544     info => sub {
545     snd @_, $NODE;
546     },
547     known_nodes => sub {
548     snd @_, known_nodes;
549     },
550     up_nodes => sub {
551     snd @_, up_nodes;
552     },
553    
554 root 1.30 # random utilities
555 root 1.1 eval => sub {
556     my @res = eval shift;
557     snd @_, "$@", @res if @_;
558     },
559     time => sub {
560     snd @_, AE::time;
561     },
562     devnull => sub {
563     #
564     },
565 root 1.15 "" => sub {
566 root 1.27 # empty messages are keepalives or similar devnull-applications
567 root 1.15 },
568 root 1.1 );
569    
570 root 1.18 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
571 root 1.1 $PORT{""} = sub {
572     my $tag = shift;
573     eval { &{ $node_req{$tag} ||= load_func $tag } };
574 root 1.16 $WARN->(2, "error processing node message: $@") if $@;
575 root 1.1 };
576    
577     =back
578    
579     =head1 SEE ALSO
580    
581     L<AnyEvent::MP>.
582    
583     =head1 AUTHOR
584    
585     Marc Lehmann <schmorp@schmorp.de>
586     http://home.schmorp.de/
587    
588     =cut
589    
590     1
591