ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.44
Committed: Sun Sep 6 00:13:21 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.43: +9 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages - queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
# Highest level the default handler still prints to STDERR. Taken from
# $ENV{PERL_ANYEVENT_MP_WARNLEVEL} when that variable exists, otherwise 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;

# Additional log hooks; every code reference in here is called for every
# message, regardless of $WARNLEVEL.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # copy the arguments first, so nothing a hook does can influence the
   # level check or the printed text below.
   my ($level, $msg) = @_;

   # every hook gets its own argument list - calling hooks as "&$_" shares
   # our @_, so a hook that shifts its arguments would leave nothing for
   # the hooks after it.
   $_->(@_) for @WARN;

   return if $WARNLEVEL < $level;

   $msg =~ s/\n$//;

   printf STDERR "%s <%d> %s\n",
      POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time),
      $level,
      $msg;
};
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
# Resolves a fully qualified function name to a code reference.
#
# When the function is not defined yet, progressively shorter package
# prefixes of the name are loaded via "require" until the function exists.
# When no prefix is left and the function is still missing, a code reference
# that dies with "unable to resolve '<name>'" is returned instead, so the
# failure surfaces when the function is called, not when it is looked up.
sub load_func($) {
   my $func = $_[0];

   # the function is looked up by name, which needs symbolic references
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # only feed syntactically valid package names to the string eval:
         # the name can originate from a node message, and interpolating an
         # arbitrary string into "require ..." would execute it as code.
         eval "require $pkg"
            if $pkg =~ /^[^\W\d]\w*(?:::\w+)*\z/;
      } until defined &$func;
   }

   \&$func
}
109
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened. Whatever
# could not be read from there (device missing, read error, short read) is
# filled in from perl's rand, which is seeded once per process from the
# time, the pid and the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      # sysread may return fewer octets than requested, so keep reading
      while ((length $nonce) < $len) {
         my $got = sysread $fh, $nonce, $len - (length $nonce), (length $nonce);
         last unless $got; # error or end of file, use the fallback below
      }
   }

   if ((length $nonce) < $len) {
      # shit...
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }
      $nonce .= join "", map +(chr rand 256), 1 .. ($len - length $nonce);
   }

   $nonce
}
126
# Encodes an octet string using only the characters A-Z, a-z and 0-9.
#
# When Math::GMP (2.05 or newer) can be loaded, the octets are treated as
# one big hexadecimal number and printed in base 62. Otherwise the data is
# base64-encoded, the padding is removed and the three non-alphanumeric
# cases are escaped with "x": "x" becomes "x0", "/" becomes "x1" and "+"
# becomes "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", and "=" only ever occurs as padding,
      # so remove all of them (a plain s/=// would leave the second one).
      $data =~ tr/=//d;
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
145
# Builds a short alphanumeric cookie out of the low 16 bits of the process
# id, the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $raw = pack "nna*", $$ & 0xffff, time & 0xffff, nonce (2);

   alnumbits ($raw)
}
149
# this node's configuration
our $CONFIG;

# remote uniq value, built from 96 random bits
our $RUNIQ = alnumbits (nonce (96 / 8));
# per-process/node unique cookie
our $UNIQ  = gen_uniq ();
# id of the local node, anonymous until configured otherwise
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
167
# Accessor for the current value of $NODE, the id of the local node.
sub NODE() {
   return $NODE;
}
171
# Returns the node part of a port id, i.e. everything before the first "#".
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
177
# Defines the constant TRACE at compile time: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
183
# Timer watcher that is pending while delayed callbacks are queued; delay
# only arms a new timer while this is false.
our $DELAY_TIMER;
# Callbacks queued by delay, run in order by _delay_run.
our @DELAY_QUEUE;

