ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.16
Committed: Sat Aug 15 15:08:07 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.15: +20 -9 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '0.7';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 connect_node add_node load_func snd_to_func
42
43 NODE $NODE node_of snd kil
44 resolve_node initialise_node
45 known_nodes up_nodes mon_nodes node_is_known node_is_up
46 );
47
48 our $DEFAULT_PORT = "4040";
49
50 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
51 our $NETWORK_LATENCY = 3; # activity timeout
52 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
53
54 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
55
56 This value is called with an error or warning message, when e.g. a connection
57 could not be created, authorisation failed and so on.
58
59 C<$level> sould be C<0> for messages ot be logged always, C<1> for
60 unexpected messages and errors, C<2> for warnings, C<7> for messages about
61 node connectivity and services, C<8> for debugging messages and C<9> for
62 tracing messages.
63
64 The default simply logs the message to STDERR.
65
66 =cut
67
68 our $WARN = sub {
69 my ($level, $msg) = @_;
70
71 $msg =~ s/\n$//;
72
73 printf STDERR "%s <%d> %s\n",
74 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
75 $level,
76 $msg;
77 };
78
79 sub load_func($) {
80 my $func = $_[0];
81
82 unless (defined &$func) {
83 my $pkg = $func;
84 do {
85 $pkg =~ s/::[^:]+$//
86 or return sub { die "unable to resolve '$func'" };
87 eval "require $pkg";
88 } until defined &$func;
89 }
90
91 \&$func
92 }
93
94 sub nonce($) {
95 my $nonce;
96
97 if (open my $fh, "</dev/urandom") {
98 sysread $fh, $nonce, $_[0];
99 } else {
100 # shit...
101 our $nonce_init;
102 unless ($nonce_init++) {
103 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
104 }
105
106 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
107 }
108
109 $nonce
110 }
111
112 sub asciibits($) {
113 my $data = $_[0];
114
115 if (eval "use Math::GMP 2.05; 1") {
116 $data = Math::GMP::get_str_gmp (
117 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
118 62
119 );
120 } else {
121 $data = MIME::Base64::encode_base64 $data, "";
122 $data =~ s/=//;
123 $data =~ s/\//s/g;
124 $data =~ s/\+/p/g;
125 }
126
127 $data
128 }
129
130 sub gen_uniq {
131 asciibits pack "wNa*", $$, time, nonce 2
132 }
133
134 =item $AnyEvent::MP::Kernel::PUBLIC
135
136 A boolean indicating whether this is a full/public node, which can create
137 and accept direct connections form othe rnodes.
138
139 =item $AnyEvent::MP::Kernel::SLAVE
140
141 A boolean indicating whether this node is a slave node, i.e. does most of it's
142 message sending/receiving through some master node.
143
144 =item $AnyEvent::MP::Kernel::MASTER
145
146 Defined only in slave mode, in which cas eit contains the noderef of the
147 master node.
148
149 =cut
150
151 our $PUBLIC = 0;
152 our $SLAVE = 0;
153 our $MASTER; # master noderef when $SLAVE
154
155 our $NODE = asciibits nonce 16;
156 our $RUNIQ = $NODE; # remote uniq value
157 our $UNIQ = gen_uniq; # per-process/node unique cookie
158 our $ID = "a";
159
160 our %NODE; # node id to transport mapping, or "undef", for local node
161 our (%PORT, %PORT_DATA); # local ports
162
163 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
164 our %LMON; # monitored _local_ ports
165
166 our %LISTENER;
167
168 our $SRCNODE; # holds the sending node during _inject
169
170 sub NODE() {
171 $NODE
172 }
173
174 sub node_of($) {
175 my ($noderef, undef) = split /#/, $_[0], 2;
176
177 $noderef
178 }
179
180 sub TRACE() { 0 }
181
182 sub _inject {
183 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
184 &{ $PORT{+shift} or return };
185 }
186
187 sub add_node {
188 my ($noderef) = @_;
189
190 return $NODE{$noderef}
191 if exists $NODE{$noderef};
192
193 for (split /,/, $noderef) {
194 return $NODE{$noderef} = $NODE{$_}
195 if exists $NODE{$_};
196 }
197
198 # new node, check validity
199 my $node;
200
201 if ($noderef =~ /^slave\/.+$/) {
202 $node = new AnyEvent::MP::Node::Indirect $noderef;
203
204 } else {
205 for (split /,/, $noderef) {
206 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
207 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
208
209 $port > 0
210 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
211
212 AnyEvent::Socket::parse_address $host
213 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
214 }
215
216 $node = new AnyEvent::MP::Node::Direct $noderef;
217 }
218
219 $NODE{$_} = $node
220 for $noderef, split /,/, $noderef;
221
222 $node
223 }
224
225 sub connect_node {
226 &add_node->connect;
227 }
228
229 sub snd(@) {
230 my ($noderef, $portid) = split /#/, shift, 2;
231
232 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
233
234 ($NODE{$noderef} || add_node $noderef)
235 ->{send} (["$portid", @_]);
236 }
237
238 =item snd_to_func $noderef, $func, @args
239
240 Expects a noderef and a name of a function. Asynchronously tries to call
241 this function with the given arguments on that node.
242
243 This fucntion can be used to implement C<spawn>-like interfaces.
244
245 =cut
246
247 sub snd_to_func {
248 my $noderef = shift;
249
250 ($NODE{$noderef} || add_node $noderef)
251 ->send (["", @_]);
252 }
253
254 sub kil(@) {
255 my ($noderef, $portid) = split /#/, shift, 2;
256
257 length $portid
258 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
259
260 ($NODE{$noderef} || add_node $noderef)
261 ->kill ("$portid", @_);
262 }
263
264 sub _nodename {
265 require POSIX;
266 (POSIX::uname ())[1]
267 }
268
269 sub resolve_node($) {
270 my ($noderef) = @_;
271
272 my $cv = AE::cv;
273 my @res;
274
275 $cv->begin (sub {
276 my %seen;
277 my @refs;
278 for (sort { $a->[0] <=> $b->[0] } @res) {
279 push @refs, $_->[1] unless $seen{$_->[1]}++
280 }
281 shift->send (join ",", @refs);
282 });
283
284 $noderef = $DEFAULT_PORT unless length $noderef;
285
286 my $idx;
287 for my $t (split /,/, $noderef) {
288 my $pri = ++$idx;
289
290 $t = length $t ? _nodename . ":$t" : _nodename
291 if $t =~ /^\d*$/;
292
293 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
294 or Carp::croak "$t: unparsable transport descriptor";
295
296 $cv->begin;
297 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
298 for (@_) {
299 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
300 push @res, [
301 $pri += 1e-5,
302 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
303 ];
304 }
305 $cv->end;
306 };
307 }
308
309 $cv->end;
310
311 $cv
312 }
313
314 sub initialise_node(@) {
315 my ($noderef, @others) = @_;
316
317 my $profile = AnyEvent::MP::Config::find_profile
318 +(defined $noderef ? $noderef : _nodename);
319
320 $noderef = $profile->{noderef}
321 if exists $profile->{noderef};
322
323 push @others, @{ $profile->{seeds} };
324
325 if ($noderef =~ /^slave\/(.*)$/) {
326 $SLAVE = AE::cv;
327 my $name = $1;
328 $name = $NODE unless length $name;
329 $noderef = AE::cv;
330 $noderef->send ("slave/$name");
331
332 @others
333 or Carp::croak "seed nodes must be specified for slave nodes";
334
335 } else {
336 $PUBLIC = 1;
337 $noderef = resolve_node $noderef;
338 }
339
340 @others = map $_->recv, map +(resolve_node $_), @others;
341
342 $NODE = $noderef->recv;
343
344 for my $t (split /,/, $NODE) {
345 $NODE{$t} = $NODE{""};
346
347 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
348
349 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
350 sub {
351 my ($tp) = @_;
352
353 # TODO: urgs
354 my $node = add_node $tp->{remote_node};
355 $node->{trial}{accept} = $tp;
356 },
357 ;
358 }
359
360 for (@others) {
361 my $node = add_node $_;
362 $node->{autoconnect} = 1;
363 $node->connect;
364 }
365
366 if ($SLAVE) {
367 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
368 my $master = $SLAVE->recv;
369 $master
370 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
371
372 $MASTER = $master->{noderef};
373 $master->{autoconnect} = 1;
374
375 (my $via = $MASTER) =~ s/,/!/g;
376
377 $NODE .= "\@$via";
378 $NODE{$NODE} = $NODE{""};
379
380 $_->send (["", iam => $NODE])
381 for values %NODE;
382
383 $SLAVE = 1;
384 }
385
386 for (@{ $profile->{services} }) {
387 if (s/::$//) {
388 eval "require $_";
389 die $@ if $@;
390 } else {
391 (load_func $_)->();
392 }
393 }
394 }
395
396 #############################################################################
397 # node monitoring and info
398
399 sub _uniq_nodes {
400 my %node;
401
402 @node{values %NODE} = values %NODE;
403
404 values %node;
405 }
406
407 =item node_is_known $noderef
408
409 Returns true iff the given node is currently known to the system.
410
411 =cut
412
413 sub node_is_known($) {
414 exists $NODE{$_[0]}
415 }
416
417 =item node_is_up $noderef
418
419 Returns true if the given node is "up", that is, the kernel thinks it has
420 a working connection to it.
421
422 If the node is known but not currently connected, returns C<0>. If the
423 node is not known, returns C<undef>.
424
425 =cut
426
427 sub node_is_up($) {
428 ($NODE{$_[0]} or return)->{transport}
429 ? 1 : 0
430 }
431
432 =item known_nodes
433
434 Returns the noderefs of all nodes connected to this node.
435
436 =cut
437
438 sub known_nodes {
439 map $_->{noderef}, _uniq_nodes
440 }
441
442 =item up_nodes
443
444 Return the noderefs of all nodes that are currently connected (excluding
445 the node itself).
446
447 =cut
448
449 sub up_nodes {
450 map $_->{noderef}, grep $_->{transport}, _uniq_nodes
451 }
452
453 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
454
455 Registers a callback that is called each time a node goes up (connection
456 is established) or down (connection is lost).
457
458 Node up messages can only be followed by node down messages for the same
459 node, and vice versa.
460
461 The function returns an optional guard which can be used to de-register
462 the monitoring callback again.
463
464 =cut
465
466 our %MON_NODES;
467
468 sub mon_nodes($) {
469 my ($cb) = @_;
470
471 $MON_NODES{$cb+0} = $cb;
472
473 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
474 }
475
476 sub _inject_nodeevent($$;@) {
477 my ($node, $up, @reason) = @_;
478
479 for my $cb (values %MON_NODES) {
480 eval { $cb->($node->{noderef}, $up, @reason); 1 }
481 or $WARN->(1, $@);
482 }
483
484 $WARN->(7, "$node->{noderef} is " . ($up ? "up" : "down") . " (@reason)");
485 }
486
487 #############################################################################
488 # self node code
489
490 our %node_req = (
491 # internal services
492
493 # monitoring
494 mon0 => sub { # disable monitoring
495 my $portid = shift;
496 my $node = $SRCNODE;
497 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
498 },
499 mon1 => sub { # enable monitoring
500 my $portid = shift;
501 my $node = $SRCNODE;
502 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
503 $node->send (["", kil => $portid, @_]);
504 });
505 },
506 kil => sub {
507 my $cbs = delete $SRCNODE->{lmon}{+shift}
508 or return;
509
510 $_->(@_) for @$cbs;
511 },
512 # node changed its name (for slave nodes)
513 iam => sub {
514 # get rid of bogus slave/xxx name, hopefully
515 delete $NODE{$SRCNODE->{noderef}};
516
517 # change noderef
518 $SRCNODE->{noderef} = $_[0];
519
520 # anchor
521 $NODE{$_[0]} = $SRCNODE;
522 },
523
524 # public services
525
526 # relay message to another node / generic echo
527 snd => \&snd,
528 snd_multi => sub {
529 snd @$_ for @_
530 },
531
532 # informational
533 info => sub {
534 snd @_, $NODE;
535 },
536 known_nodes => sub {
537 snd @_, known_nodes;
538 },
539 up_nodes => sub {
540 snd @_, up_nodes;
541 },
542
543 # random garbage
544 eval => sub {
545 my @res = eval shift;
546 snd @_, "$@", @res if @_;
547 },
548 time => sub {
549 snd @_, AE::time;
550 },
551 devnull => sub {
552 #
553 },
554 "" => sub {
555 # empty messages are sent by monitoring
556 },
557 );
558
559 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
560 $PORT{""} = sub {
561 my $tag = shift;
562 eval { &{ $node_req{$tag} ||= load_func $tag } };
563 $WARN->(2, "error processing node message: $@") if $@;
564 };
565
566 =back
567
568 =head1 SEE ALSO
569
570 L<AnyEvent::MP>.
571
572 =head1 AUTHOR
573
574 Marc Lehmann <schmorp@schmorp.de>
575 http://home.schmorp.de/
576
577 =cut
578
579 1
580