ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.40
Committed: Fri Sep 4 21:01:22 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.39: +2 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
our $VERSION = '1.0';

# Symbols exported by default, grouped as: shared kernel state, low-level
# messaging helpers, port/node primitives, configuration, and node
# connectivity queries.
our @EXPORT = (
    qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
    qw(add_node load_func snd_to_func snd_on eval_on),
    qw(NODE $NODE node_of snd kil port_is_local),
    qw(configure),
    qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages - queue the message and use an idle
54 watcher if you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =cut
64
# Highest level the default handler still prints: the value of
# PERL_ANYEVENT_MP_WARNLEVEL when that variable exists, 5 otherwise.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
    if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Default log handler: writes "<timestamp> <level> message" to STDERR.
#   $level - numeric severity of the message
#   $text  - message text; one trailing newline is removed
our $WARN = sub {
    my ($level, $text) = @_;

    # messages above the configured level are dropped
    return if $level > $WARNLEVEL;

    # the format string supplies its own newline
    $text =~ s/\n$//;

    my $stamp = POSIX::strftime("%Y-%m-%d %H:%M:%S", localtime time);

    printf STDERR "%s <%d> %s\n", $stamp, $level, $text;
};
79
80 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
81
82 The maximum level at which warning messages will be printed to STDERR by
83 the default warn handler.
84
85 =cut
86
# load_func $name
#
# Resolves a fully qualified function name ("Some::Package::func") to a
# code reference. If the function is not defined yet, the packages named by
# successively shorter prefixes of $name are loaded until it is.
#
# Returns a code reference to the function. If it cannot be resolved, the
# returned code reference dies with "unable to resolve '$name'" when
# called - load_func itself does not die for that reason.
sub load_func($) {
    my $func = $_[0];

    no strict 'refs'; # the function is looked up by its name

    unless (defined &$func) {
        my $pkg = $func;

        do {
            # drop the last name component; give up when none is left
            $pkg =~ s/::[^:]+$//
                or return sub { die "unable to resolve '$func'" };

            # Function names can arrive in node messages (the node port
            # handler passes unknown tags here), so the name must never
            # reach a string eval: anything that is not a plain package
            # name is treated as unresolvable.
            $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
                or return sub { die "unable to resolve '$func'" };

            # file-form require, equivalent to "require Package::Name";
            # a failure simply leads to the next, shorter prefix
            (my $file = "$pkg.pm") =~ s{::}{/}g;
            eval { require $file };
        } until defined &$func;
    }

    \&$func
}
101
# nonce $length
#
# Returns a string of $length random octets. The octets are read from
# /dev/urandom; when that device cannot be opened or does not deliver
# enough data, the remainder comes from perl's rand, which is seeded once
# per process from the time, the process id and the output of some system
# commands.
sub nonce($) {
    my $len   = $_[0];
    my $nonce = "";

    if (open my $fh, "<:raw", "/dev/urandom") {
        # a single sysread may legitimately return less than requested,
        # so keep reading until the buffer is full
        while (length ($nonce) < $len) {
            my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
            last unless $got; # read error or unexpected end of file
        }
    }

    if (length ($nonce) < $len) {
        # no (or not enough) data from the random device
        our $nonce_init;
        unless ($nonce_init++) {
            srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
        }
        $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
    }

    $nonce
}
118
# alnumbits $octets
#
# Encodes a binary string using only the characters A-Z, a-z and 0-9.
# With Math::GMP 2.05 or newer the data is treated as one big number and
# written in base 62; otherwise it is base64-encoded, the padding is
# removed and the three non-alphanumeric-safe characters are escaped as
# x -> x0, / -> x1 and + -> x2.
sub alnumbits($) {
    my $data = $_[0];

    if (eval { require Math::GMP; Math::GMP->VERSION (2.05); 1 }) {
        $data = Math::GMP::get_str_gmp (
            (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
            62
        );
    } else {
        $data = MIME::Base64::encode_base64 ($data, "");
        # base64 pads with up to two "=", all of which must go
        $data =~ s/=+\z//;
        # "x" is the escape character, so it has to be escaped first
        $data =~ s/x/x0/g;
        $data =~ s{/}{x1}g;
        $data =~ s/\+/x2/g;
    }

    $data
}
137
# Builds an alphanumeric identifier from the low 16 bits of the process
# id, the low 16 bits of the current time and two random octets.
sub gen_uniq {
    my $pid16  = $$ & 0xffff;
    my $time16 = time () & 0xffff;
    my $random = nonce (2);

    alnumbits (pack "nna*", $pid16, $time16, $random)
}
141
# configuration of this node
our $CONFIG;

# remote uniq value, derived from 96 random bits
our $RUNIQ = alnumbits (nonce (96 / 8));
# per-process/node unique cookie
our $UNIQ  = gen_uniq ();
# id of this node
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# node id to transport mapping, or "undef", for the local node
our %NODE;

# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
159
# Returns the current value of $NODE, the id of this node.
sub NODE() {
    return $NODE;
}
163
# node_of $port
#
# Returns the node part of a port id, i.e. everything before the first "#".
sub node_of($) {
    # limit 2: only the first "#" separates node and port
    return (split /#/, $_[0], 2)[0];
}
169
BEGIN {
    # TRACE is installed at compile time as a sub with an empty prototype
    # returning 1 when PERL_ANYEVENT_MP_TRACE is true, and 0 otherwise.
    if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
        *TRACE = sub () { 1 };
    } else {
        *TRACE = sub () { 0 };
    }
}
175
# _inject $portid, @msg
#
# Delivers a message to a local port: looks up the callback registered in
# %PORT under the port id and calls it with the rest of the message.
# Messages for ports without a callback are silently dropped.
sub _inject {
    if (TRACE && @_) { # debugging aid
        warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n";
    }

    my $cb = $PORT{+shift}
        or return;

    # called without an argument list, so the callback sees the current @_
    &$cb
}
180
# add_node $nodeid
#
# Central routing component: makes sure %NODE has an entry for the given
# node id, creating an AnyEvent::MP::Node::Direct object when there is
# none, and returns that entry so messages can be sent to it.
sub add_node {
    my ($nodeid) = @_;

    return $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid);
}
188
# snd $port, @msg
#
# Sends @msg to the given port ("nodeid#portid"). The node object is taken
# from %NODE, or created with add_node when the node is not known yet.
sub snd(@) {
    my $dest = shift;
    my ($nodeid, $portid) = split /#/, $dest, 2;

    if (TRACE && @_) { # debugging aid
        warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n";
    }

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    # invokes the code reference stored in the node's "send" slot directly
    $node->{send} (["$portid", @_]);
}
197
198 =item $is_local = port_is_local $port
199
200 Returns true iff the port is a local port.
201
202 =cut
203
sub port_is_local($) {
    # the node part is everything before the first "#"
    my $nodeid = (split /#/, $_[0], 2)[0];

    # local iff the id maps to the same node object as the "" entry
    return $NODE{$nodeid} == $NODE{""};
}
209
210 =item snd_to_func $node, $func, @args
211
212 Expects a node ID and a name of a function. Asynchronously tries to call
213 this function with the given arguments on that node.
214
215 This function can be used to implement C<spawn>-like interfaces.
216
217 =cut
218
sub snd_to_func($$;@) {
    my ($nodeid, @call) = @_;

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    # an empty port id addresses the node itself
    $node->send (["", @call]);
}
225
226 =item snd_on $node, @msg
227
228 Executes C<snd> with the given C<@msg> (which must include the destination
229 port) on the given node.
230
231 =cut
232
sub snd_on($@) {
    my ($node, @msg) = @_;

    # ask the node to run its "snd" service with the message
    snd ($node, "snd", @msg);
}
237
238 =item eval_on $node, $string[, @reply]
239
240 Evaluates the given string as Perl expression on the given node. When
241 @reply is specified, then it is used to construct a reply message with
242 C<"$@"> and any results from the eval appended.
243
244 =cut
245
sub eval_on($$;@) {
    my ($node, @args) = @_;

    # ask the node to run its "eval" service with the string and reply
    snd ($node, "eval", @args);
}
250
# kil $port, @reason
#
# Kills the given port ("nodeid#portid") by calling the kill method of its
# node object with the port id and @reason. Croaks when the port id part
# is empty, as node ports must not be killed.
sub kil(@) {
    my ($nodeid, $portid) = split /#/, shift, 2;

    Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
        unless length $portid;

    my $node = $NODE{$nodeid} || add_node ($nodeid);

    $node->kill ("$portid", @_);
}
260
# Returns the node name of this host, the second field of POSIX::uname.
sub _nodename {
    require POSIX;

    my @uname = POSIX::uname ();

    return $uname[1];
}
265
# _resolve $descriptors
#
# Resolves a comma-separated list of transport descriptors into endpoint
# strings as produced by AnyEvent::Socket::format_hostport.
#
# Each descriptor is "host:port". A descriptor consisting only of digits
# is taken as a port on this host, and an empty one as this host itself.
# The host "*" expands to the addresses of the local interfaces, and the
# port "*" is treated as port 0.
#
# Returns a condition variable. Once all lookups have finished, it is sent
# the list of endpoints, ordered by the position of their descriptor in
# the input, with duplicates removed. Croaks on an unparsable descriptor.
sub _resolve($) {
    my ($nodeid) = @_;

    my $cv = AE::cv ();
    my @res; # [priority, endpoint] pairs, filled by the callbacks below

    # runs when the last outstanding lookup has ended
    $cv->begin (sub {
        my %seen;
        my @refs;
        for (sort { $a->[0] <=> $b->[0] } @res) {
            push @refs, $_->[1] unless $seen{$_->[1]}++
        }
        shift->send (@refs);
    });

    my $idx;
    for my $t (split /,/, $nodeid) {
        # results of one descriptor share its index as integer part; each
        # further result of that descriptor adds a small fraction
        my $pri = ++$idx;

        $t = length $t ? _nodename () . ":$t" : _nodename ()
            if $t =~ /^\d*$/;

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
            or Carp::croak ("$t: unparsable transport descriptor");

        $port = "0" if $port eq "*";

        if ($host eq "*") {
            $cv->begin;
            # use fork_call, as Net::Interface is big, and we need it rarely.
            require AnyEvent::Util;
            AnyEvent::Util::fork_call (
                sub {
                    my @addr;

                    require Net::Interface;

                    for my $if (Net::Interface->interfaces) {
                        # we statically lower-prioritise ipv6 here, TODO :()
                        # (plain "for": lexical $_ no longer exists in perl)
                        for ($if->address (Net::Interface::AF_INET ())) {
                            next if /^\x7f/; # skip localhost etc.
                            push @addr, $_;
                        }
                        for ($if->address (Net::Interface::AF_INET6 ())) {
                            #next if $if->scope ($_) <= 2;
                            next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                            push @addr, $_;
                        }
                    }

                    @addr
                },
                sub {
                    for my $ip (@_) {
                        push @res, [
                            $pri += 1e-5,
                            AnyEvent::Socket::format_hostport (
                                AnyEvent::Socket::format_address ($ip),
                                $port
                            )
                        ];
                    }
                    $cv->end;
                }
            );
        } else {
            $cv->begin;
            AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
                for (@_) {
                    my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
                    push @res, [
                        $pri += 1e-5,
                        AnyEvent::Socket::format_hostport (
                            AnyEvent::Socket::format_address ($host),
                            $service
                        )
                    ];
                }
                $cv->end;
            });
        }
    }

    # matches the initial begin; fires the callback once nothing is pending
    $cv->end;

    $cv
}
346
# configure [$profile,] key => value...
#
# Configures this node. With an odd number of arguments the first one is
# the profile name; otherwise the profile comes from the "profile" key and
# defaults to the node name of this host. The remaining pairs are passed
# to AnyEvent::MP::Config::find_profile.
#
# The function sets the node id, creates listeners for all bind addresses
# (default "*"), loads AnyEvent::MP::Global and hands it the resolved seed
# addresses, and finally starts the configured services. A service ending
# in "::" is a package to require; anything else is a function to call.
#
# Croaks on unparsable bind addresses and dies when a service package
# cannot be loaded. Note that the _resolve results are waited for with
# recv, i.e. this function blocks.
sub configure(@) {
    unshift @_, "profile" if @_ & 1;
    my (%kv) = @_;

    my $profile = delete $kv{profile};

    $profile = _nodename ()
        unless defined $profile;

    $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

    my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
    $NODE = $node
        unless $node eq "anon/";

    # register the local node object under its (possibly new) id as well
    $NODE{$NODE} = $NODE{""};
    $NODE{$NODE}{id} = $NODE;

    my $seeds = $CONFIG->{seeds};
    my $binds = $CONFIG->{binds};

    $binds ||= ["*"];

    $WARN->(8, "node $NODE starting up.");

    $LISTENER = [];
    %LISTENER = ();

    for (map { _resolve ($_) } @$binds) {
        for my $bind ($_->recv) {
            my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
                or Carp::croak ("$bind: unparsable local bind address");

            my $listener = AnyEvent::MP::Transport::mp_server (
                $host,
                $port,
                prepare => sub {
                    # record the address that was actually bound
                    my (undef, $host, $port) = @_;
                    $bind = AnyEvent::Socket::format_hostport ($host, $port);
                },
            );
            $LISTENER{$bind} = $listener;
            push @$LISTENER, $bind;
        }
    }

    $WARN->(8, "node listens on [@$LISTENER].");

    # the global service is mandatory currently
    require AnyEvent::MP::Global;

    # connect to all seednodes
    AnyEvent::MP::Global::set_seeds (map $_->recv, map { _resolve ($_) } @$seeds);

    for my $service (@{ $CONFIG->{services} }) {
        # work on a copy: stripping the "::" from the stored entry itself
        # would make a later configure call mistake it for a function
        if ((my $pkg = $service) =~ s/::$//) {
            eval "require $pkg";
            die $@ if $@;
        } else {
            load_func ($service)->();
        }
    }
}
410
411 #############################################################################
412 # node monitoring and info
413
414 =item node_is_known $nodeid
415
416 Returns true iff the given node is currently known to the system.
417
418 =cut
419
sub node_is_known($) {
    my ($nodeid) = @_;

    # known means: there is an entry in %NODE, whatever its value
    return exists $NODE{$nodeid};
}
423
424 =item node_is_up $nodeid
425
426 Returns true if the given node is "up", that is, the kernel thinks it has
427 a working connection to it.
428
429 If the node is known but not currently connected, returns C<0>. If the
430 node is not known, returns C<undef>.
431
432 =cut
433
sub node_is_up($) {
    # unknown node: empty return, i.e. undef in scalar context
    my $node = $NODE{$_[0]}
        or return;

    return $node->{transport} ? 1 : 0;
}
438
439 =item known_nodes
440
441 Returns the node IDs of all nodes currently known to this node, including
442 itself and nodes not currently connected.
443
444 =cut
445
sub known_nodes {
    # one id per %NODE entry, taken from the node object's "id" slot
    my @ids;
    push @ids, $_->{id} for values %NODE;

    @ids
}
449
450 =item up_nodes
451
452 Return the node IDs of all nodes that are currently connected (excluding
453 the node itself).
454
455 =cut
456
sub up_nodes {
    # a node counts as connected when its "transport" slot is true
    my @connected = grep { $_->{transport} } values %NODE;

    map { $_->{id} } @connected
}
460
461 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
462
463 Registers a callback that is called each time a node goes up (a connection
464 is established) or down (the connection is lost).
465
466 Node up messages can only be followed by node down messages for the same
467 node, and vice versa.
468
469 Note that monitoring a node is usually better done by monitoring its node
470 port. This function is mainly of interest to modules that are concerned
471 about the network topology and low-level connection handling.
472
473 Callbacks I<must not> block and I<should not> send any messages.
474
475 The function returns an optional guard which can be used to unregister
476 the monitoring callback again.
477
478 =cut
479
# registered node monitoring callbacks, keyed by the numeric value of
# the callback reference
our %MON_NODES;

