ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.42
Committed: Sat Sep 5 21:16:59 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.41: +13 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages - queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =cut
64
# Highest level the default handler still prints. Defaults to 5 and can be
# overridden with the PERL_ANYEVENT_MP_WARNLEVEL environment variable.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Default warn handler: prints "<timestamp> <level> message" to STDERR.
our $WARN = sub {
   my ($level, $msg) = @_;

   # messages above the configured threshold are dropped silently
   return if $level > $WARNLEVEL;

   # strip one trailing newline, the format string adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79
80 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81
82 The maximum level at which warning messages will be printed to STDERR by
83 the default warn handler.
84
85 =cut
86
# Resolves a fully qualified function name to a code reference, loading
# modules on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name (Foo::Bar::baz -> Foo::Bar -> Foo) are require'd until the
# function shows up. When no prefix is left, a code reference that dies with
# "unable to resolve '<name>'" is returned, so the failure only surfaces when
# the result is actually called.
#
# The name may originate from a node message (the node port falls back to
# load_func for unknown tags), so it is never interpolated into a string
# eval: only syntactically valid package names are loaded, by file name.
sub load_func($) {
   my $func = $_[0];

   # the function is looked up by name (symbolic reference)
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         if ($pkg =~ /\A\w+(?:::\w+)*\z/) {
            (my $file = "$pkg.pm") =~ s{::}{/}g;
            # load errors are ignored on purpose, a shorter prefix is tried next
            eval { require $file };
         }
      } until defined &$func;
   }

   \&$func
}
101
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that is possible. If the device
# cannot be opened, or the read does not deliver the requested number of
# octets, the nonce is generated with perl's rand instead, which is seeded
# once per process from the time, the pid and the output of some external
# commands.
sub nonce($) {
   my $len = $_[0];
   my $nonce;

   if (open my $fh, "<", "/dev/urandom") {
      sysread $fh, $nonce, $len;
   }

   # a failed or short read must not result in a short (or undefined) nonce
   unless (defined $nonce && $len == length $nonce) {
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }
      $nonce = join "", map +(chr rand 256), 1 .. $len;
   }

   $nonce
}
118
# Encodes the given octet string using only the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer the octets are treated as one big number and
# printed in base 62. Otherwise the data is base64-encoded, the padding is
# removed and the three non-alphanumeric-safe characters are escaped:
# "x" becomes "x0", "/" becomes "x1" and "+" becomes "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of them have to go
      $data =~ s/=+$//;
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
137
# Generates a short alphanumeric cookie from the low 16 bits of the process
# id, the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid_bits  = $$ & 0xffff;
   my $time_bits = time & 0xffff;
   my $random    = nonce (2);

   alnumbits (pack "nna*", $pid_bits, $time_bits, $random)
}
141
# this node's configuration
our $CONFIG;

# remote uniq value
our $RUNIQ = alnumbits (nonce (96 / 8));
# per-process/node unique cookie
our $UNIQ  = gen_uniq ();
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
159
# Returns the node ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
163
# Returns the node ID part of a port ID, that is, everything in front of the
# first "#".
sub node_of($) {
   (split /#/, $_[0], 2)[0]
}
169
# TRACE is a constant sub (empty prototype) that is true iff the
# PERL_ANYEVENT_MP_TRACE environment variable is true at compile time.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175
our $DELAY_TIMER; # pending one-shot timer, undef while nothing is scheduled
our @DELAY_QUEUE; # callbacks waiting to be run by _delay_run

# Timer callback: runs all queued callbacks in order, including those that
# get queued while it is running.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   # The timer is one-shot and has fired, so forget it - otherwise the ||=
   # in delay would never create a new one and later callbacks never run.
   undef $DELAY_TIMER;
}

