ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.58
Committed: Mon Oct 5 19:39:43 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-1_21
Changes since 1.57: +1 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 add_node load_func snd_to_func snd_on eval_on
41
42 NODE $NODE node_of snd kil port_is_local
43 configure
44 up_nodes mon_nodes node_is_up
45 );
46
47 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
48
49 This value is called with an error or warning message, when e.g. a
50 connection could not be created, authorisation failed and so on.
51
52 It I<must not> block or send messages - queue it and use an idle watcher if
53 you need to do any of these things.
54
55 C<$level> should be C<0> for messages to be logged always, C<1> for
56 unexpected messages and errors, C<2> for warnings, C<7> for messages about
57 node connectivity and services, C<8> for debugging messages and C<9> for
58 tracing messages.
59
60 The default simply logs the message to STDERR.
61
62 =item @AnyEvent::MP::Kernel::WARN
63
64 All code references in this array are called for every log message, from
65 the default C<$WARN> handler. This is an easy way to tie into the log
66 messages without disturbing others.
67
68 =cut
69
# Threshold used by the default handler: messages whose level is above this
# value are not printed. Taken from PERL_ANYEVENT_MP_WARNLEVEL whenever that
# variable exists in the environment, otherwise 5.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Additional log hooks, see the documentation above.
our @WARN;

# Default log handler, called as $WARN->($level, $msg).
our $WARN = sub {
   # the hooks see every message, regardless of $WARNLEVEL, and are
   # called with the very same argument list
   for my $hook (@WARN) {
      &$hook;
   }

   return if $_[0] > $WARNLEVEL;

   my ($level, $msg) = @_;

   # at most one trailing newline is removed, printf adds its own
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time;

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
# Resolve a fully qualified function name ("Some::Pkg::func") to a code
# reference, loading modules on demand.
#
# When the function is not defined yet, successively shorter package
# prefixes ("Some::Pkg", then "Some") are require'd until it shows up. When
# no prefix is left, a code reference that dies with "unable to resolve
# '<name>'" is returned, so the failure surfaces when it is called.
#
# Function names can originate from node messages (the "" port handler
# resolves unknown tags through this function), so a prefix is only handed
# to string-eval when it is a syntactically valid package name - anything
# else would be executed as arbitrary Perl code.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # errors from require are deliberately ignored: a shorter
         # prefix might still provide the function
         eval "require $pkg"
            if $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/;
      } until defined &$func;
   }

   \&$func
}
108
# Return a string of exactly $_[0] random octets.
#
# The octets are read from /dev/urandom when possible. sysread may return
# fewer octets than requested (or fail), so reading is repeated until the
# requested length is reached; whatever is still missing afterwards, or
# everything when the device cannot be opened, is filled in using rand.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      while ($len > length ($nonce)) {
         # append at the current end of the buffer
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last; # read error or eof
      }
   }

   # fallback: much weaker, but better than a short nonce
   $nonce .= chr int rand 256
      while $len > length ($nonce);

   $nonce
}
121
# Encode an octet string using only the characters [0-9A-Za-z].
#
# With Math::GMP >= 2.05 available, the octets are interpreted as one big
# hexadecimal number and printed in base 62. Otherwise the data is base64
# encoded, the padding is removed and the remaining non-alphanumeric
# characters are escaped with "x" ("x" => "x0", "/" => "x1", "+" => "x2").
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 $data, "";
      # base64 padding is either "=" or "==" - remove all of it, otherwise
      # a "=" survives for inputs whose length is 1 modulo 3
      $data =~ s/=+$//;
      # "x" is the escape character, so it has to be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
140
# Build an alphanumeric cookie from the low 16 bits of the process ID, the
# low 16 bits of the current time and two random octets.
sub gen_uniq {
   my $pid16  = $$ & 0xffff;
   my $time16 = time & 0xffff;

   alnumbits (pack "nna*", $pid16, $time16, nonce (2))
}
144
our $CONFIG; # this node's configuration, assigned in configure

