ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.68
Committed: Mon Apr 19 04:50:46 2010 UTC (14 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-1_29, rel-1_30
Changes since 1.67: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 add_node load_func snd_to_func snd_on eval_on
41
42 NODE $NODE node_of snd kil port_is_local
43 configure
44 up_nodes mon_nodes node_is_up
45 );
46
47 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48
49 This value is called with an error or warning message, when e.g. a
50 connection could not be created, authorisation failed and so on.
51
52 It I<must not> block or send messages -queue it and use an idle watcher if
53 you need to do any of these things.
54
55 C<$level> should be C<0> for messages to be logged always, C<1> for
56 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57 node connectivity and services, C<8> for debugging messages and C<9> for
58 tracing messages.
59
60 The default simply logs the message to STDERR.
61
62 =item @AnyEvent::MP::Kernel::WARN
63
64 All code references in this array are called for every log message, from
65 the default C<$WARN> handler. This is an easy way to tie into the log
66 messages without disturbing others.
67
68 =cut
69
70 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71 our @WARN;
72 our $WARN = sub {
73 &$_ for @WARN;
74
75 return if $WARNLEVEL < $_[0];
76
77 my ($level, $msg) = @_;
78
79 $msg =~ s/\n$//;
80
81 printf STDERR "%s <%d> %s\n",
82 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83 $level,
84 $msg;
85 };
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
94 sub load_func($) {
95 my $func = $_[0];
96
97 unless (defined &$func) {
98 my $pkg = $func;
99 do {
100 $pkg =~ s/::[^:]+$//
101 or return sub { die "unable to resolve function '$func'" };
102
103 local $@;
104 unless (eval "require $pkg; 1") {
105 my $error = $@;
106 $error =~ /^Can't locate .*.pm in \@INC \(/
107 or return sub { die $error };
108 }
109 } until defined &$func;
110 }
111
112 \&$func
113 }
114
115 sub nonce($) {
116 my $nonce;
117
118 if (open my $fh, "</dev/urandom") {
119 sysread $fh, $nonce, $_[0];
120 } else {
121 # shit...
122 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123 }
124
125 $nonce
126 }
127
128 sub alnumbits($) {
129 my $data = $_[0];
130
131 if (eval "use Math::GMP 2.05; 1") {
132 $data = Math::GMP::get_str_gmp (
133 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134 62
135 );
136 } else {
137 $data = MIME::Base64::encode_base64 $data, "";
138 $data =~ s/=//;
139 $data =~ s/x/x0/g;
140 $data =~ s/\//x1/g;
141 $data =~ s/\+/x2/g;
142 }
143
144 $data
145 }
146
147 sub gen_uniq {
148 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149 }
150
151 our $CONFIG; # this node's configuration
152
153 our $RUNIQ; # remote uniq value
154 our $UNIQ; # per-process/node unique cookie
155 our $NODE;
156 our $ID = "a";
157
158 our %NODE; # node id to transport mapping, or "undef", for local node
159 our (%PORT, %PORT_DATA); # local ports
160
161 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 our %LMON; # monitored _local_ ports
163
164 our %LISTENER;
165 our $LISTENER; # our listeners, as arrayref
166
167 our $SRCNODE; # holds the sending node during _inject
168
169 sub _seed {
170 $RUNIQ = alnumbits nonce 96/8;
171 $UNIQ = gen_uniq;
172 $NODE = "anon/$RUNIQ";
173 }
174
175 _seed;
176
177 sub NODE() {
178 $NODE
179 }
180
181 sub node_of($) {
182 my ($node, undef) = split /#/, $_[0], 2;
183
184 $node
185 }
186
187 BEGIN {
188 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
189 ? sub () { 1 }
190 : sub () { 0 };
191 }
192
193 our $DELAY_TIMER;
194 our @DELAY_QUEUE;
195
196 sub _delay_run {
197 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
198 }
199
200 sub delay($) {
201 push @DELAY_QUEUE, shift;
202 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
203 }
204
205 sub _inject {
206 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
207 &{ $PORT{+shift} or return };
208 }
209
210 # this function adds a node-ref, so you can send stuff to it
211 # it is basically the central routing component.
212 sub add_node {
213 my ($node) = @_;
214
215 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
216 }
217
218 sub snd(@) {
219 my ($nodeid, $portid) = split /#/, shift, 2;
220
221 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
222
223 defined $nodeid #d#UGLY
224 or Carp::croak "'undef' is not a valid node ID/port ID";
225
226 ($NODE{$nodeid} || add_node $nodeid)
227 ->{send} (["$portid", @_]);
228 }
229
230 =item $is_local = port_is_local $port
231
232 Returns true iff the port is a local port.
233
234 =cut
235
236 sub port_is_local($) {
237 my ($nodeid, undef) = split /#/, $_[0], 2;
238
239 $NODE{$nodeid} == $NODE{""}
240 }
241
242 =item snd_to_func $node, $func, @args
243
244 Expects a node ID and a name of a function. Asynchronously tries to call
245 this function with the given arguments on that node.
246
247 This function can be used to implement C<spawn>-like interfaces.
248
249 =cut
250
251 sub snd_to_func($$;@) {
252 my $nodeid = shift;
253
254 # on $NODE, we artificially delay... (for spawn)
255 # this is very ugly - maybe we should simply delay ALL messages,
256 # to avoid deep recursion issues. but that's so... slow...
257 $AnyEvent::MP::Node::Self::DELAY = 1
258 if $nodeid ne $NODE;
259
260 defined $nodeid #d#UGLY
261 or Carp::croak "'undef' is not a valid node ID/port ID";
262
263 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
264 }
265
266 =item snd_on $node, @msg
267
268 Executes C<snd> with the given C<@msg> (which must include the destination
269 port) on the given node.
270
271 =cut
272
273 sub snd_on($@) {
274 my $node = shift;
275 snd $node, snd => @_;
276 }
277
278 =item eval_on $node, $string[, @reply]
279
280 Evaluates the given string as Perl expression on the given node. When
281 @reply is specified, then it is used to construct a reply message with
282 C<"$@"> and any results from the eval appended.
283
284 =cut
285
286 sub eval_on($$;@) {
287 my $node = shift;
288 snd $node, eval => @_;
289 }
290
291 sub kil(@) {
292 my ($nodeid, $portid) = split /#/, shift, 2;
293
294 length $portid
295 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
296
297 ($NODE{$nodeid} || add_node $nodeid)
298 ->kill ("$portid", @_);
299 }
300
301 sub _nodename {
302 require POSIX;
303 (POSIX::uname ())[1]
304 }
305
306 sub _resolve($) {
307 my ($nodeid) = @_;
308
309 my $cv = AE::cv;
310 my @res;
311
312 $cv->begin (sub {
313 my %seen;
314 my @refs;
315 for (sort { $a->[0] <=> $b->[0] } @res) {
316 push @refs, $_->[1] unless $seen{$_->[1]}++
317 }
318 shift->send (@refs);
319 });
320
321 my $idx;
322 for my $t (split /,/, $nodeid) {
323 my $pri = ++$idx;
324
325 $t = length $t ? _nodename . ":$t" : _nodename
326 if $t =~ /^\d*$/;
327
328 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
329 or Carp::croak "$t: unparsable transport descriptor";
330
331 $port = "0" if $port eq "*";
332
333 if ($host eq "*") {
334 $cv->begin;
335 # use fork_call, as Net::Interface is big, and we need it rarely.
336 require AnyEvent::Util;
337 AnyEvent::Util::fork_call (
338 sub {
339 my @addr;
340
341 require Net::Interface;
342
343 for my $if (Net::Interface->interfaces) {
344 # we statically lower-prioritise ipv6 here, TODO :()
345 for $_ ($if->address (Net::Interface::AF_INET ())) {
346 next if /^\x7f/; # skip localhost etc.
347 push @addr, $_;
348 }
349 for ($if->address (Net::Interface::AF_INET6 ())) {
350 #next if $if->scope ($_) <= 2;
351 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
352 push @addr, $_;
353 }
354
355 }
356 @addr
357 }, sub {
358 for my $ip (@_) {
359 push @res, [
360 $pri += 1e-5,
361 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
362 ];
363 }
364 $cv->end;
365 }
366 );
367 } else {
368 $cv->begin;
369 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
370 for (@_) {
371 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
372 push @res, [
373 $pri += 1e-5,
374 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
375 ];
376 }
377 $cv->end;
378 };
379 }
380 }
381
382 $cv->end;
383
384 $cv
385 }
386
387 sub configure(@) {
388 unshift @_, "profile" if @_ & 1;
389 my (%kv) = @_;
390
391 delete $NODE{$NODE}; # we do not support doing stuff before configure
392 _seed;
393
394 my $profile = delete $kv{profile};
395
396 $profile = _nodename
397 unless defined $profile;
398
399 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
400
401 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
402
403 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
404
405 $NODE = $node
406 unless $node eq "anon/";
407
408 $NODE{$NODE} = $NODE{""};
409 $NODE{$NODE}{id} = $NODE;
410
411 my $seeds = $CONFIG->{seeds};
412 my $binds = $CONFIG->{binds};
413
414 $binds ||= ["*"];
415
416 $WARN->(8, "node $NODE starting up.");
417
418 $LISTENER = [];
419 %LISTENER = ();
420
421 for (map _resolve $_, @$binds) {
422 for my $bind ($_->recv) {
423 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
424 or Carp::croak "$bind: unparsable local bind address";
425
426 my $listener = AnyEvent::MP::Transport::mp_server
427 $host,
428 $port,
429 prepare => sub {
430 my (undef, $host, $port) = @_;
431 $bind = AnyEvent::Socket::format_hostport $host, $port;
432 0
433 },
434 ;
435 $LISTENER{$bind} = $listener;
436 push @$LISTENER, $bind;
437 }
438 }
439
440 $WARN->(8, "node listens on [@$LISTENER].");
441
442 # the global service is mandatory currently
443 require AnyEvent::MP::Global;
444
445 # connect to all seednodes
446 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
447
448 for (@{ $CONFIG->{services} }) {
449 if (ref) {
450 my ($func, @args) = @$_;
451 (load_func $func)->(@args);
452 } elsif (s/::$//) {
453 eval "require $_";
454 die $@ if $@;
455 } else {
456 (load_func $_)->();
457 }
458 }
459 }
460
461 #############################################################################
462 # node monitoring and info
463
464 =item node_is_known $nodeid
465
466 Returns true iff the given node is currently known to the system. The only
467 time a node is known but not up currently is when a conenction request is
468 pending.
469
470 =cut
471
472 sub node_is_known($) {
473 exists $NODE{$_[0]}
474 }
475
476 =item node_is_up $nodeid
477
478 Returns true if the given node is "up", that is, the kernel thinks it has
479 a working connection to it.
480
481 If the node is known but not currently connected, returns C<0>. If the
482 node is not known, returns C<undef>.
483
484 =cut
485
486 sub node_is_up($) {
487 ($NODE{$_[0]} or return)->{transport}
488 ? 1 : 0
489 }
490
491 =item known_nodes
492
493 Returns the node IDs of all nodes currently known to this node, including
494 itself and nodes not currently connected.
495
496 =cut
497
498 sub known_nodes() {
499 map $_->{id}, values %NODE
500 }
501
502 =item up_nodes
503
504 Return the node IDs of all nodes that are currently connected (excluding
505 the node itself).
506
507 =cut
508
509 sub up_nodes() {
510 map $_->{id}, grep $_->{transport}, values %NODE
511 }
512
513 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
514
515 Registers a callback that is called each time a node goes up (a connection
516 is established) or down (the connection is lost).
517
518 Node up messages can only be followed by node down messages for the same
519 node, and vice versa.
520
521 Note that monitoring a node is usually better done by monitoring it's node
522 port. This function is mainly of interest to modules that are concerned
523 about the network topology and low-level connection handling.
524
525 Callbacks I<must not> block and I<should not> send any messages.
526
527 The function returns an optional guard which can be used to unregister
528 the monitoring callback again.
529
530 Example: make sure you call function C<newnode> for all nodes that are up
531 or go up (and down).
532
533 newnode $_, 1 for up_nodes;
534 mon_nodes \&newnode;
535
536 =cut
537
538 our %MON_NODES;
539
540 sub mon_nodes($) {
541 my ($cb) = @_;
542
543 $MON_NODES{$cb+0} = $cb;
544
545 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
546 }
547
548 sub _inject_nodeevent($$;@) {
549 my ($node, $up, @reason) = @_;
550
551 for my $cb (values %MON_NODES) {
552 eval { $cb->($node->{id}, $up, @reason); 1 }
553 or $WARN->(1, $@);
554 }
555
556 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
557 }
558
559 #############################################################################
560 # self node code
561
562 sub _kill {
563 my $port = shift;
564
565 delete $PORT{$port}
566 or return; # killing nonexistent ports is O.K.
567 delete $PORT_DATA{$port};
568
569 my $mon = delete $LMON{$port}
570 or !@_
571 or $WARN->(2, "unmonitored local port $port died with reason: @_");
572
573 $_->(@_) for values %$mon;
574 }
575
576 sub _monitor {
577 return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
578 unless exists $PORT{$_[1]};
579
580 $LMON{$_[1]}{$_[2]+0} = $_[2];
581 }
582
583 sub _unmonitor {
584 delete $LMON{$_[1]}{$_[2]+0}
585 if exists $LMON{$_[1]};
586 }
587
588 our %node_req = (
589 # internal services
590
591 # monitoring
592 mon0 => sub { # stop monitoring a port for another node
593 my $portid = shift;
594 _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
595 },
596 mon1 => sub { # start monitoring a port for another node
597 my $portid = shift;
598 Scalar::Util::weaken (my $node = $SRCNODE);
599 _monitor undef, $portid, $node->{rmon}{$portid} = sub {
600 delete $node->{rmon}{$portid};
601 $node->send (["", kil0 => $portid, @_])
602 if $node && $node->{transport};
603 };
604 },
605 # another node has killed a monitored port
606 kil0 => sub {
607 my $cbs = delete $SRCNODE->{lmon}{+shift}
608 or return;
609
610 $_->(@_) for @$cbs;
611 },
612
613 # "public" services - not actually public
614
615 # another node wants to kill a local port
616 kil => \&_kill,
617
618 # relay message to another node / generic echo
619 snd => \&snd,
620 snd_multiple => sub {
621 snd @$_ for @_
622 },
623
624 # informational
625 info => sub {
626 snd @_, $NODE;
627 },
628 known_nodes => sub {
629 snd @_, known_nodes;
630 },
631 up_nodes => sub {
632 snd @_, up_nodes;
633 },
634
635 # random utilities
636 eval => sub {
637 my @res = do { package main; eval shift };
638 snd @_, "$@", @res if @_;
639 },
640 time => sub {
641 snd @_, AE::time;
642 },
643 devnull => sub {
644 #
645 },
646 "" => sub {
647 # empty messages are keepalives or similar devnull-applications
648 },
649 );
650
651 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
652 $PORT{""} = sub {
653 my $tag = shift;
654 eval { &{ $node_req{$tag} ||= load_func $tag } };
655 $WARN->(2, "error processing node message: $@") if $@;
656 };
657
658 =back
659
660 =head1 SEE ALSO
661
662 L<AnyEvent::MP>.
663
664 =head1 AUTHOR
665
666 Marc Lehmann <schmorp@schmorp.de>
667 http://home.schmorp.de/
668
669 =cut
670
671 1
672