ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Kernel.pm
Revision: 1.4
Committed: Thu Aug 13 00:49:23 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.3: +11 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Kernel - the actual message passing kernel
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Kernel;
8
9 =head1 DESCRIPTION
10
11 This module provides most of the basic functionality of AnyEvent::MP,
12 exposed through higher level interfaces such as L<AnyEvent::MP> and
13 L<Coro::MP>.
14
15 This module is mainly of interest when knowledge about connectivity,
16 connected nodes etc. are needed.
17
18 =head1 GLOBALS AND FUNCTIONS
19
20 =over 4
21
22 =cut
23
24 package AnyEvent::MP::Kernel;
25
26 use common::sense;
27 use Carp ();
28 use MIME::Base64 ();
29
30 use AE ();
31
32 use AnyEvent::MP::Node;
33 use AnyEvent::MP::Transport;
34
35 use base "Exporter";
36
37 our $VERSION = '0.4';
38 our @EXPORT = qw(
39 %NODE %PORT %PORT_DATA %REG $UNIQ $RUNIQ $ID add_node load_func
40
41 NODE $NODE node_of snd kil _any_
42 resolve_node initialise_node
43 );
44
45 our $DEFAULT_PORT = "4040";
46
47 our $CONNECT_INTERVAL = 2; # new connect every 2s, at least
48 our $NETWORK_LATENCY = 3; # activity timeout
49 our $MONITOR_TIMEOUT = 15; # fail monitoring after this time
50
51 =item $AnyEvent::MP::Kernel::WARN
52
53 This value is called with an error or warning message, when e.g. a connection
54 could not be created, authorisation failed and so on.
55
56 The default simply logs the message to STDERR.
57
58 =cut
59
60 our $WARN = sub {
61 my $msg = $_[0];
62 $msg =~ s/\n$//;
63 warn "$msg\n";
64 };
65
66 sub nonce($) {
67 my $nonce;
68
69 if (open my $fh, "</dev/urandom") {
70 sysread $fh, $nonce, $_[0];
71 } else {
72 # shit...
73 our $nonce_init;
74 unless ($nonce_init++) {
75 srand time ^ $$ ^ unpack "%L*", qx"ps -edalf" . qx"ipconfig /all";
76 }
77
78 $nonce = join "", map +(chr rand 256), 1 .. $_[0]
79 }
80
81 $nonce
82 }
83
84 sub asciibits($) {
85 my $data = $_[0];
86
87 if (eval "use Math::GMP 2.05; 1") {
88 $data = Math::GMP::get_str_gmp (
89 (Math::GMP::new_from_scalar_with_base (+(unpack "H*", $data), 16)),
90 62
91 );
92 } else {
93 $data = MIME::Base64::encode_base64 $data, "";
94 $data =~ s/=//;
95 $data =~ s/\//s/g;
96 $data =~ s/\+/p/g;
97 }
98
99 $data
100 }
101
102 sub gen_uniq {
103 asciibits pack "wNa*", $$, time, nonce 2
104 }
105
106 =item $AnyEvent::MP::Kernel::PUBLIC
107
108 A boolean indicating whether this is a full/public node, which can create
109 and accept direct connections form othe rnodes.
110
111 =item $AnyEvent::MP::Kernel::SLAVE
112
113 A boolean indicating whether this node is a slave node, i.e. does most of it's
114 message sending/receiving through some master node.
115
116 =item $AnyEvent::MP::Kernel::MASTER
117
118 Defined only in slave mode, in which cas eit contains the noderef of the
119 master node.
120
121 =cut
122
123 our $PUBLIC = 0;
124 our $SLAVE = 0;
125 our $MASTER; # master noderef when $SLAVE
126
127 our $NODE = asciibits nonce 16;
128 our $RUNIQ = $NODE; # remote uniq value
129 our $UNIQ = gen_uniq; # per-process/node unique cookie
130 our $ID = "a";
131
132 our %NODE; # node id to transport mapping, or "undef", for local node
133 our (%PORT, %PORT_DATA); # local ports
134
135 our %RMON; # local ports monitored by remote nodes ($RMON{noderef}{portid} == cb)
136 our %LMON; # monitored _local_ ports
137
138 our %LISTENER;
139
140 our $SRCNODE; # holds the sending node during _inject
141
142 sub NODE() {
143 $NODE
144 }
145
146 sub node_of($) {
147 my ($noderef, undef) = split /#/, $_[0], 2;
148
149 $noderef
150 }
151
152 sub _ANY_() { 1 }
153 sub _any_() { \&_ANY_ }
154
155 sub TRACE() { 0 }
156
157 sub _inject {
158 warn "RCV $SRCNODE->{noderef} -> @_\n" if TRACE;#d#
159 &{ $PORT{+shift} or return };
160 }
161
162 sub add_node {
163 my ($noderef) = @_;
164
165 return $NODE{$noderef}
166 if exists $NODE{$noderef};
167
168 for (split /,/, $noderef) {
169 return $NODE{$noderef} = $NODE{$_}
170 if exists $NODE{$_};
171 }
172
173 # new node, check validity
174 my $node;
175
176 if ($noderef =~ /^slave\/.+$/) {
177 $node = new AnyEvent::MP::Node::Indirect $noderef;
178
179 } else {
180 for (split /,/, $noderef) {
181 my ($host, $port) = AnyEvent::Socket::parse_hostport $_
182 or Carp::croak "$noderef: not a resolved node reference ('$_' not parsable)";
183
184 $port > 0
185 or Carp::croak "$noderef: not a resolved node reference ('$_' contains non-numeric port)";
186
187 AnyEvent::Socket::parse_address $host
188 or Carp::croak "$noderef: not a resolved node reference ('$_' contains unresolved address)";
189 }
190
191 # TODO: for indirect sends, use a different class
192 $node = new AnyEvent::MP::Node::Direct $noderef;
193 }
194
195 $NODE{$_} = $node
196 for $noderef, split /,/, $noderef;
197
198 $node
199 }
200
201 sub snd(@) {
202 my ($noderef, $portid) = split /#/, shift, 2;
203
204 warn "SND $noderef <- $portid @_\n" if TRACE;#d#
205
206 ($NODE{$noderef} || add_node $noderef)
207 ->{send} (["$portid", @_]);
208 }
209
210 sub kil(@) {
211 my ($noderef, $portid) = split /#/, shift, 2;
212
213 length $portid
214 or Carp::croak "$noderef#$portid: killing a node port is not allowed, caught";
215
216 ($NODE{$noderef} || add_node $noderef)
217 ->kill ("$portid", @_);
218 }
219
220 sub resolve_node($) {
221 my ($noderef) = @_;
222
223 my $cv = AE::cv;
224 my @res;
225
226 $cv->begin (sub {
227 my %seen;
228 my @refs;
229 for (sort { $a->[0] <=> $b->[0] } @res) {
230 push @refs, $_->[1] unless $seen{$_->[1]}++
231 }
232 shift->send (join ",", @refs);
233 });
234
235 $noderef = $DEFAULT_PORT unless length $noderef;
236
237 my $idx;
238 for my $t (split /,/, $noderef) {
239 my $pri = ++$idx;
240
241 if ($t =~ /^\d*$/) {
242 require POSIX;
243 my $nodename = (POSIX::uname ())[1];
244
245 $cv->begin;
246 AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
247 for (@_) {
248 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
249 push @res, [
250 $pri += 1e-5,
251 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
252 ];
253 }
254 $cv->end;
255 };
256
257 # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
258 #
259 # for (@ipv4) {
260 # push @res, [
261 # $pri,
262 # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
263 # ];
264 # }
265 } else {
266 my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
267 or Carp::croak "$t: unparsable transport descriptor";
268
269 $cv->begin;
270 AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
271 for (@_) {
272 my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
273 push @res, [
274 $pri += 1e-5,
275 AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
276 ];
277 }
278 $cv->end;
279 }
280 }
281 }
282
283 $cv->end;
284
285 $cv
286 }
287
288 sub initialise_node(@) {
289 my ($noderef, @others) = @_;
290
291 @others = @{ $AnyEvent::MP::Config::CFG{seeds} }
292 unless @others;
293
294 if ($noderef =~ /^slave\/(.*)$/) {
295 $SLAVE = AE::cv;
296 my $name = $1;
297 $name = $NODE unless length $name;
298 $noderef = AE::cv;
299 $noderef->send ("slave/$name");
300
301 @others
302 or Carp::croak "seed nodes must be specified for slave nodes";
303
304 } else {
305 $PUBLIC = 1;
306 $noderef = resolve_node $noderef;
307 }
308
309 @others = map $_->recv, map +(resolve_node $_), @others;
310
311 $NODE = $noderef->recv;
312
313 for my $t (split /,/, $NODE) {
314 $NODE{$t} = $NODE{""};
315
316 my ($host, $port) = AnyEvent::Socket::parse_hostport $t;
317
318 $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port,
319 sub {
320 my ($tp) = @_;
321
322 # TODO: urgs
323 my $node = add_node $tp->{remote_node};
324 $node->{trial}{accept} = $tp;
325 },
326 ;
327 }
328
329 (add_node $_)->connect for @others;
330
331 if ($SLAVE) {
332 my $timeout = AE::timer $MONITOR_TIMEOUT, 0, sub { $SLAVE->() };
333 $MASTER = $SLAVE->recv;
334 defined $MASTER
335 or Carp::croak "AnyEvent::MP: unable to enter slave mode, unable to connect to a seednode.\n";
336
337 (my $via = $MASTER) =~ s/,/!/g;
338
339 $NODE .= "\@$via";
340 $NODE{$NODE} = $NODE{""};
341
342 $_->send (["", iam => $NODE])
343 for values %NODE;
344
345 $SLAVE = 1;
346 }
347 }
348
349 #############################################################################
350 # node moniotirng and info
351
352 sub _uniq_nodes {
353 my %node;
354
355 @node{values %NODE} = values %NODE;
356
357 values %node;
358 }
359
360 =item known_nodes
361
362 Returns the noderefs of all nodes connected to this node.
363
364 =cut
365
366 sub known_nodes {
367 map $_->{noderef}, _uniq_nodes
368 }
369
370 =item up_nodes
371
372 Return the noderefs of all nodes that are currently connected (excluding
373 the node itself).
374
375 =cut
376
377 sub up_nodes {
378 map $_->{noderef}, grep $_->{transport}, _uniq_nodes
379 }
380
381 =item $guard = mon_nodes $callback->($noderef, $is_up, @reason)
382
383 Registers a callback that is called each time a node goes up (connection
384 is established) or down (connection is lost).
385
386 Node up messages can only be followed by node down messages for the same
387 node, and vice versa.
388
389 The fucntino returns an optional guard which can be used to de-register
390 the monitoring callback again.
391
392 =cut
393
394 our %MON_NODES;
395
396 sub mon_nodes($) {
397 my ($cb) = @_;
398
399 $MON_NODES{$cb+0} = $cb;
400
401 wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} }
402 }
403
404 sub _inject_nodeevent($$;@) {
405 my ($node, @args) = @_;
406
407 unshift @args, $node->{noderef};
408
409 for my $cb (values %MON_NODES) {
410 eval { $cb->(@args); 1 }
411 or $WARN->($@);
412 }
413 }
414
415 #############################################################################
416 # self node code
417
418 sub load_func($) {
419 my $func = $_[0];
420
421 unless (defined &$func) {
422 my $pkg = $func;
423 do {
424 $pkg =~ s/::[^:]+$//
425 or return sub { die "unable to resolve '$func'" };
426 eval "require $pkg";
427 } until defined &$func;
428 }
429
430 \&$func
431 }
432
433 our %node_req = (
434 # internal services
435
436 # monitoring
437 mon0 => sub { # disable monitoring
438 my $portid = shift;
439 my $node = $SRCNODE;
440 $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid});
441 },
442 mon1 => sub { # enable monitoring
443 my $portid = shift;
444 my $node = $SRCNODE;
445 $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub {
446 $node->send (["", kil => $portid, @_]);
447 });
448 },
449 kil => sub {
450 my $cbs = delete $SRCNODE->{lmon}{+shift}
451 or return;
452
453 $_->(@_) for @$cbs;
454 },
455 # node changed its name (for slave nodes)
456 iam => sub {
457 $SRCNODE->{noderef} = $_[0];
458 $NODE{$_[0]} = $SRCNODE;
459 },
460
461 # public services
462
463 # relay message to another node / generic echo
464 relay => sub {
465 &snd;
466 },
467 relay_multiple => sub {
468 snd @$_ for @_
469 },
470
471 # informational
472 info => sub {
473 snd @_, $NODE;
474 },
475 known_nodes => sub {
476 snd @_, known_nodes;
477 },
478 up_nodes => sub {
479 snd @_, up_nodes;
480 },
481
482 # random garbage
483 eval => sub {
484 my @res = eval shift;
485 snd @_, "$@", @res if @_;
486 },
487 time => sub {
488 snd @_, AE::time;
489 },
490 devnull => sub {
491 #
492 },
493 );
494
495 $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self noderef => $NODE;
496 $PORT{""} = sub {
497 my $tag = shift;
498 eval { &{ $node_req{$tag} ||= load_func $tag } };
499 $WARN->("error processing node message: $@") if $@;
500 };
501
502 =back
503
504 =head1 SEE ALSO
505
506 L<AnyEvent::MP>.
507
508 =head1 AUTHOR
509
510 Marc Lehmann <schmorp@schmorp.de>
511 http://home.schmorp.de/
512
513 =cut
514
515 1
516