our $RUNIQ = alnumbits (nonce (96 / 8)); # remote uniq value, from 96 random bits
our $UNIQ  = gen_uniq ();                # per-process/node unique cookie
our $NODE  = "anon/" . $RUNIQ;           # initial, anonymous, node ID
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports: port id to callback
our %PORT_DATA; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER; # bind address to listener object, filled in by configure
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
162
# Accessor returning the current node ID ($NODE).
sub NODE() {
   return $NODE;
}
166
# Extract the node ID from a port ID, i.e. everything before the first "#".
# A string without "#" is returned unchanged.
sub node_of($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   $nodeid
}
172
# TRACE is a compile-time constant (1 or 0), selected by the environment
# variable PERL_ANYEVENT_MP_TRACE.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
178
our $DELAY_TIMER; # timer watcher, defined while a queue run is pending
our @DELAY_QUEUE; # code references waiting to be run

# Run all queued callbacks in order, including those queued while running,
# then release the timer.
sub _delay_run {
   while (my $job = shift @DELAY_QUEUE) {
      $job->();
   }

   return undef $DELAY_TIMER;
}
185
# Queue a code reference for _delay_run, starting the zero-delay timer
# unless one is already pending.
sub delay($) {
   my ($job) = @_;

   push @DELAY_QUEUE, $job;

   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}
