ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.26
Committed: Fri Aug 28 21:01:02 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.25: +6 -18 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
our $VERSION = '0.8';

# Symbols exported by default, grouped by purpose.
# NOTE(review): "resolve_node" is exported, but no sub of that name is visible
# in this file - confirm whether the export is stale.
our @EXPORT = (
   # kernel state and low-level helpers
   qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
   qw(add_node load_func snd_to_func snd_on eval_on),

   # basic messaging
   qw(NODE $NODE node_of snd kil),
   qw(port_is_local),

   # node setup and node information
   qw(resolve_node initialise_node),
   qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# port used when a transport descriptor does not specify one
our $DEFAULT_PORT = "4040";

our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a connection
58 could not be created, authorisation failed and so on.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
# Default log sink: writes "<timestamp> <level> message" to STDERR.
# Receives the numeric level and the message text; one trailing newline
# is removed from the message before printing.
our $WARN = sub {
   my ($level, $msg) = @_;

   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79
# Resolves a fully qualified function name to a code reference.
#
# If the function is not defined yet, the package components are tried
# from the longest prefix to the shortest ("A::B::f" tries A::B, then A),
# loading each candidate module until the function exists.
#
# Returns a code reference. When the name cannot be resolved the returned
# sub dies with "unable to resolve '<name>'" when it is called.
sub load_func($) {
   my ($func) = @_;

   # the function is looked up by name, which needs symbolic references
   no strict 'refs';

   my $pkg = $func;

   until (defined &$func) {
      $pkg =~ s/::[^:]+$//
         or return sub { die "unable to resolve '$func'" };

      # load the module by file name instead of via string-eval, so that
      # a crafted function name can never be executed as perl code.
      (my $file = "$pkg.pm") =~ s{::}{/}g;
      eval { require $file };
   }

   \&$func
}
94
# Returns a string of $_[0] random octets.
#
# /dev/urandom is used when it can be opened and delivers the full amount
# of data. Otherwise perl's rand is used, seeded once per process from the
# time, the pid and the output of some system commands.
sub nonce($) {
   my ($len) = @_;

   if (open my $fh, "<:raw", "/dev/urandom") {
      my $nonce;
      my $got = sysread $fh, $nonce, $len;

      # only trust the result of a complete read, a failed or short read
      # falls through to the fallback generator below.
      return $nonce
         if defined $got && $got == $len;
   }

   # fallback, seed the generator only once
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
112
# Encodes the given octet string using alphanumeric characters only.
#
# With Math::GMP 2.05 or newer available, the data is treated as one big
# number and written in base 62. Otherwise base64 is used, with all
# padding removed and "/" and "+" replaced by "s" and "p".
sub alnumbits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", remove all of them, not just one
      $data =~ tr/=//d;
      $data =~ tr{/+}{sp};
   }

   $data
}
130
# Generates an alphanumeric cookie from the process id, the current time
# and two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   return alnumbits ($raw);
}
134
135 =item $AnyEvent::MP::Kernel::PUBLIC
136
137 A boolean indicating whether this is a public node, which can create and
138 accept direct connections from other nodes.
139
140 =cut
141
our $CONFIG; # this node's configuration

our $RUNIQ = alnumbits (nonce (16)); # remote uniq value
our $UNIQ  = gen_uniq ();            # per-process/node unique cookie
our $NODE  = "anon/$RUNIQ";
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;
our $LISTENER; # our listeners, as arrayref

