ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.48
Committed: Tue Sep 8 13:46:25 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.47: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 up_nodes mon_nodes node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages - queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
# Highest level the default handler still prints. Taken from the
# PERL_ANYEVENT_MP_WARNLEVEL environment variable when that exists, else 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks, called for every message regardless of its level.
our @WARN;

# Default log handler, called as $WARN->($level, $msg): runs all @WARN hooks,
# then prints "timestamp <level> message" to STDERR unless the level exceeds
# $WARNLEVEL.
our $WARN = sub {
   # "&$_" without parentheses passes our own @_ on to every hook
   &$_ for @WARN;

   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # the format string supplies the newline, so drop a trailing one
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
# Resolves a fully qualified function name ("Some::Package::func") into a
# code reference, loading modules on demand.
#
# When the function is not defined yet, successively shorter package prefixes
# are require'd ("A::B::f" tries A::B, then A) until it becomes defined. Load
# errors are ignored - only whether the function ends up defined matters.
#
# Returns a code reference. If the name cannot be resolved, the returned code
# reference dies with "unable to resolve '<name>'" when it is called.
#
# The package name ends up in a string eval, and function names can come from
# message tags received from other nodes, so anything that does not look like
# a plain package name is refused instead of being evaluated.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $unresolved = sub { die "unable to resolve '$func'" };

      my $pkg = $func;
      do {
         # strip the last name component, give up when none is left
         $pkg =~ s/::[^:]+$//
            or return $unresolved;

         # never feed arbitrary text to the string eval below
         $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/
            or return $unresolved;

         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
109
# Returns a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom where possible. sysread may return
# fewer octets than requested, so reading continues until the nonce is
# complete. Whatever could not be read (device missing, read error, EOF) is
# filled in from perl's rand, which is a much weaker source.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      while (length ($nonce) < $len) {
         # append at the current end of the buffer
         my $got = sysread $fh, $nonce, $len - length ($nonce), length ($nonce);
         last unless $got; # error or EOF - use the fallback below
      }
   }

   # fallback for anything /dev/urandom did not deliver
   $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce)
      if length ($nonce) < $len;

   $nonce
}
122
# Encodes an octet string into a string consisting only of the characters
# 0-9, A-Z and a-z.
#
# With Math::GMP 2.05 or newer available, the data is interpreted as one big
# number and formatted in base 62. Otherwise it is base64-encoded without
# padding, and the non-alphanumeric base64 characters are escaped using "x"
# as escape character: "x" -> "x0", "/" -> "x1", "+" -> "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of which have to go
      $data =~ s/=+$//;
      # "x" has to be escaped first, as the other escapes introduce an "x"
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
141
# Builds an alphanumeric identifier from the low 16 bits of the process id,
# the low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time () & 0xffff;

   alnumbits (pack "nna*", $pid16, $time16, nonce (2))
}
145
# this node's configuration
our $CONFIG;

# remote uniq value, made from 96 random bits
our $RUNIQ = alnumbits (nonce (96 / 8));
# per-process/node unique cookie
our $UNIQ = gen_uniq ();
our $NODE = "anon/$RUNIQ";
our $ID = "a";

# node id to transport mapping, or "undef", for local node
our %NODE;
# local ports
our %PORT;
our %PORT_DATA;

# local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %RMON;
# monitored _local_ ports
our %LMON;

our %LISTENER;
# our listeners, as arrayref
our $LISTENER;

