ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.39
Committed: Thu Sep 3 20:16:36 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.38: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 );
47
48 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
49 our $NETWORK_LATENCY = 3; # activity timeout
50 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages -queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
69 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
70
71 our $WARN = sub {
72 return if $WARNLEVEL < $_[0];
73
74 my ($level, $msg) = @_;
75
76 $msg =~ s/\n$//;
77
78 printf STDERR "%s <%d> %s\n",
79 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
80 $level,
81 $msg;
82 };
83
84 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
85
86 The maximum level at which warning messages will be printed to STDERR by
87 the default warn handler.
88
89 =cut
90
91 sub load_func($) {
92 my $func = $_[0];
93
94 unless (defined &$func) {
95 my $pkg = $func;
96 do {
97 $pkg =~ s/::[^:]+$//
98 or return sub { die "unable to resolve '$func'" };
99 eval "require $pkg";
100 } until defined &$func;
101 }
102
103 \&$func
104 }
105
106 sub nonce($) {
107 my $nonce;
108
109 if (open my $fh, "</dev/urandom") {
110 sysread $fh, $nonce, $_[0];
111 } else {
112 # shit...
113 our $nonce_init;
114 unless ($nonce_init++) {
115 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
116 }
117 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
118 }
119
120 $nonce
121 }
122
123 sub alnumbits($) {
124 my $data = $_[0];
125
126 if (eval "use Math::GMP 2.05; 1") {
127 $data = Math::GMP::get_str_gmp (
128 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
129 62
130 );
131 } else {
132 $data = MIME::Base64::encode_base64 $data, "";
133 $data =~ s/=//;
134 $data =~ s/x/x0/g;
135 $data =~ s/\//x1/g;
136 $data =~ s/\+/x2/g;
137 }
138
139 $data
140 }
141
142 sub gen_uniq {
143 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
144 }
145
146 our $CONFIG; # this node's configuration
147
148 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
149 our $UNIQ = gen_uniq; # per-process/node unique cookie
150 our $NODE = "anon/$RUNIQ";
151 our $ID = "a";
152
153 our %NODE; # node id to transport mapping, or "undef", for local node
154 our (%PORT, %PORT_DATA); # local ports
155
156 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
157 our %LMON; # monitored _local_ ports
158
159 our %LISTENER;
160 our $LISTENER; # our listeners, as arrayref
161
162 our $SRCNODE; # holds the sending node during _inject
163
164 sub NODE() {
165 $NODE
166 }
167
168 sub node_of($) {
169 my ($node, undef) = split /#/, $_[0], 2;
170
171 $node
172 }
173
174 BEGIN {
175 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
176 ? sub () { 1 }
177 : sub () { 0 };
178 }
179
180 sub _inject {
181 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
182 &{ $PORT{+shift} or return };
183 }
184
185 # this function adds a node-ref, so you can send stuff to it
186 # it is basically the central routing component.
187 sub add_node {
188 my ($node) = @_;
189
190 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
191 }
192
193 sub snd(@) {
194 my ($nodeid, $portid) = split /#/, shift, 2;
195
196 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
197
198 ($NODE{$nodeid} || add_node $nodeid)
199 ->{send} (["$portid", @_]);
200 }
201
202 =item $is_local = port_is_local $port
203
204 Returns true iff the port is a local port.
205
206 =cut
207
208 sub port_is_local($) {
209 my ($nodeid, undef) = split /#/, $_[0], 2;
210
211 $NODE{$nodeid} == $NODE{""}
212 }
213
214 =item snd_to_func $node, $func, @args
215
216 Expects a node ID and a name of a function. Asynchronously tries to call
217 this function with the given arguments on that node.
218
219 This function can be used to implement C<spawn>-like interfaces.
220
221 =cut
222
223 sub snd_to_func($$;@) {
224 my $nodeid = shift;
225
226 ($NODE{$nodeid} || add_node $nodeid)
227 ->send (["", @_]);
228 }
229
230 =item snd_on $node, @msg
231
232 Executes C<snd> with the given C<@msg> (which must include the destination
233 port) on the given node.
234
235 =cut
236
237 sub snd_on($@) {
238 my $node = shift;
239 snd $node, snd => @_;
240 }
241
242 =item eval_on $node, $string[, @reply]
243
244 Evaluates the given string as Perl expression on the given node. When
245 @reply is specified, then it is used to construct a reply message with
246 C<"$@"> and any results from the eval appended.
247
248 =cut
249
250 sub eval_on($$;@) {
251 my $node = shift;
252 snd $node, eval => @_;
253 }
254
255 sub kil(@) {
256 my ($nodeid, $portid) = split /#/, shift, 2;
257
258 length $portid
259 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
260
261 ($NODE{$nodeid} || add_node $nodeid)
262 ->kill ("$portid", @_);
263 }
264
265 sub _nodename {
266 require POSIX;
267 (POSIX::uname ())[1]
268 }
269
270 sub _resolve($) {
271 my ($nodeid) = @_;
272
273 my $cv = AE::cv;
274 my @res;
275
276 $cv->begin (sub {
277 my %seen;
278 my @refs;
279 for (sort { $a->[0] <=> $b->[0] } @res) {
280 push @refs, $_->[1] unless $seen{$_->[1]}++
281 }
282 shift->send (@refs);
283 });
284
285 my $idx;
286 for my $t (split /,/, $nodeid) {
287 my $pri = ++$idx;
288
289 $t = length $t ? _nodename . ":$t" : _nodename
290 if $t =~ /^\d*$/;
291
292 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
293 or Carp::croak "$t: unparsable transport descriptor";
294
295 $port = "0" if $port eq "*";
296
297 if ($host eq "*") {
298 $cv->begin;
299 # use fork_call, as Net::Interface is big, and we need it rarely.
300 require AnyEvent::Util;
301 AnyEvent::Util::fork_call (
302 sub {
303 my @addr;
304
305 require Net::Interface;
306
307 for my $if (Net::Interface->interfaces) {
308 # we statically lower-prioritise ipv6 here, TODO :()
309 for my $_ ($if->address (Net::Interface::AF_INET ())) {
310 next if /^\x7f/; # skip localhost etc.
311 push @addr, $_;
312 }
313 for ($if->address (Net::Interface::AF_INET6 ())) {
314 #next if $if->scope ($_) <= 2;
315 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
316 push @addr, $_;
317 }
318
319 }
320 @addr
321 }, sub {
322 for my $ip (@_) {
323 push @res, [
324 $pri += 1e-5,
325 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
326 ];
327 }
328 $cv->end;
329 }
330 );
331 } else {
332 $cv->begin;
333 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
334 for (@_) {
335 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
336 push @res, [
337 $pri += 1e-5,
338 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
339 ];
340 }
341 $cv->end;
342 };
343 }
344 }
345
346 $cv->end;
347
348 $cv
349 }
350
351 sub configure(@) {
352 unshift @_, "profile" if @_ & 1;
353 my (%kv) = @_;
354
355 my $profile = delete $kv{profile};
356
357 $profile = _nodename
358 unless defined $profile;
359
360 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
361
362 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
363 $NODE = $node
364 unless $node eq "anon/";
365
366 $NODE{$NODE} = $NODE{""};
367 $NODE{$NODE}{id} = $NODE;
368
369 my $seeds = $CONFIG->{seeds};
370 my $binds = $CONFIG->{binds};
371
372 $binds ||= ["*"];
373
374 $WARN->(8, "node $NODE starting up.");
375
376 $LISTENER = [];
377 %LISTENER = ();
378
379 for (map _resolve $_, @$binds) {
380 for my $bind ($_->recv) {
381 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
382 or Carp::croak "$bind: unparsable local bind address";
383
384 my $listener = AnyEvent::MP::Transport::mp_server
385 $host,
386 $port,
387 prepare => sub {
388 my (undef, $host, $port) = @_;
389 $bind = AnyEvent::Socket::format_hostport $host, $port;
390 },
391 ;
392 $LISTENER{$bind} = $listener;
393 push @$LISTENER, $bind;
394 }
395 }
396
397 # the global service is mandatory currently
398 require AnyEvent::MP::Global;
399
400 # connect to all seednodes
401 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
402
403 for (@{ $CONFIG->{services} }) {
404 if (s/::$//) {
405 eval "require $_";
406 die $@ if $@;
407 } else {
408 (load_func $_)->();
409 }
410 }
411 }
412
413 #############################################################################
414 # node monitoring and info
415
416 =item node_is_known $nodeid
417
418 Returns true iff the given node is currently known to the system.
419
420 =cut
421
422 sub node_is_known($) {
423 exists $NODE{$_[0]}
424 }
425
426 =item node_is_up $nodeid
427
428 Returns true if the given node is "up", that is, the kernel thinks it has
429 a working connection to it.
430
431 If the node is known but not currently connected, returns C<0>. If the
432 node is not known, returns C<undef>.
433
434 =cut
435
436 sub node_is_up($) {
437 ($NODE{$_[0]} or return)->{transport}
438 ? 1 : 0
439 }
440
441 =item known_nodes
442
443 Returns the node IDs of all nodes currently known to this node, including
444 itself and nodes not currently connected.
445
446 =cut
447
448 sub known_nodes {
449 map $_->{id}, values %NODE
450 }
451
452 =item up_nodes
453
454 Return the node IDs of all nodes that are currently connected (excluding
455 the node itself).
456
457 =cut
458
459 sub up_nodes {
460 map $_->{id}, grep $_->{transport}, values %NODE
461 }
462
463 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
464
465 Registers a callback that is called each time a node goes up (a connection
466 is established) or down (the connection is lost).
467
468 Node up messages can only be followed by node down messages for the same
469 node, and vice versa.
470
471 Note that monitoring a node is usually better done by monitoring it's node
472 port. This function is mainly of interest to modules that are concerned
473 about the network topology and low-level connection handling.
474
475 Callbacks I<must not> block and I<should not> send any messages.
476
477 The function returns an optional guard which can be used to unregister
478 the monitoring callback again.
479
480 =cut
481
482 our %MON_NODES;
483
484 sub mon_nodes($) {
485 my ($cb) = @_;
486
487 $MON_NODES{$cb+0} = $cb;
488
489 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
490 }
491
492 sub _inject_nodeevent($$;@) {
493 my ($node, $up, @reason) = @_;
494
495 for my $cb (values %MON_NODES) {
496 eval { $cb->($node->{id}, $up, @reason); 1 }
497 or $WARN->(1, $@);
498 }
499
500 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
501 }
502
503 #############################################################################
504 # self node code
505
506 our %node_req = (
507 # internal services
508
509 # monitoring
510 mon0 => sub { # stop monitoring a port
511 my $portid = shift;
512 my $node = $SRCNODE;
513 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
514 },
515 mon1 => sub { # start monitoring a port
516 my $portid = shift;
517 my $node = $SRCNODE;
518 Scalar::Util::weaken $node; #TODO# ugly
519 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
520 $node->send (["", kil => $portid, @_])
521 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
522 });
523 },
524 kil => sub {
525 my $cbs = delete $SRCNODE->{lmon}{+shift}
526 or return;
527
528 $_->(@_) for @$cbs;
529 },
530
531 # "public" services - not actually public
532
533 # relay message to another node / generic echo
534 snd => \&snd,
535 snd_multiple => sub {
536 snd @$_ for @_
537 },
538
539 # informational
540 info => sub {
541 snd @_, $NODE;
542 },
543 known_nodes => sub {
544 snd @_, known_nodes;
545 },
546 up_nodes => sub {
547 snd @_, up_nodes;
548 },
549
550 # random utilities
551 eval => sub {
552 my @res = eval shift;
553 snd @_, "$@", @res if @_;
554 },
555 time => sub {
556 snd @_, AE::time;
557 },
558 devnull => sub {
559 #
560 },
561 "" => sub {
562 # empty messages are keepalives or similar devnull-applications
563 },
564 );
565
566 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
567 $PORT{""} = sub {
568 my $tag = shift;
569 eval { &{ $node_req{$tag} ||= load_func $tag } };
570 $WARN->(2, "error processing node message: $@") if $@;
571 };
572
573 =back
574
575 =head1 SEE ALSO
576
577 L<AnyEvent::MP>.
578
579 =head1 AUTHOR
580
581 Marc Lehmann <schmorp@schmorp.de>
582 http://home.schmorp.de/
583
584 =cut
585
586 1
587