our $SRCNODE; # holds the sending node during _inject
159
# Returns the current value of $NODE.
sub NODE() {
   return $NODE;
}
163
# Returns the node part of a port id, i.e. everything before the first "#".
sub node_of($) {
   my ($port) = @_;

   my ($node) = split /#/, $port, 2;

   return $node;
}
169
# Defines the constant sub TRACE at compile time: 1 when the environment
# variable PERL_ANYEVENT_MP_TRACE is true, 0 otherwise.
BEGIN {
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
175
# Delivers a message to a local port: the first argument is the port id,
# the remaining arguments are passed on to the callback registered for it
# in %PORT. Messages for ports without a callback are dropped.
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . (JSON::XS->new->encode (\@_)) . "\n"
      if TRACE () && @_;#d#

   my $handler = $PORT{+shift}
      or return;

   # called without parentheses so the handler sees the remaining @_
   &$handler;
}
180
# Returns the node object registered for the given node id in %NODE,
# creating and registering an AnyEvent::MP::Node::Direct object first when
# there is none yet. This makes the node a valid target for messages, so it
# is basically the central routing component.
sub add_node {
   my ($node) = @_;

   # a direct method call, the indirect "new Class ARGS" form can be
   # misparsed by perl depending on which subs are in scope.
   $NODE{$node} ||= AnyEvent::MP::Node::Direct->new ($node)
}
188
# Sends a message to a port: the first argument is the port id
# ("nodeid#portid"), the remaining arguments form the message. The message
# is handed to the send callback of the node object, which is created via
# add_node when the node is not registered yet.
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . (JSON::XS->new->encode (\@_)) . "\n"
      if TRACE () && @_;#d#

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->{send}->(["$portid", @_]);
}
197
198 =item $is_local = port_is_local $port
199
200 Returns true iff the port is a local port.
201
202 =cut
203
# True when the node part of the given port id maps to the same node
# object as the local node entry $NODE{""}.
sub port_is_local($) {
   my ($port) = @_;

   my ($nodeid) = split /#/, $port, 2;

   return $NODE{$nodeid} == $NODE{""};
}
209
210 =item snd_to_func $node, $func, @args
211
212 Expects a node ID and a name of a function. Asynchronously tries to call
213 this function with the given arguments on that node.
214
215 This function can be used to implement C<spawn>-like interfaces.
216
217 =cut
218
# Sends the function name and its arguments to the node port (the port
# with the empty id) of the given node, registering the node via add_node
# when it is not known yet.
sub snd_to_func($$;@) {
   my ($nodeid, @call) = @_;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->send (["", @call]);
}
225
226 =item snd_on $node, @msg
227
228 Executes C<snd> with the given C<@msg> (which must include the destination
229 port) on the given node.
230
231 =cut
232
# Sends a "snd" request followed by @msg to the given node.
sub snd_on($@) {
   my ($node, @msg) = @_;

   snd ($node, "snd", @msg);
}
237
238 =item eval_on $node, $string
239
240 Evaluates the given string as a Perl expression on the given node.
241
242 =cut
243
# Sends an "eval" request followed by the remaining arguments to the
# given node.
sub eval_on($@) {
   my ($node, @args) = @_;

   snd ($node, "eval", @args);
}
248
# Kills a port: the first argument is the port id ("nodeid#portid"), the
# remaining arguments are passed on to the kill method of the node object.
# Croaks when the port id has no port part.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   Carp::croak ("$nodeid#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$nodeid} || add_node ($nodeid);

   $node->kill ("$portid", @_);
}
258
# Returns the nodename field of POSIX::uname.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
263
# Resolves a comma-separated list of transport descriptors to addresses.
#
# An empty argument stands for the default port. A descriptor that is
# empty or consists of digits only refers to the local nodename, with the
# digits as port. Returns a condvar which receives the list of formatted
# "host:port" strings, without duplicates, ordered by the position of the
# descriptor and by the order in which the resolver reported the addresses.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv ();
   my @res; # [priority, formatted address] pairs added by the callbacks

   # this callback runs once every begin below was matched by an end
   $cv->begin (sub {
      my ($done) = @_;
      my (%seen, @addrs);

      for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
         my $addr = $entry->[1];

         next if $seen{$addr}++;
         push @addrs, $addr;
      }

      $done->send (@addrs);
   });

   $nodeid = $DEFAULT_PORT unless length $nodeid;

   my $idx = 0;

   for my $spec (split /,/, $nodeid) {
      my $pri = ++$idx;

      if ($spec =~ /^\d*$/) {
         my $nodename = _nodename ();
         $spec = length $spec ? "$nodename:$spec" : $nodename;
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$spec: unparsable transport descriptor");

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $found (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($found->[3]);

            # the small increment keeps the addresses of one descriptor
            # in the order they were reported in
            $pri += 1e-5;

            push @res, [
               $pri,
               AnyEvent::Socket::format_hostport (
                  AnyEvent::Socket::format_address ($addr),
                  $service
               ),
            ];
         }

         $cv->end;
      });
   }

   $cv->end;

   $cv
}
308
# Configures and starts the local node.
#
# $profile is the name of the configuration profile to use and defaults
# to the local nodename. The node id is taken from the "nodeid" setting of
# the profile when it exists, otherwise the profile name is used; the
# value "anon/" keeps the generated anonymous node id.
#
# The function then creates a listener for every address the "binds"
# setting resolves to (default: the node id), loads the global service,
# passes the resolved "seeds" on to it and starts all configured
# "services": entries ending in "::" are loaded as modules, all others are
# called as functions. Croaks on unparsable bind addresses and dies when a
# service module fails to load.
sub initialise_node(;$%) {
   my ($profile) = @_;

   $profile = _nodename ()
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile ($profile);

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile;
   $NODE = $node
      unless $node eq "anon/";

   # make the self node known under its (possibly new) id
   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= [$NODE];

   $WARN->(8, "node $NODE starting up.");

   $LISTENER = [];
   %LISTENER = ();

   # start all lookups first, then wait for the results one by one
   for my $cv (map { _resolve ($_) } @$binds) {
      for my $bind ($cv->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($bind)
            or Carp::croak ("$bind: unparsable local bind address");

         $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server ($host, $port);
         push @$LISTENER, $bind;
      }
   }

   # the global service is mandatory currently
   require AnyEvent::MP::Global;

   # connect to all seednodes
   AnyEvent::MP::Global::set_seeds (
      map $_->recv, map { _resolve ($_) } @{ $seeds || [] }
   );

   for my $entry (@{ $CONFIG->{services} || [] }) {
      # work on a copy: the loop variable aliases the configuration entry,
      # stripping the "::" in place would turn a module entry into a
      # function entry inside $CONFIG.
      my $service = $entry;

      if ($service =~ s/::$//) {
         eval "require $service";
         die $@ if $@;
      } else {
         (load_func ($service))->();
      }
   }
}
359
360 #############################################################################
361 # node monitoring and info
362
363 =item node_is_known $nodeid
364
365 Returns true iff the given node is currently known to the system.
366
367 =cut
368
# True when %NODE has an entry for the given node id.
sub node_is_known($) {
   my ($nodeid) = @_;

   return exists $NODE{$nodeid};
}
372
373 =item node_is_up $nodeid
374
375 Returns true if the given node is "up", that is, the kernel thinks it has
376 a working connection to it.
377
378 If the node is known but not currently connected, returns C<0>. If the
379 node is not known, returns C<undef>.
380
381 =cut
382
# Returns 1 when the node object for the given id has a transport, 0 when
# it has none, and nothing (undef in scalar context) when there is no node
# object for that id.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
387
388 =item known_nodes
389
390 Returns the node IDs of all nodes currently known to this node, including
391 itself and nodes not currently connected.
392
393 =cut
394
# Returns the ids of all node objects in %NODE, each id once.
#
# The same node object can be stored under more than one key (the local
# node is registered both under "" and under its node id), so the ids are
# de-duplicated.
sub known_nodes {
   my %seen;

   grep !$seen{$_}++, map $_->{id}, values %NODE
}
398
399 =item up_nodes
400
401 Return the node IDs of all nodes that are currently connected (excluding
402 the node itself).
403
404 =cut
405
# Returns the ids of all node objects in %NODE that have a transport.
sub up_nodes {
   my @ids;

   for my $node (values %NODE) {
      push @ids, $node->{id}
         if $node->{transport};
   }

   @ids
}
409
410 =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)
411
412 Registers a callback that is called each time a node goes up (connection
413 is established) or down (connection is lost).
414
415 Node up messages can only be followed by node down messages for the same
416 node, and vice versa.
417
418 The function returns an optional guard which can be used to de-register
419 the monitoring callback again.
420
421 =cut
422
# registered node monitoring callbacks, keyed by the address of the callback
our %MON_NODES;

