ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.41
Committed: Fri Sep 4 22:30:29 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.40: +11 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages - queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =cut
64
# highest level the default handler still prints, the environment
# variable (when it exists at all) takes precedence over the default.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# default warn handler, called as $WARN->($level, $msg):
# prints "<local timestamp> <level> message" to STDERR.
our $WARN = sub {
   my ($level, $text) = @_;

   # anything more detailed than the configured level is dropped
   return if $WARNLEVEL < $level;

   # the format below supplies the line terminator itself
   $text =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $text;
};
79
80 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81
82 The maximum level at which warning messages will be printed to STDERR by
83 the default warn handler.
84
85 =cut
86
# Resolves a fully qualified function name ("Some::Module::func") into a
# code reference, trying to load a module that defines it when the
# function does not exist yet.
#
# The package part is shortened from the right, one "::component" at a
# time, and each remaining prefix is tried as a module name until the
# function becomes defined.
#
# Returns a code reference. When the name cannot be resolved, the returned
# code reference dies with "unable to resolve '<name>'" once it is called,
# so the failure is reported at call time, not at lookup time.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # the name is interpolated into a string eval, and names can
         # arrive in node messages (the "" port handler passes unknown
         # tags to this function), so only evaluate what is syntactically
         # a plain package name - everything else is simply not loadable.
         eval "require $pkg"
            if $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/;
      } until defined &$func;
   }

   \&$func
}
101
# Returns a string of $_[0] random bytes.
#
# The bytes are read from /dev/urandom when that can be opened. Whatever
# could not be read from there (including everything, when the device is
# unavailable) is filled up from perl's rand, which is seeded once from
# the time, the pid and the output of some system commands.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<:raw", "/dev/urandom") {
      # sysread is allowed to return less than was asked for, so keep
      # reading until the nonce is complete or the read fails
      while ($len > length ($nonce)) {
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # error or end of file
      }
   }

   if ($len > length ($nonce)) {
      # shit...
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }
      $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
   }

   $nonce
}
118
# Encodes the binary string $_[0] into a string that consists only of
# the characters 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer available, the data is treated as one big
# number and written in base 62. Otherwise it is base64-encoded, the
# padding is dropped and the three non-alphanumeric-safe characters are
# escaped: "x" becomes "x0", "/" becomes "x1" and "+" becomes "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                 62
              );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of them have to go
      $data =~ s/=+\z//;
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
137
# Creates a new cookie: the low 16 bits of the process id, the low 16
# bits of the current time and two random bytes, encoded by alnumbits.
sub gen_uniq {
   my $raw = pack "nna*", $$ & 0xffff, time & 0xffff, nonce (2);

   alnumbits ($raw)
}
141
# this node's configuration
our $CONFIG;

# remote uniq value, from 96 bits of random data
our $RUNIQ = alnumbits (nonce (96 / 8));
# per-process/node unique cookie
our $UNIQ  = gen_uniq ();
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

