ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.66
Committed: Wed Dec 30 13:37:53 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.65: +16 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 add_node load_func snd_to_func snd_on eval_on
41
42 NODE $NODE node_of snd kil port_is_local
43 configure
44 up_nodes mon_nodes node_is_up
45 );
46
47 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48
49 This value is called with an error or warning message, when e.g. a
50 connection could not be created, authorisation failed and so on.
51
52 It I<must not> block or send messages - queue it and use an idle watcher if
53 you need to do any of these things.
54
55 C<$level> should be C<0> for messages to be logged always, C<1> for
56 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57 node connectivity and services, C<8> for debugging messages and C<9> for
58 tracing messages.
59
60 The default simply logs the message to STDERR.
61
62 =item @AnyEvent::MP::Kernel::WARN
63
64 All code references in this array are called for every log message, from
65 the default C<$WARN> handler. This is an easy way to tie into the log
66 messages without disturbing others.
67
68 =cut
69
# Highest level the default handler still prints: the value of
# PERL_ANYEVENT_MP_WARNLEVEL when that variable exists in the environment,
# 5 otherwise.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Extra log hooks, invoked for every message regardless of its level.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # each hook is entered with the very same argument list we received
   for my $hook (@WARN) {
      &$hook;
   }

   my ($level, $msg) = @_;

   # messages above the configured level are only seen by the hooks
   return if $level > $WARNLEVEL;

   # the format below supplies the newline, so drop one trailing newline
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
# Resolves a fully qualified function name ("Some::Package::func") to a code
# reference, loading the module that provides it on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until the function shows up.
#
# Never dies itself: on failure it returns a code reference that dies with
# the reason when called, so the error surfaces at call time.
#
#   $func - fully qualified function name
#
# Returns a code reference.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;

      do {
         # strip the last name component - what remains is the next
         # candidate module that might define the function
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         # $pkg is interpolated into a string eval below, so anything that
         # is not a plain package name must never get that far
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve function '$func'" };

         # the file name perl reports when this very module is missing
         (my $file = "$pkg.pm") =~ s{::}{/}g;

         local $@;
         unless (eval "require $pkg; 1") {
            my $error = $@;

            # only "this module does not exist" lets us try a shorter
            # prefix - compile errors and missing dependencies of an
            # existing module are reported as they are
            $error =~ /^Can't locate \Q$file\E in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}
114
# Returns a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom when that can be opened. Whatever
# could not be obtained from there (device missing, read error, short read)
# is filled up from perl's rand, which is considerably weaker.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      # sysread may return fewer octets than asked for, so keep appending
      # until the request is satisfied or the device stops delivering
      while ($len > length $nonce) {
         my $got = sysread $fh, $nonce, $len - (length $nonce), length $nonce;
         last unless $got; # undef on error, 0 on end of file
      }
   }

   # fallback for everything the device did not provide
   $nonce .= chr rand 256
      while $len > length $nonce;

   $nonce
}
127
# Encodes an octet string using only the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer available, the data is read as one big
# hexadecimal number and printed in base 62. Otherwise it is base64 encoded
# and the non-alphanumeric characters are escaped: the padding is dropped,
# "x" becomes "x0", "/" becomes "x1" and "+" becomes "x2".
#
# Note that the two encodings produce different strings for the same input.
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", and all of them have to go
      $data =~ s/=+$//;
      # escape "x" first, as it introduces the other two escapes
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
146
# Builds an alphanumeric cookie from the low 16 bits of the process ID, the
# low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $raw = pack "nna*", $$ & 0xffff, time () & 0xffff, nonce (2);

   alnumbits ($raw)
}
150
# this node's configuration
our $CONFIG;

# identity of this node
our ($RUNIQ, # remote uniq value
     $UNIQ,  # per-process/node unique cookie
     $NODE);
our $ID = "a";

# routing tables
our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

# monitoring
our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

# listening sockets
our %LISTENER;
our $LISTENER; # our listeners, as arrayref