# Timer callback: runs all queued callbacks, including the ones that get
# queued while it is running, then releases the timer.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   # the timer has fired and the queue is drained - forget the watcher,
   # otherwise delay would never arm another timer and callbacks queued
   # later would never run.
   undef $DELAY_TIMER;
}
190
# Queues a callback for _delay_run and arms a zero-delay timer for it,
# unless a timer is already stored in $DELAY_TIMER.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
195
# Delivers a message to a local port: the first element is the port id, the
# callback registered for it in %PORT is called with the remaining
# elements. Messages for ports without a callback are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $port_cb = $PORT{+shift}
      or return;

   # call with the current (already shifted) @_
   &$port_cb;
}
200
# Central routing component: makes sure %NODE has an entry for the given
# node id, creating an AnyEvent::MP::Node::Direct object when there is none
# yet, and returns that entry so messages can be sent to it.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
208
# Sends a message to a port. The first argument is the port id
# ("nodeid#portid"), the remaining arguments are the message. The node
# object is looked up in %NODE (and created via add_node when missing) and
# handed [portid, message...] through its "send" code reference.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # {send} is a code reference stored in the node object, not a method
   $node->{send}->(["$portid", @_]);
}
217
218 =item $is_local = port_is_local $port
219
220 Returns true iff the port is a local port.
221
222 =cut
223
# True iff the node part of the given port id maps to the same node object
# as the local node, which is registered under the empty node id.
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $NODE{$parts[0]} == $NODE{""}
}
229
230 =item snd_to_func $node, $func, @args
231
232 Expects a node ID and a name of a function. Asynchronously tries to call
233 this function with the given arguments on that node.
234
235 This function can be used to implement C<spawn>-like interfaces.
236
237 =cut
238
# Asks the given node to call a function: sends ["", $func, @args] to it,
# i.e. a message for the port with the empty id.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   if ($nodeid ne $NODE) {
      my $node = $NODE{$nodeid} || add_node ($nodeid);
      return $node->send (["", @call]);
   }

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   my @msg = ("", @call);
   delay (sub { $NODE{""}->send (\@msg) });
}
251
252 =item snd_on $node, @msg
253
254 Executes C<snd> with the given C<@msg> (which must include the destination
255 port) on the given node.
256
257 =cut
258
# Sends a "snd" request followed by @msg to the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
263
264 =item eval_on $node, $string[, @reply]
265
266 Evaluates the given string as Perl expression on the given node. When
267 @reply is specified, then it is used to construct a reply message with
268 C<"$@"> and any results from the eval appended.
269
270 =cut
271
# Sends an "eval" request to the given node, followed by the string to
# evaluate and the optional reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
276
# Kills a port: the first argument is the port id, the remaining arguments
# are passed on to the kill method of the owning node object. Croaks when
# the id has no (or an empty) port part.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);
   $node->kill ("$portid", @_);
}
286
# Returns the second field reported by POSIX::uname, the node name.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();
   $uname[1]
}
291
# Resolves a comma-separated list of transport descriptors into addresses.
#
# Returns a condvar which is sent the resulting "host:port" strings once all
# lookups have finished, ordered by the position of their descriptor in the
# list and with duplicates removed.
#
# A descriptor consisting only of digits (or nothing at all) is taken as a
# port on the local node name. The host "*" stands for the addresses of all
# local interfaces, the port "*" is replaced by "0". Croaks when a
# descriptor cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res;

   # this callback runs when the last begin has been matched by an end
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor share its index as integer part, the
      # fraction keeps them in the order in which they were found
      my $pri = ++$idx;

      $t = length $t ? _nodename . ":$t" : _nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               # named loop variables instead of "my $_": lexical $_ was
               # removed in perl 5.24 and no longer compiles there
               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for my $addr ($if->address (Net::Interface::AF_INET ())) {
                     next if $addr =~ /^\x7f/; # skip localhost etc.
                     push @addr, $addr;
                  }
                  for my $addr ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($addr) <= 2;
                     next unless $addr =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $addr;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}