190
# Deliver a message to a local port: the first argument is the port ID, the
# remaining arguments are handed to the callback registered in %PORT.
# Messages for ports without a callback are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $portid  = shift;
   my $handler = $PORT{$portid}
      or return;

   &$handler; # called with the remaining @_
}
195
# the central routing component: makes sure a node object exists for the
# given node ID, so messages can be sent to it, and returns that object.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
203
# Send a message: the first argument is the destination port ID
# ("nodeid#portid"), the remaining arguments form the message. The node
# object is created on demand.
sub snd(@) {
   my $dest = shift;
   my ($nodeid, $portid) = split /#/, $dest, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # {send} is a code reference stored in the node object
   $node->{send} (["$portid", @_]);
}
215
216 =item $is_local = port_is_local $port
217
218 Returns true iff the port is a local port.
219
220 =cut
221
sub port_is_local($) {
   my $nodeid = (split /#/, $_[0], 2)[0];

   # local means: the very same node object as the one registered under ""
   $NODE{$nodeid} == $NODE{""}
}
227
228 =item snd_to_func $node, $func, @args
229
230 Expects a node ID and a name of a function. Asynchronously tries to call
231 this function with the given arguments on that node.
232
233 This function can be used to implement C<spawn>-like interfaces.
234
235 =cut
236
sub snd_to_func($$;@) {
   my ($nodeid, @msg) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the flag is set when the target is NOT $NODE - confirm
   # against AnyEvent::MP::Node::Self that this is the intended condition.
   unless ($nodeid eq $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak "'undef' is not a valid node ID/port ID"
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the node port is "", the message is the function name plus arguments
   $node->send (["", @msg]);
}
251
252 =item snd_on $node, @msg
253
254 Executes C<snd> with the given C<@msg> (which must include the destination
255 port) on the given node.
256
257 =cut
258
sub snd_on($@) {
   my ($node, @msg) = @_;

   # ask the "snd" service of the given node to do the actual send
   snd ($node, "snd", @msg);
}
263
264 =item eval_on $node, $string[, @reply]
265
266 Evaluates the given string as Perl expression on the given node. When
267 @reply is specified, then it is used to construct a reply message with
268 C<"$@"> and any results from the eval appended.
269
270 =cut
271
sub eval_on($$;@) {
   my ($node, @args) = @_;

   # @args is the string to evaluate, optionally followed by the reply
   snd ($node, "eval", @args);
}
276
# Kill the given port ("nodeid#portid"), passing any further arguments on
# as the reason. Croaks when the port part is missing or empty.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
286
# The nodename field of uname(2), i.e. the second value POSIX::uname returns.
sub _nodename {
   require POSIX;

   my @uts = POSIX::uname ();

   $uts[1]
}
291
# Resolve a comma-separated list of transport descriptors into addresses.
#
# Returns a condvar. Once all lookups have finished it is sent the list of
# resolved "host:port" strings, without duplicates, ordered by the position
# of the descriptor they came from and, within one descriptor, by the order
# in which the addresses were reported.
#
# Descriptor forms handled here: an empty string or a plain port number
# stand for the local nodename, a host of "*" stands for the addresses of
# all interfaces (gathered via Net::Interface in a forked child), and a
# port of "*" is turned into port 0. Croaks on descriptors that
# AnyEvent::Socket::parse_hostport rejects.
sub _resolve($) {
   my ($spec) = @_;

   my $done = AE::cv;
   my @found; # [priority, "host:port"] pairs

   # outermost begin: its callback runs after the final ->end below and
   # after every lookup has called ->end as well
   $done->begin (sub {
      my ($cv) = @_;
      my (%seen, @refs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         my $ref = $entry->[1];
         push @refs, $ref unless $seen{$ref}++;
      }

      $cv->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $spec) {
      # integer part: descriptor position, fraction: order within it
      my $pri = ++$idx;

      if ($t =~ /^\d*$/) {
         $t = length $t ? _nodename () . ":$t" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, 0)
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      $done->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               # runs in the child process
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # skip localhost etc.
                  push @addr, grep { !/^\x7f/ } $if->address (Net::Interface::AF_INET ());

                  # global unicast, site-local unicast
                  # (disabled alternative: skip if $if->scope ($_) <= 2)
                  push @addr, grep { /^[\x20-\x3f\xfc\xfd]/ } $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               # runs in the parent, with the addresses from the child
               for my $ip (@_) {
                  $pri += 1e-5;
                  push @found, [
                     $pri,
                     AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($ip), $port)
                  ];
               }

               $done->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $res (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($res->[3]);

               $pri += 1e-5;
               push @found, [
                  $pri,
                  AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
               ];
            }

            $done->end;
         });
      }
   }

   $done->end;

   $done
}
372
# Configure this node and start it up.
#
# Takes key => value pairs; with an odd number of arguments the first one
# is taken to be the profile name. Without a profile, the local nodename is
# used. The steps are:
#
#  - look up the profile via AnyEvent::MP::Config::find_profile
#  - choose the node ID (the "nodeid" setting, else the profile name) and
#    register the self node under it; "anon/" keeps the current $NODE
#  - create a listener for every address the "binds" setting resolves to
#    (default: "*")
#  - load AnyEvent::MP::Global and hand it the resolved "seeds"
#  - start the configured "services"
#
# Croaks on an empty node ID and on unparsable bind addresses, dies when a
# service module cannot be loaded. Note that this function blocks in ->recv
# while addresses are being resolved.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   delete $NODE{$NODE}; # we do not support doing stuff before configure

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node
      unless $node eq "anon/";

   # the self node is reachable under "" and under its node ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for them one by one
   for my $resolver (map { _resolve $_ } @$binds) {
      for my $bind ($resolver->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # record the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map { $_->recv } map { _resolve $_ } @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func $func)->(@args);
      } elsif ((my $module = $service) =~ s/::$//) {
         # "Module::" - only load the module. the substitution works on
         # a copy, as the loop variable aliases the configuration data
         # and the trailing "::" must survive for later calls
         eval "require $module";
         die $@ if $@;
      } else {
         # name of a function that is called without arguments
         (load_func $service)->();
      }
   }
}
445
446 #############################################################################
447 # node monitoring and info
448
449 =item node_is_known $nodeid
450
451 Returns true iff the given node is currently known to the system. The only
452 time a node is known but not up currently is when a connection request is
453 pending.
454
455 =cut
456
sub node_is_known($) {
   my ($nodeid) = @_;

   # known means: there is an entry in %NODE, connected or not
   return exists $NODE{$nodeid};
}
460
461 =item node_is_up $nodeid
462
463 Returns true if the given node is "up", that is, the kernel thinks it has
464 a working connection to it.
465
466 If the node is known but not currently connected, returns C<0>. If the
467 node is not known, returns C<undef>.
468
469 =cut
470
sub node_is_up($) {
   # unknown node: return nothing (undef in scalar context)
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
475
476 =item known_nodes
477
478 Returns the node IDs of all nodes currently known to this node, including
479 itself and nodes not currently connected.
480
481 =cut
482
sub known_nodes() {
   # the {id} member of every node object in %NODE
   map { $_->{id} } values %NODE
}
486
487 =item up_nodes
488
489 Return the node IDs of all nodes that are currently connected (excluding
490 the node itself).
491
492 =cut
493
sub up_nodes() {
   # only node objects that currently have a transport count as up
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}
497
498 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
499
500 Registers a callback that is called each time a node goes up (a connection
501 is established) or down (the connection is lost).
502
503 Node up messages can only be followed by node down messages for the same
504 node, and vice versa.
505
506 Note that monitoring a node is usually better done by monitoring its node
507 port. This function is mainly of interest to modules that are concerned
508 about the network topology and low-level connection handling.
509
510 Callbacks I<must not> block and I<should not> send any messages.
511
512 The function returns an optional guard which can be used to unregister
513 the monitoring callback again.
514
515 Example: make sure you call function C<newnode> for all nodes that are up
516 or go up (and down).
517
518 newnode $_, 1 for up_nodes;
519 mon_nodes \&newnode;
520
521 =cut
522
# registered node monitoring callbacks, keyed by the address of the callback
our %MON_NODES;

# Register $cb to be called on node up/down events, see the documentation
# above. Unless called in void context, a guard is returned that removes
# the callback again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # "defined wantarray", not "wantarray": the latter is false in scalar
   # context, so "$guard = mon_nodes ..." would never receive a guard
   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
532
# Report that $node went up (true $up) or down to all mon_nodes callbacks.
# A dying callback is logged at level 1 and does not affect the others;
# the event itself is logged at level 7 afterwards.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $watcher (values %MON_NODES) {
      next if eval { $watcher->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
543
544 #############################################################################
545 # self node code
546
# Requests understood by the node port (""), keyed by the message tag. Each
# handler receives the rest of the message as arguments; $SRCNODE is the
# node object of the sender.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      $NODE{""}->unmonitor ($portid, delete $SRCNODE->{rmon}{$portid});
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      my $peer = $SRCNODE;
      Scalar::Util::weaken ($peer); #TODO# ugly

      my $on_kill = sub {
         delete $peer->{rmon}{$portid};

         #TODO# ugly, should use snd and remove-on-disconnect
         if ($peer && $peer->{transport}) {
            $peer->send (["", kil => $portid, @_]);
         }
      };

      $NODE{""}->monitor ($portid, $peer->{rmon}{$portid} = $on_kill);
   },
   kil => sub {
      # a monitored remote port died: run and forget its local callbacks
      my $portid = shift;
      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      # every argument is an array reference holding one complete message
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational - the arguments are the reply, the result is appended
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # NOTE(review): evaluates a string taken from the message, in
      # package main - only acceptable because peers are trusted.
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      # ignores everything
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
607
# the self node is registered under "" as well as under its node ID
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# the node port: the first message element selects the request handler,
# unknown tags are resolved as function names and cached in %node_req.
# errors are logged at level 2 instead of being propagated.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = ($node_req{$tag} ||= load_func ($tag));
      &$handler; # called with the remaining @_
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
614
615 =back
616
617 =head1 SEE ALSO
618
619 L<AnyEvent::MP>.
620
621 =head1 AUTHOR
622
623 Marc Lehmann <schmorp@schmorp.de>
624 http://home.schmorp.de/
625
626 =cut
627
628 1
629