# holds the sending node during _inject
our $SRCNODE;
168
# (Re-)initialises the identity of this node: a fresh 96 bit random $RUNIQ,
# a fresh $UNIQ cookie and an anonymous node ID derived from $RUNIQ.
sub _seed {
   $RUNIQ = alnumbits (nonce (96 / 8));
   $UNIQ  = gen_uniq ();
   $NODE  = "anon/$RUNIQ";
}

# start out with an anonymous identity
_seed ();
176
# Returns the current value of $NODE, the ID of the local node.
sub NODE() {
   return $NODE;
}
180
# Returns the node ID part of a port ID, that is, everything in front of the
# first "#" (or the whole string when it contains no "#").
sub node_of($) {
   my @part = split /#/, $_[0], 2;

   return $part[0];
}
186
# Installs the compile-time constant TRACE: 1 when PERL_ANYEVENT_MP_TRACE is
# set to a true value in the environment, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
192
our $DELAY_TIMER; # the pending timer watcher, while callbacks are queued
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Runs the queued callbacks in order, including any that get queued while it
# runs, then clears $DELAY_TIMER.
sub _delay_run {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   return undef $DELAY_TIMER;
}
199
# Queues a callback for _delay_run and makes sure a zero-delay timer that
# runs the queue is pending.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
204
# Delivers a message to a local port: the first element names the port, the
# rest is handed to the port's callback. Messages for ports that do not
# exist are dropped silently.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   my $handler = $PORT{ shift () }
      or return;

   # enter the handler with what is left of our own argument list
   &$handler;
}
209
# Central routing component: returns the node object registered for the
# given node ID in %NODE, creating (and registering) a direct node first if
# there is none yet, so that messages can be sent to it.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
217
# Sends a message to a port. The first argument is the port ID
# ("nodeid#portid"), the remaining arguments form the message.
# Croaks when no node ID can be extracted from the port ID.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n";#d#
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   # unknown nodes are registered on first use
   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the node keeps its send callback in {send} - call it directly
   $node->{send} (["$portid", @_]);
}
229
230 =item $is_local = port_is_local $port
231
232 Returns true iff the port is a local port.
233
234 =cut
235
# True iff the node part of the given port ID maps to the same node object
# that is registered as the local node under the key "".
sub port_is_local($) {
   my ($port) = @_;
   my ($nodeid) = split /#/, $port, 2;

   $NODE{$nodeid} == $NODE{""}
}
241
242 =item snd_to_func $node, $func, @args
243
244 Expects a node ID and a name of a function. Asynchronously tries to call
245 this function with the given arguments on that node.
246
247 This function can be used to implement C<spawn>-like interfaces.
248
249 =cut
250
# Asks the given node to call a function: sends (@_) - function name
# followed by its arguments - to the node port ("") of $nodeid.
# Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so that an undefined node ID neither gets compared
   # below nor flips the global DELAY flag before we croak
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment talks about $NODE, yet the flag is set when
   # the target is NOT $NODE - confirm the intended condition.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   ($NODE{$nodeid} || add_node ($nodeid))->send (["", @_]);
}
265
266 =item snd_on $node, @msg
267
268 Executes C<snd> with the given C<@msg> (which must include the destination
269 port) on the given node.
270
271 =cut
272
# Makes the given node execute snd with @msg, by sending it a "snd" request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
277
278 =item eval_on $node, $string[, @reply]
279
280 Evaluates the given string as Perl expression on the given node. When
281 @reply is specified, then it is used to construct a reply message with
282 C<"$@"> and any results from the eval appended.
283
284 =cut
285
# Makes the given node evaluate a string, by sending it an "eval" request
# made up of the string and the optional reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
290
# Kills the given port; any further arguments are passed on as the reason.
# Croaks when the port ID has no port part, i.e. names a node port.
sub kil(@) {
   my $target = shift;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
300
# Does the actual work of killing a local port: forgets the port and its
# data, then calls every monitor registered for it with the kill reason.
sub _kill {
   my $port = shift;

   # killing nonexistent ports is O.K.
   return unless delete $PORT{$port};

   delete $PORT_DATA{$port};

   my $mon = delete $LMON{$port};

   # a port dying with a reason while nobody is watching is worth a warning
   $WARN->(2, "unmonitored local port $port died with reason: @_")
      if !$mon && @_;

   $_->(@_) for values %$mon;
}
315
# Returns the node name of this host, the second field reported by uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   return $nodename;
}
320
# Resolves a comma-separated list of transport descriptors into "host:port"
# strings.
#
# Returns a condvar immediately; once all lookups have finished it receives
# the resolved addresses, ordered by the position of their descriptor in the
# list and with duplicates removed.
#
# Descriptor forms handled here: an empty string or a bare port number
# (meaning this host's node name), "*" as host (every suitable local
# interface address) and "*" as port (treated as port 0). Croaks on
# descriptors that cannot be parsed.
sub _resolve($) {
   my ($spec) = @_;

   my $done = AE::cv ();
   my @found; # [priority, "host:port"] pairs, filled in by the lookups

   # the group callback runs once every begin has seen its end
   $done->begin (sub {
      my ($cv) = @_;
      my (%dup, @addrs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         next if $dup{$entry->[1]}++;
         push @addrs, $entry->[1];
      }

      $cv->send (@addrs);
   });

   my $position;
   for my $t (split /,/, $spec) {
      # the integer part orders descriptors, the fraction added below
      # orders the addresses found for one descriptor
      my $pri = ++$position;

      if ($t =~ /^\d*$/) {
         $t = length $t ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $done->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               # runs in the child: collect the interface addresses
               my @addrs;

               require Net::Interface;

               for my $iface (Net::Interface->interfaces) {
                  # ipv4 addresses come first, which statically
                  # lower-prioritises ipv6 (TODO)

                  # skip localhost etc.
                  push @addrs, grep !/^\x7f/,
                     $iface->address (Net::Interface::AF_INET ());

                  # global unicast, site-local unicast
                  push @addrs, grep /^[\x20-\x3f\xfc\xfd]/,
                     $iface->address (Net::Interface::AF_INET6 ());
               }

               @addrs
            },
            sub {
               # runs in the parent, with the addresses from the child
               for my $ip (@_) {
                  push @found, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (
                        AnyEvent::Socket::format_address ($ip),
                        $port
                     )
                  ];
               }

               $done->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $result (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

               push @found, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (
                     AnyEvent::Socket::format_address ($addr),
                     $service
                  )
               ];
            }

            $done->end;
         });
      }
   }

   # matches the initial begin that carries the group callback
   $done->end;

   $done
}
401
# Configures this node: picks the profile, determines the node ID, creates
# the listeners, connects to the seed nodes and starts the configured
# services.
#
# Takes key => value pairs; an odd number of arguments means that the first
# one is the profile name. Without a profile, the node name of this host is
# used.
#
# Croaks on an illegal (false) node ID and on unparsable bind addresses,
# dies when a service module fails to load, and blocks while the bind and
# seed addresses are being resolved.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _seed ();

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   # "anon/" keeps the anonymous ID that _seed has just generated
   $NODE = $node
      unless $node eq "anon/";

   # make the self node reachable under its final ID as well
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolves first, then wait for them one after the other
   for (map _resolve ($_), @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function, arguments...]
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
         next;
      }

      # work on a copy - stripping the "::" from the list element itself
      # would silently rewrite the configuration
      my $name = $service;

      if ($name =~ s/::$//) {
         # "Module::" - just load the module
         eval "require $name";
         die $@ if $@;
      } else {
         # function name - load and call it without arguments
         (load_func ($name))->();
      }
   }
}
475
476 #############################################################################
477 # node monitoring and info
478
479 =item node_is_known $nodeid
480
481 Returns true iff the given node is currently known to the system. The only
482 time a node is known but not up currently is when a connection request is
483 pending.
484
485 =cut
486
# True iff %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
490
491 =item node_is_up $nodeid
492
493 Returns true if the given node is "up", that is, the kernel thinks it has
494 a working connection to it.
495
496 If the node is known but not currently connected, returns C<0>. If the
497 node is not known, returns C<undef>.
498
499 =cut
500
# Returns 1 when the given node has a transport, 0 when it is known but has
# none, and nothing (undef) when it is not known at all.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
505
506 =item known_nodes
507
508 Returns the node IDs of all nodes currently known to this node, including
509 itself and nodes not currently connected.
510
511 =cut
512
# Returns the {id} of every node object stored in %NODE. An ID is reported
# once per key its node object is stored under.
sub known_nodes() {
   return map { $_->{id} } values %NODE;
}
516
517 =item up_nodes
518
519 Return the node IDs of all nodes that are currently connected (excluding
520 the node itself).
521
522 =cut
523
# Returns the {id} of every node object in %NODE that has a transport.
sub up_nodes() {
   my @connected = grep { $_->{transport} } values %NODE;

   return map { $_->{id} } @connected;
}
527
528 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
529
530 Registers a callback that is called each time a node goes up (a connection
531 is established) or down (the connection is lost).
532
533 Node up messages can only be followed by node down messages for the same
534 node, and vice versa.
535
536 Note that monitoring a node is usually better done by monitoring its node
537 port. This function is mainly of interest to modules that are concerned
538 about the network topology and low-level connection handling.
539
540 Callbacks I<must not> block and I<should not> send any messages.
541
542 The function returns an optional guard which can be used to unregister
543 the monitoring callback again.
544
545 Example: make sure you call function C<newnode> for all nodes that are up
546 or go up (and down).
547
548 newnode $_, 1 for up_nodes;
549 mon_nodes \&newnode;
550
551 =cut
552
# registered node monitors, keyed by the numeric value of the callback ref
our %MON_NODES;