372
# Configures the local node: looks up the profile, sets the node id, starts
# the listeners, hands the seed addresses to the global service and starts
# the configured services.
#
# Arguments are key-value pairs; with an odd number of arguments the first
# one is taken as the profile name. Without a profile, the local node name
# is used.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my %opt = @_;

   my $profile = delete $opt{profile};
   defined $profile
      or $profile = _nodename ();

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %opt);

   # the node id defaults to the profile name, "anon/" keeps the
   # anonymous id assigned earlier
   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      if $node ne "anon/";

   # register the local node object under its (possibly new) id as well
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds} || ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one after the other
   for my $resolved (map _resolve ($_), @$binds) {
      for my $bind ($resolved->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address passed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   # a service is either [function, args...], a module name with trailing
   # "::" that just gets loaded, or a function name called without args
   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
439
440 #############################################################################
441 # node monitoring and info
442
443 =item node_is_known $nodeid
444
445 Returns true iff the given node is currently known to the system.
446
447 =cut
448
# True iff %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
452
453 =item node_is_up $nodeid
454
455 Returns true if the given node is "up", that is, the kernel thinks it has
456 a working connection to it.
457
458 If the node is known but not currently connected, returns C<0>. If the
459 node is not known, returns C<undef>.
460
461 =cut
462
# Returns 1 when the node object for the given id has a transport, 0 when
# it has none, and nothing (undef in scalar context) for unknown nodes.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
467
468 =item known_nodes
469
470 Returns the node IDs of all nodes currently known to this node, including
471 itself and nodes not currently connected.
472
473 =cut
474
# Returns the ids of all node objects in %NODE, each id once.
#
# The same node object can be stored under more than one key - the local
# node is registered under the empty id as well as under its node id - so
# ids that were already seen are filtered out.
sub known_nodes {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
478
479 =item up_nodes
480
481 Return the node IDs of all nodes that are currently connected (excluding
482 the node itself).
483
484 =cut
485
# Returns the ids of all node objects in %NODE that have a transport.
sub up_nodes {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
489
490 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
491
492 Registers a callback that is called each time a node goes up (a connection
493 is established) or down (the connection is lost).
494
495 Node up messages can only be followed by node down messages for the same
496 node, and vice versa.
497
498 Note that monitoring a node is usually better done by monitoring its node
499 port. This function is mainly of interest to modules that are concerned
500 about the network topology and low-level connection handling.
501
502 Callbacks I<must not> block and I<should not> send any messages.
503
504 The function returns an optional guard which can be used to unregister
505 the monitoring callback again.
506
507 =cut
508
# Registered node monitoring callbacks, keyed by the address of the code
# reference.
our %MON_NODES;

# Registers a callback that _inject_nodeevent calls for every node up/down
# event. Unless called in void context, returns a guard that unregisters
# the callback again.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;
   $MON_NODES{$key} = $cb;

   # "defined wantarray" is true in scalar and in list context - a plain
   # "wantarray" is false for "$guard = mon_nodes ...", which would hand
   # the caller a false value instead of a guard.
   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$key} })
}
518
# Reports a node going up or down: calls every callback in %MON_NODES with
# the node id, the up flag and the reason, then logs the event at level 7.
# A callback that dies is logged at level 1 and does not stop the others.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };
      $WARN->(1, $@) unless $ok;
   }

   my $state = $up ? "up" : "down";
   $WARN->(7, "$node->{id} is $state (@reason)");
}
529
530 #############################################################################
531 # self node code
532
# Requests understood by the node port (the port with the empty id), keyed
# by the first element of the message. The handlers are called with the
# remaining elements while $SRCNODE holds the sending node.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;
      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;
      my $node = $SRCNODE;
      Scalar::Util::weaken $node; #TODO# ugly
      my $notify = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };
      # remembered per sending node, so mon0 can remove it again
      $node->{rmon}{$portid} = $notify;
      $NODE{""}->monitor ($portid, $notify);
   },
   kil => sub {
      # run (and forget) the callbacks stored for this port on the sender
      my $portid = shift;
      my $cbs = delete $SRCNODE->{lmon}{$portid};
      return unless $cbs;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd          => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): evaluates a string taken from the message as perl
      # code - whoever can send node messages can run code here.
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # ignores the message
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
592
# The local node object, reachable under its node id and the empty id.
$NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);
$NODE{""}    = $NODE{$NODE};

# The node port: the first message element selects the handler from
# %node_req. Unknown tags are resolved as function names via load_func and
# the result is cached in %node_req. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      # call with the remaining message elements in @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
599
600 =back
601
602 =head1 SEE ALSO
603
604 L<AnyEvent::MP>.
605
606 =head1 AUTHOR
607
608 Marc Lehmann <schmorp@schmorp.de>
609 http://home.schmorp.de/
610
611 =cut
612
613 1
614