ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.19
Committed: Wed Aug 19 05:57:14 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_8
Changes since 1.18: +1 -1 lines
Log Message:
0.8

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use POSIX ();
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AE ();
32
33 use AnyEvent::MP::Node;
34 use AnyEvent::MP::Transport;
35
36 use base "Exporter";
37
38 our $VERSION = '0.8';
39 our @EXPORT = qw(
40 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
41 connect_node add_node load_func snd_to_func snd_on eval_on
42
43 NODE $NODE node_of snd kil
44 port_is_local
45 resolve_node initialise_node
46 known_nodes up_nodes mon_nodes node_is_known node_is_up
47 );
48
49 our $DEFAULT_PORT = "4040";
50
51 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
52 our $NETWORK_LATENCY = 3; # activity timeout
53 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
54
55 =item $AnyEvent::MP::Kernel::WARN->($level, $msg)
56
57 This value is called with an error or warning message, when e.g. a connection
58 could not be created, authorisation failed and so on.
59
60 C<$level> sould be C<0> for messages ot be logged always, C<1> for
61 unexpected messages and errors, C<2> for warnings, C<7> for messages about
62 node connectivity and services, C<8> for debugging messages and C<9> for
63 tracing messages.
64
65 The default simply logs the message to STDERR.
66
67 =cut
68
69 our $WARN = sub {
70 my ($level, $msg) = @_;
71
72 $msg =~ s/\n$//;
73
74 printf STDERR "%s <%d> %s\n",
75 (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
76 $level,
77 $msg;
78 };
79
80 sub load_func($) {
81 my $func = $_[0];
82
83 unless (defined &$func) {
84 my $pkg = $func;
85 do {
86 $pkg =~ s/::[^:]+$//
87 or return sub { die "unable to resolve '$func'" };
88 eval "require $pkg";
89 } until defined &$func;
90 }
91
92 \&$func
93 }
94
95 sub nonce($) {
96 my $nonce;
97
98 if (open my $fh, "</dev/urandom") {
99 sysread $fh, $nonce, $_[0];
100 } else {
101 # shit...
102 our $nonce_init;
103 unless ($nonce_init++) {
104 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
105 }
106
107 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
108 }
109
110 $nonce
111 }
112
113 sub asciibits($) {
114 my $data = $_[0];
115
116 if (eval "use Math::GMP 2.05; 1") {
117 $data = Math::GMP::get_str_gmp (
118 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
119 62
120 );
121 } else {
122 $data = MIME::Base64::encode_base64 $data, "";
123 $data =~ s/=//;
124 $data =~ s/\//s/g;
125 $data =~ s/\+/p/g;
126 }
127
128 $data
129 }
130
131 sub gen_uniq {
132 asciibits pack "wNa*", $$, time, nonce 2
133 }
134
135 =item $AnyEvent::MP::Kernel::PUBLIC
136
137 A boolean indicating whether this is a full/public node, which can create
138 and accept direct connections form othe rnodes.
139
140 =item $AnyEvent::MP::Kernel::SLAVE
141
142 A boolean indicating whether this node is a slave node, i.e. does most of it's
143 message sending/receiving through some master node.
144
145 =item $AnyEvent::MP::Kernel::MASTER
146
147 Defined only in slave mode, in which cas eit contains the noderef of the
148 master node.
149
150 =cut
151
152 our $PUBLIC = 0;
153 our $SLAVE = 0;
154 our $MASTER; # master noderef when $SLAVE
155
156 our $NODE = asciibits nonce 16;
157 our $RUNIQ = $NODE; # remote uniq value
158 our $UNIQ = gen_uniq; # per-process/node unique cookie
159 our $ID = "a";
160
161 our %NODE; # node id to transport mapping, or "undef", for local node
162 our (%PORT, %PORT_DATA); # local ports
163
164 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
165 our %LMON; # monitored _local_ ports
166
167 our %LISTENER;
168
169 our $SRCNODE; # holds the sending node during _inject
170
171 sub NODE() {
172 $NODE
173 }
174
175 sub node_of($) {
176 my ($noderef, undef) = split /#/, $_[0], 2;
177
178 $noderef
179 }
180
181 BEGIN {
182 *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
183 ? sub () { 1 }
184 : sub () { 0 };
185 }
186
187 sub _inject {
188 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE && @_;#d#
189 &{ $PORT{+shift} or return };
190 }
191
192 sub add_node {
193 my ($noderef) = @_;
194
195 return $NODE{$noderef}
196 if exists $NODE{$noderef};
197
198 for (split /,/, $noderef) {
199 return $NODE{$noderef} = $NODE{$_}
200 if exists $NODE{$_};
201 }
202
203 # new node, check validity
204 my $node;
205
206 if ($noderef =~ /^slave\/.+$/) {
207 $node = new AnyEvent::MP::Node::Indirect $noderef;
208
209 } else {
210 for (split /,/, $noderef) {
211 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
212 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
213
214 $port > 0
215 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
216
217 AnyEvent::Socket::parse_address $host
218 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
219 }
220
221 $node = new AnyEvent::MP::Node::Direct $noderef;
222 }
223
224 $NODE{$_} = $node
225 for $noderef, split /,/, $noderef;
226
227 $node
228 }
229
230 sub connect_node {
231 &add_node->connect;
232 }
233
234 sub snd(@) {
235 my ($noderef, $portid) = split /#/, shift, 2;
236
237 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
238
239 ($NODE{$noderef} || add_node $noderef)
240 ->{send} (["$portid", @_]);
241 }
242
243 =item $is_local = port_is_local $port
244
245 Returns true iff the port is a local port.
246
247 =cut
248
249 sub port_is_local($) {
250 my ($noderef, undef) = split /#/, $_[0], 2;
251
252 $NODE{$noderef} == $NODE{""}
253 }
254
255 =item snd_to_func $node, $func, @args
256
257 Expects a noderef and a name of a function. Asynchronously tries to call
258 this function with the given arguments on that node.
259
260 This fucntion can be used to implement C<spawn>-like interfaces.
261
262 =cut
263
264 sub snd_to_func($$;@) {
265 my $noderef = shift;
266
267 ($NODE{$noderef} || add_node $noderef)
268 ->send (["", @_]);
269 }
270
271 =item snd_on $node, @msg
272
273 Executes C<snd> with the given C<@msg> (which must include the destination
274 port) on the given node.
275
276 =cut
277
278 sub snd_on($@) {
279 my $node = shift;
280 snd $node, snd => @_;
281 }
282
283 =item eval_on $node, $string
284
285 Evaluates the given string as Perl expression on the given node.
286
287 =cut
288
289 sub eval_on($@) {
290 my $node = shift;
291 snd $node, eval => @_;
292 }
293
294 sub kil(@) {
295 my ($noderef, $portid) = split /#/, shift, 2;
296
297 length $portid
298 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
299
300 ($NODE{$noderef} || add_node $noderef)
301 ->kill ("$portid", @_);
302 }
303
304 sub _nodename {
305 require POSIX;
306 (POSIX::uname ())[1]
307 }
308
309 sub resolve_node($) {
310 my ($noderef) = @_;
311
312 my $cv = AE::cv;
313 my @res;
314
315 $cv->begin (sub {
316 my %seen;
317 my @refs;
318 for (sort { $a->[0] <=> $b->[0] } @res) {
319 push @refs, $_->[1] unless $seen{$_->[1]}++
320 }
321 shift->send (join ",", @refs);
322 });
323
324 $noderef = $DEFAULT_PORT unless length $noderef;
325
326 my $idx;
327 for my $t (split /,/, $noderef) {
328 my $pri = ++$idx;
329
330 $t = length $t ? _nodename . ":$t" : _nodename
331 if $t =~ /^\d*$/;
332
333 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
334 or Carp::croak "$t: unparsable transport descriptor";
335
336 $cv->begin;
337 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
338 for (@_) {
339 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
340 push @res, [
341 $pri += 1e-5,
342 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
343 ];
344 }
345 $cv->end;
346 };
347 }
348
349 $cv->end;
350
351 $cv
352 }
353
354 sub _node_rename {
355 $NODE = shift;
356
357 my $self = $NODE{""};
358 $NODE{$NODE} = delete $NODE{$self->{noderef}};
359 $self->{noderef} = $NODE;
360 }
361
362 sub initialise_node(@) {
363 my ($noderef, @others) = @_;
364
365 my $profile = AnyEvent::MP::Config::find_profile
366 +(defined $noderef ? $noderef : _nodename);
367
368 $noderef = $profile->{noderef}
369 if exists $profile->{noderef};
370
371 push @others, @{ $profile->{seeds} };
372
373 if ($noderef =~ /^slave\/(.*)$/) {
374 $SLAVE = AE::cv;
375 my $name = $1;
376 $name = $NODE unless length $name;
377 $noderef = AE::cv;
378 $noderef->send ("slave/$name");
379
380 @others
381 or Carp::croak "seed nodes must be specified for slave nodes";
382
383 } else {
384 $PUBLIC = 1;
385 $noderef = resolve_node $noderef;
386 }
387
388 @others = map $_->recv, map +(resolve_node $_), @others;
389
390 _node_rename $noderef->recv;
391
392 for my $t (split /,/, $NODE) {
393 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
394
395 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
396 sub {
397 my ($tp) = @_;
398
399 # TODO: urgs
400 my $node = add_node $tp->{remote_node};
401 $node->{trial}{accept} = $tp;
402 },
403 ;
404 }
405
406 for (@others) {
407 my $node = add_node $_;
408 $node->{autoconnect} = 1;
409 $node->connect;
410 }
411
412 if ($SLAVE) {
413 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
414 my $master = $SLAVE->recv;
415 $master
416 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
417
418 $MASTER = $master->{noderef};
419 $master->{autoconnect} = 1;
420
421 (my $via = $MASTER) =~ s/,/!/g;
422
423 $NODE .= "\@$via";
424 _node_rename $NODE;
425
426 $_->send (["", iam => $NODE])
427 for values %NODE;
428
429 $SLAVE = 1;
430 }
431
432 for (@{ $profile->{services} }) {
433 if (s/::$//) {
434 eval "require $_";
435 die $@ if $@;
436 } else {
437 (load_func $_)->();
438 }
439 }
440 }
441
442 #############################################################################
443 # node monitoring and info
444
445 sub _uniq_nodes {
446 my %node;
447
448 @node{values %NODE} = values %NODE;
449
450 values %node;
451 }
452
453 =item node_is_known $noderef
454
455 Returns true iff the given node is currently known to the system.
456
457 =cut
458
459 sub node_is_known($) {
460 exists $NODE{$_[0]}
461 }
462
463 =item node_is_up $noderef
464
465 Returns true if the given node is "up", that is, the kernel thinks it has
466 a working connection to it.
467
468 If the node is known but not currently connected, returns C<0>. If the
469 node is not known, returns C<undef>.
470
471 =cut
472
473 sub node_is_up($) {
474 ($NODE{$_[0]} or return)->{transport}
475 ? 1 : 0
476 }
477
478 =item known_nodes
479
480 Returns the noderefs of all nodes connected to this node, including
481 itself.
482
483 =cut
484
485 sub known_nodes {
486 map $_->{noderef}, _uniq_nodes
487 }
488
489 =item up_nodes
490
491 Return the noderefs of all nodes that are currently connected (excluding
492 the node itself).
493
494 =cut
495
496 sub up_nodes {
497 map $_->{noderef}, grep $_->{transport}, _uniq_nodes
498 }
499
500 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
501
502 Registers a callback that is called each time a node goes up (connection
503 is established) or down (connection is lost).
504
505 Node up messages can only be followed by node down messages for the same
506 node, and vice versa.
507
508 The function returns an optional guard which can be used to de-register
509 the monitoring callback again.
510
511 =cut
512
513 our %MON_NODES;
514
515 sub mon_nodes($) {
516 my ($cb) = @_;
517
518 $MON_NODES{$cb+0} = $cb;
519
520 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
521 }
522
523 sub _inject_nodeevent($$;@) {
524 my ($node, $up, @reason) = @_;
525
526 for my $cb (values %MON_NODES) {
527 eval { $cb->($node->{noderef}, $up, @reason); 1 }
528 or $WARN->(1, $@);
529 }
530
531 $WARN->(7, "$node->{noderef} is " . ($up ? "up" : "down") . " (@reason)");
532 }
533
534 #############################################################################
535 # self node code
536
537 our %node_req = (
538 # internal services
539
540 # monitoring
541 mon0 => sub { # disable monitoring
542 my $portid = shift;
543 my $node = $SRCNODE;
544 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
545 },
546 mon1 => sub { # enable monitoring
547 my $portid = shift;
548 my $node = $SRCNODE;
549 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
550 $node->send (["", kil => $portid, @_]);
551 });
552 },
553 kil => sub {
554 my $cbs = delete $SRCNODE->{lmon}{+shift}
555 or return;
556
557 $_->(@_) for @$cbs;
558 },
559 # node changed its name (for slave nodes)
560 iam => sub {
561 # get rid of bogus slave/xxx name, hopefully
562 delete $NODE{$SRCNODE->{noderef}};
563
564 # change noderef
565 $SRCNODE->{noderef} = $_[0];
566
567 # anchor
568 $NODE{$_[0]} = $SRCNODE;
569 },
570
571 # "public" services - not actually public
572
573 # relay message to another node / generic echo
574 snd => \&snd,
575 snd_multi => sub {
576 snd @$_ for @_
577 },
578
579 # informational
580 info => sub {
581 snd @_, $NODE;
582 },
583 known_nodes => sub {
584 snd @_, known_nodes;
585 },
586 up_nodes => sub {
587 snd @_, up_nodes;
588 },
589
590 # random garbage
591 eval => sub {
592 my @res = eval shift;
593 snd @_, "$@", @res if @_;
594 },
595 time => sub {
596 snd @_, AE::time;
597 },
598 devnull => sub {
599 #
600 },
601 "" => sub {
602 # empty messages are sent by monitoring
603 },
604 );
605
606 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;
607 $PORT{""} = sub {
608 my $tag = shift;
609 eval { &{ $node_req{$tag} ||= load_func $tag } };
610 $WARN->(2, "error processing node message: $@") if $@;
611 };
612
613 =back
614
615 =head1 SEE ALSO
616
617 L<AnyEvent::MP>.
618
619 =head1 AUTHOR
620
621 Marc Lehmann <schmorp@schmorp.de>
622 http://home.schmorp.de/
623
624 =cut
625
626 1
627