ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.43
Committed: Sat Sep 5 22:48:27 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.42: +4 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages - queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =cut
64
# Threshold for the default $WARN handler. The environment variable takes
# precedence whenever it exists; otherwise the level defaults to 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Default log handler: writes "<timestamp> <level> message" to STDERR for
# every message whose level does not exceed $WARNLEVEL.
our $WARN = sub {
   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # drop one trailing newline, the format below supplies its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79
80 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81
82 The maximum level at which warning messages will be printed to STDERR by
83 the default warn handler.
84
85 =cut
86
# Resolves a fully qualified function name to a code reference.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd (load failures are ignored) until the function
# shows up. When no prefix is left, a sub that dies with an "unable to
# resolve" message is returned instead.
sub load_func($) {
   my $func = $_[0];

   return \&$func
      if defined &$func;

   my $pkg = $func;

   # strip the last "::component" and try to load what remains
   while ($pkg =~ s/::[^:]+$//) {
      eval "require $pkg";

      return \&$func
         if defined &$func;
   }

   sub { die "unable to resolve '$func'" }
}
101
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened. Short reads
# are retried, and anything still missing (device absent, read error, EOF)
# is filled in from rand, which is seeded once per process.
#
# NOTE(review): the rand-based fallback is not cryptographically strong.
sub nonce($) {
   my $want  = $_[0];
   my $nonce = "";

   # three-argument open, so the path can never be read as a mode string
   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      while (length ($nonce) < $want) {
         # append at the current end of the buffer
         my $got = sysread $fh, $nonce, $want - length ($nonce), length ($nonce);
         last unless $got; # read error or EOF: use the fallback below
      }
   }

   if (length ($nonce) < $want) {
      our $nonce_init;
      unless ($nonce_init++) {
         # seed from time, pid and a checksum over whatever system
         # information is available on this platform
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }
      $nonce .= join "", map +(chr rand 256), 1 .. ($want - length ($nonce));
   }

   $nonce
}
118
# Encodes an octet string using only the characters [A-Za-z0-9].
#
# With Math::GMP 2.05 or newer the octets are converted to a base-62 number.
# Otherwise the data is base64-encoded, the padding is removed and the three
# non-alphanumeric cases are escaped: "x" => "x0", "/" => "x1", "+" => "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with one OR two "=", all of them have to go
      $data =~ s/=+\z//;
      # "x" is the escape character, so it must be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
137
# Builds a short alphanumeric cookie from the low 16 bits of the process id,
# the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid_bits  = $$ & 0xffff;
   my $time_bits = time () & 0xffff;

   alnumbits (pack "nna*", $pid_bits, $time_bits, nonce (2))
}
141
# this node's configuration, filled in by configure
our $CONFIG;

# remote uniq value, made from 12 random octets
our $RUNIQ = alnumbits (nonce (96 / 8));
# per-process/node unique cookie
our $UNIQ  = gen_uniq ();
# node id, anonymous until configure picks a real one
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

# listeners: %LISTENER is keyed by bind address, $LISTENER is an arrayref
our %LISTENER;
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
159
# Accessor for the current value of $NODE, the node id of this node.
sub NODE() {
   return $NODE;
}
163
# Returns the node part of a port id, i.e. everything before the first "#".
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
169
BEGIN {
   # TRACE is installed at compile time as a sub with an empty prototype
   # returning 1 or 0, depending on $ENV{PERL_ANYEVENT_MP_TRACE}.
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175
# Watcher that will drain @DELAY_QUEUE; only defined while a run is pending.
our $DELAY_TIMER;
# Callbacks queued by delay, run in FIFO order.
our @DELAY_QUEUE;

# Runs all queued callbacks, including ones queued while draining, and then
# releases the timer so that the next delay call arms a fresh one.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   # Without this the "||=" in delay would keep the expired watcher forever
   # and callbacks queued later would never be run.
   undef $DELAY_TIMER;
}

