ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.14
Committed: Sat Aug 15 04:12:38 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.13: +5 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28 use MIME::Base64 ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our $VERSION = '0.7';
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
40 connect_node add_node load_func snd_to_func
41
42 NODE $NODE node_of snd kil
43 resolve_node initialise_node
44 known_nodes up_nodes mon_nodes node_is_known node_is_up
45 );
46
47 our $DEFAULT_PORT = "4040";
48
49 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
50 our $NETWORK_LATENCY = 3; # activity timeout
51 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
52
53 =item $AnyEvent::MP::Kernel::WARN
54
55 This value is called with an error or warning message, when e.g. a connection
56 could not be created, authorisation failed and so on.
57
58 The default simply logs the message to STDERR.
59
60 =cut
61
# Default warning handler: prints the message on STDERR via warn, making
# sure it ends in exactly one newline (so perl appends no "at ... line").
our $WARN = sub {
   (my $text = $_[0]) =~ s/\n$//;

   warn "$text\n";
};
67
# load_func $name
#
# Resolves a fully qualified function name ("Some::Pkg::func") to a code
# reference. If the function is not defined yet, the enclosing packages
# are require'd, longest prefix first ("Some::Pkg", then "Some"), until
# the function shows up.
#
# Never dies itself: when the name cannot be resolved, a code reference
# that dies with "unable to resolve '<name>'" when called is returned.
sub load_func($) {
   my $func = $_[0];

   no strict 'refs'; # function names are looked up symbolically

   unless (defined &$func) {
      # the name ends up in a string eval below, and it can originate from
      # a remote message - so only accept plain qualified identifiers and
      # refuse anything that could smuggle in code.
      $func =~ /\A[^\W\d]\w*(?:::\w+)*\z/
         or return sub { die "unable to resolve '$func'" };

      my $pkg = $func;
      do {
         # strip the last name component, give up when none is left
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve '$func'" };
         eval "require $pkg"; # failure is fine, a shorter prefix is tried next
      } until defined &$func;
   }

   \&$func
}
82
# nonce $length
#
# Returns a string of exactly $length random octets. /dev/urandom is
# used when it can be opened and delivers enough data, otherwise perl's
# rand (seeded once from time, pid and some system state) is used.
sub nonce($) {
   my $len   = $_[0];
   my $nonce = "";

   if (open my $fh, "<", "/dev/urandom") {
      # sysread may legally return fewer octets than requested, so keep
      # reading until the nonce is complete or the read fails.
      while (length ($nonce) < $len) {
         sysread $fh, $nonce, $len - length ($nonce), length ($nonce)
            or last; # error or eof
      }

      return $nonce
         if length ($nonce) == $len;
   }

   # no (usable) /dev/urandom: fall back to the much weaker builtin prng
   our $nonce_init;
   unless ($nonce_init++) {
      srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
   }

   join "", map +(chr rand 256), 1 .. $len
}
100
# asciibits $octets
#
# Encodes a binary string as a purely alphanumeric ASCII string.
#
# When Math::GMP 2.05 or newer can be loaded, the octets are interpreted
# as one big hexadecimal number and printed in base 62. Otherwise the data
# is base64-encoded, the padding is removed and the two non-alphanumeric
# base64 characters are replaced ("/" by "s", "+" by "p").
sub asciibits($) {
   my $data = $_[0];

   if (eval "use Math::GMP 2.05; 1") {
      $data = Math::GMP::get_str_gmp (
                  (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
                  62
              );
   } else {
      $data = MIME::Base64::encode_base64 ($data, "");
      # base64 pads with up to two "=" characters - remove all of them,
      # not just the first one, or a "=" leaks into the generated id
      $data =~ s/=+$//;
      $data =~ s/\//s/g;
      $data =~ s/\+/p/g;
   }

   $data
}
118
# Generates a short ascii cookie built from the process id, the current
# time and two random octets.
sub gen_uniq {
   my $raw = pack "wNa*", $$, time, nonce (2);

   asciibits ($raw)
}
122
123 =item $AnyEvent::MP::Kernel::PUBLIC
124
125 A boolean indicating whether this is a full/public node, which can create
126 and accept direct connections from other nodes.
127
128 =item $AnyEvent::MP::Kernel::SLAVE
129
130 A boolean indicating whether this node is a slave node, i.e. does most of its
131 message sending/receiving through some master node.
132
133 =item $AnyEvent::MP::Kernel::MASTER
134
135 Defined only in slave mode, in which case it contains the noderef of the
136 master node.
137
138 =cut
139
140 our $PUBLIC = 0;
141 our $SLAVE = 0;
142 our $MASTER; # master noderef when $SLAVE
143
144 our $NODE = asciibits nonce 16;
145 our $RUNIQ = $NODE; # remote uniq value
146 our $UNIQ = gen_uniq; # per-process/node unique cookie
147 our $ID = "a";
148
149 our %NODE; # node id to transport mapping, or "undef", for local node
150 our (%PORT, %PORT_DATA); # local ports
151
152 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
153 our %LMON; # monitored _local_ ports
154
155 our %LISTENER;
156
157 our $SRCNODE; # holds the sending node during _inject
158
# Accessor for the noderef of the local node (the current value of $NODE).
sub NODE() {
   return $NODE;
}
162
# node_of $port
#
# Returns the noderef part of a port id, i.e. everything before the
# first "#".
sub node_of($) {
   my @parts = split /#/, $_[0], 2;

   $parts[0]
}
168
# Compile-time switch for the RCV/SND debug output in _inject and snd.
# Kept as a bare constant (no "return") so perl can inline it.
sub TRACE() {
   0
}
170
# _inject $portid, @msg
#
# Delivers a message to a local port: looks up the port callback in %PORT
# and calls it with the remaining arguments. Messages for unknown ports
# are silently dropped.
sub _inject {
   warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

   my $handler = $PORT{+shift}
      or return;

   &$handler; # deliberately passes on the current (shifted) @_
}
175
# add_node $noderef
#
# Returns the node object for the given (resolved) noderef, creating and
# registering it in %NODE when it is not known yet. A noderef can consist
# of multiple comma-separated endpoints; a node already registered under
# any of them is reused. Croaks when a new, non-slave noderef contains an
# endpoint that is not a resolved "address:port" pair.
sub add_node {
   my ($noderef) = @_;

   # already known under its full name?
   exists $NODE{$noderef}
      and return $NODE{$noderef};

   my @endpoints = split /,/, $noderef;

   # already known under one of its endpoints? then alias the full name
   for my $endpoint (@endpoints) {
      exists $NODE{$endpoint}
         and return $NODE{$noderef} = $NODE{$endpoint};
   }

   # new node, check validity
   my $node;

   if ($noderef =~ /^slave\/.+$/) {
      $node = AnyEvent::MP::Node::Indirect->new ($noderef);

   } else {
      for my $endpoint (@endpoints) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
            or Carp::croak ("$noderef: not a resolved node reference ('$endpoint' not parsable)");

         Carp::croak ("$noderef: not a resolved node reference ('$endpoint' contains non-numeric port)")
            unless $port > 0;

         AnyEvent::Socket::parse_address ($host)
            or Carp::croak ("$noderef: not a resolved node reference ('$endpoint' contains unresolved address)");
      }

      $node = AnyEvent::MP::Node::Direct->new ($noderef);
   }

   # register under the full noderef as well as under every endpoint
   @NODE{$noderef, @endpoints} = ($node) x (1 + @endpoints);

   $node
}
213
# connect_node $noderef
#
# Looks up (or creates) the node for the noderef and asks it to connect.
sub connect_node {
   my $node = add_node (@_);

   $node->connect;
}
217
# snd $port, @msg
#
# Sends a message to the given port ("noderef#portid"). The node is
# created on demand and the message is handed to its {send} callback.
sub snd(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   warn "SND $noderef <- $portid @_\n" if TRACE;#d#

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->{send}->(["$portid", @_]);
}
226
227 =item snd_to_func $noderef, $func, @args
228
229 Expects a noderef and a name of a function. Asynchronously tries to call
230 this function with the given arguments on that node.
231
232 This function can be used to implement C<spawn>-like interfaces.
233
234 =cut
235
# snd_to_func $noderef, $func, @args
#
# Sends the function name and its arguments to the node port (the port
# with the empty id) of the given node, creating the node on demand.
sub snd_to_func {
   my ($noderef, @call) = @_;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->send (["", @call]);
}
242
# kil $port, @reason
#
# Kills the given port ("noderef#portid") by calling kill on its node,
# which is created on demand. Croaks when the port id is empty, as the
# node port itself must not be killed.
sub kil(@) {
   my $dest = shift;
   my ($noderef, $portid) = split /#/, $dest, 2;

   Carp::croak ("$noderef#$portid: killing a node port is not allowed, caught")
      unless length $portid;

   my $node = $NODE{$noderef} || add_node ($noderef);

   $node->kill ("$portid", @_);
}
252
# Returns the node name of this host as reported by uname. POSIX is
# only loaded when this is actually called.
sub _nodename {
   require POSIX;

   my (undef, $nodename) = POSIX::uname ();

   $nodename
}
257
# resolve_node $noderef
#
# Starts resolving a comma-separated list of transport descriptors
# ("host:port", "host", "port" or "") and returns a condvar that receives
# the resolved noderef: a comma-separated list of "address:port" strings,
# in the order of the descriptors, with duplicates removed. An empty
# noderef stands for the default port on the local host. Croaks on
# unparsable descriptors.
sub resolve_node($) {
   my ($noderef) = @_;

   my $done = AE::cv;
   my @found; # [priority, "address:port"], filled in by the resolver callbacks

   # the outer begin/end pair makes this callback run once all lookups are done
   $done->begin (sub {
      my ($cv) = @_;

      my (%dup, @ordered);

      for my $entry (sort { $a->[0] <=> $b->[0] } @found) {
         my $addr = $entry->[1];
         push @ordered, $addr
            unless $dup{$addr}++;
      }

      $cv->send (join ",", @ordered);
   });

   $noderef = $DEFAULT_PORT unless length $noderef;

   my $rank;
   for my $spec (split /,/, $noderef) {
      # integer part orders the descriptors, the fraction orders the
      # addresses each descriptor resolves to
      my $pri = ++$rank;

      # a bare port number (or nothing at all) refers to the local host
      if ($spec =~ /^\d*$/) {
         $spec = length ($spec) ? _nodename () . ":$spec" : _nodename ();
      }

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($spec, "aemp=$DEFAULT_PORT")
         or Carp::croak ("$spec: unparsable transport descriptor");

      $done->begin;
      AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
         for my $result (@_) {
            my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

            push @found, [
               $pri += 1e-5,
               AnyEvent::Socket::format_hostport (AnyEvent::Socket::format_address ($addr), $service)
            ];
         }

         $done->end;
      });
   }

   $done->end;

   $done
}
302
# initialise_node $noderef, @seednodes
#
# Configures the local node. The profile found for $noderef (or for the
# host's node name when $noderef is undef) can override the noderef and
# contributes additional seed nodes and the services to start.
#
# A noderef of the form "slave/<name>" makes this a slave node: it does
# not listen itself, requires at least one seed node (croaks otherwise)
# and croaks when no seed node could be reached in time. Any other
# noderef is resolved, and a listener is created for each resulting
# address.
#
# Blocks while the noderefs are resolved and, in slave mode, until a
# master node has been found or the monitor timeout elapsed.
sub initialise_node(@) {
   my ($noderef, @others) = @_;

   my $profile = AnyEvent::MP::Config::find_profile
      +(defined $noderef ? $noderef : _nodename);

   $noderef = $profile->{noderef}
      if exists $profile->{noderef};

   push @others, @{ $profile->{seeds} };

   if ($noderef =~ /^slave\/(.*)$/) {
      # $SLAVE doubles as condvar while a master is being searched for
      $SLAVE = AE::cv;
      my $name = $1;
      $name = $NODE unless length $name;
      # wrap the name in a condvar so both branches yield something to ->recv
      $noderef = AE::cv;
      $noderef->send ("slave/$name");

      @others
         or Carp::croak "seed nodes must be specified for slave nodes";

   } else {
      $PUBLIC = 1;
      $noderef = resolve_node $noderef;
   }

   # start all seed resolutions first, then wait for all of them
   @others = map $_->recv, map +(resolve_node $_), @others;

   $NODE = $noderef->recv;

   for my $t (split /,/, $NODE) {
      $NODE{$t} = $NODE{""};

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t;

      $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
         sub {
            my ($tp) = @_;

            # TODO: urgs
            my $node = add_node $tp->{remote_node};
            $node->{trial}{accept} = $tp;
         },
      ;
   }

   for (@others) {
      my $node = add_node $_;
      $node->{autoconnect} = 1;
      $node->connect;
   }

   if ($SLAVE) {
      # the timer only has to live while we wait for a master below;
      # when it fires, the condvar is signalled without a value
      my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
      my $master = $SLAVE->recv;
      $master
         or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";

      $MASTER = $master->{noderef};
      $master->{autoconnect} = 1;

      # the master's noderef becomes part of our name, with "," replaced
      (my $via = $MASTER) =~ s/,/!/g;

      $NODE .= "\@$via";
      $NODE{$NODE} = $NODE{""};

      # announce the new name to every node we know
      $_->send (["", iam => $NODE])
         for values %NODE;

      $SLAVE = 1;
   }

   for my $service (@{ $profile->{services} }) {
      # work on a copy: the loop variable aliases the profile's own
      # entry, so stripping the "::" in place would corrupt the profile
      my $name = $service;

      if ($name =~ s/::$//) {
         # "Some::Module::" - load the module
         eval "require $name";
         die $@ if $@;
      } else {
         # "Some::Module::func" - load and call the function
         (load_func $name)->();
      }
   }
}
384
385 #############################################################################
386 # node monitoring and info
387
# Returns every node object in %NODE exactly once, no matter how many
# names it is registered under.
sub _uniq_nodes {
   my %seen;

   # keyed by the stringified reference, so aliases collapse into one entry
   $seen{$_} = $_
      for values %NODE;

   values %seen;
}
395
396 =item node_is_known $noderef
397
398 Returns true iff the given node is currently known to the system.
399
400 =cut
401
# True iff the noderef has an entry in %NODE.
sub node_is_known($) {
   my ($noderef) = @_;

   exists $NODE{$noderef}
}
405
406 =item node_is_up $noderef
407
408 Returns true if the given node is "up", that is, the kernel thinks it has
409 a working connection to it.
410
411 If the node is known but not currently connected, returns C<0>. If the
412 node is not known, returns C<undef>.
413
414 =cut
415
# Returns 1 when the node has a transport, 0 when it is known but has
# none, and nothing (undef) when the node is not known at all.
sub node_is_up($) {
   my $node = $NODE{$_[0]}
      or return;

   $node->{transport} ? 1 : 0
}
420
421 =item known_nodes
422
423 Returns the noderefs of all nodes currently known to this node, whether connected or not.
424
425 =cut
426
# Returns the noderef of every distinct node registered in %NODE.
sub known_nodes {
   map { $_->{noderef} } _uniq_nodes ()
}
430
431 =item up_nodes
432
433 Return the noderefs of all nodes that are currently connected (excluding
434 the node itself).
435
436 =cut
437
# Returns the noderef of every distinct node that currently has a transport.
sub up_nodes {
   my @connected = grep { $_->{transport} } _uniq_nodes ();

   map { $_->{noderef} } @connected
}
441
442 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
443
444 Registers a callback that is called each time a node goes up (connection
445 is established) or down (connection is lost).
446
447 Node up messages can only be followed by node down messages for the same
448 node, and vice versa.
449
450 The function returns an optional guard which can be used to de-register
451 the monitoring callback again.
452
453 =cut
454
455 our %MON_NODES;
456
# $guard = mon_nodes $callback->($noderef, $is_up, @reason)
#
# Registers the callback in %MON_NODES, keyed by the address of the code
# reference. When called in non-void context, a guard object is returned
# that removes the registration again when it is destroyed.
sub mon_nodes($) {
   my ($cb) = @_;

   $MON_NODES{$cb+0} = $cb;

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would refuse a guard to "my $guard = mon_nodes ..." callers -
   # only void context (undef) must skip the guard.
   defined (wantarray)
      and AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
}
464
# _inject_nodeevent $node, $is_up, @reason
#
# Calls every callback registered in %MON_NODES with the node's noderef
# followed by the remaining arguments. A dying callback is reported via
# $WARN and does not keep the others from being called.
sub _inject_nodeevent($$;@) {
   my ($node, @rest) = @_;

   my @event = ($node->{noderef}, @rest);

   for my $cb (values %MON_NODES) {
      eval { $cb->(@event); 1 }
         or $WARN->($@);
   }
}
475
476 #############################################################################
477 # self node code
478
# Request handlers of the node port (the port with the empty id), keyed
# by message tag. Each handler is called with the remaining message
# elements in @_, while $SRCNODE holds the node the message came from.
our %node_req = (
   # internal services

   # monitoring
   mon0 => sub { # disable monitoring
      my ($portid) = @_;
      my $peer = $SRCNODE;

      $NODE{""}->unmonitor ($portid, delete $peer->{rmon}{$portid});
   },
   mon1 => sub { # enable monitoring
      my ($portid) = @_;
      my $peer = $SRCNODE;

      # remember the callback so that mon0 can unregister it again
      $peer->{rmon}{$portid} = sub {
         $peer->send (["", kil => $portid, @_]);
      };

      $NODE{""}->monitor ($portid, $peer->{rmon}{$portid});
   },
   kil => sub {
      my $portid = shift;

      my $cbs = delete $SRCNODE->{lmon}{$portid}
         or return;

      $_->(@_) for @$cbs;
   },
   # node changed its name (for slave nodes)
   iam => sub {
      my ($newref) = @_;
      my $peer = $SRCNODE;

      # get rid of bogus slave/xxx name, hopefully
      delete $NODE{$peer->{noderef}};

      # change noderef
      $peer->{noderef} = $newref;

      # anchor
      $NODE{$newref} = $peer;
   },

   # public services

   # relay message to another node / generic echo
   relay => sub {
      snd (@_);
   },
   relay_multiple => sub {
      for my $msg (@_) {
         snd (@$msg);
      }
   },

   # informational
   info => sub {
      snd (@_, $NODE);
   },
   known_nodes => sub {
      snd (@_, known_nodes ());
   },
   up_nodes => sub {
      snd (@_, up_nodes ());
   },

   # random garbage
   eval => sub {
      # NOTE(review): this evaluates a string received from another node,
      # i.e. every peer that can send node messages can run arbitrary code.
      my @res = eval shift;
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd (@_, AE::time);
   },
   devnull => sub {
      # swallows the message
   },
);
546
# the local node is reachable under the empty name and under its noderef
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The node port: the first message element selects the handler. Tags
# without an entry in %node_req are resolved as function names via
# load_func, and the result is cached in %node_req.
$PORT{""} = sub {
   my $tag = shift;

   eval {
      my $handler = $node_req{$tag} ||= load_func $tag;
      &$handler; # called with the remaining message elements in @_
   };

   $WARN->("error processing node message: $@") if $@;
};
553
554 =back
555
556 =head1 SEE ALSO
557
558 L<AnyEvent::MP>.
559
560 =head1 AUTHOR
561
562 Marc Lehmann <schmorp@schmorp.de>
563 http://home.schmorp.de/
564
565 =cut
566
567 1
568