ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.50
Committed: Wed Sep 9 18:54:28 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.49: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.0';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 up_nodes mon_nodes node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages -queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
71 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
72 our @WARN;
73 our $WARN = sub {
74 &$_ for @WARN;
75
76 return if $WARNLEVEL < $_[0];
77
78 my ($level, $msg) = @_;
79
80 $msg =~ s/\n$//;
81
82 printf STDERR "%s <%d> %s\n",
83 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
84 $level,
85 $msg;
86 };
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
95 sub load_func($) {
96 my $func = $_[0];
97
98 unless (defined &$func) {
99 my $pkg = $func;
100 do {
101 $pkg =~ s/::[^:]+$//
102 or return sub { die "unable to resolve '$func'" };
103 eval "require $pkg";
104 } until defined &$func;
105 }
106
107 \&$func
108 }
109
110 sub nonce($) {
111 my $nonce;
112
113 if (open my $fh, "</dev/urandom") {
114 sysread $fh, $nonce, $_[0];
115 } else {
116 # shit...
117 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
118 }
119
120 $nonce
121 }
122
123 sub alnumbits($) {
124 my $data = $_[0];
125
126 if (eval "use Math::GMP 2.05; 1") {
127 $data = Math::GMP::get_str_gmp (
128 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
129 62
130 );
131 } else {
132 $data = MIME::Base64::encode_base64 $data, "";
133 $data =~ s/=//;
134 $data =~ s/x/x0/g;
135 $data =~ s/\//x1/g;
136 $data =~ s/\+/x2/g;
137 }
138
139 $data
140 }
141
142 sub gen_uniq {
143 alnumbits pack "nna*", $$ & 0xffff, time & 0xffff, nonce 2
144 }
145
146 our $CONFIG; # this node's configuration
147
148 our $RUNIQ = alnumbits nonce 96/8; # remote uniq value
149 our $UNIQ = gen_uniq; # per-process/node unique cookie
150 our $NODE = "anon/$RUNIQ";
151 our $ID = "a";
152
153 our %NODE; # node id to transport mapping, or "undef", for local node
154 our (%PORT, %PORT_DATA); # local ports
155
156 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
157 our %LMON; # monitored _local_ ports
158
159 our %LISTENER;
160 our $LISTENER; # our listeners, as arrayref
161
162 our $SRCNODE; # holds the sending node during _inject
163
164 sub NODE() {
165 $NODE
166 }
167
168 sub node_of($) {
169 my ($node, undef) = split /#/, $_[0], 2;
170
171 $node
172 }
173
174 BEGIN {
175 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
176 ? sub () { 1 }
177 : sub () { 0 };
178 }
179
180 our $DELAY_TIMER;
181 our @DELAY_QUEUE;
182
183 sub _delay_run {
184 (shift @DELAY_QUEUE or return)->() while 1;
185 }
186
187 sub delay($) {
188 push @DELAY_QUEUE, shift;
189 $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
190 }
191
192 sub _inject {
193 warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
194 &{ $PORT{+shift} or return };
195 }
196
197 # this function adds a node-ref, so you can send stuff to it
198 # it is basically the central routing component.
199 sub add_node {
200 my ($node) = @_;
201
202 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
203 }
204
205 sub snd(@) {
206 my ($nodeid, $portid) = split /#/, shift, 2;
207
208 warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
209
210 defined $nodeid #d#UGLY
211 or Carp::croak "'undef' is not a valid node ID/port ID";
212
213 ($NODE{$nodeid} || add_node $nodeid)
214 ->{send} (["$portid", @_]);
215 }
216
217 =item $is_local = port_is_local $port
218
219 Returns true iff the port is a local port.
220
221 =cut
222
223 sub port_is_local($) {
224 my ($nodeid, undef) = split /#/, $_[0], 2;
225
226 $NODE{$nodeid} == $NODE{""}
227 }
228
229 =item snd_to_func $node, $func, @args
230
231 Expects a node ID and a name of a function. Asynchronously tries to call
232 this function with the given arguments on that node.
233
234 This function can be used to implement C<spawn>-like interfaces.
235
236 =cut
237
238 sub snd_to_func($$;@) {
239 my $nodeid = shift;
240
241 # on $NODE, we artificially delay... (for spawn)
242 # this is very ugly - maybe we should simply delay ALL messages,
243 # to avoid deep recursion issues. but that's so... slow...
244 $AnyEvent::MP::Node::Self::DELAY = 1
245 if $nodeid ne $NODE;
246
247 defined $nodeid #d#UGLY
248 or Carp::croak "'undef' is not a valid node ID/port ID";
249
250 ($NODE{$nodeid} || add_node $nodeid)->send (["", @_]);
251 }
252
253 =item snd_on $node, @msg
254
255 Executes C<snd> with the given C<@msg> (which must include the destination
256 port) on the given node.
257
258 =cut
259
260 sub snd_on($@) {
261 my $node = shift;
262 snd $node, snd => @_;
263 }
264
265 =item eval_on $node, $string[, @reply]
266
267 Evaluates the given string as Perl expression on the given node. When
268 @reply is specified, then it is used to construct a reply message with
269 C<"$@"> and any results from the eval appended.
270
271 =cut
272
273 sub eval_on($$;@) {
274 my $node = shift;
275 snd $node, eval => @_;
276 }
277
278 sub kil(@) {
279 my ($nodeid, $portid) = split /#/, shift, 2;
280
281 length $portid
282 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
283
284 ($NODE{$nodeid} || add_node $nodeid)
285 ->kill ("$portid", @_);
286 }
287
288 sub _nodename {
289 require POSIX;
290 (POSIX::uname ())[1]
291 }
292
293 sub _resolve($) {
294 my ($nodeid) = @_;
295
296 my $cv = AE::cv;
297 my @res;
298
299 $cv->begin (sub {
300 my %seen;
301 my @refs;
302 for (sort { $a->[0] <=> $b->[0] } @res) {
303 push @refs, $_->[1] unless $seen{$_->[1]}++
304 }
305 shift->send (@refs);
306 });
307
308 my $idx;
309 for my $t (split /,/, $nodeid) {
310 my $pri = ++$idx;
311
312 $t = length $t ? _nodename . ":$t" : _nodename
313 if $t =~ /^\d*$/;
314
315 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
316 or Carp::croak "$t: unparsable transport descriptor";
317
318 $port = "0" if $port eq "*";
319
320 if ($host eq "*") {
321 $cv->begin;
322 # use fork_call, as Net::Interface is big, and we need it rarely.
323 require AnyEvent::Util;
324 AnyEvent::Util::fork_call (
325 sub {
326 my @addr;
327
328 require Net::Interface;
329
330 for my $if (Net::Interface->interfaces) {
331 # we statically lower-prioritise ipv6 here, TODO :()
332 for $_ ($if->address (Net::Interface::AF_INET ())) {
333 next if /^\x7f/; # skip localhost etc.
334 push @addr, $_;
335 }
336 for ($if->address (Net::Interface::AF_INET6 ())) {
337 #next if $if->scope ($_) <= 2;
338 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
339 push @addr, $_;
340 }
341
342 }
343 @addr
344 }, sub {
345 for my $ip (@_) {
346 push @res, [
347 $pri += 1e-5,
348 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
349 ];
350 }
351 $cv->end;
352 }
353 );
354 } else {
355 $cv->begin;
356 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
357 for (@_) {
358 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
359 push @res, [
360 $pri += 1e-5,
361 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
362 ];
363 }
364 $cv->end;
365 };
366 }
367 }
368
369 $cv->end;
370
371 $cv
372 }
373
374 sub configure(@) {
375 unshift @_, "profile" if @_ & 1;
376 my (%kv) = @_;
377
378 my $profile = delete $kv{profile};
379
380 $profile = _nodename
381 unless defined $profile;
382
383 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
384
385 delete $NODE{$NODE}; # we do not support doing stuff before configure
386
387 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
388 $NODE = $node
389 unless $node eq "anon/";
390
391 $NODE{$NODE} = $NODE{""};
392 $NODE{$NODE}{id} = $NODE;
393
394 my $seeds = $CONFIG->{seeds};
395 my $binds = $CONFIG->{binds};
396
397 $binds ||= ["*"];
398
399 $WARN->(8, "node $NODE starting up.");
400
401 $LISTENER = [];
402 %LISTENER = ();
403
404 for (map _resolve $_, @$binds) {
405 for my $bind ($_->recv) {
406 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
407 or Carp::croak "$bind: unparsable local bind address";
408
409 my $listener = AnyEvent::MP::Transport::mp_server
410 $host,
411 $port,
412 prepare => sub {
413 my (undef, $host, $port) = @_;
414 $bind = AnyEvent::Socket::format_hostport $host, $port;
415 },
416 ;
417 $LISTENER{$bind} = $listener;
418 push @$LISTENER, $bind;
419 }
420 }
421
422 $WARN->(8, "node listens on [@$LISTENER].");
423
424 # the global service is mandatory currently
425 require AnyEvent::MP::Global;
426
427 # connect to all seednodes
428 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
429
430 for (@{ $CONFIG->{services} }) {
431 if (ref) {
432 my ($func, @args) = @$_;
433 (load_func $func)->(@args);
434 } elsif (s/::$//) {
435 eval "require $_";
436 die $@ if $@;
437 } else {
438 (load_func $_)->();
439 }
440 }
441 }
442
443 #############################################################################
444 # node monitoring and info
445
446 =item node_is_known $nodeid
447
448 Returns true iff the given node is currently known to the system. The only
449 time a node is known but not up currently is when a conenction request is
450 pending.
451
452 =cut
453
454 sub node_is_known($) {
455 exists $NODE{$_[0]}
456 }
457
458 =item node_is_up $nodeid
459
460 Returns true if the given node is "up", that is, the kernel thinks it has
461 a working connection to it.
462
463 If the node is known but not currently connected, returns C<0>. If the
464 node is not known, returns C<undef>.
465
466 =cut
467
468 sub node_is_up($) {
469 ($NODE{$_[0]} or return)->{transport}
470 ? 1 : 0
471 }
472
473 =item known_nodes
474
475 Returns the node IDs of all nodes currently known to this node, including
476 itself and nodes not currently connected.
477
478 =cut
479
480 sub known_nodes() {
481 map $_->{id}, values %NODE
482 }
483
484 =item up_nodes
485
486 Return the node IDs of all nodes that are currently connected (excluding
487 the node itself).
488
489 =cut
490
491 sub up_nodes() {
492 map $_->{id}, grep $_->{transport}, values %NODE
493 }
494
495 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
496
497 Registers a callback that is called each time a node goes up (a connection
498 is established) or down (the connection is lost).
499
500 Node up messages can only be followed by node down messages for the same
501 node, and vice versa.
502
503 Note that monitoring a node is usually better done by monitoring it's node
504 port. This function is mainly of interest to modules that are concerned
505 about the network topology and low-level connection handling.
506
507 Callbacks I<must not> block and I<should not> send any messages.
508
509 The function returns an optional guard which can be used to unregister
510 the monitoring callback again.
511
512 Example: make sure you call function C<newnode> for all nodes that are up
513 or go up (and down).
514
515 newnode $_, 1 for up_nodes;
516 mon_nodes \&newnode;
517
518 =cut
519
520 our %MON_NODES;
521
522 sub mon_nodes($) {
523 my ($cb) = @_;
524
525 $MON_NODES{$cb+0} = $cb;
526
527 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
528 }
529
530 sub _inject_nodeevent($$;@) {
531 my ($node, $up, @reason) = @_;
532
533 for my $cb (values %MON_NODES) {
534 eval { $cb->($node->{id}, $up, @reason); 1 }
535 or $WARN->(1, $@);
536 }
537
538 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
539 }
540
541 #############################################################################
542 # self node code
543
544 our %node_req = (
545 # internal services
546
547 # monitoring
548 mon0 => sub { # stop monitoring a port
549 my $portid = shift;
550 my $node = $SRCNODE;
551 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
552 },
553 mon1 => sub { # start monitoring a port
554 my $portid = shift;
555 my $node = $SRCNODE;
556 Scalar::Util::weaken $node; #TODO# ugly
557 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
558 $node->send (["", kil => $portid, @_])
559 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
560 });
561 },
562 kil => sub {
563 my $cbs = delete $SRCNODE->{lmon}{+shift}
564 or return;
565
566 $_->(@_) for @$cbs;
567 },
568
569 # "public" services - not actually public
570
571 # relay message to another node / generic echo
572 snd => \&snd,
573 snd_multiple => sub {
574 snd @$_ for @_
575 },
576
577 # informational
578 info => sub {
579 snd @_, $NODE;
580 },
581 known_nodes => sub {
582 snd @_, known_nodes;
583 },
584 up_nodes => sub {
585 snd @_, up_nodes;
586 },
587
588 # random utilities
589 eval => sub {
590 my @res = do { package main; eval shift };
591 snd @_, "$@", @res if @_;
592 },
593 time => sub {
594 snd @_, AE::time;
595 },
596 devnull => sub {
597 #
598 },
599 "" => sub {
600 # empty messages are keepalives or similar devnull-applications
601 },
602 );
603
604 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
605 $PORT{""} = sub {
606 my $tag = shift;
607 eval { &{ $node_req{$tag} ||= load_func $tag } };
608 $WARN->(2, "error processing node message: $@") if $@;
609 };
610
611 =back
612
613 =head1 SEE ALSO
614
615 L<AnyEvent::MP>.
616
617 =head1 AUTHOR
618
619 Marc Lehmann <schmorp@schmorp.de>
620 http://home.schmorp.de/
621
622 =cut
623
624 1
625