ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.35
Committed: Mon Aug 31 20:06:45 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_95
Changes since 1.34: +1 -1 lines
Log Message:
0.95

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
our $VERSION = '0.95';

# Symbols exported by default (this module inherits from Exporter).
our @EXPORT = (
   # kernel state and low-level helpers
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func snd_to_func snd_on eval_on),

   # messaging primitives
   qw(NODE $NODE node_of snd kil port_is_local),
   qw(configure),

   # node connectivity queries
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# tunables
our $CONNECT_INTERVAL = 2;  # new connect every 2s, at least
our $NETWORK_LATENCY  = 3;  # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
51
52 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
53
54 This value is called with an error or warning message, when e.g. a
55 connection could not be created, authorisation failed and so on.
56
57 It I<must not> block or send messages - queue it and use an idle watcher if
58 you need to do any of these things.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
# Verbosity threshold of the default handler: the value of the environment
# variable PERL_ANYEVENT_MP_WARNLEVEL when it exists, 5 otherwise.
our $WARNLEVEL = 5;
$WARNLEVEL = $ENV{PERL_ANYEVENT_MP_WARNLEVEL}
   if exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL};

# Default handler: writes "<timestamp> <level> message" to STDERR, unless the
# message level is above $WARNLEVEL.
our $WARN = sub {
   my ($level, $msg) = @_;

   return if $level > $WARNLEVEL;

   # at most one trailing newline is dropped, printf adds it back
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
83
84 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
85
86 The maximum level at which warning messages will be printed to STDERR by
87 the default warn handler.
88
89 =cut
90
# Resolve a fully qualified function name ("Some::Package::func") to a code
# reference, loading the module that provides it on demand.
#
# When the function is not defined yet, candidate package names are derived
# by repeatedly stripping the last "::component" from the name, and each
# candidate is loaded until the function exists. When no candidate is left, a
# code reference that dies with "unable to resolve '<name>'" is returned, so
# the failure surfaces when the result is called, not here.
#
# The name may come from a received message (the node port handler resolves
# unknown message tags with this function), so it is never interpolated into
# Perl source: candidates must look like a plain package name and are loaded
# with the file form of require inside a block eval.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;

      do {
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         if ($pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/) {
            (my $file = "$pkg.pm") =~ s{::}{/}g;

            # load failures are ignored on purpose - a shorter prefix might work
            eval { require $file };
         }
      } until defined &$func;
   }

   \&$func
}
105
# Return a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when it can be opened and delivers
# enough data. Anything still missing is generated with perl's rand, which is
# seeded once per process from the time, the pid and a checksum over the
# output of two external commands - this fallback is not cryptographically
# strong.
sub nonce($) {
   my $len = $_[0];
   my $nonce = "";

   # three-argument open keeps the mode separate from the file name
   if (open my $fh, "<", "/dev/urandom") {
      # sysread may deliver less than requested, so append until complete
      while ($len > length ($nonce)) {
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last; # error or eof - the fallback below fills the rest
      }
   }

   if ($len > length ($nonce)) {
      # shit...
      our $nonce_init;
      unless ($nonce_init++) {
         srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
      }
      $nonce .= join "", map +(chr rand 256), 1 .. $len - length ($nonce);
   }

   $nonce
}
122
# Encode the octet string $_[0] as text made of ASCII letters and digits only.
#
# With Math::GMP 2.05 or newer the octets are read as one hexadecimal number
# and printed in base 62. Otherwise the data is base64-encoded, the padding is
# removed, and the remaining non-alphanumeric characters are escaped with "x"
# as escape character (x => x0, / => x1, + => x2).
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                  (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                  62
              );
   } else {
      $data = MIME::Base64::encode_base64 $data, "";
      # base64 pads with up to two "=", so strip all of them, not only the first
      $data =~ s/=+$//;
      $data =~ s/x/x0/g;
      $data =~ s/\//x1/g;
      $data =~ s/\+/x2/g;
   }

   $data
}
141
# Build a fresh identifier: the process id, the current time and two random
# octets are packed into a binary string, which is then rendered by alnumbits.
sub gen_uniq {
   my $packed = pack "wNa*", $$, time, nonce 2;

   alnumbits $packed
}
145
our $CONFIG; # this node's configuration

