ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.46
Committed: Mon Sep 7 20:00:38 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.45: +10 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 up_nodes mon_nodes node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages -queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
71 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 our @WARN;
73 our $WARN = sub {
74 &$_ for @WARN;
75
76 return if $WARNLEVEL < $_[0];
77
78 my ($level, $msg) = @_;
79
80 $msg =~ s/\n$//;
81
82 printf STDERR "%s <%d> %s\n",
83 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84 $level,
85 $msg;
86 };
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
95 sub load_func($) {
96 my $func = $_[0];
97
98 unless (defined &$func) {
99 my $pkg = $func;
100 do {
101 $pkg =~ s/::[^:]+$//
102 or return sub { die "unable to resolve '$func'" };
103 eval "require $pkg";
104 } until defined &$func;
105 }
106
107 \&$func
108 }
109
110 sub nonce($) {
111 my $nonce;
112
113 if (open my $fh, "</dev/urandom") {
114 sysread $fh, $nonce, $_[0];
115 } else {
116 # shit...
117 our $nonce_init;
118 unless ($nonce_init++) {
119 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
120 }
121 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
122 }
123
124 $nonce
125 }
126
127 sub alnumbits($) {
128 my $data = $_[0];
129
130 if (eval "use Math::GMP 2.05; 1") {
131 $data = Math::GMP::get_str_gmp (
132 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
133 62
134 );
135 } else {
136 $data = MIME::Base64::encode_base64 $data, "";
137 $data =~ s/=//;
138 $data =~ s/x/x0/g;
139 $data =~ s/\//x1/g;
140 $data =~ s/\+/x2/g;
141 }
142
143 $data
144 }
145
146 sub gen_uniq {
147 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
148 }
149
150 our $CONFIG; # this node's configuration
151
152 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
153 our $UNIQ = gen_uniq; # per-process/node unique cookie
154 our $NODE = "anon/$RUNIQ";
155 our $ID = "a";
156
157 our %NODE; # node id to transport mapping, or "undef", for local node
158 our (%PORT, %PORT_DATA); # local ports
159
160 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
161 our %LMON; # monitored _local_ ports
162
163 our %LISTENER;
164 our $LISTENER; # our listeners, as arrayref
165
166 our $SRCNODE; # holds the sending node during _inject
167
168 sub NODE() {
169 $NODE
170 }
171
172 sub node_of($) {
173 my ($node, undef) = split /#/, $_[0], 2;
174
175 $node
176 }
177
178 BEGIN {
179 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
180 ? sub () { 1 }
181 : sub () { 0 };
182 }
183
184 our $DELAY_TIMER;
185 our @DELAY_QUEUE;
186
187 sub _delay_run {
188 (shift @DELAY_QUEUE or return)->() while 1;
189 }
190
191 sub delay($) {
192 push @DELAY_QUEUE, shift;
193 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
194 }
195
196 sub _inject {
197 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
198 &{ $PORT{+shift} or return };
199 }
200
201 # this function adds a node-ref, so you can send stuff to it
202 # it is basically the central routing component.
203 sub add_node {
204 my ($node) = @_;
205
206 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
207 }
208
209 sub snd(@) {
210 my ($nodeid, $portid) = split /#/, shift, 2;
211
212 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
213
214 ($NODE{$nodeid} || add_node $nodeid)
215 ->{send} (["$portid", @_]);
216 }
217
218 =item $is_local = port_is_local $port
219
220 Returns true iff the port is a local port.
221
222 =cut
223
224 sub port_is_local($) {
225 my ($nodeid, undef) = split /#/, $_[0], 2;
226
227 $NODE{$nodeid} == $NODE{""}
228 }
229
230 =item snd_to_func $node, $func, @args
231
232 Expects a node ID and a name of a function. Asynchronously tries to call
233 this function with the given arguments on that node.
234
235 This function can be used to implement C<spawn>-like interfaces.
236
237 =cut
238
239 sub snd_to_func($$;@) {
240 my $nodeid = shift;
241
242 # on $NODE, we artificially delay... (for spawn)
243 # this is very ugly - maybe we should simply delay ALL messages,
244 # to avoid deep recursion issues. but that's so... slow...
245 $AnyEvent::MP::Node::Self::DELAY = 1
246 if $nodeid ne $NODE;
247
248 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
249 }
250
251 =item snd_on $node, @msg
252
253 Executes C<snd> with the given C<@msg> (which must include the destination
254 port) on the given node.
255
256 =cut
257
258 sub snd_on($@) {
259 my $node = shift;
260 snd $node, snd => @_;
261 }
262
263 =item eval_on $node, $string[, @reply]
264
265 Evaluates the given string as Perl expression on the given node. When
266 @reply is specified, then it is used to construct a reply message with
267 C<"$@"> and any results from the eval appended.
268
269 =cut
270
271 sub eval_on($$;@) {
272 my $node = shift;
273 snd $node, eval => @_;
274 }
275
276 sub kil(@) {
277 my ($nodeid, $portid) = split /#/, shift, 2;
278
279 length $portid
280 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
281
282 ($NODE{$nodeid} || add_node $nodeid)
283 ->kill ("$portid", @_);
284 }
285
286 sub _nodename {
287 require POSIX;
288 (POSIX::uname ())[1]
289 }
290
291 sub _resolve($) {
292 my ($nodeid) = @_;
293
294 my $cv = AE::cv;
295 my @res;
296
297 $cv->begin (sub {
298 my %seen;
299 my @refs;
300 for (sort { $a->[0] <=> $b->[0] } @res) {
301 push @refs, $_->[1] unless $seen{$_->[1]}++
302 }
303 shift->send (@refs);
304 });
305
306 my $idx;
307 for my $t (split /,/, $nodeid) {
308 my $pri = ++$idx;
309
310 $t = length $t ? _nodename . ":$t" : _nodename
311 if $t =~ /^\d*$/;
312
313 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
314 or Carp::croak "$t: unparsable transport descriptor";
315
316 $port = "0" if $port eq "*";
317
318 if ($host eq "*") {
319 $cv->begin;
320 # use fork_call, as Net::Interface is big, and we need it rarely.
321 require AnyEvent::Util;
322 AnyEvent::Util::fork_call (
323 sub {
324 my @addr;
325
326 require Net::Interface;
327
328 for my $if (Net::Interface->interfaces) {
329 # we statically lower-prioritise ipv6 here, TODO :()
330 for my $_ ($if->address (Net::Interface::AF_INET ())) {
331 next if /^\x7f/; # skip localhost etc.
332 push @addr, $_;
333 }
334 for ($if->address (Net::Interface::AF_INET6 ())) {
335 #next if $if->scope ($_) <= 2;
336 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
337 push @addr, $_;
338 }
339
340 }
341 @addr
342 }, sub {
343 for my $ip (@_) {
344 push @res, [
345 $pri += 1e-5,
346 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
347 ];
348 }
349 $cv->end;
350 }
351 );
352 } else {
353 $cv->begin;
354 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
355 for (@_) {
356 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
357 push @res, [
358 $pri += 1e-5,
359 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
360 ];
361 }
362 $cv->end;
363 };
364 }
365 }
366
367 $cv->end;
368
369 $cv
370 }
371
372 sub configure(@) {
373 unshift @_, "profile" if @_ & 1;
374 my (%kv) = @_;
375
376 my $profile = delete $kv{profile};
377
378 $profile = _nodename
379 unless defined $profile;
380
381 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
382
383 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
384 $NODE = $node
385 unless $node eq "anon/";
386
387 $NODE{$NODE} = $NODE{""};
388 $NODE{$NODE}{id} = $NODE;
389
390 my $seeds = $CONFIG->{seeds};
391 my $binds = $CONFIG->{binds};
392
393 $binds ||= ["*"];
394
395 $WARN->(8, "node $NODE starting up.");
396
397 $LISTENER = [];
398 %LISTENER = ();
399
400 for (map _resolve $_, @$binds) {
401 for my $bind ($_->recv) {
402 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
403 or Carp::croak "$bind: unparsable local bind address";
404
405 my $listener = AnyEvent::MP::Transport::mp_server
406 $host,
407 $port,
408 prepare => sub {
409 my (undef, $host, $port) = @_;
410 $bind = AnyEvent::Socket::format_hostport $host, $port;
411 },
412 ;
413 $LISTENER{$bind} = $listener;
414 push @$LISTENER, $bind;
415 }
416 }
417
418 $WARN->(8, "node listens on [@$LISTENER].");
419
420 # the global service is mandatory currently
421 require AnyEvent::MP::Global;
422
423 # connect to all seednodes
424 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
425
426 for (@{ $CONFIG->{services} }) {
427 if (ref) {
428 my ($func, @args) = @$_;
429 (load_func $func)->(@args);
430 } elsif (s/::$//) {
431 eval "require $_";
432 die $@ if $@;
433 } else {
434 (load_func $_)->();
435 }
436 }
437 }
438
439 #############################################################################
440 # node monitoring and info
441
442 =item node_is_known $nodeid
443
444 Returns true iff the given node is currently known to the system. The only
445 time a node is known but not up currently is when a conenction request is
446 pending.
447
448 =cut
449
450 sub node_is_known($) {
451 exists $NODE{$_[0]}
452 }
453
454 =item node_is_up $nodeid
455
456 Returns true if the given node is "up", that is, the kernel thinks it has
457 a working connection to it.
458
459 If the node is known but not currently connected, returns C<0>. If the
460 node is not known, returns C<undef>.
461
462 =cut
463
464 sub node_is_up($) {
465 ($NODE{$_[0]} or return)->{transport}
466 ? 1 : 0
467 }
468
469 =item known_nodes
470
471 Returns the node IDs of all nodes currently known to this node, including
472 itself and nodes not currently connected.
473
474 =cut
475
476 sub known_nodes {
477 map $_->{id}, values %NODE
478 }
479
480 =item up_nodes
481
482 Return the node IDs of all nodes that are currently connected (excluding
483 the node itself).
484
485 =cut
486
487 sub up_nodes {
488 map $_->{id}, grep $_->{transport}, values %NODE
489 }
490
491 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
492
493 Registers a callback that is called each time a node goes up (a connection
494 is established) or down (the connection is lost).
495
496 Node up messages can only be followed by node down messages for the same
497 node, and vice versa.
498
499 Note that monitoring a node is usually better done by monitoring it's node
500 port. This function is mainly of interest to modules that are concerned
501 about the network topology and low-level connection handling.
502
503 Callbacks I<must not> block and I<should not> send any messages.
504
505 The function returns an optional guard which can be used to unregister
506 the monitoring callback again.
507
508 Example: make sure you call function C<newnode> for all nodes that are up
509 or go up (and down).
510
511 newnode $_, 1 for up_nodes;
512 mon_nodes \&newnode;
513
514 =cut
515
516 our %MON_NODES;
517
518 sub mon_nodes($) {
519 my ($cb) = @_;
520
521 $MON_NODES{$cb+0} = $cb;
522
523 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
524 }
525
526 sub _inject_nodeevent($$;@) {
527 my ($node, $up, @reason) = @_;
528
529 for my $cb (values %MON_NODES) {
530 eval { $cb->($node->{id}, $up, @reason); 1 }
531 or $WARN->(1, $@);
532 }
533
534 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
535 }
536
537 #############################################################################
538 # self node code
539
540 our %node_req = (
541 # internal services
542
543 # monitoring
544 mon0 => sub { # stop monitoring a port
545 my $portid = shift;
546 my $node = $SRCNODE;
547 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
548 },
549 mon1 => sub { # start monitoring a port
550 my $portid = shift;
551 my $node = $SRCNODE;
552 Scalar::Util::weaken $node; #TODO# ugly
553 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
554 $node->send (["", kil => $portid, @_])
555 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
556 });
557 },
558 kil => sub {
559 my $cbs = delete $SRCNODE->{lmon}{+shift}
560 or return;
561
562 $_->(@_) for @$cbs;
563 },
564
565 # "public" services - not actually public
566
567 # relay message to another node / generic echo
568 snd => \&snd,
569 snd_multiple => sub {
570 snd @$_ for @_
571 },
572
573 # informational
574 info => sub {
575 snd @_, $NODE;
576 },
577 known_nodes => sub {
578 snd @_, known_nodes;
579 },
580 up_nodes => sub {
581 snd @_, up_nodes;
582 },
583
584 # random utilities
585 eval => sub {
586 my @res = eval shift;
587 snd @_, "$@", @res if @_;
588 },
589 time => sub {
590 snd @_, AE::time;
591 },
592 devnull => sub {
593 #
594 },
595 "" => sub {
596 # empty messages are keepalives or similar devnull-applications
597 },
598 );
599
600 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
601 $PORT{""} = sub {
602 my $tag = shift;
603 eval { &{ $node_req{$tag} ||= load_func $tag } };
604 $WARN->(2, "error processing node message: $@") if $@;
605 };
606
607 =back
608
609 =head1 SEE ALSO
610
611 L<AnyEvent::MP>.
612
613 =head1 AUTHOR
614
615 Marc Lehmann <schmorp@schmorp.de>
616 http://home.schmorp.de/
617
618 =cut
619
620 1
621