ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.62
Committed: Thu Nov 5 22:44:56 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-1_23
Changes since 1.61: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 add_node load_func snd_to_func snd_on eval_on
41
42 NODE $NODE node_of snd kil port_is_local
43 configure
44 up_nodes mon_nodes node_is_up
45 );
46
47 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48
49 This value is called with an error or warning message, when e.g. a
50 connection could not be created, authorisation failed and so on.
51
52 It I<must not> block or send messages -queue it and use an idle watcher if
53 you need to do any of these things.
54
55 C<$level> should be C<0> for messages to be logged always, C<1> for
56 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57 node connectivity and services, C<8> for debugging messages and C<9> for
58 tracing messages.
59
60 The default simply logs the message to STDERR.
61
62 =item @AnyEvent::MP::Kernel::WARN
63
64 All code references in this array are called for every log message, from
65 the default C<$WARN> handler. This is an easy way to tie into the log
66 messages without disturbing others.
67
68 =cut
69
70 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
71 our @WARN;
72 our $WARN = sub {
73 &$_ for @WARN;
74
75 return if $WARNLEVEL < $_[0];
76
77 my ($level, $msg) = @_;
78
79 $msg =~ s/\n$//;
80
81 printf STDERR "%s <%d> %s\n",
82 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83 $level,
84 $msg;
85 };
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
94 sub load_func($) {
95 my $func = $_[0];
96
97 unless (defined &$func) {
98 my $pkg = $func;
99 do {
100 $pkg =~ s/::[^:]+$//
101 or return sub { die "unable to resolve '$func'" };
102
103 local $@;
104 unless (eval "require $pkg; 1") {
105 my $error = $@;
106 $error =~ /^Can't locate .*.pm in \@INC \(/
107 or return sub { die $error };
108 }
109 } until defined &$func;
110 }
111
112 \&$func
113 }
114
115 sub nonce($) {
116 my $nonce;
117
118 if (open my $fh, "</dev/urandom") {
119 sysread $fh, $nonce, $_[0];
120 } else {
121 # shit...
122 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
123 }
124
125 $nonce
126 }
127
128 sub alnumbits($) {
129 my $data = $_[0];
130
131 if (eval "use Math::GMP 2.05; 1") {
132 $data = Math::GMP::get_str_gmp (
133 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
134 62
135 );
136 } else {
137 $data = MIME::Base64::encode_base64 $data, "";
138 $data =~ s/=//;
139 $data =~ s/x/x0/g;
140 $data =~ s/\//x1/g;
141 $data =~ s/\+/x2/g;
142 }
143
144 $data
145 }
146
147 sub gen_uniq {
148 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
149 }
150
151 our $CONFIG; # this node's configuration
152
153 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
154 our $UNIQ = gen_uniq; # per-process/node unique cookie
155 our $NODE = "anon/$RUNIQ";
156 our $ID = "a";
157
158 our %NODE; # node id to transport mapping, or "undef", for local node
159 our (%PORT, %PORT_DATA); # local ports
160
161 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
162 our %LMON; # monitored _local_ ports
163
164 our %LISTENER;
165 our $LISTENER; # our listeners, as arrayref
166
167 our $SRCNODE; # holds the sending node during _inject
168
169 sub NODE() {
170 $NODE
171 }
172
173 sub node_of($) {
174 my ($node, undef) = split /#/, $_[0], 2;
175
176 $node
177 }
178
179 BEGIN {
180 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
181 ? sub () { 1 }
182 : sub () { 0 };
183 }
184
185 our $DELAY_TIMER;
186 our @DELAY_QUEUE;
187
188 sub _delay_run {
189 (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
190 }
191
192 sub delay($) {
193 push @DELAY_QUEUE, shift;
194 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
195 }
196
197 sub _inject {
198 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
199 &{ $PORT{+shift} or return };
200 }
201
202 # this function adds a node-ref, so you can send stuff to it
203 # it is basically the central routing component.
204 sub add_node {
205 my ($node) = @_;
206
207 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
208 }
209
210 sub snd(@) {
211 my ($nodeid, $portid) = split /#/, shift, 2;
212
213 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
214
215 defined $nodeid #d#UGLY
216 or Carp::croak "'undef' is not a valid node ID/port ID";
217
218 ($NODE{$nodeid} || add_node $nodeid)
219 ->{send} (["$portid", @_]);
220 }
221
222 =item $is_local = port_is_local $port
223
224 Returns true iff the port is a local port.
225
226 =cut
227
228 sub port_is_local($) {
229 my ($nodeid, undef) = split /#/, $_[0], 2;
230
231 $NODE{$nodeid} == $NODE{""}
232 }
233
234 =item snd_to_func $node, $func, @args
235
236 Expects a node ID and a name of a function. Asynchronously tries to call
237 this function with the given arguments on that node.
238
239 This function can be used to implement C<spawn>-like interfaces.
240
241 =cut
242
243 sub snd_to_func($$;@) {
244 my $nodeid = shift;
245
246 # on $NODE, we artificially delay... (for spawn)
247 # this is very ugly - maybe we should simply delay ALL messages,
248 # to avoid deep recursion issues. but that's so... slow...
249 $AnyEvent::MP::Node::Self::DELAY = 1
250 if $nodeid ne $NODE;
251
252 defined $nodeid #d#UGLY
253 or Carp::croak "'undef' is not a valid node ID/port ID";
254
255 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
256 }
257
258 =item snd_on $node, @msg
259
260 Executes C<snd> with the given C<@msg> (which must include the destination
261 port) on the given node.
262
263 =cut
264
265 sub snd_on($@) {
266 my $node = shift;
267 snd $node, snd => @_;
268 }
269
270 =item eval_on $node, $string[, @reply]
271
272 Evaluates the given string as Perl expression on the given node. When
273 @reply is specified, then it is used to construct a reply message with
274 C<"$@"> and any results from the eval appended.
275
276 =cut
277
278 sub eval_on($$;@) {
279 my $node = shift;
280 snd $node, eval => @_;
281 }
282
283 sub kil(@) {
284 my ($nodeid, $portid) = split /#/, shift, 2;
285
286 length $portid
287 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
288
289 ($NODE{$nodeid} || add_node $nodeid)
290 ->kill ("$portid", @_);
291 }
292
293 sub _nodename {
294 require POSIX;
295 (POSIX::uname ())[1]
296 }
297
298 sub _resolve($) {
299 my ($nodeid) = @_;
300
301 my $cv = AE::cv;
302 my @res;
303
304 $cv->begin (sub {
305 my %seen;
306 my @refs;
307 for (sort { $a->[0] <=> $b->[0] } @res) {
308 push @refs, $_->[1] unless $seen{$_->[1]}++
309 }
310 shift->send (@refs);
311 });
312
313 my $idx;
314 for my $t (split /,/, $nodeid) {
315 my $pri = ++$idx;
316
317 $t = length $t ? _nodename . ":$t" : _nodename
318 if $t =~ /^\d*$/;
319
320 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
321 or Carp::croak "$t: unparsable transport descriptor";
322
323 $port = "0" if $port eq "*";
324
325 if ($host eq "*") {
326 $cv->begin;
327 # use fork_call, as Net::Interface is big, and we need it rarely.
328 require AnyEvent::Util;
329 AnyEvent::Util::fork_call (
330 sub {
331 my @addr;
332
333 require Net::Interface;
334
335 for my $if (Net::Interface->interfaces) {
336 # we statically lower-prioritise ipv6 here, TODO :()
337 for $_ ($if->address (Net::Interface::AF_INET ())) {
338 next if /^\x7f/; # skip localhost etc.
339 push @addr, $_;
340 }
341 for ($if->address (Net::Interface::AF_INET6 ())) {
342 #next if $if->scope ($_) <= 2;
343 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
344 push @addr, $_;
345 }
346
347 }
348 @addr
349 }, sub {
350 for my $ip (@_) {
351 push @res, [
352 $pri += 1e-5,
353 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
354 ];
355 }
356 $cv->end;
357 }
358 );
359 } else {
360 $cv->begin;
361 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
362 for (@_) {
363 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
364 push @res, [
365 $pri += 1e-5,
366 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
367 ];
368 }
369 $cv->end;
370 };
371 }
372 }
373
374 $cv->end;
375
376 $cv
377 }
378
379 sub configure(@) {
380 unshift @_, "profile" if @_ & 1;
381 my (%kv) = @_;
382
383 my $profile = delete $kv{profile};
384
385 $profile = _nodename
386 unless defined $profile;
387
388 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
389
390 delete $NODE{$NODE}; # we do not support doing stuff before configure
391
392 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
393
394 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
395
396 $NODE = $node
397 unless $node eq "anon/";
398
399 $NODE{$NODE} = $NODE{""};
400 $NODE{$NODE}{id} = $NODE;
401
402 my $seeds = $CONFIG->{seeds};
403 my $binds = $CONFIG->{binds};
404
405 $binds ||= ["*"];
406
407 $WARN->(8, "node $NODE starting up.");
408
409 $LISTENER = [];
410 %LISTENER = ();
411
412 for (map _resolve $_, @$binds) {
413 for my $bind ($_->recv) {
414 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
415 or Carp::croak "$bind: unparsable local bind address";
416
417 my $listener = AnyEvent::MP::Transport::mp_server
418 $host,
419 $port,
420 prepare => sub {
421 my (undef, $host, $port) = @_;
422 $bind = AnyEvent::Socket::format_hostport $host, $port;
423 0
424 },
425 ;
426 $LISTENER{$bind} = $listener;
427 push @$LISTENER, $bind;
428 }
429 }
430
431 $WARN->(8, "node listens on [@$LISTENER].");
432
433 # the global service is mandatory currently
434 require AnyEvent::MP::Global;
435
436 # connect to all seednodes
437 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
438
439 for (@{ $CONFIG->{services} }) {
440 if (ref) {
441 my ($func, @args) = @$_;
442 (load_func $func)->(@args);
443 } elsif (s/::$//) {
444 eval "require $_";
445 die $@ if $@;
446 } else {
447 (load_func $_)->();
448 }
449 }
450 }
451
452 #############################################################################
453 # node monitoring and info
454
455 =item node_is_known $nodeid
456
457 Returns true iff the given node is currently known to the system. The only
458 time a node is known but not up currently is when a conenction request is
459 pending.
460
461 =cut
462
463 sub node_is_known($) {
464 exists $NODE{$_[0]}
465 }
466
467 =item node_is_up $nodeid
468
469 Returns true if the given node is "up", that is, the kernel thinks it has
470 a working connection to it.
471
472 If the node is known but not currently connected, returns C<0>. If the
473 node is not known, returns C<undef>.
474
475 =cut
476
477 sub node_is_up($) {
478 ($NODE{$_[0]} or return)->{transport}
479 ? 1 : 0
480 }
481
482 =item known_nodes
483
484 Returns the node IDs of all nodes currently known to this node, including
485 itself and nodes not currently connected.
486
487 =cut
488
489 sub known_nodes() {
490 map $_->{id}, values %NODE
491 }
492
493 =item up_nodes
494
495 Return the node IDs of all nodes that are currently connected (excluding
496 the node itself).
497
498 =cut
499
500 sub up_nodes() {
501 map $_->{id}, grep $_->{transport}, values %NODE
502 }
503
504 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
505
506 Registers a callback that is called each time a node goes up (a connection
507 is established) or down (the connection is lost).
508
509 Node up messages can only be followed by node down messages for the same
510 node, and vice versa.
511
512 Note that monitoring a node is usually better done by monitoring it's node
513 port. This function is mainly of interest to modules that are concerned
514 about the network topology and low-level connection handling.
515
516 Callbacks I<must not> block and I<should not> send any messages.
517
518 The function returns an optional guard which can be used to unregister
519 the monitoring callback again.
520
521 Example: make sure you call function C<newnode> for all nodes that are up
522 or go up (and down).
523
524 newnode $_, 1 for up_nodes;
525 mon_nodes \&newnode;
526
527 =cut
528
529 our %MON_NODES;
530
531 sub mon_nodes($) {
532 my ($cb) = @_;
533
534 $MON_NODES{$cb+0} = $cb;
535
536 defined wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
537 }
538
539 sub _inject_nodeevent($$;@) {
540 my ($node, $up, @reason) = @_;
541
542 for my $cb (values %MON_NODES) {
543 eval { $cb->($node->{id}, $up, @reason); 1 }
544 or $WARN->(1, $@);
545 }
546
547 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
548 }
549
550 #############################################################################
551 # self node code
552
553 our %node_req = (
554 # internal services
555
556 # monitoring
557 mon0 => sub { # stop monitoring a port
558 my $portid = shift;
559 my $node = $SRCNODE;
560 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
561 },
562 mon1 => sub { # start monitoring a port
563 my $portid = shift;
564 my $node = $SRCNODE;
565 Scalar::Util::weaken $node;
566 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
567 delete $node->{rmon}{$portid};
568 $node->send (["", kil => $portid, @_])
569 if $node && $node->{transport};
570 });
571 },
572 kil => sub {
573 my $cbs = delete $SRCNODE->{lmon}{+shift}
574 or return;
575
576 $_->(@_) for @$cbs;
577 },
578
579 # "public" services - not actually public
580
581 # relay message to another node / generic echo
582 snd => \&snd,
583 snd_multiple => sub {
584 snd @$_ for @_
585 },
586
587 # informational
588 info => sub {
589 snd @_, $NODE;
590 },
591 known_nodes => sub {
592 snd @_, known_nodes;
593 },
594 up_nodes => sub {
595 snd @_, up_nodes;
596 },
597
598 # random utilities
599 eval => sub {
600 my @res = do { package main; eval shift };
601 snd @_, "$@", @res if @_;
602 },
603 time => sub {
604 snd @_, AE::time;
605 },
606 devnull => sub {
607 #
608 },
609 "" => sub {
610 # empty messages are keepalives or similar devnull-applications
611 },
612 );
613
614 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
615 $PORT{""} = sub {
616 my $tag = shift;
617 eval { &{ $node_req{$tag} ||= load_func $tag } };
618 $WARN->(2, "error processing node message: $@") if $@;
619 };
620
621 =back
622
623 =head1 SEE ALSO
624
625 L<AnyEvent::MP>.
626
627 =head1 AUTHOR
628
629 Marc Lehmann <schmorp@schmorp.de>
630 http://home.schmorp.de/
631
632 =cut
633
634 1
635