ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.52
Committed: Fri Sep 11 02:34:25 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.51: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.1';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 up_nodes mon_nodes node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages -queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
71 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 our @WARN;
73 our $WARN = sub {
74 &$_ for @WARN;
75
76 return if $WARNLEVEL < $_[0];
77
78 my ($level, $msg) = @_;
79
80 $msg =~ s/\n$//;
81
82 printf STDERR "%s <%d> %s\n",
83 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84 $level,
85 $msg;
86 };
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
95 sub load_func($) {
96 my $func = $_[0];
97
98 unless (defined &$func) {
99 my $pkg = $func;
100 do {
101 $pkg =~ s/::[^:]+$//
102 or return sub { die "unable to resolve '$func'" };
103 eval "require $pkg";
104 } until defined &$func;
105 }
106
107 \&$func
108 }
109
110 sub nonce($) {
111 my $nonce;
112
113 if (open my $fh, "</dev/urandom") {
114 sysread $fh, $nonce, $_[0];
115 } else {
116 # shit...
117 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
118 }
119
120 $nonce
121 }
122
123 sub alnumbits($) {
124 my $data = $_[0];
125
126 if (eval "use Math::GMP 2.05; 1") {
127 $data = Math::GMP::get_str_gmp (
128 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
129 62
130 );
131 } else {
132 $data = MIME::Base64::encode_base64 $data, "";
133 $data =~ s/=//;
134 $data =~ s/x/x0/g;
135 $data =~ s/\//x1/g;
136 $data =~ s/\+/x2/g;
137 }
138
139 $data
140 }
141
142 sub gen_uniq {
143 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
144 }
145
146 our $CONFIG; # this node's configuration
147
148 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
149 our $UNIQ = gen_uniq; # per-process/node unique cookie
150 our $NODE = "anon/$RUNIQ";
151 our $ID = "a";
152
153 our %NODE; # node id to transport mapping, or "undef", for local node
154 our (%PORT, %PORT_DATA); # local ports
155
156 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
157 our %LMON; # monitored _local_ ports
158
159 our %LISTENER;
160 our $LISTENER; # our listeners, as arrayref
161
162 our $SRCNODE; # holds the sending node during _inject
163
164 sub NODE() {
165 $NODE
166 }
167
168 sub node_of($) {
169 my ($node, undef) = split /#/, $_[0], 2;
170
171 $node
172 }
173
174 BEGIN {
175 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
176 ? sub () { 1 }
177 : sub () { 0 };
178 }
179
180 our $DELAY_TIMER;
181 our @DELAY_QUEUE;
182
183 sub _delay_run {
184 (shift @DELAY_QUEUE or return)->() while 1;
185 }
186
187 sub delay($) {
188 push @DELAY_QUEUE, shift;
189 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
190 }
191
192 sub _inject {
193 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
194 &{ $PORT{+shift} or return };
195 }
196
197 # this function adds a node-ref, so you can send stuff to it
198 # it is basically the central routing component.
199 sub add_node {
200 my ($node) = @_;
201
202 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
203 }
204
205 sub snd(@) {
206 my ($nodeid, $portid) = split /#/, shift, 2;
207
208 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
209
210 defined $nodeid #d#UGLY
211 or Carp::croak "'undef' is not a valid node ID/port ID";
212
213 ($NODE{$nodeid} || add_node $nodeid)
214 ->{send} (["$portid", @_]);
215 }
216
217 =item $is_local = port_is_local $port
218
219 Returns true iff the port is a local port.
220
221 =cut
222
223 sub port_is_local($) {
224 my ($nodeid, undef) = split /#/, $_[0], 2;
225
226 $NODE{$nodeid} == $NODE{""}
227 }
228
229 =item snd_to_func $node, $func, @args
230
231 Expects a node ID and a name of a function. Asynchronously tries to call
232 this function with the given arguments on that node.
233
234 This function can be used to implement C<spawn>-like interfaces.
235
236 =cut
237
238 sub snd_to_func($$;@) {
239 my $nodeid = shift;
240
241 # on $NODE, we artificially delay... (for spawn)
242 # this is very ugly - maybe we should simply delay ALL messages,
243 # to avoid deep recursion issues. but that's so... slow...
244 $AnyEvent::MP::Node::Self::DELAY = 1
245 if $nodeid ne $NODE;
246
247 defined $nodeid #d#UGLY
248 or Carp::croak "'undef' is not a valid node ID/port ID";
249
250 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
251 }
252
253 =item snd_on $node, @msg
254
255 Executes C<snd> with the given C<@msg> (which must include the destination
256 port) on the given node.
257
258 =cut
259
260 sub snd_on($@) {
261 my $node = shift;
262 snd $node, snd => @_;
263 }
264
265 =item eval_on $node, $string[, @reply]
266
267 Evaluates the given string as Perl expression on the given node. When
268 @reply is specified, then it is used to construct a reply message with
269 C<"$@"> and any results from the eval appended.
270
271 =cut
272
273 sub eval_on($$;@) {
274 my $node = shift;
275 snd $node, eval => @_;
276 }
277
278 sub kil(@) {
279 my ($nodeid, $portid) = split /#/, shift, 2;
280
281 length $portid
282 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
283
284 ($NODE{$nodeid} || add_node $nodeid)
285 ->kill ("$portid", @_);
286 }
287
288 sub _nodename {
289 require POSIX;
290 (POSIX::uname ())[1]
291 }
292
293 sub _resolve($) {
294 my ($nodeid) = @_;
295
296 my $cv = AE::cv;
297 my @res;
298
299 $cv->begin (sub {
300 my %seen;
301 my @refs;
302 for (sort { $a->[0] <=> $b->[0] } @res) {
303 push @refs, $_->[1] unless $seen{$_->[1]}++
304 }
305 shift->send (@refs);
306 });
307
308 my $idx;
309 for my $t (split /,/, $nodeid) {
310 my $pri = ++$idx;
311
312 $t = length $t ? _nodename . ":$t" : _nodename
313 if $t =~ /^\d*$/;
314
315 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
316 or Carp::croak "$t: unparsable transport descriptor";
317
318 $port = "0" if $port eq "*";
319
320 if ($host eq "*") {
321 $cv->begin;
322 # use fork_call, as Net::Interface is big, and we need it rarely.
323 require AnyEvent::Util;
324 AnyEvent::Util::fork_call (
325 sub {
326 my @addr;
327
328 require Net::Interface;
329
330 for my $if (Net::Interface->interfaces) {
331 # we statically lower-prioritise ipv6 here, TODO :()
332 for $_ ($if->address (Net::Interface::AF_INET ())) {
333 next if /^\x7f/; # skip localhost etc.
334 push @addr, $_;
335 }
336 for ($if->address (Net::Interface::AF_INET6 ())) {
337 #next if $if->scope ($_) <= 2;
338 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
339 push @addr, $_;
340 }
341
342 }
343 @addr
344 }, sub {
345 for my $ip (@_) {
346 push @res, [
347 $pri += 1e-5,
348 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
349 ];
350 }
351 $cv->end;
352 }
353 );
354 } else {
355 $cv->begin;
356 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
357 for (@_) {
358 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
359 push @res, [
360 $pri += 1e-5,
361 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
362 ];
363 }
364 $cv->end;
365 };
366 }
367 }
368
369 $cv->end;
370
371 $cv
372 }
373
374 sub configure(@) {
375 unshift @_, "profile" if @_ & 1;
376 my (%kv) = @_;
377
378 my $profile = delete $kv{profile};
379
380 $profile = _nodename
381 unless defined $profile;
382
383 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
384
385 delete $NODE{$NODE}; # we do not support doing stuff before configure
386
387 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
388
389 $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";
390
391 $NODE = $node
392 unless $node eq "anon/";
393
394 $NODE{$NODE} = $NODE{""};
395 $NODE{$NODE}{id} = $NODE;
396
397 my $seeds = $CONFIG->{seeds};
398 my $binds = $CONFIG->{binds};
399
400 $binds ||= ["*"];
401
402 $WARN->(8, "node $NODE starting up.");
403
404 $LISTENER = [];
405 %LISTENER = ();
406
407 for (map _resolve $_, @$binds) {
408 for my $bind ($_->recv) {
409 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
410 or Carp::croak "$bind: unparsable local bind address";
411
412 my $listener = AnyEvent::MP::Transport::mp_server
413 $host,
414 $port,
415 prepare => sub {
416 my (undef, $host, $port) = @_;
417 $bind = AnyEvent::Socket::format_hostport $host, $port;
418 },
419 ;
420 $LISTENER{$bind} = $listener;
421 push @$LISTENER, $bind;
422 }
423 }
424
425 $WARN->(8, "node listens on [@$LISTENER].");
426
427 # the global service is mandatory currently
428 require AnyEvent::MP::Global;
429
430 # connect to all seednodes
431 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
432
433 for (@{ $CONFIG->{services} }) {
434 if (ref) {
435 my ($func, @args) = @$_;
436 (load_func $func)->(@args);
437 } elsif (s/::$//) {
438 eval "require $_";
439 die $@ if $@;
440 } else {
441 (load_func $_)->();
442 }
443 }
444 }
445
446 #############################################################################
447 # node monitoring and info
448
449 =item node_is_known $nodeid
450
451 Returns true iff the given node is currently known to the system. The only
452 time a node is known but not up currently is when a conenction request is
453 pending.
454
455 =cut
456
457 sub node_is_known($) {
458 exists $NODE{$_[0]}
459 }
460
461 =item node_is_up $nodeid
462
463 Returns true if the given node is "up", that is, the kernel thinks it has
464 a working connection to it.
465
466 If the node is known but not currently connected, returns C<0>. If the
467 node is not known, returns C<undef>.
468
469 =cut
470
471 sub node_is_up($) {
472 ($NODE{$_[0]} or return)->{transport}
473 ? 1 : 0
474 }
475
476 =item known_nodes
477
478 Returns the node IDs of all nodes currently known to this node, including
479 itself and nodes not currently connected.
480
481 =cut
482
483 sub known_nodes() {
484 map $_->{id}, values %NODE
485 }
486
487 =item up_nodes
488
489 Return the node IDs of all nodes that are currently connected (excluding
490 the node itself).
491
492 =cut
493
494 sub up_nodes() {
495 map $_->{id}, grep $_->{transport}, values %NODE
496 }
497
498 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
499
500 Registers a callback that is called each time a node goes up (a connection
501 is established) or down (the connection is lost).
502
503 Node up messages can only be followed by node down messages for the same
504 node, and vice versa.
505
506 Note that monitoring a node is usually better done by monitoring it's node
507 port. This function is mainly of interest to modules that are concerned
508 about the network topology and low-level connection handling.
509
510 Callbacks I<must not> block and I<should not> send any messages.
511
512 The function returns an optional guard which can be used to unregister
513 the monitoring callback again.
514
515 Example: make sure you call function C<newnode> for all nodes that are up
516 or go up (and down).
517
518 newnode $_, 1 for up_nodes;
519 mon_nodes \&newnode;
520
521 =cut
522
523 our %MON_NODES;
524
525 sub mon_nodes($) {
526 my ($cb) = @_;
527
528 $MON_NODES{$cb+0} = $cb;
529
530 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
531 }
532
533 sub _inject_nodeevent($$;@) {
534 my ($node, $up, @reason) = @_;
535
536 for my $cb (values %MON_NODES) {
537 eval { $cb->($node->{id}, $up, @reason); 1 }
538 or $WARN->(1, $@);
539 }
540
541 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
542 }
543
544 #############################################################################
545 # self node code
546
547 our %node_req = (
548 # internal services
549
550 # monitoring
551 mon0 => sub { # stop monitoring a port
552 my $portid = shift;
553 my $node = $SRCNODE;
554 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
555 },
556 mon1 => sub { # start monitoring a port
557 my $portid = shift;
558 my $node = $SRCNODE;
559 Scalar::Util::weaken $node; #TODO# ugly
560 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
561 $node->send (["", kil => $portid, @_])
562 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
563 });
564 },
565 kil => sub {
566 my $cbs = delete $SRCNODE->{lmon}{+shift}
567 or return;
568
569 $_->(@_) for @$cbs;
570 },
571
572 # "public" services - not actually public
573
574 # relay message to another node / generic echo
575 snd => \&snd,
576 snd_multiple => sub {
577 snd @$_ for @_
578 },
579
580 # informational
581 info => sub {
582 snd @_, $NODE;
583 },
584 known_nodes => sub {
585 snd @_, known_nodes;
586 },
587 up_nodes => sub {
588 snd @_, up_nodes;
589 },
590
591 # random utilities
592 eval => sub {
593 my @res = do { package main; eval shift };
594 snd @_, "$@", @res if @_;
595 },
596 time => sub {
597 snd @_, AE::time;
598 },
599 devnull => sub {
600 #
601 },
602 "" => sub {
603 # empty messages are keepalives or similar devnull-applications
604 },
605 );
606
607 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
608 $PORT{""} = sub {
609 my $tag = shift;
610 eval { &{ $node_req{$tag} ||= load_func $tag } };
611 $WARN->(2, "error processing node message: $@") if $@;
612 };
613
614 =back
615
616 =head1 SEE ALSO
617
618 L<AnyEvent::MP>.
619
620 =head1 AUTHOR
621
622 Marc Lehmann <schmorp@schmorp.de>
623 http://home.schmorp.de/
624
625 =cut
626
627 1
628