# holds the sending node during _inject
our $SRCNODE;
163
# Returns the current value of $NODE, the ID of this node.
sub NODE() {
   return $NODE;
}
167
# Returns the node part of a port ID, i.e. everything before the first "#"
# (or the whole string if it contains no "#").
sub node_of($) {
   # the limit keeps any further "#" inside the (discarded) port part
   my $nodeid = (split /#/, $_[0], 2)[0];

   return $nodeid;
}
173
# Defines the constant TRACE at compile time: 1 when the environment variable
# PERL_ANYEVENT_MP_TRACE is set to a true value, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
179
# Pending timer watcher (if any) and the queue of callbacks it will run.
our $DELAY_TIMER;
our @DELAY_QUEUE;

# Timer callback: runs all queued callbacks in order, including any that get
# queued while it is running.
sub _delay_run {
   # The timer is a one-shot, so forget it now. Otherwise "||=" in delay
   # would never arm a new timer and later callbacks would never run. Doing
   # this first also keeps delay usable when a callback dies.
   undef $DELAY_TIMER;

   (shift @DELAY_QUEUE or return)->() while 1;
}

# Queues the given code reference for execution from a zero-delay timer.
sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
191
# Delivers a message to a local port: the first argument is the port id, the
# remaining arguments are passed to the callback registered for that port in
# %PORT. Does nothing if no such port exists.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $cb = $PORT{+shift}
      or return;

   # no parentheses: the callback receives our (already shifted) @_
   &$cb;
}
196
197 # this function adds a node-ref, so you can send stuff to it
198 # it is basically the central routing component.
sub add_node {
   my ($nodeid) = @_;

   # reuse the existing entry when there is one
   return $NODE{$nodeid}
      if $NODE{$nodeid};

   $NODE{$nodeid} = AnyEvent::MP::Node::Direct->new ($nodeid);
}
204
# Sends a message: the first argument is the destination port ID
# ("nodeid#portid"), the remaining arguments form the message. The node entry
# is looked up in %NODE (and created with add_node when missing), then its
# "send" code reference is called with [portid, message...].
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send} (["$portid", @_]);
}
213
214 =item $is_local = port_is_local $port
215
216 Returns true iff the port is a local port.
217
218 =cut
219
sub port_is_local($) {
   # only the node part (before the first "#") matters
   my $nodeid = (split /#/, $_[0], 2)[0];

   # local means: same node object as the one registered under ""
   return $NODE{$nodeid} == $NODE{""};
}
225
226 =item snd_to_func $node, $func, @args
227
228 Expects a node ID and a name of a function. Asynchronously tries to call
229 this function with the given arguments on that node.
230
231 This function can be used to implement C<spawn>-like interfaces.
232
233 =cut
234
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment talks about $NODE, yet the flag is set when
   # the target is NOT $NODE - confirm which of the two is intended.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the empty port id addresses the node itself
   $node->send (["", @call]);
}
246
247 =item snd_on $node, @msg
248
249 Executes C<snd> with the given C<@msg> (which must include the destination
250 port) on the given node.
251
252 =cut
253
sub snd_on($@) {
   my ($node, @msg) = @_;

   # ask the node's own port to run its "snd" request with @msg
   snd ($node, "snd", @msg);
}
258
259 =item eval_on $node, $string[, @reply]
260
261 Evaluates the given string as Perl expression on the given node. When
262 @reply is specified, then it is used to construct a reply message with
263 C<"$@"> and any results from the eval appended.
264
265 =cut
266
sub eval_on($$;@) {
   my ($node, @request) = @_;

   # @request is the string to evaluate, optionally followed by @reply
   snd ($node, "eval", @request);
}
271
# Kills a port: the first argument is the port ID ("nodeid#portid"), the
# remaining arguments are handed to the node's kill method as the reason.
# Croaks when the port part is empty, i.e. when the ID names a node port.
sub kil(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
281
# Returns the nodename field (the second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
286
# Asynchronously resolves a comma-separated list of transport descriptors
# into endpoint strings.
#
# An empty or purely numeric descriptor stands for this host's node name
# (with the number as port, if given). A "*" port is turned into port 0. A
# "*" host is expanded into the addresses of the local network interfaces,
# which are queried in a forked child by means of Net::Interface. Everything
# else is resolved with AnyEvent::Socket::resolve_sockaddr.
#
# Returns a condvar at once. Its ->recv yields the formatted endpoints with
# duplicates removed, ordered by the position of their descriptor in the list
# (endpoints from the same descriptor keep the order they were found in).
#
# Croaks when a descriptor cannot be parsed. A failing interface query is
# reported through $WARN and simply contributes no endpoints.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, endpoint] pairs, filled in by the callbacks below

   # runs once every begin has been matched by an end
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # the integer part orders descriptors, the fraction orders the
      # addresses found for one descriptor
      my $pri = ++$idx;

      $t = length $t ? _nodename () . ":$t" : _nodename ()
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for ($if->address (Net::Interface::AF_INET ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }
                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }

               }
               @addr
            }, sub {
               # fork_call reports an exception in the child by calling us
               # without arguments and with $@ set - do not hide that
               my $err = $@;
               $WARN->(1, "$t: unable to query local interfaces: $err")
                  if $err;

               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr ($_->[3]);
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($host), $service)
               ];
            }
            $cv->end;
         });
      }
   }

   # matches the initial begin, so the result is sent once all lookups ended
   $cv->end;

   $cv
}
367
# Configures and starts this node.
#
# Takes key-value pairs; with an odd number of arguments, the first one is
# the profile name. The profile defaults to the node name of this host. The
# configuration is looked up with AnyEvent::MP::Config::find_profile and
# stored in $CONFIG, then:
#
# - $NODE is set from the "nodeid" setting (or the profile name), unless that
#   is "anon/", and the self node is registered under that id,
# - a listener is created for every resolved "binds" entry (default "*"),
#   recorded in %LISTENER and @$LISTENER,
# - AnyEvent::MP::Global is loaded and handed the resolved "seeds",
# - every "services" entry is started: array references are [function, args],
#   names ending in "::" are modules to load, other names are functions that
#   are called without arguments.
#
# Croaks on unparsable bind addresses and dies when a service module cannot
# be loaded. Blocks (via ->recv) while addresses are being resolved.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   delete $NODE{$NODE}; # we do not support doing stuff before configure

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one after the other
   for (map _resolve ($_), @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } else {
         # work on a copy: stripping the "::" from the loop variable itself
         # would rewrite the entry stored in the configuration
         my $name = $service;

         if ($name =~ s/::$//) {
            eval "require $name";
            die $@ if $@;
         } else {
            (load_func ($name))->();
         }
      }
   }
}
436
437 #############################################################################
438 # node monitoring and info
439
440 =item node_is_known $nodeid
441
442 Returns true iff the given node is currently known to the system. The only
443 time a node is known but not up currently is when a connection request is
444 pending.
445
446 =cut
447
sub node_is_known($) {
   my ($nodeid) = @_;

   # known means: there is an entry in %NODE, whatever its value
   return exists $NODE{$nodeid};
}
451
452 =item node_is_up $nodeid
453
454 Returns true if the given node is "up", that is, the kernel thinks it has
455 a working connection to it.
456
457 If the node is known but not currently connected, returns C<0>. If the
458 node is not known, returns C<undef>.
459
460 =cut
461
sub node_is_up($) {
   # unknown (or false) entries yield undef / the empty list
   my $node = $NODE{$_[0]}
      or return;

   # a node counts as up while it has a transport
   return $node->{transport} ? 1 : 0;
}
466
467 =item known_nodes
468
469 Returns the node IDs of all nodes currently known to this node, including
470 itself and nodes not currently connected.
471
472 =cut
473
sub known_nodes {
   # one id per %NODE entry
   my @ids;
   push @ids, $_->{id}
      for values %NODE;

   @ids
}
477
478 =item up_nodes
479
480 Return the node IDs of all nodes that are currently connected (excluding
481 the node itself).
482
483 =cut
484
sub up_nodes {
   # only entries that currently have a transport
   my @ids;
   for (values %NODE) {
      push @ids, $_->{id}
         if $_->{transport};
   }

   @ids
}
488
489 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
490
491 Registers a callback that is called each time a node goes up (a connection
492 is established) or down (the connection is lost).
493
494 Node up messages can only be followed by node down messages for the same
495 node, and vice versa.
496
497 Note that monitoring a node is usually better done by monitoring its node
498 port. This function is mainly of interest to modules that are concerned
499 about the network topology and low-level connection handling.
500
501 Callbacks I<must not> block and I<should not> send any messages.
502
503 The function returns an optional guard which can be used to unregister
504 the monitoring callback again.
505
506 Example: make sure you call function C<newnode> for all nodes that are up
507 or go up (and down).
508
509 newnode $_, 1 for up_nodes;
510 mon_nodes \&newnode;
511
512 =cut
513
# Registered node monitoring callbacks, keyed by the address of the code
# reference.
our %MON_NODES;