# Registers a callback to be called on node up/down events. In non-void
# context, returns a guard that unregisters the callback again.
sub mon_nodes($) {
   my ($cb) = @_;
   my $key = $cb + 0;

   $MON_NODES{$key} = $cb;

   # nobody wants the guard, so do not bother creating one
   return unless defined wantarray;

   AnyEvent::Util::guard { delete $MON_NODES{$key} }
}
562
# Tells every node monitor that a node went up or down, then logs the event
# at level 7. A monitor that dies is logged at level 1 and does not keep the
# remaining monitors from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $monitor (values %MON_NODES) {
      next if eval { $monitor->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
573
574 #############################################################################
575 # self node code
576
# Requests understood by the node port (""), keyed by request tag. Each
# handler is called with the remaining message elements; $SRCNODE is the
# node the request came from.
our %node_req = (
   # internal services

   # monitoring

   # stop monitoring a port for another node
   mon0 => sub {
      my ($portid) = @_;
      my $cb = delete $SRCNODE->{rmon}{$portid};

      $NODE{""}->unmonitor ($portid, $cb);
   },

   # start monitoring a port for another node
   mon1 => sub {
      my ($portid) = @_;

      # the callback below must not keep the remote node object alive
      Scalar::Util::weaken (my $node = $SRCNODE);

      my $on_kill = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };

      $node->{rmon}{$portid} = $on_kill;
      $NODE{""}->monitor ($portid, $on_kill);
   },

   # another node has killed a monitored port
   kil0 => sub {
      my $portid = shift;
      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational - the result is appended to the reply message in @_
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities

   # NOTE(review): evaluates a string received in a message as perl code,
   # so whoever can send to the node port can run arbitrary code here.
   eval => sub {
      my @result = do { package main; eval shift };
      snd (@_, "$@", @result) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # swallows the message
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
641
# the local node is reachable under "" as well as under its own ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element is the request tag, which is
# looked up in %node_req. Unknown tags are resolved as function names via
# load_func and remembered in %node_req. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # enter the handler with the remaining message elements
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
648
649 =back
650
651 =head1 SEE ALSO
652
653 L<AnyEvent::MP>.
654
655 =head1 AUTHOR
656
657 Marc Lehmann <schmorp@schmorp.de>
658 http://home.schmorp.de/
659
660 =cut
661
662 1
663