ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.54
Committed: Thu Oct 1 20:41:36 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.53: +1 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '1.1';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil port_is_local
44 configure
45 up_nodes mon_nodes node_is_up
46 );
47
48 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
49
50 This value is called with an error or warning message, when e.g. a
51 connection could not be created, authorisation failed and so on.
52
53 It I<must not> block or send messages - queue it and use an idle watcher if
54 you need to do any of these things.
55
56 C<$level> should be C<0> for messages to be logged always, C<1> for
57 unexpected messages and errors, C<2> for warnings, C<7> for messages about
58 node connectivity and services, C<8> for debugging messages and C<9> for
59 tracing messages.
60
61 The default simply logs the message to STDERR.
62
63 =item @AnyEvent::MP::Kernel::WARN
64
65 All code references in this array are called for every log message, from
66 the default C<$WARN> handler. This is an easy way to tie into the log
67 messages without disturbing others.
68
69 =cut
70
# Verbosity threshold of the default handler: taken from the environment
# when PERL_ANYEVENT_MP_WARNLEVEL exists there, otherwise 5.
our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   : 5;

# Additional log hooks, called for every message regardless of its level.
our @WARN;

# Default log handler, called as $WARN->($level, $msg): runs all hooks, then
# prints a timestamped line to STDERR unless $level exceeds $WARNLEVEL.
our $WARN = sub {
   # "&$hook;" (no parentheses) hands the current @_ to each hook
   for my $hook (@WARN) {
      &$hook;
   }

   my ($level, $msg) = @_;

   return if $WARNLEVEL < $level;

   # the format string supplies the line end itself
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
87
88 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
89
90 The maximum level at which warning messages will be printed to STDERR by
91 the default warn handler.
92
93 =cut
94
# Resolve a fully qualified function name ("Some::Package::func") into a
# code reference, loading the package that defines it on demand.
#
# If the function is not defined yet, successively shorter package prefixes
# of the name are require'd until it appears. When no prefix is left, a code
# reference that dies with "unable to resolve '<name>'" is returned instead,
# so the failure only surfaces when the result is actually called.
#
# Function names can come out of received node messages, so a prefix is only
# handed to string-eval when it is a syntactically plain package name.
sub load_func($) {
   my $func = $_[0];

   # the function is looked up by name, i.e. through symbolic references
   no strict 'refs';

   unless (defined &$func) {
      # failed require attempts must not leak their error to the caller
      local $@;

      my $pkg = $func;
      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         eval "require $pkg"
            if $pkg =~ /\A[A-Za-z_]\w*(?:::\w+)*\z/;
      } until defined &$func;
   }

   \&$func
}
109
# Return $_[0] bytes of random data.
#
# The bytes are read from /dev/urandom when it can be opened and delivers
# the full amount requested; otherwise they are generated with perl's rand,
# which is not cryptographically strong.
sub nonce($) {
   my $len = $_[0];

   if (open my $fh, "<", "/dev/urandom") {
      my $nonce;
      my $got = sysread $fh, $nonce, $len;

      # only trust the device when it delivered everything we asked for
      return $nonce
         if defined $got && $got == $len;
   }

   # fallback for systems without a usable /dev/urandom
   join "", map +(chr rand 256), 1 .. $len
}
122
# Encode a binary string using only the characters 0-9, A-Z and a-z.
#
# When Math::GMP 2.05 or newer can be loaded, the data is treated as one big
# hexadecimal number and printed in base 62. Otherwise it is base64-encoded,
# the padding is dropped and the three non-alphanumeric cases are escaped:
# "x" becomes "x0", "/" becomes "x1" and "+" becomes "x2".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", all of which have to go
      $data =~ s/=+$//;
      # "x" is the escape character, so it must be escaped first
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
141
# Build a short alphanumeric cookie out of the low 16 bits of the process
# ID, the low 16 bits of the current time and two random bytes.
sub gen_uniq {
   my $pid_bits  = $$ & 0xffff;
   my $time_bits = time & 0xffff;
   my $random    = nonce (2);

   alnumbits (pack "nna*", $pid_bits, $time_bits, $random)
}
145
# this node's configuration
our $CONFIG;

