ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.33
Committed: Sun Aug 30 19:49:47 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.32: +61 -13 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is sought.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '0.9';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil
44 port_is_local
45 resolve_node initialise_node
46 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 );
48
49 our $DEFAULT_PORT = "4040";
50
51 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52 our $NETWORK_LATENCY = 3; # activity timeout
53 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a
58 connection could not be created, authorisation failed and so on.
59
60 It I<must not> block or send messages -queue it and use an idle watcher if
61 you need to do any of these things.
62
63 C<$level> sould be C<0> for messages ot be logged always, C<1> for
64 unexpected messages and errors, C<2> for warnings, C<7> for messages about
65 node connectivity and services, C<8> for debugging messages and C<9> for
66 tracing messages.
67
68 The default simply logs the message to STDERR.
69
70 =cut
71
72 our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
73
74 our $WARN = sub {
75 return if $WARNLEVEL < $_[0];
76
77 my ($level, $msg) = @_;
78
79 $msg =~ s/\n$//;
80
81 printf STDERR "%s <%d> %s\n",
82 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
83 $level,
84 $msg;
85 };
86
87 =item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]
88
89 The maximum level at which warning messages will be printed to STDERR by
90 the default warn handler.
91
92 =cut
93
94 sub load_func($) {
95 my $func = $_[0];
96
97 unless (defined &$func) {
98 my $pkg = $func;
99 do {
100 $pkg =~ s/::[^:]+$//
101 or return sub { die "unable to resolve '$func'" };
102 eval "require $pkg";
103 } until defined &$func;
104 }
105
106 \&$func
107 }
108
109 sub nonce($) {
110 my $nonce;
111
112 if (open my $fh, "</dev/urandom") {
113 sysread $fh, $nonce, $_[0];
114 } else {
115 # shit...
116 our $nonce_init;
117 unless ($nonce_init++) {
118 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
119 }
120 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
121 }
122
123 $nonce
124 }
125
126 sub alnumbits($) {
127 my $data = $_[0];
128
129 if (eval "use Math::GMP 2.05; 1") {
130 $data = Math::GMP::get_str_gmp (
131 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
132 62
133 );
134 } else {
135 $data = MIME::Base64::encode_base64 $data, "";
136 $data =~ s/=//;
137 $data =~ s/x/x0/g;
138 $data =~ s/\//x1/g;
139 $data =~ s/\+/x2/g;
140 }
141
142 $data
143 }
144
145 sub gen_uniq {
146 alnumbits pack "wNa*", $$, time, nonce 2
147 }
148
149 our $CONFIG; # this node's configuration
150
151 our $RUNIQ = alnumbits nonce 16;; # remote uniq value
152 our $UNIQ = gen_uniq; # per-process/node unique cookie
153 our $NODE = "anon/$RUNIQ";
154 our $ID = "a";
155
156 our %NODE; # node id to transport mapping, or "undef", for local node
157 our (%PORT, %PORT_DATA); # local ports
158
159 our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
160 our %LMON; # monitored _local_ ports
161
162 our %LISTENER;
163 our $LISTENER; # our listeners, as arrayref
164
165 our $SRCNODE; # holds the sending node during _inject
166
167 sub NODE() {
168 $NODE
169 }
170
171 sub node_of($) {
172 my ($node, undef) = split /#/, $_[0], 2;
173
174 $node
175 }
176
177 BEGIN {
178 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
179 ? sub () { 1 }
180 : sub () { 0 };
181 }
182
183 sub _inject {
184 warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
185 &{ $PORT{+shift} or return };
186 }
187
188 # this function adds a node-ref, so you can send stuff to it
189 # it is basically the central routing component.
190 sub add_node {
191 my ($node) = @_;
192
193 $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node
194 }
195
196 sub snd(@) {
197 my ($nodeid, $portid) = split /#/, shift, 2;
198
199 warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n" if TRACE && @_;#d#
200
201 ($NODE{$nodeid} || add_node $nodeid)
202 ->{send} (["$portid", @_]);
203 }
204
205 =item $is_local = port_is_local $port
206
207 Returns true iff the port is a local port.
208
209 =cut
210
211 sub port_is_local($) {
212 my ($nodeid, undef) = split /#/, $_[0], 2;
213
214 $NODE{$nodeid} == $NODE{""}
215 }
216
217 =item snd_to_func $node, $func, @args
218
219 Expects a node ID and a name of a function. Asynchronously tries to call
220 this function with the given arguments on that node.
221
222 This function can be used to implement C<spawn>-like interfaces.
223
224 =cut
225
226 sub snd_to_func($$;@) {
227 my $nodeid = shift;
228
229 ($NODE{$nodeid} || add_node $nodeid)
230 ->send (["", @_]);
231 }
232
233 =item snd_on $node, @msg
234
235 Executes C<snd> with the given C<@msg> (which must include the destination
236 port) on the given node.
237
238 =cut
239
240 sub snd_on($@) {
241 my $node = shift;
242 snd $node, snd => @_;
243 }
244
245 =item eval_on $node, $string[, @reply]
246
247 Evaluates the given string as Perl expression on the given node. When
248 @reply is specified, then it is used to construct a reply message with
249 C<"$@"> and any results from the eval appended.
250
251 =cut
252
253 sub eval_on($$;@) {
254 my $node = shift;
255 snd $node, eval => @_;
256 }
257
258 sub kil(@) {
259 my ($nodeid, $portid) = split /#/, shift, 2;
260
261 length $portid
262 or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";
263
264 ($NODE{$nodeid} || add_node $nodeid)
265 ->kill ("$portid", @_);
266 }
267
268 sub _nodename {
269 require POSIX;
270 (POSIX::uname ())[1]
271 }
272
273 sub _resolve($) {
274 my ($nodeid) = @_;
275
276 my $cv = AE::cv;
277 my @res;
278
279 $cv->begin (sub {
280 my %seen;
281 my @refs;
282 for (sort { $a->[0] <=> $b->[0] } @res) {
283 push @refs, $_->[1] unless $seen{$_->[1]}++
284 }
285 shift->send (@refs);
286 });
287
288 $nodeid = $DEFAULT_PORT unless length $nodeid;
289
290 my $idx;
291 for my $t (split /,/, $nodeid) {
292 my $pri = ++$idx;
293
294 $t = length $t ? _nodename . ":$t" : _nodename
295 if $t =~ /^\d*$/;
296
297 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
298 or Carp::croak "$t: unparsable transport descriptor";
299
300 $port = "0" if $port eq "*";
301
302 if ($host eq "*") {
303 $cv->begin;
304 # use fork_call, as Net::Interface is big, and we need it rarely.
305 require AnyEvent::Util;
306 AnyEvent::Util::fork_call (
307 sub {
308 my @addr;
309
310 require Net::Interface;
311
312 for my $if (Net::Interface->interfaces) {
313 # we statically lower-prioritise ipv6 here, TODO :()
314 for my $_ ($if->address (Net::Interface::AF_INET ())) {
315 next if /^\x7f/; # skip localhost etc.
316 push @addr, $_;
317 }
318 for ($if->address (Net::Interface::AF_INET6 ())) {
319 #next if $if->scope ($_) <= 2;
320 next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
321 push @addr, $_;
322 }
323
324 }
325 @addr
326 }, sub {
327 for my $ip (@_) {
328 push @res, [
329 $pri += 1e-5,
330 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
331 ];
332 }
333 $cv->end;
334 }
335 );
336 } else {
337 $cv->begin;
338 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
339 for (@_) {
340 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
341 push @res, [
342 $pri += 1e-5,
343 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
344 ];
345 }
346 $cv->end;
347 };
348 }
349 }
350
351 $cv->end;
352
353 $cv
354 }
355
356 sub initialise_node(;$%) {
357 my ($profile, %kv) = @_;
358
359 $profile = _nodename
360 unless defined $profile;
361
362 $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;
363
364 my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
365 $NODE = $node
366 unless $node eq "anon/";
367
368 $NODE{$NODE} = $NODE{""};
369 $NODE{$NODE}{id} = $NODE;
370
371 my $seeds = $CONFIG->{seeds};
372 my $binds = $CONFIG->{binds};
373
374 $binds ||= ["*"];
375
376 $WARN->(8, "node $NODE starting up.");
377
378 $LISTENER = [];
379 %LISTENER = ();
380
381 for (map _resolve $_, @$binds) {
382 for my $bind ($_->recv) {
383 my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
384 or Carp::croak "$bind: unparsable local bind address";
385
386 $port =~ s/^.*=//; # remove bogus aemp=
387
388 my $listener = AnyEvent::MP::Transport::mp_server
389 $host,
390 $port,
391 prepare => sub {
392 my (undef, $host, $port) = @_;
393 $bind = AnyEvent::Socket::format_hostport $host, $port;
394 },
395 ;
396 $LISTENER{$bind} = $listener;
397 push @$LISTENER, $bind;
398 }
399 }
400
401 # the global service is mandatory currently
402 require AnyEvent::MP::Global;
403
404 # connect to all seednodes
405 AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds);
406
407 for (@{ $CONFIG->{services} }) {
408 if (s/::$//) {
409 eval "require $_";
410 die $@ if $@;
411 } else {
412 (load_func $_)->();
413 }
414 }
415 }
416
417 #############################################################################
418 # node monitoring and info
419
420 =item node_is_known $nodeid
421
422 Returns true iff the given node is currently known to the system.
423
424 =cut
425
426 sub node_is_known($) {
427 exists $NODE{$_[0]}
428 }
429
430 =item node_is_up $nodeid
431
432 Returns true if the given node is "up", that is, the kernel thinks it has
433 a working connection to it.
434
435 If the node is known but not currently connected, returns C<0>. If the
436 node is not known, returns C<undef>.
437
438 =cut
439
440 sub node_is_up($) {
441 ($NODE{$_[0]} or return)->{transport}
442 ? 1 : 0
443 }
444
445 =item known_nodes
446
447 Returns the node IDs of all nodes currently known to this node, including
448 itself and nodes not currently connected.
449
450 =cut
451
452 sub known_nodes {
453 map $_->{id}, values %NODE
454 }
455
456 =item up_nodes
457
458 Return the node IDs of all nodes that are currently connected (excluding
459 the node itself).
460
461 =cut
462
463 sub up_nodes {
464 map $_->{id}, grep $_->{transport}, values %NODE
465 }
466
467 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
468
469 Registers a callback that is called each time a node goes up (a connection
470 is established) or down (the connection is lost).
471
472 Node up messages can only be followed by node down messages for the same
473 node, and vice versa.
474
475 Note that monitoring a node is usually better done by monitoring it's node
476 port. This function is mainly of interest to modules that are concerned
477 about the network topology and low-level connection handling.
478
479 Callbacks I<must not> block and I<should not> send any messages.
480
481 The function returns an optional guard which can be used to unregister
482 the monitoring callback again.
483
484 =cut
485
486 our %MON_NODES;
487
488 sub mon_nodes($) {
489 my ($cb) = @_;
490
491 $MON_NODES{$cb+0} = $cb;
492
493 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
494 }
495
496 sub _inject_nodeevent($$;@) {
497 my ($node, $up, @reason) = @_;
498
499 for my $cb (values %MON_NODES) {
500 eval { $cb->($node->{id}, $up, @reason); 1 }
501 or $WARN->(1, $@);
502 }
503
504 $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
505 }
506
507 #############################################################################
508 # self node code
509
510 our %node_req = (
511 # internal services
512
513 # monitoring
514 mon0 => sub { # stop monitoring a port
515 my $portid = shift;
516 my $node = $SRCNODE;
517 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
518 },
519 mon1 => sub { # start monitoring a port
520 my $portid = shift;
521 my $node = $SRCNODE;
522 Scalar::Util::weaken $node; #TODO# ugly
523 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
524 $node->send (["", kil => $portid, @_])
525 if $node && $node->{transport}; #TODO# ugly, should use snd and remove-on-disocnnect
526 });
527 },
528 kil => sub {
529 my $cbs = delete $SRCNODE->{lmon}{+shift}
530 or return;
531
532 $_->(@_) for @$cbs;
533 },
534
535 # "public" services - not actually public
536
537 # relay message to another node / generic echo
538 snd => \&snd,
539 snd_multiple => sub {
540 snd @$_ for @_
541 },
542
543 # informational
544 info => sub {
545 snd @_, $NODE;
546 },
547 known_nodes => sub {
548 snd @_, known_nodes;
549 },
550 up_nodes => sub {
551 snd @_, up_nodes;
552 },
553
554 # random utilities
555 eval => sub {
556 my @res = eval shift;
557 snd @_, "$@", @res if @_;
558 },
559 time => sub {
560 snd @_, AE::time;
561 },
562 devnull => sub {
563 #
564 },
565 "" => sub {
566 # empty messages are keepalives or similar devnull-applications
567 },
568 );
569
570 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
571 $PORT{""} = sub {
572 my $tag = shift;
573 eval { &{ $node_req{$tag} ||= load_func $tag } };
574 $WARN->(2, "error processing node message: $@") if $@;
575 };
576
577 =back
578
579 =head1 SEE ALSO
580
581 L<AnyEvent::MP>.
582
583 =head1 AUTHOR
584
585 Marc Lehmann <schmorp@schmorp.de>
586 http://home.schmorp.de/
587
588 =cut
589
590 1
591