# Registers $cb to be called with ($nodeid, $is_up, @reason) on node
# up/down events.
#
# Unless called in void context, returns a guard that removes the callback
# again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray alone is false in scalar context, which would leave
   # "$guard = mon_nodes ..." without a guard, so test for definedness.
   defined (wantarray)
      && AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
432
# Calls every callback in %MON_NODES with the node id, the up flag and the
# reason. An exception thrown by a callback is logged at level 1 and does
# not stop the remaining callbacks. Afterwards the state change itself is
# logged at level 7.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   foreach my $cb (values %MON_NODES) {
      my $ok = eval { $cb->($node->{id}, $up, @reason); 1 };

      $WARN->(1, $@) unless $ok;
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{id} is $state (@reason)");
}
443
444 #############################################################################
445 # self node code
446
# Handlers for messages sent to the node port (the port with the empty
# id), keyed by the first element of the message. The remaining elements
# are passed as arguments, $SRCNODE holds the sending node.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;

      my $cb = delete $SRCNODE->{rmon}{$portid};
      $NODE{""}->unmonitor ($portid, $cb);
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $node = $SRCNODE; # keep the sender for use inside the closure

      # tells the monitoring node that the port is gone
      $node->{rmon}{$portid} = sub {
         $node->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $node->{rmon}{$portid});
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multi => sub {
      snd (@$_) for @_
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # NOTE(review): evaluates a string taken from the message as perl
      # code - confirm that only trusted nodes can send node messages.
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
504
# the local node is registered both under the empty id and its node id
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new ($NODE);

# Handler for the node port: the first message element selects the request
# handler in %node_req, unknown names are resolved with load_func and
# cached there. Errors are logged at level 2 instead of being propagated.
$PORT{""} = sub {
   my $tag = shift;

   # the handler is called without parentheses so it sees the remaining @_.
   # the trailing 1 makes the eval result show whether the handler died,
   # which stays reliable even when $@ is reset before it can be checked.
   eval {
      &{ $node_req{$tag} ||= load_func ($tag) };
      1
   } or $WARN->(2, "error processing node message: $@");
};
511
512 =back
513
514 =head1 SEE ALSO
515
516 L<AnyEvent::MP>.
517
518 =head1 AUTHOR
519
520 Marc Lehmann <schmorp@schmorp.de>
521 http://home.schmorp.de/
522
523 =cut
524
525 1
526