# identity of this process/node
our $RUNIQ = alnumbits (nonce (96 / 8)); # remote uniq value
our $UNIQ  = gen_uniq ();                # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

# routing tables
our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

# monitoring state
our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

# listening sockets
our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
163
# Accessor for the current value of $NODE, the ID of this node.
sub NODE() {
   return $NODE;
}
167
# Return the node part of a port ID, i.e. everything before the first "#".
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
173
# Define TRACE at compile time as a constant sub: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
179
our $DELAY_TIMER; # watcher armed by delay(); false while nothing is scheduled
our @DELAY_QUEUE; # callbacks queued by delay(), run in order by _delay_run

# Run every queued callback in order, including those queued while running,
# then forget the timer so that the next delay() call arms a new one.
sub _delay_run {
   while (my $cb = shift @DELAY_QUEUE) {
      $cb->();
   }

   # without this reset delay() would keep seeing the spent timer and
   # never schedule another run
   undef $DELAY_TIMER;
}
187
# Queue a callback and make sure a zero-delay timer exists that will call
# _delay_run.
sub delay($) {
   my ($cb) = @_;

   push @DELAY_QUEUE, $cb;

   $DELAY_TIMER ||= AE::timer (0, 0, \&_delay_run);
}
192
# Deliver a message to a local port: the first element is the port ID, the
# rest is passed on to the callback registered for it in %PORT. Messages for
# ports without a callback are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   my $portid  = shift;
   my $handler = $PORT{$portid}
      or return;

   # no parentheses: the callback gets the remaining @_ as its arguments
   &$handler;
}
197
# Central routing component: return the node object registered for a node
# ID, creating and registering an AnyEvent::MP::Node::Direct object first if
# there is none yet, so that messages can be sent to it.
sub add_node {
   my ($nodeid) = @_;

   $NODE{$nodeid} ||= AnyEvent::MP::Node::Direct->new ($nodeid)
}
205
# Send a message: the first argument is the destination port ID
# ("nodeid#portid"), the remaining arguments form the message. The node is
# looked up in %NODE and registered through add_node when unknown.
# Croaks when no node ID can be extracted from the port ID.
sub snd(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   # the node object carries its send callback in the {send} slot
   $node->{send} (["$portid", @_]);
}
217
218 =item $is_local = port_is_local $port
219
220 Returns true iff the port is a local port.
221
222 =cut
223
# True when the node part of the port ID maps to the same node object as the
# local node, which is registered under the empty node ID.
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;
   my $node  = $NODE{ $parts[0] };

   $node == $NODE{""}
}
229
230 =item snd_to_func $node, $func, @args
231
232 Expects a node ID and a name of a function. Asynchronously tries to call
233 this function with the given arguments on that node.
234
235 This function can be used to implement C<spawn>-like interfaces.
236
237 =cut
238
# Send a message to the node port (the port with the empty ID) of the given
# node; the arguments after the node ID form the message.
# Croaks when the node ID is undefined.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment talks about $NODE, but the condition fires for
   # every node ID that differs from $NODE - confirm which one is intended.
   if ($nodeid ne $NODE) {
      $AnyEvent::MP::Node::Self::DELAY = 1;
   }

   Carp::croak ("'undef' is not a valid node ID/port ID")
      unless defined $nodeid; #d#UGLY

   my $node = $NODE{$nodeid} || add_node ($nodeid);
   $node->send (["", @call]);
}
253
254 =item snd_on $node, @msg
255
256 Executes C<snd> with the given C<@msg> (which must include the destination
257 port) on the given node.
258
259 =cut
260
# Ask the given node to execute snd with @msg, by sending it a "snd" request.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
265
266 =item eval_on $node, $string[, @reply]
267
268 Evaluates the given string as Perl expression on the given node. When
269 @reply is specified, then it is used to construct a reply message with
270 C<"$@"> and any results from the eval appended.
271
272 =cut
273
# Ask the given node to evaluate a string, by sending it an "eval" request
# consisting of the string and the optional reply message.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd ($node, "eval", @request);
}
278
# Kill a port: the first argument is the port ID, the remaining arguments
# are handed to the node's kill method together with the port part.
# Croaks when the port part is empty, as node ports must not be killed.
sub kil(@) {
   my $port = shift;
   my ($nodeid, $portid) = split /#/, $port, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);
   $node->kill ("$portid", @_);
}
288
# Return the nodename field (index 1) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
293
# Resolve a comma separated list of transport descriptors into "host:port"
# strings.
#
# Returns a condvar immediately. Once all lookups have finished, it is sent
# the resulting addresses without duplicates, ordered by the position of the
# descriptor they came from and, within one descriptor, by the order in
# which they were reported.
#
# A descriptor consisting only of digits becomes "<nodename>:<digits>", an
# empty one becomes the nodename itself. A port of "*" is replaced by "0",
# and a host of "*" is expanded to the addresses of the local network
# interfaces. Croaks on descriptors that cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @found; # [priority, "host:port"] pairs collected by the callbacks

   # this outer begin is matched by the end at the bottom of the function,
   # its callback runs when the last pending lookup has ended
   $cv->begin (sub {
      my ($done) = @_;

      my %seen;
      my @addrs = grep !$seen{$_}++,
                  map $_->[1],
                  sort { $a->[0] <=> $b->[0] } @found;

      $done->send (@addrs);
   });

   my $position = 0;
   for my $desc (split /,/, $nodeid) {
      # results of one descriptor get priorities just above its position
      my $pri = ++$position;

      if ($desc =~ /^\d*$/) {
         $desc = length $desc
            ? _nodename () . ":$desc"
            : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($desc, 0)
         or Carp::croak ("$desc: unparsable transport descriptor");

      $port = "0" if $port eq "*";

      $cv->begin;

      if ($host eq "*") {
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               require Net::Interface;

               my @addr;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  # skip localhost etc.
                  push @addr, grep !/^\x7f/,
                     $if->address (Net::Interface::AF_INET ());

                  # keep global unicast, site-local unicast only
                  push @addr, grep /^[\x20-\x3f\xfc\xfd]/,
                     $if->address (Net::Interface::AF_INET6 ());
               }

               @addr
            },
            sub {
               push @found, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (
                     AnyEvent::Socket::format_address ($_),
                     $port
                  ),
               ] for @_;

               $cv->end;
            }
         );
      } else {
         AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $target (@_) {
               my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($target->[3]);

               push @found, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport (
                     AnyEvent::Socket::format_address ($addr),
                     $service
                  ),
               ];
            }

            $cv->end;
         });
      }
   }

   $cv->end;

   $cv
}
374
# Configure and start this node.
#
# Arguments are key/value pairs; with an odd number of arguments the first
# one is taken as the value of the "profile" key. Without a profile the
# nodename is used. The configuration found for the profile then determines
# the node ID ("nodeid", defaulting to the profile name), the addresses to
# listen on ("binds", defaulting to "*"), the seed nodes ("seeds") and the
# services to start ("services").
#
# Croaks on a false node ID or an unparsable bind address, and dies when a
# service module cannot be loaded.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile, %kv);

   delete $NODE{$NODE}; # we do not support doing stuff before configure

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;

   $node or Carp::croak ("$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n");

   # "anon/" keeps the automatically generated anonymous node ID
   $NODE = $node
      unless $node eq "anon/";

   # re-register the self node under its (possibly new) ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all resolves first, then wait for them one after the other
   for my $resolved (map _resolve ($_), @$binds) {
      for my $bind ($resolved->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         my $listener = AnyEvent::MP::Transport::mp_server (
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport ($host, $port);
               0
            },
         );
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   $WARN->(8, "node listens on [@$LISTENER].");

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve ($_), @$seeds);

   for my $service (@{ $CONFIG->{services} }) {
      if (ref $service) {
         # [function name, arguments...]
         my ($func, @args) = @$service;
         (load_func ($func))->(@args);
      } elsif ((my $module = $service) =~ s/::$//) {
         # "Module::" only loads the module; the "::" is stripped from a
         # copy so the entry stored in $CONFIG stays as configured
         eval "require $module";
         die $@ if $@;
      } else {
         # plain function name, called without arguments
         (load_func ($service))->();
      }
   }
}
447
448 #############################################################################
449 # node monitoring and info
450
451 =item node_is_known $nodeid
452
453 Returns true iff the given node is currently known to the system. The only
454 time a node is known but not up currently is when a connection request is
455 pending.
456
457 =cut
458
# True when %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   exists $NODE{$nodeid}
}
462
463 =item node_is_up $nodeid
464
465 Returns true if the given node is "up", that is, the kernel thinks it has
466 a working connection to it.
467
468 If the node is known but not currently connected, returns C<0>. If the
469 node is not known, returns C<undef>.
470
471 =cut
472
# Return 1 when the node has a transport, 0 when the node is known but has
# none, and nothing (undef in scalar context) when the node is not known.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
477
478 =item known_nodes
479
480 Returns the node IDs of all nodes currently known to this node, including
481 itself and nodes not currently connected.
482
483 =cut
484
# Return the IDs of all nodes registered in %NODE, each ID once.
#
# The local node is registered twice, under the empty ID and under its own
# node ID, so the IDs have to be de-duplicated.
sub known_nodes() {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
488
489 =item up_nodes
490
491 Return the node IDs of all nodes that are currently connected (excluding
492 the node itself).
493
494 =cut
495
# Return the IDs of all nodes in %NODE that currently have a transport.
sub up_nodes() {
   my @up;

   for my $node (values %NODE) {
      push @up, $node->{id}
         if $node->{transport};
   }

   @up
}
499
500 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
501
502 Registers a callback that is called each time a node goes up (a connection
503 is established) or down (the connection is lost).
504
505 Node up messages can only be followed by node down messages for the same
506 node, and vice versa.
507
508 Note that monitoring a node is usually better done by monitoring its node
509 port. This function is mainly of interest to modules that are concerned
510 about the network topology and low-level connection handling.
511
512 Callbacks I<must not> block and I<should not> send any messages.
513
514 The function returns an optional guard which can be used to unregister
515 the monitoring callback again.
516
517 Example: make sure you call function C<newnode> for all nodes that are up
518 or go up (and down).
519
520 newnode $_, 1 for up_nodes;
521 mon_nodes \&newnode;
522
523 =cut
524
# Registered node monitoring callbacks, keyed by the numeric value (address)
# of the code reference.
our %MON_NODES;

# Register a callback to be called on node up/down events.
#
# In void context nothing is returned. In any other context a guard object
# is returned that unregisters the callback again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false but defined in scalar context, so it has to be
   # tested with defined to hand out a guard for "$guard = mon_nodes ..."
   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
534
# Tell all callbacks registered with mon_nodes that a node went up or down
# and log the event at level 7. A callback that dies is logged at level 1
# and does not keep the remaining callbacks from being called.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      unless (eval { $cb->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is " . $state . " (@reason)");
}
545
546 #############################################################################
547 # self node code
548
# Requests understood by the node port, keyed by request name. Each handler
# is called with the remainder of the message; $SRCNODE is the sending node.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;

      my $node = $SRCNODE;
      Scalar::Util::weaken ($node); #TODO# ugly

      # forwards the kill to the monitoring node while it is connected
      my $relay = sub {
         $node->send (["", kil => $portid, @_])
            if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disconnect
      };

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = $relay);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random utilities
   eval => sub {
      # evaluates the string sent by the peer as perl code in package main
      my @res = do { package main; eval shift };

      # whatever is left in @_ is the reply message, if any
      if (@_) {
         snd (@_, "$@", @res);
      }
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
608
# The local node is registered under the empty ID as well as its own ID.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# The node port: the first message element names the request, which is
# looked up in %node_req or, failing that, resolved with load_func and
# remembered there. Errors are logged at level 2.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # no parentheses: the handler gets the rest of the message in @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
615
616 =back
617
618 =head1 SEE ALSO
619
620 L<AnyEvent::MP>.
621
622 =head1 AUTHOR
623
624 Marc Lehmann <schmorp@schmorp.de>
625 http://home.schmorp.de/
626
627 =cut
628
629 1
630