ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.13
Committed: Sat Aug 15 02:36:03 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.12: +47 -8 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. is needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28 use MIME::Base64 ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
# Module version and the symbols exported by default (via Exporter).
our $VERSION = '0.7';
our @EXPORT  = (
    # shared kernel state
    qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID),
    # low-level node/function helpers
    qw(connect_node add_node load_func snd_to_func),
    # basic messaging primitives
    qw(NODE $NODE node_of snd kil),
    # node setup
    qw(resolve_node initialise_node),
    # node monitoring and information
    qw(known_nodes up_nodes mon_nodes node_is_known node_is_up),
);

# Port used by resolve_node when a transport descriptor names none.
our $DEFAULT_PORT = "4040";

our $CONNECT_INTERVAL = 2;  # new connect every 2s, at least
our $NETWORK_LATENCY  = 3;  # activity timeout
our $MONITOR_TIMEOUT  = 15; # fail monitoring after this time
52
53 =item $AnyEvent::MP::Kernel::WARN
54
55 This value is called with an error or warning message, when e.g. a connection
56 could not be created, authorisation failed and so on.
57
58 The default simply logs the message to STDERR.
59
60 =cut
61
# Default warning handler: removes one trailing newline from the message
# (if present) and reports it through warn with exactly one newline appended.
our $WARN = sub {
    (my $text = $_[0]) =~ s/\n$//;
    warn "$text\n";
};
67
# load_func $name
#
# Resolves a fully qualified function name to a code reference. If the
# function is not defined yet, successively shorter package prefixes of the
# name are require'd until the function shows up.
#
# Returns the code reference on success. If the name cannot be resolved, a
# code reference that dies with "unable to resolve '<name>'" is returned
# instead, so the failure surfaces when the result is called.
#
# The name ends up inside a string eval ("require $pkg") and may originate
# from another node, so anything that is not a plain, optionally
# package-qualified identifier is rejected before it can reach the eval.
sub load_func($) {
    my $func = $_[0];

    my $unresolved = sub { die "unable to resolve '$func'" };

    return $unresolved
        unless defined $func
            && $func =~ /\A[^\W\d]\w*(?:::\w+)*\z/;

    unless (defined &$func) {
        my $pkg = $func;
        do {
            # strip the last name component; give up once none is left
            $pkg =~ s/::[^:]+$//
                or return $unresolved;
            eval "require $pkg";
        } until defined &$func;
    }

    \&$func
}
82
# nonce $length
#
# Returns a string of $length random octets.
#
# The octets are read from /dev/urandom when it can be opened. Short reads
# are retried; whatever is still missing afterwards (or everything, when the
# device is unavailable) is generated with rand, after seeding srand once per
# process from the time, the pid and a checksum over the output of
# "ps -edalf" and "ipconfig /all".
sub nonce($) {
    my $want  = $_[0];
    my $nonce = "";

    if (open my $fh, "<", "/dev/urandom") {
        binmode $fh;

        # sysread may return fewer octets than requested, so append until done
        while (length ($nonce) < $want) {
            my $got = sysread $fh, $nonce, $want - length ($nonce), length ($nonce);
            last unless $got; # read error or EOF: use the fallback below
        }
    }

    my $missing = $want - length ($nonce);

    if ($missing > 0) {
        # no (usable) random device: weak fallback based on rand
        our $nonce_init;
        unless ($nonce_init++) {
            srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
        }

        $nonce .= join "", map +(chr rand 256), 1 .. $missing;
    }

    $nonce
}
100
# asciibits $octets
#
# Converts a binary string into a printable identifier made up of ASCII
# letters and digits only.
#
# When Math::GMP (2.05 or newer) can be loaded, the octets are treated as one
# big hexadecimal number and rendered in base 62. Otherwise the data is
# Base64 encoded, the "=" padding is dropped, and "/" and "+" are replaced
# by "s" and "p".
sub asciibits($) {
    my $data = $_[0];

    if (eval "use Math::GMP 2.05; 1") {
        $data = Math::GMP::get_str_gmp (
            (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
            62
        );
    } else {
        $data = MIME::Base64::encode_base64 ($data, "");
        # Base64 pads with up to two "=", so delete all of them (not just
        # the first one) while mapping the two non-alphanumeric digits.
        $data =~ tr{/+=}{sp}d;
    }

    $data
}
118
# Builds an identifier string from the process id, the current time and two
# random octets (from nonce), rendered printable by asciibits.
sub gen_uniq {
    my $raw = pack "wNa*", $$, time, nonce (2);

    asciibits ($raw)
}
122
123 =item $AnyEvent::MP::Kernel::PUBLIC
124
125 A boolean indicating whether this is a full/public node, which can create
126 and accept direct connections from other nodes.
127
128 =item $AnyEvent::MP::Kernel::SLAVE
129
130 A boolean indicating whether this node is a slave node, i.e. does most of its
131 message sending/receiving through some master node.
132
133 =item $AnyEvent::MP::Kernel::MASTER
134
135 Defined only in slave mode, in which case it contains the noderef of the
136 master node.
137
138 =cut
139
# node role flags (see the POD above)
our $PUBLIC = 0;
our $SLAVE  = 0;
our $MASTER;                  # master noderef when $SLAVE

# identity of this node/process
our $NODE  = asciibits (nonce (16));
our $RUNIQ = $NODE;           # remote uniq value
our $UNIQ  = gen_uniq ();     # per-process/node unique cookie
our $ID    = "a";

our %NODE;                    # node id to transport mapping, or "undef", for local node
our %PORT;                    # local ports
our %PORT_DATA;               # local ports

our %RMON;                    # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
our %LMON;                    # monitored _local_ ports

our %LISTENER;

our $SRCNODE;                 # holds the sending node during _inject
158
# Returns the current value of $NODE, the noderef of the local node.
sub NODE() {
    return $NODE;
}
162
# node_of $port
#
# Returns the part of a port id in front of the first "#", i.e. the noderef.
sub node_of($) {
    my $noderef = (split /#/, $_[0], 2)[0];

    $noderef
}
168
# Constant debugging switch; the RCV/SND trace warnings in _inject and snd
# are only emitted when this returns true.
sub TRACE() {
    0
}
170
# _inject $portid, @msg
#
# Delivers a message to a local port: looks up the callback registered in
# %PORT under the first argument and calls it with the remaining arguments.
# Messages for ports without a callback are silently dropped.
sub _inject {
    warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#

    my $handler = $PORT{+shift}
        or return;

    &$handler;
}
175
# add_node $noderef
#
# Returns the node object registered in %NODE for the given noderef, creating
# and registering it when necessary.
#
# A noderef is a comma-separated list of endpoints. If any single endpoint is
# already registered, the full noderef becomes an alias for that node.
# Otherwise "slave/..." noderefs create an indirect node, and everything else
# must consist of parsable host:port endpoints with a positive port and a
# literal address (croaks if not) and creates a direct node. New nodes are
# registered under the full noderef and under every endpoint.
sub add_node {
    my ($noderef) = @_;

    return $NODE{$noderef}
        if exists $NODE{$noderef};

    my @endpoints = split /,/, $noderef;

    # known under one of its endpoints already: alias the full noderef to it
    for my $known (@endpoints) {
        next unless exists $NODE{$known};

        return $NODE{$noderef} = $NODE{$known};
    }

    # new node, check validity
    my $node;

    if ($noderef =~ /^slave\/.+$/) {
        $node = AnyEvent::MP::Node::Indirect->new ($noderef);
    } else {
        for (@endpoints) {
            my ($host, $port) = AnyEvent::Socket::parse_hostport ($_)
                or Carp::croak ("$noderef: not a resolved node reference ('$_' not parsable)");

            Carp::croak ("$noderef: not a resolved node reference ('$_' contains non-numeric port)")
                unless $port > 0;

            Carp::croak ("$noderef: not a resolved node reference ('$_' contains unresolved address)")
                unless AnyEvent::Socket::parse_address ($host);
        }

        $node = AnyEvent::MP::Node::Direct->new ($noderef);
    }

    $NODE{$_} = $node
        for $noderef, @endpoints;

    $node
}
213
# connect_node $noderef
#
# Looks up (or creates) the node via add_node and calls its connect method.
sub connect_node {
    my $node = add_node (@_);

    $node->connect;
}
217
# snd $port, @msg
#
# Sends a message to a port. The port id is split at the first "#" into
# noderef and port name; the message is handed to the {send} code reference
# of the node object, which is created via add_node if not yet known.
sub snd(@) {
    my $target = shift;
    my ($noderef, $portid) = split /#/, $target, 2;

    warn "SND $noderef <- $portid @_\n" if TRACE;#d#

    my $node = $NODE{$noderef} || add_node ($noderef);

    $node->{send}->(["$portid", @_]);
}
226
227 =item snd_to_func $noderef, $func, @args
228
229 Expects a noderef and a name of a function. Asynchronously tries to call
230 this function with the given arguments on that node.
231
232 This function can be used to implement C<spawn>-like interfaces.
233
234 =cut
235
# snd_to_func $noderef, $func, @args
#
# Sends [ "", $func, @args ] to the given node (created via add_node if not
# yet known); the empty port name addresses the node itself.
sub snd_to_func {
    my ($noderef, @call) = @_;

    my $node = $NODE{$noderef} || add_node ($noderef);

    $node->send (["", @call]);
}
242
# kil $port, @reason
#
# Asks the node owning the port to kill it, passing along the reason.
# Croaks when the port id has no port part, as that would address the node
# port itself.
sub kil(@) {
    my $target = shift;
    my ($noderef, $portid) = split /#/, $target, 2;

    Carp::croak ("$noderef#$portid: killing a node port is not allowed, caught")
        unless length $portid;

    my $node = $NODE{$noderef} || add_node ($noderef);

    $node->kill ("$portid", @_);
}
252
# Returns the nodename field (index 1) of POSIX::uname, loading POSIX on
# first use.
sub _nodename {
    require POSIX;

    my @uname = POSIX::uname ();

    $uname[1]
}
257
# resolve_node $noderef
#
# Starts resolving a comma-separated list of transport descriptors and
# returns a condition variable that receives the resolved noderef: a
# comma-separated list of host:port endpoints, without duplicates, ordered by
# descriptor position and then by the order the resolver returned addresses.
#
# An empty noderef stands for $DEFAULT_PORT. A descriptor consisting only of
# digits (or nothing) is taken as a port on the local nodename. Croaks on
# descriptors that cannot be parsed.
sub resolve_node($) {
    my ($noderef) = @_;

    my $cv = AE::cv ();
    my @res; # [priority, "host:port"] pairs collected by the lookups

    # runs once every begin has been matched by an end
    $cv->begin (sub {
        my ($done) = @_;
        my (%seen, @refs);

        for my $entry (sort { $a->[0] <=> $b->[0] } @res) {
            my $endpoint = $entry->[1];
            push @refs, $endpoint unless $seen{$endpoint}++;
        }

        $done->send (join ",", @refs);
    });

    $noderef = $DEFAULT_PORT unless length $noderef;

    my $idx;
    for my $t (split /,/, $noderef) {
        my $pri = ++$idx;

        if ($t =~ /^\d*$/) {
            $t = length $t ? _nodename () . ":$t" : _nodename ();
        }

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($t, "aemp=$DEFAULT_PORT")
            or Carp::croak ("$t: unparsable transport descriptor");

        $cv->begin;
        AnyEvent::Socket::resolve_sockaddr ($host, $port, "tcp", 0, undef, sub {
            for my $result (@_) {
                my ($service, $addr) = AnyEvent::Socket::unpack_sockaddr ($result->[3]);

                # small increments keep the resolver's order within one descriptor
                $pri += 1e-5;

                push @res, [
                    $pri,
                    AnyEvent::Socket::format_hostport (
                        AnyEvent::Socket::format_address ($addr),
                        $service
                    ),
                ];
            }

            $cv->end;
        });
    }

    $cv->end;

    $cv
}
302
# initialise_node $noderef, @seednodes
#
# Configures the local node. The profile is looked up by the given noderef,
# or by the local nodename when none is given; the profile may override the
# noderef and contributes additional seeds and the services to start.
#
# "slave/NAME" noderefs put the node into slave mode, which requires at least
# one seed node (croaks otherwise). Anything else makes the node public and
# is resolved via resolve_node. Afterwards a listener is created for every
# endpoint of the node, the seed nodes are connected, slave nodes wait for a
# master (croaking if none arrives) and announce their final name, and
# finally the profile's services are loaded.
sub initialise_node(@) {
    my ($noderef, @others) = @_;

    my $profile_name = defined $noderef ? $noderef : _nodename ();
    my $profile      = AnyEvent::MP::Config::find_profile ($profile_name);

    $noderef = $profile->{noderef}
        if exists $profile->{noderef};

    push @others, @{ $profile->{seeds} };

    if ($noderef =~ /^slave\/(.*)$/) {
        my $name = $1;

        $SLAVE = AE::cv ();
        $name  = $NODE unless length $name;

        # slave names need no resolving, so use an already-sent condvar
        $noderef = AE::cv ();
        $noderef->send ("slave/$name");

        Carp::croak ("seed nodes must be specified for slave nodes")
            unless @others;
    } else {
        $PUBLIC  = 1;
        $noderef = resolve_node ($noderef);
    }

    # start all seed lookups first, then wait for each of them
    my @lookups = map +(resolve_node ($_)), @others;
    @others = map $_->recv, @lookups;

    $NODE = $noderef->recv;

    for my $t (split /,/, $NODE) {
        $NODE{$t} = $NODE{""};

        my ($host, $port) = AnyEvent::Socket::parse_hostport ($t);

        my $on_accept = sub {
            my ($tp) = @_;

            # TODO: urgs
            my $node = add_node ($tp->{remote_node});
            $node->{trial}{accept} = $tp;
        };

        $LISTENER{$t} = AnyEvent::MP::Transport::mp_server ($host, $port, $on_accept);
    }

    connect_node ($_) for @others;

    if ($SLAVE) {
        # the timer must stay referenced while waiting for a master
        my $timeout = AE::timer ($MONITOR_TIMEOUT, 0, sub { $SLAVE->() });
        my $master  = $SLAVE->recv;

        Carp::croak ("AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n")
            unless $master;

        $MASTER = $master->{noderef};
        $master->{autoconnect} = 1;

        my $via = $MASTER;
        $via =~ tr/,/!/;

        $NODE .= "\@$via";
        $NODE{$NODE} = $NODE{""};

        # tell every known node the new name
        $_->send (["", iam => $NODE])
            for values %NODE;

        $SLAVE = 1;
    }

    # a trailing "::" names a module to load, anything else a function to call
    for (@{ $profile->{services} }) {
        if (s/::$//) {
            eval "require $_";
            die $@ if $@;
        } else {
            (load_func ($_))->();
        }
    }
}
380
381 #############################################################################
382 # node monitoring and info
383
# Returns the distinct values of %NODE. Several keys may refer to the same
# node, so the values are de-duplicated by their string form.
sub _uniq_nodes {
    my %uniq;

    $uniq{$_} = $_
        for values %NODE;

    values %uniq;
}
391
392 =item node_is_known $noderef
393
394 Returns true iff the given node is currently known to the system.
395
396 =cut
397
# True iff %NODE has an entry for the given noderef.
sub node_is_known($) {
    my ($noderef) = @_;

    return exists $NODE{$noderef};
}
401
402 =item node_is_up $noderef
403
404 Returns true if the given node is "up", that is, the kernel thinks it has
405 a working connection to it.
406
407 If the node is known but not currently connected, returns C<0>. If the
408 node is not known, returns C<undef>.
409
410 =cut
411
# Returns 1 when the node has a transport, 0 when it is known without one,
# and nothing (undef) when %NODE has no true entry for the noderef.
sub node_is_up($) {
    my $node = $NODE{$_[0]}
        or return;

    $node->{transport} ? 1 : 0
}
416
417 =item known_nodes
418
419 Returns the noderefs of all nodes currently known to this node, whether or not they are connected.
420
421 =cut
422
# Returns the noderef of every distinct node object in %NODE.
sub known_nodes {
    map { $_->{noderef} } _uniq_nodes ()
}
426
427 =item up_nodes
428
429 Return the noderefs of all nodes that are currently connected (excluding
430 the node itself).
431
432 =cut
433
# Returns the noderef of every distinct node object in %NODE that currently
# has a transport.
sub up_nodes {
    my @connected = grep { $_->{transport} } _uniq_nodes ();

    map { $_->{noderef} } @connected
}
437
438 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
439
440 Registers a callback that is called each time a node goes up (connection
441 is established) or down (connection is lost).
442
443 Node up messages can only be followed by node down messages for the same
444 node, and vice versa.
445
446 The function returns an optional guard which can be used to de-register
447 the monitoring callback again.
448
449 =cut
450
# Registered node monitoring callbacks, keyed by the numeric address of the
# callback's code reference.
our %MON_NODES;

# $guard = mon_nodes $callback
#
# Registers $callback in %MON_NODES; _inject_nodeevent calls every
# registered callback.
#
# When the return value is used (scalar or list context), a guard object is
# returned that removes the callback again once it is destroyed. In void
# context no guard is created and the callback stays registered.
sub mon_nodes($) {
    my ($cb) = @_;

    $MON_NODES{$cb+0} = $cb;

    # wantarray alone is false in scalar context, which would deny the guard
    # to "my $guard = mon_nodes ..." callers - test for definedness instead
    return unless defined wantarray;

    AnyEvent::Util::guard (sub { delete $MON_NODES{$cb+0} })
}
460
# _inject_nodeevent $node, $is_up, @reason
#
# Calls every callback registered in %MON_NODES with the node's noderef
# followed by the remaining arguments. A callback that dies does not stop the
# others; its error is passed to $WARN instead.
sub _inject_nodeevent($$;@) {
    my $node  = shift;
    my @event = ($node->{noderef}, @_);

    for my $listener (values %MON_NODES) {
        next if eval { $listener->(@event); 1 };

        $WARN->($@);
    }
}
471
472 #############################################################################
473 # self node code
474
# Request handlers of the node port (port ""), keyed by request tag. The
# handlers are called with the request arguments; $SRCNODE holds the sending
# node while they run.
our %node_req = (
    # internal services

    # monitoring
    mon0 => sub { # disable monitoring
        my ($portid) = @_;
        my $node = $SRCNODE;

        $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
    },
    mon1 => sub { # enable monitoring
        my ($portid) = @_;
        my $node = $SRCNODE;

        # remember the callback per port so that mon0 can remove it again
        my $notify = sub {
            $node->send (["", kil => $portid, @_]);
        };

        $node->{rmon}{$portid} = $notify;
        $NODE{""}->monitor ($portid, $notify);
    },
    kil => sub {
        my $portid = shift;

        my $cbs = delete $SRCNODE->{lmon}{$portid}
            or return;

        for my $cb (@$cbs) {
            $cb->(@_);
        }
    },
    # node changed its name (for slave nodes)
    iam => sub {
        my ($new_noderef) = @_;

        # get rid of bogus slave/xxx name, hopefully
        delete $NODE{$SRCNODE->{noderef}};

        # change noderef
        $SRCNODE->{noderef} = $new_noderef;

        # anchor
        $NODE{$new_noderef} = $SRCNODE;
    },

    # public services

    # relay message to another node / generic echo
    relay => sub {
        snd @_;
    },
    relay_multiple => sub {
        for my $msg (@_) {
            snd @$msg;
        }
    },

    # informational
    info => sub {
        snd @_, $NODE;
    },
    known_nodes => sub {
        snd @_, known_nodes ();
    },
    up_nodes => sub {
        snd @_, up_nodes ();
    },

    # random garbage
    eval => sub {
        # NOTE(review): evaluates a string supplied by the sending node, i.e.
        # any node that can deliver messages here can run arbitrary code.
        my @res = eval shift;
        snd @_, "$@", @res if @_;
    },
    time => sub {
        snd @_, AE::time;
    },
    devnull => sub {
        # intentionally does nothing
    },
);
542
# Register the local node under the empty name as well as under its noderef.
$NODE{""} = $NODE{$NODE} = AnyEvent::MP::Node::Self->new (noderef => $NODE);

# The node port: the first message element is the request tag. Tags without
# an entry in %node_req are resolved through load_func and the result is
# cached there. Errors raised by a handler are reported through $WARN.
$PORT{""} = sub {
    my $tag = shift;

    eval {
        my $handler = $node_req{$tag} ||= load_func ($tag);
        &$handler; # called with the remaining message elements in @_
    };

    $WARN->("error processing node message: $@") if $@;
};
549
550 =back
551
552 =head1 SEE ALSO
553
554 L<AnyEvent::MP>.
555
556 =head1 AUTHOR
557
558 Marc Lehmann <schmorp@schmorp.de>
559 http://home.schmorp.de/
560
561 =cut
562
563 1
564