our $RUNIQ = alnumbits (nonce 16); # remote uniq value
our $UNIQ  = gen_uniq ();          # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";        # node ID, anonymous until configured otherwise
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
163
# Accessor for the node ID of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
167
# Return the node ID part of a port ID, that is everything before the first
# "#" (the whole string when it contains no "#").
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
173
# Define TRACE as a constant sub at compile time: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
179
# Deliver a message to a local port: the first element of @_ is the port ID,
# the rest is handed to the callback registered for it in %PORT. Messages for
# ports without an entry in %PORT are dropped silently.
sub _inject {
   if (TRACE && @_) {
      warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n";#d#
   }

   my $receiver = $PORT{+shift}
      or return;

   # called without argument list, so the callback sees the current @_
   &$receiver;
}
184
# Return the node object for the given node ID, creating and registering it
# in %NODE first when it is not known yet. This adds a node-ref, so you can
# send stuff to it - it is basically the central routing component.
sub add_node {
   my ($node) = @_;

   # explicit class-method call instead of the indirect object syntax
   # ("new Class ARGS"), which parses differently as soon as a sub called
   # "new" is visible in the current package
   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
192
# Send a message to a port. The first argument is the destination port ID
# ("nodeid#portid"), the remaining arguments are the message. The node object
# is looked up in %NODE (and created with add_node when missing) and its send
# callback is invoked with [portid, message...].
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   if (TRACE && @_) {
      warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n";#d#
   }

   my $node = $NODE{$nodeid} || add_node $nodeid;

   # the callback stored in the node is called directly, not the send method
   $node->{send}->(["$portid", @_]);
}
201
202 =item $is_local = port_is_local $port
203
204 Returns true iff the port is a local port.
205
206 =cut
207
# True iff the node part of the given port ID maps to the same node object
# that is registered under the empty node ID in %NODE.
sub port_is_local($) {
   my @parts = split /#/, $_[0], 2;

   $NODE{ $parts[0] } == $NODE{""}
}
213
214 =item snd_to_func $node, $func, @args
215
216 Expects a node ID and a name of a function. Asynchronously tries to call
217 this function with the given arguments on that node.
218
219 This function can be used to implement C<spawn>-like interfaces.
220
221 =cut
222
# Send [ "", $func, @args ] to the given node, i.e. a message to the port
# with the empty ID carrying the function name and its arguments. The node
# object is created with add_node when it is not in %NODE yet.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->send (["", @call]);
}
229
230 =item snd_on $node, @msg
231
232 Executes C<snd> with the given C<@msg> (which must include the destination
233 port) on the given node.
234
235 =cut
236
# Ask the given node to run snd with @msg: sends the message ("snd", @msg)
# to $node via snd.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd $node, "snd", @msg;
}
241
242 =item eval_on $node, $string[, @reply]
243
244 Evaluates the given string as Perl expression on the given node. When
245 @reply is specified, then it is used to construct a reply message with
246 C<"$@"> and any results from the eval appended.
247
248 =cut
249
# Ask the given node to evaluate a string: sends the message
# ("eval", $string, @reply) to $node via snd.
sub eval_on($$;@) {
   my ($node, @request) = @_;

   snd $node, "eval", @request;
}
254
# Kill a port. The first argument is the port ID ("nodeid#portid"), the
# remaining arguments are passed on as kill reason to the kill method of the
# node object (created with add_node when not in %NODE yet).
# Croaks when the port part is empty or missing.
sub kil(@) {
   my ($target, @reason) = @_;
   my ($nodeid, $portid) = split /#/, $target, 2;

   Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node $nodeid;

   $node->kill ("$portid", @reason);
}
264
# Return the nodename field (second element) of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   $uname[1]
}
269
# Resolve a comma-separated list of transport descriptors into endpoint
# strings (as produced by AnyEvent::Socket::format_hostport).
#
# Returns a condition variable; ->recv on it yields the endpoints without
# duplicates, ordered by the position of the descriptor that produced them.
#
# A descriptor that is empty or consists of digits only stands for the local
# nodename (with the digits as port, if any). A port of "*" is replaced by
# "0". A host of "*" is expanded to the addresses of the local interfaces (in
# a forked child, using Net::Interface), any other host is resolved with
# AnyEvent::Socket::resolve_sockaddr. Croaks when a descriptor cannot be parsed.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res; # [priority, endpoint] pairs, filled by the callbacks below

   # group callback: runs once every begin has been matched by an end
   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      # integer part: descriptor position, fraction: order within a descriptor
      my $pri = ++$idx;

      $t = length $t ? _nodename . ":$t" : _nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               # named loop variables: "for my $_" (lexical $_) was removed
               # in perl 5.24 and no longer compiles
               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for my $addr ($if->address (Net::Interface::AF_INET ())) {
                     next if $addr =~ /^\x7f/; # skip localhost etc.
                     push @addr, $addr;
                  }
                  for my $addr ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($addr) <= 2;
                     next unless $addr =~ /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $addr;
                  }

               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   # matches the initial begin above
   $cv->end;

   $cv
}
350
# Configure and start up this node.
#
# Takes key => value pairs. "profile" selects the configuration profile
# (default: the local nodename); the profile name and the remaining pairs are
# passed to AnyEvent::MP::Config::find_profile, whose result becomes $CONFIG.
#
# Afterwards, in this order: the node ID is set from the "nodeid" setting (or
# the profile name) unless that is "anon/", listeners are created for all
# "binds" (default "*") and recorded in %LISTENER/$LISTENER,
# AnyEvent::MP::Global is loaded and given the resolved "seeds", and every
# entry of "services" is started: entries ending in "::" are modules to
# require (failures are fatal), all others are functions to call.
sub configure(%) {
   my (%kv) = @_;

   my $profile = delete $kv{profile};

   $profile = _nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   # the self node object becomes reachable under the (possibly new) node ID
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # all lookups are started by the map before the first recv waits
   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               my (undef, $host, $port) = @_;
               # record the address that was actually bound
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               # NOTE(review): assumes mp_server hands this callback on to
               # AnyEvent::Socket::tcp_server, where the return value is the
               # listen queue length (0 = default) - previously the address
               # string was returned here; confirm against mp_server
               0
            },
         ;
         $LISTENER{$bind} = $listener;
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);

   for my $entry (@{ $CONFIG->{services} }) {
      # work on a copy: the loop variable aliases the configuration data, so
      # stripping the "::" marker in place would change $CONFIG->{services}
      my $service = $entry;

      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func $service)->();
      }
   }
}
411
412 #############################################################################
413 # node monitoring and info
414
415 =item node_is_known $nodeid
416
417 Returns true iff the given node is currently known to the system.
418
419 =cut
420
# True iff %NODE has an entry for the given node ID.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
424
425 =item node_is_up $nodeid
426
427 Returns true if the given node is "up", that is, the kernel thinks it has
428 a working connection to it.
429
430 If the node is known but not currently connected, returns C<0>. If the
431 node is not known, returns C<undef>.
432
433 =cut
434
# Returns 1 when the node has a transport, 0 when it is known but has none,
# and nothing (undef in scalar context) when the node is not in %NODE.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
439
440 =item known_nodes
441
442 Returns the node IDs of all nodes currently known to this node, including
443 itself and nodes not currently connected.
444
445 =cut
446
# Returns the id field of every node object stored in %NODE.
sub known_nodes {
   my @ids;

   push @ids, $_->{id}
      for values %NODE;

   @ids
}
450
451 =item up_nodes
452
453 Return the node IDs of all nodes that are currently connected (excluding
454 the node itself).
455
456 =cut
457
# Returns the id field of every node object in %NODE that has a true
# transport field.
sub up_nodes {
   my @connected = grep { $_->{transport} } values %NODE;

   map { $_->{id} } @connected
}
461
462 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
463
464 Registers a callback that is called each time a node goes up (a connection
465 is established) or down (the connection is lost).
466
467 Node up messages can only be followed by node down messages for the same
468 node, and vice versa.
469
470 Note that monitoring a node is usually better done by monitoring its node
471 port. This function is mainly of interest to modules that are concerned
472 about the network topology and low-level connection handling.
473
474 Callbacks I<must not> block and I<should not> send any messages.
475
476 The function returns an optional guard which can be used to unregister
477 the monitoring callback again.
478
479 =cut
480
our %MON_NODES; # registered node monitors, keyed by the callback's address

