ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.10
Committed: Fri Aug 14 14:01:05 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.9: +2 -5 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28 use MIME::Base64 ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our $VERSION = '0.6';
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID add_node load_func
40
41 NODE $NODE node_of snd kil
42 resolve_node initialise_node
43 );
44
45 our $DEFAULT_PORT = "4040";
46
47 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
48 our $NETWORK_LATENCY = 3; # activity timeout
49 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
50
51 =item $AnyEvent::MP::Kernel::WARN
52
53 This value is called with an error or warning message, when e.g. a connection
54 could not be created, authorisation failed and so on.
55
56 The default simply logs the message to STDERR.
57
58 =cut
59
60 our $WARN = sub {
61 my $msg = $_[0];
62 $msg =~ s/\n$//;
63 warn "$msg\n";
64 };
65
66 sub load_func($) {
67 my $func = $_[0];
68
69 unless (defined &$func) {
70 my $pkg = $func;
71 do {
72 $pkg =~ s/::[^:]+$//
73 or return sub { die "unable to resolve '$func'" };
74 eval "require $pkg";
75 } until defined &$func;
76 }
77
78 \&$func
79 }
80
81 sub nonce($) {
82 my $nonce;
83
84 if (open my $fh, "</dev/urandom") {
85 sysread $fh, $nonce, $_[0];
86 } else {
87 # shit...
88 our $nonce_init;
89 unless ($nonce_init++) {
90 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
91 }
92
93 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
94 }
95
96 $nonce
97 }
98
99 sub asciibits($) {
100 my $data = $_[0];
101
102 if (eval "use Math::GMP 2.05; 1") {
103 $data = Math::GMP::get_str_gmp (
104 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
105 62
106 );
107 } else {
108 $data = MIME::Base64::encode_base64 $data, "";
109 $data =~ s/=//;
110 $data =~ s/\//s/g;
111 $data =~ s/\+/p/g;
112 }
113
114 $data
115 }
116
117 sub gen_uniq {
118 asciibits pack "wNa*", $$, time, nonce 2
119 }
120
121 =item $AnyEvent::MP::Kernel::PUBLIC
122
123 A boolean indicating whether this is a full/public node, which can create
124 and accept direct connections form othe rnodes.
125
126 =item $AnyEvent::MP::Kernel::SLAVE
127
128 A boolean indicating whether this node is a slave node, i.e. does most of it's
129 message sending/receiving through some master node.
130
131 =item $AnyEvent::MP::Kernel::MASTER
132
133 Defined only in slave mode, in which cas eit contains the noderef of the
134 master node.
135
136 =cut
137
138 our $PUBLIC = 0;
139 our $SLAVE = 0;
140 our $MASTER; # master noderef when $SLAVE
141
142 our $NODE = asciibits nonce 16;
143 our $RUNIQ = $NODE; # remote uniq value
144 our $UNIQ = gen_uniq; # per-process/node unique cookie
145 our $ID = "a";
146
147 our %NODE; # node id to transport mapping, or "undef", for local node
148 our (%PORT, %PORT_DATA); # local ports
149
150 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
151 our %LMON; # monitored _local_ ports
152
153 our %LISTENER;
154
155 our $SRCNODE; # holds the sending node during _inject
156
157 sub NODE() {
158 $NODE
159 }
160
161 sub node_of($) {
162 my ($noderef, undef) = split /#/, $_[0], 2;
163
164 $noderef
165 }
166
167 sub TRACE() { 0 }
168
169 sub _inject {
170 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
171 &{ $PORT{+shift} or return };
172 }
173
174 sub add_node {
175 my ($noderef) = @_;
176
177 return $NODE{$noderef}
178 if exists $NODE{$noderef};
179
180 for (split /,/, $noderef) {
181 return $NODE{$noderef} = $NODE{$_}
182 if exists $NODE{$_};
183 }
184
185 # new node, check validity
186 my $node;
187
188 if ($noderef =~ /^slave\/.+$/) {
189 $node = new AnyEvent::MP::Node::Indirect $noderef;
190
191 } else {
192 for (split /,/, $noderef) {
193 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
194 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
195
196 $port > 0
197 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
198
199 AnyEvent::Socket::parse_address $host
200 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
201 }
202
203 # TODO: for indirect sends, use a different class
204 $node = new AnyEvent::MP::Node::Direct $noderef;
205 }
206
207 $NODE{$_} = $node
208 for $noderef, split /,/, $noderef;
209
210 $node
211 }
212
213 sub snd(@) {
214 my ($noderef, $portid) = split /#/, shift, 2;
215
216 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
217
218 ($NODE{$noderef} || add_node $noderef)
219 ->{send} (["$portid", @_]);
220 }
221
222 sub kil(@) {
223 my ($noderef, $portid) = split /#/, shift, 2;
224
225 length $portid
226 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
227
228 ($NODE{$noderef} || add_node $noderef)
229 ->kill ("$portid", @_);
230 }
231
232 sub _nodename {
233 require POSIX;
234 (POSIX::uname ())[1]
235 }
236
237 sub resolve_node($) {
238 my ($noderef) = @_;
239
240 my $cv = AE::cv;
241 my @res;
242
243 $cv->begin (sub {
244 my %seen;
245 my @refs;
246 for (sort { $a->[0] <=> $b->[0] } @res) {
247 push @refs, $_->[1] unless $seen{$_->[1]}++
248 }
249 shift->send (join ",", @refs);
250 });
251
252 $noderef = $DEFAULT_PORT unless length $noderef;
253
254 my $idx;
255 for my $t (split /,/, $noderef) {
256 my $pri = ++$idx;
257
258 $t = length $t ? _nodename . ":$t" : _nodename
259 if $t =~ /^\d*$/;
260
261 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
262 or Carp::croak "$t: unparsable transport descriptor";
263
264 $cv->begin;
265 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
266 for (@_) {
267 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
268 push @res, [
269 $pri += 1e-5,
270 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
271 ];
272 }
273 $cv->end;
274 };
275 }
276
277 $cv->end;
278
279 $cv
280 }
281
282 sub initialise_node(@) {
283 my ($noderef, @others) = @_;
284
285 my $profile = AnyEvent::MP::Config::find_profile
286 +(defined $noderef ? $noderef : _nodename);
287
288 $noderef = $profile->{noderef}
289 if exists $profile->{noderef};
290
291 push @others, @{ $profile->{seeds} };
292
293 if ($noderef =~ /^slave\/(.*)$/) {
294 $SLAVE = AE::cv;
295 my $name = $1;
296 $name = $NODE unless length $name;
297 $noderef = AE::cv;
298 $noderef->send ("slave/$name");
299
300 @others
301 or Carp::croak "seed nodes must be specified for slave nodes";
302
303 } else {
304 $PUBLIC = 1;
305 $noderef = resolve_node $noderef;
306 }
307
308 @others = map $_->recv, map +(resolve_node $_), @others;
309
310 $NODE = $noderef->recv;
311
312 for my $t (split /,/, $NODE) {
313 $NODE{$t} = $NODE{""};
314
315 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
316
317 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
318 sub {
319 my ($tp) = @_;
320
321 # TODO: urgs
322 my $node = add_node $tp->{remote_node};
323 $node->{trial}{accept} = $tp;
324 },
325 ;
326 }
327
328 (add_node $_)->connect for @others;
329
330 if ($SLAVE) {
331 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
332 $MASTER = $SLAVE->recv;
333 defined $MASTER
334 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
335
336 (my $via = $MASTER) =~ s/,/!/g;
337
338 $NODE .= "\@$via";
339 $NODE{$NODE} = $NODE{""};
340
341 $_->send (["", iam => $NODE])
342 for values %NODE;
343
344 $SLAVE = 1;
345 }
346
347 (load_func $_)->()
348 for @{ $profile->{services} };
349 }
350
351 #############################################################################
352 # node monitoring and info
353
354 sub _uniq_nodes {
355 my %node;
356
357 @node{values %NODE} = values %NODE;
358
359 values %node;
360 }
361
362 =item known_nodes
363
364 Returns the noderefs of all nodes connected to this node.
365
366 =cut
367
368 sub known_nodes {
369 map $_->{noderef}, _uniq_nodes
370 }
371
372 =item up_nodes
373
374 Return the noderefs of all nodes that are currently connected (excluding
375 the node itself).
376
377 =cut
378
379 sub up_nodes {
380 map $_->{noderef}, grep $_->{transport}, _uniq_nodes
381 }
382
383 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
384
385 Registers a callback that is called each time a node goes up (connection
386 is established) or down (connection is lost).
387
388 Node up messages can only be followed by node down messages for the same
389 node, and vice versa.
390
391 The fucntino returns an optional guard which can be used to de-register
392 the monitoring callback again.
393
394 =cut
395
396 our %MON_NODES;
397
398 sub mon_nodes($) {
399 my ($cb) = @_;
400
401 $MON_NODES{$cb+0} = $cb;
402
403 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
404 }
405
406 sub _inject_nodeevent($$;@) {
407 my ($node, @args) = @_;
408
409 unshift @args, $node->{noderef};
410
411 for my $cb (values %MON_NODES) {
412 eval { $cb->(@args); 1 }
413 or $WARN->($@);
414 }
415 }
416
417 #############################################################################
418 # self node code
419
420 our %node_req = (
421 # internal services
422
423 # monitoring
424 mon0 => sub { # disable monitoring
425 my $portid = shift;
426 my $node = $SRCNODE;
427 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
428 },
429 mon1 => sub { # enable monitoring
430 my $portid = shift;
431 my $node = $SRCNODE;
432 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
433 $node->send (["", kil => $portid, @_]);
434 });
435 },
436 kil => sub {
437 my $cbs = delete $SRCNODE->{lmon}{+shift}
438 or return;
439
440 $_->(@_) for @$cbs;
441 },
442 # node changed its name (for slave nodes)
443 iam => sub {
444 # get rid of bogus slave/xxx name, hopefully
445 delete $NODE{$SRCNODE->{noderef}};
446
447 # change noderef
448 $SRCNODE->{noderef} = $_[0];
449
450 # anchor
451 $NODE{$_[0]} = $SRCNODE;
452 },
453
454 # public services
455
456 # relay message to another node / generic echo
457 relay => sub {
458 &snd;
459 },
460 relay_multiple => sub {
461 snd @$_ for @_
462 },
463
464 # informational
465 info => sub {
466 snd @_, $NODE;
467 },
468 known_nodes => sub {
469 snd @_, known_nodes;
470 },
471 up_nodes => sub {
472 snd @_, up_nodes;
473 },
474
475 # random garbage
476 eval => sub {
477 my @res = eval shift;
478 snd @_, "$@", @res if @_;
479 },
480 time => sub {
481 snd @_, AE::time;
482 },
483 devnull => sub {
484 #
485 },
486 );
487
488 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
489 $PORT{""} = sub {
490 my $tag = shift;
491 eval { &{ $node_req{$tag} ||= load_func $tag } };
492 $WARN->("error processing node message: $@") if $@;
493 };
494
495 =back
496
497 =head1 SEE ALSO
498
499 L<AnyEvent::MP>.
500
501 =head1 AUTHOR
502
503 Marc Lehmann <schmorp@schmorp.de>
504 http://home.schmorp.de/
505
506 =cut
507
508 1
509