ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.45
Committed: Mon Sep 7 12:04:32 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.44: +4 -5 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages -queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
71 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 our @WARN;
73 our $WARN = sub {
74 &$_ for @WARN;
75
76 return if $WARNLEVEL < $_[0];
77
78 my ($level, $msg) = @_;
79
80 $msg =~ s/\n$//;
81
82 printf STDERR "%s <%d> %s\n",
83 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84 $level,
85 $msg;
86 };
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
95 sub load_func($) {
96 my $func = $_[0];
97
98 unless (defined &$func) {
99 my $pkg = $func;
100 do {
101 $pkg =~ s/::[^:]+$//
102 or return sub { die "unable to resolve '$func'" };
103 eval "require $pkg";
104 } until defined &$func;
105 }
106
107 \&$func
108 }
109
110 sub nonce($) {
111 my $nonce;
112
113 if (open my $fh, "</dev/urandom") {
114 sysread $fh, $nonce, $_[0];
115 } else {
116 # shit...
117 our $nonce_init;
118 unless ($nonce_init++) {
119 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
120 }
121 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
122 }
123
124 $nonce
125 }
126
127 sub alnumbits($) {
128 my $data = $_[0];
129
130 if (eval "use Math::GMP 2.05; 1") {
131 $data = Math::GMP::get_str_gmp (
132 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
133 62
134 );
135 } else {
136 $data = MIME::Base64::encode_base64 $data, "";
137 $data =~ s/=//;
138 $data =~ s/x/x0/g;
139 $data =~ s/\//x1/g;
140 $data =~ s/\+/x2/g;
141 }
142
143 $data
144 }
145
146 sub gen_uniq {
147 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
148 }
149
150 our $CONFIG; # this node's configuration
151
152 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
153 our $UNIQ = gen_uniq; # per-process/node unique cookie
154 our $NODE = "anon/$RUNIQ";
155 our $ID = "a";
156
157 our %NODE; # node id to transport mapping, or "undef", for local node
158 our (%PORT, %PORT_DATA); # local ports
159
160 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
161 our %LMON; # monitored _local_ ports
162
163 our %LISTENER;
164 our $LISTENER; # our listeners, as arrayref
165
166 our $SRCNODE; # holds the sending node during _inject
167
168 sub NODE() {
169 $NODE
170 }
171
172 sub node_of($) {
173 my ($node, undef) = split /#/, $_[0], 2;
174
175 $node
176 }
177
178 BEGIN {
179 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
180 ? sub () { 1 }
181 : sub () { 0 };
182 }
183
184 our $DELAY_TIMER;
185 our @DELAY_QUEUE;
186
187 sub _delay_run {
188 (shift @DELAY_QUEUE or return)->() while 1;
189 }
190
191 sub delay($) {
192 push @DELAY_QUEUE, shift;
193 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
194 }
195
196 sub _inject {
197 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
198 &{ $PORT{+shift} or return };
199 }
200
201 # this function adds a node-ref, so you can send stuff to it
202 # it is basically the central routing component.
203 sub add_node {
204 my ($node) = @_;
205
206 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
207 }
208
209 sub snd(@) {
210 my ($nodeid, $portid) = split /#/, shift, 2;
211
212 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
213
214 ($NODE{$nodeid} || add_node $nodeid)
215 ->{send} (["$portid", @_]);
216 }
217
218 =item $is_local = port_is_local $port
219
220 Returns true iff the port is a local port.
221
222 =cut
223
224 sub port_is_local($) {
225 my ($nodeid, undef) = split /#/, $_[0], 2;
226
227 $NODE{$nodeid} == $NODE{""}
228 }
229
230 =item snd_to_func $node, $func, @args
231
232 Expects a node ID and a name of a function. Asynchronously tries to call
233 this function with the given arguments on that node.
234
235 This function can be used to implement C<spawn>-like interfaces.
236
237 =cut
238
239 sub snd_to_func($$;@) {
240 my $nodeid = shift;
241
242 # on $NODE, we artificially delay... (for spawn)
243 # this is very ugly - maybe we should simply delay ALL messages,
244 # to avoid deep recursion issues. but that's so... slow...
245 $AnyEvent::MP::Node::Self::DELAY = 1
246 if $nodeid ne $NODE;
247
248 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
249 }
250
251 =item snd_on $node, @msg
252
253 Executes C<snd> with the given C<@msg> (which must include the destination
254 port) on the given node.
255
256 =cut
257
258 sub snd_on($@) {
259 my $node = shift;
260 snd $node, snd => @_;
261 }
262
263 =item eval_on $node, $string[, @reply]
264
265 Evaluates the given string as Perl expression on the given node. When
266 @reply is specified, then it is used to construct a reply message with
267 C<"$@"> and any results from the eval appended.
268
269 =cut
270
271 sub eval_on($$;@) {
272 my $node = shift;
273 snd $node, eval => @_;
274 }
275
276 sub kil(@) {
277 my ($nodeid, $portid) = split /#/, shift, 2;
278
279 length $portid
280 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
281
282 ($NODE{$nodeid} || add_node $nodeid)
283 ->kill ("$portid", @_);
284 }
285
286 sub _nodename {
287 require POSIX;
288 (POSIX::uname ())[1]
289 }
290
291 sub _resolve($) {
292 my ($nodeid) = @_;
293
294 my $cv = AE::cv;
295 my @res;
296
297 $cv->begin (sub {
298 my %seen;
299 my @refs;
300 for (sort { $a->[0] <=> $b->[0] } @res) {
301 push @refs, $_->[1] unless $seen{$_->[1]}++
302 }
303 shift->send (@refs);
304 });
305
306 my $idx;
307 for my $t (split /,/, $nodeid) {
308 my $pri = ++$idx;
309
310 $t = length $t ? _nodename . ":$t" : _nodename
311 if $t =~ /^\d*$/;
312
313 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
314 or Carp::croak "$t: unparsable transport descriptor";
315
316 $port = "0" if $port eq "*";
317
318 if ($host eq "*") {
319 $cv->begin;
320 # use fork_call, as Net::Interface is big, and we need it rarely.
321 require AnyEvent::Util;
322 AnyEvent::Util::fork_call (
323 sub {
324 my @addr;
325
326 require Net::Interface;
327
328 for my $if (Net::Interface->interfaces) {
329 # we statically lower-prioritise ipv6 here, TODO :()
330 for my $_ ($if->address (Net::Interface::AF_INET ())) {
331 next if /^\x7f/; # skip localhost etc.
332 push @addr, $_;
333 }
334 for ($if->address (Net::Interface::AF_INET6 ())) {
335 #next if $if->scope ($_) <= 2;
336 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
337 push @addr, $_;
338 }
339
340 }
341 @addr
342 }, sub {
343 for my $ip (@_) {
344 push @res, [
345 $pri += 1e-5,
346 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
347 ];
348 }
349 $cv->end;
350 }
351 );
352 } else {
353 $cv->begin;
354 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
355 for (@_) {
356 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
357 push @res, [
358 $pri += 1e-5,
359 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
360 ];
361 }
362 $cv->end;
363 };
364 }
365 }
366
367 $cv->end;
368
369 $cv
370 }
371
372 sub configure(@) {
373 unshift @_, "profile" if @_ & 1;
374 my (%kv) = @_;
375
376 my $profile = delete $kv{profile};
377
378 $profile = _nodename
379 unless defined $profile;
380
381 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
382
383 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
384 $NODE = $node
385 unless $node eq "anon/";
386
387 $NODE{$NODE} = $NODE{""};
388 $NODE{$NODE}{id} = $NODE;
389
390 my $seeds = $CONFIG->{seeds};
391 my $binds = $CONFIG->{binds};
392
393 $binds ||= ["*"];
394
395 $WARN->(8, "node $NODE starting up.");
396
397 $LISTENER = [];
398 %LISTENER = ();
399
400 for (map _resolve $_, @$binds) {
401 for my $bind ($_->recv) {
402 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
403 or Carp::croak "$bind: unparsable local bind address";
404
405 my $listener = AnyEvent::MP::Transport::mp_server
406 $host,
407 $port,
408 prepare => sub {
409 my (undef, $host, $port) = @_;
410 $bind = AnyEvent::Socket::format_hostport $host, $port;
411 },
412 ;
413 $LISTENER{$bind} = $listener;
414 push @$LISTENER, $bind;
415 }
416 }
417
418 $WARN->(8, "node listens on [@$LISTENER].");
419
420 # the global service is mandatory currently
421 require AnyEvent::MP::Global;
422
423 # connect to all seednodes
424 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
425
426 for (@{ $CONFIG->{services} }) {
427 if (ref) {
428 my ($func, @args) = @$_;
429 (load_func $func)->(@args);
430 } elsif (s/::$//) {
431 eval "require $_";
432 die $@ if $@;
433 } else {
434 (load_func $_)->();
435 }
436 }
437 }
438
439 #############################################################################
440 # node monitoring and info
441
442 =item node_is_known $nodeid
443
444 Returns true iff the given node is currently known to the system.
445
446 =cut
447
448 sub node_is_known($) {
449 exists $NODE{$_[0]}
450 }
451
452 =item node_is_up $nodeid
453
454 Returns true if the given node is "up", that is, the kernel thinks it has
455 a working connection to it.
456
457 If the node is known but not currently connected, returns C<0>. If the
458 node is not known, returns C<undef>.
459
460 =cut
461
462 sub node_is_up($) {
463 ($NODE{$_[0]} or return)->{transport}
464 ? 1 : 0
465 }
466
467 =item known_nodes
468
469 Returns the node IDs of all nodes currently known to this node, including
470 itself and nodes not currently connected.
471
472 =cut
473
474 sub known_nodes {
475 map $_->{id}, values %NODE
476 }
477
478 =item up_nodes
479
480 Return the node IDs of all nodes that are currently connected (excluding
481 the node itself).
482
483 =cut
484
485 sub up_nodes {
486 map $_->{id}, grep $_->{transport}, values %NODE
487 }
488
489 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
490
491 Registers a callback that is called each time a node goes up (a connection
492 is established) or down (the connection is lost).
493
494 Node up messages can only be followed by node down messages for the same
495 node, and vice versa.
496
497 Note that monitoring a node is usually better done by monitoring it's node
498 port. This function is mainly of interest to modules that are concerned
499 about the network topology and low-level connection handling.
500
501 Callbacks I<must not> block and I<should not> send any messages.
502
503 The function returns an optional guard which can be used to unregister
504 the monitoring callback again.
505
506 =cut
507
508 our %MON_NODES;
509
510 sub mon_nodes($) {
511 my ($cb) = @_;
512
513 $MON_NODES{$cb+0} = $cb;
514
515 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
516 }
517
518 sub _inject_nodeevent($$;@) {
519 my ($node, $up, @reason) = @_;
520
521 for my $cb (values %MON_NODES) {
522 eval { $cb->($node->{id}, $up, @reason); 1 }
523 or $WARN->(1, $@);
524 }
525
526 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
527 }
528
529 #############################################################################
530 # self node code
531
532 our %node_req = (
533 # internal services
534
535 # monitoring
536 mon0 => sub { # stop monitoring a port
537 my $portid = shift;
538 my $node = $SRCNODE;
539 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
540 },
541 mon1 => sub { # start monitoring a port
542 my $portid = shift;
543 my $node = $SRCNODE;
544 Scalar::Util::weaken $node; #TODO# ugly
545 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
546 $node->send (["", kil => $portid, @_])
547 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
548 });
549 },
550 kil => sub {
551 my $cbs = delete $SRCNODE->{lmon}{+shift}
552 or return;
553
554 $_->(@_) for @$cbs;
555 },
556
557 # "public" services - not actually public
558
559 # relay message to another node / generic echo
560 snd => \&snd,
561 snd_multiple => sub {
562 snd @$_ for @_
563 },
564
565 # informational
566 info => sub {
567 snd @_, $NODE;
568 },
569 known_nodes => sub {
570 snd @_, known_nodes;
571 },
572 up_nodes => sub {
573 snd @_, up_nodes;
574 },
575
576 # random utilities
577 eval => sub {
578 my @res = eval shift;
579 snd @_, "$@", @res if @_;
580 },
581 time => sub {
582 snd @_, AE::time;
583 },
584 devnull => sub {
585 #
586 },
587 "" => sub {
588 # empty messages are keepalives or similar devnull-applications
589 },
590 );
591
592 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
593 $PORT{""} = sub {
594 my $tag = shift;
595 eval { &{ $node_req{$tag} ||= load_func $tag } };
596 $WARN->(2, "error processing node message: $@") if $@;
597 };
598
599 =back
600
601 =head1 SEE ALSO
602
603 L<AnyEvent::MP>.
604
605 =head1 AUTHOR
606
607 Marc Lehmann <schmorp@schmorp.de>
608 http://home.schmorp.de/
609
610 =cut
611
612 1
613