# Queues the given code reference, to be called from a zero-delay timer
# callback instead of from the current call stack.
sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
187
# Delivers a message to a local port: the first argument is the port id, the
# rest is passed to the callback registered in %PORT. Messages for ports
# without an entry in %PORT are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # call with the current @_, i.e. the message without the port id
   &$handler;
}
192
# Returns the node object registered in %NODE for the given node id, creating
# and registering an AnyEvent::MP::Node::Direct object first if there is none.
# This is what makes a node addressable, so it is basically the central
# routing component.
sub add_node {
   my ($nodeid) = @_;

   return $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid);
}
200
# Sends a message to a port. The first argument is the port id
# ("nodeid#portid"), the remaining arguments form the message. The node
# object is looked up in %NODE (and created via add_node if missing), then
# its {send} code reference is called with [portid, @message].
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
209
210 =item $is_local = port_is_local $port
211
212 Returns true iff the port is a local port.
213
214 =cut
215
# A port is local when the node part of its id maps to the same node object
# as the "" entry of %NODE.
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $NODE{$nodeid} == $NODE{""}
}
221
222 =item snd_to_func $node, $func, @args
223
224 Expects a node ID and a name of a function. Asynchronously tries to call
225 this function with the given arguments on that node.
226
227 This function can be used to implement C<spawn>-like interfaces.
228
229 =cut
230
# Sends ["", $func, @args] to the node port of the given node. Messages to
# other nodes are sent immediately, messages to the local node go through
# delay.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   if ($nodeid ne $NODE) {
      my $node = $NODE{$nodeid} || add_node ($nodeid);
      return $node->send (["", @call]);
   }

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   my @msg = ("", @call);
   delay (sub { $NODE{""}->send (\@msg) });
}
243
244 =item snd_on $node, @msg
245
246 Executes C<snd> with the given C<@msg> (which must include the destination
247 port) on the given node.
248
249 =cut
250
# Sends a "snd" request, followed by the given message, to the node port of
# $node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
255
256 =item eval_on $node, $string[, @reply]
257
258 Evaluates the given string as Perl expression on the given node. When
259 @reply is specified, then it is used to construct a reply message with
260 C<"$@"> and any results from the eval appended.
261
262 =cut
263
# Sends an "eval" request with the given string (and optional reply message)
# to the node port of $node.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
268
# Kills the port with the given port id ("nodeid#portid"); any further
# arguments are handed to the node object's kill method. Croaks when the
# port part is empty, as node ports cannot be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
278
# Returns the nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   return (POSIX::uname ())[1];
}
283
# Resolves a comma-separated list of transport descriptors into "host:port"
# strings.
#
# Returns a condvar that is sent the resolved addresses once all lookups
# have finished. The addresses are ordered by the position of their
# descriptor in the list, and duplicates are removed.
#
# A descriptor that is empty or consists only of digits refers to this host
# (as reported by _nodename), with the digits as port. A host of "*" expands
# to the addresses of the local interfaces, a port of "*" becomes port 0.
# Croaks on descriptors that parse_hostport cannot parse.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, "host:port"] pairs, filled in by the callbacks below

   # runs when everything has been resolved: sort, dedup, deliver
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      $t = length $t ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak ("$t: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # (plain "for": lexical $_ no longer compiles since perl 5.24)
                  for ($if->address (Net::Interface::AF_INET ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }
                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
364
# Configures and starts the local node.
#
# Takes key-value pairs; with an odd number of arguments the first one is
# the profile name. The profile defaults to the nodename of this host. The
# resulting configuration (AnyEvent::MP::Config::find_profile) is stored in
# $CONFIG and is used to
#
# - set $NODE from the "nodeid" entry (or the profile name), unless that is
#   "anon/", and register the self node under that id,
# - create a listener for every address the "binds" entries resolve to
#   (default: "*"), recorded in %LISTENER and $LISTENER,
# - load AnyEvent::MP::Global and hand it the resolved "seeds",
# - start the "services": entries ending in "::" are require'd as modules,
#   all others are resolved with load_func and called.
#
# Croaks on unparsable bind addresses, dies when a service module cannot be
# loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one after the other
   for (map { _resolve ($_) } @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # replace the requested address by the one handed to the prepare callback
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map { _resolve ($_) } @$seeds);

   for my $entry (@{ $CONFIG->{services} }) {
      # work on a copy - the loop variable aliases the entry stored in
      # $CONFIG, which must keep its trailing "::" marker
      my $service = $entry;

      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
428
429 #############################################################################
430 # node monitoring and info
431
432 =item node_is_known $nodeid
433
434 Returns true iff the given node is currently known to the system.
435
436 =cut
437
# True iff %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
441
442 =item node_is_up $nodeid
443
444 Returns true if the given node is "up", that is, the kernel thinks it has
445 a working connection to it.
446
447 If the node is known but not currently connected, returns C<0>. If the
448 node is not known, returns C<undef>.
449
450 =cut
451
# Returns 1 when the node object for the given id has a transport, 0 when it
# has none, and nothing (undef) when the node id is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
456
457 =item known_nodes
458
459 Returns the node IDs of all nodes currently known to this node, including
460 itself and nodes not currently connected.
461
462 =cut
463
# Returns the {id} of every node object stored in %NODE.
sub known_nodes {
   return map { $_->{id} } values %NODE;
}
467
468 =item up_nodes
469
470 Return the node IDs of all nodes that are currently connected (excluding
471 the node itself).
472
473 =cut
474
# Returns the {id} of every node object in %NODE that has a transport.
sub up_nodes {
   my @connected = grep { $_->{transport} } values %NODE;

   return map { $_->{id} } @connected;
}
478
479 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
480
481 Registers a callback that is called each time a node goes up (a connection
482 is established) or down (the connection is lost).
483
484 Node up messages can only be followed by node down messages for the same
485 node, and vice versa.
486
487 Note that monitoring a node is usually better done by monitoring its node
488 port. This function is mainly of interest to modules that are concerned
489 about the network topology and low-level connection handling.
490
491 Callbacks I<must not> block and I<should not> send any messages.
492
493 The function returns an optional guard which can be used to unregister
494 the monitoring callback again.
495
496 =cut
497
# registered node monitoring callbacks, keyed by the numeric address of the
# code reference
our %MON_NODES;

# Registers $cb to be called on node up/down events (see _inject_nodeevent).
#
# In non-void context a guard is returned that unregisters the callback when
# it is destroyed. In void context no guard is created and the callback stays
# registered.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # "defined wantarray", not "wantarray": the latter is false in scalar
   # context, so "my $guard = mon_nodes ..." would never get a guard
   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
507
# Reports a node up/down event: calls every callback in %MON_NODES with the
# node id, the up flag and the reason, then logs the state change at level 7.
# A callback that dies is reported through $WARN at level 1 and does not keep
# the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
518
519 #############################################################################
520 # self node code
521
# Requests understood by the node port (port ""), keyed by the tag that
# forms the first element of the message. $SRCNODE is the sending node.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my $portid = shift;
      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my $portid = shift;
      my $node = $SRCNODE;
      Scalar::Util::weaken ($node); #TODO# ugly
      # forwards the kill to the monitoring node, while it is still connected
      my $on_kill = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };
      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = $on_kill);
   },
   kil => sub {
      # a remote port was killed: run and forget our monitors for it
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE: string eval of whatever the sending node supplied
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);

# the local node is reachable both under "" and under its node id
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: dispatches on the tag. Unknown tags are resolved with
# load_func and cached in %node_req. Errors are logged, not propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      # called with the current @_, i.e. the message without the tag
      &$handler;
   };
   $WARN->(2, "error processing node message: $@") if $@;
};
588
589 =back
590
591 =head1 SEE ALSO
592
593 L<AnyEvent::MP>.
594
595 =head1 AUTHOR
596
597 Marc Lehmann <schmorp@schmorp.de>
598 http://home.schmorp.de/
599
600 =cut
601
602 1
603