# Registers $cb in %MON_NODES.
#
# In void context nothing is returned and the callback stays registered. In
# any other context a guard object is returned that removes the callback
# again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is undef only in void context. A plain truth test would also
   # reject scalar context, i.e. the documented "$guard = mon_nodes ..." form.
   return unless defined wantarray;

   require AnyEvent::Util;
   AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
523
# Tells all %MON_NODES callbacks that $node went up ($up true) or down, then
# logs the change at level 7. Every callback is called with the node id, $up
# and @reason; an exception in a callback is logged at level 1 and does not
# keep the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      # the trailing 1 makes a callback returning false count as success
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
534
535 #############################################################################
536 # self node code
537
# Request handlers of the node port, keyed by request tag. Each handler is
# called with the rest of the message; $SRCNODE is the node that sent it.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      my $node = $SRCNODE;
      Scalar::Util::weaken $node; #TODO# ugly

      # forwards the kill to the monitoring node, as long as it is connected
      my $notify = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $node->{rmon}{$portid} = $notify;
      $NODE{""}->monitor ($portid, $notify);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid};
      return unless $cbs;

      # the remaining arguments are the kill reason
      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      # every argument is an array reference holding one complete message
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): evaluates a string taken from a message, i.e. the
      # sender can run arbitrary code in this process.
      my @res = eval shift;
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
597
# The local node object is reachable both under "" and under its node id.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port ("") dispatches on the first message element: the handler
# comes from %node_req or, failing that, is resolved with load_func and
# cached there. Exceptions are logged at level 2 instead of propagating.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);
      # no parentheses: the handler receives the rest of our @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
604
605 =back
606
607 =head1 SEE ALSO
608
609 L<AnyEvent::MP>.
610
611 =head1 AUTHOR
612
613 Marc Lehmann <schmorp@schmorp.de>
614 http://home.schmorp.de/
615
616 =cut
617
618 1
619