# $guard = mon_nodes $callback
#
# Registers $callback in %MON_NODES. Unless called in void context,
# returns a guard created by AnyEvent::Util::guard whose cleanup code
# removes the callback again.
sub mon_nodes($) {
    my ($cb) = @_;

    $MON_NODES{$cb+0} = $cb;

    # wantarray is false (but defined) in scalar context, so testing it
    # directly would deny a guard to "$guard = mon_nodes ..." callers;
    # only void context, where it is undef, gets no guard
    defined wantarray
        and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
489
# _inject_nodeevent $node, $is_up, @reason
#
# Calls every callback registered in %MON_NODES with the node id, the
# up/down flag and the reason, then logs the state change at level 7. An
# exception thrown by a callback is logged at level 1 and does not stop
# the remaining callbacks.
sub _inject_nodeevent($$;@) {
    my ($node, $up, @reason) = @_;

    for my $cb (values %MON_NODES) {
        my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };
        $WARN->(1, $@) unless $ok;
    }

    my $state = $up ? "up" : "down";

    $WARN->(7, "$node->{id} is $state (@reason)");
}
500
501 #############################################################################
502 # self node code
503
# Services offered on the node port, keyed by message tag. Each handler is
# called with the remainder of the message; $SRCNODE is the sending node.
our %node_req = (
    # internal services

    # monitoring
    mon0 => sub { # stop monitoring a port
        my ($portid) = @_;
        my $node = $SRCNODE;

        $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
    },
    mon1 => sub { # start monitoring a port
        my ($portid) = @_;
        my $node = $SRCNODE;

        # the callback must not keep the node object alive
        Scalar::Util::weaken ($node); #TODO# ugly
        $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
            $node->send (["", kil => $portid, @_])
                if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
        });
    },
    kil => sub {
        my $portid = shift;

        # the callbacks are removed, so they run at most once
        my $cbs = delete $SRCNODE->{lmon}{$portid}
            or return;

        for my $cb (@$cbs) {
            $cb->(@_);
        }
    },

    # "public" services - not actually public

    # relay message to another node / generic echo
    snd => \&snd,
    snd_multiple => sub {
        for my $msg (@_) {
            snd (@$msg);
        }
    },

    # informational
    info => sub {
        snd (@_, $NODE);
    },
    known_nodes => sub {
        snd (@_, known_nodes ());
    },
    up_nodes => sub {
        snd (@_, up_nodes ());
    },

    # random utilities
    eval => sub {
        # NOTE(review): evaluates a string taken from the message as perl
        # code; this is the purpose of the service, but it means every
        # sender that reaches the node port can run code here
        my @res = eval shift;

        # reply only when a reply message was supplied
        snd (@_, "$@", @res) if @_;
    },
    time => sub {
        snd (@_, AE::time ());
    },
    devnull => sub {
        # discards the message
    },
    "" => sub {
        # empty messages are keepalives or similar devnull-applications
    },
);
563
# the local node object is reachable under "" and under its own id
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port (port id ""): the first message element is the tag naming
# the service. Tags not in %node_req are resolved as function names via
# load_func and cached in %node_req. Errors are logged at level 2.
$PORT{""} = sub {
    my $tag = shift;

    eval {
        my $handler = $node_req{$tag} ||= load_func ($tag);
        # called without an argument list, so it receives the rest of @_
        &$handler;
    };

    $WARN->(2, "error processing node message: $@")
        if $@;
};
570
571 =back
572
573 =head1 SEE ALSO
574
575 L<AnyEvent::MP>.
576
577 =head1 AUTHOR
578
579 Marc Lehmann <schmorp@schmorp.de>
580 http://home.schmorp.de/
581
582 =cut
583
584 1
585