# Register $cb to be called by _inject_nodeevent for every node up/down event.
#
# Unless called in void context, returns a guard (AnyEvent::Util::guard) that
# removes the callback from %MON_NODES again when it runs.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false in scalar context as well, so test for definedness:
   # "$guard = mon_nodes ..." has to receive a guard, only void context not.
   # The callback is passed as an explicit sub so the call does not rely on
   # guard's prototype being known when this file is compiled.
   defined wantarray
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
490
# Report a node state change: every callback in %MON_NODES is called with
# (node id, up flag, reason...), then the event is logged at level 7.
# A callback that dies is reported through $WARN at level 1 and does not
# keep the remaining callbacks from running.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $watcher (values %MON_NODES) {
      unless (eval { $watcher->($node->{id}, $up, @reason); 1 }) {
         $WARN->(1, $@);
      }
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
501
502 #############################################################################
503 # self node code
504
# Handlers for messages addressed to the node itself (the port with the empty
# ID). The first message element selects the handler by name, the remaining
# elements arrive in @_. $SRCNODE is the node the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port
      my ($portid) = @_;
      my $sender = $SRCNODE;

      my $cb = delete $sender->{rmon}{$portid};

      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # start monitoring a port
      my ($portid) = @_;
      my $sender = $SRCNODE;
      Scalar::Util::weaken $sender; #TODO# ugly

      # forwards the kill to the monitoring node, as long as it is connected
      my $notify = sub {
         $sender->send (["", kil => $portid, @_])
            if $sender && $sender->{transport}; #TODO# ugly, should use snd and remove-on-disocnnect
      };

      $NODE{""}->monitor ($portid, $sender->{rmon}{$portid} = $notify);
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      for my $msg (@_) {
         snd @$msg;
      }
   },

   # informational
   info => sub {
      snd @_, $NODE;
   },
   known_nodes => sub {
      snd @_, known_nodes;
   },
   up_nodes => sub {
      snd @_, up_nodes;
   },

   # random utilities
   eval => sub {
      # NOTE(review): this runs a string taken from a received message as
      # perl code - only acceptable when every node that can connect is trusted
      my @res = eval shift;
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::time;
   },
   devnull => sub {
      # discards the message
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);
564
# Create the object representing this node and register it under the empty
# node ID as well as under the current node ID. An explicit class-method call
# is used instead of the indirect object syntax ("new Class ARGS"), which
# parses differently when a sub called "new" is visible in this package.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# Handler for the node port (the port with the empty ID): the first message
# element names the request. Unknown names are resolved with load_func and
# cached in %node_req. Errors raised by a handler are logged at level 2
# instead of being propagated to the caller.
$PORT{""} = sub {
   my $tag = shift;
   # called without argument list, so the handler sees the rest of the message in @_
   eval { &{ $node_req{$tag} ||= load_func $tag } };
   $WARN->(2, "error processing node message: $@") if $@;
};
571
572 =back
573
574 =head1 SEE ALSO
575
576 L<AnyEvent::MP>.
577
578 =head1 AUTHOR
579
580 Marc Lehmann <schmorp@schmorp.de>
581 http://home.schmorp.de/
582
583 =cut
584
585 1
586