# Queues the given callback to be run from a zero-delay timer and makes
# sure such a timer is armed.
sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
187
# Delivers a message to a local port: the first element of @_ is the port
# id, the remaining elements are handed to the port's callback via @_.
# Messages for ports without an entry in %PORT are dropped.
sub _inject {
   if (TRACE && @_) {#d#
      warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n";
   }

   my $handler = $PORT{+shift}
      or return;

   # "&$handler;" deliberately passes on the current (already shifted) @_
   &$handler;
}
192
# Central routing helper: returns the node object registered in %NODE for
# the given node id, creating (and registering) an
# AnyEvent::MP::Node::Direct object the first time an id is seen.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
200
# Sends a message: the first argument is the destination port id
# ("nodeid#portid"), the rest is the message. The message, prefixed with the
# stringified port id, goes to the node's {send} callback; unknown nodes are
# added on demand.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   if (TRACE && @_) {#d#
      warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n";
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
209
210 =item $is_local = port_is_local $port
211
212 Returns true iff the port is a local port.
213
214 =cut
215
# A port is local when the node part of its id maps to the same %NODE entry
# as the empty node id.
sub port_is_local($) {
   my $nodeid = node_of ($_[0]);

   $NODE{$nodeid} == $NODE{""}
}
221
222 =item snd_to_func $node, $func, @args
223
224 Expects a node ID and a name of a function. Asynchronously tries to call
225 this function with the given arguments on that node.
226
227 This function can be used to implement C<spawn>-like interfaces.
228
229 =cut
230
# Sends ("", @args) to the given node. Remote nodes get it immediately.
# Messages to $NODE itself are queued through delay, for spawn; delaying ALL
# messages would avoid deep recursion but was considered too slow.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   if ($nodeid eq $NODE) {
      my @msg = ("", @_);
      return delay (sub { $NODE{""}->send (\@msg) });
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @_])
}
243
244 =item snd_on $node, @msg
245
246 Executes C<snd> with the given C<@msg> (which must include the destination
247 port) on the given node.
248
249 =cut
250
# Sends a "snd" request followed by @msg to the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
255
256 =item eval_on $node, $string[, @reply]
257
258 Evaluates the given string as a Perl expression on the given node. When
259 @reply is specified, then it is used to construct a reply message with
260 C<"$@"> and any results from the eval appended.
261
262 =cut
263
# Sends an "eval" request to the given node, followed by the string and the
# optional reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
268
# Kills the port given as first argument; remaining arguments are passed to
# the node's kill method as the reason. Croaks when the port part of the id
# is empty, since that would address the node port.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
278
# Returns the second field of POSIX::uname, the node name of this host.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
283
# Resolves a comma-separated list of transport descriptors ("host:port",
# "port", "" or "*:port") into "host:port" strings.
#
# Returns a condvar that is sent the resolved strings, ordered by position
# in the list and with duplicates removed. Croaks on a descriptor that
# parse_hostport rejects.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "host:port"] pairs, filled in by the callbacks below

   # runs once every outstanding lookup has called $cv->end
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # integer part = position in the list, fraction = order within a lookup
      my $pri = ++$idx;

      # a bare port number or an empty string means "this host"
      $t = length $t ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak ("$t: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # (named loop variables: "for my $_" no longer compiles
                  # since lexical $_ was removed in perl 5.24)
                  for my $addr ($if->address (Net::Interface::AF_INET ())) {
                     next if $addr =~ /^\x7f/; # skip localhost etc.
                     push @addr, $addr;
                  }
                  for my $addr ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($addr) <= 2;
                     next unless $addr =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $addr;
                  }
               }

               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (
                        AnyEvent::Socket::format_address ($ip), $port
                     )
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (
                     AnyEvent::Socket::format_address ($host), $service
                  )
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
364
# Configures this node: picks the profile (single argument or profile => ...,
# defaulting to the host's node name), loads it into $CONFIG, sets the node
# id, creates listeners for all bind addresses, hands the resolved seeds to
# AnyEvent::MP::Global and finally starts the configured services.
sub configure(@) {
   # an odd number of arguments means the first one is the profile name
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   # "anon/" keeps the randomly generated anonymous node id
   $NODE = $node
      unless $node eq "anon/";

   # register the self node under its (possibly new) id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one by one
   for (map { _resolve ($_) } @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               my (undef, $host, $port) = @_;
               # remember the address that was actually bound
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               # AnyEvent::Socket::tcp_server uses the return value of its
               # prepare callback as listen backlog (0 selects the default),
               # so the address string must not be the last value here.
               # NOTE(review): assumes mp_server passes this callback on to
               # tcp_server unchanged - confirm.
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map { _resolve ($_) } @$seeds);

   for (@{ $CONFIG->{services} }) {
      if (ref) {
         # [function name, arguments...]
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         # a trailing "::" marks a module that only needs to be loaded
         eval "require $_";
         die $@ if $@;
      } else {
         # plain function name, called without arguments
         (load_func $_)->();
      }
   }
}
431
432 #############################################################################
433 # node monitoring and info
434
435 =item node_is_known $nodeid
436
437 Returns true iff the given node is currently known to the system.
438
439 =cut
440
# True iff %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
444
445 =item node_is_up $nodeid
446
447 Returns true if the given node is "up", that is, the kernel thinks it has
448 a working connection to it.
449
450 If the node is known but not currently connected, returns C<0>. If the
451 node is not known, returns C<undef>.
452
453 =cut
454
# Returns 1 or 0 depending on whether the node object has a {transport},
# and nothing (undef in scalar context) for ids without a %NODE entry.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
459
460 =item known_nodes
461
462 Returns the node IDs of all nodes currently known to this node, including
463 itself and nodes not currently connected.
464
465 =cut
466
# Returns the ids of all node objects in %NODE, each id once.
#
# The self node is stored under more than one key ("" as well as its node
# id), so the ids have to be de-duplicated.
sub known_nodes {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
470
471 =item up_nodes
472
473 Return the node IDs of all nodes that are currently connected (excluding
474 the node itself).
475
476 =cut
477
# Returns the ids of all node objects in %NODE that have a {transport}.
sub up_nodes {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
481
482 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
483
484 Registers a callback that is called each time a node goes up (a connection
485 is established) or down (the connection is lost).
486
487 Node up messages can only be followed by node down messages for the same
488 node, and vice versa.
489
490 Note that monitoring a node is usually better done by monitoring its node
491 port. This function is mainly of interest to modules that are concerned
492 about the network topology and low-level connection handling.
493
494 Callbacks I<must not> block and I<should not> send any messages.
495
496 The function returns an optional guard which can be used to unregister
497 the monitoring callback again.
498
499 =cut
500
# Registered node monitoring callbacks, keyed by the numeric value (address)
# of the callback reference.
our %MON_NODES;

# Registers $cb to be called on node up/down events. Unless called in void
# context, returns a guard object that removes the callback again when it
# is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # "defined wantarray" and not just "wantarray": the latter is false in
   # scalar context, so "$guard = mon_nodes ..." would never get a guard.
   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
510
# Reports a node going up or down: calls every callback in %MON_NODES with
# (node id, up flag, reason...), logging exceptions at level 1, and then
# logs the state change itself at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
521
522 #############################################################################
523 # self node code
524
# Requests understood by the node port, keyed by the first message element.
# Each handler is called with the rest of the message in @_, while $SRCNODE
# is the node the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      # the callback must not keep the remote node object alive
      my $node = $SRCNODE;
      Scalar::Util::weaken ($node); #TODO# ugly

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      });
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      # remaining arguments are the kill reason
      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): evaluates a string taken from a received message
      my @res = eval shift;
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # discards the message
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
584
# The self node is reachable under the empty node id and under its own id.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: dispatches on the first message element. Tags missing from
# %node_req are resolved with load_func and cached there. Errors raised
# while processing are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      # "&$handler;" passes on the remaining message elements in @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
591
592 =back
593
594 =head1 SEE ALSO
595
596 L<AnyEvent::MP>.
597
598 =head1 AUTHOR
599
600 Marc Lehmann <schmorp@schmorp.de>
601 http://home.schmorp.de/
602
603 =cut
604
605 1
606