ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.56
Committed: Thu Oct 1 21:00:02 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-1_2
Changes since 1.55: +1 -1 lines
Log Message:
1.2

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.2';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 up_nodes mon_nodes node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages -queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
71 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 our @WARN;
73 our $WARN = sub {
74 &$_ for @WARN;
75
76 return if $WARNLEVEL < $_[0];
77
78 my ($level, $msg) = @_;
79
80 $msg =~ s/\n$//;
81
82 printf STDERR "%s <%d> %s\n",
83 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84 $level,
85 $msg;
86 };
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
95 sub load_func($) {
96 my $func = $_[0];
97
98 unless (defined &$func) {
99 my $pkg = $func;
100 do {
101 $pkg =~ s/::[^:]+$//
102 or return sub { die "unable to resolve '$func'" };
103 eval "require $pkg";
104 } until defined &$func;
105 }
106
107 \&$func
108 }
109
110 sub nonce($) {
111 my $nonce;
112
113 if (open my $fh, "</dev/urandom") {
114 sysread $fh, $nonce, $_[0];
115 } else {
116 # shit...
117 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
118 }
119
120 $nonce
121 }
122
123 sub alnumbits($) {
124 my $data = $_[0];
125
126 if (eval "use Math::GMP 2.05; 1") {
127 $data = Math::GMP::get_str_gmp (
128 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
129 62
130 );
131 } else {
132 $data = MIME::Base64::encode_base64 $data, "";
133 $data =~ s/=//;
134 $data =~ s/x/x0/g;
135 $data =~ s/\//x1/g;
136 $data =~ s/\+/x2/g;
137 }
138
139 $data
140 }
141
142 sub gen_uniq {
143 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
144 }
145
146 our $CONFIG; # this node's configuration
147
148 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
149 our $UNIQ = gen_uniq; # per-process/node unique cookie
150 our $NODE = "anon/$RUNIQ";
151 our $ID = "a";
152
153 our %NODE; # node id to transport mapping, or "undef", for local node
154 our (%PORT, %PORT_DATA); # local ports
155
156 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
157 our %LMON; # monitored _local_ ports
158
159 our %LISTENER;
160 our $LISTENER; # our listeners, as arrayref
161
162 our $SRCNODE; # holds the sending node during _inject
163
164 sub NODE() {
165 $NODE
166 }
167
168 sub node_of($) {
169 my ($node, undef) = split /#/, $_[0], 2;
170
171 $node
172 }
173
174 BEGIN {
175 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
176 ? sub () { 1 }
177 : sub () { 0 };
178 }
179
180 our $DELAY_TIMER;
181 our @DELAY_QUEUE;
182
183 sub _delay_run {
184 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
185 }
186
187 sub delay($) {
188 push @DELAY_QUEUE, shift;
189 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
190 }
191
192 sub _inject {
193 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
194 &{ $PORT{+shift} or return };
195 }
196
197 # this function adds a node-ref, so you can send stuff to it
198 # it is basically the central routing component.
199 sub add_node {
200 my ($node) = @_;
201
202 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
203 }
204
205 sub snd(@) {
206 my ($nodeid, $portid) = split /#/, shift, 2;
207
208 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
209
210 defined $nodeid #d#UGLY
211 or Carp::croak "'undef' is not a valid node ID/port ID";
212
213 ($NODE{$nodeid} || add_node $nodeid)
214 ->{send} (["$portid", @_]);
215 }
216
217 =item $is_local = port_is_local $port
218
219 Returns true iff the port is a local port.
220
221 =cut
222
223 sub port_is_local($) {
224 my ($nodeid, undef) = split /#/, $_[0], 2;
225
226 $NODE{$nodeid} == $NODE{""}
227 }
228
229 =item snd_to_func $node, $func, @args
230
231 Expects a node ID and a name of a function. Asynchronously tries to call
232 this function with the given arguments on that node.
233
234 This function can be used to implement C<spawn>-like interfaces.
235
236 =cut
237
238 sub snd_to_func($$;@) {
239 my $nodeid = shift;
240
241 # on $NODE, we artificially delay... (for spawn)
242 # this is very ugly - maybe we should simply delay ALL messages,
243 # to avoid deep recursion issues. but that's so... slow...
244 $AnyEvent::MP::Node::Self::DELAY = 1
245 if $nodeid ne $NODE;
246
247 defined $nodeid #d#UGLY
248 or Carp::croak "'undef' is not a valid node ID/port ID";
249
250 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
251 }
252
253 =item snd_on $node, @msg
254
255 Executes C<snd> with the given C<@msg> (which must include the destination
256 port) on the given node.
257
258 =cut
259
260 sub snd_on($@) {
261 my $node = shift;
262 snd $node, snd => @_;
263 }
264
265 =item eval_on $node, $string[, @reply]
266
267 Evaluates the given string as Perl expression on the given node. When
268 @reply is specified, then it is used to construct a reply message with
269 C<"$@"> and any results from the eval appended.
270
271 =cut
272
273 sub eval_on($$;@) {
274 my $node = shift;
275 snd $node, eval => @_;
276 }
277
278 sub kil(@) {
279 my ($nodeid, $portid) = split /#/, shift, 2;
280
281 length $portid
282 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
283
284 ($NODE{$nodeid} || add_node $nodeid)
285 ->kill ("$portid", @_);
286 }
287
288 sub _nodename {
289 require POSIX;
290 (POSIX::uname ())[1]
291 }
292
293 sub _resolve($) {
294 my ($nodeid) = @_;
295
296 my $cv = AE::cv;
297 my @res;
298
299 $cv->begin (sub {
300 my %seen;
301 my @refs;
302 for (sort { $a->[0] <=> $b->[0] } @res) {
303 push @refs, $_->[1] unless $seen{$_->[1]}++
304 }
305 shift->send (@refs);
306 });
307
308 my $idx;
309 for my $t (split /,/, $nodeid) {
310 my $pri = ++$idx;
311
312 $t = length $t ? _nodename . ":$t" : _nodename
313 if $t =~ /^\d*$/;
314
315 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
316 or Carp::croak "$t: unparsable transport descriptor";
317
318 $port = "0" if $port eq "*";
319
320 if ($host eq "*") {
321 $cv->begin;
322 # use fork_call, as Net::Interface is big, and we need it rarely.
323 require AnyEvent::Util;
324 AnyEvent::Util::fork_call (
325 sub {
326 my @addr;
327
328 require Net::Interface;
329
330 for my $if (Net::Interface->interfaces) {
331 # we statically lower-prioritise ipv6 here, TODO :()
332 for $_ ($if->address (Net::Interface::AF_INET ())) {
333 next if /^\x7f/; # skip localhost etc.
334 push @addr, $_;
335 }
336 for ($if->address (Net::Interface::AF_INET6 ())) {
337 #next if $if->scope ($_) <= 2;
338 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
339 push @addr, $_;
340 }
341
342 }
343 @addr
344 }, sub {
345 for my $ip (@_) {
346 push @res, [
347 $pri += 1e-5,
348 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
349 ];
350 }
351 $cv->end;
352 }
353 );
354 } else {
355 $cv->begin;
356 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
357 for (@_) {
358 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
359 push @res, [
360 $pri += 1e-5,
361 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
362 ];
363 }
364 $cv->end;
365 };
366 }
367 }
368
369 $cv->end;
370
371 $cv
372 }
373
374 sub configure(@) {
375 unshift @_, "profile" if @_ & 1;
376 my (%kv) = @_;
377
378 my $profile = delete $kv{profile};
379
380 $profile = _nodename
381 unless defined $profile;
382
383 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
384
385 delete $NODE{$NODE}; # we do not support doing stuff before configure
386
387 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
388
389 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
390
391 $NODE = $node
392 unless $node eq "anon/";
393
394 $NODE{$NODE} = $NODE{""};
395 $NODE{$NODE}{id} = $NODE;
396
397 my $seeds = $CONFIG->{seeds};
398 my $binds = $CONFIG->{binds};
399
400 $binds ||= ["*"];
401
402 $WARN->(8, "node $NODE starting up.");
403
404 $LISTENER = [];
405 %LISTENER = ();
406
407 for (map _resolve $_, @$binds) {
408 for my $bind ($_->recv) {
409 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
410 or Carp::croak "$bind: unparsable local bind address";
411
412 my $listener = AnyEvent::MP::Transport::mp_server
413 $host,
414 $port,
415 prepare => sub {
416 my (undef, $host, $port) = @_;
417 $bind = AnyEvent::Socket::format_hostport $host, $port;
418 0
419 },
420 ;
421 $LISTENER{$bind} = $listener;
422 push @$LISTENER, $bind;
423 }
424 }
425
426 $WARN->(8, "node listens on [@$LISTENER].");
427
428 # the global service is mandatory currently
429 require AnyEvent::MP::Global;
430
431 # connect to all seednodes
432 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
433
434 for (@{ $CONFIG->{services} }) {
435 if (ref) {
436 my ($func, @args) = @$_;
437 (load_func $func)->(@args);
438 } elsif (s/::$//) {
439 eval "require $_";
440 die $@ if $@;
441 } else {
442 (load_func $_)->();
443 }
444 }
445 }
446
447 #############################################################################
448 # node monitoring and info
449
450 =item node_is_known $nodeid
451
452 Returns true iff the given node is currently known to the system. The only
453 time a node is known but not up currently is when a conenction request is
454 pending.
455
456 =cut
457
458 sub node_is_known($) {
459 exists $NODE{$_[0]}
460 }
461
462 =item node_is_up $nodeid
463
464 Returns true if the given node is "up", that is, the kernel thinks it has
465 a working connection to it.
466
467 If the node is known but not currently connected, returns C<0>. If the
468 node is not known, returns C<undef>.
469
470 =cut
471
472 sub node_is_up($) {
473 ($NODE{$_[0]} or return)->{transport}
474 ? 1 : 0
475 }
476
477 =item known_nodes
478
479 Returns the node IDs of all nodes currently known to this node, including
480 itself and nodes not currently connected.
481
482 =cut
483
484 sub known_nodes() {
485 map $_->{id}, values %NODE
486 }
487
488 =item up_nodes
489
490 Return the node IDs of all nodes that are currently connected (excluding
491 the node itself).
492
493 =cut
494
495 sub up_nodes() {
496 map $_->{id}, grep $_->{transport}, values %NODE
497 }
498
499 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
500
501 Registers a callback that is called each time a node goes up (a connection
502 is established) or down (the connection is lost).
503
504 Node up messages can only be followed by node down messages for the same
505 node, and vice versa.
506
507 Note that monitoring a node is usually better done by monitoring it's node
508 port. This function is mainly of interest to modules that are concerned
509 about the network topology and low-level connection handling.
510
511 Callbacks I<must not> block and I<should not> send any messages.
512
513 The function returns an optional guard which can be used to unregister
514 the monitoring callback again.
515
516 Example: make sure you call function C<newnode> for all nodes that are up
517 or go up (and down).
518
519 newnode $_, 1 for up_nodes;
520 mon_nodes \&newnode;
521
522 =cut
523
524 our %MON_NODES;
525
526 sub mon_nodes($) {
527 my ($cb) = @_;
528
529 $MON_NODES{$cb+0} = $cb;
530
531 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
532 }
533
534 sub _inject_nodeevent($$;@) {
535 my ($node, $up, @reason) = @_;
536
537 for my $cb (values %MON_NODES) {
538 eval { $cb->($node->{id}, $up, @reason); 1 }
539 or $WARN->(1, $@);
540 }
541
542 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
543 }
544
545 #############################################################################
546 # self node code
547
548 our %node_req = (
549 # internal services
550
551 # monitoring
552 mon0 => sub { # stop monitoring a port
553 my $portid = shift;
554 my $node = $SRCNODE;
555 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
556 },
557 mon1 => sub { # start monitoring a port
558 my $portid = shift;
559 my $node = $SRCNODE;
560 Scalar::Util::weaken $node; #TODO# ugly
561 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
562 $node->send (["", kil => $portid, @_])
563 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
564 });
565 },
566 kil => sub {
567 my $cbs = delete $SRCNODE->{lmon}{+shift}
568 or return;
569
570 $_->(@_) for @$cbs;
571 },
572
573 # "public" services - not actually public
574
575 # relay message to another node / generic echo
576 snd => \&snd,
577 snd_multiple => sub {
578 snd @$_ for @_
579 },
580
581 # informational
582 info => sub {
583 snd @_, $NODE;
584 },
585 known_nodes => sub {
586 snd @_, known_nodes;
587 },
588 up_nodes => sub {
589 snd @_, up_nodes;
590 },
591
592 # random utilities
593 eval => sub {
594 my @res = do { package main; eval shift };
595 snd @_, "$@", @res if @_;
596 },
597 time => sub {
598 snd @_, AE::time;
599 },
600 devnull => sub {
601 #
602 },
603 "" => sub {
604 # empty messages are keepalives or similar devnull-applications
605 },
606 );
607
608 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
609 $PORT{""} = sub {
610 my $tag = shift;
611 eval { &{ $node_req{$tag} ||= load_func $tag } };
612 $WARN->(2, "error processing node message: $@") if $@;
613 };
614
615 =back
616
617 =head1 SEE ALSO
618
619 L<AnyEvent::MP>.
620
621 =head1 AUTHOR
622
623 Marc Lehmann <schmorp@schmorp.de>
624 http://home.schmorp.de/
625
626 =cut
627
628 1
629