# bind address => listener, as filled in by configure
our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
159
# Returns the ID of the local node, i.e. the current value of $NODE.
sub NODE() {
   return $NODE;
}
163
# Extracts the node ID from a port ID, that is, everything in front of
# the first "#". A string without "#" is returned unchanged.
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
169
BEGIN {
   # TRACE is installed as a constant sub at compile time, its value is
   # decided once by the PERL_ANYEVENT_MP_TRACE environment variable.
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175
# Delivers a message to a local port: the first element is the port id,
# the remaining elements are passed on to the port callback as its @_.
# Messages for ports that do not exist are dropped silently.
sub _inject {
   if (TRACE && @_) { #d#
      warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n";
   }

   my $cb = $PORT{+shift}
      or return;

   # the ampersand form hands our (already shifted) @_ to the callback
   &$cb
}
180
# the central routing component: looks up the node object for a node id
# and creates (and remembers) a direct node when there is none yet, so
# that messages can be sent to it.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
188
# Sends a message to a port.
#
# The first argument is the destination port id ("nodeid#portid", or a
# bare node id for the node port), all further arguments form the
# message. Nodes that are not known yet are created via add_node.
#
# Croaks when the destination is undefined or empty: such a value splits
# into an undefined node id, which would otherwise look up $NODE{""} and
# hand the message to the request port of the local node.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   defined $nodeid
      or Carp::croak ("snd: undefined or empty port ID is not a valid destination, caught");

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#

   ($NODE{$nodeid} || add_node ($nodeid))
      ->{send} (["$portid", @_]);
}
197
198 =item $is_local = port_is_local $port
199
200 Returns true iff the port is a local port.
201
202 =cut
203
# True when the node part of the given port id maps to the very same
# node object as the local node (which is registered under "").
sub port_is_local($) {
   my @parts  = split /#/, $_[0], 2;
   my $nodeid = $parts[0];

   $NODE{$nodeid} == $NODE{""}
}
209
210 =item snd_to_func $node, $func, @args
211
212 Expects a node ID and a name of a function. Asynchronously tries to call
213 this function with the given arguments on that node.
214
215 This function can be used to implement C<spawn>-like interfaces.
216
217 =cut
218
# Asks the given node to call a function: the message goes to the node
# port (port id "") and consists of the function name and its arguments.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   if ($nodeid ne $NODE) {
      my $node = $NODE{$nodeid} || add_node ($nodeid);

      return $node->send (["", @call]);
   }

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   my @msg = ("", @call);

   # the timer callback keeps the watcher alive until it has fired
   my $timer;
   $timer = AE::timer (0, 0, sub {
      undef $timer;
      $NODE{""}->send (\@msg);
   });
}
234
235 =item snd_on $node, @msg
236
237 Executes C<snd> with the given C<@msg> (which must include the destination
238 port) on the given node.
239
240 =cut
241
# Sends a "snd" request with the given message to the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, snd => @msg);
}
246
247 =item eval_on $node, $string[, @reply]
248
249 Evaluates the given string as Perl expression on the given node. When
250 @reply is specified, then it is used to construct a reply message with
251 C<"$@"> and any results from the eval appended.
252
253 =cut
254
# Sends an "eval" request to the given node: the string to evaluate,
# optionally followed by the message used to report the result.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, eval => @request);
}
259
# Kills the given port, the remaining arguments are handed to the kill
# method of the owning node object. Croaks when the port id has no port
# part, as node ports cannot be killed.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
269
# Returns the nodename field (the second one) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
274
# Resolves a comma-separated list of transport descriptors into
# "host:port" endpoint strings.
#
# Each descriptor is handled as follows:
#  - empty, or only digits: the local nodename, with the digits as port
#  - host "*": the addresses of all local interfaces, gathered via
#    Net::Interface in a forked child
#  - anything else: resolved via AnyEvent::Socket::resolve_sockaddr
# A port of "*" is replaced by "0".
#
# Returns a condvar. The values sent to it (available via ->recv) are the
# endpoints without duplicates, ordered by the position of the descriptor
# they stem from. Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res;

   # group callback, runs after all lookups below have called ->end
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # results of one descriptor sort after those of earlier descriptors,
      # the tiny increments below keep the order within one descriptor
      my $pri = ++$idx;

      $t = length ($t) ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak ("$t: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               # named loop variables instead of "my $_": lexical $_ was
               # removed in perl 5.24 and no longer compiles
               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for my $addr ($if->address (Net::Interface::AF_INET ())) {
                     next if $addr =~ /^\x7f/; # skip localhost etc.
                     push @addr, $addr;
                  }
                  for my $addr ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($addr) <= 2;
                     next unless $addr =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $addr;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
355
# Configures and starts up the local node.
#
# Takes key => value pairs, an odd number of arguments means that the
# first argument is the profile name. Without a profile, the local
# nodename is used.
#
# In order, this function: looks up the profile, sets $NODE (from the
# "nodeid" setting, or else the profile name) and registers the local
# node under it, creates a listener for every resolved "binds" address
# (default "*"), loads AnyEvent::MP::Global and hands it the resolved
# "seeds", and finally starts the configured "services".
#
# A service that ends in "::" is a module to load, anything else is the
# name of a function to call. Dies when a service module fails to load,
# croaks on bind addresses that cannot be parsed.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   for (map _resolve ($_), @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            # the prepare callback replaces $bind by the address it is given
            prepare => sub {
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      # work on a copy: stripping the "::" from the loop variable itself
      # would rewrite the entry stored in $CONFIG->{services}, and the
      # module name would be taken for a function name from then on
      my $name = $service;

      if ($name =~ s/::$//) {
         eval "require $name";
         die $@ if $@;
      } else {
         (load_func ($name))->();
      }
   }
}
419
420 #############################################################################
421 # node monitoring and info
422
423 =item node_is_known $nodeid
424
425 Returns true iff the given node is currently known to the system.
426
427 =cut
428
# True when there is an entry for the given node id in %NODE.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
432
433 =item node_is_up $nodeid
434
435 Returns true if the given node is "up", that is, the kernel thinks it has
436 a working connection to it.
437
438 If the node is known but not currently connected, returns C<0>. If the
439 node is not known, returns C<undef>.
440
441 =cut
442
# Returns 1 when the node has a transport, 0 when it is known but has
# none, and nothing (undef) when the node is not known at all.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
447
448 =item known_nodes
449
450 Returns the node IDs of all nodes currently known to this node, including
451 itself and nodes not currently connected.
452
453 =cut
454
# Returns the ids of all nodes in %NODE, each id once.
#
# The same node object can be stored under more than one key - the local
# node is registered both under "" and under its own id - so the ids are
# deduplicated instead of being reported once per key.
sub known_nodes {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
458
459 =item up_nodes
460
461 Return the node IDs of all nodes that are currently connected (excluding
462 the node itself).
463
464 =cut
465
# Returns the ids of all nodes in %NODE that currently have a transport.
sub up_nodes {
   my @connected = grep $_->{transport}, values %NODE;

   map $_->{id}, @connected
}
469
470 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
471
472 Registers a callback that is called each time a node goes up (a connection
473 is established) or down (the connection is lost).
474
475 Node up messages can only be followed by node down messages for the same
476 node, and vice versa.
477
478 Note that monitoring a node is usually better done by monitoring its node
479 port. This function is mainly of interest to modules that are concerned
480 about the network topology and low-level connection handling.
481
482 Callbacks I<must not> block and I<should not> send any messages.
483
484 The function returns an optional guard which can be used to unregister
485 the monitoring callback again.
486
487 =cut
488
# registered node monitoring callbacks, keyed by their address
our %MON_NODES;

# Registers $cb to be called on node up/down events.
#
# Returns a guard that unregisters the callback again, unless called in
# void context, where no guard is created. The check has to be "defined
# wantarray": plain wantarray is false in scalar context, so the usual
# "$guard = mon_nodes ..." would never receive a guard.
sub mon_nodes($) {
   my ($cb) = @_;

   my $key = $cb + 0;
   $MON_NODES{$key} = $cb;

   return unless defined wantarray;

   require AnyEvent::Util;
   AnyEvent::Util::guard (sub { delete $MON_NODES{$key} })
}
498
# Reports a node going up or down: every callback in %MON_NODES is called
# with the node id, the up flag and the reason, then the event is logged
# at level 7. A dying callback is logged at level 1 and does not stop the
# remaining callbacks.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $monitor (values %MON_NODES) {
      my $ok = eval {
         $monitor->($node->{id}, $up, @reason);
         1
      };

      $WARN->(1, $@) unless $ok;
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
509
510 #############################################################################
511 # self node code
512
# requests understood by the node port, keyed by the message tag. the
# handlers receive the rest of the message as @_, the node that sent the
# message is available in $SRCNODE.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      my $src = $SRCNODE;
      Scalar::Util::weaken ($src); #TODO# ugly

      # remembered per sending node, so that mon0 can find it again
      my $notify = $src->{rmon}{$portid} = sub {
         $src->send (["", kil => $portid, @_])
            if $src && $src->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $NODE{""}->monitor ($portid, $notify);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      foreach my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      foreach my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # the first element is the code, the rest an optional reply message
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # swallows the message
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
572
# the local node is registered under its own id and under ""
$NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);
$NODE{""}    = $NODE{$NODE};

# the node port: the first message element selects the request handler,
# tags without a handler are resolved via load_func and remembered.
# errors are logged, they never propagate to the sender of the message.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      # looked up inside the eval, so a failing lookup is logged as well
      my $handler = $node_req{$tag} ||= load_func ($tag);
      &$handler
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
579
580 =back
581
582 =head1 SEE ALSO
583
584 L<AnyEvent::MP>.
585
586 =head1 AUTHOR
587
588 Marc Lehmann <schmorp@schmorp.de>
589 http://home.schmorp.de/
590
591 =cut
592
593 1
594