ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Kernel.pm
Revision: 1.17
Committed: Sun Aug 16 02:55:17 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.16: +20 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
# module version
our $VERSION = '0.7';

# everything listed here is exported by default
our @EXPORT = qw(
   %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
   connect_node add_node load_func snd_to_func

   NODE $NODE node_of snd kil
   port_is_local
   resolve_node initialise_node
   known_nodes up_nodes mon_nodes node_is_known node_is_up
);

# port used by resolve_node when a transport descriptor names none
our $DEFAULT_PORT = "4040";

# timing tunables, in seconds
our $CONNECT_INTERVAL =  2; # new connect every 2s, at least
our $NETWORK_LATENCY  =  3; # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a connection
58 could not be created, authorisation failed and so on.
59
60 C<$level> should be C<0> for messages to be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
# Default log sink: writes "<timestamp> <level> message" to STDERR.
# $level is formatted as an integer, $msg is the text to log.
our $WARN = sub {
   my ($level, $msg) = @_;

   # drop one trailing newline, the format below always appends one
   $msg =~ s/\n$//;

   my $stamp = POSIX::strftime ("%Y-%m-%d %H:%M:%S", localtime time);

   printf STDERR "%s <%d> %s\n", $stamp, $level, $msg;
};
79
# Resolves a fully qualified function name ("Some::Package::func") to a
# code reference.
#
# If the function is not defined yet, successively shorter package
# prefixes of the name are require'd until it becomes defined. If the
# name cannot be resolved, a code reference that dies with
# "unable to resolve '<name>'" when called is returned instead, so the
# error surfaces at call time.
sub load_func($) {
   my $func = $_[0];

   # the function is looked up by name, which needs symbolic references
   no strict 'refs';

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last "::component", give up when nothing is left to strip
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };

         # the name ends up inside a string-eval, so only ever let
         # syntactically valid package names through
         $pkg =~ /\A[A-Za-z_][A-Za-z0-9_]*(?:::[A-Za-z0-9_]+)*\z/
            or return sub { die "unable to resolve '$func'" };

         eval "require $pkg";
      } until defined &$func;
   }

   \&$func
}
94
# Returns a string of $_[0] random octets.
#
# The octets are read from /dev/urandom when that is possible. When the
# device cannot be opened, or does not deliver the requested number of
# octets, perl's rand is used instead, seeded once per process.
sub nonce($) {
   my $len = $_[0];
   my $nonce;

   if (open my $fh, "<", "/dev/urandom") {
      binmode $fh;

      my $got = sysread $fh, $nonce, $len;

      # only trust the result if the read was complete
      return $nonce
         if defined $got && $got == $len;
   }

   # shit...
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
112
# Converts an octet string into a string of ASCII letters and digits.
#
# With Math::GMP 2.05 or newer available, the octets are treated as one
# big number and written in base 62. Otherwise the data is base64 encoded,
# the padding is removed and the two non-alphanumeric base64 characters
# are replaced by letters ("/" becomes "s", "+" becomes "p").
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
         (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
         62
      );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=", remove all of them
      $data =~ s/=+$//;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
130
# Builds an identifier from the process id, the current time and two
# octets from nonce, encoded with asciibits.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
134
135 =item $AnyEvent::MP::Kernel::PUBLIC
136
137 A boolean indicating whether this is a full/public node, which can create
138 and accept direct connections from other nodes.
139
140 =item $AnyEvent::MP::Kernel::SLAVE
141
142 A boolean indicating whether this node is a slave node, i.e. does most of its
143 message sending/receiving through some master node.
144
145 =item $AnyEvent::MP::Kernel::MASTER
146
147 Defined only in slave mode, in which case it contains the noderef of the
148 master node.
149
150 =cut
151
# node mode flags, see the documentation above
our $PUBLIC = 0;
our $SLAVE  = 0;
our $MASTER; # master noderef when $SLAVE

# identity of this node, random until initialise_node assigns the real one
our $NODE  = asciibits (nonce (16));
our $RUNIQ = $NODE;       # remote uniq value
our $UNIQ  = gen_uniq (); # per-process/node unique cookie
our $ID    = "a";

our %NODE;      # node id to transport mapping, or "undef", for local node
our %PORT;      # local ports
our %PORT_DATA; # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON; # monitored _local_ ports

our %LISTENER;

our $SRCNODE; # holds the sending node during _inject
170
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() {
   return $NODE;
}
174
# Returns the noderef part of a port id, that is everything in front of
# the first "#". Returns undef for an empty port id.
sub node_of($) {
   my $noderef = (split /#/, $_[0], 2)[0];

   return $noderef;
}
180
BEGIN {
   # TRACE is installed at compile time as a constant sub whose value
   # depends on the PERL_ANYEVENT_MP_TRACE environment variable
   if ($ENV{PERL_ANYEVENT_MP_TRACE}) {
      *TRACE = sub () { 1 };
   } else {
      *TRACE = sub () { 0 };
   }
}
186
# Delivers a message to a local port: the first argument is the port id,
# the remaining arguments are handed to the callback registered for that
# port in %PORT. Messages for ports without a callback are dropped.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $receiver = $PORT{+shift}
      or return;

   # called without an argument list so the callback sees the rest of @_
   &$receiver;
}
191
# Returns the node object for the given noderef, creating and registering
# it in %NODE when it is not known yet.
#
# A noderef is either "slave/<something>" or a comma separated list of
# host:port endpoints. If one of the endpoints is already registered, the
# existing node object is reused and also registered under the full
# noderef. Croaks when an endpoint of a new noderef is not in resolved
# form.
sub add_node {
   my ($noderef) = @_;

   exists $NODE{$noderef}
      and return $NODE{$noderef};

   my @endpoints = split /,/, $noderef;

   for my $ep (@endpoints) {
      exists $NODE{$ep}
         and return $NODE{$noderef} = $NODE{$ep};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for my $ep (@endpoints) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($ep)
            or Carp::croak ("$noderef: not a resolved node reference ('$ep' not parsable)");

         Carp::croak ("$noderef: not a resolved node reference ('$ep' contains non-numeric port)")
            unless $port > 0;

         Carp::croak ("$noderef: not a resolved node reference ('$ep' contains unresolved address)")
            unless AnyEvent::Socket::parse_address ($host);
      }

      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full noderef and under each endpoint
   $NODE{$_} = $node
      for $noderef, @endpoints;

   $node
}
229
# Looks up (or creates) the node for the given arguments via add_node and
# calls its connect method.
sub connect_node {
   my $node = add_node (@_);

   $node->connect;
}
233
# Sends a message to a port: the first argument is the port id
# ("noderef#portid"), the remaining arguments form the message. The
# message is handed to the code reference stored under the "send" key of
# the destination node, which is created via add_node if it is unknown.
sub snd(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send} (["$portid", @_]);
}
242
243 =item $is_local = port_is_local $port
244
245 Returns true iff the port is a local port.
246
247 =cut
248
sub port_is_local($) {
   my $noderef = (split /#/, $_[0], 2)[0];

   # local ports belong to the same node object as the "" entry
   return $NODE{$noderef} == $NODE{""};
}
254
255 =item snd_to_func $noderef, $func, @args
256
257 Expects a noderef and a name of a function. Asynchronously tries to call
258 this function with the given arguments on that node.
259
260 This function can be used to implement C<spawn>-like interfaces.
261
262 =cut
263
sub snd_to_func {
   my ($noderef, @call) = @_;

   my $node = $NODE{$noderef} || add_node ($noderef);

   # messages addressed to the empty port id go to the node itself
   $node->send (["", @call]);
}
270
# Kills the given port ("noderef#portid"), any further arguments are
# passed on to the node's kill method. Croaks when the port id part is
# empty.
sub kil(@) {
   my ($noderef, $portid) = split /#/, shift, 2;

   Carp::croak ("$noderef#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
280
# Returns the node name of this host, the second field of POSIX::uname.
sub _nodename {
   require POSIX;

   my @uname = POSIX::uname ();

   return $uname[1];
}
285
# Resolves a noderef, a comma separated list of transport descriptors,
# into resolved form. Returns a condvar that receives the resolved
# noderef, a comma separated list of address:port endpoints without
# duplicates, ordered like the descriptors they came from.
#
# An empty noderef stands for $DEFAULT_PORT. A descriptor that consists
# only of digits is taken as a port on the local nodename, an empty
# descriptor as the local nodename itself. Croaks on descriptors that
# cannot be parsed.
sub resolve_node($) {
   my ($noderef) = @_;

   my $cv = AE::cv;
   my @found; # [priority, endpoint] pairs, filled by the resolver callbacks

   # runs once every lookup (and the end call at the bottom) is done
   $cv->begin (sub {
      my ($done) = @_;

      my (%dup, @ordered);
      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         my $endpoint = $entry->[1];
         next if $dup{$endpoint}++;
         push @ordered, $endpoint;
      }

      $done->send (join ",", @ordered);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $idx;
   for my $spec (split /,/, $noderef) {
      # integer part orders descriptors, the fraction orders their addresses
      my $pri = ++$idx;

      if ($spec =~ /^\d*$/) {
         $spec = length $spec ? _nodename () . ":$spec" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$spec: unparsable transport descriptor");

      $cv->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $result (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            $pri += 1e-5;
            push @found, [
               $pri,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service),
            ];
         }

         $cv->end;
      });
   }

   $cv->end;

   $cv
}
330
# Initialises this node: the first argument is the noderef (or profile
# name) of this node, the remaining arguments are seed nodes.
#
# The profile found for the noderef (or for the local nodename when no
# noderef is given) may override the noderef and contributes further seeds
# and the services to start. A noderef of the form "slave/<name>" puts the
# node into slave mode, which requires at least one seed node; any other
# noderef is resolved and makes this a public node with one listener per
# endpoint. Croaks when slave mode is requested without seeds or when no
# seed node could be reached.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   my $profile = AnyEvent::MP::Config::find_profile (
      defined $noderef ? $noderef : _nodename ()
   );

   $noderef = $profile->{noderef}
      if exists $profile->{noderef};

   push @others, @{ $profile->{seeds} || [] };

   if ($noderef =~ /^slave\/(.*)$/) {
      # $SLAVE is a condvar until a master node is known, see below
      $SLAVE = AE::cv;
      my $name = $1;
      $name = $NODE unless length $name;
      $noderef = AE::cv;
      $noderef->send ("slave/$name");

      @others
         or Carp::croak ("seed nodes must be specified for slave nodes");

   } else {
      $PUBLIC = 1;
      $noderef = resolve_node ($noderef);
   }

   # start all lookups first, then wait for each of them
   @others = map { $_->recv } map { resolve_node ($_) } @others;

   $NODE = $noderef->recv;

   $NODE{$NODE} = $NODE{""};

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port,
         sub {
            my ($tp) = @_;

            # TODO: urgs
            my $node = add_node ($tp->{remote_node});
            $node->{trial}{accept} = $tp;
         },
      );
   }

   for my $seed (@others) {
      my $node = add_node ($seed);
      $node->{autoconnect} = 1;
      $node->connect;
   }

   if ($SLAVE) {
      # give up waiting for a master after $MONITOR_TIMEOUT seconds
      my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });
      my $master = $SLAVE->recv;
      $master
         or Carp::croak ("AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n");

      $MASTER = $master->{noderef};
      $master->{autoconnect} = 1;

      (my $via = $MASTER) =~ s/,/!/g;

      $NODE .= "\@$via";
      $NODE{$NODE} = $NODE{""};

      # tell every known node about the new name
      $_->send (["", iam => $NODE])
         for values %NODE;

      $SLAVE = 1;
   }

   for (@{ $profile->{services} || [] }) {
      # work on a copy: $_ aliases the profile's own array element, and
      # stripping the "::" in place would silently alter the profile
      my $service = $_;

      if ($service =~ s/::$//) {
         # a trailing "::" marks a module to load
         eval "require $service";
         die $@ if $@;
      } else {
         # anything else is a function to call
         load_func ($service)->();
      }
   }
}
414
415 #############################################################################
416 # node monitoring and info
417
# Returns every node object stored in %NODE exactly once, no matter under
# how many names it is registered.
sub _uniq_nodes {
   # keying by the stringified reference collapses duplicates
   my %by_ref = map { ($_ => $_) } values %NODE;

   values %by_ref;
}
425
426 =item node_is_known $noderef
427
428 Returns true iff the given node is currently known to the system.
429
430 =cut
431
sub node_is_known($) {
   my ($noderef) = @_;

   return exists $NODE{$noderef};
}
435
436 =item node_is_up $noderef
437
438 Returns true if the given node is "up", that is, the kernel thinks it has
439 a working connection to it.
440
441 If the node is known but not currently connected, returns C<0>. If the
442 node is not known, returns C<undef>.
443
444 =cut
445
sub node_is_up($) {
   # unknown node: return nothing
   my $node = $NODE{$_[0]}
      or return;

   return $node->{transport} ? 1 : 0;
}
450
451 =item known_nodes
452
453 Returns the noderefs of all nodes known to this node, whether or not they are currently connected.
454
455 =cut
456
sub known_nodes {
   map { $_->{noderef} } _uniq_nodes ()
}
460
461 =item up_nodes
462
463 Return the noderefs of all nodes that are currently connected (excluding
464 the node itself).
465
466 =cut
467
sub up_nodes {
   # a node counts as up when it has a transport
   my @connected = grep { $_->{transport} } _uniq_nodes ();

   map { $_->{noderef} } @connected
}
471
472 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
473
474 Registers a callback that is called each time a node goes up (connection
475 is established) or down (connection is lost).
476
477 Node up messages can only be followed by node down messages for the same
478 node, and vice versa.
479
480 The function returns an optional guard which can be used to de-register
481 the monitoring callback again.
482
483 =cut
484
# registered node monitoring callbacks, keyed by their address
our %MON_NODES;

sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # hand out a guard whenever the caller wants a value at all: wantarray
   # is false but defined in scalar context, so testing its truth alone
   # would never return a guard to "my $guard = mon_nodes ..."
   defined wantarray
      and AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
494
# Reports a node going up (true $up) or down (false $up) to all callbacks
# registered with mon_nodes, then logs the event at level 7. A callback
# that dies is logged at level 1 and does not stop the others.
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      next if eval { $cb->($node->{noderef}, $up, @reason); 1 };

      $WARN->(1, $@);
   }

   my $state = $up ? "up" : "down";

   $WARN->(7, "$node->{noderef} is $state (@reason)");
}
505
506 #############################################################################
507 # self node code
508
# Requests understood by the node port (the port with the empty id),
# keyed by the message tag. Each handler receives the rest of the
# message; the sending node is available in $SRCNODE.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;
      my $peer = $SRCNODE;

      $NODE{""}->unmonitor ($portid, delete $peer->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $peer = $SRCNODE;

      # remember the callback so mon0 can remove it again
      $peer->{rmon}{$portid} = sub {
         $peer->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $peer->{rmon}{$portid});
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      for my $cb (@$cbs) {
         $cb->(@_);
      }
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my ($new_noderef) = @_;

      # get rid of bogus slave/xxx name, hopefully
      delete $NODE{$SRCNODE->{noderef}};

      # change noderef
      $SRCNODE->{noderef} = $new_noderef;

      # anchor
      $NODE{$new_noderef} = $SRCNODE;
   },

   # public services

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multi => sub {
      snd (@$_) for @_;
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # evaluates the first argument as perl code, the rest (if any) is
      # the destination the error and result are sent to
      my @res = eval shift;
      snd (@_, "$@", @res) if @_;
   },
   time => sub {
      snd (@_, AE::time ());
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are sent by monitoring
   },
);
577
# the local node, registered under its noderef and under the empty name
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# the node port: the first message element selects the handler, which is
# taken from %node_req or, failing that, resolved with load_func and
# remembered in %node_req
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func ($tag);

      # called without an argument list so the handler sees the rest of @_
      &$handler;
   };

   $WARN->(2, "error processing node message: $@") if $@;
};
584
585 =back
586
587 =head1 SEE ALSO
588
589 L<AnyEvent::MP>.
590
591 =head1 AUTHOR
592
593 Marc Lehmann <schmorp@schmorp.de>
594 http://home.schmorp.de/
595
596 =cut
597
598 1
599